Skip to content

Commit

Permalink
Added sample diffusion converter and converter using GF 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
GitLab committed Feb 15, 2024
1 parent 7d335e6 commit 99ce97f
Show file tree
Hide file tree
Showing 13 changed files with 532 additions and 3 deletions.
19 changes: 19 additions & 0 deletions languageConverter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gateway-examples</artifactId>
<groupId>com.diffusiondata.gateway.adapter</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>languageConverter</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package converter;

import com.diffusiondata.gateway.framework.converters.PayloadConverter;
import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;

import converter.model.French;
import converter.model.Spanish;

/**
* Sample converter for testing.
*
* @author ndhougoda-hamal
* @since 2.0
*/
public final class SpanishToFrenchConverter
implements PayloadConverter<Spanish, French> {

@Override
public French convert(Spanish input) throws PayloadConversionException {
return new French(input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package converter;

import com.diffusiondata.gateway.framework.converters.PayloadConverter;
import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;

import converter.model.Spanish;

/**
* Sample converter for testing.
*
* @author ndhougoda-hamal
* @since 2.0
*/
public final class StringToSpanishConverter
implements PayloadConverter<String, Spanish> {

@Override
public Spanish convert(String input) throws PayloadConversionException {
return new Spanish(input);
}
}
29 changes: 29 additions & 0 deletions languageConverter/src/main/java/converter/model/French.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package converter.model;

/**
* Simple class to test converter.
*
* @author ndhougoda-hamal
* @since 2.0
*/
public final class French implements Language {

/**
* Spanish prefix.
*/
static final String PREFIX = "French-";

private final String content;

/**
* Constructor.
*/
public French(Spanish content) {
this.content = PREFIX + content.toString().split("-")[1];
}

@Override
public String toString() {
return content;
}
}
11 changes: 11 additions & 0 deletions languageConverter/src/main/java/converter/model/Language.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package converter.model;

/**
* Simple interface to test converter.
*
* @author ndhougoda-hamal
* @since 2.0
*/
public interface Language {

}
29 changes: 29 additions & 0 deletions languageConverter/src/main/java/converter/model/Spanish.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package converter.model;

/**
* Simple class to test converter.
*
* @author ndhougoda-hamal
* @since 2.0
*/
public final class Spanish implements Language {

/**
* Spanish prefix.
*/
static final String PREFIX = "Spanish-";

private final String content;

/**
* Constructor.
*/
public Spanish(String content) {
this.content = PREFIX + content;
}

@Override
public String toString() {
return content;
}
}
8 changes: 5 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
<modules>
<module>csv-file-adapter</module>
<module>misc</module>
<module>sample-diffusion-adapter</module>
<module>languageConverter</module>
</modules>

<properties>
Expand All @@ -32,9 +34,9 @@
<version>2.0.0_RC1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.20.0</version>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-jmx</artifactId>
<version>1.8.1</version>
</dependency>
</dependencies>

Expand Down
19 changes: 19 additions & 0 deletions sample-diffusion-adapter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gateway-examples</artifactId>
<groupId>com.diffusiondata.gateway.adapter</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sample-diffusion-adapter</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

</project>
120 changes: 120 additions & 0 deletions sample-diffusion-adapter/src/main/java/diffusion/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package diffusion;

import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newApplicationDetailsBuilder;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.diffusiondata.gateway.framework.DiffusionGatewayFramework;
import com.diffusiondata.gateway.framework.GatewayApplication;
import com.diffusiondata.gateway.framework.GatewayMeterRegistry;
import com.diffusiondata.gateway.framework.Publisher;
import com.diffusiondata.gateway.framework.ServiceDefinition;
import com.diffusiondata.gateway.framework.ServiceMode;
import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.StateHandler;
import com.diffusiondata.gateway.framework.StreamingSourceHandler;
import com.diffusiondata.gateway.framework.Subscriber;
import com.diffusiondata.gateway.framework.exceptions.ApplicationConfigurationException;
import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;

import io.micrometer.core.instrument.Clock;
import io.micrometer.jmx.JmxMeterRegistry;


public class Application implements GatewayApplication {
private JmxMeterRegistry meterRegistry =
new JmxMeterRegistry(s -> null, Clock.SYSTEM);

@Override
public GatewayMeterRegistry getGatewayMeterRegistry() {
return () -> meterRegistry;
}

@Override
public ApplicationDetails getApplicationDetails() throws ApplicationConfigurationException {
return
newApplicationDetailsBuilder()
.addServiceType(
"REMOTE_STREAMER",
ServiceMode.STREAMING_SOURCE,
"Consumes from remote Diffusion topics",
"{\n" +
" \"$schema\": \"http://json-schema" +
".org/draft-07/schema#\",\n" +
" \"title\": \"Generated schema for Root\",\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"principal\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"password\": {\n" +
" \"type\": \"string\",\n" +
" \"hidden\": \"true\"\n" +
" },\n" +
" \"topicSelector\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"url\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" \"required\": [\n" +
" \"principal\",\n" +
" \"password\",\n" +
" \"topicSelector\",\n" +
" \"url\"\n" +
" ]\n" +
"}"
)
.addServiceType(
"LOCAL_STREAMER",
ServiceMode.SINK,
"Consumes from Local Diffusion topics",
null
)
.build("Diffusion adapter", 1);
}

@Override
public StreamingSourceHandler addStreamingSource(
ServiceDefinition serviceDefinition,
Publisher publisher,
StateHandler stateHandler) throws InvalidConfigurationException {

Map<String, Object> parameters = serviceDefinition.getParameters();

String url = (String) parameters.get("url");
String principal = (String) parameters.get("principal");
String password = (String) parameters.get("password");
String topicSelector = (String) parameters.get("topicSelector");
String prefix = parameters.get("prefix") == null ? "" :
(String) parameters.get("prefix");

return new RemoteStreamer(
url,
principal,
password,
topicSelector,
prefix,
publisher);
}

@Override
public SinkHandler<?> addSink(
ServiceDefinition serviceDefinition,
Subscriber subscriber,
StateHandler stateHandler) throws InvalidConfigurationException {

return new LocalStreamer();
}

@Override
public CompletableFuture<?> stop() {
return CompletableFuture.completedFuture(null);
}

public static void main(String[] args) {
DiffusionGatewayFramework.start(new Application());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package diffusion;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.TopicProperties;

public class LocalStreamer implements SinkHandler<Object> {
private static final Logger LOG =
LoggerFactory.getLogger(LocalStreamer.class);

@Override
public Class<Object> valueType() {
return Object.class;
}

@Override
public CompletableFuture<?> update(
String path,
Object value,
TopicProperties topicProperties) {

LOG.info(
"Received {} from {} with {} properties",
value,
path,
topicProperties);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> pause(PauseReason reason) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<?> resume(ResumeReason reason) {
return CompletableFuture.completedFuture(null);
}
}

0 comments on commit 99ce97f

Please sign in to comment.