Skip to content

Commit

Permalink
Merge pull request #138 from orpiske/fix-delegating-classpath-issues
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Apr 22, 2020
2 parents 44168cc + e96566b commit 0f33098
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 65 deletions.
1 change: 0 additions & 1 deletion core/pom.xml
Expand Up @@ -97,7 +97,6 @@
<artifactId>camel-debezium-common</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
6 changes: 6 additions & 0 deletions parent/pom.xml
Expand Up @@ -273,6 +273,12 @@
<artifactId>commons-io</artifactId>
<version>${version.commons-io}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -47,10 +47,10 @@
<modules>
<module>parent</module>
<module>core</module>
<module>tests</module>
<module>buildingtools</module>
<module>tooling</module>
<module>connectors</module>
<module>tests</module>
</modules>

<developers>
Expand Down
33 changes: 18 additions & 15 deletions tests/pom.xml
Expand Up @@ -41,27 +41,17 @@
</properties>
<dependencies>
<!--
See the comment on the PluginPathHelper class for details about why
the scope is set as provided for the connectors
This one is (temporarily) needed for providing supporting classes for the
ConnectRecordValueToMapTransformer - which is needed to properly convert record formats from
ElasticSearch to Kafka
-->
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-sjms2-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-aws-s3-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-telegram</artifactId>
Expand All @@ -84,6 +74,10 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-elasticsearch-rest</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-s3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-sns</artifactId>
Expand Down Expand Up @@ -247,6 +241,12 @@
<artifactId>qpid-jms-client</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -271,9 +271,12 @@
images used when running the tests w/ those containers
- java.util.logging.manager sets the logging manager used in GlassFish to the log4j one. GlassFish
is used by the rest API of the Kafka Connect.
- com.datastax.driver.FORCE_NIO=true prevents the ClassCastException from
io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
probably caused by a netty incompatibility somewhere
-->
<argLine>-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dproject.basedir=${project.basedir}/.. -Dcom.amazonaws.sdk.disableCbor=true -Ditest.strimzi.container.image=${itest.strimzi.container.image} -Ditest.zookeeper.container.image=${itest.zookeeper.container.image}</argLine>
<argLine>-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dcom.datastax.driver.FORCE_NIO=true -Dproject.basedir=${project.basedir}/.. -Dcom.amazonaws.sdk.disableCbor=true -Ditest.strimzi.container.image=${itest.strimzi.container.image} -Ditest.zookeeper.container.image=${itest.zookeeper.container.image}</argLine>
<skipTests>${skipIntegrationTests}</skipTests>
</configuration>
</plugin>
Expand Down
Expand Up @@ -21,9 +21,7 @@

/**
* An interface for producing different types of connector properties that match
* an specific type of connector in test. Examples of runtime equivalent for this
* file are the CamelSinkConnector.properties and the CamelSourceConnector.properties
* files
* an specific type of connector in test.
*/
public interface ConnectorPropertyFactory {

Expand Down
Expand Up @@ -31,6 +31,14 @@
public final class PluginPathHelper {
private static final Logger LOG = LoggerFactory.getLogger(PluginPathHelper.class);

private static final String[] MODULES = {
"core", "connectors/camel-sjms2-kafka-connector", "connectors/camel-cql-kafka-connector",
"connectors/camel-aws-sns-kafka-connector", "connectors/camel-aws-sqs-kafka-connector",
"connectors/camel-aws-s3-kafka-connector", "connectors/camel-aws-kinesis-kafka-connector",
"connectors/camel-elasticsearch-rest-kafka-connector", "connectors/camel-http-kafka-connector",
"connectors/camel-timer-kafka-connector", "connectors/camel-file-kafka-connector"
};

private static class PluginWalker extends DirectoryWalker<String> {
@Override
protected void handleFile(File file, int depth, Collection<String> results) throws IOException {
Expand All @@ -40,14 +48,31 @@ protected void handleFile(File file, int depth, Collection<String> results) thro
if (fileName.contains("kafka-connector") && fileName.contains("camel")) {
String parentDir = file.getParentFile().getCanonicalPath();
if (parentDir.endsWith("target")) {
LOG.debug("Adding file: {}", file.getCanonicalPath());
String pluginDir = file.getParentFile().getCanonicalPath();
LOG.debug("Adding directory: {}", pluginDir);

results.add(file.getCanonicalPath());
results.add(pluginDir);
}
}
}
}



@Override
protected boolean handleDirectory(File directory, int depth, Collection<String> results) throws IOException {
String directoryName = directory.getName();

if (directoryName.equals("target")) {
String pluginDir = directory.getCanonicalPath();
LOG.debug("Adding directory: {}", pluginDir);

results.add(pluginDir);
}

return true;
}

public List<String> findPlugins(File startDir) {
List<String> results = new ArrayList<>();

Expand Down Expand Up @@ -79,9 +104,14 @@ private static List<String> findPlugins(String...moduleDirs) {
}

private static List<String> findPlugins() {
return findPlugins("core", "connectors/camel-sjms2-kafka-connector");
/*
* Only load the subset of modules that has a related test, otherwise the startup time for the
* Kafka Connect runtime is extremely long
*/
return findPlugins(MODULES);
}


/*
* We need to construct a list of directories containing *only* the connector classes (ie.: those that
* specialize Kafka's Connector abstract class.
Expand All @@ -97,8 +127,8 @@ private static List<String> findPlugins() {
* 2) is located in the target directory
* 3) contains the strings 'camel' and 'kafka-connector' as part of their name.
*
* This is also leverage by the fact that the core and connectors modules have the provided scope on the test
* pom file.
* Then for every connector jar file that it finds, it configures the embedded runtime to includes the parent dir
* into the configuration.
*
* Why it does this?
*
Expand All @@ -111,7 +141,7 @@ private static List<String> findPlugins() {
*/
public static String pluginPaths() {
String ret = findPlugins().stream().collect(Collectors.joining(","));
LOG.info("Returning the following directories for the plugins: {}", ret);
LOG.info("Returning the following directories for the plugin path: {}", ret);

return ret;
}
Expand Down
Expand Up @@ -47,7 +47,7 @@ public EmbeddedKafkaService() {

String pluginPaths = PluginPathHelper.pluginPaths();

LOG.info("Adding the following directories to the plugin path: {}", pluginPaths);
LOG.info("Adding the returned directories to the plugin path. This may take A VERY long time to complete");
workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPaths);

LOG.info("Building the embedded Kafka connect instance");
Expand Down
Expand Up @@ -43,10 +43,10 @@ class CamelAWSSNSPropertyFactory implements ConnectorPropertyFactory {
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSNSSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwssnsSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awssns.CamelAwssnsSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand All @@ -56,12 +56,12 @@ public Properties getProperties() {
connectorProps.put("camel.sink.url", queueUrl);
connectorProps.put("topics", topic);

connectorProps.put("camel.component.aws-sns.configuration.access-key",
connectorProps.put("camel.component.aws-sns.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-sns.configuration.secret-key",
connectorProps.put("camel.component.aws-sns.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));

connectorProps.put("camel.component.aws-sns.configuration.region",
connectorProps.put("camel.component.aws-sns.region",
amazonConfigs.getProperty(AWSConfigs.REGION, ""));

connectorProps.put("camel.component.aws-sns.configuration", "#class:"
Expand Down
Expand Up @@ -45,10 +45,10 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
public Properties getProperties() {
Properties connectorProps = new Properties();

connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwssqsSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awssqs.CamelAwssqsSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand All @@ -74,12 +74,12 @@ public Properties getProperties() {
connectorProps.put("camel.sink.url", queueUrl);
connectorProps.put("topics", topic);

connectorProps.put("camel.component.aws-sqs.configuration.access-key",
connectorProps.put("camel.component.aws-sqs.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-sqs.configuration.secret-key",
connectorProps.put("camel.component.aws-sqs.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));

connectorProps.put("camel.component.aws-sqs.configuration.region", region);
connectorProps.put("camel.component.aws-sqs.region", region);

return connectorProps;
}
Expand Down
Expand Up @@ -40,10 +40,10 @@ public CamelCassandraPropertyFactory(int tasksMax, String topic, String host, St
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelCassandraQLSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelCqlSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand Down
Expand Up @@ -64,7 +64,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelElasticSearchSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand Down
Expand Up @@ -44,7 +44,7 @@ public R apply(R r) {

targetMap.put(key, value);
return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
}

@Override
Expand Down
Expand Up @@ -39,7 +39,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelFileSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.file.CamelFileSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand Down
Expand Up @@ -39,7 +39,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelHttpSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.http.CamelHttpSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.stream.Collectors;

import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorConfig;


Expand Down Expand Up @@ -53,7 +52,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(CamelSjms2SinkConnectorConfig.CAMEL_SINK_SJMS2_PATH_DESTINATION_NAME_CONF, queue);
connectorProps.put("camel.sink.path.destinationName", queue);
connectorProps.put("topics", topic);

Set<Map.Entry<Object, Object>> set = connectionProperties.entrySet();
Expand Down
Expand Up @@ -25,7 +25,7 @@


/**
* Creates the set of properties used by a Camel JMS Sink Connector
* Creates the set of properties used by a Camel Kinesis Source Connector
*/
class CamelAWSKinesisPropertyFactory implements ConnectorPropertyFactory {
private final int tasksMax;
Expand All @@ -44,10 +44,10 @@ class CamelAWSKinesisPropertyFactory implements ConnectorPropertyFactory {
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSKinesisSourceConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwskinesisSourceConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awskinesis.CamelAwskinesisSourceConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

Expand All @@ -56,12 +56,11 @@ public Properties getProperties() {
String sourceUrl = "aws-kinesis://" + streamName;
connectorProps.put("camel.source.url", sourceUrl);


connectorProps.put("camel.component.aws-kinesis.configuration.access-key",
connectorProps.put("camel.component.aws-kinesis.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-kinesis.configuration.secret-key",
connectorProps.put("camel.component.aws-kinesis.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
connectorProps.put("camel.component.aws-kinesis.configuration.region",
connectorProps.put("camel.component.aws-kinesis.region",
amazonConfigs.getProperty(AWSConfigs.REGION, ""));

connectorProps.put("camel.component.aws-kinesis.configuration", "#class:"
Expand Down

0 comments on commit 0f33098

Please sign in to comment.