Skip to content

Commit

Permalink
chore(inbound,sqs): move to generator (#2249)
Browse files Browse the repository at this point in the history
* chore(inbound,sqs): move to generator

* fix(sqs): fixed correct template overrides

Co-authored-by: Oleksii Ivanov <108869886+Oleksiivanov@users.noreply.github.com>

* chore(inbound,sqs): corrected generator props

---------

Co-authored-by: Oleksii Ivanov <108869886+Oleksiivanov@users.noreply.github.com>
  • Loading branch information
igpetrov and Oleksiivanov committed Apr 2, 2024
1 parent b3a776f commit 165d915
Show file tree
Hide file tree
Showing 13 changed files with 1,141 additions and 1,366 deletions.
499 changes: 248 additions & 251 deletions connectors/aws/aws-sqs/element-templates/aws-sqs-boundary-connector.json

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

557 changes: 274 additions & 283 deletions connectors/aws/aws-sqs/element-templates/aws-sqs-start-message.json

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions connectors/aws/aws-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@
</file>
</files>
</connector>
<connector>
<connectorClass>io.camunda.connector.inbound.SqsExecutable</connectorClass>
<files>
<file>
<templateId>io.camunda.connectors.AWSSQS.StartEvent.v1</templateId>
<templateFileName>aws-sqs-start-event-connector.json</templateFileName>
</file>
<file>
<templateId>io.camunda.connectors.AWSSQS.startmessage.v1</templateId>
<templateFileName>aws-sqs-start-message.json</templateFileName>
</file>
<file>
<templateId>io.camunda.connectors.AWSSQS.intermediate.v1</templateId>
<templateFileName>aws-sqs-inbound-intermediate-connector.json</templateFileName>
</file>
<file>
<templateId>io.camunda.connectors.AWSSQS.boundary.v1</templateId>
<templateFileName>aws-sqs-boundary-connector.json</templateFileName>
</file>
</files>
</connector>
</connectors>
<includeDependencies>
<includeDependency>io.camunda.connector:connector-aws-base</includeDependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ public class MessageMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageMapper.class);

public static SqsInboundMessage toSqsInboundMessage(final Message message) {
SqsInboundMessage sqsInboundMessage = new SqsInboundMessage();
sqsInboundMessage.setMessageId(message.getMessageId());
sqsInboundMessage.setReceiptHandle(message.getReceiptHandle());
sqsInboundMessage.setMD5OfMessageAttributes(message.getMD5OfMessageAttributes());
sqsInboundMessage.setAttributes(message.getAttributes());

Map<String, io.camunda.connector.inbound.model.message.MessageAttributeValue>
sqsInboundMessageAttributes = new HashMap<>();
for (Map.Entry<String, MessageAttributeValue> entry :
Expand All @@ -35,12 +29,15 @@ public static SqsInboundMessage toSqsInboundMessage(final Message message) {

sqsInboundMessageAttributes.put(entry.getKey(), sqsInboundMessageAttribute);
}
sqsInboundMessage.setMessageAttributes(sqsInboundMessageAttributes);

sqsInboundMessage.setBody(toObjectIfPossible(message.getBody()));
sqsInboundMessage.setmD5OfBody(message.getMD5OfBody());

return sqsInboundMessage;
return new SqsInboundMessage(
message.getMessageId(),
message.getReceiptHandle(),
message.getMD5OfBody(),
toObjectIfPossible(message.getBody()),
message.getAttributes(),
message.getMD5OfMessageAttributes(),
sqsInboundMessageAttributes);
}

private static Object toObjectIfPossible(final String body) {
Expand All @@ -54,15 +51,11 @@ private static Object toObjectIfPossible(final String body) {

private static io.camunda.connector.inbound.model.message.MessageAttributeValue
toSqsInboundMessageAttribute(final MessageAttributeValue attributeValue) {
io.camunda.connector.inbound.model.message.MessageAttributeValue sqsInboundMessageAttribute =
new io.camunda.connector.inbound.model.message.MessageAttributeValue();

sqsInboundMessageAttribute.setBinaryValue(attributeValue.getBinaryValue());
sqsInboundMessageAttribute.setDataType(attributeValue.getDataType());
sqsInboundMessageAttribute.setStringValue(attributeValue.getStringValue());
sqsInboundMessageAttribute.setBinaryListValues(attributeValue.getBinaryListValues());
sqsInboundMessageAttribute.setStringListValues(attributeValue.getStringListValues());

return sqsInboundMessageAttribute;
return new io.camunda.connector.inbound.model.message.MessageAttributeValue(
attributeValue.getStringValue(),
attributeValue.getBinaryValue(),
attributeValue.getStringListValues(),
attributeValue.getBinaryListValues(),
attributeValue.getDataType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import io.camunda.connector.aws.CredentialsProviderSupport;
import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier;
import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier;
import io.camunda.connector.generator.dsl.BpmnType;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import io.camunda.connector.generator.java.annotation.ElementTemplate.ConnectorElementType;
import io.camunda.connector.generator.java.annotation.ElementTemplate.PropertyGroup;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -22,6 +26,44 @@
import org.slf4j.LoggerFactory;

@InboundConnector(name = "AWS SQS Inbound", type = "io.camunda:aws-sqs-inbound:1")
@ElementTemplate(
id = "io.camunda.connectors.AWSSQS.inbound.v1",
name = "Amazon SQS Connector",
icon = "icon.svg",
version = 9,
inputDataClass = SqsInboundProperties.class,
description = "Receive message from a queue",
documentationRef =
"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/amazon-sqs/?amazonsqs=inbound",
propertyGroups = {
@PropertyGroup(id = "authentication", label = "Authentication"),
@PropertyGroup(id = "configuration", label = "Configuration"),
@PropertyGroup(id = "queueProperties", label = "Queue properties"),
@PropertyGroup(id = "messagePollingProperties", label = "Message polling properties"),
@PropertyGroup(id = "input", label = "Use next attribute names for activation condition")
},
elementTypes = {
@ConnectorElementType(
appliesTo = BpmnType.START_EVENT,
elementType = BpmnType.START_EVENT,
templateIdOverride = "io.camunda.connectors.AWSSQS.StartEvent.v1",
templateNameOverride = "Amazon SQS Start Event Connector"),
@ConnectorElementType(
appliesTo = BpmnType.START_EVENT,
elementType = BpmnType.MESSAGE_START_EVENT,
templateIdOverride = "io.camunda.connectors.AWSSQS.startmessage.v1",
templateNameOverride = "Amazon SQS Message Start Event Connector"),
@ConnectorElementType(
appliesTo = {BpmnType.INTERMEDIATE_THROW_EVENT, BpmnType.INTERMEDIATE_CATCH_EVENT},
elementType = BpmnType.INTERMEDIATE_CATCH_EVENT,
templateIdOverride = "io.camunda.connectors.AWSSQS.intermediate.v1",
templateNameOverride = "Amazon SQS Intermediate Message Catch Event connector"),
@ConnectorElementType(
appliesTo = BpmnType.BOUNDARY_EVENT,
elementType = BpmnType.BOUNDARY_EVENT,
templateIdOverride = "io.camunda.connectors.AWSSQS.boundary.v1",
templateNameOverride = "Amazon SQS Boundary Event Connector")
})
public class SqsExecutable implements InboundConnectorExecutable {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class);
private final AmazonSQSClientSupplier sqsClientSupplier;
Expand Down Expand Up @@ -50,7 +92,7 @@ public void activate(final InboundConnectorContext context) {

var region =
AwsUtils.extractRegionOrDefault(
properties.getConfiguration(), properties.getQueue().getRegion());
properties.getConfiguration(), properties.getQueue().region());
amazonSQS =
sqsClientSupplier.sqsClient(
CredentialsProviderSupport.credentialsProvider(properties), region);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public SqsQueueConsumer(

@Override
public void run() {
LOGGER.info("Started SQS consumer for queue {}", properties.getQueue().getUrl());
LOGGER.info("Started SQS consumer for queue {}", properties.getQueue().url());

final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
ReceiveMessageResult receiveMessageResult;
Expand All @@ -51,7 +51,7 @@ public void run() {
for (Message message : messages) {
try {
context.correlate(MessageMapper.toSqsInboundMessage(message));
sqsClient.deleteMessage(properties.getQueue().getUrl(), message.getReceiptHandle());
sqsClient.deleteMessage(properties.getQueue().url(), message.getReceiptHandle());
} catch (ConnectorInputException e) {
LOGGER.warn("NACK - failed to parse SQS message body: {}", e.getMessage());
}
Expand All @@ -60,19 +60,19 @@ public void run() {
LOGGER.debug("NACK - failed to correlate event", e);
}
} while (queueConsumerActive.get());
LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().getUrl());
LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().url());
}

private ReceiveMessageRequest createReceiveMessageRequest() {
return new ReceiveMessageRequest()
.withWaitTimeSeconds(Integer.valueOf(properties.getQueue().getPollingWaitTime()))
.withQueueUrl(properties.getQueue().getUrl())
.withWaitTimeSeconds(Integer.valueOf(properties.getQueue().pollingWaitTime()))
.withQueueUrl(properties.getQueue().url())
.withMessageAttributeNames(
Optional.ofNullable(properties.getQueue().getMessageAttributeNames())
Optional.ofNullable(properties.getQueue().messageAttributeNames())
.filter(list -> !list.isEmpty())
.orElse(ALL_ATTRIBUTES_KEY))
.withAttributeNames(
Optional.ofNullable(properties.getQueue().getAttributeNames())
Optional.ofNullable(properties.getQueue().attributeNames())
.filter(list -> !list.isEmpty())
.orElse(ALL_ATTRIBUTES_KEY));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,108 +6,46 @@
*/
package io.camunda.connector.inbound.model;

import jakarta.validation.constraints.NotEmpty;
import io.camunda.connector.generator.dsl.Property.FeelMode;
import io.camunda.connector.generator.java.annotation.TemplateProperty;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Objects;

public class SqsInboundQueueProperties {
@Deprecated private String region;
@NotEmpty private String url;
private List<String> attributeNames;
private List<String> messageAttributeNames;

// TODO: when migrating to template generator, default to 20 (max possible value)
@Pattern(regexp = "^([0-9]?|1[0-9]|20|secrets\\..+)$")
private String pollingWaitTime;

@Deprecated
public String getRegion() {
return region;
}

@Deprecated
public void setRegion(final String region) {
this.region = region;
}

public String getUrl() {
return url;
}

public void setUrl(final String url) {
this.url = url;
}

public boolean isContainAttributeNames() {
return attributeNames != null && !attributeNames.isEmpty();
}

public boolean isContainMessageAttributeNames() {
return messageAttributeNames != null && !messageAttributeNames.isEmpty();
}

public List<String> getAttributeNames() {
return attributeNames;
}

public void setAttributeNames(final List<String> attributeNames) {
this.attributeNames = attributeNames;
}

public List<String> getMessageAttributeNames() {
return messageAttributeNames;
}

public void setMessageAttributeNames(final List<String> messageAttributeNames) {
this.messageAttributeNames = messageAttributeNames;
}

public String getPollingWaitTime() {
return pollingWaitTime;
}

public void setPollingWaitTime(final String pollingWaitTime) {
this.pollingWaitTime = pollingWaitTime;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SqsInboundQueueProperties that = (SqsInboundQueueProperties) o;
return Objects.equals(region, that.region)
&& Objects.equals(url, that.url)
&& Objects.equals(attributeNames, that.attributeNames)
&& Objects.equals(messageAttributeNames, that.messageAttributeNames)
&& Objects.equals(pollingWaitTime, that.pollingWaitTime);
}

@Override
public int hashCode() {
return Objects.hash(region, url, attributeNames, messageAttributeNames, pollingWaitTime);
}

@Override
public String toString() {
return "SqsInboundQueueProperties{"
+ "region='"
+ region
+ "'"
+ ", url='"
+ url
+ "'"
+ ", attributeNames="
+ attributeNames
+ ", messageAttributeNames="
+ messageAttributeNames
+ ", pollingWaitTime='"
+ pollingWaitTime
+ "'"
+ "}";
}
}
public record SqsInboundQueueProperties(
@Deprecated @TemplateProperty(ignore = true) String region,
@TemplateProperty(
id = "queue.url",
label = "Queue URL",
group = "queueProperties",
description = "Specify the URL of the SQS queue where you would like to subscribe to",
feel = FeelMode.disabled)
@NotBlank
String url,
@TemplateProperty(
id = "queue.attributeNames",
label = "Attribute names",
group = "input",
description =
"Array of queue attribute names. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/amazon-sqs/?amazonsqs=inbound\" target=\"_blank\">documentation</a> for details",
feel = FeelMode.optional)
List<String> attributeNames,
@TemplateProperty(
id = "queue.messageAttributeNames",
label = "Message attribute names",
group = "input",
description =
"Array of message attribute names. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/amazon-sqs/?amazonsqs=inbound\" target=\"_blank\">documentation</a> for details",
feel = FeelMode.optional)
List<String> messageAttributeNames,
@TemplateProperty(
id = "queue.pollingWaitTime",
label = "Polling wait time",
group = "messagePollingProperties",
defaultValue = "20",
description =
"The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. See <a href=\"https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/amazon-sqs/?amazonsqs=inbound\" target=\"_blank\">documentation</a> for details",
feel = FeelMode.disabled)
@Pattern(regexp = "^([0-9]?|1[0-9]|20|secrets\\..+)$")
@NotBlank
String pollingWaitTime) {}

0 comments on commit 165d915

Please sign in to comment.