Skip to content
Permalink
Browse files
Converting instance fields to local variables
  • Loading branch information
nabarunnag committed Feb 19, 2020
1 parent 792e301 commit 3515ae72c7f25399120bd07f0fa2ac74bb4895d5
Showing 5 changed files with 15 additions and 26 deletions.
@@ -40,7 +40,6 @@ public class GeodeKafkaSinkTask extends SinkTask {
private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSinkTask.class);

private GeodeContext geodeContext;
private int taskId;
private Map<String, List<String>> topicToRegions;
private Map<String, Region> regionNameToRegion;
private boolean nullValuesMeansRemove = true;
@@ -73,7 +72,7 @@ public void start(Map<String, String> props) {

void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
taskId = geodeConnectorConfig.getTaskId();
int taskId = geodeConnectorConfig.getTaskId();
topicToRegions = geodeConnectorConfig.getTopicToRegions();
nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
}
@@ -43,7 +43,6 @@ public class GeodeKafkaSourceTask extends SourceTask {
private static final Map<String, Long> OFFSET_DEFAULT = createOffset();

private GeodeContext geodeContext;
private GeodeSourceConnectorConfig geodeConnectorConfig;
private EventBufferSupplier eventBufferSupplier;
private Map<String, List<String>> regionToTopics;
private Map<String, Map<String, String>> sourcePartitions;
@@ -64,7 +63,7 @@ public String version() {
@Override
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
@@ -54,7 +54,6 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";

private final String durableClientId;
private final String durableClientIdPrefix;
private final String durableClientTimeout;
private final String cqPrefix;
private final boolean loadEntireRegion;
@@ -68,7 +67,7 @@ public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
super(SOURCE_CONFIG_DEF, connectorProperties);
cqsToRegister = parseRegionToTopics(getString(CQS_TO_REGISTER)).keySet();
regionToTopics = parseRegionToTopics(getString(REGION_TO_TOPIC_BINDINGS));
durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
String durableClientIdPrefix = getString(DURABLE_CLIENT_ID_PREFIX);
if (isDurable(durableClientIdPrefix)) {
durableClientId = durableClientIdPrefix + taskId;
} else {
@@ -42,9 +42,6 @@ public class GeodeAsSinkDUnitTest {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);

private static MemberVM locator, server;
private static ClientVM client;

@Rule
public TestName testName = new TestName();

@@ -91,12 +88,11 @@ public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
@Test
public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {

locator = clusterStartupRule.startLocatorVM(0, 10334);
MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
server = clusterStartupRule.startServerVM(1, locatorPort);
client =
clusterStartupRule
.startClientVM(2, client -> client.withLocatorConnection(locatorPort));
MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
ClientVM client1 = clusterStartupRule
.startClientVM(2, client -> client.withLocatorConnection(locatorPort));
int NUM_EVENT = 10;

// Set unique names for all the different components
@@ -132,7 +128,7 @@ public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() thr
workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion,
sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(),
"localhost[" + locatorPort + "]");
client.invoke(() -> {
client1.invoke(() -> {
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sinkRegion);
});
@@ -143,7 +139,7 @@ public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() thr
producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
}

client.invoke(() -> {
client1.invoke(() -> {
Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion);
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
@@ -54,9 +54,6 @@ public class GeodeAsSourceDUnitTest {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);

private static MemberVM locator, server;
private static ClientVM client;


@Rule
public TestName testName = new TestName();
@@ -103,12 +100,11 @@ public GeodeAsSourceDUnitTest(int numTask, int numPartition) {

@Test
public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception {
locator = clusterStartupRule.startLocatorVM(0, 10334);
MemberVM locator = clusterStartupRule.startLocatorVM(0, 10334);
int locatorPort = locator.getPort();
server = clusterStartupRule.startServerVM(1, locatorPort);
client =
clusterStartupRule
.startClientVM(2, client -> client.withLocatorConnection(locatorPort));
MemberVM server = clusterStartupRule.startServerVM(1, locatorPort);
ClientVM client1 = clusterStartupRule
.startClientVM(2, client -> client.withLocatorConnection(locatorPort));
int NUM_EVENT = 10;

// Set unique names for all the different components
@@ -128,7 +124,7 @@ public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents()
ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
.create(sinkRegion);
});
client.invoke(() -> {
client1.invoke(() -> {
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(sourceRegion);
});
@@ -152,7 +148,7 @@ public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents()
Consumer<String, String> consumer = createConsumer(sourceTopic);

// Insert data into the Apache Geode source from the client
client.invoke(() -> {
client1.invoke(() -> {
Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion);
for (int i = 0; i < NUM_EVENT; i++) {
region.put("KEY" + i, "VALUE" + i);

0 comments on commit 3515ae7

Please sign in to comment.