Skip to content
Permalink
Browse files
Modified initial results to consume the correct type
Modified security
  • Loading branch information
jhuynh1 committed Feb 11, 2020
1 parent d75871a commit b3e4f059f25ccb18d69d30a34dc04b91abd5c14c
Showing 7 changed files with 66 additions and 30 deletions.
@@ -38,13 +38,13 @@ public class GeodeConnectorConfig extends AbstractConfig {
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
public static final String SECURITY_USER = "securityUsername";
public static final String SECURITY_PASSWORD= "securityPassword";
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD= "security-password";

protected final int taskId;
protected List<LocatorHostPort> locatorHostPorts;
private String securityClientAuthInit;
private String securityUser;
private String securityUserName;
private String securityPassword;

//Just for testing
@@ -64,7 +64,7 @@ public GeodeConnectorConfig(ConfigDef configDef, Map<String, String> connectorPr
super(configDef, connectorProperties);
taskId = getInt(TASK_ID);
locatorHostPorts = parseLocators(getString(GeodeConnectorConfig.LOCATORS));
securityUser = getString(SECURITY_USER);
securityUserName = getString(SECURITY_USER);
securityPassword = getString(SECURITY_PASSWORD);
securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
//if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
@@ -160,7 +160,15 @@ public String getSecurityClientAuthInit() {
return securityClientAuthInit;
}

/** Returns the security username read from the connector configuration. */
public String getSecurityUserName() {
  return this.securityUserName;
}

/** Returns the security password read from the connector configuration. */
public String getSecurityPassword() {
  return this.securityPassword;
}

/**
 * Whether this connector should authenticate with the cluster: true when either
 * an explicit auth-init class or a username was configured. (A username without
 * an auth-init class falls back to the default auth-init — see the constructor.)
 *
 * NOTE(review): the stripped diff left both the old (securityUser) and new
 * (securityUserName) return lines here; only the post-commit line is kept.
 */
public boolean usesSecurity() {
  return securityClientAuthInit != null || securityUserName != null;
}
}
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.List;

import org.apache.geode.cache.query.CqResults;
import org.apache.kafka.connect.errors.ConnectException;

import org.apache.geode.cache.client.ClientCache;
@@ -28,6 +29,8 @@
import org.apache.geode.cache.query.RegionNotFoundException;

import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;

public class GeodeContext {

@@ -37,15 +40,15 @@ public class GeodeContext {
public GeodeContext() {}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String durableClientId, String durableClientTimeout, String securityAuthInit, boolean usesSecurity) {
String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit, usesSecurity);
securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String securityAuthInit, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, usesSecurity);
String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}

@@ -54,10 +57,14 @@ public ClientCache getClientCache() {
}

public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit, boolean usesSecurity) {
String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
ClientCacheFactory ccf = new ClientCacheFactory();

if (usesSecurity ) {
if (usesSecurity) {
if (securityUserName != null && securityPassword != null) {
ccf.set(SECURITY_USER, securityUserName);
ccf.set(SECURITY_PASSWORD, securityPassword);
}
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
if (!durableClientName.equals("")) {
@@ -85,8 +92,8 @@ public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boole
}
}

public Collection newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
boolean isDurable) throws ConnectException {
public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
@@ -27,8 +27,8 @@ public class SystemPropertyAuthInit implements AuthInitialize {
/**
 * Extracts the username/password pair from the security properties Geode hands
 * to this AuthInitialize and returns them as the credential set.
 *
 * The stripped diff also showed an older variant that read System properties;
 * that version NPEs (Properties.put rejects null) when the property is unset,
 * so only the post-commit securityProps-based lines are kept.
 *
 * @param securityProps properties populated by the client cache ("security-*" keys)
 * @param server        the member being authenticated against (unused here)
 * @param isPeer        whether this is a peer handshake (unused here)
 * @return a Properties holding "security-username" and "security-password"
 */
public Properties getCredentials(Properties securityProps, DistributedMember server,
    boolean isPeer) throws AuthenticationFailedException {
  Properties extractedProperties = new Properties();
  extractedProperties.put("security-username", securityProps.get("security-username"));
  extractedProperties.put("security-password", securityProps.get("security-password"));
  return extractedProperties;
}
}
@@ -61,7 +61,7 @@ public void start(Map<String, String> props) {
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -22,18 +22,29 @@
public class GeodeEvent {

private String regionName;
private CqEvent event;
private Object key;
private Object value;

public GeodeEvent(String regionName, CqEvent event) {
this(regionName, event.getKey(), event.getNewValue());
}

public GeodeEvent(String regionName, Object key, Object value) {
this.regionName = regionName;
this.event = event;
this.key = key;
this.value = value;
}

public String getRegionName() {
return regionName;
}

public CqEvent getEvent() {
return event;
public Object getKey() {
return key;
}

public Object getValue() {
return value;
}

}
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.Struct;
import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -68,7 +70,7 @@ public void start(Map<String, String> props) {
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.usesSecurity());
geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());

batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -98,7 +100,7 @@ public List<SourceRecord> poll() throws InterruptedException {
List<String> topics = regionToTopics.get(regionName);
for (String topic : topics) {
records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic,
null, event.getEvent().getKey(), null, event.getEvent().getNewValue()));
null, event.getKey(), null, event.getValue()));
}
}
return records;
@@ -134,12 +136,11 @@ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
if (loadEntireRegion) {
Collection<CqEvent> events =
CqResults events =
geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
isDurable);
eventBuffer.get().addAll(
events.stream().map(e -> new GeodeEvent(regionName, e)).collect(Collectors.toList()));
eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList()));
} else {
geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
@@ -36,6 +36,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.LinkedStructSet;
import org.apache.geode.cache.query.internal.ResultsBag;
import org.apache.geode.cache.query.internal.ResultsBag;
import org.apache.geode.cache.query.internal.StructImpl;
import org.apache.geode.cache.query.internal.types.StructTypeImpl;
import org.geode.kafka.GeodeContext;
import org.junit.Test;

@@ -52,9 +61,9 @@ public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
boolean loadEntireRegion = true;
boolean isDurable = false;
List<CqEvent> fakeInitialResults = new LinkedList<>();
CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
fakeInitialResults.add(mock(CqEvent.class));
fakeInitialResults.add(mock(Struct.class));
}

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
@@ -71,7 +80,7 @@ public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer(
BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
boolean loadEntireRegion = false;
boolean isDurable = false;
List<CqEvent> fakeInitialResults = new LinkedList<>();
CqResults fakeInitialResults = new ResultsBag();
for (int i = 0; i < 10; i++) {
fakeInitialResults.add(mock(CqEvent.class));
}
@@ -92,7 +101,7 @@ public void cqListenerOnEventPopulatesEventsBuffer() {
boolean isDurable = false;

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
.thenReturn(new ArrayList());
.thenReturn(mock(CqResults.class));
GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
GeodeKafkaSourceListener listener =
task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
@@ -140,7 +149,7 @@ public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsS

GeodeContext geodeContext = mock(GeodeContext.class);
when(geodeContext.getClientCache()).thenReturn(clientCache);

when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
Map<String, List<String>> regionToTopicsMap = new HashMap<>();
regionToTopicsMap.put("region1", new ArrayList());

0 comments on commit b3e4f05

Please sign in to comment.