Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/multiple datasinks #308

Merged
merged 25 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3dc245a
Adds support for multiple datasinks in one route
Aegid1 Apr 7, 2024
b0e0bf8
adjusted the routes.json of databridge.examples
Aegid1 Apr 15, 2024
bfb03e1
Restores Example Configurations
jannisjung Jun 13, 2024
c8fcec6
Refactors Kafka JsonataMultipleAAS Example * Adds new Configuration m…
jannisjung Jun 13, 2024
dbec569
Refactors multiple datasinks in one route (PR #300) * Enables Usage o…
jannisjung Jun 13, 2024
1b99701
Updates typos
jannisjung Jun 14, 2024
46295bc
Resolves Review Comments
jannisjung Jun 24, 2024
38f9f12
Merge branch 'feature/multiple-datasinks' of https://github.com/janni…
jannisjung Jun 24, 2024
297f5a5
Improves PollingConsumer Tests
jannisjung Jul 18, 2024
409c259
Fixes jsonata-mqtt-test
jannisjung Jul 18, 2024
de247b3
Fixes dot-aas-v3 test
jannisjung Jul 18, 2024
0fc449f
fixes http-polling-jsonata-test
jannisjung Jul 18, 2024
e59ae82
fixes kafka-jsonata-test
jannisjung Jul 18, 2024
be54425
Fixes kafka-jsonatamultiple-test
jannisjung Jul 18, 2024
4742073
Fixes typo
jannisjung Jul 19, 2024
41b4da4
Fixes another typo
jannisjung Jul 19, 2024
e1639ed
Removes never used abstract method
jannisjung Jul 19, 2024
69273fe
Fixes typo
jannisjung Jul 19, 2024
ea66da5
Resolves Review Remarks
jannisjung Jul 22, 2024
741787e
Adds regression test
jannisjung Jul 25, 2024
be8cc84
Merge branch 'main' into feature/multiple-datasinks
jannisjung Jul 25, 2024
2f99945
Reworks regression test
jannisjung Jul 25, 2024
3546f3a
Merge branch 'feature/multiple-datasinks' of https://github.com/janni…
jannisjung Jul 25, 2024
4a6ecc0
Fixes typo
jannisjung Jul 26, 2024
c43e4d4
Resolves review remark
jannisjung Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

import java.util.UUID;

import org.eclipse.basyx.aas.aggregator.api.IAASAggregator;
import org.eclipse.basyx.aas.aggregator.proxy.AASAggregatorProxy;
import org.eclipse.digitaltwin.basyx.databridge.executable.regression.DataBridgeSuiteAASPollingConsumer;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
Expand All @@ -41,18 +39,13 @@
public class ITTestDataBridgeAASPollingConsumer extends DataBridgeSuiteAASPollingConsumer {

private static String BROKER_HOST = "broker.mqttdashboard.com";
private static String HOST = "localhost";

@Override
protected MqttClient getMqttClient() throws MqttException {

String publisherId = UUID.randomUUID().toString();

return new MqttClient("tcp://" + BROKER_HOST+ ":1883", publisherId);
}

@Override
protected IAASAggregator getAASAggregatorProxy() {
return new AASAggregatorProxy("http://" + HOST + ":4001");
return new MqttClient("tcp://" + BROKER_HOST+ ":1883", publisherId, new MemoryPersistence());
}

jannisjung marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import org.eclipse.basyx.aas.aggregator.api.IAASAggregator;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
Expand All @@ -53,7 +54,6 @@
public abstract class DataBridgeSuiteAASPollingConsumer {

protected abstract MqttClient getMqttClient() throws MqttException;
protected abstract IAASAggregator getAASAggregatorProxy();
private static Logger logger = LoggerFactory.getLogger(DataBridgeSuiteAASPollingConsumer.class);
private static String user_name = "test1";
private static String password = "1234567";
Expand Down Expand Up @@ -81,7 +81,7 @@ private void assertPropertyValue(String expectedValue, String topic) throws Mqtt

fetchExpectedValue(topic);

assertEquals(receivedMessage, expectedValue);
assertEquals(expectedValue, receivedMessage);
}


Expand All @@ -100,7 +100,6 @@ public void connectionLost(Throwable cause) {

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {

receivedMessage = new String(message.getPayload(), StandardCharsets.UTF_8);
}

Expand All @@ -109,7 +108,6 @@ public void deliveryComplete(IMqttDeliveryToken token) {

}
});

mqttClient.subscribe(currentTopic);
waitForPropagation();
mqttClient.disconnect();
Expand All @@ -125,8 +123,8 @@ private MqttClient mqttConnectionInitiate() throws MqttException {
MqttClient mqttClient = getMqttClient();

MqttConnectOptions connOpts = setUpMqttConnection(user_name, password);
connOpts.setCleanSession(true);
mqttClient.connect(connOpts);
connOpts.setCleanSession(true);
return mqttClient;
}

Expand All @@ -138,7 +136,7 @@ private static MqttConnectOptions setUpMqttConnection(String username, String pa
}

private static void waitForPropagation() throws InterruptedException {
Thread.sleep(6000);
TimeUnit.SECONDS.sleep(10);
}

private String wrapStringValue(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

import java.io.IOException;
import java.util.UUID;
import org.eclipse.basyx.aas.aggregator.api.IAASAggregator;
import org.eclipse.basyx.aas.aggregator.proxy.AASAggregatorProxy;

jannisjung marked this conversation as resolved.
Show resolved Hide resolved
import org.eclipse.basyx.components.aas.AASServerComponent;
import org.eclipse.basyx.components.aas.configuration.AASServerBackend;
import org.eclipse.basyx.components.aas.configuration.BaSyxAASServerConfiguration;
Expand All @@ -52,7 +51,6 @@
*/
public class TestDataBridgeAASPollingConsumer extends DataBridgeSuiteAASPollingConsumer {

private static final String AAS_AGGREGATOR_URL = "http://localhost:4001";
private static AASServerComponent aasServer;
private static String BROKER_URL = "tcp://broker.mqttdashboard.com:1883";
private static Server mqttBroker;
Expand All @@ -75,11 +73,6 @@ protected MqttClient getMqttClient() throws MqttException {
return new MqttClient(BROKER_URL, publisherId, new MemoryPersistence());
}

@Override
protected IAASAggregator getAASAggregatorProxy() {
return new AASAggregatorProxy(AAS_AGGREGATOR_URL);
}

private static void startMqttBroker() throws IOException {
mqttBroker = new Server();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void setUp() throws IOException {
startUpdaterComponent();
}

private static void startMqttBroker() throws IOException {
protected static void startMqttBroker() throws IOException {
mqttBroker = new Server();

IResourceLoader classpathLoader = new ClasspathResourceLoader();
Expand All @@ -80,7 +80,7 @@ private static void startMqttBroker() throws IOException {
mqttBroker.startServer(classPathConfig);
}

private static void configureAndStartAASServer() {
protected static void configureAndStartAASServer() {
BaSyxContextConfiguration aasContextConfig = new BaSyxContextConfiguration(4001, "");

BaSyxAASServerConfiguration aasConfig = new BaSyxAASServerConfiguration(AASServerBackend.INMEMORY, "aasx/updatertest.aasx");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*******************************************************************************
* Copyright (C) 2024 the Eclipse BaSyx Authors
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
******************************************************************************/
package org.eclipse.digitaltwin.basyx.databridge.executable.regression;

import java.io.IOException;

import org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeExecutable;
import org.junit.BeforeClass;

/**
* @author jungjan
*/
public class TestDatabeidgeMultipleDataSinks extends TestDataBridgeMqtt {

@BeforeClass
public static void setUp() throws IOException {
TestDataBridgeMqtt.configureAndStartAASServer();
TestDataBridgeMqtt.startMqttBroker();
startUpdaterComponent();
}

protected static void startUpdaterComponent() {
DataBridgeExecutable.main(new String[] {"src/test/resources/mqtt/multisink-databridge"});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# #############################
# AAS Server configuration file
# #############################

# #############################
# Backend
# #############################
# Specifies the backend that loads the AAS and Submodels

# InMemory - does not persist AAS or submodels
aas.backend=InMemory

# MongoDB - persists data within a MongoDB
# See connection configuration in mongodb.properties
# aas.backend=MongoDB

# #############################
# Source
# #############################
# Possible to load an AAS Environment from a file

aas.source=/usr/share/config/updatertest.aasx

# Other examples (Currently supported: *.xml, *.json and *.aasx):
# aas.source=aasx/myAAS.aasx
# aas.source=aasx/myAAS.xml
# aas.source=aasx/myAAS.json
# Or when encapsulated in the docker volume for this container:
# aas.source=/usr/share/config/myAAS.aasx

# #############################
# MQTT
# #############################
# Possible to enable MQTT events

aas.events=NONE
# aas.events=MQTT

# #############################
# AASX Upload
# #############################
# Possible to enable AASX Upload

aas.aasxUpload=Disabled
# aas.aasxUpload=Enabled


# #############################
# Registry
# #############################
# If specified, can directly registers the AAS that has been loaded from the source file

# Path specifies the registry endpoint
# registry.path=http://localhost:4000/registry/

# Hostpath specifies the endpoint of the deployed AAS component
# If hostpath is empty, the registered AAS endpoint is derived from the context properties
# registry.hostpath=

# If one or more submodels are specified here, only the submodels will be registered at the
# registry. This can be used for distributed submodel deployments
# In case of an empty or no list, this does not have an effect. By default, all submodels
# are registered at a given registry.
# registry.submodels=["smId1","smId2"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[
{
"uniqueId": "ConnectedSubmodel/ConnectedPropertyA",
"submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel",
"idShortPath": "ConnectedPropertyA"
},
{
"uniqueId": "ConnectedSubmodel/ConnectedPropertyB",
"submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel",
"idShortPath": "ConnectedPropertyB"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ###############################
# HTTP Context configuration file
# ###############################

# ###############################
# Context Path
# ###############################
# Specifies the subpath in the url for this server context

contextPath=

# ###############################
# Hostname
# ###############################
# Specifies the hostname for this server context

contextHostname=localhost

# ###############################
# Port
# ###############################
# Specifies the port for this server context

contextPort=4001
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
$sum(Account.Order.Product.(Price * Quantity))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Account.Order[0].Product[0].ProductID
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"uniqueId": "jsonataA",
"queryPath": "jsonataA.json",
"inputType": "JsonString",
"outputType": "JsonString"
},
{
"uniqueId": "jsonataB",
"queryPath": "jsonataB.json",
"inputType": "JsonString",
"outputType": "JsonString"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[
{
"uniqueId": "property1",
"serverUrl": "127.0.0.1",
"serverPort": 1884,
"topic": "PropertyB"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[
{
"datasource": "property1",
"transformers": ["jsonataA", "jsonataB"],
"datasinks": ["ConnectedSubmodel/ConnectedPropertyA", "ConnectedSubmodel/ConnectedPropertyB"],
"datasinkMappingConfiguration":
{
"ConnectedSubmodel/ConnectedPropertyA": ["jsonataA"],
"ConnectedSubmodel/ConnectedPropertyB": ["jsonataB"]
},
"trigger": "event"
}
]
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Object loadConfiguration() {
* file path and the resource loader
* @return
*/
@SuppressWarnings("resource")
private InputStreamReader getJsonReader() {

InputStream stream = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
* SPDX-License-Identifier: MIT
******************************************************************************/


package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core;

import java.util.Map;

import org.apache.camel.builder.RouteBuilder;

public abstract class AbstractRouteCreator implements IRouteCreator {
Expand Down Expand Up @@ -54,10 +55,17 @@ public void addRouteToRouteBuilder(RouteConfiguration routeConfig) {
String dataSourceEndpoint = RouteCreatorHelper.getDataSourceEndpoint(routesConfiguration, routeConfig.getDatasource());
String[] dataSinkEndpoints = RouteCreatorHelper.getDataSinkEndpoints(routesConfiguration, routeConfig.getDatasinks());
String[] dataTransformerEndpoints = RouteCreatorHelper.getDataTransformerEndpoints(routesConfiguration, routeConfig.getTransformers());
Map<String, String[]> datasinkMapping = RouteCreatorHelper.getDataSinkMapping(routesConfiguration, routeConfig.getDatasinkMappingConfiguration());
String routeId = routeConfig.getRouteId();

configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId);
if (datasinkMapping == null || datasinkMapping.isEmpty()) {
configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId);
} else {
configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, datasinkMapping, routeId);
}
}

protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId);

protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map<String, String[]> DataSinkMapping, String routeId);
}
Loading