Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable MultiProcessor in processor.go #217

Merged
merged 11 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
golang.org/x/sync v0.7.0
google.golang.org/protobuf v1.33.0
)

Expand Down
4 changes: 4 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
34 changes: 20 additions & 14 deletions v2/metrics/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
subsystem = "goshuttle_handler"
receiverNameLabel = "receiverName"
messageTypeLabel = "messageType"
deliveryCountLabel = "deliveryCount"
successLabel = "success"
Expand All @@ -29,12 +30,12 @@ func NewRegistry() *Registry {
Name: "message_received_total",
Help: "total number of messages received by the processor",
Subsystem: subsystem,
}, []string{}),
}, []string{receiverNameLabel}),
MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_handled_total",
Help: "total number of messages handled by this handler",
Subsystem: subsystem,
}, []string{messageTypeLabel, deliveryCountLabel}),
}, []string{receiverNameLabel, messageTypeLabel, deliveryCountLabel}),
MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_lock_renewed_total",
Help: "total number of message lock renewal",
Expand All @@ -49,7 +50,7 @@ func NewRegistry() *Registry {
Name: "concurrent_message_count",
Help: "number of messages being handled concurrently",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
}, []string{receiverNameLabel, messageTypeLabel}),
}
}

Expand Down Expand Up @@ -85,10 +86,10 @@ type Recorder interface {
IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(msg *azservicebus.ReceivedMessage)
IncMessageReceived(float64)
IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage)
IncMessageReceived(receiverName string, count float64)
IncConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage)
DecConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage)
}

// IncMessageLockRenewedSuccess increase the message lock renewal success counter
Expand All @@ -106,20 +107,25 @@ func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessag
}

// IncMessageHandled increase the message Handled
func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) {
func (m *Registry) IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10)
m.MessageHandledCount.With(labels).Inc()
}

// IncConcurrentMessageCount increases the concurrent message counter
func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc()
func (m *Registry) IncConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
m.ConcurrentMessageCount.With(labels).Inc()
}

// DecConcurrentMessageCount decreases the concurrent message counter
func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec()
func (m *Registry) DecConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
m.ConcurrentMessageCount.With(labels).Dec()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
Expand All @@ -129,8 +135,8 @@ func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMess
}

// IncMessageReceived increases the message received counter
func (m *Registry) IncMessageReceived(count float64) {
m.MessageReceivedCount.With(map[string]string{}).Add(count)
func (m *Registry) IncMessageReceived(receiverName string, count float64) {
m.MessageReceivedCount.WithLabelValues(receiverName).Add(count)
}

// Informer allows to inspect metrics value stored in the registry at runtime
Expand Down
2 changes: 1 addition & 1 deletion v2/metrics/processor/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRegistry_Init(t *testing.T) {
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(5))
Metric.IncMessageReceived(10)
Metric.IncMessageReceived("testReceiverName", 10)
}

func TestNewInformerDefault(t *testing.T) {
Expand Down
130 changes: 100 additions & 30 deletions v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
Expand All @@ -25,6 +26,18 @@ type MessageSettler interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

// ReceiverEx pairs a Receiver with a logical name so that a Processor can
// pump from several receivers and label metrics/logs per receiver.
type ReceiverEx struct { // shuttle.Receiver is already an exported interface
	name       string   // logical name; used as the receiverName metric label
	sbReceiver Receiver // underlying service bus receiver being wrapped
}

// NewReceiverEx wraps sbReceiver with the given logical name.
func NewReceiverEx(name string, sbReceiver Receiver) *ReceiverEx {
	return &ReceiverEx{
		name:       name,
		sbReceiver: sbReceiver,
	}
}

// Handler processes a single received message; implementations settle the
// message through the provided MessageSettler.
type Handler interface {
	Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
Expand All @@ -39,10 +52,10 @@ func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message
// Processor encapsulates the message pump and concurrency handling of servicebus.
// it exposes a handler API to provides a middleware based message processing pipeline.
type Processor struct {
receiver Receiver
receivers map[string]*ReceiverEx
options ProcessorOptions
handle Handler
concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor
concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor, shared across all receivers
}

// ProcessorOptions configures the processor
Expand All @@ -60,8 +73,8 @@ type ProcessorOptions struct {
StartRetryDelayStrategy RetryDelayStrategy
}

func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := ProcessorOptions{
func applyProcessorOptions(options *ProcessorOptions) *ProcessorOptions {
opts := &ProcessorOptions{
MaxConcurrency: 1,
ReceiveInterval: to.Ptr(1 * time.Second),
StartMaxAttempt: 1,
Expand All @@ -81,24 +94,78 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti
opts.StartRetryDelayStrategy = options.StartRetryDelayStrategy
}
}
return opts
}

// NewProcessor builds a Processor over a single receiver, registered under
// the default name "receiver". A nil options applies the defaults.
func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
	opts := applyProcessorOptions(options)
	rx := NewReceiverEx("receiver", receiver)
	p := &Processor{
		receivers:         map[string]*ReceiverEx{rx.name: rx},
		handle:            handler,
		options:           *opts,
		concurrencyTokens: make(chan struct{}, opts.MaxConcurrency),
	}
	return p
}

// NewMultiProcessor builds a Processor over several named receivers; all of
// them share one handler pipeline and one MaxConcurrency token pool.
// A nil options applies the defaults.
// NOTE(review): receivers with duplicate names overwrite each other in the
// map — confirm callers guarantee unique names.
func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor {
	opts := applyProcessorOptions(options)
	receivers := make(map[string]*ReceiverEx, len(receiversEx))
	for _, receiver := range receiversEx {
		receivers[receiver.name] = receiver
	}
	return &Processor{
		receivers:         receivers,
		handle:            handler,
		options:           *opts,
		concurrencyTokens: make(chan struct{}, opts.MaxConcurrency),
	}
}

// Start starts the processor and blocks until an error occurs or the context is canceled.
// Start starts processing on all the receivers of the processor and blocks until all processors are stopped or the context is canceled.
// It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy.
// Returns a combined list of errors encountered during each processor start.
func (p *Processor) Start(ctx context.Context) error {
wg := sync.WaitGroup{}
errChan := make(chan error, len(p.receivers))
for name := range p.receivers {
wg.Add(1)
Copy link
Member

@serbrech serbrech Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, if startOne panics, or if the cahnnel is closed for example, and we try to push err on it, we will not call wg.Done() because we don't recover from the panic, I believe.
So I think that the wg.Wait() will keep us stuck forever.

could you try to add a test for that by generating a panic somehow inside startOne ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in terms of fixing it, I've been itching to use conc library in go-shuttle, but unsure whether that's wise as a library.
we should handle the panic I think...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added panic handling + UTs

go func(receiverName string) {
defer func() {
if rec := recover(); rec != nil {
errChan <- fmt.Errorf("panic recovered from processor %s: %v", receiverName, rec)
}
wg.Done()
}()
err := p.startOne(ctx, receiverName)
if err != nil {
errChan <- err
}
}(name)
}
wg.Wait()
close(errChan)
var allErrs []error
for err := range errChan {
allErrs = append(allErrs, err)
}
return errors.Join(allErrs...)
}

// startOne starts a processor with the receiverName and blocks until an error occurs or the context is canceled.
// It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy.
// Returns a combined list of errors during the start attempts or ctx.Err() if the context
// is cancelled during the retries.
func (p *Processor) Start(ctx context.Context) error {
func (p *Processor) startOne(ctx context.Context, receiverName string) error {
receiverEx, ok := p.receivers[receiverName]
if !ok {
return fmt.Errorf("processor %s not found", receiverName)
}
var savedError error
for attempt := 0; attempt < p.options.StartMaxAttempt; attempt++ {
if err := p.start(ctx); err != nil {
if err := p.start(ctx, receiverEx); err != nil {
savedError = errors.Join(savedError, err)
log(ctx, fmt.Sprintf("processor start attempt %d failed: %v", attempt, err))
log(ctx, fmt.Sprintf("processor %s start attempt %d failed: %v", receiverName, attempt, err))
if attempt+1 == p.options.StartMaxAttempt { // last attempt, return early
break
}
Expand All @@ -115,16 +182,18 @@ func (p *Processor) Start(ctx context.Context) error {
}

// start starts the processor and blocks until an error occurs or the context is canceled.
func (p *Processor) start(ctx context.Context) error {
log(ctx, "starting processor")
messages, err := p.receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil)
func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
receiverName := receiverEx.name
receiver := receiverEx.sbReceiver
log(ctx, fmt.Sprintf("starting processor %s", receiverName))
messages, err := receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil)
if err != nil {
return err
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
log(ctx, fmt.Sprintf("received %d messages - initial", len(messages)))
processor.Metric.IncMessageReceived(float64(len(messages)))
log(ctx, fmt.Sprintf("processor %s received %d messages - initial", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, msg)
p.process(ctx, receiverEx, msg)
}
for ctx.Err() == nil {
select {
Expand All @@ -133,37 +202,38 @@ func (p *Processor) start(ctx context.Context) error {
if ctx.Err() != nil || maxMessages == 0 {
break
}
messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil)
messages, err := receiver.ReceiveMessages(ctx, maxMessages, nil)
if err != nil {
return err
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
log(ctx, fmt.Sprintf("received %d messages from processor loop", len(messages)))
processor.Metric.IncMessageReceived(float64(len(messages)))
log(ctx, fmt.Sprintf("processor %s received %d messages from processor loop", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, msg)
p.process(ctx, receiverEx, msg)
}
case <-ctx.Done():
log(ctx, "context done, stop receiving")
log(ctx, fmt.Sprintf("context done, stop receiving from processor %s", receiverName))
break
}
}
log(ctx, "exiting processor")
return ctx.Err()
log(ctx, fmt.Sprintf("exiting processor %s", receiverName))
return fmt.Errorf("processor %s stopped: %w", receiverName, ctx.Err())
}

func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedMessage) {
func (p *Processor) process(ctx context.Context, receiverEx *ReceiverEx, message *azservicebus.ReceivedMessage) {
receiverName := receiverEx.name
p.concurrencyTokens <- struct{}{}
go func() {
msgContext, cancel := context.WithCancel(ctx)
// cancel messageContext when we get out of this goroutine
defer cancel()
defer func() {
<-p.concurrencyTokens
processor.Metric.IncMessageHandled(message)
processor.Metric.DecConcurrentMessageCount(message)
processor.Metric.IncMessageHandled(receiverName, message)
processor.Metric.DecConcurrentMessageCount(receiverName, message)
}()
processor.Metric.IncConcurrentMessageCount(message)
p.handle.Handle(msgContext, p.receiver, message)
processor.Metric.IncConcurrentMessageCount(receiverName, message)
p.handle.Handle(msgContext, receiverEx.sbReceiver, message)
}()
}

Expand Down
5 changes: 5 additions & 0 deletions v2/processor_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type fakeReceiver struct {
SetupReceivedMessages chan *azservicebus.ReceivedMessage
*fakeSettler
SetupMaxReceiveCalls int
SetupReceivePanic string
}

func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) {
Expand All @@ -69,6 +70,10 @@ func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *az
}
}

if f.SetupReceivePanic != "" {
panic(f.SetupReceivePanic)
}

// return an error if we request more messages than there are available.
if len(f.ReceiveCalls) >= f.SetupMaxReceiveCalls {
return result, fmt.Errorf("max receive calls exceeded")
Expand Down
Loading
Loading