Refactor LockRenewer and Enable Compatibility with MultiProcessor #222

Merged · 9 commits · Apr 23, 2024
23 changes: 18 additions & 5 deletions README.md
@@ -15,19 +15,32 @@ We are assuming that both the publisher and the listener are using go-shuttle
The processor handles the message pump and feeds your message handler.
It allows concurrent message handling and provides a message handler middleware pipeline to compose message handling behavior.

### Concurrent message handling
### Processor Options

`MaxConcurrency` and `ReceiveInterval` configure the concurrent message handling for the processor.

`StartMaxAttempt` and `StartRetryDelayStrategy` configure the retry behavior for the processor.

```golang
// ProcessorOptions configures the processor
// MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value, will fall back to the default.
// ReceiveInterval defaults to 2 seconds if not set.
// StartMaxAttempt defaults to 1 if not set (no retries). Not setting StartMaxAttempt, or setting it to a non-positive value, will fall back to the default.
// StartRetryDelayStrategy defaults to a fixed 5-second delay if not set.
type ProcessorOptions struct {
MaxConcurrency int
ReceiveInterval *time.Duration
MaxConcurrency int
ReceiveInterval *time.Duration

StartMaxAttempt int
StartRetryDelayStrategy RetryDelayStrategy
}
```
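
As a quick illustration (not part of this PR), here is a minimal sketch of passing these options to `NewProcessor`. The queue name, handler body, and option values are placeholder assumptions; the sketch reuses `ConstantDelayStrategy` as the start retry delay strategy.

```golang
import (
	"context"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	shuttle "github.com/Azure/go-shuttle/v2"
)

// startProcessor is a sketch only: it wires ProcessorOptions into a processor.
func startProcessor(ctx context.Context, client *azservicebus.Client) error {
	receiver, err := client.NewReceiverForQueue("my-queue", nil) // queue name is illustrative
	if err != nil {
		return err
	}
	receiveInterval := 1 * time.Second
	p := shuttle.NewProcessor(receiver,
		shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
			// handle the message, then settle it
			_ = settler.CompleteMessage(ctx, message, nil)
		}),
		&shuttle.ProcessorOptions{
			MaxConcurrency:          10,
			ReceiveInterval:         &receiveInterval,
			StartMaxAttempt:         3, // retry the start up to 3 times
			StartRetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 5 * time.Second},
		})
	return p.Start(ctx) // blocks until ctx is canceled or the processor stops
}
```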

see [Processor example](v2/processor_test.go)
### MultiProcessor

`NewMultiProcessor` takes in a list of receivers and a message handler. It creates a processor for each receiver and starts them concurrently.

see [Processor and MultiProcessor examples](v2/processor_test.go)
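
For illustration only, a sketch of fanning one handler out over two receivers with `NewMultiProcessor`. The queue and receiver names are assumptions, and the imports are the same as in the processor sketch above.

```golang
// startMultiProcessor is a sketch only: one handler, two named receivers.
func startMultiProcessor(ctx context.Context, client *azservicebus.Client) error {
	ordersReceiver, err := client.NewReceiverForQueue("orders", nil)
	if err != nil {
		return err
	}
	refundsReceiver, err := client.NewReceiverForQueue("refunds", nil)
	if err != nil {
		return err
	}
	receivers := []*shuttle.ReceiverEx{
		shuttle.NewReceiverEx("orders", ordersReceiver),
		shuttle.NewReceiverEx("refunds", refundsReceiver),
	}
	p := shuttle.NewMultiProcessor(receivers,
		shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
			_ = settler.CompleteMessage(ctx, message, nil)
		}),
		&shuttle.ProcessorOptions{MaxConcurrency: 5})
	// Start runs one message pump per receiver concurrently until ctx is canceled.
	return p.Start(ctx)
}
```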

## Middlewares:
GoShuttle provides a few middlewares to simplify the implementation of the message handler in the application code.
@@ -55,7 +68,7 @@ This middleware will renew the lock on each message every 30 seconds until the m

```golang
renewInterval := 30 * time.Second
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &renewInterval}, handler)
shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &renewInterval}, handler)
```

see setup in [Processor example](v2/processor_test.go)
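
Putting it together, a hedged sketch of chaining the renewal middleware into a processor; `receiver` is an azservicebus receiver as in the earlier sketches, and the handler body is a placeholder.

```golang
renewInterval := 30 * time.Second
p := shuttle.NewProcessor(receiver,
	shuttle.NewRenewLockHandler(
		&shuttle.LockRenewalOptions{Interval: &renewInterval},
		shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
			// long-running work keeps its lock as long as renewal succeeds
			_ = settler.CompleteMessage(ctx, message, nil)
		})),
	nil) // nil falls back to the default ProcessorOptions

// p.Start(ctx) begins receiving; the middleware renews each message lock every 30 seconds.
```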
12 changes: 9 additions & 3 deletions v2/lockrenewer.go
@@ -31,8 +31,8 @@ type LockRenewalOptions struct {
MetricRecorder processor.Recorder
}

// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
// NewRenewLockHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc {
interval := 10 * time.Second
cancelMessageContextOnStop := true
metricRecorder := processor.Metric
@@ -50,7 +50,7 @@ func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions,
return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
lockRenewer: settler,
renewalInterval: &interval,
metrics: metricRecorder,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
@@ -64,6 +64,12 @@ func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions,
}
}

// Deprecated: use NewRenewLockHandler
// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
return NewRenewLockHandler(options, handler)
}

// peekLockRenewer starts a background goroutine that renews the message lock at the given interval until Stop() is called
// or until the passed in context is canceled.
// it is a pass through handler if the renewalInterval is nil
89 changes: 54 additions & 35 deletions v2/lockrenewer_test.go
@@ -17,16 +17,17 @@ import (
"github.com/Azure/go-shuttle/v2/metrics/processor"
)

type fakeSBLockRenewer struct {
RenewCount atomic.Int32
type fakeSBRenewLockSettler struct {
fakeSettler

PerMessage map[*azservicebus.ReceivedMessage]*atomic.Int32
mapLock sync.Mutex
Err error
}

func (r *fakeSBLockRenewer) RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage,
func (r *fakeSBRenewLockSettler) RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage,
_ *azservicebus.RenewMessageLockOptions) error {
r.RenewCount.Add(1)
r.RenewCalled.Add(1)
r.mapLock.Lock()
defer r.mapLock.Unlock()
if r.PerMessage == nil {
@@ -44,11 +45,10 @@ func (r *fakeSBLockRenewer) RenewMessageLock(ctx context.Context, message *azser
}

func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
renewer := &fakeSBLockRenewer{}
settler := &fakeSettler{}
settler := &fakeSBRenewLockSettler{}
g := NewWithT(t)
interval := 100 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
lr := shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
err := settler.CompleteMessage(ctx, message, nil)
@@ -60,17 +60,16 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
lr.Handle(ctx, settler, msg)
g.Expect(settler.CompleteCalled.Load()).To(Equal(int32(1)))
g.Consistently(
func(g Gomega) { g.Expect(renewer.RenewCount.Load()).To(Equal(int32(0))) },
func(g Gomega) { g.Expect(settler.RenewCalled.Load()).To(Equal(int32(0))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
}

func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
renewer := &fakeSBLockRenewer{}
settler := &fakeSettler{}
settler := &fakeSBRenewLockSettler{}
g := NewWithT(t)
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
lr := shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
// Sleep > 100ms to allow 2 renewal to happen
@@ -94,9 +93,9 @@ func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
cancel2()
g.Eventually(
func(g Gomega) {
g.Expect(renewer.PerMessage[msg2]).To(BeNil(), "msg2 should not be in the map")
g.Expect(renewer.PerMessage[msg1]).ToNot(BeNil(), "msg1 should be in the map")
g.Expect(renewer.PerMessage[msg1].Load()).To(Equal(int32(2)))
g.Expect(settler.PerMessage[msg2]).To(BeNil(), "msg2 should not be in the map")
g.Expect(settler.PerMessage[msg1]).ToNot(BeNil(), "msg1 should be in the map")
g.Expect(settler.PerMessage[msg1].Load()).To(Equal(int32(2)))
},
200*time.Millisecond,
10*time.Millisecond).Should(Succeed())
@@ -110,28 +109,48 @@ func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
}

func Test_RenewPeriodically(t *testing.T) {
renewer := &fakeSBLockRenewer{}
settler := &fakeSBRenewLockSettler{}
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
lr := shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
lr.Handle(ctx, &fakeSettler{}, msg)
lr.Handle(ctx, settler, msg)
g := NewWithT(t)
g.Eventually(
func(g Gomega) { g.Expect(settler.RenewCalled.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
}

//nolint:staticcheck // still need to cover the deprecated func
func Test_NewLockRenewalHandler_RenewPeriodically(t *testing.T) {
settler := &fakeSBRenewLockSettler{}
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(settler, &shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
lr.Handle(ctx, settler, msg)
g := NewWithT(t)
g.Eventually(
func(g Gomega) { g.Expect(renewer.RenewCount.Load()).To(Equal(int32(2))) },
func(g Gomega) { g.Expect(settler.RenewCalled.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
}

func Test_RenewPeriodically_Error(t *testing.T) {
type testCase struct {
name string
renewer *fakeSBLockRenewer
settler *fakeSBRenewLockSettler
isRenewerCanceled bool
cancelCtxOnStop *bool
gotMessageCtx context.Context
@@ -140,22 +159,22 @@ func Test_RenewPeriodically_Error(t *testing.T) {
testCases := []testCase{
{
name: "continue periodic renewal on unknown error",
renewer: &fakeSBLockRenewer{Err: fmt.Errorf("unknown error")},
settler: &fakeSBRenewLockSettler{Err: fmt.Errorf("unknown error")},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: false,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
settler: &fakeSBRenewLockSettler{Err: context.Canceled},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)),
g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1)),
"should not attempt to renew")
g.Expect(metrics.GetMessageLockRenewedFailureCount()).To(Equal(float64(0)),
"should not record failure metric")
@@ -167,43 +186,43 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: true,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
settler: &fakeSBRenewLockSettler{Err: context.Canceled},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(0))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on permanent error (lockLost)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
settler: &fakeSBRenewLockSettler{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "cancel message context on stop by default",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
settler: &fakeSBRenewLockSettler{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
},
},
{
name: "does not cancel message context on stop if disabled",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
settler: &fakeSBRenewLockSettler{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
cancelCtxOnStop: to.Ptr(false),
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(1)))
g.Expect(tc.gotMessageCtx.Err()).To(BeNil())
},
100*time.Millisecond,
@@ -212,10 +231,10 @@ func Test_RenewPeriodically_Error(t *testing.T) {
},
{
name: "continue periodic renewal on transient error (timeout)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
settler: &fakeSBRenewLockSettler{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
func(g Gomega) { g.Expect(tc.settler.RenewCalled.Load()).To(Equal(int32(2))) },
140*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
@@ -229,7 +248,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
reg := processor.NewRegistry()
reg.Init(prometheus.NewRegistry())
informer := processor.NewInformerFor(reg)
lr := shuttle.NewLockRenewalHandler(tc.renewer,
lr := shuttle.NewRenewLockHandler(
&shuttle.LockRenewalOptions{
Interval: &interval,
CancelMessageContextOnStop: tc.cancelCtxOnStop,
@@ -251,7 +270,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
cancel()
}
defer cancel()
lr.Handle(ctx, &fakeSettler{}, msg)
lr.Handle(ctx, tc.settler, msg)
tc.verify(NewWithT(t), &tc, informer)
})
}
2 changes: 1 addition & 1 deletion v2/managedsettling_example_test.go
@@ -27,7 +27,7 @@ func ExampleNewManagedSettlingHandler() {
lockRenewalInterval := 10 * time.Second
p := shuttle.NewProcessor(receiver,
shuttle.NewPanicHandler(nil,
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{
RetryDecision: &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2},
RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second},
2 changes: 2 additions & 0 deletions v2/processor.go
@@ -97,6 +97,7 @@ func applyProcessorOptions(options *ProcessorOptions) *ProcessorOptions {
return opts
}

// NewProcessor creates a new processor with the provided receiver and handler.
func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := applyProcessorOptions(options)
receiverEx := NewReceiverEx("receiver", receiver)
@@ -108,6 +109,7 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti
}
}

// NewMultiProcessor creates a new processor with a list of receivers and a handler.
func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := applyProcessorOptions(options)
var receivers = make(map[string]*ReceiverEx)