Skip to content

Commit

Permalink
feature-2730 fix the escaping feature for rabbitMQ outbound connector (
Browse files Browse the repository at this point in the history
…#2778)

* feature-2730 fix the escaping feature for rabbitMQ outbound connector

* feature-2730 remove the escape feature in the connectors implementing them

* feature-2730 format files

* feature-2730 format files json

* feature-2730 file format is inconsistent for JS
  • Loading branch information
mathias-vandaele committed Jun 26, 2024
1 parent ee7825a commit 6234b28
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.camunda.connector.sns.outbound.model.SnsConnectorResult;
import io.camunda.connector.sns.suppliers.SnsClientSupplier;
import java.util.Optional;
import org.apache.commons.text.StringEscapeUtils;

@OutboundConnector(
name = "AWS SNS Outbound",
Expand Down Expand Up @@ -81,7 +80,7 @@ private PublishResult sendMsgToSns(final AmazonSNS snsClient, SnsConnectorReques
try {
String topicMessage =
request.getTopic().getMessage() instanceof String
? StringEscapeUtils.unescapeJson(request.getTopic().getMessage().toString())
? request.getTopic().getMessage().toString()
: objectMapper.writeValueAsString(request.getTopic().getMessage());
PublishRequest message =
new PublishRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,30 @@ public abstract class BaseTest {
}
}""";

protected static final String REQUEST_WITH_JSON_MSG_BODY_SPECIAL_CHAR =
"""
{
"authentication":{
"secretKey":"abc",
"accessKey":"def"
},
"topic":{
"message":{"key":"\\"normal\\" value"},
"messageAttributes":{
"attribute2":{
"StringValue":"attribute 2 value",
"DataType":"String"
},
"attribute1":{
"StringValue":"attribute 1 value",
"DataType":"String"
}
},
"subject":"MySubject",
"region":"us-east-1",
"topicArn":"arn:aws:sns:us-east-1:000000000000:test"
}
}""";

protected static final ObjectMapper objectMapper = ObjectMapperSupplier.getMapperInstance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,30 @@ public void execute_shouldExecuteRequestWithJsonTypeMsg() throws JsonProcessingE
String message = requestArgumentCaptor.getValue().getMessage();
Assertions.assertThat(message).isEqualTo("{\"key\":\"value\"}");
}

@Test
public void execute_shouldExecuteRequestWithJsonTypeMsgShouldNotEscape() {
// Given
Mockito.when(snsClient.publish(requestArgumentCaptor.capture())).thenReturn(publishResult);
SnsClientSupplier snsClientSupplier = Mockito.mock(SnsClientSupplier.class);
Mockito.when(
snsClientSupplier.getSnsClient(
any(AWSCredentialsProvider.class), ArgumentMatchers.anyString()))
.thenReturn(snsClient);
connector = new SnsConnectorFunction(snsClientSupplier, objectMapper);
context =
OutboundConnectorContextBuilder.create()
.secret(AWS_ACCESS_KEY, ACTUAL_ACCESS_KEY)
.secret(AWS_SECRET_KEY, ACTUAL_SECRET_KEY)
.variables(REQUEST_WITH_JSON_MSG_BODY_SPECIAL_CHAR)
.build();

// When
connector.execute(context);

// Then
Mockito.verify(snsClient, Mockito.times(1)).shutdown();
String message = requestArgumentCaptor.getValue().getMessage();
Assertions.assertThat(message).isEqualTo("{\"key\":\"\\\"normal\\\" value\"}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -96,8 +95,7 @@ public KafkaConnectorConsumer(

public void startConsumer() {
if (elementProps.avro() != null) {
var schemaString = StringEscapeUtils.unescapeJson(elementProps.avro().schema());
Schema schema = new Schema.Parser().setValidate(true).parse(schemaString);
Schema schema = new Schema.Parser().setValidate(true).parse(elementProps.avro().schema());
AvroSchema avroSchema = new AvroSchema(schema);
AvroMapper avroMapper = new AvroMapper();
avroObjectReader = avroMapper.reader(avroSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Properties;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.TopicConfig;
Expand All @@ -24,15 +23,12 @@

public class KafkaPropertyTransformer {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPropertyTransformer.class);

static final String DEFAULT_GROUP_ID_PREFIX = "kafka-inbound-connector";

protected static final String DEFAULT_KEY_DESERIALIZER =
"org.apache.kafka.common.serialization.StringDeserializer";

protected static final String BYTE_ARRAY_DESERIALIZER =
"org.apache.kafka.common.serialization.ByteArrayDeserializer";
static final String DEFAULT_GROUP_ID_PREFIX = "kafka-inbound-connector";
private static final Logger LOG = LoggerFactory.getLogger(KafkaPropertyTransformer.class);

public static Properties getKafkaProperties(
KafkaConnectorProperties props, InboundConnectorContext context) {
Expand Down Expand Up @@ -109,7 +105,7 @@ private static Object parseKey(
return objectReader.readTree((String) consumerRecord.key());
} catch (Exception e) {
LOG.debug("Cannot parse key to json object -> use as string");
return StringEscapeUtils.unescapeJson((String) consumerRecord.key());
return consumerRecord.key();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -82,13 +81,13 @@ public Object execute(final OutboundConnectorContext context) {
}

public static byte[] produceAvroMessage(final KafkaConnectorRequest request) throws Exception {
var schemaString = StringEscapeUtils.unescapeJson(request.avro().schema());
var schemaString = request.avro().schema();
Schema raw = new Schema.Parser().setValidate(true).parse(schemaString);
AvroSchema schema = new AvroSchema(raw);
AvroMapper avroMapper = new AvroMapper();
Object messageValue = request.message().value();
if (messageValue instanceof String messageValueAsString) {
messageValue = objectMapper.readTree(StringEscapeUtils.unescapeJson(messageValueAsString));
messageValue = objectMapper.readTree(messageValueAsString);
}
return avroMapper.writer(schema).writeValueAsBytes(messageValue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"testDescription": "Regular happy case",
"testDescription":"Regular happy case",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -15,7 +15,7 @@
}
},
{
"testDescription": "Regular happy case with headers",
"testDescription":"Regular happy case with headers",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -28,13 +28,13 @@
"key":"Happy",
"value":"Case"
},
"headers": {
"headers":{
"headerKey1":"headerValue",
"headerKey2":"headerValue2"
}
},
{
"testDescription": "Regular happy case with Avro Schema",
"testDescription":"Regular happy case with Avro Schema",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -45,16 +45,17 @@
},
"message":{
"key":"Happy",
"value":{"name": "Testname", "age": 40,
"emails": ["test@camunda.com"]
"value":{
"name":"Testname", "age":40,
"emails":["test@camunda.com"]
}
},
"avro": {
"schema": "{\"type\": \"record\",\"name\": \"Employee\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"emails\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},{\"name\": \"boss\", \"type\": [\"Employee\",\"null\"]}]}"
"avro":{
"schema":"{\"type\": \"record\",\"name\": \"Employee\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"},{\"name\": \"emails\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},{\"name\": \"boss\", \"type\": [\"Employee\",\"null\"]}]}"
}
},
{
"testDescription": "With additional properties",
"testDescription":"With additional properties",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -67,12 +68,12 @@
"key":"Happy",
"value":"Case"
},
"additionalProperties" : {
"transaction.timeout.ms": "20000"
"additionalProperties":{
"transaction.timeout.ms":"20000"
}
},
{
"testDescription": "With overridden properties",
"testDescription":"With overridden properties",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -81,18 +82,18 @@
"bootstrapServers":"kafka-stub.kafka.cloud:1234",
"topicName":"some-awesome-topic"
},
"groupId": "test-group-id",
"groupId":"test-group-id",
"message":{
"key":"Happy",
"value":"Case"
},
"additionalProperties" : {
"delivery.timeout.ms": "20000",
"additionalProperties":{
"delivery.timeout.ms":"20000",
"value.serializer":"org.apache.kafka.common.serialization.StringSerializer"
}
},
{
"testDescription": "With value as JSON",
"testDescription":"With value as JSON",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -103,14 +104,17 @@
},
"message":{
"key":"Happy",
"value": {"documentId": "1234567890", "signee": "User Testerson", "contentBase64": "QXQgbGVhc3Qgc29tZWJvZHkgcmVhZHMgdGhlIHRlc3RzLiBHcmVhdCBqb2Ih"}
"value":{
"documentId":"1234567890", "signee":"User Testerson",
"contentBase64":"QXQgbGVhc3Qgc29tZWJvZHkgcmVhZHMgdGhlIHRlc3RzLiBHcmVhdCBqb2Ih"
}
},
"additionalProperties" : {
"delivery.timeout.ms": "20000"
"additionalProperties":{
"delivery.timeout.ms":"20000"
}
},
{
"testDescription": "Username as secret",
"testDescription":"Username as secret",
"authentication":{
"username":"{{secrets.USER_NAME}}",
"password":"mySecretPassword"
Expand All @@ -125,7 +129,7 @@
}
},
{
"testDescription": "Password as secret",
"testDescription":"Password as secret",
"authentication":{
"username":"myLogin",
"password":"{{secrets.PASSWORD}}"
Expand All @@ -140,7 +144,7 @@
}
},
{
"testDescription": "Bootstrap server as secret",
"testDescription":"Bootstrap server as secret",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -155,7 +159,7 @@
}
},
{
"testDescription": "Topic name as secret",
"testDescription":"Topic name as secret",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
Expand All @@ -168,5 +172,20 @@
"key":"Happy",
"value":"Case"
}
},
{
"testDescription":"Topic name as secret",
"authentication":{
"username":"myLogin",
"password":"mySecretPassword"
},
"topic":{
"bootstrapServers":"kafka-stub.kafka.cloud:1234",
"topicName":"{{secrets.TOPIC_NAME}}"
},
"message":{
"key":"Happy \"birthday\"",
"value":"Case"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.connector.model.request.data.SendMessageToChannel;
import java.util.Locale;
import java.util.Optional;
import org.apache.commons.text.StringEscapeUtils;

public record SendMessageToChannelOperation(SendMessageToChannel model)
implements ChannelOperation {
Expand All @@ -25,7 +24,7 @@ public Object invoke(final GraphServiceClient graphClient) {
Optional.ofNullable(model.bodyType())
.map(type -> BodyType.forValue(type.toLowerCase(Locale.ROOT)))
.orElse(BodyType.Text));
body.setContent(StringEscapeUtils.unescapeJson(model.content()));
body.setContent(model.content());
chatMessage.setBody(body);
return graphClient
.teams()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.connector.model.request.data.SendMessageInChat;
import java.util.Locale;
import java.util.Optional;
import org.apache.commons.text.StringEscapeUtils;

public record SendMessageInChatChatOperation(SendMessageInChat model) implements ChatOperation {
@Override
Expand All @@ -24,7 +23,7 @@ public Object invoke(final GraphServiceClient graphClient) {
Optional.ofNullable(model.bodyType())
.map(type -> BodyType.forValue(type.toLowerCase(Locale.ROOT)))
.orElse(BodyType.Text));
body.setContent(StringEscapeUtils.unescapeJson(model.content()));
body.setContent(model.content());
chatMessage.setBody(body);
return graphClient.chats().byChatId(model.chatId()).messages().post(chatMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public void invoke_shouldSetTextBodyTypeByDefault() {
assertThat(chatMessage.getBody().getContent()).isEqualTo("content");
}

@Test
public void invoke_shouldSetTextBodyTypeContentIsNotEscaped() {
// Given SendMessageInChat without bodyType
// When
sendMessageInChat = new SendMessageInChat(ActualValue.Chat.CHAT_ID, "\"normal\" content", null);
operationFactory.getService(sendMessageInChat).invoke(graphServiceClient);
// Then
ChatMessage chatMessage = chatMessageCaptor.getValue();
assertThat(chatMessage.getBody().getContentType()).isEqualTo(BodyType.Text);
assertThat(chatMessage.getBody().getContent()).isEqualTo("\"normal\" content");
}

@ParameterizedTest
@ValueSource(strings = {"html", "HTML", "text", "TexT"})
public void invoke_shouldSetTextBodyType(String input) {
Expand Down
Loading

0 comments on commit 6234b28

Please sign in to comment.