Skip to content

Commit

Permalink
Merge streampipes-connect module into streampipes-container module (#569
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dominikriemer committed Jan 2, 2023
1 parent b7e7446 commit ea6a3c8
Show file tree
Hide file tree
Showing 157 changed files with 473 additions and 599 deletions.
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,11 @@ IoT data streams.
<artifactId>metrics-core</artifactId>
<version>${metrics-core.version}</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mailapi</artifactId>
Expand Down Expand Up @@ -1364,7 +1369,6 @@ IoT data streams.
<module>streampipes-client</module>
<module>streampipes-commons</module>
<module>streampipes-config</module>
<module>streampipes-connect</module>
<module>streampipes-connect-api</module>
<module>streampipes-connect-container-master</module>
<module>streampipes-container</module>
Expand Down
5 changes: 0 additions & 5 deletions streampipes-connect-container-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connect</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-measurement-units</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.connect.adapter.GroundingService;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.GroundingUtils;
import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
Expand Down Expand Up @@ -82,7 +82,7 @@ public String addAdapter(AdapterDescription ad,
ad.setCorrespondingDataStreamElementId(dataStreamElementId);

// Add EventGrounding to AdapterDescription
EventGrounding eventGrounding = GroundingService.createEventGrounding();
EventGrounding eventGrounding = GroundingUtils.createEventGrounding();
ad.setEventGrounding(eventGrounding);

String elementId = this.adapterResourceManager.encryptAndCreate(ad);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.streampipes.connect.container.master.management;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.connect.adapter.AdapterRegistry;
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.container.connect.adapter.AdapterRegistry;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterEventPreviewPipeline;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.connect.api.exception.WorkerAdapterException;
import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.container.connect.adapter.model.pipeline.AdapterEventPreviewPipeline;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
import org.apache.streampipes.model.connect.guess.GuessSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.streampipes.connect.container.master.management;

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.container.master.util.WorkerPaths;
import org.apache.streampipes.container.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
*
*/

package org.apache.streampipes.connect.adapter;
package org.apache.streampipes.connect.container.master.util;

import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription;
import org.apache.streampipes.container.connect.adapter.util.TransportFormatGenerator;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
Expand All @@ -36,29 +33,10 @@
import java.util.Collections;
import java.util.UUID;

public class GroundingService {
public class GroundingUtils {

private static final String TOPIC_PREFIX = "org.apache.streampipes.connect.";

public static String extractTopic(AdapterDescription adapterDescription) {
EventGrounding eventGrounding = getEventGrounding(adapterDescription);
return eventGrounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
}

private static EventGrounding getEventGrounding(AdapterDescription adapterDescription) {
EventGrounding eventGrounding;

if (adapterDescription instanceof SpecificAdapterSetDescription) {
eventGrounding = ((SpecificAdapterSetDescription) adapterDescription).getDataSet().getEventGrounding();
} else if (adapterDescription instanceof GenericAdapterSetDescription) {
eventGrounding = ((GenericAdapterSetDescription) adapterDescription).getDataSet().getEventGrounding();
} else {
eventGrounding = adapterDescription.getEventGrounding();
}

return eventGrounding;
}

public static EventGrounding createEventGrounding() {
EventGrounding eventGrounding = new EventGrounding();

Expand Down Expand Up @@ -100,11 +78,6 @@ public static EventGrounding createEventGrounding() {
return eventGrounding;
}

public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
Class<?> protocolClass) {
return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
}

private static JmsTransportProtocol makeJmsTransportProtocol(String hostname, Integer port,
TopicDefinition topicDefinition) {
JmsTransportProtocol transportProtocol = new JmsTransportProtocol();
Expand Down Expand Up @@ -147,4 +120,9 @@ private static void fillTransportProtocol(TransportProtocol protocol, String hos
protocol.setBrokerHostname(hostname);
protocol.setTopicDefinition(topicDefinition);
}

public static Boolean isPrioritized(SpProtocol prioritizedProtocol,
Class<?> protocolClass) {
return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.streampipes.connect.container.master.management;

import org.apache.streampipes.connect.adapter.AdapterRegistry;
import org.apache.streampipes.connect.adapter.format.json.arraykey.JsonFormat;
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.container.connect.adapter.AdapterRegistry;
import org.apache.streampipes.container.connect.adapter.format.json.arraykey.JsonFormat;
import org.apache.streampipes.model.connect.grounding.FormatDescription;

import org.junit.Test;
Expand Down
201 changes: 0 additions & 201 deletions streampipes-connect/pom.xml

This file was deleted.

Loading

0 comments on commit ea6a3c8

Please sign in to comment.