Added loading entire region through initial results
jhuynh1 committed Jan 25, 2020
1 parent 04bdfc9 commit 1258cfc80c35a1c505bef05d4cf315671bf37333
Showing 8 changed files with 111 additions and 22 deletions.
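
In short: when the new loadEntireRegion property is "true", each source task executes its CQ with initial results, so the region's current contents are queued for Kafka ahead of live updates. A minimal sketch of switching it on through task properties; only the loadEntireRegion key and its "false" default come from this commit, the surrounding setup is assumed:

import java.util.HashMap;
import java.util.Map;

public class EnableLoadEntireRegion {
  public static void main(String[] args) {
    // Hypothetical task configuration; locator, region, and topic keys omitted.
    Map<String, String> props = new HashMap<>();
    props.put("loadEntireRegion", "true"); // LOAD_ENTIRE_REGION; default is "false"
    System.out.println(props);             // prints {loadEntireRegion=true}
  }
}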
@@ -13,7 +13,6 @@ public class GeodeConnectorConfig {
   public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
   public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
-
   //GeodeKafka Specific Configuration
   public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
 
@@ -46,6 +45,8 @@ public class GeodeConnectorConfig {
   public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
   public static final String DEFAULT_QUEUE_SIZE = "100000";
 
+  public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+  public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
 
   private final int taskId;
@@ -9,6 +9,7 @@
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.kafka.connect.errors.ConnectException;
 
+import java.util.Collection;
 import java.util.List;
 
 public class GeodeContext {
@@ -40,7 +41,15 @@ public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
       cq.execute();
       return cq;
     } catch (RegionNotFoundException | CqException | CqExistsException e) {
-      e.printStackTrace();
       throw new ConnectException(e);
     }
   }
+
+  public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException {
+    try {
+      CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
+      return cq.executeWithInitialResults();
+    } catch (RegionNotFoundException | CqException | CqExistsException e) {
+      throw new ConnectException(e);
+    }
+  }
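
For contrast with newCq above: executeWithInitialResults registers the continuous query and also returns the entries the query matches at registration time. A hedged sketch of how a caller might drain that snapshot (not part of the commit; the started geodeContext, the cqAttributes, and the CQ name are illustrative assumptions):

// Sketch only: snapshot-then-stream. Assumes a started GeodeContext and a
// CqAttributes whose listener forwards later events toward Kafka.
static void primeBuffer(GeodeContext geodeContext, CqAttributes cqAttributes,
    BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
  Collection<?> snapshot = geodeContext.newCqWithInitialResults(
      "_cqFor" + regionName,           // hypothetical CQ name
      "select * from /" + regionName,  // same query shape the source task uses
      cqAttributes,
      false);                          // isDurable
  for (Object event : snapshot) {
    eventBuffer.offer(new GeodeEvent(regionName, (CqEvent) event));
  }
}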
@@ -17,10 +17,12 @@
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
 import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.LOCATORS;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 
@@ -69,6 +71,7 @@ private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
     props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
     props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
     props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
+    props.computeIfAbsent(LOAD_ENTIRE_REGION, (key) -> DEFAULT_LOAD_ENTIRE_REGION);
     return props;
   }
 
@@ -14,14 +14,19 @@ class GeodeKafkaSourceListener implements CqStatusListener {
 
   public String regionName;
   private BlockingQueue<GeodeEvent> eventBuffer;
+  private volatile boolean initialResultsLoaded; // volatile: written by the task thread, read by the CQ dispatch thread
 
   public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
     this.eventBuffer = eventBuffer;
     this.regionName = regionName;
+    initialResultsLoaded = false;
   }
 
   @Override
   public void onEvent(CqEvent aCqEvent) {
+    while (!initialResultsLoaded) {
+      Thread.yield();
+    }
     try {
       eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
@@ -52,4 +57,8 @@ public void onCqDisconnected() {
   public void onCqConnected() {
 
   }
+
+  public void signalInitialResultsLoaded() {
+    initialResultsLoaded = true;
+  }
 }
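
Why onEvent busy-waits: the CQ can begin delivering live events while the task thread is still copying the initial result set into eventBuffer (see installListenersToRegion below), and spinning until signalInitialResultsLoaded() keeps snapshot entries ahead of live ones. A toy, Geode-free demo of the same gate, with all names illustrative:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class GateDemo {
  private static volatile boolean initialResultsLoaded = false;

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    Thread liveEvents = new Thread(() -> {
      while (!initialResultsLoaded) {
        Thread.yield();             // same busy-wait as onEvent()
      }
      buffer.offer("live-event");   // enqueued only after the snapshot
    });
    liveEvents.start();
    buffer.offer("snapshot-entry"); // initial results go in first
    initialResultsLoaded = true;    // the signalInitialResultsLoaded() moment
    liveEvents.join();
    System.out.println(buffer);     // [snapshot-entry, live-event]
  }
}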
@@ -1,15 +1,17 @@
 package geode.kafka.source;
 
-import geode.kafka.GeodeContext;
 import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.GeodeContext;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -19,6 +21,7 @@
 
 import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
 import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION;
 import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 
@@ -33,6 +36,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
   private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
 
   private GeodeContext geodeContext;
+  private GeodeConnectorConfig geodeConnectorConfig;
   private List<String> topics;
   private Map<String, Map<String, String>> sourcePartitions;
   private static BlockingQueue<GeodeEvent> eventBuffer;
@@ -53,7 +57,7 @@ public String version() {
   @Override
   public void start(Map<String, String> props) {
     try {
-      GeodeConnectorConfig geodeConnectorConfig = new GeodeConnectorConfig(props);
+      geodeConnectorConfig = new GeodeConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext(geodeConnectorConfig);
 
@@ -65,8 +69,9 @@ public void start(Map<String, String> props) {
       topics = geodeConnectorConfig.getTopics();
 
       String cqPrefix = props.get(CQ_PREFIX);
+      boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION));
 
-      installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix);
+      installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
     }
     catch (Exception e) {
       logger.error("Unable to start source task", e);
@@ -95,23 +100,35 @@ public void stop() {
     geodeContext.getClientCache().close(true);
   }
 
-  void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
+  void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
     boolean isDurable = geodeConnectorConfig.isDurable();
     int taskId = geodeConnectorConfig.getTaskId();
     for (String region : geodeConnectorConfig.getRegionNames()) {
-      installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, isDurable);
+      installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable);
     }
     if (isDurable) {
       geodeContext.getClientCache().readyForEvents();
     }
   }
 
-  void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
+  void installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
     CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-    cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
+    GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
+    cqAttributesFactory.addCqListener(listener);
     CqAttributes cqAttributes = cqAttributesFactory.create();
-    geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-        isDurable);
+    try {
+      if (loadEntireRegion) {
+        Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+            isDurable);
+        eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
+      } else {
+        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
+            isDurable);
+      }
+    }
+    finally {
+      listener.signalInitialResultsLoaded();
+    }
   }
 
   /**
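
One detail worth calling out in installListenersToRegion above: signalInitialResultsLoaded() sits in a finally clause, so the listener's gate opens whether CQ registration succeeds or throws, and when loadEntireRegion is false it opens immediately; live events are never blocked behind a snapshot that was never requested.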
@@ -2,7 +2,9 @@
 
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -41,4 +43,16 @@ public void shouldBeAbleToParseGeodeLocatorStrings() {
     List<LocatorHostPort> locators = config.parseLocators(locatorString);
     assertThat(2, is(locators.size()));
   }
+
+  @Test
+  public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+    Map<String, String> props = new HashMap<>();
+    GeodeConnectorConfig config = new GeodeConnectorConfig(props);
+    assertEquals("", config.getDurableClientId());
+  }
+
+  @Test
+  public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+
+  }
 }
@@ -1,24 +1,69 @@
 package geode.kafka.source;
 
 import geode.kafka.GeodeConnectorConfig;
+import geode.kafka.GeodeContext;
+import org.apache.geode.cache.query.CqEvent;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
 import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class GeodeKafkaSourceTaskTest {
 
 
   @Test
-  public void cqListenerOnEventPopulatesEventsBuffer() {
+  public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    boolean loadEntireRegion = true;
+    boolean isDurable = false;
+    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      fakeInitialResults.add(mock(CqEvent.class));
+    }
+
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+    assertEquals(10, eventBuffer.size());
+  }
+
+  @Test
+  public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
+    GeodeContext geodeContext = mock(GeodeContext.class);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    boolean loadEntireRegion = false;
+    boolean isDurable = false;
+    List<CqEvent> fakeInitialResults = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      fakeInitialResults.add(mock(CqEvent.class));
+    }
+
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
+    assertEquals(0, eventBuffer.size());
+  }
+
+  @Test
+  public void cqListenerOnEventPopulatesEventsBuffer() {}
 
   @Test
   public void pollReturnsEventsWhenEventBufferHasValues() {
 
@@ -1,22 +1,13 @@
 package geode.kafka.source;
 
-import geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
 
 public class GeodeKafkaSourceTest {
 
-  @Test
-  public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
-    GeodeKafkaSource source = new GeodeKafkaSource();
-    Map<String, String> props = new HashMap<>();
-    source.start(props);
-
-  }
+
   @Test
   public void cqPrefixShouldBeProperlyCalculatedFromProps() {
 
   }
 }
