Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add SNS FIFO Topic support #130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/src/main/asciidoc/sns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,44 @@ configure the SNS client to setup the default converter.
<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />
----

FIFO SNS Topics have additional required and optional request parameters. For example MessageGroupId is a required
parameter for a FIFO topic. These parameters can be set by specifying them in the headers map.
Spring Cloud AWS extracts the keys and sets the parameters on the underlying SNS SendMessage request.

To specify message attributes on the SNS SendMessage request, additional headers can be added to the header map.

This example shows how to add the MessageGroupId parameter (required for FIFO topics) and MessageDeduplicationId parameter
(optional) to the request. The additional header is added as a MessageAttribute. The attribute Type is based on the java
type of the value by Spring Cloud AWS.

[source,java,indent=0]
----
import com.amazonaws.services.sns.AmazonSNS;
import io.awspring.cloud.messaging.core.NotificationMessagingTemplate;
import io.awspring.cloud.messaging.core.TopicMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;

public class SnsNotificationSender {

private final NotificationMessagingTemplate notificationMessagingTemplate;

@Autowired
public SnsNotificationSender(AmazonSNS amazonSns) {
this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
}

public void send(String message, String messageGroupId, String messageDeduplicationId) {
HashMap<String, Object> headers = new HashMap<>();
headers.put(TopicMessageChannel.MESSAGE_GROUP_ID_HEADER, messageGroupId);
headers.put(TopicMessageChannel.MESSAGE_DEDUPLICATION_ID_HEADER, messageDeduplicationId);
headers.put("attributeName", "attributeValue");
this.notificationMessagingTemplate.convertAndSend("physicalTopicName", message, headers);
}
}
----

==== Annotation-driven HTTP notification endpoint
SNS supports multiple endpoint types (SQS, Email, HTTP, HTTPS), Spring Cloud AWS provides support for HTTP(S) endpoints.
SNS sends three type of requests to an HTTP topic listener endpoint, for each of them annotations are provided:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public class TopicMessageChannel extends AbstractMessageChannel {
*/
public static final String NOTIFICATION_SUBJECT_HEADER = "NOTIFICATION_SUBJECT_HEADER";

/**
* Message group id for SNS message (applies only to FIFO topic).
*/
public static final String MESSAGE_GROUP_ID_HEADER = "MessageGroupId";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this value be the same as the SQS headers in SqsMessageHeaders.java?

Suggested change
public static final String MESSAGE_GROUP_ID_HEADER = "MessageGroupId";
public static final String MESSAGE_GROUP_ID_HEADER = "message-group-id";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct that this does not follow the naming of SqsMessageHeaders. I followed the AWS header naming. I do not see added value in adding another translation layer which is confusing to users?
I also understand the need for consistency so maintainer can say which is preferred :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be in for keeping consistency with AWS naming, but since SQS has been implemented already like that 4 years ago, lets keep it in sync with SQS implementation.

We can reconsider for 3.0


/**
* Message Deduplication id for SNS message.
*/
public static final String MESSAGE_DEDUPLICATION_ID_HEADER = "MessageDeduplicationId";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before

Suggested change
public static final String MESSAGE_DEDUPLICATION_ID_HEADER = "MessageDeduplicationId";
public static final String MESSAGE_DEDUPLICATION_ID_HEADER = "message-deduplication-id";


private final JsonStringEncoder jsonStringEncoder = JsonStringEncoder.getInstance();

private final AmazonSNS amazonSns;
Expand All @@ -71,6 +81,13 @@ protected boolean sendInternal(Message<?> message, long timeout) {
if (!messageAttributes.isEmpty()) {
publishRequest.withMessageAttributes(messageAttributes);
}
if (message.getHeaders().containsKey(MESSAGE_GROUP_ID_HEADER)) {
publishRequest.withMessageGroupId(message.getHeaders().get(MESSAGE_GROUP_ID_HEADER, String.class));
}
if (message.getHeaders().containsKey(MESSAGE_DEDUPLICATION_ID_HEADER)) {
publishRequest.withMessageDeduplicationId(
message.getHeaders().get(MESSAGE_DEDUPLICATION_ID_HEADER, String.class));
}
this.amazonSns.publish(publishRequest);

return true;
Expand All @@ -82,6 +99,10 @@ private Map<String, MessageAttributeValue> getMessageAttributes(Message<?> messa
String messageHeaderName = messageHeader.getKey();
Object messageHeaderValue = messageHeader.getValue();

if (isSkipHeader(messageHeaderName)) {
continue;
}

if (MessageHeaders.CONTENT_TYPE.equals(messageHeaderName) && messageHeaderValue != null) {
messageAttributes.put(messageHeaderName, getContentTypeMessageAttribute(messageHeaderValue));
}
Expand Down Expand Up @@ -112,6 +133,10 @@ else if (messageHeaderValue instanceof List) {
return messageAttributes;
}

private boolean isSkipHeader(String headerName) {
return MESSAGE_GROUP_ID_HEADER.equals(headerName) || MESSAGE_DEDUPLICATION_ID_HEADER.equals(headerName);
}

private MessageAttributeValue getStringArrayMessageAttribute(List<Object> messageHeaderValue) {

List<String> stringValues = messageHeaderValue.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@

package io.awspring.cloud.messaging.core;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.ListTopicsRequest;
import com.amazonaws.services.sns.model.ListTopicsResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.Topic;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.messaging.core.DestinationResolver;
import org.springframework.messaging.support.MessageBuilder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.isNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -106,4 +111,26 @@ void convertAndSend_withPayloadAndSubject_shouldSetSubject() throws Exception {
new PublishRequest(physicalTopicName, "My message", "My subject").withMessageAttributes(isNotNull()));
}

@Test
void convertAndSend_withPayloadAndMessageGroupIdHeader_shouldSetMessageGroupIdParameter() throws Exception {
// Arrange
AmazonSNS amazonSns = mock(AmazonSNS.class);
NotificationMessagingTemplate notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
String physicalTopicName = "arn:aws:sns:eu-west:123456789012:test";
when(amazonSns.listTopics(new ListTopicsRequest(null)))
.thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(physicalTopicName)));
ArgumentCaptor<PublishRequest> publishRequestArgumentCaptor = ArgumentCaptor.forClass(PublishRequest.class);
when(amazonSns.publish(publishRequestArgumentCaptor.capture())).thenReturn(new PublishResult());

// Act
Map<String, Object> headers = new HashMap<>();
headers.put(TopicMessageChannel.MESSAGE_GROUP_ID_HEADER, "id-5");
notificationMessagingTemplate.convertAndSend(physicalTopicName, "My message", headers);

// Assert
PublishRequest publishRequest = publishRequestArgumentCaptor.getValue();
assertThat(publishRequest.getMessage()).isEqualTo("My message");
assertThat(publishRequest.getMessageGroupId()).isEqualTo("id-5");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,46 @@ public void sendMessage_withStringArrayMessageHeader_shouldBeSentAsTopicMessageA
.isEqualTo(MessageAttributeDataTypes.STRING_ARRAY);
}

@Test
public void sendMessage_withMessageGroupIdHeader_shouldSetMessageGroupIdOnPublishRequestAndNotSetItAsMessageAttribute() {
// Arrange
AmazonSNS amazonSns = mock(AmazonSNS.class);
ArgumentCaptor<PublishRequest> publishRequestArgumentCaptor = ArgumentCaptor.forClass(PublishRequest.class);
when(amazonSns.publish(publishRequestArgumentCaptor.capture())).thenReturn(new PublishResult());

Message<String> message = MessageBuilder.withPayload("Hello")
.setHeader(TopicMessageChannel.MESSAGE_GROUP_ID_HEADER, "id-5").build();
MessageChannel messageChannel = new TopicMessageChannel(amazonSns, "topicArn");

// Act
boolean sent = messageChannel.send(message);

// Assert
assertThat(sent).isTrue();
assertThat(publishRequestArgumentCaptor.getValue().getMessageAttributes()
.containsKey(TopicMessageChannel.MESSAGE_GROUP_ID_HEADER)).isFalse();
assertThat(publishRequestArgumentCaptor.getValue().getMessageGroupId()).isEqualTo("id-5");
}

@Test
public void sendMessage_withMessageDeduplicationIdHeader_shouldSetMessageDeduplicationIdOnPublishRequestAndNotSetItAsMessageAttribute() {
// Arrange
AmazonSNS amazonSns = mock(AmazonSNS.class);
ArgumentCaptor<PublishRequest> publishRequestArgumentCaptor = ArgumentCaptor.forClass(PublishRequest.class);
when(amazonSns.publish(publishRequestArgumentCaptor.capture())).thenReturn(new PublishResult());

Message<String> message = MessageBuilder.withPayload("Hello")
.setHeader(TopicMessageChannel.MESSAGE_DEDUPLICATION_ID_HEADER, "id-5").build();
MessageChannel messageChannel = new TopicMessageChannel(amazonSns, "topicArn");

// Act
boolean sent = messageChannel.send(message);

// Assert
assertThat(sent).isTrue();
assertThat(publishRequestArgumentCaptor.getValue().getMessageAttributes()
.containsKey(TopicMessageChannel.MESSAGE_DEDUPLICATION_ID_HEADER)).isFalse();
assertThat(publishRequestArgumentCaptor.getValue().getMessageDeduplicationId()).isEqualTo("id-5");
}

}