Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Spring cloud stream functional bindings + functional channel bindings…
Browse files Browse the repository at this point in the history
… configuration (#875)

* test-support library removed

* Use functional programming approach for Spring Cloud Stream in Connectors

* Codacy fixes

* Merge nested if statements

* empty

* revert empty

* fix format

* prettier fix

* prettier fix

* removed empty line

* prettier fix

* prettier fix

* fix method

* migrated new RestConnector

* prettier

* AAE-11278 use @ConnectorBinding

* AAE-11278 fix imports

* Revert "AAE-11278 fix imports"

This reverts commit 9b257de.

* AAE-11278 fix imports

* Revert "AAE-11278 fix imports"

This reverts commit f2872b4.

* Revert "AAE-11278 use @ConnectorBinding"

This reverts commit 60f270c.

* Improve test assertions

* AAE-11278 new preview version

* change connector bindings

* preview upgrade

* fix version

* migrate input/output bindings

* change apply method

* cleanup

* fix bindings

* fix catalog test

* prettier

* prettier

* prettier

* prettier

* prettier

* cleanup

---------

Co-authored-by: Jakub Sokołowski <30300796+jsokolowskii@users.noreply.github.com>
Co-authored-by: Elias Ricken de Medeiros <26007058+erdemedeiros@users.noreply.github.com>
  • Loading branch information
3 people committed Jan 30, 2023
1 parent 2f4dc3b commit 242d10f
Show file tree
Hide file tree
Showing 22 changed files with 813 additions and 114 deletions.
4 changes: 3 additions & 1 deletion activiti-cloud-query/starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<scope>test</scope>
<classifier>test-binder</classifier>
</dependency>
</dependencies>

Expand Down
4 changes: 3 additions & 1 deletion example-cloud-connector/starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<scope>test</scope>
<classifier>test-binder</classifier>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2020 Alfresco Software, Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.activiti.cloud.examples.connectors;

import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectorConfiguration
implements
ExampleConnectorChannels,
HeadersConnectorChannels,
MoviesDescriptionConnectorChannels,
MultiInstanceConnector.Channels,
TestBpmnErrorConnector.Channels,
RestConnector.Channels,
TestErrorConnector.Channels {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@
import java.util.Map;
import org.activiti.cloud.api.process.model.IntegrationRequest;
import org.activiti.cloud.api.process.model.IntegrationResult;
import org.activiti.cloud.common.messaging.functional.Connector;
import org.activiti.cloud.common.messaging.functional.ConnectorBinding;
import org.activiti.cloud.connectors.starter.channels.IntegrationResultSender;
import org.activiti.cloud.connectors.starter.configuration.ConnectorProperties;
import org.activiti.cloud.connectors.starter.model.IntegrationResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(ExampleConnectorChannels.class)
public class ExampleConnector {
@ConnectorBinding(input = ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER, condition = "")
@Component(ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER + "Connector")
public class ExampleConnector implements Connector<IntegrationRequest, Void> {

private final Logger logger = LoggerFactory.getLogger(ExampleConnector.class);

Expand All @@ -49,34 +49,69 @@ public class ExampleConnector {
//just a convenience - not recommended in real implementations
private String var1Copy = "";

@Autowired
private ConnectorProperties connectorProperties;
private final ConnectorProperties connectorProperties;

private final ObjectMapper objectMapper;

private final IntegrationResultSender integrationResultSender;

public ExampleConnector(IntegrationResultSender integrationResultSender, ObjectMapper objectMapper) {
@Autowired
public ExampleConnector(
ConnectorProperties connectorProperties,
IntegrationResultSender integrationResultSender,
ObjectMapper objectMapper
) {
this.connectorProperties = connectorProperties;
this.objectMapper = objectMapper;
this.integrationResultSender = integrationResultSender;
}

@StreamListener(value = ExampleConnectorChannels.EXAMPLE_CONNECTOR_CONSUMER)
public void performTask(IntegrationRequest event) throws InterruptedException {
@Override
public Void apply(IntegrationRequest event) {
logger.info(append("service-name", appName), ">>> In example-cloud-connector");

String var1 =
ExampleConnector.class.getSimpleName() +
" was called for instance " +
event.getIntegrationContext().getProcessInstanceId();

var1Copy = String.valueOf(var1);
var1Copy = var1;

Object jsonVar = event.getIntegrationContext().getInBoundVariables().get("test_json_variable_name");
Object longJsonVar = event.getIntegrationContext().getInBoundVariables().get("test_long_json_variable_name");

Map<String, Object> results = new HashMap<>();

processJsonVar(jsonVar, results);

processLongJsonVar(longJsonVar, results);

Object intVar = event.getIntegrationContext().getInBoundVariables().get("test_int_variable_name");
processIntVar(results, intVar);

Object boolVar = event.getIntegrationContext().getInBoundVariables().get("test_bool_variable_name");
processBoolVar(results, boolVar);

Object bigDecimalVar = event.getIntegrationContext().getInBoundVariable("test_bigdecimal_variable_name");
processBigDecimalVar(results, bigDecimalVar);

Object longVar = event.getIntegrationContext().getInBoundVariable("test_long_variable_name");
processLongVar(results, longVar);

Object dateVar = event.getIntegrationContext().getInBoundVariable("test_date_variable_name");
processDateVar(results, dateVar);

results.put("var1", var1);
Message<IntegrationResult> message = IntegrationResultBuilder
.resultFor(event, connectorProperties)
.withOutboundVariables(results)
.buildMessage();

integrationResultSender.send(message);
return null;
}

private void processJsonVar(Object jsonVar, Map<String, Object> results) {
if (jsonVar != null) {
logger.info("jsonVar value type " + jsonVar.getClass().getTypeName());
logger.info("jsonVar value as string " + jsonVar.toString());
Expand All @@ -87,51 +122,48 @@ public void performTask(IntegrationRequest event) throws InterruptedException {
"able to convert test_json_variable_name to " + CustomPojo.class.getName()
);
}
}

if (longJsonVar != null && longJsonVar instanceof LinkedHashMap) {
if (((LinkedHashMap) longJsonVar).get("verylongjson").toString().length() >= 4000) {
results.put("test_long_json_variable_result", "able to read long json");
}
private void processLongJsonVar(Object longJsonVar, Map<String, Object> results) {
if (
longJsonVar instanceof LinkedHashMap &&
((LinkedHashMap<?, ?>) longJsonVar).get("verylongjson").toString().length() >= 4000
) {
results.put("test_long_json_variable_result", "able to read long json");
}
}

Object intVar = event.getIntegrationContext().getInBoundVariables().get("test_int_variable_name");
if (intVar != null && intVar instanceof Integer) {
private void processIntVar(Map<String, Object> results, Object intVar) {
if (intVar instanceof Integer) {
results.put("test_int_variable_result", "able to read integer");
}
}

Object boolVar = event.getIntegrationContext().getInBoundVariables().get("test_bool_variable_name");
if (boolVar != null && boolVar instanceof Boolean) {
private void processBoolVar(Map<String, Object> results, Object boolVar) {
if (boolVar instanceof Boolean) {
results.put("test_bool_variable_result", "able to read boolean");
}
}

Object bigDecimalVar = event.getIntegrationContext().getInBoundVariable("test_bigdecimal_variable_name");
private void processBigDecimalVar(Map<String, Object> results, Object bigDecimalVar) {
logger.info("bigDecimalVar value as string " + bigDecimalVar);
if (
bigDecimalVar != null &&
bigDecimalVar instanceof BigDecimal &&
BigDecimal.valueOf(1234567890L, 2).equals(bigDecimalVar)
) {
if (bigDecimalVar instanceof BigDecimal && BigDecimal.valueOf(1234567890L, 2).equals(bigDecimalVar)) {
results.put("test_bigdecimal_variable_result", bigDecimalVar);
}
}

Object longVar = event.getIntegrationContext().getInBoundVariable("test_long_variable_name");
private void processLongVar(Map<String, Object> results, Object longVar) {
logger.info("longVar value as string " + longVar);
if (longVar != null && longVar instanceof Long && Long.valueOf(1234567890L).equals(longVar)) {
if (longVar instanceof Long && Long.valueOf(1234567890L).equals(longVar)) {
results.put("test_long_variable_result", longVar);
}
}

Object dateVar = event.getIntegrationContext().getInBoundVariable("test_date_variable_name");
private void processDateVar(Map<String, Object> results, Object dateVar) {
logger.info("dateVar value as string " + dateVar);
if (dateVar != null && dateVar instanceof Date) {
if (dateVar instanceof Date) {
results.put("test_date_variable_result", dateVar);
}

results.put("var1", var1);
Message<IntegrationResult> message = IntegrationResultBuilder
.resultFor(event, connectorProperties)
.withOutboundVariables(results)
.buildMessage();
integrationResultSender.send(message);
}

public String getVar1Copy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package org.activiti.cloud.examples.connectors;

import org.springframework.cloud.stream.annotation.Input;
import org.activiti.cloud.common.messaging.functional.InputBinding;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.SubscribableChannel;

public interface ExampleConnectorChannels {
String EXAMPLE_CONNECTOR_CONSUMER = "exampleConnectorConsumer";

@Input(EXAMPLE_CONNECTOR_CONSUMER)
SubscribableChannel exampleConnectorConsumer();
@InputBinding(EXAMPLE_CONNECTOR_CONSUMER)
default SubscribableChannel exampleConnectorConsumer() {
return MessageChannels.publishSubscribe(EXAMPLE_CONNECTOR_CONSUMER).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@
import java.util.Map;
import org.activiti.cloud.api.process.model.IntegrationRequest;
import org.activiti.cloud.api.process.model.IntegrationResult;
import org.activiti.cloud.common.messaging.functional.Connector;
import org.activiti.cloud.common.messaging.functional.ConnectorBinding;
import org.activiti.cloud.connectors.starter.channels.IntegrationResultSender;
import org.activiti.cloud.connectors.starter.configuration.ConnectorProperties;
import org.activiti.cloud.connectors.starter.model.IntegrationResultBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(HeadersConnectorChannels.class)
public class HeadersConnector {
@ConnectorBinding(
input = HeadersConnectorChannels.HEADERS_CONNECTOR_CONSUMER,
condition = "headers['processDefinitionVersion']!=null",
outputHeader = ""
)
@Component(HeadersConnectorChannels.HEADERS_CONNECTOR_CONSUMER + "Connector")
public class HeadersConnector implements Connector<Message<IntegrationRequest>, Void> {

private final IntegrationResultSender integrationResultSender;
private final ConnectorProperties connectorProperties;
Expand All @@ -42,11 +46,11 @@ public HeadersConnector(IntegrationResultSender integrationResultSender, Connect
this.connectorProperties = connectorProperties;
}

@StreamListener(
value = HeadersConnectorChannels.HEADERS_CONNECTOR_CONSUMER,
condition = "headers['processDefinitionVersion']!=null"
)
public void receiveHeadersConnector(IntegrationRequest integrationRequest, @Headers Map<String, Object> headers) {
@Override
public Void apply(Message<IntegrationRequest> integrationRequestMessage) {
MessageHeaders headers = integrationRequestMessage.getHeaders();
IntegrationRequest integrationRequest = integrationRequestMessage.getPayload();

Map<String, Object> result = new HashMap<>();

result.put("processDefinitionVersion", headers.get("processDefinitionVersion"));
Expand All @@ -59,5 +63,6 @@ public void receiveHeadersConnector(IntegrationRequest integrationRequest, @Head
.buildMessage();

integrationResultSender.send(message);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package org.activiti.cloud.examples.connectors;

import org.springframework.cloud.stream.annotation.Input;
import org.activiti.cloud.common.messaging.functional.InputBinding;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.SubscribableChannel;

public interface HeadersConnectorChannels {
String HEADERS_CONNECTOR_CONSUMER = "headersConnectorConsumer";

@Input(HEADERS_CONNECTOR_CONSUMER)
SubscribableChannel headersConnectorConsumer();
@InputBinding(HEADERS_CONNECTOR_CONSUMER)
default SubscribableChannel headersConnectorConsumer() {
return MessageChannels.publishSubscribe(HEADERS_CONNECTOR_CONSUMER).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@
import java.util.Map;
import org.activiti.api.process.model.IntegrationContext;
import org.activiti.cloud.api.process.model.IntegrationRequest;
import org.activiti.cloud.common.messaging.functional.Connector;
import org.activiti.cloud.common.messaging.functional.ConnectorBinding;
import org.activiti.cloud.connectors.starter.channels.IntegrationResultSender;
import org.activiti.cloud.connectors.starter.configuration.ConnectorProperties;
import org.activiti.cloud.connectors.starter.model.IntegrationResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(MoviesDescriptionConnectorChannels.class)
public class MoviesDescriptionConnector {
@ConnectorBinding(
input = MoviesDescriptionConnectorChannels.MOVIES_DESCRIPTION_CONSUMER,
condition = "",
outputHeader = ""
)
@Component(MoviesDescriptionConnectorChannels.MOVIES_DESCRIPTION_CONSUMER + "Connector")
public class MoviesDescriptionConnector implements Connector<IntegrationRequest, Void> {

private Logger logger = LoggerFactory.getLogger(MoviesDescriptionConnector.class);

Expand All @@ -44,8 +48,8 @@ public MoviesDescriptionConnector(
this.connectorProperties = connectorProperties;
}

@StreamListener(value = MoviesDescriptionConnectorChannels.MOVIES_DESCRIPTION_CONSUMER)
public void receive(IntegrationRequest integrationRequest) {
@Override
public Void apply(IntegrationRequest integrationRequest) {
IntegrationContext integrationContext = integrationRequest.getIntegrationContext();
Map<String, Object> inBoundVariables = integrationContext.getInBoundVariables();
logger.info(">>inbound: " + inBoundVariables);
Expand All @@ -57,5 +61,6 @@ public void receive(IntegrationRequest integrationRequest) {
integrationResultSender.send(
IntegrationResultBuilder.resultFor(integrationRequest, connectorProperties).buildMessage()
);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package org.activiti.cloud.examples.connectors;

import org.springframework.cloud.stream.annotation.Input;
import org.activiti.cloud.common.messaging.functional.InputBinding;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.SubscribableChannel;

public interface MoviesDescriptionConnectorChannels {
String MOVIES_DESCRIPTION_CONSUMER = "moviesDescriptionConsumer";

@Input(MOVIES_DESCRIPTION_CONSUMER)
SubscribableChannel moviesDescriptionConsumer();
@InputBinding(MOVIES_DESCRIPTION_CONSUMER)
default SubscribableChannel moviesDescriptionConsumer() {
return MessageChannels.publishSubscribe(MOVIES_DESCRIPTION_CONSUMER).get();
}
}
Loading

0 comments on commit 242d10f

Please sign in to comment.