Test code fixes for the auto-generated connectors #138

Merged: 3 commits, Apr 22, 2020
1 change: 0 additions & 1 deletion core/pom.xml
@@ -97,7 +97,6 @@
<artifactId>camel-debezium-common</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
6 changes: 6 additions & 0 deletions parent/pom.xml
@@ -273,6 +273,12 @@
<artifactId>commons-io</artifactId>
<version>${version.commons-io}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</dependencyManagement>
2 changes: 1 addition & 1 deletion pom.xml
@@ -47,10 +47,10 @@
<modules>
<module>parent</module>
<module>core</module>
<module>tests</module>
<module>buildingtools</module>
<module>tooling</module>
<module>connectors</module>
<module>tests</module>
</modules>

<developers>
33 changes: 18 additions & 15 deletions tests/pom.xml
@@ -41,27 +41,17 @@
</properties>
<dependencies>
<!--
See the comment on the PluginPathHelper class for details about why
the scope is set as provided for the connectors
This one is (temporarily) needed to provide supporting classes for the
ConnectRecordValueToMapTransformer, which is needed to properly convert record formats from
Elasticsearch to Kafka
-->
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-sjms2-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-aws-s3-kafka-connector</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-telegram</artifactId>
@@ -84,6 +74,10 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-elasticsearch-rest</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-s3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-sns</artifactId>
@@ -247,6 +241,12 @@
<artifactId>qpid-jms-client</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -271,9 +271,12 @@
images used when running the tests w/ those containers
- java.util.logging.manager sets the logging manager used in GlassFish to the log4j one. GlassFish
is used by the REST API of Kafka Connect.
- com.datastax.driver.FORCE_NIO=true prevents the ClassCastException "io.netty.channel.epoll.EpollEventLoopGroup
cannot be cast to io.netty.channel.EventLoopGroup", which is probably caused by a Netty incompatibility
somewhere

-->
<argLine>-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dproject.basedir=${project.basedir}/.. -Dcom.amazonaws.sdk.disableCbor=true -Ditest.strimzi.container.image=${itest.strimzi.container.image} -Ditest.zookeeper.container.image=${itest.zookeeper.container.image}</argLine>
<argLine>-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dcom.datastax.driver.FORCE_NIO=true -Dproject.basedir=${project.basedir}/.. -Dcom.amazonaws.sdk.disableCbor=true -Ditest.strimzi.container.image=${itest.strimzi.container.image} -Ditest.zookeeper.container.image=${itest.zookeeper.container.image}</argLine>
<skipTests>${skipIntegrationTests}</skipTests>
</configuration>
</plugin>
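Note: the -Ditest.*.container.image flags above surface as ordinary JVM system properties inside the integration tests. A minimal sketch of how a test could read them is below; the helper class name and fallback values are hypothetical, not part of this PR.

// Hypothetical helper (not in this PR): reads the container-image system properties
// that the surefire <argLine> above passes to the test JVM.
public final class ContainerImageResolver {

    private ContainerImageResolver() {
    }

    // Falls back to a placeholder default when -Ditest.strimzi.container.image is not set.
    public static String strimziImage() {
        return System.getProperty("itest.strimzi.container.image", "placeholder/strimzi:latest");
    }

    // Falls back to a placeholder default when -Ditest.zookeeper.container.image is not set.
    public static String zookeeperImage() {
        return System.getProperty("itest.zookeeper.container.image", "placeholder/zookeeper:latest");
    }
}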
@@ -21,9 +21,7 @@

/**
* An interface for producing different types of connector properties that match
* a specific type of connector in test. Examples of runtime equivalents of this
* file are the CamelSinkConnector.properties and the CamelSourceConnector.properties
* files
* a specific type of connector in test.
*/
public interface ConnectorPropertyFactory {

@@ -31,6 +31,14 @@
public final class PluginPathHelper {
private static final Logger LOG = LoggerFactory.getLogger(PluginPathHelper.class);

private static final String[] MODULES = {
"core", "connectors/camel-sjms2-kafka-connector", "connectors/camel-cql-kafka-connector",
"connectors/camel-aws-sns-kafka-connector", "connectors/camel-aws-sqs-kafka-connector",
"connectors/camel-aws-s3-kafka-connector", "connectors/camel-aws-kinesis-kafka-connector",
"connectors/camel-elasticsearch-rest-kafka-connector", "connectors/camel-http-kafka-connector",
"connectors/camel-timer-kafka-connector", "connectors/camel-file-kafka-connector"
};

private static class PluginWalker extends DirectoryWalker<String> {
@Override
protected void handleFile(File file, int depth, Collection<String> results) throws IOException {
@@ -40,14 +48,31 @@ protected void handleFile(File file, int depth, Collection<String> results) throws IOException {
if (fileName.contains("kafka-connector") && fileName.contains("camel")) {
String parentDir = file.getParentFile().getCanonicalPath();
if (parentDir.endsWith("target")) {
LOG.debug("Adding file: {}", file.getCanonicalPath());
String pluginDir = file.getParentFile().getCanonicalPath();
LOG.debug("Adding directory: {}", pluginDir);

results.add(file.getCanonicalPath());
results.add(pluginDir);
}
}
}
}



@Override
protected boolean handleDirectory(File directory, int depth, Collection<String> results) throws IOException {
String directoryName = directory.getName();

if (directoryName.equals("target")) {
String pluginDir = directory.getCanonicalPath();
LOG.debug("Adding directory: {}", pluginDir);

results.add(pluginDir);
}

return true;
}

public List<String> findPlugins(File startDir) {
List<String> results = new ArrayList<>();

@@ -79,9 +104,14 @@ private static List<String> findPlugins(String...moduleDirs) {
}

private static List<String> findPlugins() {
return findPlugins("core", "connectors/camel-sjms2-kafka-connector");
/*
* Only load the subset of modules that have a related test; otherwise the startup time for the
* Kafka Connect runtime is extremely long
*/
return findPlugins(MODULES);
}


/*
* We need to construct a list of directories containing *only* the connector classes (i.e., those that
* specialize Kafka's Connector abstract class).
@@ -97,8 +127,8 @@ private static List<String> findPlugins() {
* 2) is located in the target directory
* 3) contains the strings 'camel' and 'kafka-connector' as part of their name.
*
* This is also leveraged by the fact that the core and connectors modules have the provided scope on the test
* pom file.
* Then, for every connector jar file that it finds, it configures the embedded runtime to include the parent dir
* in the configuration.
*
* Why does it do this?
*
@@ -111,7 +141,7 @@ private static List<String> findPlugins() {
*/
public static String pluginPaths() {
String ret = findPlugins().stream().collect(Collectors.joining(","));
LOG.info("Returning the following directories for the plugins: {}", ret);
LOG.info("Returning the following directories for the plugin path: {}", ret);

return ret;
}
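For illustration, the matching rule described in the comment above (a jar whose name contains both 'camel' and 'kafka-connector' and whose parent directory is a module's target directory) can be sketched with plain java.nio as follows. This is only a sketch under those assumptions: the real PluginPathHelper uses the commons-io DirectoryWalker shown in the diff and, with this PR, also adds every module's target directory via handleDirectory. The start directory in main is hypothetical.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class PluginDirSketch {

    // Collects the parent (target) directories of camel-*-kafka-connector jars under startDir.
    public static List<String> connectorPluginDirs(Path startDir) throws IOException {
        try (Stream<Path> paths = Files.walk(startDir)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> {
                    String name = p.getFileName().toString();
                    return name.endsWith(".jar")
                        && name.contains("camel")
                        && name.contains("kafka-connector");
                })
                // Only jars sitting directly under a module's target directory count.
                .filter(p -> p.getParent() != null && p.getParent().endsWith("target"))
                .map(p -> p.getParent().toAbsolutePath().toString())
                .distinct()
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // "connectors" is a hypothetical start directory used only for this sketch.
        connectorPluginDirs(Paths.get("connectors")).forEach(System.out::println);
    }
}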
@@ -47,7 +47,7 @@ public EmbeddedKafkaService() {

String pluginPaths = PluginPathHelper.pluginPaths();

LOG.info("Adding the following directories to the plugin path: {}", pluginPaths);
LOG.info("Adding the returned directories to the plugin path. This may take A VERY long time to complete");
workerProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPaths);

LOG.info("Building the embedded Kafka connect instance");
@@ -43,10 +43,10 @@ class CamelAWSSNSPropertyFactory implements ConnectorPropertyFactory {
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSNSSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwssnsSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awssns.CamelAwssnsSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -56,12 +56,12 @@ public Properties getProperties() {
connectorProps.put("camel.sink.url", queueUrl);
connectorProps.put("topics", topic);

connectorProps.put("camel.component.aws-sns.configuration.access-key",
connectorProps.put("camel.component.aws-sns.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-sns.configuration.secret-key",
connectorProps.put("camel.component.aws-sns.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));

connectorProps.put("camel.component.aws-sns.configuration.region",
connectorProps.put("camel.component.aws-sns.region",
amazonConfigs.getProperty(AWSConfigs.REGION, ""));

connectorProps.put("camel.component.aws-sns.configuration", "#class:"
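Taken together, the SNS changes above produce a connector configuration of roughly the following shape: the auto-generated org.apache.camel.kafkaconnector.awssns.CamelAwssnsSinkConnector class plus the flattened camel.component.aws-sns.* option keys. The sketch below spells out the resulting properties using the literal Kafka Connect key names instead of the ConnectorConfig constants; the topic, sink URL and credential values are placeholders, not values from this PR.

import java.util.Properties;

public final class AwsSnsSinkConfigSketch {

    public static Properties example() {
        Properties props = new Properties();
        // Connector identity: the auto-generated, component-specific sink connector class.
        props.put("name", "CamelAwssnsSinkConnector");
        props.put("connector.class", "org.apache.camel.kafkaconnector.awssns.CamelAwssnsSinkConnector");
        props.put("tasks.max", "1");
        props.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        props.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");

        // Route configuration: placeholder topic and sink URL.
        props.put("topics", "sns-sink-topic");
        props.put("camel.sink.url", "aws-sns://example-topic");

        // Component options now use the flattened keys instead of the nested configuration.* ones.
        props.put("camel.component.aws-sns.accessKey", "placeholder-access-key");
        props.put("camel.component.aws-sns.secretKey", "placeholder-secret-key");
        props.put("camel.component.aws-sns.region", "placeholder-region");
        return props;
    }
}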
@@ -45,10 +45,10 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
public Properties getProperties() {
Properties connectorProps = new Properties();

connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwssqsSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awssqs.CamelAwssqsSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -74,12 +74,12 @@ public Properties getProperties() {
connectorProps.put("camel.sink.url", queueUrl);
connectorProps.put("topics", topic);

connectorProps.put("camel.component.aws-sqs.configuration.access-key",
connectorProps.put("camel.component.aws-sqs.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-sqs.configuration.secret-key",
connectorProps.put("camel.component.aws-sqs.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));

connectorProps.put("camel.component.aws-sqs.configuration.region", region);
connectorProps.put("camel.component.aws-sqs.region", region);

return connectorProps;
}
@@ -40,10 +40,10 @@ public CamelCassandraPropertyFactory(int tasksMax, String topic, String host, St
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelCassandraQLSinkConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelCqlSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -64,7 +64,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelElasticSearchSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -44,7 +44,7 @@ public R apply(R r) {

targetMap.put(key, value);
return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
SchemaHelper.buildSchemaBuilderForType(value), targetMap, r.timestamp());
}

@Override
@@ -39,7 +39,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelFileSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.file.CamelFileSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -39,7 +39,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelHttpSinkConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSinkConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.http.CamelHttpSinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -23,7 +23,6 @@
import java.util.stream.Collectors;

import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorConfig;


@@ -53,7 +52,7 @@ public Properties getProperties() {
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.sjms2.CamelSjms2SinkConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(CamelSjms2SinkConnectorConfig.CAMEL_SINK_SJMS2_PATH_DESTINATION_NAME_CONF, queue);
connectorProps.put("camel.sink.path.destinationName", queue);
connectorProps.put("topics", topic);

Set<Map.Entry<Object, Object>> set = connectionProperties.entrySet();
@@ -25,7 +25,7 @@


/**
* Creates the set of properties used by a Camel JMS Sink Connector
* Creates the set of properties used by a Camel Kinesis Source Connector
*/
class CamelAWSKinesisPropertyFactory implements ConnectorPropertyFactory {
private final int tasksMax;
@@ -44,10 +44,10 @@ class CamelAWSKinesisPropertyFactory implements ConnectorPropertyFactory {
@Override
public Properties getProperties() {
Properties connectorProps = new Properties();
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSKinesisSourceConnector");
connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAwskinesisSourceConnector");
connectorProps.put("tasks.max", String.valueOf(tasksMax));

connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.awskinesis.CamelAwskinesisSourceConnector");
connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

@@ -56,12 +56,11 @@ public Properties getProperties() {
String sourceUrl = "aws-kinesis://" + streamName;
connectorProps.put("camel.source.url", sourceUrl);


connectorProps.put("camel.component.aws-kinesis.configuration.access-key",
connectorProps.put("camel.component.aws-kinesis.accessKey",
amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
connectorProps.put("camel.component.aws-kinesis.configuration.secret-key",
connectorProps.put("camel.component.aws-kinesis.secretKey",
amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
connectorProps.put("camel.component.aws-kinesis.configuration.region",
connectorProps.put("camel.component.aws-kinesis.region",
amazonConfigs.getProperty(AWSConfigs.REGION, ""));

connectorProps.put("camel.component.aws-kinesis.configuration", "#class:"
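One last observation: the flattened accessKey/secretKey/region options now recur with the same shape in the SNS, SQS and Kinesis factories above. A hypothetical helper like the sketch below could remove that duplication; it is not part of this PR, and callers would pass values they already resolved from AWSConfigs, so nothing new is assumed about that class.

import java.util.Properties;

// Hypothetical helper (not in this PR) to cut the duplication of the flattened AWS options
// across the SNS, SQS and Kinesis property factories.
public final class AwsCredentialProps {

    private AwsCredentialProps() {
    }

    // component is e.g. "aws-sns", "aws-sqs" or "aws-kinesis".
    public static void apply(Properties connectorProps, String component,
                             String accessKey, String secretKey, String region) {
        String prefix = "camel.component." + component + ".";
        connectorProps.put(prefix + "accessKey", accessKey);
        connectorProps.put(prefix + "secretKey", secretKey);
        connectorProps.put(prefix + "region", region);
    }
}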