Added sink task and sink class
Added end to end sink test
jhuynh1 committed Jan 23, 2020
1 parent 337b2f8 commit a570805518713d82c8819daa01e8fa337d1cf787
Showing 5 changed files with 140 additions and 76 deletions.
@@ -1,76 +1,81 @@
package geode.kafka.sink;

import geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GeodeKafkaSink {

//
// /** Sink properties. */
// private Map<String, String> configProps;
//
// /** Expected configurations. */
// private static final ConfigDef CONFIG_DEF = new ConfigDef();
//
// /** {@inheritDoc} */
// @Override public String version() {
// return AppInfoParser.getVersion();
// }
//
// /**
// * A sink lifecycle method. Validates grid-specific sink properties.
// *
// * @param props Sink properties.
// */
// @Override public void start(Map<String, String> props) {
// configProps = props;
//
// try {
// A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
// }
// catch (IllegalArgumentException e) {
// throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
// }
// }
//
// /**
// * Obtains a sink task class to be instantiated for feeding data into grid.
// *
// * @return IgniteSinkTask class.
// */
// @Override public Class<? extends Task> taskClass() {
// return IgniteSinkTask.class;
// }
//
// /**
// * Builds each config for <tt>maxTasks</tt> tasks.
// *
// * @param maxTasks Max number of tasks.
// * @return Task configs.
// */
// @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
// List<Map<String, String>> taskConfigs = new ArrayList<>();
// Map<String, String> taskProps = new HashMap<>();
//
// taskProps.putAll(configProps);
//
// for (int i = 0; i < maxTasks; i++)
// taskConfigs.add(taskProps);
//
// return taskConfigs;
// }
//
// /** {@inheritDoc} */
// @Override public void stop() {
// // No-op.
// }
//
// /** {@inheritDoc} */
// @Override public ConfigDef config() {
// return CONFIG_DEF;
// }
import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;

public class GeodeKafkaSink extends SinkConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
private Map<String, String> sharedProps;

@Override
public void start(Map<String, String> props) {
sharedProps = computeMissingConfigurations(props);
}

@Override
public Class<? extends Task> taskClass() {
return GeodeKafkaSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();

for (int i = 0; i < maxTasks; i++) {
// Give each task its own copy of the shared properties plus a unique task id,
// so all entries in the list are not the same map with the same TASK_ID.
Map<String, String> taskProps = new HashMap<>(sharedProps);
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
taskConfigs.add(taskProps);
}

return taskConfigs;
}

@Override
public void stop() {

}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public String version() {
//TODO
return "unknown";
}


private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
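// Fill in defaults for any settings the caller did not supply: locators, durable
// client id/timeout, batch size, queue size, and CQ prefix.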
props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR);
props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
return props;
}
}
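The commit message says a sink task was added, but the GeodeKafkaSinkTask file itself does not appear in this excerpt. Purely as a sketch of what such a class generally looks like, not the code in this commit: a Kafka Connect SinkTask that writes each record into a Geode client region. The property keys, defaults, and region handling below are assumptions.

package geode.kafka.sink;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;
import java.util.Map;

public class GeodeKafkaSinkTask extends SinkTask {

  private ClientCache clientCache;
  private Region<Object, Object> region;

  @Override
  public String version() {
    return "unknown";
  }

  @Override
  public void start(Map<String, String> props) {
    // Hypothetical property keys and defaults; the real ones would come from GeodeConnectorConfig.
    String locatorHost = props.getOrDefault("locatorHost", "localhost");
    int locatorPort = Integer.parseInt(props.getOrDefault("locatorPort", "10334"));
    String regionName = props.getOrDefault("region", "someTestRegion");

    // Connect to the Geode cluster through a locator and get a proxy to the target region.
    clientCache = new ClientCacheFactory()
        .addPoolLocator(locatorHost, locatorPort)
        .create();
    region = clientCache
        .createClientRegionFactory(ClientRegionShortcut.PROXY)
        .create(regionName);
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Write each Kafka record into the Geode region, keyed by the record key.
    for (SinkRecord record : records) {
      if (record.key() != null) {
        // Geode requires non-null keys, so skip records without one.
        region.put(record.key(), record.value());
      }
    }
  }

  @Override
  public void stop() {
    if (clientCache != null) {
      clientCache.close();
    }
  }
}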
@@ -28,6 +28,7 @@
public class GeodeKafkaSource extends SourceConnector {

private Map<String, String> sharedProps;
//TODO maybe club this into GeodeConnectorConfig
private static final ConfigDef CONFIG_DEF = new ConfigDef();


@@ -40,7 +41,6 @@ public Class<? extends Task> taskClass() {
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();

taskProps.putAll(sharedProps);

for (int i = 0; i < maxTasks; i++) {
@@ -69,7 +69,7 @@ public void start(Map<String, String> props) {
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
}
catch (Exception e) {
logger.error("Unable to start task", e);
logger.error("Unable to start source task", e);
throw e;
}
}
@@ -81,7 +81,7 @@ public List<SourceRecord> poll() throws InterruptedException {
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
for (String topic : topics) {
records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue()));
}
}
return records;
@@ -13,25 +13,35 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertNotNull;

public class GeodeKafkaTestCluster {

@@ -54,10 +64,17 @@ public static void setup() throws IOException, QuorumPeerConfig.ConfigException,
startKafka();
startGeode();
createTopic();

startWorker();
consumer = createConsumer();
Thread.sleep(5000);
}

@Before
public void beforeTests() {
}

@After
public void afterTests() {

}

@AfterClass
@@ -86,7 +103,7 @@ private static void createTopic() {
Properties topicProperties = new Properties();
topicProperties.put("flush.messages", "1");
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.createTopic(TEST_TOPICS,3
adminZkClient.createTopic(TEST_TOPICS,1
,1, topicProperties, RackAwareMode.Disabled$.MODULE$);
}

@@ -161,6 +178,21 @@ public static Consumer<String,String> createConsumer() {
return consumer;
}

//producer props, less important, just for testing?
public static Producer<String,String> createProducer() {
final Properties props = new Properties();
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());

// Create the producer using props.
final Producer<String, String> producer =
new KafkaProducer<>(props);
return producer;
}

@Test
public void endToEndSourceTest() {
ClientCache client = createGeodeClient();
@@ -179,4 +211,18 @@ public void endToEndSourceTest() {
});
}

@Test
public void endToEndSinkTest() {
ClientCache client = createGeodeClient();
Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS);

Producer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i));
}
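// The sink connector registered with the worker should pick these records up
// from the topic and write them into the Geode test region.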

int i = 0;
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i)));
}

}
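A note on endToEndSinkTest above: with i fixed at 0, the awaitility block only asserts that KEY0 arrived in the region. A stricter variant (an illustration, not part of this commit) would wait for all ten produced keys:

  await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
    for (int key = 0; key < 10; key++) {
      assertNotNull(region.get("KEY" + key));
    }
  });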
@@ -1,5 +1,6 @@
package geode.kafka;

import geode.kafka.sink.GeodeKafkaSink;
import geode.kafka.source.GeodeKafkaSource;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -62,8 +63,20 @@ public static void main(String[] args) throws IOException {
herder.putConnectorConfig(
sourceProps.get(ConnectorConfig.NAME_CONFIG),
sourceProps, true, (error, result)->{
System.out.println("CALLBACK: " + result + "::: error?" + error);
});

Map<String, String> sinkProps = new HashMap<>();
sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
sinkProps.put(REGIONS, TEST_REGIONS);
sinkProps.put(TOPICS, TEST_TOPICS);

herder.putConnectorConfig(
sinkProps.get(ConnectorConfig.NAME_CONFIG),
sinkProps, true, (error, result)->{
});


}
}
