diff --git a/pubsub/driver/driver.go b/pubsub/driver/driver.go index 3fc6ae45aa..8e49ebe9c0 100644 --- a/pubsub/driver/driver.go +++ b/pubsub/driver/driver.go @@ -87,6 +87,10 @@ type Topic interface { // for more background. As(i interface{}) bool + // ErrorAs allows providers to expose provider-specific types for errors. + // See As. + ErrorAs(error, interface{}) bool + // ErrorCode should return a code that describes the error, which was returned by // one of the other methods in this interface. ErrorCode(error) gcerrors.ErrorCode @@ -137,6 +141,10 @@ type Subscription interface { // for more background. As(i interface{}) bool + // ErrorAs allows providers to expose provider-specific types for errors. + // See As. + ErrorAs(error, interface{}) bool + // ErrorCode should return a code that describes the error, which was returned by // one of the other methods in this interface. ErrorCode(error) gcerrors.ErrorCode diff --git a/pubsub/drivertest/drivertest.go b/pubsub/drivertest/drivertest.go index 5d034e28f2..f92c9b5455 100644 --- a/pubsub/drivertest/drivertest.go +++ b/pubsub/drivertest/drivertest.go @@ -72,6 +72,11 @@ type AsTest interface { TopicCheck(t *pubsub.Topic) error // SubscriptionCheck will be called to allow verification of Subscription.As. SubscriptionCheck(s *pubsub.Subscription) error + // ErrorCheck will be called to allow verification of Topic.ErrorAs. + // (It is assumed that Subscription.ErrorAs shares the same implementation.) + // The error will be the one returned from SendBatch when called with + // a non-existent subscription. + ErrorCheck(t *pubsub.Topic, err error) error } type verifyAsFailsOnNil struct{} @@ -94,6 +99,16 @@ func (verifyAsFailsOnNil) SubscriptionCheck(s *pubsub.Subscription) error { return nil } +func (verifyAsFailsOnNil) ErrorCheck(t *pubsub.Topic, err error) (ret error) { + defer func() { + if recover() == nil { + ret = errors.New("want Topic.ErrorAs to panic when passed nil") + } + }() + t.ErrorAs(err, nil) + return nil +} + // RunConformanceTests runs conformance tests for provider implementations of pubsub. func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest) { t.Run("TestSendReceive", func(t *testing.T) { @@ -396,6 +411,15 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) { if err := st.SubscriptionCheck(sub); err != nil { t.Error(err) } + dt, err := h.MakeNonexistentTopic(ctx) + if err != nil { + t.Fatal(err) + } + top = pubsub.NewTopic(dt) + defer top.Shutdown(ctx) + if err := st.ErrorCheck(top, top.Send(ctx, &pubsub.Message{})); err != nil { + t.Error(err) + } } // Publishes a large number of messages to topic concurrently, and then times diff --git a/pubsub/gcppubsub/gcppubsub.go b/pubsub/gcppubsub/gcppubsub.go index dd0b4da2a8..fd761a45d1 100644 --- a/pubsub/gcppubsub/gcppubsub.go +++ b/pubsub/gcppubsub/gcppubsub.go @@ -21,6 +21,7 @@ // gcspubsub exposes the following types for As: // - Topic: *raw.PublisherClient // - Subscription: *raw.SubscriberClient +// - Error: *google.golang.org/grpc/status.Status package gcppubsub // import "gocloud.dev/pubsub/gcppubsub" import ( @@ -39,6 +40,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/oauth" + "google.golang.org/grpc/status" ) const endPoint = "pubsub.googleapis.com:443" @@ -137,6 +139,24 @@ func (t *topic) As(i interface{}) bool { return true } +// ErrorAs implements driver.Topic.ErrorAs +func (*topic) ErrorAs(err error, target interface{}) bool { + return errorAs(err, target) +} + +func errorAs(err error, target interface{}) bool { + s, ok := status.FromError(err) + if !ok { + return false + } + p, ok := target.(**status.Status) + if !ok { + return false + } + *p = s + return true +} + func (*topic) ErrorCode(err error) gcerrors.ErrorCode { return gcerr.GRPCCode(err) } @@ -215,6 +235,11 @@ func (s *subscription) As(i interface{}) bool { return true } +// ErrorAs implements driver.Subscription.ErrorAs +func (*subscription) ErrorAs(err error, target interface{}) bool { + return errorAs(err, target) +} + func (*subscription) ErrorCode(err error) gcerrors.ErrorCode { return gcerr.GRPCCode(err) } diff --git a/pubsub/gcppubsub/gcppubsub_test.go b/pubsub/gcppubsub/gcppubsub_test.go index b035164644..79d65314ee 100644 --- a/pubsub/gcppubsub/gcppubsub_test.go +++ b/pubsub/gcppubsub/gcppubsub_test.go @@ -28,6 +28,8 @@ import ( "gocloud.dev/pubsub/driver" "gocloud.dev/pubsub/drivertest" pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -181,6 +183,17 @@ func (gcpAsTest) SubscriptionCheck(sub *pubsub.Subscription) error { return nil } +func (gcpAsTest) ErrorCheck(t *pubsub.Topic, err error) error { + var s *status.Status + if !t.ErrorAs(err, &s) { + return fmt.Errorf("failed to convert %v (%T) to a gRPC Status", err, err) + } + if s.Code() != codes.NotFound { + return fmt.Errorf("got code %s, want NotFound", s.Code()) + } + return nil +} + func sanitize(testName string) string { return strings.Replace(testName, "/", "_", -1) } diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay b/pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay index f2150112c9..89073b03d1 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay differ diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestAs/verify_As_returns_false_when_passed_nil.replay b/pubsub/gcppubsub/testdata/TestConformance/TestAs/verify_As_returns_false_when_passed_nil.replay index 8fe3814413..032ecdb4f0 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestAs/verify_As_returns_false_when_passed_nil.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestAs/verify_As_returns_false_when_passed_nil.replay differ diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay index ed2f8b572a..aab2165097 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay differ diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay index 5cdf7424b4..47107aefd3 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay differ diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay index e6fe60e6ee..2951187c05 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay differ diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay index f8304a006a..9eb845832e 100644 Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay differ diff --git a/pubsub/mempubsub/mem.go b/pubsub/mempubsub/mem.go index 45bd389886..8cfa5b9804 100644 --- a/pubsub/mempubsub/mem.go +++ b/pubsub/mempubsub/mem.go @@ -88,6 +88,11 @@ func (t *topic) As(i interface{}) bool { return true } +// ErrorAs implements driver.Topic.ErrorAs +func (*topic) ErrorAs(error, interface{}) bool { + return false +} + // ErrorCode implements driver.Topic.ErrorCode func (*topic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown } @@ -222,5 +227,10 @@ func (*subscription) IsRetryable(error) bool { return false } // As implements driver.Subscription.As. func (s *subscription) As(i interface{}) bool { return false } +// ErrorAs implements driver.Subscription.ErrorAs +func (*subscription) ErrorAs(error, interface{}) bool { + return false +} + // ErrorCode implements driver.Subscription.ErrorCode func (*subscription) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 167cafeef8..9e4146ed62 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -166,6 +166,27 @@ func (t *Topic) As(i interface{}) bool { return t.driver.As(i) } +// ErrorAs converts err to provider-specific types. +// See Topic.As for more details. +// ErrorAs panics if target is nil or not a pointer. +// ErrorAs returns false if err == nil. +func (t *Topic) ErrorAs(err error, target interface{}) bool { + return errorAs(t.driver.ErrorAs, err, target) +} + +func errorAs(erras func(error, interface{}) bool, err error, target interface{}) bool { + if target == nil || reflect.TypeOf(target).Kind() != reflect.Ptr { + panic("pubsub: ErrorAs target must be a non-nil pointer") + } + if err == nil { + return false + } + if e, ok := err.(*gcerr.Error); ok { + err = e.Unwrap() + } + return erras(err, target) +} + // NewTopic is for use by provider implementations. var NewTopic = newTopic diff --git a/pubsub/rabbitpubsub/doc.go b/pubsub/rabbitpubsub/doc.go index 0396a86a07..937b38b911 100644 --- a/pubsub/rabbitpubsub/doc.go +++ b/pubsub/rabbitpubsub/doc.go @@ -28,4 +28,5 @@ // As // - Topic: *amqp.Connection // - Subscription: *amqp.Connection +// - Error: *amqp.Error and MultiError package rabbitpubsub // import "gocloud.dev/pubsub/rabbitpubsub" diff --git a/pubsub/rabbitpubsub/rabbit.go b/pubsub/rabbitpubsub/rabbit.go index 79f7442b7a..4e1506e8c4 100644 --- a/pubsub/rabbitpubsub/rabbit.go +++ b/pubsub/rabbitpubsub/rabbit.go @@ -146,7 +146,13 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error { return err } } - return <-errc + err := <-errc + // If there is only one error, return it rather than a MultiError. That + // will work better with ErrorCode and ErrorAs. + if merr, ok := err.(MultiError); ok && len(merr) == 1 { + return merr[0] + } + return err } // Read from the channels established with NotifyPublish and NotifyReturn. @@ -340,6 +346,27 @@ func (t *topic) As(i interface{}) bool { return true } +// ErrorAs implements driver.Topic.ErrorAs +func (*topic) ErrorAs(err error, target interface{}) bool { + return errorAs(err, target) +} + +func errorAs(err error, target interface{}) bool { + switch e := err.(type) { + case *amqp.Error: + if p, ok := target.(**amqp.Error); ok { + *p = e + return true + } + case MultiError: + if p, ok := target.(*MultiError); ok { + *p = e + return true + } + } + return false +} + // OpenSubscription returns a *pubsub.Subscription corresponding to the named queue. // See the package documentation for an example. // @@ -524,3 +551,8 @@ func (s *subscription) As(i interface{}) bool { *c = conn.conn return true } + +// ErrorAs implements driver.Subscription.ErrorAs +func (*subscription) ErrorAs(err error, target interface{}) bool { + return errorAs(err, target) +} diff --git a/pubsub/rabbitpubsub/rabbit_test.go b/pubsub/rabbitpubsub/rabbit_test.go index 7906359914..ccb8675183 100644 --- a/pubsub/rabbitpubsub/rabbit_test.go +++ b/pubsub/rabbitpubsub/rabbit_test.go @@ -293,3 +293,20 @@ func (r rabbitAsTest) SubscriptionCheck(sub *pubsub.Subscription) error { } return nil } + +func (rabbitAsTest) ErrorCheck(t *pubsub.Topic, err error) error { + var aerr *amqp.Error + if !t.ErrorAs(err, &aerr) { + return fmt.Errorf("failed to convert %v (%T) to an amqp.Error", err, err) + } + if aerr.Code != amqp.NotFound { + return fmt.Errorf("got code %v, want NotFound", aerr.Code) + } + + err = MultiError{err} + var merr MultiError + if !t.ErrorAs(err, &merr) { + return fmt.Errorf("failed to convert %v (%T) to a MultiError", err, err) + } + return nil +}