Skip to content
Permalink
Browse files
moved cache creation errors to GeodeContext
added info logging
removed some debug logging
  • Loading branch information
jhuynh1 committed Mar 2, 2020
1 parent 084a9ac commit a22b76ec52ca55d479f1f8bca9774a7578f5afa6
Showing 5 changed files with 35 additions and 37 deletions.
@@ -66,7 +66,6 @@ public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorPr
securityUserName = getString(SECURITY_USER);
securityPassword = getPassword(SECURITY_PASSWORD);
securityClientAuthInit = getPassword(SECURITY_CLIENT_AUTH_INIT);
// System.out.println(securityUserName + "NABA " + securityPassword.value() + "NABA" + securityClientAuthInit.value());
// if we registered a username/password instead of auth init, we should use the default auth
// init if one isn't specified
if (usesSecurity()) {
@@ -61,28 +61,33 @@ public ClientCache getClientCache() {
public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit, String securityUserName,
String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();

ccf.setPdxReadSerialized(true);
if (usesSecurity) {
if (securityUserName != null && securityPassword != null) {
ccf.set(SECURITY_USER, securityUserName);
ccf.set(SECURITY_PASSWORD, securityPassword);
try {
ClientCacheFactory ccf = new ClientCacheFactory();

ccf.setPdxReadSerialized(true);
if (usesSecurity) {
if (securityUserName != null && securityPassword != null) {
ccf.set(SECURITY_USER, securityUserName);
ccf.set(SECURITY_PASSWORD, securityPassword);
}
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut);
}
// currently we only allow using the default pool.
// If we ever want to allow adding multiple pools we'll have to configure pool factories
ccf.setPoolSubscriptionEnabled(true);
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut);
}
// currently we only allow using the default pool.
// If we ever want to allow adding multiple pools we'll have to configure pool factories
ccf.setPoolSubscriptionEnabled(true);

for (LocatorHostPort locator : locators) {
ccf.addPoolLocator(locator.getHostName(), locator.getPort());
for (LocatorHostPort locator : locators) {
ccf.addPoolLocator(locator.getHostName(), locator.getPort());
}
return ccf.create();
} catch (Exception e) {
throw new ConnectException(
"Unable to create an client cache connected to Apache Geode cluster");
}
return ccf.create();
}

public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boolean isDurable)
@@ -106,4 +111,8 @@ public <E> CqResults<E> newCqWithInitialResults(String name, String query,
throw new ConnectException(e);
}
}

public void close(boolean keepAlive) {
clientCache.close(keepAlive);
}
}
@@ -61,16 +61,11 @@ public void start(Map<String, String> props) {
GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props);
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
final ClientCache clientCache =
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
if (clientCache == null) {
throw new ConnectException(
"Unable to create a client cache connected to the Apache Geode cluster");
}
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -149,7 +144,7 @@ private Region<Object, Object> createProxyRegion(String regionName) {

@Override
public void stop() {
geodeContext.getClientCache().close(false);
geodeContext.close(false);
}

}
@@ -51,7 +51,7 @@ public void onEvent(CqEvent aCqEvent) {
TimeUnit.SECONDS))
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
logger.info("Thread interrupted while updating buffer", ex);
}
logger.info("GeodeKafkaSource Queue is full");
}
@@ -66,6 +66,7 @@ public void onError(CqEvent aCqEvent) {
@Override
public void onCqDisconnected() {
// we should probably redistribute or reconnect
logger.info("cq has been disconnected");
}

@Override
@@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
@@ -68,22 +67,17 @@ public String version() {
@Override
public void start(Map<String, String> props) {
try {
System.out.println("NABA ::" + props);
GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
final ClientCache clientCache =
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
if (clientCache == null) {
throw new ConnectException(
"Unable to create an client cache connected to Apache Geode cluster");
}

batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());

@@ -122,7 +116,7 @@ public List<SourceRecord> poll() {

@Override
public void stop() {
geodeContext.getClientCache().close(true);
geodeContext.close(true);
}

void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,

0 comments on commit a22b76e

Please sign in to comment.