Skip to content

Commit

Permalink
feat(rabbitmq): Add to template generator (#2246)
Browse files Browse the repository at this point in the history
* feat(rabbitmq): Add to template generator

* feat(rabbitmq): Move pattern constraint in the right place

* feat(rabbitmq): Refactor RabbitMq E2E tests

* feat(rabbitmq): Add missing annotations to some fields

* feat(rabbitmq): Refactor existing RabbitMq tests

* Outbound GitLab Connector enhancements (#2214)

* feat(gitlab): list repository branches

* feat(gitlab): create repository branch

* feat(gitlab): create repository file

* feat(gitlab): create merge request

* chore(gitlab): review feedback

* fix(rest): more meaningful error message on invalid host (#2245)

* fix(deps): update aws-java-sdk monorepo to v1.12.688 (#2253)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* fix(deps): update dependency com.google.apis:google-api-services-sheets to v4-rev20240319-2.0.0 (#2254)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* fix(sendgrid): capitalize sender group name in the template (#2252)

* chore(deps): update dependency org.cyclonedx:cyclonedx-maven-plugin to v2.8.0 (#2256)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* fix(deps): update dependency com.microsoft.graph:microsoft-graph to v6.5.0 (#2258)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* feat(rabbitmq): Add RabbitMq InbC E2E tests

* feat(rabbitmq): Add URI Pattern validation

* feat(rabbitmq): Add missing $ to the URI Pattern

* feat(rabbitmq): Revert BpmnFile debug change

* feat(rabbitmq): Use the existing secrets Pattern for consistency

* bug(inboundc): Fix potential infinite retries on message idempotency check fails (#2281)

* ci: fix integration test GHA (#2274)

* fix(generator): FEEL mode = disabled not applied in some cases (#2286)

* feat(template generator): set default value feel = false for all inbound connectors (#2264)

* fix(generator): standardize line endings in generated element templates (#2288)

* chore(sns): migrate to element template generator (#2266)

* chore(slack): migrate slack inbound to generator (#2279)

* chore(inbound,sqs): move to generator (#2249)

* chore(inbound,sqs): move to generator

* fix(sqs): fixed correct template overrides

Co-authored-by: Oleksii Ivanov <108869886+Oleksiivanov@users.noreply.github.com>

* chore(inbound,sqs): corrected generator props

---------

Co-authored-by: Oleksii Ivanov <108869886+Oleksiivanov@users.noreply.github.com>

* feat(rabbitmq): Rebase main + re generate all templates

---------

Co-authored-by: Wolfgang Amann <8766375+wollefitz@users.noreply.github.com>
Co-authored-by: Igor Petrov <108870003+igpetrov@users.noreply.github.com>
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Pavel Kotelevsky <38818382+chillleader@users.noreply.github.com>
Co-authored-by: Oleksii Ivanov <108869886+Oleksiivanov@users.noreply.github.com>
Co-authored-by: Mark Farkas <119574841+markfarkas-camunda@users.noreply.github.com>
  • Loading branch information
7 people committed Apr 4, 2024
1 parent 165d915 commit 61c7bef
Show file tree
Hide file tree
Showing 16 changed files with 1,578 additions and 1,256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ public BpmnFile(BpmnModelInstance bpmnModelInstance) {
this.bpmnModelInstance = bpmnModelInstance;
}

public static BpmnModelInstance replace(String resourceName, Replace... replaces) {
try {
var resource = BpmnFile.class.getClassLoader().getResource(resourceName);
var file = new File(resource.toURI());
return replace(file, replaces);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static BpmnModelInstance replace(File file, Replace... replaces) {
try {
var modelXml = IOUtils.toString(file.toURI(), StandardCharsets.UTF_8);
for (var replace : replaces) {
modelXml = modelXml.replaceAll(replace.oldValue, replace.newValue);
}
return Bpmn.readModelFromStream(IOUtils.toInputStream(modelXml, StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public BpmnFile writeToFile(File file) {
bpmnFile = file;
Bpmn.writeModelToFile(bpmnFile, bpmnModelInstance);
Expand Down Expand Up @@ -62,35 +84,13 @@ public BpmnModelInstance apply(File template, String elementId, File output) {
}
}

public static BpmnModelInstance replace(String resourceName, Replace... replaces) {
try {
var resource = BpmnFile.class.getClassLoader().getResource(resourceName);
var file = new File(resource.toURI());
return replace(file, replaces);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static BpmnModelInstance replace(File file, Replace... replaces) {
try {
var modelXml = IOUtils.toString(file.toURI(), StandardCharsets.UTF_8);
for (var replace : replaces) {
modelXml = modelXml.replaceAll(replace.oldValue, replace.newValue);
}
return Bpmn.readModelFromStream(IOUtils.toInputStream(modelXml, StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException(e);
}
public BpmnModelInstance getBpmnModelInstance() {
return bpmnModelInstance;
}

public record Replace(String oldValue, String newValue) {
public static Replace replace(String oldValue, String newValue) {
return new Replace(oldValue, newValue);
}
}

public BpmnModelInstance getBpmnModelInstance() {
return bpmnModelInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import java.io.File;
import java.util.Collections;
Expand All @@ -32,9 +31,10 @@
import org.springframework.boot.test.web.server.LocalServerPort;

public abstract class BaseRabbitMqTest {
protected static final String ELEMENT_TEMPLATE_PATH =
protected static final String ELEMENT_ID = "elementId";
protected static final String OUTBOUND_ELEMENT_TEMPLATE_PATH =
"../../connectors/rabbitmq/element-templates/rabbitmq-outbound-connector.json";

protected static final String INTERMEDIATE_CATCH_EVENT_BPMN = "intermediate-catch-event.bpmn";
@TempDir File tempDir;

@Autowired ZeebeClient zeebeClient;
Expand All @@ -50,32 +50,7 @@ void beforeEach() {
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

protected ZeebeTest setupTestWithBpmnModel(String taskName, File elementTemplate) {
BpmnModelInstance model = getBpmnModelInstance(taskName);
BpmnModelInstance updatedModel = getBpmnModelInstance(model, elementTemplate, taskName);
return getZeebeTest(updatedModel);
}

protected static BpmnModelInstance getBpmnModelInstance(final String serviceTaskName) {
return Bpmn.createProcess()
.executable()
.startEvent()
.serviceTask(serviceTaskName)
.endEvent()
.done();
}

protected ZeebeTest getZeebeTest(final BpmnModelInstance updatedModel) {
return ZeebeTest.with(zeebeClient)
.deploy(updatedModel)
.createInstance()
.waitForProcessCompletion();
}

protected BpmnModelInstance getBpmnModelInstance(
final BpmnModelInstance model, final File elementTemplate, final String taskName) {
return new BpmnFile(model)
.writeToFile(new File(tempDir, "test.bpmn"))
.apply(elementTemplate, taskName, new File(tempDir, "result.bpmn"));
return ZeebeTest.with(zeebeClient).deploy(updatedModel).createInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.e2e;

import static io.camunda.connector.e2e.BpmnFile.replace;
import static org.mockito.Mockito.when;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import io.camunda.connector.api.json.ConnectorsObjectMapperSupplier;
import io.camunda.connector.e2e.app.TestConnectorRuntimeApplication;
import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.exception.OperateException;
import io.camunda.operate.model.ProcessDefinition;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.instance.Process;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(
classes = {TestConnectorRuntimeApplication.class},
properties = {
"spring.main.allow-bean-definition-overriding=true",
"camunda.connector.webhook.enabled=false",
"camunda.connector.polling.enabled=true"
},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ZeebeSpringTest
@ExtendWith(MockitoExtension.class)
public class RabbitMqInboundStartEventTests extends BaseRabbitMqTest {

private static final String QUEUE_NAME = "testQueue";
private static final String EXCHANGE_NAME = "testExchange";
private static final String ROUTING_KEY = "testRoutingKey";
private static String PORT;
private static RabbitMQContainer rabbitMQContainer;
private static ConnectionFactory factory;

@Autowired InboundConnectorManager inboundManager;
@Mock private ProcessDefinition processDef;

@BeforeAll
public static void setup() throws IOException, TimeoutException {
rabbitMQContainer =
new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"));
rabbitMQContainer.start();
PORT = String.valueOf(rabbitMQContainer.getAmqpPort());
factory = new ConnectionFactory();
factory.setHost(rabbitMQContainer.getHost());
factory.setPort(rabbitMQContainer.getAmqpPort());
factory.setUsername(rabbitMQContainer.getAdminUsername());
factory.setPassword(rabbitMQContainer.getAdminPassword());

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
}
}

@AfterAll
public static void tearDown() {
rabbitMQContainer.stop();
}

@BeforeEach
public void cleanQueue() throws IOException, TimeoutException {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Purge the queue to ensure it is empty before conducting the test
channel.queuePurge(QUEUE_NAME);
}
}

@Test
public void credentialsAuthenticationReceiveMessageTest() throws Exception {
var model =
replace(
INTERMEDIATE_CATCH_EVENT_BPMN,
BpmnFile.Replace.replace("rabbitMqAuthType", "credentials"),
BpmnFile.Replace.replace("rabbitMqUserName", rabbitMQContainer.getAdminUsername()),
BpmnFile.Replace.replace("rabbitMqPassword", rabbitMQContainer.getAdminPassword()),
BpmnFile.Replace.replace("rabbitMqPort", PORT));
assertIntermediateCatchEventUsingModel(model);
}

@Test
public void uriAuthenticationReceiveMessageTest() throws Exception {
String uri =
String.format(
"amqp://%s:%s@localhost:%s/%%2f",
rabbitMQContainer.getAdminUsername(), rabbitMQContainer.getAdminPassword(), PORT);

var model =
replace(
INTERMEDIATE_CATCH_EVENT_BPMN,
BpmnFile.Replace.replace("rabbitMqAuthType", "uri"),
BpmnFile.Replace.replace("rabbitMqUri", uri));

assertIntermediateCatchEventUsingModel(model);
}

private void assertIntermediateCatchEventUsingModel(BpmnModelInstance model) throws Exception {
Object expectedJsonResponse =
ConnectorsObjectMapperSupplier.DEFAULT_MAPPER.readValue(
"{\"message\":{\"consumerTag\":\"myConsumerTag\",\"body\":{\"foo\": {\"bar\": \"barValue\"}},\"properties\":{}}}",
Object.class);

mockProcessDefinition(model);
inboundManager.handleNewProcessDefinitions(Set.of(processDef));

var bpmnTest = getZeebeTest(model);
postMessage();
bpmnTest = bpmnTest.waitForProcessCompletion();

BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
.hasVariableWithValue("allResult", expectedJsonResponse);

BpmnAssert.assertThat(bpmnTest.getProcessInstanceEvent())
.hasVariableWithValue("partialResult", "barValue");
}

private void postMessage() throws Exception {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
byte[] messageBodyBytes = "{\"foo\": {\"bar\": \"barValue\"}}".getBytes();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, messageBodyBytes);
}
}

private void mockProcessDefinition(BpmnModelInstance model) throws OperateException {
when(camundaOperateClient.getProcessDefinitionModel(1L)).thenReturn(model);
when(processDef.getKey()).thenReturn(1L);
when(processDef.getTenantId()).thenReturn(zeebeClient.getConfiguration().getDefaultTenantId());
when(processDef.getBpmnProcessId())
.thenReturn(model.getModelElementsByType(Process.class).stream().findFirst().get().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.rabbitmq.client.GetResponse;
import io.camunda.connector.e2e.app.TestConnectorRuntimeApplication;
import io.camunda.connector.rabbitmq.outbound.RabbitMqResult;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import java.io.File;
import java.io.IOException;
Expand All @@ -51,7 +53,7 @@
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ZeebeSpringTest
@ExtendWith(MockitoExtension.class)
public class RabbitMqTests extends BaseRabbitMqTest {
public class RabbitMqOutboundTests extends BaseRabbitMqTest {
private static final String QUEUE_NAME = "testQueue";
private static final String EXCHANGE_NAME = "testExchange";
private static final String ROUTING_KEY = "testRoutingKey";
Expand Down Expand Up @@ -82,6 +84,11 @@ public static void setup() throws IOException, TimeoutException {
}
}

@AfterAll
public static void tearDown() {
rabbitMQContainer.stop();
}

@BeforeEach
public void cleanQueue() throws IOException, TimeoutException {
try (Connection connection = factory.newConnection();
Expand All @@ -91,15 +98,10 @@ public void cleanQueue() throws IOException, TimeoutException {
}
}

@AfterAll
public static void tearDown() {
rabbitMQContainer.stop();
}

@Test
public void credentialsAuthenticationSendMessageTest() throws Exception {
var elementTemplate =
ElementTemplate.from(ELEMENT_TEMPLATE_PATH)
ElementTemplate.from(OUTBOUND_ELEMENT_TEMPLATE_PATH)
.property("authentication.authType", "credentials")
.property("authentication.userName", rabbitMQContainer.getAdminUsername())
.property("authentication.password", rabbitMQContainer.getAdminPassword())
Expand All @@ -112,7 +114,7 @@ public void credentialsAuthenticationSendMessageTest() throws Exception {
.property("resultVariable", "result")
.writeTo(new File(tempDir, "template.json"));

ZeebeTest bpmnTest = setupTestWithBpmnModel("rabbitMqTask", elementTemplate);
ZeebeTest bpmnTest = setupTestWithBpmnModel(elementTemplate);

RabbitMqResult result = RabbitMqResult.success();
assertThat(bpmnTest.getProcessInstanceEvent()).hasVariableWithValue("result", result);
Expand All @@ -138,7 +140,7 @@ public void uriAuthenticationSendMessageTest() throws Exception {
"%2F");

var elementTemplate =
ElementTemplate.from(ELEMENT_TEMPLATE_PATH)
ElementTemplate.from(OUTBOUND_ELEMENT_TEMPLATE_PATH)
.property("authentication.authType", "uri")
.property("authentication.uri", uri)
.property("routing.exchange", EXCHANGE_NAME)
Expand All @@ -147,7 +149,7 @@ public void uriAuthenticationSendMessageTest() throws Exception {
.property("resultVariable", "result")
.writeTo(new File(tempDir, "template.json"));

ZeebeTest bpmnTest = setupTestWithBpmnModel("rabbitMqTask", elementTemplate);
ZeebeTest bpmnTest = setupTestWithBpmnModel(elementTemplate);

RabbitMqResult result = RabbitMqResult.success();
assertThat(bpmnTest.getProcessInstanceEvent()).hasVariableWithValue("result", result);
Expand All @@ -171,4 +173,21 @@ private String consumeMessage() throws Exception {
}
return receivedMessage;
}

protected BpmnModelInstance getBpmnModelInstance() {
return Bpmn.createProcess().executable().startEvent().serviceTask(ELEMENT_ID).endEvent().done();
}

protected ZeebeTest setupTestWithBpmnModel(File elementTemplate) {
BpmnModelInstance model = getBpmnModelInstance();
BpmnModelInstance updatedModel = getBpmnModelInstance(model, elementTemplate);
return getZeebeTest(updatedModel).waitForProcessCompletion();
}

protected BpmnModelInstance getBpmnModelInstance(
final BpmnModelInstance model, final File elementTemplate) {
return new BpmnFile(model)
.writeToFile(new File(tempDir, "test.bpmn"))
.apply(elementTemplate, ELEMENT_ID, new File(tempDir, "result.bpmn"));
}
}

0 comments on commit 61c7bef

Please sign in to comment.