Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/multiple datasinks #308

Merged
merged 25 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3dc245a
Adds support for multiple datasinks in one route
Aegid1 Apr 7, 2024
b0e0bf8
adjusted the routes.json of databridge.examples
Aegid1 Apr 15, 2024
bfb03e1
Restores Example Configurations
jannisjung Jun 13, 2024
c8fcec6
Refactors Kafka JsonataMultipleAAS Example * Adds new Configuration m…
jannisjung Jun 13, 2024
dbec569
Refactors multiple datasinks in one route (PR #300) * Enables Usage o…
jannisjung Jun 13, 2024
1b99701
Updates typos
jannisjung Jun 14, 2024
46295bc
Resolves Review Comments
jannisjung Jun 24, 2024
38f9f12
Merge branch 'feature/multiple-datasinks' of https://github.com/janni…
jannisjung Jun 24, 2024
297f5a5
Improves PollingConsumer Tests
jannisjung Jul 18, 2024
409c259
Fixes jsonata-mqtt-test
jannisjung Jul 18, 2024
de247b3
Fixes dot-aas-v3 test
jannisjung Jul 18, 2024
0fc449f
fixes http-polling-jsonata-test
jannisjung Jul 18, 2024
e59ae82
fixes kafka-jsonata-test
jannisjung Jul 18, 2024
be54425
Fixes kafka-jsonatamultiple-test
jannisjung Jul 18, 2024
4742073
Fixes typo
jannisjung Jul 19, 2024
41b4da4
Fixes another typo
jannisjung Jul 19, 2024
e1639ed
Removes never used abstract method
jannisjung Jul 19, 2024
69273fe
Fixes typo
jannisjung Jul 19, 2024
ea66da5
Resolves Review Remarks
jannisjung Jul 22, 2024
741787e
Adds regression test
jannisjung Jul 25, 2024
be8cc84
Merge branch 'main' into feature/multiple-datasinks
jannisjung Jul 25, 2024
2f99945
Reworks regression test
jannisjung Jul 25, 2024
3546f3a
Merge branch 'feature/multiple-datasinks' of https://github.com/janni…
jannisjung Jul 25, 2024
4a6ecc0
Fixes typo
jannisjung Jul 26, 2024
c43e4d4
Resolves review remark
jannisjung Jul 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Object loadConfiguration() {
* file path and the resource loader
* @return
*/
@SuppressWarnings("resource")
private InputStreamReader getJsonReader() {

InputStream stream = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
* SPDX-License-Identifier: MIT
******************************************************************************/


package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core;

import java.util.Map;

import org.apache.camel.builder.RouteBuilder;

public abstract class AbstractRouteCreator implements IRouteCreator {
Expand Down Expand Up @@ -54,10 +55,17 @@ public void addRouteToRouteBuilder(RouteConfiguration routeConfig) {
String dataSourceEndpoint = RouteCreatorHelper.getDataSourceEndpoint(routesConfiguration, routeConfig.getDatasource());
String[] dataSinkEndpoints = RouteCreatorHelper.getDataSinkEndpoints(routesConfiguration, routeConfig.getDatasinks());
String[] dataTransformerEndpoints = RouteCreatorHelper.getDataTransformerEndpoints(routesConfiguration, routeConfig.getTransformers());
Map<String, String[]> datasinkMapping = RouteCreatorHelper.getDataSinkMapping(routesConfiguration, routeConfig.getDatasinkMappingConfiguration());
String routeId = routeConfig.getRouteId();

configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId);
if (datasinkMapping == null || datasinkMapping.isEmpty()) {
configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId);
} else {
configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, datasinkMapping, routeId);
}
}

protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId);

protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map<String, String[]> DataSinkMapping, String routeId);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (C) 2021 the Eclipse BaSyx Authors
* Copyright (C) 2024 the Eclipse BaSyx Authors
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
Expand Down Expand Up @@ -29,24 +29,63 @@
import java.util.List;
import java.util.Map;

/**
* @author DataBridge authors, jungjan
*/
public class RouteConfiguration {
private String trigger;
private String routeId;
private String datasource;
private List<String> transformers = new ArrayList<>();
private List<String> datasinks = new ArrayList<>();
private Map<String, String[]> datasinkMappingConfiguration;

private Map<String, Object> triggerData = new HashMap<>();

public RouteConfiguration() {
}

/**
* Constructs a new RouteConfiguration object with a mapping configuration to
* map distinct transformators to multiple datasinks.
*
* @param trigger
* the trigger for the route configuration
* @param routeId
* the ID of the route
* @param datasource
* the datasource associated with the route
* @param transformers
* the list of transformers to be applied in the route
* @param datasinks
* the list of datasinks to which data should be routed
* @param datasinkMappingConfiguration
* the mapping configuration for datasinks, mapping each datasink to
* its corresponding configuration
*/
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
public RouteConfiguration(String trigger, String datasource, List<String> transformers, List<String> datasinks, Map<String, String[]> datasinkMappingConfiguration) {
this.trigger = trigger;
this.datasource = datasource;
this.transformers = transformers;
this.datasinks = datasinks;
this.datasinkMappingConfiguration = datasinkMappingConfiguration;
}

/**
* Constructs a new RouteConfiguration object without a mapping configuration to
* map distinct transformators to multiple datasinks. I.e., all transformators
* would be equally applied to all data sinks.
*
* @param trigger
* the trigger for the route configuration
* @param routeId
* the ID of the route (optional, can be null)
* @param datasource
* the datasource associated with the route
* @param transformers
* the list of transformers to be applied in the route
* @param datasinks
* the list of datasinks to which data should be routed
*/
public RouteConfiguration(String trigger, String datasource, List<String> transformers, List<String> datasinks) {
this.trigger = trigger;
Expand All @@ -56,7 +95,7 @@ public RouteConfiguration(String trigger, String datasource, List<String> transf
}

public RouteConfiguration(RouteConfiguration configuration) {
this(configuration.getRouteTrigger(), configuration.getDatasource(), configuration.getTransformers(), configuration.getDatasinks());
this(configuration.getRouteTrigger(), configuration.getDatasource(), configuration.getTransformers(), configuration.getDatasinks(), configuration.getDatasinkMappingConfiguration());
setRouteId(configuration.getRouteId());
this.triggerData = configuration.triggerData;
}
Expand Down Expand Up @@ -97,4 +136,12 @@ public void setDatasource(String datasource) {
this.datasource = datasource;
}

public Map<String, String[]> getDatasinkMappingConfiguration() {
return datasinkMappingConfiguration;
}

public void setDatasinkMappingConfiguration(Map<String, String[]> datasinkMappingConfiguration) {
this.datasinkMappingConfiguration = datasinkMappingConfiguration;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (C) 2021 the Eclipse BaSyx Authors
* Copyright (C) 2024 the Eclipse BaSyx Authors
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
Expand All @@ -24,35 +24,57 @@
******************************************************************************/
package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author DataBridge authors, jungjan
*/
public class RouteCreatorHelper {
private RouteCreatorHelper() {
}

public static String getDataSourceEndpoint(RoutesConfiguration routesConfiguration, String dataSourceId) {
return routesConfiguration.getDatasources().get(dataSourceId).getConnectionURI();
return routesConfiguration.getDatasources()
.get(dataSourceId)
.getConnectionURI();
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
}

public static String getDataSinkEndpoint(RoutesConfiguration routesConfiguration, String dataSinkId) {
return routesConfiguration.getDatasinks().get(dataSinkId).getConnectionURI();
return routesConfiguration.getDatasinks()
.get(dataSinkId)
.getConnectionURI();
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
}

public static String[] getDataSinkEndpoints(RoutesConfiguration routesConfiguration, List<String> dataSinkIdList) {
List<String> endpoints = new ArrayList<>();
for (String dataSinkId : dataSinkIdList) {
endpoints.add(routesConfiguration.getDatasinks().get(dataSinkId).getConnectionURI());
}
return dataSinkIdList.stream()
.map(routesConfiguration.getDatasinks()::get)
.map(dataSinkConfiguration -> dataSinkConfiguration.getConnectionURI())
.toArray(String[]::new);

}

return endpoints.toArray(new String[0]);
public static String[] getDataTransformerEndpoints(RoutesConfiguration routesConfiguration, List<String> transformerIdLists) {
return transformerIdLists.stream()
.map(routesConfiguration.getTransformers()::get)
.map(dataTransformerConfiguration -> dataTransformerConfiguration.getConnectionURI())
.toArray(String[]::new);
}

public static String[] getDataTransformerEndpoints(RoutesConfiguration routesConfiguration, List<String> transformerIdList) {
List<String> endpoints = new ArrayList<>();
for (String transformerId : transformerIdList) {
endpoints.add(routesConfiguration.getTransformers().get(transformerId).getConnectionURI());
public static Map<String, String[]> getDataSinkMapping(RoutesConfiguration routesConfiguration, Map<String, String[]> datasinkMappingConfiguration) {
if (datasinkMappingConfiguration == null || datasinkMappingConfiguration.isEmpty()) {
return null;
}
return endpoints.toArray(new String[0]);
Set<String> dataSinkIds = datasinkMappingConfiguration.keySet();
Map<String, String> resolvedDataSinkEndpoints = dataSinkIds.stream()
.collect(Collectors.toMap(dataSinkId -> dataSinkId, dataSinkId -> routesConfiguration.getDatasinks()
.get(dataSinkId)
.getConnectionURI()));

return dataSinkIds.stream()
.collect(Collectors.toMap(resolvedDataSinkEndpoints::get, dataSinkId -> getDataTransformerEndpoints(routesConfiguration, Arrays.asList(datasinkMappingConfiguration.get(dataSinkId)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.event;

import java.util.List;
import java.util.Map;

import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration;

/**
* A connection of a single route (source, transformer(s), sink(s))
* A connection of a single route (source, transformer(s), sink(s), sinkmapper(s)
*
* @author haque, fischer
*
Expand All @@ -40,6 +41,10 @@ public class EventRouteConfiguration extends RouteConfiguration {
public EventRouteConfiguration(String datasource, List<String> transformers, List<String> datasinks) {
super(ROUTE_TRIGGER, datasource, transformers, datasinks);
}

public EventRouteConfiguration(String datasource, List<String> transformers, List<String> datasinks, Map<String, String[]> datasinkMapping) {
super(ROUTE_TRIGGER, datasource, transformers, datasinks, datasinkMapping);
}

public EventRouteConfiguration(RouteConfiguration configuration) {
super(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,60 @@
******************************************************************************/
package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.event;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.MulticastDefinition;
import org.apache.camel.model.RouteDefinition;
import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.AbstractRouteCreator;
import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration;
import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration;

public class EventRouteCreator extends AbstractRouteCreator {

public EventRouteCreator(RouteBuilder routeBuilder, RoutesConfiguration routesConfiguration) {
super(routeBuilder, routesConfiguration);
}

@Override
protected void configureRoute(RouteConfiguration routeConfiguration, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId) {
RouteDefinition routeDefinition = getRouteBuilder().from(dataSourceEndpoint).routeId(routeId).to("log:" + routeId);
RouteDefinition routeDefinition = startRouteDefinition(dataSourceEndpoint, routeId);

if (!(dataTransformerEndpoints == null || dataTransformerEndpoints.length == 0)) {
routeDefinition.to(dataTransformerEndpoints).to("log:" + routeId);
routeDefinition.to(dataTransformerEndpoints)
.to("log:" + routeId);
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
}

routeDefinition.to(dataSinkEndpoints[0]).to("log:" + routeId);
routeDefinition.to(dataSinkEndpoints)
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
.to("log:" + routeId);
}

@Override
protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map<String, String[]> dataSinkMapping, String routeId) {
MulticastDefinition routeDefinition = startRouteDefinition(dataSourceEndpoint, routeId).multicast();
dataSinkMapping.forEach((dataSink, dataTransformers) -> routeDefinition.pipeline()
.to(dataTransformers)
.to(dataSink)
.to("log:" + routeId));

getUnmappedEndpoints(dataSinkEndpoints, dataSinkMapping).forEach(dataSink -> routeDefinition.to(dataSink)
.to("log: " + routeId));

routeDefinition.end();
}

}
private List<String> getUnmappedEndpoints(String[] dataSinkEndpoints, Map<String, String[]> dataSinkMapping) {
return Arrays.stream(dataSinkEndpoints)
.filter(Predicate.not(dataSinkMapping::containsKey))
.collect(Collectors.toList());
}

private RouteDefinition startRouteDefinition(String dataSourceEndpoint, String routeId) {
return getRouteBuilder().from(dataSourceEndpoint)
.routeId(routeId)
.to("log:" + routeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.request;

import java.util.List;
import java.util.Map;

import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration;
import org.springframework.http.HttpMethod;
Expand Down Expand Up @@ -53,6 +54,10 @@ public RequestRouteConfiguration(String datasource, List<String> transformers, L
super(ROUTE_TRIGGER, datasource, transformers, datasinks);
}

public RequestRouteConfiguration(String datasource, List<String> transformers, List<String> datasinks, Map<String, String[]> datasinkMapping) {
super(ROUTE_TRIGGER, datasource, transformers, datasinks, datasinkMapping);
}

public RequestRouteConfiguration(RouteConfiguration configuration) {
super(configuration);
host = (String) getTriggerData().get(HOST);
Expand Down Expand Up @@ -85,7 +90,6 @@ public void setPort(String port) {
}

public String getRequestEndpointURI() {
return REQUEST_COMPONENT + ":" + REQUEST_PROTOCOL + "://" + getHost() + ":" + getPort() + getPath() + "?"
+ HTTP_METHOD_RESTRICT_PARAMETER;
return REQUEST_COMPONENT + ":" + REQUEST_PROTOCOL + "://" + getHost() + ":" + getPort() + getPath() + "?" + HTTP_METHOD_RESTRICT_PARAMETER;
jannisjung marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading