Skip to content
Permalink
Browse files
Configurable shared event buffer
  • Loading branch information
jhuynh1 committed Feb 4, 2020
1 parent 7b4ee00 commit 4ff1f22dd88dbe0487494a4b7d0dedbf4a205f9a
Showing 6 changed files with 122 additions and 22 deletions.
@@ -0,0 +1,7 @@
package geode.kafka.source;

import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/**
 * Supplies the queue that buffers {@link GeodeEvent}s between the Geode CQ
 * listener threads and the Kafka source task's poll loop.
 *
 * <p>Callers should invoke {@link #get()} on every access rather than caching
 * the returned queue, because an implementation may replace the underlying
 * buffer (e.g. when it is resized).
 */
@FunctionalInterface
public interface EventBufferSupplier extends Supplier<BlockingQueue<GeodeEvent>> {
}
@@ -19,20 +19,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class GeodeKafkaSourceListener implements CqStatusListener {

private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class);

public String regionName;
private BlockingQueue<GeodeEvent> eventBuffer;
private EventBufferSupplier eventBufferSupplier;
private boolean initialResultsLoaded;

public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
this.eventBuffer = eventBuffer;
public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) {
this.regionName = regionName;
this.eventBufferSupplier = eventBufferSupplier;
initialResultsLoaded = false;
}

@@ -42,12 +41,12 @@ public void onEvent(CqEvent aCqEvent) {
Thread.yield();
}
try {
eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {

while (true) {
try {
if (!eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
if (!eventBufferSupplier.get().offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
@@ -29,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
@@ -48,11 +47,9 @@ public class GeodeKafkaSourceTask extends SourceTask {

private GeodeContext geodeContext;
private GeodeSourceConnectorConfig geodeConnectorConfig;
private int taskId;
private EventBufferSupplier eventBufferSupplier;
private Map<String, List<String>> regionToTopics;
private Collection<String> cqsToRegister;
private Map<String, Map<String, String>> sourcePartitions;
private static BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100000);
private int batchSize;


@@ -71,21 +68,21 @@ public String version() {
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
taskId = geodeConnectorConfig.getTaskId();
int taskId = geodeConnectorConfig.getTaskId();
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), geodeConnectorConfig.getSecurityClientAuthInit());

batchSize = Integer.parseInt(props.get(BATCH_SIZE));
eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));

regionToTopics = geodeConnectorConfig.getRegionToTopics();
cqsToRegister = geodeConnectorConfig.getCqsToRegister();
geodeConnectorConfig.getCqsToRegister();
sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet());

String cqPrefix = geodeConnectorConfig.getCqPrefix();

boolean loadEntireRegion = geodeConnectorConfig.getLoadEntireRegion();
installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion);
installOnGeode(geodeConnectorConfig, geodeContext, eventBufferSupplier, cqPrefix, loadEntireRegion);
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to start source task", e);
@@ -97,7 +94,7 @@ public void start(Map<String, String> props) {
public List<SourceRecord> poll() throws InterruptedException {
ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
if (eventBuffer.drainTo(events, batchSize) > 0) {
if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
for (GeodeEvent event : events) {
String regionName = event.getRegionName();
List<String> topics = regionToTopics.get(regionName);
@@ -116,7 +113,7 @@ public void stop() {
geodeContext.getClientCache().close(true);
}

void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) {
void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -127,7 +124,7 @@ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContex
}
}

GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
@@ -136,7 +133,7 @@ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int
if (loadEntireRegion) {
Collection<CqEvent> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable);
eventBuffer.addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
eventBuffer.get().addAll(events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable);
@@ -0,0 +1,37 @@
package geode.kafka.source;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

public class SharedEventBufferSupplier implements EventBufferSupplier {

    /**
     * Buffer shared by every supplier instance in this JVM (the field is static).
     * Declared {@code volatile} so the double-checked locking in
     * {@link #recreateEventBufferIfNeeded(int)} is safe: without volatile a
     * reader could observe a partially published queue reference.
     */
    private static volatile BlockingQueue<GeodeEvent> eventBuffer;

    /**
     * Creates the shared buffer if absent, or replaces it when the requested
     * capacity differs from the current one.
     *
     * @param size desired total capacity of the shared event buffer
     */
    public SharedEventBufferSupplier(int size) {
        recreateEventBufferIfNeeded(size);
    }

    /**
     * Swaps in a new buffer of the requested capacity when no buffer exists yet
     * or the current total capacity differs from {@code size}; pending events
     * are copied into the replacement.
     *
     * <p>NOTE(review): if the new capacity is smaller than the number of pending
     * events, {@code addAll} will throw {@link IllegalStateException} once the
     * new queue is full (same as the previous implementation) — confirm shrink
     * is never expected to happen with a non-empty buffer.
     *
     * @param size desired total capacity (queued elements + remaining capacity)
     * @return the current shared buffer; never null after this call
     */
    BlockingQueue<GeodeEvent> recreateEventBufferIfNeeded(int size) {
        // For a LinkedBlockingQueue, size() + remainingCapacity() equals the
        // capacity it was constructed with, so this detects a resize request.
        if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
            // Lock on this class rather than an unrelated one so the guard is
            // self-contained and cannot interfere with other components' locks.
            synchronized (SharedEventBufferSupplier.class) {
                if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) {
                    BlockingQueue<GeodeEvent> oldEventBuffer = eventBuffer;
                    eventBuffer = new LinkedBlockingQueue<>(size);
                    if (oldEventBuffer != null) {
                        // Carry forward events already queued so none are lost
                        // across a resize.
                        eventBuffer.addAll(oldEventBuffer);
                    }
                }
            }
        }
        return eventBuffer;
    }

    /**
     * Callers should not store a reference to this and instead always call get
     * to make sure we always use the latest buffer. Buffers themselves
     * shouldn't change often but in cases where we want to modify the size.
     */
    public BlockingQueue<GeodeEvent> get() {
        return eventBuffer;
    }
}
@@ -58,7 +58,7 @@ public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(10, eventBuffer.size());
}

@@ -75,7 +75,7 @@ public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer(

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(fakeInitialResults);
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
assertEquals(0, eventBuffer.size());
}

@@ -88,7 +88,7 @@ public void cqListenerOnEventPopulatesEventsBuffer() {

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())).thenReturn(new ArrayList());
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, eventBuffer, "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);
GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, loadEntireRegion, isDurable);

listener.onEvent(mock(CqEvent.class));
assertEquals(1, eventBuffer.size());
@@ -140,7 +140,7 @@ public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsS
when (config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());

GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true);
task.installOnGeode(config, geodeContext, createEventBufferSupplier(new LinkedBlockingQueue<>()), "someCqPrefix", true);
verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean());
}

@@ -218,4 +218,13 @@ public void cqPrefixShouldBeProperlyCalculatedFromProps() {
// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
}


private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
    // EventBufferSupplier has a single abstract method, so a lambda capturing
    // the given queue is equivalent to the anonymous-class form.
    return () -> eventBuffer;
}
}
@@ -0,0 +1,51 @@
package geode.kafka.source;

import org.junit.Test;

import java.util.concurrent.BlockingQueue;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;

public class SharedEventBufferSupplierTest {

    @Test
    public void creatingNewSharedEventSupplierShouldCreateInstance() {
        // Constructing a supplier must materialize the shared buffer.
        SharedEventBufferSupplier underTest = new SharedEventBufferSupplier(1);
        assertNotNull(underTest.get());
    }

    @Test
    public void alreadySharedEventSupplierShouldReturnSameInstanceOfEventBuffer() {
        BlockingQueue<GeodeEvent> firstBuffer = new SharedEventBufferSupplier(1).get();
        // A second supplier requesting the same capacity reuses the buffer.
        SharedEventBufferSupplier second = new SharedEventBufferSupplier(1);
        assertEquals(firstBuffer, second.get());
    }

    @Test
    public void newEventBufferShouldBeReflectedInAllSharedSuppliers() {
        SharedEventBufferSupplier older = new SharedEventBufferSupplier(1);
        SharedEventBufferSupplier resized = new SharedEventBufferSupplier(2);
        // After a resize, every supplier observes the replacement buffer.
        assertEquals(older.get(), resized.get());
    }

    @Test
    public void newEventBufferSuppliedShouldNotBeTheOldQueue() {
        BlockingQueue<GeodeEvent> before = new SharedEventBufferSupplier(1).get();
        SharedEventBufferSupplier resized = new SharedEventBufferSupplier(2);
        // Resizing replaces the queue instance entirely.
        assertNotEquals(before, resized.get());
    }

    @Test
    public void newEventBufferShouldContainAllEventsFromTheOldSupplier() {
        SharedEventBufferSupplier original = new SharedEventBufferSupplier(1);
        GeodeEvent pending = mock(GeodeEvent.class);
        original.get().add(pending);
        // Events queued before the resize must survive into the new buffer.
        SharedEventBufferSupplier resized = new SharedEventBufferSupplier(2);
        assertEquals(pending, resized.get().poll());
    }
}

0 comments on commit 4ff1f22

Please sign in to comment.