Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: add ErrorAs #1197

Merged
merged 5 commits into from Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions pubsub/driver/driver.go
Expand Up @@ -87,6 +87,10 @@ type Topic interface {
// for more background.
As(i interface{}) bool

// ErrorAs allows providers to expose provider-specific types for errors.
// See As.
ErrorAs(error, interface{}) bool

// ErrorCode should return a code that describes the error, which was returned by
// one of the other methods in this interface.
ErrorCode(error) gcerrors.ErrorCode
Expand Down Expand Up @@ -137,6 +141,10 @@ type Subscription interface {
// for more background.
As(i interface{}) bool

// ErrorAs allows providers to expose provider-specific types for errors.
// See As.
ErrorAs(error, interface{}) bool

// ErrorCode should return a code that describes the error, which was returned by
// one of the other methods in this interface.
ErrorCode(error) gcerrors.ErrorCode
Expand Down
18 changes: 18 additions & 0 deletions pubsub/drivertest/drivertest.go
Expand Up @@ -72,6 +72,11 @@ type AsTest interface {
TopicCheck(t *pubsub.Topic) error
// SubscriptionCheck will be called to allow verification of Subscription.As.
SubscriptionCheck(s *pubsub.Subscription) error
// ErrorCheck will be called to allow verification of Topic.ErrorAs.
// (It is assumed that Subscription.ErrorAs shares the same implementation.)
// The error will be the one returned from SendBatch when called with
// a non-existent subscription.
ErrorCheck(t *pubsub.Topic, err error) error
}

type verifyAsFailsOnNil struct{}
Expand All @@ -94,6 +99,10 @@ func (verifyAsFailsOnNil) SubscriptionCheck(s *pubsub.Subscription) error {
return nil
}

func (verifyAsFailsOnNil) ErrorCheck(t *pubsub.Topic, err error) error {
return nil
vangent marked this conversation as resolved.
Show resolved Hide resolved
}

// RunConformanceTests runs conformance tests for provider implementations of pubsub.
func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest) {
t.Run("TestSendReceive", func(t *testing.T) {
Expand Down Expand Up @@ -396,6 +405,15 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
if err := st.SubscriptionCheck(sub); err != nil {
t.Error(err)
}
dt, err := h.MakeNonexistentTopic(ctx)
if err != nil {
t.Fatal(err)
}
top = pubsub.NewTopic(dt)
defer top.Shutdown(ctx)
if err := st.ErrorCheck(top, top.Send(ctx, &pubsub.Message{})); err != nil {
t.Error(err)
}
}

// Publishes a large number of messages to topic concurrently, and then times
Expand Down
22 changes: 22 additions & 0 deletions pubsub/gcppubsub/gcppubsub.go
Expand Up @@ -21,6 +21,7 @@
// gcspubsub exposes the following types for As:
// - Topic: *raw.PublisherClient
// - Subscription: *raw.SubscriberClient
// - Error: *google.golang.org/grpc/status.Status
package gcppubsub // import "gocloud.dev/pubsub/gcppubsub"

import (
Expand All @@ -39,6 +40,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/status"
)

const endPoint = "pubsub.googleapis.com:443"
Expand Down Expand Up @@ -137,6 +139,21 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func errorAs(err error, target interface{}) bool {
if s, ok := status.FromError(err); ok {
jba marked this conversation as resolved.
Show resolved Hide resolved
if p, ok := target.(**status.Status); ok {
*p = s
return true
}
}
return false
}

func (*topic) ErrorCode(err error) gcerrors.ErrorCode {
return gcerr.GRPCCode(err)
}
Expand Down Expand Up @@ -215,6 +232,11 @@ func (s *subscription) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func (*subscription) ErrorCode(err error) gcerrors.ErrorCode {
return gcerr.GRPCCode(err)
}
13 changes: 13 additions & 0 deletions pubsub/gcppubsub/gcppubsub_test.go
Expand Up @@ -28,6 +28,8 @@ import (
"gocloud.dev/pubsub/driver"
"gocloud.dev/pubsub/drivertest"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -181,6 +183,17 @@ func (gcpAsTest) SubscriptionCheck(sub *pubsub.Subscription) error {
return nil
}

func (gcpAsTest) ErrorCheck(t *pubsub.Topic, err error) error {
var s *status.Status
if !t.ErrorAs(err, &s) {
return fmt.Errorf("failed to convert %v (%T) to a gRPC Status", err, err)
}
if s.Code() != codes.NotFound {
return fmt.Errorf("got code %s, want NotFound", s.Code())
}
return nil
}

func sanitize(testName string) string {
return strings.Replace(testName, "/", "_", -1)
}
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay
Binary file not shown.
Binary file not shown.
10 changes: 10 additions & 0 deletions pubsub/mempubsub/mem.go
Expand Up @@ -88,6 +88,11 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(error, interface{}) bool {
return false
}

// ErrorCode implements driver.Topic.ErrorCode
func (*topic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }

Expand Down Expand Up @@ -222,5 +227,10 @@ func (*subscription) IsRetryable(error) bool { return false }
// As implements driver.Subscription.As.
func (s *subscription) As(i interface{}) bool { return false }

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(error, interface{}) bool {
return false
}

// ErrorCode implements driver.Subscription.ErrorCode
func (*subscription) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown }
21 changes: 21 additions & 0 deletions pubsub/pubsub.go
Expand Up @@ -166,6 +166,27 @@ func (t *Topic) As(i interface{}) bool {
return t.driver.As(i)
}

// ErrorAs converts err to provider-specific types.
// See Topic.As for more details.
// ErrorAs panics if target is nil or not a pointer.
vangent marked this conversation as resolved.
Show resolved Hide resolved
// ErrorAs returns false if err == nil.
func (t *Topic) ErrorAs(err error, target interface{}) bool {
return errorAs(t.driver.ErrorAs, err, target)
}

func errorAs(erras func(error, interface{}) bool, err error, target interface{}) bool {
if target == nil || reflect.TypeOf(target).Kind() != reflect.Ptr {
panic("pubsub: ErrorAs target must be a non-nil pointer")
}
if err == nil {
return false
}
if e, ok := err.(*gcerr.Error); ok {
err = e.Unwrap()
}
return erras(err, target)
}

// NewTopic is for use by provider implementations.
var NewTopic = newTopic

Expand Down
1 change: 1 addition & 0 deletions pubsub/rabbitpubsub/doc.go
Expand Up @@ -28,4 +28,5 @@
// As
// - Topic: *amqp.Connection
// - Subscription: *amqp.Connection
// - Error: *amqp.Error
jba marked this conversation as resolved.
Show resolved Hide resolved
package rabbitpubsub // import "gocloud.dev/pubsub/rabbitpubsub"
28 changes: 27 additions & 1 deletion pubsub/rabbitpubsub/rabbit.go
Expand Up @@ -146,7 +146,13 @@ func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
return err
}
}
return <-errc
err := <-errc
// If there is only one error, return it rather than a MultiError. That
// will work better with ErrorCode and ErrorAs.
if merr, ok := err.(MultiError); ok && len(merr) == 1 {
return merr[0]
}
return err
}

// Read from the channels established with NotifyPublish and NotifyReturn.
Expand Down Expand Up @@ -340,6 +346,21 @@ func (t *topic) As(i interface{}) bool {
return true
}

// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}

func errorAs(err error, target interface{}) bool {
if aerr, ok := err.(*amqp.Error); ok {
jba marked this conversation as resolved.
Show resolved Hide resolved
jba marked this conversation as resolved.
Show resolved Hide resolved
if p, ok := target.(**amqp.Error); ok {
*p = aerr
return true
}
}
return false
}

// OpenSubscription returns a *pubsub.Subscription corresponding to the named queue.
// See the package documentation for an example.
//
Expand Down Expand Up @@ -524,3 +545,8 @@ func (s *subscription) As(i interface{}) bool {
*c = conn.conn
return true
}

// ErrorAs implements driver.Subscription.ErrorAs
func (*subscription) ErrorAs(err error, target interface{}) bool {
return errorAs(err, target)
}
11 changes: 11 additions & 0 deletions pubsub/rabbitpubsub/rabbit_test.go
Expand Up @@ -293,3 +293,14 @@ func (r rabbitAsTest) SubscriptionCheck(sub *pubsub.Subscription) error {
}
return nil
}

func (rabbitAsTest) ErrorCheck(t *pubsub.Topic, err error) error {
var aerr *amqp.Error
if !t.ErrorAs(err, &aerr) {
return fmt.Errorf("failed to convert %v (%T) to an amqp.Error", err, err)
}
if aerr.Code != amqp.NotFound {
return fmt.Errorf("got code %v, want NotFound", aerr.Code)
}
return nil
}