Skip to content

Commit

Permalink
updated to use gateway framework 2.0 (#11)
Browse files Browse the repository at this point in the history
Co-authored-by: naveenad <naveena.dhougoda-hamal@diffusiondata.com>
  • Loading branch information
naveenad and naveenad committed Feb 15, 2024
1 parent 4e61ad0 commit 7d335e6
Show file tree
Hide file tree
Showing 24 changed files with 134 additions and 280 deletions.
16 changes: 16 additions & 0 deletions csv-file-adapter/csv-file-sink/.run/CsvFileSinkRunner.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="CsvFileSinkRunner" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="com.diffusiondata.gateway.adapter.csv.sink.Runner" />
<module name="csv-file-sink" />
<option name="VM_PARAMETERS" value="-Dgateway.config.file=$PROJECT_DIR$/csv-file-adapter/csv-file-sink/src/main/resources/configuration.json -Dgateway.config.use-local-services=true" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.diffusiondata.gateway.adapter.csv.sink.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
2 changes: 1 addition & 1 deletion csv-file-adapter/csv-file-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
This adapter can be used to get updates from Diffusion JSON topics and write the update to a CSV file. It supports only one type of service.

### CSV_FILE_SINK
This sink service supports getting Diffusion JSON topic updates and publishing them to a CSV file specified in the configuration. This service will correctly function only for Diffusion topics of JSON topic type. If any other type of topic selector is used in its configuration, or the topic selector matches any non JSON topic type, when an update is received for this topic, Payload convertor exception will be thrown. This service type requires the following configuration to be declared in each defined service in the configuration file:
This sink service supports getting Diffusion JSON topic updates and publishing them to a CSV file specified in the configuration. This service will correctly function only for Diffusion topics of JSON topic type. If any other type of topic selector is used in its configuration, or the topic selector matches any non JSON topic type, when an update is received for this topic, Payload converter exception will be thrown. This service type requires the following configuration to be declared in each defined service in the configuration file:

"application": {
"filePath": "./pathToCreateTheCsvFile/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.diffusiondata.gateway.framework.ServiceMode;
import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.StateHandler;
import com.diffusiondata.gateway.framework.Subscriber;
import com.diffusiondata.gateway.framework.exceptions.ApplicationConfigurationException;


Expand Down Expand Up @@ -52,8 +53,9 @@ public ApplicationDetails getApplicationDetails() throws ApplicationConfiguratio
}

@Override
public SinkHandler addSink(
public SinkHandler<?> addSink(
ServiceDefinition serviceDefinition,
Subscriber subscriber,
StateHandler stateHandler) {

final Map<String, Object> parameters =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.CompletableFuture;

import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.TopicProperties;
import com.diffusiondata.gateway.framework.TopicType;
import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;

Expand All @@ -34,12 +35,12 @@ final class CsvFileSinkHandler implements SinkHandler<String> {
@Override
public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
return newSinkServicePropertiesBuilder()
.payloadConvertorName("$JSON_to_CSV_STRING")
.payloadConverter("JSON_to_CSV_STRING")
.build();
}

@Override
public CompletableFuture<?> update(String diffusionTopic, String value) {
public CompletableFuture<?> update(String diffusionTopic, String value, TopicProperties topicProperties) {

final CompletableFuture<?> updateCf =
new CompletableFuture<>();
Expand All @@ -59,6 +60,11 @@ public CompletableFuture<?> update(String diffusionTopic, String value) {
return updateCf;
}

@Override
public Class<String> valueType() {
return String.class;
}

@Override
public CompletableFuture<?> pause(PauseReason pauseReason) {
return completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.diffusiondata.gateway.adapter.csv.sink;

import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.initialize;
import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.start;

/**
* Main Runner class.
Expand All @@ -12,7 +12,6 @@ public static void main(String[] args) {
final CsvFileSinkApplication csvFileSinkApplication =
new CsvFileSinkApplication();

initialize(csvFileSinkApplication)
.connect();
start(csvFileSinkApplication);
}
}
Original file line number Diff line number Diff line change
@@ -1,52 +1,51 @@
package com.diffusiondata.gateway.adapter.csv.sink;

import static com.diffusiondata.gateway.convertors.CBORContext.CBOR_FACTORY;
import static com.diffusiondata.gateway.converters.CBORContext.CBOR_FACTORY;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.Set;

import com.diffusiondata.gateway.framework.converters.PayloadConverter;
import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import com.diffusiondata.gateway.framework.convertors.OutboundPayloadConvertor;
import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;
import com.pushtechnology.diffusion.datatype.json.JSON;

/**
* Outbound convertor to convert JSON data to CSV string.
* <p>
* This is a one-way convertor to be used with sink services.
* A Payload converter to convert JSON data to CSV string.
* <p>
* This converter can only be used for JSON payload with simple JSON objects or
* JSON array containing simple JSON objects. It uses fields of JSON object to
* extract headers for the CSV data. If the data to convert is of type array,
* all the items of array will be looped to extract the headers. Hence, this
* convertor is suggested to be used only for simple and small JSON payload.
* converter is suggested to be used only for simple and small JSON payload.
*
* @author DiffusionData Ltd
*/
public final class SimpleJSONToCsvStringConvertor
implements OutboundPayloadConvertor<String, JSON> {
public final class SimpleJSONToCsvStringConverter
implements PayloadConverter<JSON, String> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(CBOR_FACTORY);
private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper(CBOR_FACTORY);

@Override
public String getName() {
return "$JSON_to_CSV_STRING";
public static String getName() {
return "JSON_to_CSV_STRING";
}

@Override
public String fromDiffusionType(JSON jsonData)
public String convert(JSON jsonData)
throws PayloadConversionException {

final JsonNode jsonNode;
try {
jsonNode = OBJECT_MAPPER.readValue(jsonData.asInputStream(), JsonNode.class);
jsonNode = OBJECT_MAPPER.readValue(jsonData.asInputStream(),
JsonNode.class);
}
catch (IOException ex) {
throw new PayloadConversionException(
Expand Down Expand Up @@ -75,11 +74,6 @@ public String fromDiffusionType(JSON jsonData)
}
}

@Override
public Class<JSON> getDiffusionType() {
return JSON.class;
}

private Set<String> createHeaders(final JsonNode jsonNode)
throws PayloadConversionException {
final Set<String> headers = new LinkedHashSet<>();
Expand All @@ -105,7 +99,7 @@ private Set<String> extractObjectFields(JsonNode jsonNode)
}
else {
throw new PayloadConversionException(
"This convertor can convert only simple JSON objects and JSON" +
"This converter can convert only simple JSON objects and JSON" +
" array containing simple JSON objects");
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.diffusiondata.gateway.framework.ServiceDefinition;
import com.diffusiondata.gateway.framework.SinkHandler;
import com.diffusiondata.gateway.framework.StateHandler;
import com.diffusiondata.gateway.framework.Subscriber;

/**
* Tests for {@link CsvFileSinkApplication}.
Expand Down Expand Up @@ -47,7 +48,7 @@ void testAddSink() {

when(serviceDefinition.getParameters()).thenReturn(Collections.singletonMap("filePath", "path"));
SinkHandler sinkHandler = csvFileSinkApplication.addSink(
serviceDefinition, stateHandler);
serviceDefinition, mock(Subscriber.class), stateHandler);

assertTrue(sinkHandler instanceof CsvFileSinkHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;

import com.diffusiondata.gateway.framework.TopicProperties;

/**
* Tests for {@link CsvFileSinkHandler}.
Expand All @@ -24,6 +27,9 @@ class CsvFileSinkHandlerTest {
private static final String testFilePath = "./testPath";
private CsvFileSinkHandler csvFileSinkHandler;

@Mock
private TopicProperties topicProperties;

@BeforeEach
void setUp() {
csvFileSinkHandler = new CsvFileSinkHandler(testFilePath);
Expand All @@ -46,7 +52,7 @@ void testUpdateWithSimpleDiffusionTopic() throws IOException {

assertFalse(new File(expectedFinalFileName).exists());

csvFileSinkHandler.update(diffusionTopic, dataToWrite);
csvFileSinkHandler.update(diffusionTopic, dataToWrite, topicProperties);

assertTrue(new File(expectedFinalFileName).exists());

Expand All @@ -63,7 +69,7 @@ void testUpdateWithComplexDiffusionTopic() throws IOException {

assertFalse(new File(expectedFinalFileName).exists());

csvFileSinkHandler.update(diffusionTopic, dataToWrite);
csvFileSinkHandler.update(diffusionTopic, dataToWrite, topicProperties);

assertTrue(new File(expectedFinalFileName).exists());

Expand All @@ -80,15 +86,15 @@ void testUpdateMultipleTimes() throws IOException {

assertFalse(new File(expectedFinalFileName).exists());

csvFileSinkHandler.update(diffusionTopic, firstDataToWrite);
csvFileSinkHandler.update(diffusionTopic, firstDataToWrite, topicProperties);

assertTrue(new File(expectedFinalFileName).exists());

String fileContent = Files.readString(Path.of(expectedFinalFileName));
assertEquals(firstDataToWrite, fileContent);

String lastDataToWrite = "newData";
csvFileSinkHandler.update(diffusionTopic, lastDataToWrite);
csvFileSinkHandler.update(diffusionTopic, lastDataToWrite, topicProperties);

assertTrue(new File(expectedFinalFileName).exists());

Expand All @@ -108,7 +114,7 @@ void testUpdateWithCompleteFilePath() throws IOException {

assertFalse(new File(expectedFinalFileName).exists());

csvFileSinkHandler.update(diffusionTopic, dataToWrite);
csvFileSinkHandler.update(diffusionTopic, dataToWrite, topicProperties);

assertTrue(new File(expectedFinalFileName).exists());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import com.pushtechnology.diffusion.client.Diffusion;

/**
* Tests for {@link SimpleJSONToCsvStringConvertor}.
* Tests for {@link SimpleJSONToCsvStringConverter}.
*
* @author ndhougoda-hamal
*/
class SimpleJSONToCsvStringConvertorTest {
class SimpleJSONToCsvStringConverterTest {

private static final String SIMPLE_JSON_OBJECT_TEST_DATA = "{\n" +
" \"name\": \"Sam\",\n" +
Expand Down Expand Up @@ -51,13 +51,13 @@ class SimpleJSONToCsvStringConvertorTest {
" ]\n" +
" ]";

private final SimpleJSONToCsvStringConvertor simpleJsonToCsvStringConvertor =
new SimpleJSONToCsvStringConvertor();
private final SimpleJSONToCsvStringConverter simpleJsonToCsvStringConverter =
new SimpleJSONToCsvStringConverter();

@Test
void testFromDiffusionTypeForObjectJsonData() throws PayloadConversionException {
String convertedValue =
simpleJsonToCsvStringConvertor.fromDiffusionType(
simpleJsonToCsvStringConverter.convert(
Diffusion.dataTypes().json().fromJsonString(SIMPLE_JSON_OBJECT_TEST_DATA));

assertEquals("name,profession,age\n" +
Expand All @@ -67,7 +67,7 @@ void testFromDiffusionTypeForObjectJsonData() throws PayloadConversionException
@Test
void testFromDiffusionTypeForArrayJsonData() throws PayloadConversionException {
String convertedValue =
simpleJsonToCsvStringConvertor.fromDiffusionType(
simpleJsonToCsvStringConverter.convert(
Diffusion.dataTypes().json().fromJsonString(SIMPLE_JSON_ARRAY_TEST_DATA));

assertEquals("user,membership,age,salary\n" +
Expand All @@ -79,7 +79,7 @@ void testFromDiffusionTypeForArrayJsonData() throws PayloadConversionException {
void testFromDiffusionTypeForComplexObjectJsonData() {
PayloadConversionException exception =
assertThrows(PayloadConversionException.class,
() -> simpleJsonToCsvStringConvertor.fromDiffusionType(
() -> simpleJsonToCsvStringConverter.convert(
Diffusion.dataTypes().json().fromJsonString(COMPLEX_JSON_OBJECT_TEST_DATA)));

assertEquals(
Expand All @@ -91,17 +91,17 @@ void testFromDiffusionTypeForComplexObjectJsonData() {
void testFromDiffusionTypeForComplexArrayJsonData() {
PayloadConversionException exception =
assertThrows(PayloadConversionException.class,
() -> simpleJsonToCsvStringConvertor.fromDiffusionType(
() -> simpleJsonToCsvStringConverter.convert(
Diffusion.dataTypes().json().fromJsonString(COMPLEX_JSON_ARRAY_TEST_DATA)));

assertEquals(
"This convertor can convert only simple JSON objects and JSON " +
"This converter can convert only simple JSON objects and JSON " +
"array containing simple JSON objects",
exception.getMessage());
}

void testGetNameAndToString() {
assertEquals("$JSON_to_CSV_STRING", simpleJsonToCsvStringConvertor.getName());
assertEquals("$JSON_to_CSV_STRING", simpleJsonToCsvStringConvertor.toString());
assertEquals("JSON_to_CSV_STRING", simpleJsonToCsvStringConverter.getName());
assertEquals("JSON_to_CSV_STRING", simpleJsonToCsvStringConverter.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -43,7 +44,7 @@ final class CsvPollingSourceHandler implements PollingSourceHandler {
this.diffusionTopicName = diffusionTopicName;
this.publisher = publisher;
this.fileName = fileName;
this.file = new File(fileName);
this.file = new File(Objects.requireNonNull(this.getClass().getResource(fileName)).getFile());
}

@Override
Expand Down Expand Up @@ -96,7 +97,7 @@ public SourceServiceProperties getSourceServiceProperties() throws InvalidConfig
return
newSourceServicePropertiesBuilder()
.updateMode(UpdateMode.STREAMING)
.payloadConvertorName("$CSV_to_JSON")
.payloadConverter("$CSV_to_JSON")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -62,7 +63,7 @@ final class CsvStreamingSourceHandler implements StreamingSourceHandler {
this.stateHandler = stateHandler;
this.publisher = publisher;
this.fileName = fileName;
this.file = new File(fileName);
this.file = new File(Objects.requireNonNull(this.getClass().getResource(fileName)).getFile());
}

@Override
Expand All @@ -83,8 +84,7 @@ public CompletableFuture<?> stop() {
public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
return
newSourceServicePropertiesBuilder()
.updateMode(UpdateMode.STREAMING)
.payloadConvertorName("$CSV_to_JSON")
.payloadConverter("$CSV_to_JSON")
.build();
}

Expand Down
Loading

0 comments on commit 7d335e6

Please sign in to comment.