Skip to content

Commit

Permalink
wrap sbSender calls in their own func
Browse files Browse the repository at this point in the history
  • Loading branch information
karenychen committed May 8, 2024
1 parent 0ba6faf commit ec00f0b
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...fun

errChan := make(chan error)
go func() {
d.mu.RLock()
err := d.sbSender.SendMessage(ctx, msg, nil) // sendMessageOptions currently does nothing
d.mu.RUnlock()
err := d.sendMessage(ctx, msg, nil) // sendMessageOptions currently does nothing
if err != nil {
errChan <- fmt.Errorf("failed to send message: %w", err)
} else {
Expand Down Expand Up @@ -144,9 +142,7 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
return fmt.Errorf("failed to send message: %w", ctx.Err())
}

d.mu.RLock()
batch, err := d.sbSender.NewMessageBatch(ctx, &azservicebus.MessageBatchOptions{})
d.mu.RUnlock()
batch, err := d.newMessageBatch(ctx, &azservicebus.MessageBatchOptions{})
if err != nil {
return err
}
Expand All @@ -164,10 +160,7 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
errChan := make(chan error)

go func() {
d.mu.RLock()
err := d.sbSender.SendMessageBatch(ctx, batch, nil)
d.mu.RUnlock()
if err != nil {
if err := d.sendMessageBatch(ctx, batch, nil); err != nil {
errChan <- fmt.Errorf("failed to send message batch: %w", err)
} else {
errChan <- nil
Expand All @@ -186,7 +179,24 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
}
return err
}
}

func (d *Sender) sendMessage(ctx context.Context, msg *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
d.mu.RLock()
defer d.mu.RUnlock()
return d.sbSender.SendMessage(ctx, msg, options)
}

func (d *Sender) sendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
d.mu.RLock()
defer d.mu.RUnlock()
return d.sbSender.SendMessageBatch(ctx, batch, options)
}

func (d *Sender) newMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.sbSender.NewMessageBatch(ctx, options)
}

// AzSender returns the underlying azservicebus.Sender instance.
Expand Down

0 comments on commit ec00f0b

Please sign in to comment.