diff --git a/languageConverter/pom.xml b/languageConverter/pom.xml new file mode 100644 index 0000000..28a4f0d --- /dev/null +++ b/languageConverter/pom.xml @@ -0,0 +1,19 @@ + + + + gateway-examples + com.diffusiondata.gateway.adapter + 1.0.0 + + 4.0.0 + + languageConverter + + + 11 + 11 + + + \ No newline at end of file diff --git a/languageConverter/src/main/java/converter/SpanishToFrenchConverter.java b/languageConverter/src/main/java/converter/SpanishToFrenchConverter.java new file mode 100644 index 0000000..a6feb75 --- /dev/null +++ b/languageConverter/src/main/java/converter/SpanishToFrenchConverter.java @@ -0,0 +1,22 @@ +package converter; + +import com.diffusiondata.gateway.framework.converters.PayloadConverter; +import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException; + +import converter.model.French; +import converter.model.Spanish; + +/** + * Sample converter for testing. + * + * @author ndhougoda-hamal + * @since 2.0 + */ +public final class SpanishToFrenchConverter + implements PayloadConverter { + + @Override + public French convert(Spanish input) throws PayloadConversionException { + return new French(input); + } +} diff --git a/languageConverter/src/main/java/converter/StringToSpanishConverter.java b/languageConverter/src/main/java/converter/StringToSpanishConverter.java new file mode 100644 index 0000000..bf6fdd6 --- /dev/null +++ b/languageConverter/src/main/java/converter/StringToSpanishConverter.java @@ -0,0 +1,21 @@ +package converter; + +import com.diffusiondata.gateway.framework.converters.PayloadConverter; +import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException; + +import converter.model.Spanish; + +/** + * Sample converter for testing. + * + * @author ndhougoda-hamal + * @since 2.0 + */ +public final class StringToSpanishConverter + implements PayloadConverter { + + @Override + public Spanish convert(String input) throws PayloadConversionException { + return new Spanish(input); + } +} diff --git a/languageConverter/src/main/java/converter/model/French.java b/languageConverter/src/main/java/converter/model/French.java new file mode 100644 index 0000000..b51a833 --- /dev/null +++ b/languageConverter/src/main/java/converter/model/French.java @@ -0,0 +1,29 @@ +package converter.model; + +/** + * Simple class to test converter. + * + * @author ndhougoda-hamal + * @since 2.0 + */ +public final class French implements Language { + + /** + * Spanish prefix. + */ + static final String PREFIX = "French-"; + + private final String content; + + /** + * Constructor. + */ + public French(Spanish content) { + this.content = PREFIX + content.toString().split("-")[1]; + } + + @Override + public String toString() { + return content; + } +} diff --git a/languageConverter/src/main/java/converter/model/Language.java b/languageConverter/src/main/java/converter/model/Language.java new file mode 100644 index 0000000..d49c11e --- /dev/null +++ b/languageConverter/src/main/java/converter/model/Language.java @@ -0,0 +1,11 @@ +package converter.model; + +/** + * Simple interface to test converter. + * + * @author ndhougoda-hamal + * @since 2.0 + */ +public interface Language { + +} diff --git a/languageConverter/src/main/java/converter/model/Spanish.java b/languageConverter/src/main/java/converter/model/Spanish.java new file mode 100644 index 0000000..4726980 --- /dev/null +++ b/languageConverter/src/main/java/converter/model/Spanish.java @@ -0,0 +1,29 @@ +package converter.model; + +/** + * Simple class to test converter. + * + * @author ndhougoda-hamal + * @since 2.0 + */ +public final class Spanish implements Language { + + /** + * Spanish prefix. + */ + static final String PREFIX = "Spanish-"; + + private final String content; + + /** + * Constructor. + */ + public Spanish(String content) { + this.content = PREFIX + content; + } + + @Override + public String toString() { + return content; + } +} diff --git a/pom.xml b/pom.xml index 631c483..3d52a14 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,8 @@ csv-file-adapter misc + sample-diffusion-adapter + languageConverter @@ -32,9 +34,9 @@ 2.0.0_RC1 - org.apache.logging.log4j - log4j-slf4j2-impl - 2.20.0 + io.micrometer + micrometer-registry-jmx + 1.8.1 diff --git a/sample-diffusion-adapter/pom.xml b/sample-diffusion-adapter/pom.xml new file mode 100644 index 0000000..aecfe3d --- /dev/null +++ b/sample-diffusion-adapter/pom.xml @@ -0,0 +1,19 @@ + + + + gateway-examples + com.diffusiondata.gateway.adapter + 1.0.0 + + 4.0.0 + + sample-diffusion-adapter + + + 11 + 11 + + + \ No newline at end of file diff --git a/sample-diffusion-adapter/src/main/java/diffusion/Application.java b/sample-diffusion-adapter/src/main/java/diffusion/Application.java new file mode 100644 index 0000000..7e7052a --- /dev/null +++ b/sample-diffusion-adapter/src/main/java/diffusion/Application.java @@ -0,0 +1,120 @@ +package diffusion; + +import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newApplicationDetailsBuilder; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import com.diffusiondata.gateway.framework.DiffusionGatewayFramework; +import com.diffusiondata.gateway.framework.GatewayApplication; +import com.diffusiondata.gateway.framework.GatewayMeterRegistry; +import com.diffusiondata.gateway.framework.Publisher; +import com.diffusiondata.gateway.framework.ServiceDefinition; +import com.diffusiondata.gateway.framework.ServiceMode; +import com.diffusiondata.gateway.framework.SinkHandler; +import com.diffusiondata.gateway.framework.StateHandler; +import com.diffusiondata.gateway.framework.StreamingSourceHandler; +import com.diffusiondata.gateway.framework.Subscriber; +import com.diffusiondata.gateway.framework.exceptions.ApplicationConfigurationException; +import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.jmx.JmxMeterRegistry; + + +public class Application implements GatewayApplication { + private JmxMeterRegistry meterRegistry = + new JmxMeterRegistry(s -> null, Clock.SYSTEM); + + @Override + public GatewayMeterRegistry getGatewayMeterRegistry() { + return () -> meterRegistry; + } + + @Override + public ApplicationDetails getApplicationDetails() throws ApplicationConfigurationException { + return + newApplicationDetailsBuilder() + .addServiceType( + "REMOTE_STREAMER", + ServiceMode.STREAMING_SOURCE, + "Consumes from remote Diffusion topics", + "{\n" + + " \"$schema\": \"http://json-schema" + + ".org/draft-07/schema#\",\n" + + " \"title\": \"Generated schema for Root\",\n" + + " \"type\": \"object\",\n" + + " \"properties\": {\n" + + " \"principal\": {\n" + + " \"type\": \"string\"\n" + + " },\n" + + " \"password\": {\n" + + " \"type\": \"string\",\n" + + " \"hidden\": \"true\"\n" + + " },\n" + + " \"topicSelector\": {\n" + + " \"type\": \"string\"\n" + + " },\n" + + " \"url\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + + " },\n" + + " \"required\": [\n" + + " \"principal\",\n" + + " \"password\",\n" + + " \"topicSelector\",\n" + + " \"url\"\n" + + " ]\n" + + "}" + ) + .addServiceType( + "LOCAL_STREAMER", + ServiceMode.SINK, + "Consumes from Local Diffusion topics", + null + ) + .build("Diffusion adapter", 1); + } + + @Override + public StreamingSourceHandler addStreamingSource( + ServiceDefinition serviceDefinition, + Publisher publisher, + StateHandler stateHandler) throws InvalidConfigurationException { + + Map parameters = serviceDefinition.getParameters(); + + String url = (String) parameters.get("url"); + String principal = (String) parameters.get("principal"); + String password = (String) parameters.get("password"); + String topicSelector = (String) parameters.get("topicSelector"); + String prefix = parameters.get("prefix") == null ? "" : + (String) parameters.get("prefix"); + + return new RemoteStreamer( + url, + principal, + password, + topicSelector, + prefix, + publisher); + } + + @Override + public SinkHandler addSink( + ServiceDefinition serviceDefinition, + Subscriber subscriber, + StateHandler stateHandler) throws InvalidConfigurationException { + + return new LocalStreamer(); + } + + @Override + public CompletableFuture stop() { + return CompletableFuture.completedFuture(null); + } + + public static void main(String[] args) { + DiffusionGatewayFramework.start(new Application()); + } +} diff --git a/sample-diffusion-adapter/src/main/java/diffusion/LocalStreamer.java b/sample-diffusion-adapter/src/main/java/diffusion/LocalStreamer.java new file mode 100644 index 0000000..d28ab79 --- /dev/null +++ b/sample-diffusion-adapter/src/main/java/diffusion/LocalStreamer.java @@ -0,0 +1,43 @@ +package diffusion; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.diffusiondata.gateway.framework.SinkHandler; +import com.diffusiondata.gateway.framework.TopicProperties; + +public class LocalStreamer implements SinkHandler { + private static final Logger LOG = + LoggerFactory.getLogger(LocalStreamer.class); + + @Override + public Class valueType() { + return Object.class; + } + + @Override + public CompletableFuture update( + String path, + Object value, + TopicProperties topicProperties) { + + LOG.info( + "Received {} from {} with {} properties", + value, + path, + topicProperties); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture pause(PauseReason reason) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture resume(ResumeReason reason) { + return CompletableFuture.completedFuture(null); + } +} diff --git a/sample-diffusion-adapter/src/main/java/diffusion/RemoteStreamer.java b/sample-diffusion-adapter/src/main/java/diffusion/RemoteStreamer.java new file mode 100644 index 0000000..6576ceb --- /dev/null +++ b/sample-diffusion-adapter/src/main/java/diffusion/RemoteStreamer.java @@ -0,0 +1,115 @@ +package diffusion; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.diffusiondata.gateway.framework.Publisher; +import com.diffusiondata.gateway.framework.StreamingSourceHandler; +import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException; +import com.pushtechnology.diffusion.client.Diffusion; +import com.pushtechnology.diffusion.client.callbacks.ErrorReason; +import com.pushtechnology.diffusion.client.features.Topics; +import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason; +import com.pushtechnology.diffusion.client.features.Topics.ValueStream; +import com.pushtechnology.diffusion.client.session.Session; +import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; + +public class RemoteStreamer implements StreamingSourceHandler { + private final Logger LOG = + LoggerFactory.getLogger(RemoteStreamer.class); + + private final String url; + private final String principal; + private final String password; + private final String topicSelector; + private final String prefix; + private final Publisher publisher; + + private Session session; + + public RemoteStreamer(String url, String principal, String password, + String topicSelector, String prefix, Publisher publisher) { + this.url = url; + this.principal = principal; + this.password = password; + this.topicSelector = topicSelector; + this.prefix = prefix; + this.publisher = publisher; + } + + @Override + public CompletableFuture start() { + + session = + Diffusion.sessions().principal(principal).password(password).open(url); + + Topics topics = session.feature(Topics.class); + + ValueStream valueStream = new ValueStreamImpl(); + topics.addStream(topicSelector, Object.class, valueStream); + topics.subscribe(topicSelector); + + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture pause(PauseReason reason) { + session.close(); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture resume(ResumeReason reason) { + return start(); + } + + class ValueStreamImpl implements ValueStream { + + @Override + public void onValue(String topicPath, + TopicSpecification specification, Object oldValue, + Object newValue) { + try { + + publisher.publish(prefix + topicPath, newValue, + publisher.getConfiguredTopicProperties()) + .whenComplete((res, ex) -> { + if (ex == null) { + LOG.info("Published to {}", topicPath); + } + else { + LOG.error("Failed to publish from {}", topicPath); + } + }); + } + catch (PayloadConversionException ex) { + LOG.error("Failed to convert", ex); + } + + } + + @Override + public void onSubscription(String topicPath, + TopicSpecification specification) { + + } + + @Override + public void onUnsubscription(String topicPath, + TopicSpecification specification, UnsubscribeReason reason) { + + } + + @Override + public void onClose() { + + } + + @Override + public void onError(ErrorReason errorReason) { + + } + } +} diff --git a/sample-diffusion-adapter/src/main/resources/configuration.json b/sample-diffusion-adapter/src/main/resources/configuration.json new file mode 100644 index 0000000..fba4806 --- /dev/null +++ b/sample-diffusion-adapter/src/main/resources/configuration.json @@ -0,0 +1,57 @@ +{ + "framework-version" : 1, + "application-version" : 1, + "id" : "gateway-application-1", + "diffusion" : { + "url" : "ws://localhost:8090", + "principal" : "admin", + "password" : "password" + }, + "services" : [ { + "serviceName" : "localStreamer", + "description" : "", + "serviceType" : "LOCAL_STREAMER", + "config" : { + "framework" : { + "diffusionTopicSelector" : "?data//" + } + } + }, { + "serviceName" : "weatherConsumer", + "description" : "Consumes from remote topic and publishes to local", + "serviceType" : "REMOTE_STREAMER", + "config" : { + "framework" : { + "publicationRetries" : 5, + "retryIntervalMs" : 5000, + "payloadConverters" : [ { + "name" : "$Object_to_String" + } ], + "topicProperties" : { + "persistencePolicy" : "SESSION", + "timeSeries" : false, + "publishValuesOnly" : false, + "dontRetainValue" : false + } + }, + "application" : { + "principal" : "admin", + "password" : "password", + "topicSelector" : "?weather//", + "url" : "ws://localhost:7090" + }, + "state" : "ACTIVE" + } + } ], + "global" : { + "framework" : { + "threadPoolSize" : 11, + "mode" : "DYNAMIC", + "metrics" : { + "enabled" : true, + "filterMetricsPrefixes" : [ ] + } + }, + "application" : { } + } +} \ No newline at end of file diff --git a/sample-diffusion-adapter/src/main/resources/log4j2.xml b/sample-diffusion-adapter/src/main/resources/log4j2.xml new file mode 100644 index 0000000..ae36128 --- /dev/null +++ b/sample-diffusion-adapter/src/main/resources/log4j2.xml @@ -0,0 +1,42 @@ + + + + + + ${sys:diffusion.home:-.}/logs + + %date{yyyy-MM-dd HH:mm:ss.SSS}|%level|%thread|%replace{%msg}{\|}{}|%logger%n%xEx + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +