Modified the connector configuration to use ConfigDef and default values
Bumped the Geode dependency version down to 1.9 for now
Added a system property security auth init implementation that retrieves the username and password from system properties
jhuynh1 committed Feb 11, 2020
1 parent 045d055 commit 5fb019c2bacf23e7408d57dcfaeb4cf6e9d8fbb7
Showing 11 changed files with 166 additions and 76 deletions.
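
In user-facing terms, the combined effect is that a connector can enable security by supplying only a username and password: the locators fall back to the ConfigDef default, and the auth init falls back to the new SystemPropertyAuthInit. Below is a minimal sketch of a sink connector configuration built this way; the property keys and class names are taken from the diff, while the example values and the Java Map scaffolding are illustrative only and not part of the commit.

import java.util.HashMap;
import java.util.Map;

public class SinkSecurityConfigSketch {
  public static void main(String[] args) {
    Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put("connector.class", "org.geode.kafka.sink.GeodeKafkaSink");
    sinkProps.put("topics", "gkcTopic");
    sinkProps.put("topicToRegions", "[gkcTopic:gkcRegion]");
    // Supplying a username/password without an explicit "security-client-auth-init"
    // makes the connector fall back to the SystemPropertyAuthInit added in this commit,
    // and "locators" now defaults to "localhost[10334]" via ConfigDef.
    sinkProps.put("securityUsername", "someUser");
    sinkProps.put("securityPassword", "somePassword");
    // The credentials must also be visible to the Connect worker JVM as system
    // properties (e.g. -DsecurityUsername=someUser -DsecurityPassword=somePassword),
    // because SystemPropertyAuthInit reads them back with System.getProperty.
    System.out.println(sinkProps);
  }
}
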
@@ -61,8 +61,8 @@ repositories {
}

dependencies {
compile('org.apache.geode:geode-core:1.10.0')
compile('org.apache.geode:geode-cq:1.10.0')
compile('org.apache.geode:geode-core:1.9.0')
compile('org.apache.geode:geode-cq:1.9.0')
compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1')
compile(group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0')
compile(group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0')
@@ -14,14 +14,17 @@
*/
package org.geode.kafka;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GeodeConnectorConfig {
public class GeodeConnectorConfig extends AbstractConfig {

// GeodeKafka Specific Configuration
/**
@@ -34,19 +37,50 @@ public class GeodeConnectorConfig {
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
private static final String DEFAULT_SECURITY_AUTH_INIT = "geode.kafka.SystemPropertyAuthInit";
public static final String SECURITY_USER = "securityUsername";
public static final String SECURITY_PASSWORD= "securityPassword";

protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
private String securityUser;
private String securityPassword;

//Just for testing
protected GeodeConnectorConfig() {
super(new ConfigDef(), new HashMap());
taskId = 0;
}

public GeodeConnectorConfig(Map<String, String> connectorProperties) {
taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
securityClientAuthInit = connectorProperties.get(SECURITY_CLIENT_AUTH_INIT);
//Just for testing
protected GeodeConnectorConfig(Map<String, String> props) {
super(new ConfigDef(), props);
taskId = 0;
}


public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorProperties) {
super(configDef, connectorProperties);
taskId = getInt(TASK_ID);
locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
securityUser = getString(SECURITY_USER);
securityPassword = getString(SECURITY_PASSWORD);
securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
//if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
if (usesSecurity()) {
securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
}
}

protected static ConfigDef configurables() {
ConfigDef configDef = new ConfigDef();
configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,"");
configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
return configDef;
}


@@ -125,4 +159,8 @@ public List<LocatorHostPort> getLocatorHostPorts() {
public String getSecurityClientAuthInit() {
return securityClientAuthInit;
}

public boolean usesSecurity() {
return securityClientAuthInit != null || securityUser != null;
}
}
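
A minimal usage sketch of the new ConfigDef-backed construction path, assuming a caller in the same org.geode.kafka package (the protected configurables() helper is package-visible); the class, constants, and default values are from the hunk above, the surrounding scaffolding is not part of the commit.

package org.geode.kafka;

import java.util.HashMap;
import java.util.Map;

public class GeodeConnectorConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(GeodeConnectorConfig.SECURITY_USER, "someUser");
    props.put(GeodeConnectorConfig.SECURITY_PASSWORD, "somePassword");

    GeodeConnectorConfig config =
        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);

    // LOCATORS and TASK_ID were not supplied, so the ConfigDef defaults
    // ("localhost[10334]" and 0) apply instead of an explicit
    // computeMissingConfigurations step.
    System.out.println(config.getLocatorHostPorts());

    // A username/password without an explicit auth init falls back to
    // DEFAULT_SECURITY_AUTH_INIT ("geode.kafka.SystemPropertyAuthInit").
    System.out.println(config.usesSecurity());              // true
    System.out.println(config.getSecurityClientAuthInit());
  }
}
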
@@ -27,6 +27,8 @@
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.RegionNotFoundException;

import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;

public class GeodeContext {

private ClientCache clientCache;
@@ -35,15 +37,15 @@ public class GeodeContext {
public GeodeContext() {}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String durableClientId, String durableClientTimeout, String securityAuthInit) {
String durableClientId, String durableClientTimeout, String securityAuthInit, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit);
securityAuthInit, usesSecurity);
return clientCache;
}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String securityAuthInit) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit);
String securityAuthInit, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, usesSecurity);
return clientCache;
}

@@ -52,11 +54,11 @@ public ClientCache getClientCache() {
}

public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit) {
String durableClientTimeOut, String securityAuthInit, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();

if (securityAuthInit != null) {
ccf.set(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
if (usesSecurity ) {
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
@@ -12,28 +12,23 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.example;
package org.geode.kafka.security;

import java.util.Properties;

import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.security.AuthInitialize;
import org.apache.geode.security.AuthenticationFailedException;
import org.geode.kafka.GeodeConnectorConfig;

/**
* This is purely for example purposes and used in conjunction with the SimpleSecurityManager in
* Apache Geode
* DO NOT USE THIS AS A REAL WORLD SOLUTION
*/
public class ExampleAuthInit implements AuthInitialize {

public class SystemPropertyAuthInit implements AuthInitialize {
@Override
public Properties getCredentials(Properties securityProps, DistributedMember server,
boolean isPeer) throws AuthenticationFailedException {
Properties extractedProperties = new Properties();
// Do not do this in real use case. This is hardcoded and sets the user name and password for
// all users
extractedProperties.put("security-username", "Bearer");
extractedProperties.put("security-password", "Bearer");
extractedProperties.put("security-username", System.getProperty(GeodeConnectorConfig.SECURITY_USER));
extractedProperties.put("security-password", System.getProperty(GeodeConnectorConfig.SECURITY_PASSWORD));
return extractedProperties;
}
}
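
A sketch of how the renamed SystemPropertyAuthInit is expected to be fed: the credentials must be present as JVM system properties under the securityUsername / securityPassword keys (typically passed as -D flags to the Connect worker). The driver code below is illustrative only; passing null for the DistributedMember is acceptable here because the implementation above does not use it.

import java.util.Properties;

import org.geode.kafka.GeodeConnectorConfig;
import org.geode.kafka.security.SystemPropertyAuthInit;

public class SystemPropertyAuthInitSketch {
  public static void main(String[] args) {
    // Normally supplied to the worker JVM, e.g.
    // -DsecurityUsername=someUser -DsecurityPassword=somePassword
    System.setProperty(GeodeConnectorConfig.SECURITY_USER, "someUser");
    System.setProperty(GeodeConnectorConfig.SECURITY_PASSWORD, "somePassword");

    // The auth init copies the two system properties into the keys Geode expects.
    Properties credentials =
        new SystemPropertyAuthInit().getCredentials(new Properties(), null, false);
    System.out.println(credentials.getProperty("security-username")); // someUser
    System.out.println(credentials.getProperty("security-password")); // somePassword
  }
}
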
@@ -24,13 +24,14 @@
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;

public class GeodeKafkaSink extends SinkConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
private Map<String, String> sharedProps;

@Override
public void start(Map<String, String> props) {
sharedProps = computeMissingConfigurations(props);
sharedProps = props;
}

@Override
@@ -61,7 +62,7 @@ public void stop() {

@Override
public ConfigDef config() {
return CONFIG_DEF;
return SINK_CONFIG_DEF;
}

@Override
@@ -70,12 +71,4 @@ public String version() {
return "unknown";
}


private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(
GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
props.computeIfAbsent(
GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE, (key) -> GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE);
return props;
}
}
@@ -61,7 +61,7 @@ public void start(Map<String, String> props) {
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit());
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -14,24 +14,36 @@
*/
package org.geode.kafka.sink;

import org.apache.kafka.common.config.ConfigDef;

import java.util.List;
import java.util.Map;

import org.geode.kafka.GeodeConnectorConfig;

public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
public static final ConfigDef SINK_CONFIG_DEF = configurables();

// Used by sink
public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";

private Map<String, List<String>> topicToRegions;
private final boolean nullValuesMeanRemove;

public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
super(connectorProperties);
topicToRegions = parseTopicToRegions(connectorProperties.get(TOPIC_TO_REGION_BINDINGS));
nullValuesMeanRemove = Boolean.parseBoolean(connectorProperties.get(NULL_VALUES_MEAN_REMOVE));
super(SINK_CONFIG_DEF, connectorProperties);
topicToRegions = parseTopicToRegions(getString(TOPIC_TO_REGION_BINDINGS));
nullValuesMeanRemove = getBoolean(NULL_VALUES_MEAN_REMOVE);
}

protected static ConfigDef configurables() {
ConfigDef configDef = GeodeConnectorConfig.configurables();
configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
return configDef;
}

public Map<String, List<String>> getTopicToRegions() {
@@ -26,13 +26,12 @@
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;

import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;


public class GeodeKafkaSource extends SourceConnector {

private Map<String, String> sharedProps;
// TODO maybe club this into GeodeConnnectorConfig
private static final ConfigDef CONFIG_DEF = new ConfigDef();


@Override
public Class<? extends Task> taskClass() {
@@ -61,30 +60,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

@Override
public ConfigDef config() {
return CONFIG_DEF;
return SOURCE_CONFIG_DEF;
}

@Override
public void start(Map<String, String> props) {
sharedProps = computeMissingConfigurations(props);
}

private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(
GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
props.computeIfAbsent(
GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(
GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(
GeodeSourceConnectorConfig.BATCH_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE);
props.computeIfAbsent(
GeodeSourceConnectorConfig.QUEUE_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE);
props.computeIfAbsent(
GeodeSourceConnectorConfig.CQ_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX);
props.computeIfAbsent(
GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION, (key) -> GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION);
return props;
sharedProps = props;
}

@Override
@@ -64,16 +64,14 @@ public String version() {
public void start(Map<String, String> props) {
try {
geodeConnectorConfig = new GeodeSourceConnectorConfig(props);
int taskId = geodeConnectorConfig.getTaskId();
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit());
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());

batchSize = Integer.parseInt(props.get(GeodeSourceConnectorConfig.BATCH_SIZE));
eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(
GeodeSourceConnectorConfig.QUEUE_SIZE)));
batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());

regionToTopics = geodeConnectorConfig.getRegionToTopics();
geodeConnectorConfig.getCqsToRegister();
