Skip to content
Permalink
Browse files
Renaming the package name to org.apache.geode.kafka
  • Loading branch information
nabarunnag committed Feb 10, 2020
1 parent a9892d2 commit 045d05575e7884aaf9ee2a9ee7222fec41e321fb
Showing 32 changed files with 95 additions and 118 deletions.
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.util.Arrays;
import java.util.Collection;
@@ -12,9 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;

import static geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
package org.geode.kafka;

import java.util.Collection;
import java.util.List;
@@ -58,7 +56,7 @@ public ClientCache createClientCache(List<LocatorHostPort> locators, String dura
ClientCacheFactory ccf = new ClientCacheFactory();

if (securityAuthInit != null) {
ccf.set(SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
ccf.set(GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT, securityAuthInit);
}
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

public class LocatorHostPort {

@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.sink;
package org.geode.kafka.sink;

import java.util.ArrayList;
import java.util.Collection;
@@ -12,19 +12,14 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.sink;

import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE;
import static geode.kafka.GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE;
package org.geode.kafka.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import geode.kafka.GeodeConnectorConfig;
import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -77,8 +72,10 @@ public String version() {


private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
props.computeIfAbsent(NULL_VALUES_MEAN_REMOVE, (key) -> DEFAULT_NULL_VALUES_MEAN_REMOVE);
props.computeIfAbsent(
GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
props.computeIfAbsent(
GeodeSinkConnectorConfig.NULL_VALUES_MEAN_REMOVE, (key) -> GeodeSinkConnectorConfig.DEFAULT_NULL_VALUES_MEAN_REMOVE);
return props;
}
}
@@ -12,16 +12,15 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.sink;
package org.geode.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import geode.kafka.GeodeContext;
import geode.kafka.GeodeSinkConnectorConfig;
import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -12,11 +12,13 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka.sink;

import java.util.List;
import java.util.Map;

import org.geode.kafka.GeodeConnectorConfig;

public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
// Used by sink
public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;
package org.geode.kafka.source;

import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;
package org.geode.kafka.source;

import org.apache.geode.cache.query.CqEvent;

@@ -12,31 +12,14 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;

import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
import static geode.kafka.GeodeConnectorConfig.LOCATORS;
import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.CQS_TO_REGISTER;
import static geode.kafka.source.GeodeSourceConnectorConfig.CQ_PREFIX;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION;
import static geode.kafka.source.GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
import static geode.kafka.source.GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT;
import static geode.kafka.source.GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION;
import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
package org.geode.kafka.source;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import geode.kafka.GeodeConnectorConfig;
import org.geode.kafka.GeodeConnectorConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
@@ -60,14 +43,15 @@ public Class<? extends Task> taskClass() {
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
List<String> bindings =
GeodeConnectorConfig.parseStringByComma(sharedProps.get(REGION_TO_TOPIC_BINDINGS));
GeodeConnectorConfig
.parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);

for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(sharedProps);
taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
taskProps.put(CQS_TO_REGISTER,
taskProps.put(GeodeSourceConnectorConfig.CQS_TO_REGISTER,
GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i)));
taskConfigs.add(taskProps);
}
@@ -86,13 +70,20 @@ public void start(Map<String, String> props) {
}

private Map<String, String> computeMissingConfigurations(Map<String, String> props) {
props.computeIfAbsent(LOCATORS, (key) -> DEFAULT_LOCATOR);
props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE);
props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE);
props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX);
props.computeIfAbsent(LOAD_ENTIRE_REGION, (key) -> DEFAULT_LOAD_ENTIRE_REGION);
props.computeIfAbsent(
GeodeConnectorConfig.LOCATORS, (key) -> GeodeConnectorConfig.DEFAULT_LOCATOR);
props.computeIfAbsent(
GeodeSourceConnectorConfig.DURABLE_CLIENT_TIME_OUT, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
props.computeIfAbsent(
GeodeSourceConnectorConfig.DURABLE_CLIENT_ID_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_DURABLE_CLIENT_ID);
props.computeIfAbsent(
GeodeSourceConnectorConfig.BATCH_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_BATCH_SIZE);
props.computeIfAbsent(
GeodeSourceConnectorConfig.QUEUE_SIZE, (key) -> GeodeSourceConnectorConfig.DEFAULT_QUEUE_SIZE);
props.computeIfAbsent(
GeodeSourceConnectorConfig.CQ_PREFIX, (key) -> GeodeSourceConnectorConfig.DEFAULT_CQ_PREFIX);
props.computeIfAbsent(
GeodeSourceConnectorConfig.LOAD_ENTIRE_REGION, (key) -> GeodeSourceConnectorConfig.DEFAULT_LOAD_ENTIRE_REGION);
return props;
}

@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;
package org.geode.kafka.source;

import java.util.concurrent.TimeUnit;

@@ -12,11 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;

import static geode.kafka.source.GeodeSourceConnectorConfig.BATCH_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.QUEUE_SIZE;
import static geode.kafka.source.GeodeSourceConnectorConfig.REGION_PARTITION;
package org.geode.kafka.source;

import java.util.ArrayList;
import java.util.Collection;
@@ -25,7 +21,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import geode.kafka.GeodeContext;
import org.geode.kafka.GeodeContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
@@ -75,8 +71,9 @@ public void start(Map<String, String> props) {
geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit());

batchSize = Integer.parseInt(props.get(BATCH_SIZE));
eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(QUEUE_SIZE)));
batchSize = Integer.parseInt(props.get(GeodeSourceConnectorConfig.BATCH_SIZE));
eventBufferSupplier = new SharedEventBufferSupplier(Integer.parseInt(props.get(
GeodeSourceConnectorConfig.QUEUE_SIZE)));

regionToTopics = geodeConnectorConfig.getRegionToTopics();
geodeConnectorConfig.getCqsToRegister();
@@ -165,9 +162,9 @@ GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int
Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) {
return regionNames.stream().map(regionName -> {
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put(REGION_PARTITION, regionName);
sourcePartition.put(GeodeSourceConnectorConfig.REGION_PARTITION, regionName);
return sourcePartition;
}).collect(Collectors.toMap(s -> s.get(REGION_PARTITION), s -> s));
}).collect(Collectors.toMap(s -> s.get(GeodeSourceConnectorConfig.REGION_PARTITION), s -> s));
}

String generateCqName(int taskId, String cqPrefix, String regionName) {
@@ -12,13 +12,13 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;
package org.geode.kafka.source;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import geode.kafka.GeodeConnectorConfig;
import org.geode.kafka.GeodeConnectorConfig;

public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {

@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka.source;
package org.geode.kafka.source;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.is;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

public class GeodeContextTest {
}
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.io.IOException;

@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.io.File;
import java.io.IOException;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.io.IOException;
import java.util.Properties;
@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.io.File;
import java.io.IOException;
@@ -12,10 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;

import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK;
import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE;
package org.geode.kafka;

import java.io.IOException;
import java.util.Properties;
@@ -61,8 +58,10 @@ public static void main(String... args) throws IOException {
cacheServer.start();

// create the region
cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK);
cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE);
cache.createRegionFactory(RegionShortcut.PARTITION).create(
GeodeKafkaTestCluster.TEST_REGION_FOR_SINK);
cache.createRegionFactory(RegionShortcut.PARTITION).create(
GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE);
System.out.println("starting cacheserver");
while (true) {

@@ -12,7 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package geode.kafka;
package org.geode.kafka;

import java.io.IOException;

0 comments on commit 045d055

Please sign in to comment.