Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
fixes #106
Browse files Browse the repository at this point in the history
  • Loading branch information
devigned committed Mar 12, 2019
1 parent a91c2d6 commit 701f11e
Show file tree
Hide file tree
Showing 16 changed files with 114 additions and 29 deletions.
2 changes: 1 addition & 1 deletion batch_disposition.go
Expand Up @@ -79,4 +79,4 @@ func (m *Message) sendDisposition(ctx context.Context, dispositionStatus Message
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
}
return err
}
}
2 changes: 1 addition & 1 deletion batch_disposition_test.go
Expand Up @@ -33,7 +33,7 @@ func TestBatchDispositionIterator(t *testing.T) {

func TestBatchDispositionUnsupportedStatus(t *testing.T) {
status := MessageStatus(suspendedDisposition)
id := uuid.UUID{}
id := uuid.UUID{}
bdi := BatchDispositionIterator{
LockTokenIDs: []*uuid.UUID{
&id,
Expand Down
15 changes: 15 additions & 0 deletions errors.go
Expand Up @@ -29,6 +29,11 @@ type (
// ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be
// more messages in the future.
ErrNoMessages struct{}

// ErrNotFound is returned when an entity is not found (404)
ErrNotFound struct{
EntityPath string
}
)

func (e ErrMissingField) Error() string {
Expand Down Expand Up @@ -64,3 +69,13 @@ func (e ErrAMQP) Error() string {
func (e ErrNoMessages) Error() string {
return "no messages available"
}

func (e ErrNotFound) Error() string {
return fmt.Sprintf("entity at %s not found", e.EntityPath)
}

// IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsErrNotFound(err error) bool {
_, ok := err.(ErrNotFound)
return ok
}
12 changes: 12 additions & 0 deletions errors_test.go
@@ -1,9 +1,12 @@
package servicebus

import (
"errors"
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrMissingField_Error(t *testing.T) {
Expand Down Expand Up @@ -57,3 +60,12 @@ func TestErrIncorrectType_Error(t *testing.T) {
})
}
}

func TestErrNotFound_Error(t *testing.T) {
err := ErrNotFound{EntityPath: "/foo/bar"}
assert.Equal(t, "entity at /foo/bar not found", err.Error())
assert.True(t, IsErrNotFound(err))

otherErr := errors.New("foo")
assert.False(t, IsErrNotFound(otherErr))
}
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -15,5 +15,6 @@ require (
github.com/uber/jaeger-lib v1.5.0 // indirect
go.opencensus.io v0.15.0
go.uber.org/atomic v1.3.2 // indirect
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
pack.ag/amqp v0.10.2
)
7 changes: 5 additions & 2 deletions go.sum
Expand Up @@ -40,11 +40,14 @@ go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
pack.ag/amqp v0.8.0 h1:JT0f88Hsbo5D+s8bBdleDOHvMDoYcaBW6GplAUqtxC4=
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.1 h1:+NUHSIOCRt62A7+RXL/kPOlEeljIdrpte1HNgdhIn8w=
pack.ag/amqp v0.10.1/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
3 changes: 2 additions & 1 deletion namespace_useragent_test.go
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/stretchr/testify/assert"
)

func TestNamespaceWithUserAgentOption(t *testing.T) {
userAgent := "custom-user-agent"
nsUserAgentOption := NamespaceWithUserAgent(userAgent)
Expand All @@ -42,4 +43,4 @@ func TestNamespaceWithoutUserAgentOption(t *testing.T) {
ns, err := NewNamespace(nsUserAgentOption)
assert.Nil(t, err)
assert.Equal(t, rootUserAgent, ns.getUserAgent())
}
}
20 changes: 16 additions & 4 deletions queue.go
Expand Up @@ -640,22 +640,34 @@ func (q *Queue) Close(ctx context.Context) error {
if q.receiver != nil {
if err := q.receiver.Close(ctx); err != nil {
if q.sender != nil {
if err := q.sender.Close(ctx); err != nil {
if err := q.sender.Close(ctx); err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
}
}
log.For(ctx).Error(err)
return err

if !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}

return nil
}
}

if q.sender != nil {
return q.sender.Close(ctx)
err := q.sender.Close(ctx)
if err != nil && !isConnectionClosed(err) {
return err
}
}

return nil
}

func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
}

func (q *Queue) newReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error) {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.NewReceiver")
defer span.Finish()
Expand Down
4 changes: 2 additions & 2 deletions queue_manager.go
Expand Up @@ -345,7 +345,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
}

if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}

b, err := ioutil.ReadAll(res.Body)
Expand All @@ -358,7 +358,7 @@ func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, err
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}
Expand Down
14 changes: 12 additions & 2 deletions queue_test.go
Expand Up @@ -221,7 +221,17 @@ func (suite *serviceBusSuite) TestQueueUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *q.Status)
}

func (suite *serviceBusSuite) TestQueueManagementWrites() {
func (suite *serviceBusSuite) TestQueueManager_NotFound() {
ns := suite.getNewSasInstance()
qm := ns.NewQueueManager()
entity, err := qm.Get(context.Background(), "somethingNotThere")
suite.Nil(entity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /somethingNotThere not found", err.Error())
}

func (suite *serviceBusSuite) TestQueueManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *QueueManager, string){
"TestPutDefaultQueue": testPutQueue,
}
Expand Down Expand Up @@ -771,7 +781,7 @@ func (suite *serviceBusSuite) queueMessageTest(
func makeQueue(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...QueueManagementOption) func() {
qm := ns.NewQueueManager()
entity, err := qm.Get(ctx, name)
if !assert.NoError(t, err) {
if err != nil && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a queue entity")
}

Expand Down
6 changes: 5 additions & 1 deletion subscription.go
Expand Up @@ -273,7 +273,11 @@ func (s *Subscription) RenewLocks(ctx context.Context, messages ...*Message) err
// Close the underlying connection to Service Bus
func (s *Subscription) Close(ctx context.Context) error {
if s.receiver != nil {
return s.receiver.Close(ctx)
err := s.receiver.Close(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
}
return nil
}
Expand Down
8 changes: 5 additions & 3 deletions subscription_manager.go
Expand Up @@ -281,7 +281,7 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
}

if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}

b, err := ioutil.ReadAll(res.Body)
Expand All @@ -293,7 +293,9 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
// seems the only way to catch 404 is if the feed is empty. If no subscriptions exist, the GET returns 200
// and an empty feed.
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}
Expand All @@ -315,7 +317,7 @@ func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName s
}

if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}

b, err := ioutil.ReadAll(res.Body)
Expand Down
21 changes: 16 additions & 5 deletions subscription_test.go
Expand Up @@ -95,7 +95,7 @@ const (
</entry>`
)

func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionRuleEntry_Unmarshal() {
var entry ruleEntry
err := xml.Unmarshal([]byte(ruleEntryContent), &entry)
suite.NoError(err)
Expand All @@ -108,7 +108,7 @@ func (suite *serviceBusSuite) TestSubscriptionRuleEntryUnmarshal() {
suite.Equal("TrueFilter", entry.Content.RuleDescription.Filter.Type)
}

func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionEntry_Unmarshal() {
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
suite.NoError(err)
Expand All @@ -120,7 +120,7 @@ func (suite *serviceBusSuite) TestSubscriptionEntryUnmarshal() {
suite.Equal("PT1M", *entry.Content.SubscriptionDescription.LockDuration)
}

func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
func (suite *serviceBusSuite) TestSubscriptionEntity_Unmarshal() {
var entry subscriptionEntry
err := xml.Unmarshal([]byte(subscriptionEntryContent), &entry)
suite.NoError(err)
Expand All @@ -135,7 +135,18 @@ func (suite *serviceBusSuite) TestSubscriptionUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *s.Status)
}

func (suite *serviceBusSuite) TestSubscriptionManagementWrites() {
func (suite *serviceBusSuite) TestSubscriptionManager_NotFound() {
ns := suite.getNewSasInstance()
sm, err := ns.NewSubscriptionManager("foo")
suite.Require().NoError(err)
subEntity, err := sm.Get(context.Background(), "bar")
suite.Nil(subEntity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /foo/subscriptions/bar not found", err.Error())
}

func (suite *serviceBusSuite) TestSubscriptionManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *SubscriptionManager, string){
"TestPutDefaultSubscription": testPutSubscription,
}
Expand Down Expand Up @@ -629,7 +640,7 @@ func (suite *serviceBusSuite) subscriptionMessageTest(tests map[string]func(cont
func makeSubscription(ctx context.Context, t *testing.T, topic *Topic, name string, opts ...SubscriptionManagementOption) func() {
sm := topic.NewSubscriptionManager()
entity, err := sm.Get(ctx, name)
if !assert.NoError(t, err) {
if assert.Error(t, err) && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a subscription")
}

Expand Down
6 changes: 5 additions & 1 deletion topic.go
Expand Up @@ -127,7 +127,11 @@ func (t *Topic) Close(ctx context.Context) error {
defer span.Finish()

if t.sender != nil {
return t.sender.Close(ctx)
err := t.sender.Close(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions topic_manager.go
Expand Up @@ -167,7 +167,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
}

if res.StatusCode == http.StatusNotFound {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}

b, err := ioutil.ReadAll(res.Body)
Expand All @@ -180,7 +180,7 @@ func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, err
err = xml.Unmarshal(b, &entry)
if err != nil {
if isEmptyFeed(b) {
return nil, nil
return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
}
return nil, formatManagementError(b)
}
Expand Down
18 changes: 14 additions & 4 deletions topic_test.go
Expand Up @@ -76,7 +76,7 @@ const (
</entry>`
)

func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
func (suite *serviceBusSuite) TestTopicEntry_Unmarshal() {
var entry topicEntry
err := xml.Unmarshal([]byte(topicEntry1), &entry)
suite.Nil(err)
Expand All @@ -88,7 +88,7 @@ func (suite *serviceBusSuite) TestTopicEntryUnmarshal() {
suite.NotNil(entry.Content)
}

func (suite *serviceBusSuite) TestTopicUnmarshal() {
func (suite *serviceBusSuite) TestTopicEntryAndDescription_Unmarshal() {
var entry atom.Entry
err := xml.Unmarshal([]byte(topicEntry1), &entry)
suite.Nil(err)
Expand All @@ -107,7 +107,17 @@ func (suite *serviceBusSuite) TestTopicUnmarshal() {
suite.EqualValues(servicebus.EntityStatusActive, *td.Status)
}

func (suite *serviceBusSuite) TestTopicManagementWrites() {
func (suite *serviceBusSuite) TestTopicManager_NotFound() {
ns := suite.getNewSasInstance()
tm := ns.NewTopicManager()
subEntity, err := tm.Get(context.Background(), "bar")
suite.Nil(subEntity)
suite.Require().NotNil(err)
suite.True(IsErrNotFound(err))
suite.Equal("entity at /bar not found", err.Error())
}

func (suite *serviceBusSuite) TestTopicManagement_Writes() {
tests := map[string]func(context.Context, *testing.T, *TopicManager, string){
"TestPutDefaultTopic": testPutTopic,
}
Expand Down Expand Up @@ -319,7 +329,7 @@ func testTopicSend(ctx context.Context, t *testing.T, topic *Topic) {
func makeTopic(ctx context.Context, t *testing.T, ns *Namespace, name string, opts ...TopicManagementOption) func() {
tm := ns.NewTopicManager()
entity, err := tm.Get(ctx, name)
if !assert.NoError(t, err) {
if err != nil && !IsErrNotFound(err) {
assert.FailNow(t, "could not GET a subscription")
}

Expand Down

0 comments on commit 701f11e

Please sign in to comment.