diff --git a/pubsub/service.go b/pubsub/service.go index 1a29df03ecb..e8d636a01bb 100644 --- a/pubsub/service.go +++ b/pubsub/service.go @@ -47,7 +47,7 @@ type defaultRetryer struct { } // Logic originally from -// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java +// https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { s, ok := status.FromError(err) if !ok { // includes io.EOF, normal stream close, which causes us to reopen @@ -91,3 +91,18 @@ func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetr return r.defaultRetryer.Retry(err) } } + +type publishRetryer struct { + defaultRetryer gax.Retryer +} + +func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { + s, ok := status.FromError(err) + if !ok { + return r.defaultRetryer.Retry(err) + } + if s.Code() == codes.Internal && strings.Contains(s.Message(), "string field contains invalid UTF-8") { + return 0, false + } + return r.defaultRetryer.Retry(err) +} diff --git a/pubsub/topic.go b/pubsub/topic.go index 9d5ab8b3556..a90503cc8a6 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -675,10 +675,19 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) if orderingKey != "" && t.scheduler.IsPaused(orderingKey) { err = fmt.Errorf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", orderingKey) } else { + // Apply custom publish retryer on top of user specified retryer and + // default retryer. + opts := t.c.pubc.CallOptions.Publish + var settings gax.CallSettings + for _, opt := range opts { + opt.Resolve(&settings) + } + r := &publishRetryer{defaultRetryer: settings.Retry()} res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ Topic: t.name, Messages: pbMsgs, - }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes))) + }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), + gax.WithRetry(func() gax.Retryer { return r })) } end := time.Now() if err != nil { diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 33f095abc92..9115b43ab8a 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "strings" "sync" "testing" "time" @@ -584,6 +585,30 @@ func TestPublishFlowControl_Block(t *testing.T) { publish4Completed.Wait() } +func TestInvalidUTF8(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "invalid-utf8-topic") + if err != nil { + t.Fatal(err) + } + res := topic.Publish(ctx, &Message{ + Data: []byte("foo"), + Attributes: map[string]string{ + "attr": "a\xc5z", + }, + }) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + _, err = res.Get(ctx) + if err == nil || !strings.Contains(err.Error(), "string field contains invalid UTF-8") { + t.Fatalf("expected invalid UTF-8 error, got: %v", err) + } +} + // publishSingleMessage publishes a single message to a topic. func publishSingleMessage(ctx context.Context, t *Topic, data string) *PublishResult { return t.Publish(ctx, &Message{