Skip to content

Commit

Permalink
Merge pull request #87 from Azure/sender-close
Browse files Browse the repository at this point in the history
close sender when hub is closed and close gracefully
  • Loading branch information
devigned committed Dec 17, 2018
2 parents 5bee80e + f71fe8e commit 804d4a4
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 53 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Expand Up @@ -5,10 +5,10 @@ go:
- 1.11.x
before_install:
- cd ${TRAVIS_HOME}
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- go get github.com/fzipp/gocyclo
- go get golang.org/x/lint/golint
- go get -u github.com/mattn/goveralls
- go get -u golang.org/x/tools/cmd/cover
- go get -u github.com/fzipp/gocyclo
- go get -u golang.org/x/lint/golint
- cd ${TRAVIS_BUILD_DIR}
jobs:
include:
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Expand Up @@ -2,6 +2,10 @@

## `head`

## `v1.1.1`
- close sender when hub is closed
- ensure links, session and connections are closed gracefully

## `v1.1.0`
- add receive option to receive from a timestamp
- fix sender recovery on temporary network failures
Expand Down
5 changes: 0 additions & 5 deletions go.mod
Expand Up @@ -11,16 +11,11 @@ require (
github.com/fortytw2/leaktest v1.2.0 // indirect
github.com/joho/godotenv v1.3.0
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/go-homedir v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/pkg/errors v0.8.0 // indirect
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
golang.org/x/net v0.0.0-20181017193950-04a2e542c03f // indirect
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba // indirect
google.golang.org/api v0.0.0-20181018171847-1ee037c97071 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
pack.ag/amqp v0.10.2
)
18 changes: 3 additions & 15 deletions go.sum
@@ -1,4 +1,3 @@
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999 h1:sihTnRgTOUSCQz0iS0pjZuFQy/z7GXCJgSBg3+rZKHw=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/Azure/azure-amqp-common-go v1.1.0 h1:N9wkRiVrtVJZ2KNkZICF8LaUHh9xeM1OzbaURJL6DfM=
github.com/Azure/azure-amqp-common-go v1.1.0/go.mod h1:UyBV3i5qcJW9DWm0K/R8de0/BBABq/CExmiWNUdoSvQ=
Expand Down Expand Up @@ -31,11 +30,6 @@ github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3q
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe h1:CHRGQ8V7OlCYtwaKPJi3iA7J+YdNKdo8j7nG5IgDhjs=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
Expand All @@ -58,25 +52,19 @@ go.opencensus.io v0.18.0 h1:Mk5rgZcggtbvtAun5aJzAtjKKN/t0R3jJPlWILlv938=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181017193950-04a2e542c03f h1:4pRM7zYwpBjCnfA1jRmhItLxYJkaEnsmuAcRtA347DA=
golang.org/x/net v0.0.0-20181017193950-04a2e542c03f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba h1:nZJIJPGow0Kf9bU9QTc1U6OXbs/7Hu4e+cNv+hxH+Zc=
golang.org/x/sys v0.0.0-20181011152604-fa43e7bc11ba/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181018171847-1ee037c97071 h1:qjOR+Rh/K+7PymkUirbqZFFawRD+wReBwkcC1VJBT/U=
google.golang.org/api v0.0.0-20181018171847-1ee037c97071/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
35 changes: 30 additions & 5 deletions hub.go
Expand Up @@ -51,7 +51,7 @@ const (
rootUserAgent = "/golang-event-hubs"

// Version is the semantic version number
Version = "1.1.0"
Version = "1.1.1"
)

type (
Expand Down Expand Up @@ -488,16 +488,18 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation")
defer span.End()
client := newClient(h.namespace, h.name)
conn, err := h.namespace.newConnection()
c, err := h.namespace.newConnection()
if err != nil {
log.For(ctx).Error(err)
return nil, err
}
info, err := client.GetHubRuntimeInformation(ctx, conn)

info, err := client.GetHubRuntimeInformation(ctx, c)
if err != nil {
log.For(ctx).Error(err)
return nil, err
}

return info, nil
}

Expand All @@ -506,14 +508,16 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
defer span.End()
client := newClient(h.namespace, h.name)
conn, err := h.namespace.newConnection()
c, err := h.namespace.newConnection()
if err != nil {
return nil, err
}
info, err := client.GetHubPartitionRuntimeInformation(ctx, conn, partitionID)

info, err := client.GetHubPartitionRuntimeInformation(ctx, c, partitionID)
if err != nil {
return nil, err
}

return info, nil
}

Expand All @@ -522,6 +526,27 @@ func (h *Hub) Close(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Close")
defer span.End()

if h.sender != nil {
if err := h.sender.Close(ctx); err != nil {
log.For(ctx).Error(err)
if rErr := h.closeReceivers(ctx); rErr != nil {
log.For(ctx).Error(rErr)
}

// return originating error
return err
}
}

// close receivers and return error
return h.closeReceivers(ctx)
}

// closeReceivers will close the receivers on the hub and return the last error
func (h *Hub) closeReceivers(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.closeReceivers")
defer span.End()

var lastErr error
for _, r := range h.receivers {
if err := r.Close(ctx); err != nil {
Expand Down
40 changes: 19 additions & 21 deletions internal/test/suite.go
Expand Up @@ -42,8 +42,6 @@ import (
"github.com/joho/godotenv"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"go.opencensus.io/exporter/jaeger"
"go.opencensus.io/trace"
)

var (
Expand Down Expand Up @@ -108,9 +106,9 @@ func (suite *BaseSuite) SetupSuite() {
suite.FailNow("failed provisioning")
}

if !suite.NoError(suite.setupTracing()) {
suite.FailNow("failed to setup tracing")
}
//if !suite.NoError(suite.setupTracing()) {
// suite.FailNow("failed to setup tracing")
//}
}

// TearDownSuite might one day destroy all of the resources in the suite, but I'm not sure we want to do that just yet...
Expand Down Expand Up @@ -354,22 +352,22 @@ func getRmGroupClientWithToken(subscriptionID string, env azure.Environment) *rm
return &groupsClient
}

func (suite *BaseSuite) setupTracing() error {
if os.Getenv("TRACING") != "true" {
return nil
}
exporter, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: "localhost:6831",
Process: jaeger.Process{
ServiceName: "eh-tests",
},
})
if err != nil {
return err
}
trace.RegisterExporter(exporter)
return nil
}
//func (suite *BaseSuite) setupTracing() error {
// if os.Getenv("TRACING") != "true" {
// return nil
// }
// exporter, err := jaeger.NewExporter(jaeger.Options{
// AgentEndpoint: "localhost:6831",
// Process: jaeger.Process{
// ServiceName: "eh-tests",
// },
// })
// if err != nil {
// return err
// }
// trace.RegisterExporter(exporter)
// return nil
//}

// MustGetEnv will panic or return the env var for a given string key
func MustGetEnv(key string) string {
Expand Down
24 changes: 24 additions & 0 deletions receiver.go
Expand Up @@ -163,6 +163,30 @@ func (r *receiver) Close(ctx context.Context) error {
r.done()
}

err := r.receiver.Close(ctx)
if err != nil {
log.For(ctx).Error(err)
if sessionErr := r.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
}

if connErr := r.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
}

return err
}

if sessionErr := r.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)

if connErr := r.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
}

return sessionErr
}

return r.connection.Close()
}

Expand Down
26 changes: 25 additions & 1 deletion sender.go
Expand Up @@ -95,6 +95,30 @@ func (s *sender) Close(ctx context.Context) error {
span, _ := s.startProducerSpanFromContext(ctx, "eh.sender.Close")
defer span.End()

err := s.sender.Close(ctx)
if err != nil {
log.For(ctx).Error(err)
if sessionErr := s.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)
}

if connErr := s.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
}

return err
}

if sessionErr := s.session.Close(ctx); sessionErr != nil {
log.For(ctx).Error(sessionErr)

if connErr := s.connection.Close(); connErr != nil {
log.For(ctx).Error(connErr)
}

return sessionErr
}

return s.connection.Close()
}

Expand Down Expand Up @@ -147,7 +171,7 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
switch err.(type) {
case *amqp.Error, *amqp.DetachError, net.Error:
if netErr, ok := err.(net.Error); ok {
if !netErr.Temporary(){
if !netErr.Temporary() {
return netErr
}
}
Expand Down
4 changes: 2 additions & 2 deletions storage/credential_test.go
Expand Up @@ -74,7 +74,7 @@ func (ts *testSuite) TestCredential() {
}

containerURL := azblob.NewContainerURL(*fooURL, pipeline)
defer func(){
defer func() {
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
log.Fatal(err, res)
}
Expand All @@ -89,4 +89,4 @@ func (ts *testSuite) TestCredential() {
if err != nil {
ts.T().Error(err)
}
}
}

0 comments on commit 804d4a4

Please sign in to comment.