Skip to content

Commit

Permalink
changed message handler to accept incoming message struct
Browse files Browse the repository at this point in the history
  • Loading branch information
engelmi committed Dec 16, 2021
1 parent 9dbed02 commit 323e100
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 27 deletions.
32 changes: 29 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Consumer interface {
StopListening() error
}

type MessageHandler func(ctx context.Context, msg *sqs.Message) error
type MessageHandler func(ctx context.Context, msg IncomingMessage) error

type consumer struct {
*internal.Client
Expand Down Expand Up @@ -70,7 +70,7 @@ func (c *consumer) StartListening(ctx context.Context, wg *sync.WaitGroup) {

func (c *consumer) StopListening() error {
if c.cancelFunc == nil {
errors.New("Consumer has not been started yet")
return errors.New("Consumer has not been started yet")
}
c.cancelFunc()
return nil
Expand Down Expand Up @@ -116,13 +116,18 @@ func (c *consumer) processMessage(ctx context.Context, msg *sqs.Message) {
}

func (c *consumer) callHandlerFunc(ctx context.Context, msg *sqs.Message) (err error) {
if msg == nil {
c.logger.WithContext(ctx).Info("Skipping empty message")
return nil
}

defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprintf("SQS-Handler function paniced: %v", r))
}
}()

return c.handlerFunc(ctx, msg)
return c.handlerFunc(ctx, c.mapFromSqsMessage(msg))
}

func (c *consumer) acknowledgeMessage(ctx context.Context, msg *sqs.Message) error {
Expand All @@ -136,3 +141,24 @@ func (c *consumer) acknowledgeMessage(ctx context.Context, msg *sqs.Message) err

return errors.WithStack(err)
}

func (c *consumer) mapFromSqsMessage(awsmsg *sqs.Message) IncomingMessage {
msgAttrValues := map[string]*IncomingMessageAttributeValue{}
for key, value := range awsmsg.MessageAttributes {
msgAttrValues[key] = &IncomingMessageAttributeValue{
DataType: value.DataType,
BinaryValue: value.BinaryValue,
StringValue: value.StringValue,
}
}

return IncomingMessage{
MessageId: awsmsg.MessageId,
ReceiptHandle: awsmsg.ReceiptHandle,
Body: awsmsg.Body,
MD5OfBody: awsmsg.MD5OfBody,
MessageAttributes: msgAttrValues,
MD5OfMessageAttributes: awsmsg.MD5OfMessageAttributes,
Attributes: awsmsg.Attributes,
}
}
3 changes: 1 addition & 2 deletions examples/gopher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/service/sqs"
gosqs "github.com/engelmi/go-sqs"
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func (g Gopher) Consume() {
PollTimeout: 10 * time.Second,
AckTimeout: 2 * time.Second,
MaxNumberOfMessages: 10,
}, func(ctx context.Context, receivedMsg *sqs.Message) error {
}, func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error {
msg := fmt.Sprintf("%s: Got message '%s'", g.name, *receivedMsg.Body)
fmt.Println(msg)

Expand Down
3 changes: 1 addition & 2 deletions examples/panic/panic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/service/sqs"
gosqs "github.com/engelmi/go-sqs"
"github.com/engelmi/go-sqs/examples"
"github.com/sirupsen/logrus"
Expand All @@ -15,7 +14,7 @@ import (
func main() {
wg := &sync.WaitGroup{}

handler := func(ctx context.Context, receivedMsg *sqs.Message) error {
handler := func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error {
fmt.Println(fmt.Sprintf("Got message '%s'", *receivedMsg.Body))
panic(fmt.Sprintf("Handler panic for message '%s'", *receivedMsg.Body))
}
Expand Down
3 changes: 1 addition & 2 deletions examples/simpleConsume/simpleConsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/service/sqs"
gosqs "github.com/engelmi/go-sqs"
"github.com/engelmi/go-sqs/examples"
)
Expand All @@ -15,7 +14,7 @@ func main() {
wg := &sync.WaitGroup{}
wg.Add(1)

handler := func(ctx context.Context, receivedMsg *sqs.Message) error {
handler := func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error {
defer wg.Done()

fmt.Println(fmt.Sprintf("Got message '%s'", *receivedMsg.Body))
Expand Down
24 changes: 24 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package gosqs

type OutgoingMessage struct {
DeduplicationId *string
GroupId *string
Payload []byte
Attributes map[string]string
}

type IncomingMessage struct {
MessageId *string
ReceiptHandle *string
Body *string
MD5OfBody *string
MessageAttributes map[string]*IncomingMessageAttributeValue
MD5OfMessageAttributes *string
Attributes map[string]*string
}

type IncomingMessageAttributeValue struct {
BinaryValue []byte
DataType *string
StringValue *string
}
31 changes: 13 additions & 18 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ type Producer interface {
Send(ctx context.Context, msg OutgoingMessage) (*string, error)
}

type OutgoingMessage struct {
DeduplicationId *string
GroupId *string
Payload []byte
Attributes map[string]string
}

type producer struct {
*internal.Client
timeout time.Duration
Expand All @@ -41,6 +34,18 @@ func NewProducer(config ProducerConfig) (Producer, error) {
}

func (p *producer) Send(ctx context.Context, msg OutgoingMessage) (*string, error) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

output, err := p.Sqs.SendMessageWithContext(timeoutCtx, p.mapToAwsMessage(msg))
if err != nil {
return nil, errors.Wrap(err, "Failed to send message")
}

return output.MessageId, nil
}

func (p *producer) mapToAwsMessage(msg OutgoingMessage) *sqs.SendMessageInput {
attributes := make(map[string]*sqs.MessageAttributeValue)
for key, value := range msg.Attributes {
attributes[key] = &sqs.MessageAttributeValue{
Expand All @@ -50,23 +55,13 @@ func (p *producer) Send(ctx context.Context, msg OutgoingMessage) (*string, erro
}

payload := string(msg.Payload)
input := sqs.SendMessageInput{
return &sqs.SendMessageInput{
QueueUrl: aws.String(p.QueueUrl),
MessageBody: aws.String(payload),
MessageAttributes: attributes,
MessageGroupId: msg.GroupId,
MessageDeduplicationId: msg.DeduplicationId,
}

timeoutCtx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

output, err := p.Sqs.SendMessageWithContext(timeoutCtx, &input)
if err != nil {
return nil, errors.Wrap(err, "Failed to send message")
}

return output.MessageId, nil
}

func MarshalToJson(payload interface{}) ([]byte, error) {
Expand Down

0 comments on commit 323e100

Please sign in to comment.