Spotless enabled
	* The Spotless check runs as part of the validate phase (see the command sketch below).
	* License headers are inserted by Spotless if they are missing from Java files.
	* All settings files are the same ones used in the Geode project.
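A minimal sketch of how this setup is typically exercised locally. The spotless:apply goal is the standard Spotless formatting goal and is an assumption here, not something added by this commit:

# The check goal is bound to the validate phase, so a normal build verifies formatting:
mvn validate
# To reformat sources and insert missing license headers locally:
mvn spotless:apply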
nabarunnag committed Mar 2, 2020
1 parent 7de6066 commit 8509d6d34d0a9e85aad827d2bdb46f781e173519
Showing 14 changed files with 140 additions and 71 deletions.
@@ -0,0 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
pom.xml (29 changes)
@@ -49,6 +49,7 @@
<awaitility.version>3.1.6</awaitility.version>
<maven-plugin.version>3.8.1</maven-plugin.version>
<zookeeper.version>3.5.7</zookeeper.version>
<spotless.version>1.27.0</spotless.version>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>

@@ -164,6 +165,34 @@

<build>
<plugins>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless.version}</version>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<java>
<licenseHeader>
<file>etc/license-header</file>
</licenseHeader>
<eclipse>
<file>etc/eclipse-java-google-style.xml</file>
<version>4.7.1</version>
</eclipse>
<removeUnusedImports/>
<importOrder>
<file>etc/eclipseOrganizeImports.importorder</file>
</importOrder>
</java>
</configuration>
</plugin>
<plugin>
<groupId>io.confluent</groupId>
<version>0.10.0</version>
@@ -101,8 +101,7 @@ protected static ConfigDef configurables() {
GEODE_GROUP,
2,
ConfigDef.Width.LONG,
LOCATORS_DISPLAY_NAME
);
LOCATORS_DISPLAY_NAME);
configDef.define(
SECURITY_USER,
ConfigDef.Type.STRING,
@@ -112,8 +111,7 @@ protected static ConfigDef configurables() {
GEODE_GROUP,
3,
ConfigDef.Width.MEDIUM,
SECURITY_USER_DISPLAY_NAME
);
SECURITY_USER_DISPLAY_NAME);
configDef.define(
SECURITY_PASSWORD,
ConfigDef.Type.PASSWORD,
@@ -133,8 +131,7 @@ protected static ConfigDef configurables() {
GEODE_GROUP,
5,
ConfigDef.Width.LONG,
SECURITY_CLIENT_AUTH_INIT_DISPLAY_NAME
);
SECURITY_CLIENT_AUTH_INIT_DISPLAY_NAME);
return configDef;
}

@@ -39,16 +39,16 @@ public class GeodeContext {
public GeodeContext() {}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String durableClientId, String durableClientTimeout, String securityAuthInit,
String securityUserName, String securityPassword, boolean usesSecurity) {
String durableClientId, String durableClientTimeout, String securityAuthInit,
String securityUserName, String securityPassword, boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
securityAuthInit, securityUserName, securityPassword, usesSecurity);
return clientCache;
}

public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
String securityAuthInit, String securityUserName, String securityPassword,
boolean usesSecurity) {
String securityAuthInit, String securityUserName, String securityPassword,
boolean usesSecurity) {
clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
securityPassword, usesSecurity);
return clientCache;
@@ -59,8 +59,8 @@ public ClientCache getClientCache() {
}

public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
String durableClientTimeOut, String securityAuthInit, String securityUserName,
String securityPassword, boolean usesSecurity) {
String durableClientTimeOut, String securityAuthInit, String securityUserName,
String securityPassword, boolean usesSecurity) {
try {
ClientCacheFactory ccf = new ClientCacheFactory();

@@ -74,7 +74,7 @@ public ClientCache createClientCache(List<LocatorHostPort> locators, String dura
}
if (!durableClientName.equals("")) {
ccf.set("durable-client-id", durableClientName)
.set("durable-client-timeout", durableClientTimeOut);
.set("durable-client-timeout", durableClientTimeOut);
}
// currently we only allow using the default pool.
// If we ever want to allow adding multiple pools we'll have to configure pool factories
@@ -85,8 +85,8 @@ public ClientCache createClientCache(List<LocatorHostPort> locators, String dura
}
return ccf.create();
} catch (Exception e) {
throw new ConnectException(
"Unable to create an client cache connected to Apache Geode cluster");
throw new ConnectException(
"Unable to create an client cache connected to Apache Geode cluster");
}
}

@@ -102,8 +102,8 @@ public CqQuery newCq(String name, String query, CqAttributes cqAttributes, boole
}

public <E> CqResults<E> newCqWithInitialResults(String name, String query,
CqAttributes cqAttributes,
boolean isDurable) throws ConnectException {
CqAttributes cqAttributes,
boolean isDurable) throws ConnectException {
try {
CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
return cq.executeWithInitialResults();
@@ -24,7 +24,7 @@
public class SystemPropertyAuthInit implements AuthInitialize {
@Override
public Properties getCredentials(Properties securityProps, DistributedMember server,
boolean isPeer) throws AuthenticationFailedException {
boolean isPeer) throws AuthenticationFailedException {
Properties extractedProperties = new Properties();
extractedProperties.put("security-username", securityProps.get("security-username"));
extractedProperties.put("security-password", securityProps.get("security-password"));
@@ -20,15 +20,13 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.kafka.GeodeContext;

@@ -62,10 +60,10 @@ public void start(Map<String, String> props) {
configure(geodeConnectorConfig);
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
regionNameToRegion = createProxyRegions(topicToRegions.values());
} catch (Exception e) {
logger.error("Unable to start sink task", e);
@@ -99,15 +97,15 @@ void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsM
}

private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
Map<String, BatchRecords> batchRecordsMap) {
Map<String, BatchRecords> batchRecordsMap) {
Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic());
for (String region : regionsToUpdate) {
updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
}
}

private void updateBatchRecordsForRecord(SinkRecord record,
Map<String, BatchRecords> batchRecordsMap, String region) {
Map<String, BatchRecords> batchRecordsMap, String region) {
BatchRecords batchRecords = batchRecordsMap.get(region);
if (batchRecords == null) {
batchRecords = new BatchRecords();
@@ -15,9 +15,9 @@
package org.apache.geode.kafka.source;

import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.CQS_TO_REGISTER;
import static org.apache.geode.kafka.utils.GeodeSourceConfigurationConstants.REGION_TO_TOPIC_BINDINGS;
import static org.apache.geode.kafka.utils.GeodeConfigurationConstants.TASK_ID;

import java.util.ArrayList;
import java.util.HashMap;
@@ -71,12 +71,12 @@ public void start(Map<String, String> props) {
logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting");
geodeContext = new GeodeContext();
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());
geodeConnectorConfig.getDurableClientId(),
geodeConnectorConfig.getDurableClientTimeout(),
geodeConnectorConfig.getSecurityClientAuthInit(),
geodeConnectorConfig.getSecurityUserName(),
geodeConnectorConfig.getSecurityPassword(),
geodeConnectorConfig.usesSecurity());

batchSize = geodeConnectorConfig.getBatchSize();
eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -120,7 +120,7 @@ public void stop() {
}

void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext,
EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) {
boolean isDurable = geodeConnectorConfig.isDurable();
int taskId = geodeConnectorConfig.getTaskId();
for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -133,8 +133,8 @@ void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContex
}

GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
boolean isDurable) {
EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
@@ -141,8 +141,7 @@ protected static ConfigDef configurables() {
SOURCE_GROUP,
5,
ConfigDef.Width.MEDIUM,
CQ_PREFIX_DISPLAY_NAME
);
CQ_PREFIX_DISPLAY_NAME);

configDef.define(
BATCH_SIZE,
@@ -153,8 +152,7 @@ protected static ConfigDef configurables() {
SOURCE_GROUP,
6,
ConfigDef.Width.MEDIUM,
BATCH_SIZE_DISPLAY_NAME
);
BATCH_SIZE_DISPLAY_NAME);

configDef.define(
QUEUE_SIZE,
@@ -1,3 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.utils;

import org.apache.kafka.common.config.types.Password;
@@ -6,9 +20,9 @@ public class GeodeConfigurationConstants {
/**
* GEODE SPECIFIC CONFIGURATION
*/
//Identifier for each task
// Identifier for each task
public static final String TASK_ID = "GEODE_TASK_ID"; // One config per task
//Specifies which Locators to connect to Apache Geode
// Specifies which Locators to connect to Apache Geode
public static final String LOCATORS = "locators";
public static final String DEFAULT_LOCATOR = "localhost[10334]";
public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
@@ -17,15 +31,13 @@ public class GeodeConfigurationConstants {
public static final String SECURITY_USER = "security-username";
public static final String SECURITY_PASSWORD = "security-password";
public static final String TASK_ID_DOCUMENTATION = "Internally used to identify each task";
public static final String
LOCATORS_DOCUMENTATION =
public static final String LOCATORS_DOCUMENTATION =
"A comma separated string of locators that configure which locators to connect to";
public static final String
SECURITY_USER_DOCUMENTATION =
public static final String SECURITY_USER_DOCUMENTATION =
"Supply a username to be used to authenticate with Geode. Will automatically set the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user";
public static final String SECURITY_PASSWORD_DOCUMENTATION = "Supply a password to be used to authenticate with Geode";
public static final String
SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
public static final String SECURITY_PASSWORD_DOCUMENTATION =
"Supply a password to be used to authenticate with Geode";
public static final String SECURITY_CLIENT_AUTH_INIT_DOCUMENTATION =
"Point to the Java class that implements the [AuthInitialize Interface](https://geode.apache.org/docs/guide/19/managing/security/implementing_authentication.html)";
public static final String GEODE_GROUP = "Geode-Configurations";
public static final String SECURITY_PASSWORD_DISPLAY_NAME = "Apache Geode Password";
@@ -1,3 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.kafka.utils;

public class GeodeSinkConfigurationConstants {
@@ -8,11 +22,9 @@ public class GeodeSinkConfigurationConstants {
public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
public static final String
NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
public static final String NULL_VALUES_MEAN_REMOVE_DOCUMENTATION =
"If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region";
public static final String
TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
public static final String TOPIC_TO_REGION_BINDINGS_DOCUMENTATION =
"A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]";
public static final String SINK_GROUP = "Sink-Configurations";
public final static String TOPIC_TO_REGION_BINDINGS_DISPLAY_NAME = "Topic to region mapping";
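
For illustration, a minimal sketch of connector properties using only the key names defined in the constants files above. The values are placeholders, and the connector class and the topic-to-region binding key are not shown in this diff, so they are omitted:

# Hypothetical connector properties; values are placeholders, not from this commit.
locators=localhost[10334]
security-username=someUser
security-password=somePassword
# Per the documentation string above, security-client-auth-init defaults to a
# SystemPropertyAuthInit when a username is supplied, so it is not set here.
null-values-mean-remove=true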
