From 323e1007e943630c6af4f43583589355fafc7e35 Mon Sep 17 00:00:00 2001 From: engelmi Date: Thu, 16 Dec 2021 13:30:32 +0100 Subject: [PATCH] changed message handler to accept incoming message struct --- consumer.go | 32 ++++++++++++++++++++++--- examples/gopher.go | 3 +-- examples/panic/panic.go | 3 +-- examples/simpleConsume/simpleConsume.go | 3 +-- message.go | 24 +++++++++++++++++++ producer.go | 31 ++++++++++-------------- 6 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 message.go diff --git a/consumer.go b/consumer.go index 3e82e49..79b69b5 100644 --- a/consumer.go +++ b/consumer.go @@ -19,7 +19,7 @@ type Consumer interface { StopListening() error } -type MessageHandler func(ctx context.Context, msg *sqs.Message) error +type MessageHandler func(ctx context.Context, msg IncomingMessage) error type consumer struct { *internal.Client @@ -70,7 +70,7 @@ func (c *consumer) StartListening(ctx context.Context, wg *sync.WaitGroup) { func (c *consumer) StopListening() error { if c.cancelFunc == nil { - errors.New("Consumer has not been started yet") + return errors.New("Consumer has not been started yet") } c.cancelFunc() return nil @@ -116,13 +116,18 @@ func (c *consumer) processMessage(ctx context.Context, msg *sqs.Message) { } func (c *consumer) callHandlerFunc(ctx context.Context, msg *sqs.Message) (err error) { + if msg == nil { + c.logger.WithContext(ctx).Info("Skipping empty message") + return nil + } + defer func() { if r := recover(); r != nil { err = errors.New(fmt.Sprintf("SQS-Handler function paniced: %v", r)) } }() - return c.handlerFunc(ctx, msg) + return c.handlerFunc(ctx, c.mapFromSqsMessage(msg)) } func (c *consumer) acknowledgeMessage(ctx context.Context, msg *sqs.Message) error { @@ -136,3 +141,24 @@ func (c *consumer) acknowledgeMessage(ctx context.Context, msg *sqs.Message) err return errors.WithStack(err) } + +func (c *consumer) mapFromSqsMessage(awsmsg *sqs.Message) IncomingMessage { + msgAttrValues := map[string]*IncomingMessageAttributeValue{} + for key, value := range awsmsg.MessageAttributes { + msgAttrValues[key] = &IncomingMessageAttributeValue{ + DataType: value.DataType, + BinaryValue: value.BinaryValue, + StringValue: value.StringValue, + } + } + + return IncomingMessage{ + MessageId: awsmsg.MessageId, + ReceiptHandle: awsmsg.ReceiptHandle, + Body: awsmsg.Body, + MD5OfBody: awsmsg.MD5OfBody, + MessageAttributes: msgAttrValues, + MD5OfMessageAttributes: awsmsg.MD5OfMessageAttributes, + Attributes: awsmsg.Attributes, + } +} diff --git a/examples/gopher.go b/examples/gopher.go index dd94465..33b3cf4 100644 --- a/examples/gopher.go +++ b/examples/gopher.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/aws/aws-sdk-go/service/sqs" gosqs "github.com/engelmi/go-sqs" ) @@ -68,7 +67,7 @@ func (g Gopher) Consume() { PollTimeout: 10 * time.Second, AckTimeout: 2 * time.Second, MaxNumberOfMessages: 10, - }, func(ctx context.Context, receivedMsg *sqs.Message) error { + }, func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error { msg := fmt.Sprintf("%s: Got message '%s'", g.name, *receivedMsg.Body) fmt.Println(msg) diff --git a/examples/panic/panic.go b/examples/panic/panic.go index 65a8c03..d018b8f 100644 --- a/examples/panic/panic.go +++ b/examples/panic/panic.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/service/sqs" gosqs "github.com/engelmi/go-sqs" "github.com/engelmi/go-sqs/examples" "github.com/sirupsen/logrus" @@ -15,7 +14,7 @@ import ( func main() { wg := &sync.WaitGroup{} - handler := func(ctx context.Context, receivedMsg *sqs.Message) error { + handler := func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error { fmt.Println(fmt.Sprintf("Got message '%s'", *receivedMsg.Body)) panic(fmt.Sprintf("Handler panic for message '%s'", *receivedMsg.Body)) } diff --git a/examples/simpleConsume/simpleConsume.go b/examples/simpleConsume/simpleConsume.go index 2780d83..042333b 100644 --- a/examples/simpleConsume/simpleConsume.go +++ b/examples/simpleConsume/simpleConsume.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/service/sqs" gosqs "github.com/engelmi/go-sqs" "github.com/engelmi/go-sqs/examples" ) @@ -15,7 +14,7 @@ func main() { wg := &sync.WaitGroup{} wg.Add(1) - handler := func(ctx context.Context, receivedMsg *sqs.Message) error { + handler := func(ctx context.Context, receivedMsg gosqs.IncomingMessage) error { defer wg.Done() fmt.Println(fmt.Sprintf("Got message '%s'", *receivedMsg.Body)) diff --git a/message.go b/message.go new file mode 100644 index 0000000..c681953 --- /dev/null +++ b/message.go @@ -0,0 +1,24 @@ +package gosqs + +type OutgoingMessage struct { + DeduplicationId *string + GroupId *string + Payload []byte + Attributes map[string]string +} + +type IncomingMessage struct { + MessageId *string + ReceiptHandle *string + Body *string + MD5OfBody *string + MessageAttributes map[string]*IncomingMessageAttributeValue + MD5OfMessageAttributes *string + Attributes map[string]*string +} + +type IncomingMessageAttributeValue struct { + BinaryValue []byte + DataType *string + StringValue *string +} diff --git a/producer.go b/producer.go index 13f9058..537b778 100644 --- a/producer.go +++ b/producer.go @@ -16,13 +16,6 @@ type Producer interface { Send(ctx context.Context, msg OutgoingMessage) (*string, error) } -type OutgoingMessage struct { - DeduplicationId *string - GroupId *string - Payload []byte - Attributes map[string]string -} - type producer struct { *internal.Client timeout time.Duration @@ -41,6 +34,18 @@ func NewProducer(config ProducerConfig) (Producer, error) { } func (p *producer) Send(ctx context.Context, msg OutgoingMessage) (*string, error) { + timeoutCtx, cancel := context.WithTimeout(context.Background(), p.timeout) + defer cancel() + + output, err := p.Sqs.SendMessageWithContext(timeoutCtx, p.mapToAwsMessage(msg)) + if err != nil { + return nil, errors.Wrap(err, "Failed to send message") + } + + return output.MessageId, nil +} + +func (p *producer) mapToAwsMessage(msg OutgoingMessage) *sqs.SendMessageInput { attributes := make(map[string]*sqs.MessageAttributeValue) for key, value := range msg.Attributes { attributes[key] = &sqs.MessageAttributeValue{ @@ -50,23 +55,13 @@ func (p *producer) Send(ctx context.Context, msg OutgoingMessage) (*string, erro } payload := string(msg.Payload) - input := sqs.SendMessageInput{ + return &sqs.SendMessageInput{ QueueUrl: aws.String(p.QueueUrl), MessageBody: aws.String(payload), MessageAttributes: attributes, MessageGroupId: msg.GroupId, MessageDeduplicationId: msg.DeduplicationId, } - - timeoutCtx, cancel := context.WithTimeout(context.Background(), p.timeout) - defer cancel() - - output, err := p.Sqs.SendMessageWithContext(timeoutCtx, &input) - if err != nil { - return nil, errors.Wrap(err, "Failed to send message") - } - - return output.MessageId, nil } func MarshalToJson(payload interface{}) ([]byte, error) {