Skip to content

Commit

Permalink
fix: get message attributes method
Browse files Browse the repository at this point in the history
  • Loading branch information
inaciogu committed Jan 10, 2024
1 parent 2c5e674 commit d71f1b2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
36 changes: 26 additions & 10 deletions client/sqs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,30 @@ func (s *SQSClient) ReceiveMessages(queueUrl string) error {
return nil
}

func (s *SQSClient) getMessageAttributes(messageBody map[string]interface{}) map[string]string {
func (s *SQSClient) getMessageAttributes(message sqs.Message) map[string]string {
attributes := make(map[string]string)
snsMessageAttributes := make(MessageAttributes)

if s.clientOptions.From != OriginSNS {
for key, value := range message.MessageAttributes {
attributes[key] = *value.StringValue
}

return attributes
}

var messageBody map[string]interface{}

err := json.Unmarshal([]byte(*message.Body), &messageBody)

if err != nil {
fmt.Println(err.Error())

return attributes
}

messageAttributes, ok := messageBody["MessageAttributes"].(map[string]interface{})

if !ok {
return attributes
}
Expand Down Expand Up @@ -186,20 +205,18 @@ func (s *SQSClient) ProcessMessage(message *sqs.Message) {
return
}

messageAttributes = s.getMessageAttributes(*message)
if s.clientOptions.From == OriginSNS {
var snsMessageBody map[string]interface{}

formattedSNSBody := strings.ReplaceAll(messageBody["Message"].(string), "'", "")
var snsBody map[string]interface{}

err := json.Unmarshal([]byte(formattedSNSBody), &snsMessageBody)
err = json.Unmarshal([]byte(messageBody["Message"].(string)), &snsBody)

if err != nil {
fmt.Println(err.Error())

return
}
messageAttributes = s.getMessageAttributes(messageBody)
messageBody = snsMessageBody
messageBody = snsBody
}

meta := MessageMetadata{
Expand All @@ -216,7 +233,7 @@ func (s *SQSClient) ProcessMessage(message *sqs.Message) {
handled := s.clientOptions.Handle(translatedMessage)

if !handled {
fmt.Println("failed to handle message")
fmt.Printf("failed to handle message with ID: %s\n", meta.MessageId)

s.client.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
QueueUrl: queueUrl,
Expand All @@ -227,8 +244,7 @@ func (s *SQSClient) ProcessMessage(message *sqs.Message) {
return
}

fmt.Printf("message handled: %s\n", translatedMessage.Content)
fmt.Printf("metadata: %s\n", messageAttributes)
fmt.Printf("message handled ID: %s\n", meta.MessageId)

s.client.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: queueUrl,
Expand Down
14 changes: 12 additions & 2 deletions client/sqs_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,21 @@ func (uts *UnitTest) TestProcessMessage_Handled() {
return true
},
PollingWaitTimeSeconds: 20,
From: client.OriginSQS,
From: client.OriginSNS,
})

message := &sqs.Message{
Body: aws.String(`{"content": "fake-content"}`),
Body: aws.String(`{
"Message": "{\n \"asda\": \"asdas\"\n}",
"MessageId": "fake-message-id",
"ReceiptHandle": "fake-receipt-handle",
"MessageAttributes": {
"attribute1": {
"Type": "String",
"Value": "value1"
}
}
}`),
ReceiptHandle: aws.String("fake-receipt-handle"),
MessageId: aws.String("fake-message-id"),
}
Expand Down

0 comments on commit d71f1b2

Please sign in to comment.