Added JsonPdxConverter (#2)
- Allows PdxInstance objects to be converted to JSON bytes when sourced
from a Geode region into a Kafka topic
- Allows JSON bytes to be converted to PdxInstance objects when sunk
into a Geode region from a Kafka topic
- Added unit and DUnit tests for JsonPdxConverter
- Added functionality to the test framework to specify a custom key
converter, a custom value converter, and configuration properties for each
- Added TestObject class to allow validation of
serialization/deserialization

Authored-by: Donal Evans <doevans@pivotal.io>
DonalEvans committed Feb 21, 2020
1 parent 97a9f92 commit bc637cdc3c49557e81b51f8b5e7f1d9e0b083789
Showing 11 changed files with 483 additions and 10 deletions.
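
As a hedged illustration (not part of this commit), a connector configuration might select the new converter as follows. The "value.converter" key and the "value.converter." prefix for per-converter overrides are standard Kafka Connect conventions; the "add-type-annotation-to-json" key is the one introduced by this commit.

Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("value.converter", "org.apache.geode.kafka.converter.JsonPdxConverter");
// Embed the PDX class name as an "@type" annotation so the sink side can recover the type
connectorProps.put("value.converter.add-type-annotation-to-json", "true");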
@@ -23,6 +23,7 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.storage.StringConverter;

public class GeodeConnectorConfig extends AbstractConfig {

@@ -42,6 +43,9 @@ public class GeodeConnectorConfig extends AbstractConfig {
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD = "security-password";

// Converters used by default when a connector configuration does not specify its own
public static final String DEFAULT_KEY_CONVERTER = StringConverter.class.getCanonicalName();
public static final String DEFAULT_VALUE_CONVERTER = StringConverter.class.getCanonicalName();

protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
@@ -63,6 +63,7 @@ public ClientCache createClientCache(List<LocatorHostPort> locators, String dura
String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();

// Read region values as PdxInstance rather than deserializing them to domain
// objects, so converters can operate without the domain classes on the classpath
ccf.setPdxReadSerialized(true);
if (usesSecurity) {
if (securityUserName != null && securityPassword != null) {
ccf.set(SECURITY_USER, securityUserName);
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.converter;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxInstance;

public class JsonPdxConverter implements Converter {
public static final String JSON_TYPE_ANNOTATION = "\"@type\"";
// Config key controlling whether the PDX class name is embedded in the emitted
// JSON as a top-level "@type" annotation; defaults to false when unset
public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json";
private Map<String, String> internalConfig = new HashMap<>();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (configs != null) {
configs.forEach((key, value) -> internalConfig.put(key, (String) value));
}
}

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
PdxInstance pdxInstanceValue = (PdxInstance) value;
byte[] jsonBytes = getJsonBytes(pdxInstanceValue);
if (!shouldAddTypeAnnotation()) {
return jsonBytes;
}
String jsonString = new String(jsonBytes);
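// Inject the PDX class name as a top-level "@type" annotation (if not already
// present) so the original type can be recovered when the JSON is converted back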
if (!jsonString.contains(JSON_TYPE_ANNOTATION)) {
jsonString = jsonString.replaceFirst("\\{",
"{" + JSON_TYPE_ANNOTATION + " : \"" + pdxInstanceValue.getClassName() + "\",");
}
return jsonString.getBytes();
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
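// JSONFormatter parses the JSON bytes back into a PdxInstance; no Connect schema
// is derived from the data, so the schema is left null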
return new SchemaAndValue(null, JSONFormatter.fromJSON(value));
}

byte[] getJsonBytes(PdxInstance pdxInstanceValue) {
return JSONFormatter.toJSONByteArray(pdxInstanceValue);
}

boolean shouldAddTypeAnnotation() {
return Boolean.parseBoolean(internalConfig.get(ADD_TYPE_ANNOTATION_TO_JSON));
}

public Map<String, String> getInternalConfig() {
return internalConfig;
}
}
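
A minimal usage sketch for the class above (hypothetical, not from this commit; assumes a PdxInstance named pdxValue obtained from a running Geode cache, which JSONFormatter requires):

JsonPdxConverter converter = new JsonPdxConverter();
Map<String, String> config = new HashMap<>();
config.put(JsonPdxConverter.ADD_TYPE_ANNOTATION_TO_JSON, "true");
converter.configure(config, false); // false: configured as a value converter

// PdxInstance -> JSON bytes, annotated with the PDX class name
byte[] jsonBytes = converter.fromConnectData("example-topic", null, pdxValue);

// JSON bytes -> PdxInstance
SchemaAndValue restored = converter.toConnectData("example-topic", jsonBytes);
PdxInstance roundTripped = (PdxInstance) restored.value();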
@@ -101,7 +101,6 @@ public GeodeAsSinkDUnitTest(int numTask, int numPartition) {

@Test
public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {

MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
@@ -54,7 +54,6 @@ public class GeodeAsSourceDUnitTest {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);


@Rule
public TestName testName = new TestName();

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.converter;

import static org.apache.geode.kafka.GeodeConnectorConfig.DEFAULT_KEY_CONVERTER;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.createTopic;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.deleteTopic;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getKafkaConfig;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.getZooKeeperProperties;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startKafka;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
import static org.apache.geode.kafka.utilities.GeodeKafkaTestUtils.startZooKeeper;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.utils.Time;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.utilities.KafkaLocalCluster;
import org.apache.geode.kafka.utilities.TestObject;
import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;

public class JsonPdxConverterDUnitTest {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);

@Rule
public TestName testName = new TestName();

@ClassRule
public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();

@Rule
public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();

@BeforeClass
public static void setup() throws Exception {
startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
}

@AfterClass
public static void cleanUp() {
KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
false,
200000,
15000,
10,
Time.SYSTEM,
"myGroup",
"myMetricType",
null);

zkClient.close();
}

@Test
public void jsonPdxConverterCanConvertPdxInstanceToJsonAndBackWhenDataMovesFromRegionToTopicToRegion()
throws Exception {
MemberVM locator = clusterStartupRule.startLocatorVM(0);
int locatorPort = locator.getPort();
MemberVM server1 = clusterStartupRule.startServerVM(1, server -> server
.withConnectionToLocator(locatorPort)
// .withPDXReadSerialized()
);
ClientVM client1 = clusterStartupRule.startClientVM(2, client -> client
.withLocatorConnection(locatorPort)
.withCacheSetup(
cf -> cf.setPdxSerializer(new ReflectionBasedAutoSerializer("org.apache.geode.kafka.*"))
.setPdxReadSerialized(true)));

// Set unique names for all the different components
String sourceRegionName = "SOURCE_REGION";
String sinkRegionName = "SINK_REGION";
// We only need one topic for this test, which we will both source to and sink from
String topicName = "TEST_TOPIC";

/*
* Start the Apache Geode cluster and create the source and sink regions.
* Create an Apache Geode client which can insert data into the source region and
* get data from the sink region
*/
server1.invoke(() -> {
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
.create(sourceRegionName);
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
.create(sinkRegionName);
});
client1.invoke(() -> {
ClientCache clientCache = ClusterStartupRule.getClientCache();
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sourceRegionName);
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sinkRegionName);
});

/*
* Start the Kafka cluster and workers, and create the topic to which Apache Geode
* will connect as a source
*/
WorkerAndHerderCluster workerAndHerderCluster = null;
KafkaLocalCluster kafkaLocalCluster = null;
try {
kafkaLocalCluster = startKafka(
getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
createTopic(topicName, 1, 1);
// Create workers and herder cluster
workerAndHerderCluster = startWorkerAndHerderCluster(1, sourceRegionName, sinkRegionName,
topicName, topicName, temporaryFolderForOffset.getRoot().getAbsolutePath(),
"localhost[" + locatorPort + "]", DEFAULT_KEY_CONVERTER, "",
JsonPdxConverter.class.getCanonicalName(),
JsonPdxConverter.ADD_TYPE_ANNOTATION_TO_JSON + "=true");

// Insert data into the Apache Geode source and retrieve the data from the Apache Geode sink
// from the client
client1.invoke(() -> {
// Create an object that will be serialized into a PdxInstance
String name = "testName";
int age = 42;
double number = 3.141;
List<String> words = new ArrayList<>();
words.add("words1");
words.add("words2");
words.add("words3");
TestObject originalObject = new TestObject(name, age, number, words);

ClientCache clientCache = ClusterStartupRule.getClientCache();

// Create a PdxInstance from the test object
PdxInstanceFactory instanceFactory =
clientCache.createPdxInstanceFactory(originalObject.getClass().getName());
for (Field field : originalObject.getClass().getFields()) {
  try {
    Object value = field.get(originalObject);
    Class type = field.getType();
    instanceFactory.writeField(field.getName(), value, type);
  } catch (IllegalAccessException ignore) {
  }
}
PdxInstance putInstance = instanceFactory.create();

// Put the PdxInstance into the source region
String key = "key1";
clientCache.getRegion(sourceRegionName).put(key, putInstance);

// Assert that the data that arrives in the sink region is the same as the data that was put
// into the source region
Region<Object, Object> sinkRegion = clientCache.getRegion(sinkRegionName);
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(1, sinkRegion.sizeOnServer()));
PdxInstance getInstance = (PdxInstance) sinkRegion.get(key);

assertEquals(originalObject, getInstance.getObject());
});

} finally {
// Clean up by deleting the topic
deleteTopic(topicName);
if (workerAndHerderCluster != null) {
workerAndHerderCluster.stop();
}
if (kafkaLocalCluster != null) {
kafkaLocalCluster.stop();
}
}
}
}
