Skip to content

Commit

Permalink
sender recovery on conn drops and too large batch test
Browse files Browse the repository at this point in the history
  • Loading branch information
devigned committed Apr 18, 2019
1 parent aca4cab commit d71a6c2
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 28 deletions.
3 changes: 3 additions & 0 deletions changelog.md
@@ -1,5 +1,8 @@
# Change Log

## `v1.1.5`
- add sender recovery handling for `amqp.ErrLinkClose`, `amqp.ErrConnClosed` and `amqp.ErrSessionClosed`

## `v1.1.4`
- update to amqp 0.11.0 and change sender to use unsettled rather than receiver second mode

Expand Down
20 changes: 17 additions & 3 deletions hub.go
Expand Up @@ -39,19 +39,21 @@ import (
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/persist"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-event-hubs-go/atom"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/date"
"github.com/Azure/go-autorest/autorest/to"
"pack.ag/amqp"

"github.com/Azure/azure-event-hubs-go/atom"
)

const (
maxUserAgentLen = 128
rootUserAgent = "/golang-event-hubs"

// Version is the semantic version number
Version = "1.1.4"
Version = "1.1.5"
)

type (
Expand Down Expand Up @@ -708,6 +710,18 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
return h.sender, nil
}

func isRecoverableCloseError(err error) bool {
return isConnectionClosed(err) || isSessionClosed(err) || isLinkClosed(err)
}

func isConnectionClosed(err error) bool {
return err.Error() == "amqp: connection closed"
return err == amqp.ErrConnClosed
}

func isLinkClosed(err error) bool {
return err == amqp.ErrLinkClosed
}

func isSessionClosed(err error) bool {
return err == amqp.ErrSessionClosed
}
4 changes: 2 additions & 2 deletions hub_examples_test.go
Expand Up @@ -17,7 +17,7 @@ func init() {
}
}

func ExampleHub_helloWorld(){
func ExampleHub_helloWorld() {
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

Expand Down Expand Up @@ -87,4 +87,4 @@ func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts .
}

return he, nil
}
}
20 changes: 19 additions & 1 deletion hub_test.go
Expand Up @@ -40,10 +40,11 @@ import (
"github.com/Azure/azure-amqp-common-go/auth"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/azure-amqp-common-go/uuid"
"github.com/Azure/azure-event-hubs-go/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/Azure/azure-event-hubs-go/internal/test"
)

type (
Expand Down Expand Up @@ -259,6 +260,7 @@ func (suite *eventHubSuite) TestPartitioned() {
"TestSendTooBig": testSendTooBig,
"TestSendAndReceive": testBasicSendAndReceive,
"TestBatchSendAndReceive": testBatchSendAndReceive,
"TestBatchSendTooLarge": testBatchSendTooLarge,
}

for name, testFunc := range tests {
Expand Down Expand Up @@ -318,6 +320,22 @@ func testBatchSendAndReceive(ctx context.Context, t *testing.T, client *Hub, par
}
}

func testBatchSendTooLarge(ctx context.Context, t *testing.T, client *Hub, _ string) {
events := make([]*Event, 200000)

var wg sync.WaitGroup
wg.Add(len(events))

for idx := range events {
events[idx] = NewEventFromString(test.RandomString("foo", 10))
}
batch := &EventBatch{
Events: events,
}

assert.EqualError(t, client.SendBatch(ctx, batch), "encoded message size exceeds max of 1046528")
}

func testBasicSendAndReceive(ctx context.Context, t *testing.T, client *Hub, partitionID string) {
numMessages := rand.Intn(100) + 20
var wg sync.WaitGroup
Expand Down
29 changes: 18 additions & 11 deletions receiver.go
Expand Up @@ -85,24 +85,21 @@ func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption {
// ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream
func ReceiveWithStartingOffset(offset string) ReceiveOption {
return func(receiver *receiver) error {
receiver.storeLastReceivedCheckpoint(persist.NewCheckpoint(offset, 0, time.Time{}))
return nil
return receiver.storeLastReceivedCheckpoint(persist.NewCheckpoint(offset, 0, time.Time{}))
}
}

// ReceiveWithLatestOffset configures the receiver to start at a given position in the event stream
func ReceiveWithLatestOffset() ReceiveOption {
return func(receiver *receiver) error {
receiver.storeLastReceivedCheckpoint(persist.NewCheckpointFromEndOfStream())
return nil
return receiver.storeLastReceivedCheckpoint(persist.NewCheckpointFromEndOfStream())
}
}

// ReceiveFromTimestamp configures the receiver to start receiving from a specific point in time in the event stream
func ReceiveFromTimestamp(t time.Time) ReceiveOption {
return func(receiver *receiver) error {
receiver.storeLastReceivedCheckpoint(persist.NewCheckpoint("", 0, t))
return nil
return receiver.storeLastReceivedCheckpoint(persist.NewCheckpoint("", 0, t))
}
}

Expand Down Expand Up @@ -250,12 +247,22 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler

err := handler(ctx, event)
if err != nil {
msg.Modify(true, false, nil)
err = msg.Modify(true, false, nil)
if err != nil {
log.For(ctx).Error(err)
}
log.For(ctx).Error(fmt.Errorf("message modified(true, false, nil): id: %v", id))
return
}
msg.Accept()
r.storeLastReceivedCheckpoint(event.GetCheckpoint())
err = msg.Accept()
if err != nil {
log.For(ctx).Error(err)
}

err = r.storeLastReceivedCheckpoint(event.GetCheckpoint())
if err != nil {
log.For(ctx).Error(err)
}
}

func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Message) {
Expand All @@ -276,7 +283,7 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
default:
if amqpErr, ok := err.(*amqp.DetachError); ok && amqpErr.RemoteError != nil && amqpErr.RemoteError.Condition == "amqp:link:stolen" {
log.For(ctx).Debug("link has been stolen by a higher epoch")
r.Close(ctx)
_ = r.Close(ctx)
return
}

Expand All @@ -302,7 +309,7 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
if retryErr != nil {
log.For(ctx).Debug("retried, but error was unrecoverable")
r.lastError = retryErr
r.Close(ctx)
_ = r.Close(ctx)
return
}
}
Expand Down
30 changes: 19 additions & 11 deletions sender.go
Expand Up @@ -157,6 +157,19 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
sp.AddAttributes(trace.StringAttribute("he.message_id", str))
}

recvr := func(err error) {
duration := s.recoveryBackoff.Duration()
log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
time.Sleep(duration)
err = s.Recover(ctx)
if err != nil {
log.For(ctx).Debug("failed to recover connection")
} else {
log.For(ctx).Debug("recovered connection")
s.recoveryBackoff.Reset()
}
}

for {
select {
case <-ctx.Done():
Expand All @@ -176,18 +189,13 @@ func (s *sender) trySend(ctx context.Context, evt eventer) error {
}
}

duration := s.recoveryBackoff.Duration()
log.For(ctx).Debug("amqp error, delaying " + string(duration/time.Millisecond) + " millis: " + err.Error())
time.Sleep(duration)
err = s.Recover(ctx)
if err != nil {
log.For(ctx).Debug("failed to recover connection")
} else {
log.For(ctx).Debug("recovered connection")
s.recoveryBackoff.Reset()
}
recvr(err)
default:
return err
if !isRecoverableCloseError(err) {
return err
}

recvr(err)
}
}
}
Expand Down

0 comments on commit d71a6c2

Please sign in to comment.