diff --git a/go.work.sum b/go.work.sum index 8d1e2b9d1181..e8aba172f8ad 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,5 +1,4 @@ cloud.google.com/go/gaming v1.9.0 h1:7vEhFnZmd931Mo7sZ6pJy7uQPDxF7m7v8xtBheG08tc= -cloud.google.com/go/gaming v1.10.1/go.mod h1:XQQvtfP8Rb9Rxnxm5wFVpAp9zCQkJi2bLIb7iHGwB3s= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= @@ -8,33 +7,19 @@ github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObk github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f/go.mod h1:8LHG1a3SRW71ettAD/jW13h8c6AqjVSeL11RAdgaqpo= -github.com/google/go-pkcs11 v0.2.0/go.mod h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= -github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go/v2 v2.9.1/go.mod h1:4FG3gMrVZlyMp5itSYKMU9z/lBE7+SbnUOvzH2HqbEY= github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= +go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= +go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= -golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= -golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo= -golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= -golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= google.golang.org/api v0.123.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZOyms= -google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= -google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/bytestream v0.0.0-20230629202037-9506855d4529/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA= -google.golang.org/genproto/googleapis/bytestream v0.0.0-20230711160842-782d3b101e98/go.mod h1:3QoBVwTHkXbY1oRGzlhwhOykfcATQN43LJ6iT8Wy8kE= google.golang.org/genproto/googleapis/bytestream v0.0.0-20230720185612-659f7aaaa771/go.mod h1:3QoBVwTHkXbY1oRGzlhwhOykfcATQN43LJ6iT8Wy8kE= -google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= -google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= diff --git a/pubsub/trace_test.go b/pubsub/trace_test.go index bf0ab1bcf96b..fb7c0823af1c 100644 --- a/pubsub/trace_test.go +++ b/pubsub/trace_test.go @@ -84,7 +84,7 @@ func TestTrace_PublishSpan(t *testing.T) { attribute.String(orderingAttribute, m.OrderingKey), semconv.MessagingSystemKey.String("pubsub"), }, - ChildSpanCount: 1, + ChildSpanCount: 2, InstrumentationLibrary: instrumentation.Scope{ Name: "cloud.google.com/go/pubsub", Version: internal.Version, @@ -98,7 +98,7 @@ func TestTrace_PublishSpan(t *testing.T) { }, }, tracetest.SpanStub{ - Name: publishSchedulerSpanName, + Name: publishBatcherSpanName, InstrumentationLibrary: instrumentation.Scope{ Name: "cloud.google.com/go/pubsub", Version: internal.Version, @@ -130,13 +130,13 @@ func TestTrace_PublishSpan(t *testing.T) { } defer topic.Stop() - spans := getSpans(e) + got := getSpans(e) opts := []cmp.Option{ cmp.Comparer(spanStubComparer), cmpopts.SortSlices(sortSpanStub), } - if diff := testutil.Diff(spans, expectedSpans, opts...); diff != "" { - log.Printf("print spans: %+v\n", spans) + if diff := testutil.Diff(got, expectedSpans, opts...); diff != "" { + log.Printf("print spans: %+v\n", got) log.Printf("\n\nprint expected spans: %+v\n", expectedSpans) t.Errorf("diff: -got, +want:\n%s\n", diff) } @@ -155,7 +155,7 @@ func TestTrace_PublishSpanError(t *testing.T) { m := &Message{ Data: []byte("test"), - OrderingKey: "my-key", + OrderingKey: "m", } msgSize := proto.Size(&pb.PubsubMessage{ @@ -167,50 +167,91 @@ func TestTrace_PublishSpanError(t *testing.T) { topicID := "t" topicName := fmt.Sprintf("projects/P/topics/%s", topicID) - expectedSpans := tracetest.SpanStubs{ - tracetest.SpanStub{ - Name: fmt.Sprintf("%s %s", topicName, publisherSpanName), - SpanKind: trace.SpanKindProducer, - Attributes: []attribute.KeyValue{ - semconv.MessagingDestinationKindTopic, - semconv.MessagingDestinationKey.String(topicName), - semconv.MessagingMessageIDKey.String(""), - semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), - attribute.String(orderingAttribute, m.OrderingKey), - semconv.MessagingSystemKey.String("pubsub"), - }, - ChildSpanCount: 0, - InstrumentationLibrary: instrumentation.Scope{ - Name: "cloud.google.com/go/pubsub", - Version: internal.Version, - }, - Status: sdktrace.Status{ - Code: codes.Error, - Description: errTopicOrderingNotEnabled.Error(), - }, - }, - } - topic, err := c.CreateTopic(ctx, topicID) if err != nil { t.Fatalf("failed to create topic: %v", err) } - r := topic.Publish(ctx, m) - _, err = r.Get(ctx) - if err == nil { - t.Fatal("expected err, got nil") - } - defer topic.Stop() + // Publishing a message with an ordering key without enabling ordering topic ordering + // should fail. + t.Run("no ordering key", func(t *testing.T) { + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + want := getPublishSpanStubsWithError(topicName, m, msgSize, errTopicOrderingNotEnabled) + + got := getSpans(e) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + cmpopts.SortSlices(sortSpanStub), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + e.Reset() + topic.ResumePublish(m.OrderingKey) + }) + + t.Run("stopped topic", func(t *testing.T) { + // Publishing a message with a stopped publisher should fail too + topic.ResumePublish(m.OrderingKey) + topic.EnableMessageOrdering = true + topic.Stop() + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + got := getSpans(e) + want := getPublishSpanStubsWithError(topicName, m, msgSize, ErrTopicStopped) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + cmpopts.SortSlices(sortSpanStub), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + e.Reset() + topic.ResumePublish(m.OrderingKey) + }) + + t.Run("flow control error", func(t *testing.T) { + // Use a different topic here than above since + // we need to adjust the flow control settings, + // which are immutable after publish. + topicID := "t2" + + topic, err := c.CreateTopic(ctx, topicID) + if err != nil { + t.Fatalf("failed to create topic: %v", err) + } + topic.EnableMessageOrdering = true + topic.PublishSettings.FlowControlSettings = FlowControlSettings{ + LimitExceededBehavior: FlowControlSignalError, + MaxOutstandingBytes: 1, + } + + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + got := getSpans(e) + want := getFlowControlSpanStubs(ErrFlowControllerMaxOutstandingBytes) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + cmpopts.SortSlices(sortSpanStub), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + }) - spans := getSpans(e) - opts := []cmp.Option{ - cmp.Comparer(spanStubComparer), - cmpopts.SortSlices(sortSpanStub), - } - if diff := testutil.Diff(spans, expectedSpans, opts...); diff != "" { - t.Errorf("diff: -got, +want:\n%s\n", diff) - } } func spanStubComparer(a, b tracetest.SpanStub) bool { @@ -245,3 +286,44 @@ func getSpans(e *tracetest.InMemoryExporter) tracetest.SpanStubs { return e.GetSpans() } + +func getPublishSpanStubsWithError(topicName string, m *Message, msgSize int, err error) tracetest.SpanStubs { + return tracetest.SpanStubs{ + tracetest.SpanStub{ + Name: fmt.Sprintf("%s %s", topicName, publisherSpanName), + SpanKind: trace.SpanKindProducer, + Attributes: []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingDestinationKey.String(topicName), + semconv.MessagingMessageIDKey.String(""), + semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), + attribute.String(orderingAttribute, m.OrderingKey), + semconv.MessagingSystemKey.String("pubsub"), + }, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + Status: sdktrace.Status{ + Code: codes.Error, + Description: err.Error(), + }, + }, + } +} + +func getFlowControlSpanStubs(err error) tracetest.SpanStubs { + return tracetest.SpanStubs{ + tracetest.SpanStub{ + Name: publishFlowControlSpanName, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + Status: sdktrace.Status{ + Code: codes.Error, + Description: err.Error(), + }, + }, + } +}