Skip to content

Commit

Permalink
fix(pubsub): prevent infinite retry with publishing invalid utf-8 cha…
Browse files Browse the repository at this point in the history
…rs (#5728)

* fix(pubsub): prevent infinite retry with publishing invalid utf-8 chars

* use user specified publish retryer if provided

* wrap default/user retry settings with custom publish retryer

* remove switch statement since theres only one case
  • Loading branch information
hongalex committed Mar 7, 2022
1 parent a0ec5cf commit 0a4dab9
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
17 changes: 16 additions & 1 deletion pubsub/service.go
Expand Up @@ -47,7 +47,7 @@ type defaultRetryer struct {
}

// Logic originally from
// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
// https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
s, ok := status.FromError(err)
if !ok { // includes io.EOF, normal stream close, which causes us to reopen
Expand Down Expand Up @@ -91,3 +91,18 @@ func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetr
return r.defaultRetryer.Retry(err)
}
}

type publishRetryer struct {
defaultRetryer gax.Retryer
}

func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
s, ok := status.FromError(err)
if !ok {
return r.defaultRetryer.Retry(err)
}
if s.Code() == codes.Internal && strings.Contains(s.Message(), "string field contains invalid UTF-8") {
return 0, false
}
return r.defaultRetryer.Retry(err)
}
11 changes: 10 additions & 1 deletion pubsub/topic.go
Expand Up @@ -675,10 +675,19 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
err = fmt.Errorf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", orderingKey)
} else {
// Apply custom publish retryer on top of user specified retryer and
// default retryer.
opts := t.c.pubc.CallOptions.Publish
var settings gax.CallSettings
for _, opt := range opts {
opt.Resolve(&settings)
}
r := &publishRetryer{defaultRetryer: settings.Retry()}
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)))
}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }))
}
end := time.Now()
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions pubsub/topic_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -584,6 +585,30 @@ func TestPublishFlowControl_Block(t *testing.T) {
publish4Completed.Wait()
}

func TestInvalidUTF8(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "invalid-utf8-topic")
if err != nil {
t.Fatal(err)
}
res := topic.Publish(ctx, &Message{
Data: []byte("foo"),
Attributes: map[string]string{
"attr": "a\xc5z",
},
})
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_, err = res.Get(ctx)
if err == nil || !strings.Contains(err.Error(), "string field contains invalid UTF-8") {
t.Fatalf("expected invalid UTF-8 error, got: %v", err)
}
}

// publishSingleMessage publishes a single message to a topic.
func publishSingleMessage(ctx context.Context, t *Topic, data string) *PublishResult {
return t.Publish(ctx, &Message{
Expand Down

0 comments on commit 0a4dab9

Please sign in to comment.