Skip to content

Commit

Permalink
Merge pull request #104 from Azure/issue-101
Browse files Browse the repository at this point in the history
bump amqp version and ensure sender is using unsettled
  • Loading branch information
devigned committed Apr 17, 2019
2 parents 783ca54 + 56547db commit aca4cab
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 12 deletions.
3 changes: 2 additions & 1 deletion changelog.md
@@ -1,6 +1,7 @@
# Change Log

## `head`
## `v1.1.4`
- update to amqp 0.11.0 and change sender to use unsettled rather than receiver second mode

## `v1.1.3`
- fix leak in partition persistence
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -14,5 +14,5 @@ require (
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
pack.ag/amqp v0.10.2
pack.ag/amqp v0.11.0
)
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -73,5 +73,5 @@ gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbK
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.10.2 h1:tOg29Eqx2kmgcDJa7OAjH9N3jqGA1gHf5iIAnBMsa5U=
pack.ag/amqp v0.10.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
27 changes: 21 additions & 6 deletions hub.go
Expand Up @@ -51,7 +51,7 @@ const (
rootUserAgent = "/golang-event-hubs"

// Version is the semantic version number
Version = "1.1.3"
Version = "1.1.4"
)

type (
Expand Down Expand Up @@ -528,18 +528,29 @@ func (h *Hub) Close(ctx context.Context) error {

if h.sender != nil {
if err := h.sender.Close(ctx); err != nil {
log.For(ctx).Error(err)
if rErr := h.closeReceivers(ctx); rErr != nil {
log.For(ctx).Error(rErr)
if !isConnectionClosed(rErr) {
log.For(ctx).Error(rErr)
}
}

if !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}

// return originating error
return err
return nil
}
}

// close receivers and return error
return h.closeReceivers(ctx)
err := h.closeReceivers(ctx)
if err != nil && !isConnectionClosed(err) {
log.For(ctx).Error(err)
return err
}

return nil
}

// closeReceivers will close the receivers on the hub and return the last error
Expand Down Expand Up @@ -696,3 +707,7 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}
return h.sender, nil
}

func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
}
90 changes: 90 additions & 0 deletions hub_examples_test.go
@@ -0,0 +1,90 @@
package eventhub_test

import (
"context"
"fmt"
"os"
"time"

"github.com/joho/godotenv"

"github.com/Azure/azure-event-hubs-go"
)

func init() {
if err := godotenv.Load(); err != nil {
fmt.Println("FATAL: ", err)
}
}

func ExampleHub_helloWorld(){
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
return
}

hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
if err != nil {
fmt.Println(err)
return
}

hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
if err != nil {
fmt.Println(err)
return
}

// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name)
if err != nil {
fmt.Println(err)
return
}

err = hub.Send(ctx, eventhub.NewEventFromString("Hello World!"))
if err != nil {
fmt.Println(err)
return
}

exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
text := string(event.Data)
fmt.Println(text)
exit <- struct{}{}
return nil
}

for _, partitionID := range *hubEntity.PartitionIDs {
_, err = hub.Receive(ctx, partitionID, handler)
}

// wait for the first handler to get called with "Hello World!"
<-exit
err = hub.Close(ctx)
if err != nil {
fmt.Println(err)
return
}
// Output: Hello World!
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
he, err := em.Get(ctx, name)
if err == nil {
_ = em.Delete(ctx, name)
}

he, err = em.Put(ctx, name, opts...)
if err != nil {
fmt.Println(err)
return nil, err
}

return he, nil
}
1 change: 1 addition & 0 deletions internal/test/suite.go
Expand Up @@ -133,6 +133,7 @@ func (suite *BaseSuite) RandomHub(opts ...HubMgmtOption) (*mgmt.Model, func()) {
suite.Require().NotNil(model)
suite.Require().NotNil(model.PartitionIds)
suite.Require().Len(*model.PartitionIds, 4)
time.Sleep(250 * time.Millisecond) // introduce a bit of a delay before using the hub
return model, func() {
if model != nil {
suite.DeleteEventHub(*model.Name)
Expand Down
2 changes: 1 addition & 1 deletion sender.go
Expand Up @@ -233,7 +233,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}

amqpSender, err := amqpSession.NewSender(
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkSenderSettle(amqp.ModeUnsettled),
amqp.LinkTargetAddress(s.getAddress()),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion storage/eph_test.go
Expand Up @@ -194,7 +194,11 @@ func (ts *testSuite) newTestContainerByName(containerName string) func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if res, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}); err != nil {
ts.NoError(err, res)
msg := "error deleting container url"
if res != nil {
msg = fmt.Sprintf("status code: %q; error code: %q", res.StatusCode(), res.ErrorCode())
}
ts.NoError(err, msg)
}
}
}
Expand Down

0 comments on commit aca4cab

Please sign in to comment.