end to end test "passes" Not sure why we dont get all the events or d…
…on't get any events until enough polls are called
jhuynh1 committed Jan 23, 2020
1 parent 94f18cb commit 70328411399865005cddfdc19bd63f9fc7c443c3
Showing 6 changed files with 243 additions and 171 deletions.
@@ -13,8 +13,8 @@ repositories {

dependencies {

compile 'org.apache.geode:geode-core:1.11.0'
compile 'org.apache.geode:geode-cq:1.11.0'
compile 'org.apache.geode:geode-core:1.10.0'
compile 'org.apache.geode:geode-cq:1.10.0'
compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0'
@@ -27,4 +27,7 @@ dependencies {

testCompile group: 'junit', name: 'junit', version: '4.12'

testImplementation 'org.awaitility:awaitility:4.0.2'


}
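The awaitility test dependency added above points at how the behavior in the commit message ("don't get any events until enough polls are called") is usually handled in an end-to-end test: keep polling until the expected events show up rather than asserting after a fixed number of calls. A minimal, self-contained sketch of that pattern (not code from this commit; the queue is a stand-in for whatever the test consumes from):

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PollUntilEventsArriveExample {
  public static void main(String[] args) {
    // Hypothetical stand-in for the records the end-to-end test reads;
    // the real test would drain a Kafka consumer instead.
    BlockingQueue<String> received = new LinkedBlockingQueue<>();

    // Simulate events that only show up after a delay, as described in the commit message.
    new Thread(() -> {
      try {
        Thread.sleep(2000);
        for (int i = 0; i < 10; i++) {
          received.add("event-" + i);
        }
      } catch (InterruptedException ignored) {
      }
    }).start();

    // Keep checking (i.e. keep "polling") until enough events have arrived,
    // instead of asserting after a single poll.
    await().atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofMillis(500))
        .until(() -> received.size() >= 10);
  }
}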
@@ -10,11 +10,23 @@
import java.util.List;
import java.util.Map;

public class GeodeKafkaSource extends SourceConnector {
import static kafka.GeodeConnectorConfig.BATCH_SIZE;
import static kafka.GeodeConnectorConfig.CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static kafka.GeodeConnectorConfig.LOCATORS;
import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
import static kafka.GeodeConnectorConfig.REGIONS;
import static kafka.GeodeConnectorConfig.TOPICS;
import static kafka.GeodeKafkaSourceTask.TASK_ID;

public static String REGION_NAME = "GEODE_REGION_NAME";
private String regionName;
private static String TOPICS = "TOPICS";
public class GeodeKafkaSource extends SourceConnector {

private Map<String, String> sharedProps;
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -34,9 +46,11 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
taskProps.putAll(sharedProps);

// use the same props for all tasks at the moment
for (int i = 0; i < maxTasks; i++)
for (int i = 0; i < maxTasks; i++) {
//TODO partition regions and topics
taskProps.put(TASK_ID, "" + i);
taskConfigs.add(taskProps);

}
return taskConfigs;
}

@@ -48,7 +62,17 @@ public ConfigDef config() {

@Override
public void start(Map<String, String> props) {
sharedProps = props;
sharedProps = computeMissingConfigurations(props);
}

private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
return props;
}

@Override
@@ -60,4 +84,8 @@ public void stop() {
public String version() {
return AppInfoParser.getVersion();
}

public Map<String, String> getSharedProps() {
return sharedProps;
}
}
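With computeMissingConfigurations in place, a caller only supplies the properties it cares about and the rest fall back to the defaults in GeodeConnectorConfig. A hedged sketch of starting the connector with a minimal property map (the region and topic values are made up, and the sketch assumes the GeodeConnectorConfig constants are visible from where it runs):

import java.util.HashMap;
import java.util.Map;

public class MinimalSourceConfigExample {
  public static void main(String[] args) {
    // Only the properties we care about; computeMissingConfigurations() fills in
    // LOCATORS, BATCH_SIZE, QUEUE_SIZE, the durable client settings, and CQ_PREFIX.
    Map<String, String> props = new HashMap<>();
    props.put(GeodeConnectorConfig.REGIONS, "someRegion"); // illustrative value
    props.put(GeodeConnectorConfig.TOPICS, "someTopic");   // illustrative value

    GeodeKafkaSource source = new GeodeKafkaSource();
    source.start(props);

    // The shared props now also carry the defaults, e.g. DEFAULT_LOCATOR and DEFAULT_BATCH_SIZE.
    Map<String, String> effective = source.getSharedProps();
    System.out.println(effective);
  }
}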
@@ -4,134 +4,196 @@
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static kafka.GeodeConnectorConfig.BATCH_SIZE;
import static kafka.GeodeConnectorConfig.CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
import static kafka.GeodeConnectorConfig.REGION_NAME;

public class GeodeKafkaSourceTask extends SourceTask {
private static String REGION_NAME = "REGION_NAME";
private static String OFFSET = "OFFSET";
private static String topics[];
private int batchSize;
private int queueSize;
private static BlockingQueue<CqEvent> eventBuffer;
private Map<String, String> sourcePartition;
private Map<String, Long> offset;

private ClientCache clientCache;

@Override
public String version() {
return null;
}

@Override
public void start(Map<String, String> props) {
System.out.println("JASON task start");
batchSize = 100;
queueSize = 100000;
String regionName = "someRegion";
eventBuffer = new LinkedBlockingQueue<>(queueSize);
topics = new String[] {"someTopic"};
sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);

offset = new HashMap<>();
offset.put("OFFSET", 0L);

installOnGeode("localHost", 10334, "someRegion");
System.out.println("JASON task start end");
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
// System.out.println("JASON polling");
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<CqEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (CqEvent event : events) {

for (String topic : topics)
records.add(new SourceRecord(sourcePartition, offset, topic, null, event));
}

System.out.println("JASON we polled and returning records" + records.size());
return records;
}

// System.out.println("JASON we didn't poll any records");
return null;
}

@Override
public void stop() {
clientCache.close(true);
}

private void installOnGeode(String locatorHost, int locatorPort, String regionName) {
clientCache = new ClientCacheFactory().set("durable-client-id", "someClient")
.set("durable-client-timeout", "200")
.setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create();
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener());
System.out.println("JASON installing on Geode");
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
System.out.println("JASON installing new cq");
clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes,
true).execute();
System.out.println("JASON finished installing cq");
} catch (CqExistsException e) {
System.out.println("UHH");
e.printStackTrace();
} catch (CqException | RegionNotFoundException e) {
System.out.println("UHH e");
e.printStackTrace();
//property string to pass in to identify task
public static final String TASK_ID = "GEODE_TASK_ID";
private static final String TASK_PREFIX = "TASK";
private static final String DOT = ".";
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();

private int taskId;
private ClientCache clientCache;
private List<String> regionNames;
private List<String> topics;
private Map<String, Map<String, String>> sourcePartitions;
private static BlockingQueue<GeodeEvent> eventBuffer;
private int batchSize;


private static Map<String, Long> createOffset() {
Map<String, Long> offset = new HashMap<>();
offset.put("OFFSET", 0L);
return offset;
}
catch (Exception e) {
System.out.println("UHHHHHH " + e);

@Override
public String version() {
return null;
}
System.out.println("JASON task calling ready for events");
clientCache.readyForEvents();
System.out.println("JASON task ready for events");
}

private static class GeodeKafkaSourceListener implements CqListener {
@Override
public void start(Map<String, String> props) {
try {
System.out.println("JASON task start");
taskId = Integer.parseInt(props.get(TASK_ID));
batchSize = Integer.parseInt(props.get(BATCH_SIZE));
int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
eventBuffer = new LinkedBlockingQueue<>(queueSize);

//grouping will be done in the source and not the task
regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
sourcePartitions = createSourcePartitionsMap(regionNames);

String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
if (!durableClientId.equals("")) {
durableClientId += taskId;
}
System.out.println("JASON durable client id is:" + durableClientId);
String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
String cqPrefix = props.get(CQ_PREFIX);

List<LocatorHostPort> locators = parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
installOnGeode(taskId, eventBuffer, locators, regionNames, durableClientId, durableClientTimeout, cqPrefix);
System.out.println("JASON task start finished");
}
catch (Exception e) {
System.out.println("Exception:" + e);
e.printStackTrace();
throw e;
}
}

@Override
public void onEvent(CqEvent aCqEvent) {
try {
System.out.println("JASON cqEvent and putting into eventBuffer");
eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {

while (true) {
try {
if (!eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("GeodeKafkaSource Queue is full");
public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
for (String topic : topics) {
records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent()));
// records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, "STRING"));
}
}

return records;
}
}

return null;
}

@Override
public void onError(CqEvent aCqEvent) {
public void stop() {
clientCache.close(true);
}

ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) {
ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut)
.setPoolSubscriptionEnabled(true);
for (LocatorHostPort locator: locators) {
// add each locator to the pool; the client cache itself is created once, below
ccf.addPoolLocator(locator.getHostName(), locator.getPort());
}
return ccf.create();
}

void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<LocatorHostPort> locators, List<String> regionNames, String durableClientId, String durableClientTimeout, String cqPrefix) {
boolean isDurable = isDurable(durableClientId);

clientCache = createClientCache(locators, durableClientId, durableClientTimeout);
for (String region : regionNames) {
installListenersToRegion(taskId, eventBuffer, region, cqPrefix, isDurable);
}
if (isDurable) {
clientCache.readyForEvents();
}
}

void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
System.out.println("JASON installing on Geode");
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
System.out.println("JASON installing new cq");
clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable).execute();
System.out.println("JASON finished installing cq");
} catch (CqExistsException e) {
System.out.println("UHH");
e.printStackTrace();
} catch (CqException | RegionNotFoundException e) {
System.out.println("UHH e");
e.printStackTrace();
} catch (Exception e) {
System.out.println("UHHHHHH " + e);
}
}


List<String> parseNames(String names) {
return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList());
}

List<LocatorHostPort> parseLocators(String locators) {
return Arrays.stream(locators.split(",")).map((s) -> {
String locatorString = s.trim();
return parseLocator(locatorString);
}).collect(Collectors.toList());
}

private LocatorHostPort parseLocator(String locatorString) {
String[] splits = locatorString.split("\\[");
String locator = splits[0];
int port = Integer.parseInt(splits[1].replace("]", ""));
return new LocatorHostPort(locator, port);
}

boolean isDurable(String durableClientId) {
return !durableClientId.equals("");
}

String generateCqName(int taskId, String cqPrefix, String regionName) {
return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
}
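A small illustrative sketch of what the two helpers above produce (values are made up; since parseLocators and generateCqName are package-private, the sketch assumes it lives in the same package as the task):

import java.util.List;

public class NamingExample {
  public static void main(String[] args) {
    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();

    // Locator strings use Geode's "host[port]" form, comma separated (illustrative values).
    List<LocatorHostPort> locators = task.parseLocators("localhost[10334], otherhost[10335]");
    System.out.println(locators.size()); // 2

    // generateCqName concatenates cqPrefix + ".TASK" + taskId + "." + regionName,
    // so this prints "someCqPrefix.TASK0.someRegion".
    System.out.println(task.generateCqName(0, "someCqPrefix", "someRegion"));
  }
}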

/**
* converts a list of regions names into a map of source partitions
*
* @param regionNames list of regionNames
* @return Map<String, Map < String, String>> a map of source partitions, keyed by region name
*/
Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put(REGION_NAME, regionName);
return sourcePartition;
}).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
}
}
}
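To make the javadoc on createSourcePartitionsMap concrete, a hedged sketch of the structure it returns (region names are illustrative; the inner maps are keyed by GeodeConnectorConfig.REGION_NAME, and the same same-package assumption as above applies):

import java.util.Arrays;
import java.util.Map;

public class SourcePartitionsExample {
  public static void main(String[] args) {
    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();

    // Each region gets its own single-entry source-partition map:
    //   "regionA" -> {REGION_NAME=regionA}
    //   "regionB" -> {REGION_NAME=regionB}
    Map<String, Map<String, String>> partitions =
        task.createSourcePartitionsMap(Arrays.asList("regionA", "regionB"));
    System.out.println(partitions);
  }
}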
