Skip to content

Commit

Permalink
Refactor data processor and sink API (#1632) (#1633)
Browse files Browse the repository at this point in the history
* chore: mark streampipes-wrapper-python as deprecated (#1623)

* Bump types-requests in /streampipes-client-python (#1588)

Bumps [types-requests](https://github.com/python/typeshed) from 2.30.0.0 to 2.31.0.0.
- [Commits](https://github.com/python/typeshed/commits)

---
updated-dependencies:
- dependency-name: types-requests
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump mkdocstrings[python] in /streampipes-client-python (#1631)

* chore: add missing support of NATS as messaging protocol (#1628)

* Bump spring-boot.version from 3.0.6 to 3.1.0 (#1595)

Bumps `spring-boot.version` from 3.0.6 to 3.1.0.

Updates `spring-boot-properties-migrator` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-starter-jersey` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-starter-jetty` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-starter-oauth2-client` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-starter-undertow` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-starter-web` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

Updates `spring-boot-maven-plugin` from 3.0.6 to 3.1.0
- [Release notes](https://github.com/spring-projects/spring-boot/releases)
- [Commits](spring-projects/spring-boot@v3.0.6...v3.1.0)

---
updated-dependencies:
- dependency-name: org.springframework.boot:spring-boot-properties-migrator
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-starter-jersey
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-starter-jetty
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-starter-oauth2-client
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-starter-undertow
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-starter-web
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.springframework.boot:spring-boot-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Refactor data processor and sink API (#1632)

* Cleanup pom file (#1632)

* Migrate processors to API (#1632)

* [hotfix] Add runtime provider, update pom

* Fix bug in standalone wrapper (#1632)

* Fix instance provision in compatibility layer (#1632)

* [hotfix] Disable Maven cache in workflow file

* [hotfix] Fix failing build by reverting Flink version

* [hotfix] Add proper time selection to another cypress test (#1641)

* Fix bug in aggregation flink module

* Properly return output source and schema info

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Tim <50115603+bossenti@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed May 31, 2023
1 parent b9168d0 commit 3253ec7
Show file tree
Hide file tree
Showing 396 changed files with 3,428 additions and 4,110 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-validation.yml
Expand Up @@ -191,4 +191,4 @@ jobs:
- name: Build MkDocs
working-directory: ./streampipes-client-python
run: |
make doc
make doc
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;

import ${package}.pe.${packageName}.${classNamePrefix}DataProcessor;
Expand All @@ -56,7 +57,8 @@ public SpServiceDefinition provideServiceDefinition() {
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
new SpMqttProtocolFactory(),
new SpNatsProtocolFactory())
.build();
}
}
Expand Up @@ -34,6 +34,7 @@
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;

public class Init extends ExtensionsModelSubmitter {
Expand All @@ -57,7 +58,8 @@ public SpServiceDefinition provideServiceDefinition() {
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
new SpMqttProtocolFactory(),
new SpNatsProtocolFactory())
.addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
.addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
.addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;

import ${package}.config.ConfigKeys;
Expand All @@ -56,7 +57,8 @@ public SpServiceDefinition provideServiceDefinition() {
.registerMessagingProtocols(
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
new SpMqttProtocolFactory(),
new SpNatsProtocolFactory())
.addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
.addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
.addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
Expand Down
25 changes: 13 additions & 12 deletions pom.xml
Expand Up @@ -117,7 +117,7 @@
<snakeyaml.version>1.33</snakeyaml.version>
<snappy-java.version>1.1.7.7</snappy-java.version>
<spring.version>6.0.8</spring.version>
<spring-boot.version>3.0.6</spring-boot.version>
<spring-boot.version>3.1.0</spring-boot.version>
<spring-security.version>6.0.3</spring-security.version>
<swagger.version>2.2.7</swagger.version>
<type-parser.version>0.8.1</type-parser.version>
Expand Down Expand Up @@ -160,6 +160,7 @@
<jsrosbridge.version>0.2.0</jsrosbridge.version>
<jedis.version>4.3.1</jedis.version>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<kryo.version>2.24.0</kryo.version>
<language-detector.version>0.6</language-detector.version>
<log4j-to-slf4j.version>2.8.2</log4j-to-slf4j.version>
<lombok.version>1.18.22</lombok.version>
Expand Down Expand Up @@ -230,6 +231,11 @@
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -957,12 +963,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-aether-provider</artifactId>
Expand Down Expand Up @@ -1221,11 +1227,6 @@
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions streampipes-client-python/setup.py
Expand Up @@ -58,14 +58,14 @@
"pytest-cov==4.1.0",
"pyupgrade==3.4.0",
"types-Jinja2==2.11.9",
"types-requests==2.30.0.0",
"types-requests==2.31.0.0",
]

docs_packages = [
"mkdocs==1.4.2",
"mkdocs-awesome-pages-plugin==2.9.0",
"mkdocs-material==9.1.3",
"mkdocstrings[python]==0.21.1",
"mkdocstrings[python]==0.22.0",
"pytkdocs[numpy-style]>=0.16.1",
"mkdocs-gen-files==0.5.0",
"mkdocs-literate-nav==0.6.0",
Expand Down
Expand Up @@ -67,7 +67,15 @@ public enum Envs {

SP_TS_STORAGE_ORG("SP_TS_STORAGE_ORG", "sp"),

SP_TS_STORAGE_BUCKET("SP_TS_STORAGE_BUCKET", "sp");
SP_TS_STORAGE_BUCKET("SP_TS_STORAGE_BUCKET", "sp"),

SP_FLINK_JAR_FILE_LOC(
"SP_FLINK_JAR_FILE_LOC",
"./streampipes-processing-element-container.jar"),

SP_FLINK_JOBMANAGER_HOST("SP_FLINK_JOBMANAGER_HOST", "jobmanager"),

SP_FLINK_JOBMANAGER_PORT("SP_FLINK_JOBMANAGER_PORT", "8081");

private final String envVariableName;
private String defaultValue;
Expand Down
Expand Up @@ -179,6 +179,21 @@ public StringEnvironmentVariable getCoreAssetBaseDir() {
return new StringEnvironmentVariable(Envs.SP_CORE_ASSET_BASE_DIR);
}

@Override
public StringEnvironmentVariable getFlinkJarFileLoc() {
return new StringEnvironmentVariable(Envs.SP_FLINK_JAR_FILE_LOC);
}

@Override
public StringEnvironmentVariable getFlinkJobmanagerHost() {
return new StringEnvironmentVariable(Envs.SP_FLINK_JOBMANAGER_HOST);
}

@Override
public IntEnvironmentVariable getFlinkJobmanagerPort() {
return new IntEnvironmentVariable(Envs.SP_FLINK_JOBMANAGER_PORT);
}

@Override
public StringEnvironmentVariable getConsulLocation() {
return new StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION);
Expand Down
Expand Up @@ -102,4 +102,11 @@ public interface Environment {
StringEnvironmentVariable getInitialAdminPassword();

StringEnvironmentVariable getCoreAssetBaseDir();

// Flink Wrapper
StringEnvironmentVariable getFlinkJarFileLoc();

StringEnvironmentVariable getFlinkJobmanagerHost();

IntEnvironmentVariable getFlinkJobmanagerPort();
}
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.streampipes.dataexplorer.commons;

import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxNameSanitizer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
Expand All @@ -33,27 +33,27 @@ public class DataExplorerUtils {
* @param client StreamPipes client to store measure
* @param measure DataLakeMeasurement
*/
public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
public static DataLakeMeasure sanitizeAndRegisterAtDataLake(IStreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
sanitizeDataLakeMeasure(measure);
registerAtDataLake(client, measure);

return measure;
}

public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
public static DataLakeMeasure sanitizeAndUpdateAtDataLake(IStreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
sanitizeDataLakeMeasure(measure);
updateAtDataLake(client, measure);
return measure;
}

private static void registerAtDataLake(StreamPipesClient client,
private static void registerAtDataLake(IStreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
client.dataLakeMeasureApi().create(measure);
}

public static void updateAtDataLake(StreamPipesClient client,
public static void updateAtDataLake(IStreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
client.dataLakeMeasureApi().update(measure);
}
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.streampipes.dataexplorer.commons;

import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.image.ImageStore;
Expand All @@ -39,7 +39,7 @@ public class TimeSeriesStore {


public TimeSeriesStore(Environment environment,
StreamPipesClient client,
IStreamPipesClient client,
DataLakeMeasure measure,
boolean enableImageStore) {

Expand Down
5 changes: 5 additions & 0 deletions streampipes-extensions-api/pom.xml
Expand Up @@ -33,6 +33,11 @@
<artifactId>streampipes-client-api</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging</artifactId>
<version>0.93.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-model</artifactId>
Expand Down
Expand Up @@ -16,11 +16,9 @@
*
*/

package org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm;
package org.apache.streampipes.extensions.api.extractor;

import java.io.Serializable;
import org.apache.streampipes.model.graph.DataSinkInvocation;

public interface HashAlgorithm extends Serializable {

String toHashValue(Object value);
public interface IDataSinkParameterExtractor extends IParameterExtractor<DataSinkInvocation> {
}
Expand Up @@ -15,7 +15,10 @@
* limitations under the License.
*
*/
package org.apache.streampipes.processors.textmining.flink.processor.sentiment;

public class SentimentDetectionParameters {
package org.apache.streampipes.extensions.api.monitoring;

import org.slf4j.Logger;

public interface IPipelineElementLogger extends Logger {
}
Expand Up @@ -16,23 +16,24 @@
*
*/

package org.apache.streampipes.wrapper.runtime;
package org.apache.streampipes.extensions.api.pe;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;

@Deprecated(since = "0.70.0", forRemoval = true)
public interface EventProcessor<T extends EventProcessorBindingParams> extends
PipelineElement<T, DataProcessorInvocation> {
public interface IStreamPipesDataProcessor
extends IStreamPipesPipelineElement<IDataProcessorConfiguration> {

void onInvocation(T parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext)
throws
SpRuntimeException;
void onPipelineStarted(IDataProcessorParameters params,
SpOutputCollector collector,
EventProcessorRuntimeContext runtimeContext);

void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException;
void onEvent(Event event,
SpOutputCollector collector);

void onPipelineStopped();

}
Expand Up @@ -16,20 +16,20 @@
*
*/

package org.apache.streampipes.wrapper.runtime;
package org.apache.streampipes.extensions.api.pe;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;

public interface EventSink<T extends EventSinkBindingParams> extends PipelineElement<T,
DataSinkInvocation> {
public interface IStreamPipesDataSink
extends IStreamPipesPipelineElement<IDataSinkConfiguration> {

void onInvocation(T parameters, EventSinkRuntimeContext runtimeContext) throws
SpRuntimeException;
void onPipelineStarted(IDataSinkParameters params,
EventSinkRuntimeContext runtimeContext);

void onEvent(Event event) throws SpRuntimeException;
void onEvent(Event event);

void onPipelineStopped();
}
Expand Up @@ -15,13 +15,12 @@
* limitations under the License.
*
*/
package org.apache.streampipes.extensions.api.declarer;

import org.apache.streampipes.model.SpDataStream;
package org.apache.streampipes.extensions.api.pe;

public interface DataStreamDeclarer extends Declarer<SpDataStream> {
import org.apache.streampipes.extensions.api.pe.config.IDataStreamConfiguration;

SpDataStream declareModel();
public interface IStreamPipesDataStream extends IStreamPipesPipelineElement<IDataStreamConfiguration> {

void executeStream();

Expand Down
Expand Up @@ -15,7 +15,12 @@
* limitations under the License.
*
*/
package org.apache.streampipes.processors.textmining.flink.processor.sentiment;

public class SentimentDetectionController {
package org.apache.streampipes.extensions.api.pe;

import org.apache.streampipes.extensions.api.pe.config.IPipelineElementConfiguration;

public interface IStreamPipesPipelineElement<PcT extends IPipelineElementConfiguration<?, ?>> {

PcT declareConfig();
}

0 comments on commit 3253ec7

Please sign in to comment.