Skip to content
Permalink
Browse files
fix(pubsublite): remove publish error translation (#3843)
Return errors from the internal/wire package as they contain more context.
  • Loading branch information
tmdiep committed Mar 25, 2021
1 parent b5b4da6 commit d8d8f68e8a70e2353048578f5d22fa1cd2ca6482
@@ -21,6 +21,7 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@@ -187,8 +188,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
if gotErr := batcher.AddMessage(msg, nil); !xerrors.Is(gotErr, ErrOversizedMessage) {
t.Errorf("AddMessage(%v) got err: %v, want err: %q", msg, gotErr, ErrOversizedMessage)
}
})

@@ -19,41 +19,28 @@ import (

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"

ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow
// Use errors.Is for comparing errors.
ErrOverflow = wire.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem
// exceeds MaxPublishRequestBytes. Use errors.Is for comparing errors.
ErrOversizedMessage = wire.ErrOversizedMessage

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any).
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped
)

// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}

// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
@@ -130,7 +117,6 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub
}

p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
err = translateError(err)
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
@@ -21,8 +21,6 @@ import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
@@ -176,62 +174,3 @@ func TestPublisherClientTransformMessageError(t *testing.T) {
t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
}
}

func TestPublisherClientTranslatePublishResultErrors(t *testing.T) {
ctx := context.Background()
input := &pubsub.Message{
Data: []byte("data"),
OrderingKey: "ordering_key",
}
wantMsg := &pb.PubSubMessage{
Data: []byte("data"),
Key: []byte("ordering_key"),
}

for _, tc := range []struct {
desc string
wireErr error
wantErr error
}{
{
desc: "oversized message",
wireErr: wire.ErrOversizedMessage,
wantErr: bundler.ErrOversizedItem,
},
{
desc: "oversized message wrapped",
wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage),
wantErr: bundler.ErrOversizedItem,
},
{
desc: "buffer overflow",
wireErr: wire.ErrOverflow,
wantErr: bundler.ErrOverflow,
},
{
desc: "service stopped",
wireErr: wire.ErrServiceStopped,
wantErr: wire.ErrServiceStopped,
},
} {
t.Run(tc.desc, func(t *testing.T) {
verifier := test.NewRPCVerifier(t)
verifier.Push(wantMsg, nil, tc.wireErr)
defer verifier.Flush()

pubClient := newTestPublisherClient(verifier, DefaultPublishSettings)
result := pubClient.Publish(ctx, input)

_, gotErr := result.Get(ctx)
if !test.ErrorEqual(gotErr, tc.wantErr) {
t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr)
}
if !test.ErrorEqual(pubClient.Error(), tc.wireErr) {
t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr)
}
if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want {
t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
}
})
}
}

0 comments on commit d8d8f68

Please sign in to comment.