Skip to content
Permalink
Browse files
All configurable parameters are now using '-' as delimiter instead of camelCase

Added documentation to the configDef definitions
Updated readme with latest configuration changes
  • Loading branch information
jhuynh1 committed Feb 13, 2020
1 parent b3e4f05 commit 5b5d6596aa336ba7f01829f4ee2d2f023eb2b685
Showing 13 changed files with 132 additions and 95 deletions.
@@ -26,7 +26,7 @@ plugin.path=(Path to your clone)/geode-kafka-connector/build/libs/
name=geode-kafka-sink
connector.class=GeodeKafkaSink
tasks.max=1
topicToRegions=[someTopicToSinkFrom:someRegionToConsume]
topic-to-regions=[someTopicToSinkFrom:someRegionToConsume]
topics=someTopicToSinkFrom
locators=localHost[10334]
```
@@ -35,7 +35,7 @@ locators=localHost[10334]
name=geode-kafka-source
connector.class=GeodeKafkaSource
tasks.max=1
regionToTopics=[someRegionToSourceFrom:someTopicToConsume]
region-to-topics=[someRegionToSourceFrom:someTopicToConsume]
locators=localHost[10334]
```

@@ -47,27 +47,29 @@ bin/connect-standalone.sh config/connect-standalone.properties config/connect-ge
#### GeodeKafkaSink Properties
| Property | Required | Description| Default |
|---|---|---|---|
| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None. This is required to be set in the source connector properties
|locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
|topic-to-regions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | "[gkcTopic:gkcRegion]" |
|security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
|null-values-mean-remove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |

* The topicToRegions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]" This is equivalent to both regions being consumers of the topic.
* The topic-to-regions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]". A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]". This is equivalent to both regions being consumers of the topic.

#### GeodeKafkaSource Properties
| Property | Required| Description| Default |
|---|---|---|---|
| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
|regionToTopics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None. This is required to be set in the source connector properties|
|region-to-topics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | "[gkcregion:gkctopic]"|
|security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 |
|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true |
|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" |
| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
|security-username| no | Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user| null|
|security-password| no | Supply a password to be used to authenticate with Geode| null|
|geode-connector-batch-size| no | Maximum number of records to return on each poll| 100 |
|geode-connector-queue-size| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
| load-entire-region| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true |
|durable-client-id-prefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" |
| durable-client-timeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
| cq-prefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |

* The regionToTopics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]" This is equivalent to the region be a producer for both topics
* The region-to-topics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]". A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]". This is equivalent to the region being a producer for both topics.

---

@@ -14,16 +14,16 @@
*/
package org.geode.kafka;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class GeodeConnectorConfig extends AbstractConfig {

// GeodeKafka Specific Configuration
@@ -37,23 +37,24 @@ public class GeodeConnectorConfig extends AbstractConfig {
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
private static final String DEFAULT_SECURITY_AUTH_INIT =
"org.geode.kafka.security.SystemPropertyAuthInit";
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD= "security-password";
public static final String SECURITY_PASSWORD = "security-password";

protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
private String securityUserName;
private String securityPassword;

//Just for testing
// Just for testing
protected GeodeConnectorConfig() {
super(new ConfigDef(), new HashMap());
taskId = 0;
}

//Just for testing
// Just for testing
protected GeodeConnectorConfig(Map<String, String> props) {
super(new ConfigDef(), props);
taskId = 0;
@@ -67,19 +68,27 @@ public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorPr
securityUserName = getString(SECURITY_USER);
securityPassword = getString(SECURITY_PASSWORD);
securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
//if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
// if we registered a username/password instead of auth init, we should use the default auth
// init if one isn't specified
if (usesSecurity()) {
securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
securityClientAuthInit =
securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
}
}

protected static ConfigDef configurables() {
ConfigDef configDef = new ConfigDef();
configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,"");
configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
"Internally used to identify each task");
configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
"A comma separated string of locators that configure which locators to connect to");
configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
"Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
"Supply a password to be used to authenticate with Geode");
configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
ConfigDef.Importance.HIGH,
"Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
return configDef;
}

@@ -14,10 +14,12 @@
*/
package org.geode.kafka;

import java.util.Collection;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;

import java.util.List;

import org.apache.geode.cache.query.CqResults;
import org.apache.kafka.connect.errors.ConnectException;

import org.apache.geode.cache.client.ClientCache;
@@ -26,12 +28,9 @@
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.RegionNotFoundException;

import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;

public class GeodeContext {

private ClientCache clientCache;
@@ -40,15 +39,18 @@ public class GeodeContext {
public GeodeContext() {}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
String durableClientId, String durableClientTimeout, String securityAuthInit,
String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
String securityAuthInit, String securityUserName, String securityPassword,
boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
securityPassword, usesSecurity);
return clientCache;
}

@@ -57,7 +59,8 @@ public ClientCache getClientCache() {
}

public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
String durableClientTimeOut, String securityAuthInit, String securityUserName,
String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();

if (usesSecurity) {
@@ -93,7 +96,7 @@ public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boole
}

public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
boolean isDurable) throws ConnectException {
boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
@@ -19,7 +19,6 @@
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.security.AuthInitialize;
import org.apache.geode.security.AuthenticationFailedException;
import org.geode.kafka.GeodeConnectorConfig;


public class SystemPropertyAuthInit implements AuthInitialize {
@@ -14,17 +14,17 @@
*/
package org.geode.kafka.sink;

import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
import org.geode.kafka.GeodeConnectorConfig;

public class GeodeKafkaSink extends SinkConnector {
private Map<String, String> sharedProps;
@@ -20,9 +20,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.geode.kafka.GeodeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -61,7 +61,9 @@ public void start(Map<String, String> props) {
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -14,20 +14,19 @@
*/
package org.geode.kafka.sink;

import org.apache.kafka.common.config.ConfigDef;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.geode.kafka.GeodeConnectorConfig;

public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SINK_CONFIG_DEF = configurables();

// Used by sink
public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";

private Map<String, List<String>> topicToRegions;
@@ -41,8 +40,12 @@ public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {

protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
"A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
"If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
return configDef;
}

@@ -14,19 +14,19 @@
*/
package org.geode.kafka.source;

import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;

import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
import org.geode.kafka.GeodeConnectorConfig;


public class GeodeKafkaSource extends SourceConnector {
@@ -43,7 +43,8 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
List<String> bindings =
GeodeConnectorConfig
.parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
.parseStringByComma(
sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);

for (int i = 0; i < maxTasks; i++) {

0 comments on commit 5b5d659

Please sign in to comment.