Skip to content
Permalink
Browse files
Changed all config to lower case
Changed to CqStatusListener instead of plain CqListener
  • Loading branch information
jhuynh1 committed Jan 23, 2020
1 parent 455192a commit 65c83c8e75a86e97caf6dc127ad510dff9f8b8ee
Showing 3 changed files with 22 additions and 14 deletions.
@@ -3,39 +3,39 @@
public class GeodeConnectorConfig {

//Geode Configuration
public static final String DURABLE_CLIENT_ID_PREFIX = "DurableClientId";
public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
public static final String DEFAULT_DURABLE_CLIENT_ID = "";
public static final String DURABLE_CLIENT_TIME_OUT = "DurableClientTimeout";
public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";


//GeodeKafka Specific Configuration
public static final String CQ_PREFIX = "CqPrefix";
public static final String DEFAULT_CQ_PREFIX = "CqForGeodeKafka";
public static final String CQ_PREFIX = "cqPrefix";
public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
/**
* Specifies which Locators to connect to Apache Geode
*/
public static final String LOCATORS = "Locators";
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";

/**
* Specifies which Regions to connect in Apache Geode
*/
public static final String REGIONS = "Regions";
public static final String REGIONS = "regions";

/**
* Specifies which Topics to connect in Kafka
*/
public static final String TOPICS = "Topics";
public static final String TOPICS = "topics";

/**
* Property to describe the Source Partition in a record
*/
public static final String REGION_NAME = "RegionName"; //used for Source Partition Events
public static final String REGION_NAME = "regionName"; //used for Source Partition Events

public static final String BATCH_SIZE = "GeodeConnectorBatchSize";
public static final String BATCH_SIZE = "geodeConnectorBatchSize";
public static final String DEFAULT_BATCH_SIZE = "100";

public static final String QUEUE_SIZE = "GeodeConnectorQueueSize";
public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
public static final String DEFAULT_QUEUE_SIZE = "100000";
}
@@ -2,11 +2,12 @@

import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqStatusListener;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class GeodeKafkaSourceListener implements CqListener {
class GeodeKafkaSourceListener implements CqStatusListener {

public String regionName;
private BlockingQueue<GeodeEvent> eventBuffer;
@@ -39,4 +40,14 @@ public void onEvent(CqEvent aCqEvent) {
public void onError(CqEvent aCqEvent) {

}

@Override
public void onCqDisconnected() {
//we should probably redistribute or reconnect
}

@Override
public void onCqConnected() {

}
}
@@ -138,13 +138,10 @@ void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, List<Loca
void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName));
System.out.println("JASON installing on Geode");
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
System.out.println("JASON installing new cq");
clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
isDurable).execute();
System.out.println("JASON finished installing cq");
} catch (CqExistsException e) {
System.out.println("UHH");
e.printStackTrace();

0 comments on commit 65c83c8

Please sign in to comment.