Skip to content

Commit

Permalink
pubsub: add ErrorAs (#1197)
Browse files Browse the repository at this point in the history
Fixes #957.
  • Loading branch information
jba committed Jan 30, 2019
1 parent 2dc1db3 commit b4346d1
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pubsub/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type Topic interface {
// for more background.
As(i interface{}) bool

// ErrorAs allows providers to expose provider-specific types for errors.
// See As.
ErrorAs(error, interface{}) bool

// ErrorCode should return a code that describes the error, which was returned by
// one of the other methods in this interface.
ErrorCode(error) gcerrors.ErrorCode
Expand Down Expand Up @@ -137,6 +141,10 @@ type Subscription interface {
// for more background.
As(i interface{}) bool

// ErrorAs allows providers to expose provider-specific types for errors.
// See As.
ErrorAs(error, interface{}) bool

// ErrorCode should return a code that describes the error, which was returned by
// one of the other methods in this interface.
ErrorCode(error) gcerrors.ErrorCode
Expand Down
24 changes: 24 additions & 0 deletions pubsub/drivertest/drivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type AsTest interface {
TopicCheck(t *pubsub.Topic) error
// SubscriptionCheck will be called to allow verification of Subscription.As.
SubscriptionCheck(s *pubsub.Subscription) error
// ErrorCheck will be called to allow verification of Topic.ErrorAs.
// (It is assumed that Subscription.ErrorAs shares the same implementation.)
// The error will be the one returned from SendBatch when called with
// a non-existent subscription.
ErrorCheck(t *pubsub.Topic, err error) error
}

type verifyAsFailsOnNil struct{}
Expand All @@ -94,6 +99,16 @@ func (verifyAsFailsOnNil) SubscriptionCheck(s *pubsub.Subscription) error {
return nil
}

func (verifyAsFailsOnNil) ErrorCheck(t *pubsub.Topic, err error) (ret error) {
defer func() {
if recover() == nil {
ret = errors.New("want Topic.ErrorAs to panic when passed nil")
}
}()
t.ErrorAs(err, nil)
return nil
}

// RunConformanceTests runs conformance tests for provider implementations of pubsub.
func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest) {
t.Run("TestSendReceive", func(t *testing.T) {
Expand Down Expand Up @@ -396,6 +411,15 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
if err := st.SubscriptionCheck(sub); err != nil {
t.Error(err)
}
dt, err := h.MakeNonexistentTopic(ctx)
if err != nil {
t.Fatal(err)
}
top = pubsub.NewTopic(dt)
defer top.Shutdown(ctx)
if err := st.ErrorCheck(top, top.Send(ctx, &pubsub.Message{})); err != nil {
t.Error(err)
}
}

// Publishes a large number of messages to topic concurrently, and then times
Expand Down
25 changes: 25 additions & 0 deletions pubsub/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// gcspubsub exposes the following types for As:
// - Topic: *raw.PublisherClient
// - Subscription: *raw.SubscriberClient
// - Error: *google.golang.org/grpc/status.Status
package gcppubsub // import "gocloud.dev/pubsub/gcppubsub"

import (
Expand All @@ -39,6 +40,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/status"
)

const endPoint = "pubsub.googleapis.com:443"
Expand Down Expand Up @@ -137,6 +139,24 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func errorAs(err error, target interface{}) bool {
s, ok := status.FromError(err)
if !ok {
return false
}
p, ok := target.(**status.Status)
if !ok {
return false
}
*p = s
return true
}

func (*topic) ErrorCode(err error) gcerrors.ErrorCode {
return gcerr.GRPCCode(err)
}
Expand Down Expand Up @@ -215,6 +235,11 @@ func (s *subscription) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func (*subscription) ErrorCode(err error) gcerrors.ErrorCode {
return gcerr.GRPCCode(err)
}
13 changes: 13 additions & 0 deletions pubsub/gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"gocloud.dev/pubsub/driver"
"gocloud.dev/pubsub/drivertest"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -181,6 +183,17 @@ func (gcpAsTest) SubscriptionCheck(sub *pubsub.Subscription) error {
return nil
}

func (gcpAsTest) ErrorCheck(t *pubsub.Topic, err error) error {
var s *status.Status
if !t.ErrorAs(err, &s) {
return fmt.Errorf("failed to convert %v (%T) to a gRPC Status", err, err)
}
if s.Code() != codes.NotFound {
return fmt.Errorf("got code %s, want NotFound", s.Code())
}
return nil
}

func sanitize(testName string) string {
return strings.Replace(testName, "/", "_", -1)
}
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay
Binary file not shown.
Binary file not shown.
10 changes: 10 additions & 0 deletions pubsub/mempubsub/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(error, interface{}) bool {
return false
}

// ErrorCode implements driver.Topic.ErrorCode
func (*topic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }

Expand Down Expand Up @@ -222,5 +227,10 @@ func (*subscription) IsRetryable(error) bool { return false }
// As implements driver.Subscription.As.
func (s *subscription) As(i interface{}) bool { return false }

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(error, interface{}) bool {
return false
}

// ErrorCode implements driver.Subscription.ErrorCode
func (*subscription) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }
21 changes: 21 additions & 0 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@ func (t *Topic) As(i interface{}) bool {
return t.driver.As(i)
}

// ErrorAs converts err to provider-specific types.
// See Topic.As for more details.
// ErrorAs panics if target is nil or not a pointer.
// ErrorAs returns false if err == nil.
func (t *Topic) ErrorAs(err error, target interface{}) bool {
return errorAs(t.driver.ErrorAs, err, target)
}

func errorAs(erras func(error, interface{}) bool, err error, target interface{}) bool {
if target == nil || reflect.TypeOf(target).Kind() != reflect.Ptr {
panic("pubsub: ErrorAs target must be a non-nil pointer")
}
if err == nil {
return false
}
if e, ok := err.(*gcerr.Error); ok {
err = e.Unwrap()
}
return erras(err, target)
}

// NewTopic is for use by provider implementations.
var NewTopic = newTopic

Expand Down
1 change: 1 addition & 0 deletions pubsub/rabbitpubsub/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@
// As
// - Topic: *amqp.Connection
// - Subscription: *amqp.Connection
// - Error: *amqp.Error and MultiError
package rabbitpubsub // import "gocloud.dev/pubsub/rabbitpubsub"
34 changes: 33 additions & 1 deletion pubsub/rabbitpubsub/rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,13 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
return err
}
}
return <-errc
err := <-errc
// If there is only one error, return it rather than a MultiError. That
// will work better with ErrorCode and ErrorAs.
if merr, ok := err.(MultiError); ok && len(merr) == 1 {
return merr[0]
}
return err
}

// Read from the channels established with NotifyPublish and NotifyReturn.
Expand Down Expand Up @@ -340,6 +346,27 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func errorAs(err error, target interface{}) bool {
switch e := err.(type) {
case *amqp.Error:
if p, ok := target.(**amqp.Error); ok {
*p = e
return true
}
case MultiError:
if p, ok := target.(*MultiError); ok {
*p = e
return true
}
}
return false
}

// OpenSubscription returns a *pubsub.Subscription corresponding to the named queue.
// See the package documentation for an example.
//
Expand Down Expand Up @@ -524,3 +551,8 @@ func (s *subscription) As(i interface{}) bool {
*c = conn.conn
return true
}

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}
17 changes: 17 additions & 0 deletions pubsub/rabbitpubsub/rabbit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,20 @@ func (r rabbitAsTest) SubscriptionCheck(sub *pubsub.Subscription) error {
}
return nil
}

func (rabbitAsTest) ErrorCheck(t *pubsub.Topic, err error) error {
var aerr *amqp.Error
if !t.ErrorAs(err, &aerr) {
return fmt.Errorf("failed to convert %v (%T) to an amqp.Error", err, err)
}
if aerr.Code != amqp.NotFound {
return fmt.Errorf("got code %v, want NotFound", aerr.Code)
}

err = MultiError{err}
var merr MultiError
if !t.ErrorAs(err, &merr) {
return fmt.Errorf("failed to convert %v (%T) to a MultiError", err, err)
}
return nil
}

0 comments on commit b4346d1

Please sign in to comment.