Permalink
Show file tree
Hide file tree
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Added GeodeContext (for reals this time)
Added GeodeConnectorConfigTest
- Loading branch information
Showing
2 changed files
with
91 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,47 @@ | ||
package geode.kafka; | ||
|
||
import org.apache.geode.cache.client.ClientCache; | ||
import org.apache.geode.cache.client.ClientCacheFactory; | ||
import org.apache.geode.cache.query.CqAttributes; | ||
import org.apache.geode.cache.query.CqException; | ||
import org.apache.geode.cache.query.CqExistsException; | ||
import org.apache.geode.cache.query.CqQuery; | ||
import org.apache.geode.cache.query.RegionNotFoundException; | ||
import org.apache.kafka.connect.errors.ConnectException; | ||
|
||
import java.util.List; | ||
|
||
public class GeodeContext { | ||
|
||
private ClientCache clientCache; | ||
|
||
|
||
public GeodeContext(GeodeConnectorConfig connectorConfig) { | ||
clientCache = createClientCache(connectorConfig.getLocatorHostPorts(), connectorConfig.getDurableClientId(), connectorConfig.getDurableClientTimeout()); | ||
} | ||
|
||
public ClientCache getClientCache() { | ||
return clientCache; | ||
} | ||
|
||
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, String durableClientTimeOut) { | ||
ClientCacheFactory ccf = new ClientCacheFactory().set("durable-client-id", durableClientName) | ||
.set("durable-client-timeout", durableClientTimeOut) | ||
.setPoolSubscriptionEnabled(true); | ||
for (LocatorHostPort locator: locators) { | ||
ccf.addPoolLocator(locator.getHostName(), locator.getPort()).create(); | ||
} | ||
return ccf.create(); | ||
} | ||
|
||
public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable) throws ConnectException { | ||
try { | ||
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable); | ||
cq.execute(); | ||
return cq; | ||
} catch (RegionNotFoundException | CqException | CqExistsException e) { | ||
e.printStackTrace(); | ||
throw new ConnectException(e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,44 @@ | ||
package geode.kafka; | ||
|
||
import org.junit.Test; | ||
|
||
import java.util.List; | ||
|
||
import static org.hamcrest.CoreMatchers.allOf; | ||
import static org.hamcrest.CoreMatchers.is; | ||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertThat; | ||
|
||
public class GeodeConnectorConfigTest { | ||
|
||
@Test | ||
public void parseRegionNamesShouldSplitOnComma() { | ||
GeodeConnectorConfig config = new GeodeConnectorConfig(); | ||
List<String> regionNames = config.parseNames("region1,region2,region3,region4"); | ||
assertEquals(4, regionNames.size()); | ||
assertThat(true, allOf(is(regionNames.contains("region1")) | ||
, is(regionNames.contains("region2")) | ||
, is(regionNames.contains("region3")) | ||
, is(regionNames.contains("region4")))); | ||
} | ||
|
||
@Test | ||
public void parseRegionNamesShouldChomp() { | ||
GeodeConnectorConfig config = new GeodeConnectorConfig(); | ||
List<String> regionNames = config.parseNames("region1, region2, region3,region4"); | ||
assertEquals(4, regionNames.size()); | ||
assertThat(true, allOf(is(regionNames instanceof List) | ||
, is(regionNames.contains("region1")) | ||
, is(regionNames.contains("region2")) | ||
, is(regionNames.contains("region3")) | ||
, is(regionNames.contains("region4")))); | ||
} | ||
|
||
@Test | ||
public void shouldBeAbleToParseGeodeLocatorStrings() { | ||
GeodeConnectorConfig config = new GeodeConnectorConfig(); | ||
String locatorString="localhost[8888], localhost[8881]"; | ||
List<LocatorHostPort> locators = config.parseLocators(locatorString); | ||
assertThat(2, is(locators.size())); | ||
} | ||
} |