Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split the tests in modules per service #262

Merged
merged 2 commits into from Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/try-it-out-locally.adoc
Expand Up @@ -338,7 +338,7 @@ This is the sample Transformer used in the integration test code that transforms

[source,bash]
----
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer
----

This is a configuration for the sample transformer that defines the key used in the map:
Expand Down
2 changes: 1 addition & 1 deletion examples/CamelElasticSearchSinkConnector.properties
Expand Up @@ -29,7 +29,7 @@ value.converter=org.apache.kafka.connect.storage.StringConverter

# This is the sample Transformer used in the integration test code that transforms
# Kafka's ConnectRecord to a Map.
# transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
# transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer

# This is a configuration for the sample transformer that defines the
# key used in the map.
Expand Down
7 changes: 7 additions & 0 deletions parent/pom.xml
Expand Up @@ -51,6 +51,7 @@
<version.maven.checkstyle.plugin>3.1.0</version.maven.checkstyle.plugin>
<version.maven.surefire.plugin>3.0.0-M4</version.maven.surefire.plugin>


<mycila-license-version>3.0</mycila-license-version>
<gmavenplus-plugin-version>1.6.2</gmavenplus-plugin-version>
<groovy-version>2.5.8</groovy-version>
Expand Down Expand Up @@ -336,6 +337,12 @@
<version>${version.maven.resources}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${version.maven.jar}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
68 changes: 68 additions & 0 deletions tests/itests-aws/pom.xml
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>itests-parent</artifactId>
<version>0.3.0-SNAPSHOT</version>
<relativePath>../itests-parent/pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>itests-aws</artifactId>
<name>Camel-Kafka-Connector :: Tests :: AWS</name>

<dependencies>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>itests-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-sqs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-s3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-sns</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws-kinesis</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
</dependencies>


</project>
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.services.aws;
package org.apache.camel.kafkaconnector.aws.clients;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
Expand All @@ -29,8 +29,8 @@
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
import org.apache.camel.kafkaconnector.clients.aws.sqs.TestAWSCredentialsProvider;
import org.apache.camel.kafkaconnector.aws.common.AWSConfigs;
import org.apache.camel.kafkaconnector.aws.common.TestAWSCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.clients.aws.sqs;
package org.apache.camel.kafkaconnector.aws.clients;

import java.util.HashMap;
import java.util.List;
Expand Down
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.aws.common;

public final class AWSCommon {
/**
* The default SQS queue name used during the tests
*/
public static final String DEFAULT_SQS_QUEUE = "ckc";

/**
* The default SQS queue name used during the tests
*/
public static final String DEFAULT_SQS_QUEUE_FOR_SNS = "ckcsns";

/**
* The default SNS queue name used during the tests
*/
public static final String DEFAULT_SNS_QUEUE = "ckc-sns";

/**
* The default S3 bucket name used during the tests
*/
public static final String DEFAULT_S3_BUCKET = "ckc-s3";

/**
* The default Kinesis stream name used during the tests
*/
public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream";

private AWSCommon() {

}
}
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.clients.aws;
package org.apache.camel.kafkaconnector.aws.common;


public final class AWSConfigs {
Expand Down
Expand Up @@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.clients.aws.sqs;
package org.apache.camel.kafkaconnector.aws.common;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;

public class TestAWSCredentialsProvider implements AWSCredentialsProvider {
private static class TestAWSCredentials implements AWSCredentials {
Expand Down
Expand Up @@ -15,16 +15,17 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.source.aws.kinesis;
package org.apache.camel.kafkaconnector.aws.kinesis.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.amazonaws.regions.Regions;
import org.apache.camel.kafkaconnector.EndpointUrlBuilder;
import org.apache.camel.kafkaconnector.SourceConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
import org.apache.camel.kafkaconnector.aws.common.AWSConfigs;
import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;


/**
* Creates the set of properties used by a Camel Kinesis Source Connector
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.source.aws.kinesis;
package org.apache.camel.kafkaconnector.aws.kinesis.source;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -30,12 +30,13 @@
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import org.apache.camel.kafkaconnector.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.TestCommon;
import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.services.aws.AWSService;
import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
import org.apache.camel.kafkaconnector.aws.common.AWSCommon;
import org.apache.camel.kafkaconnector.aws.services.AWSService;
import org.apache.camel.kafkaconnector.aws.services.AWSServiceFactory;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -61,13 +62,18 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
private volatile int received;
private final int expect = 10;

@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws-kinesis-kafka-connector"};
}


@BeforeEach
public void setUp() {
awsKinesisClient = service.getClient();
received = 0;

CreateStreamResult result = awsKinesisClient.createStream(TestCommon.DEFAULT_KINESIS_STREAM, 1);
CreateStreamResult result = awsKinesisClient.createStream(AWSCommon.DEFAULT_KINESIS_STREAM, 1);
if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
fail("Failed to create the stream");
} else {
Expand All @@ -77,7 +83,7 @@ public void setUp() {

@AfterEach
public void tearDown() {
DeleteStreamResult result = awsKinesisClient.deleteStream(TestCommon.DEFAULT_KINESIS_STREAM);
DeleteStreamResult result = awsKinesisClient.deleteStream(AWSCommon.DEFAULT_KINESIS_STREAM);

if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
fail("Failed to delete the stream");
Expand Down Expand Up @@ -117,7 +123,7 @@ public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws E

LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consume(TestCommon.getDefaultTestTopic(this.getClass()), this::checkRecord);
kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
LOG.debug("Created the consumer ...");

assertEquals(received, expect, "Didn't process the expected amount of messages");
Expand All @@ -128,10 +134,10 @@ public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws E
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
.withKafkaTopic(TestCommon.getDefaultTestTopic(this.getClass()))
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
.withStreamName(TestCommon.DEFAULT_KINESIS_STREAM);
.withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);

runtTest(connectorPropertyFactory);
}
Expand All @@ -141,10 +147,10 @@ public void testBasicSendReceive() throws ExecutionException, InterruptedExcepti
public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
.withKafkaTopic(TestCommon.getDefaultTestTopic(this.getClass()))
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
.withConfiguration(TestKinesisConfiguration.class.getName())
.withStreamName(TestCommon.DEFAULT_KINESIS_STREAM);
.withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);

runtTest(connectorPropertyFactory);
}
Expand All @@ -154,18 +160,18 @@ public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, Inte
public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
.withKafkaTopic(TestCommon.getDefaultTestTopic(this.getClass()))
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
.withUrl(TestCommon.DEFAULT_KINESIS_STREAM)
.withUrl(AWSCommon.DEFAULT_KINESIS_STREAM)
.buildUrl();

runtTest(connectorPropertyFactory);
}

private void putRecords() {
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(TestCommon.DEFAULT_KINESIS_STREAM);
putRecordsRequest.setStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);

List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();

Expand Down
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.source.aws.kinesis;
package org.apache.camel.kafkaconnector.aws.kinesis.source;

import com.amazonaws.services.kinesis.AmazonKinesis;
import org.apache.camel.component.aws.kinesis.KinesisConfiguration;
import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils;
import org.apache.camel.kafkaconnector.aws.clients.AWSClientUtils;

public class TestKinesisConfiguration extends KinesisConfiguration {
private AmazonKinesis amazonKinesis;
Expand Down
Expand Up @@ -15,16 +15,16 @@
* limitations under the License.
*/

package org.apache.camel.kafkaconnector.source.aws.s3;
package org.apache.camel.kafkaconnector.aws.s3.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.amazonaws.regions.Regions;
import org.apache.camel.kafkaconnector.EndpointUrlBuilder;
import org.apache.camel.kafkaconnector.SourceConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
import org.apache.camel.kafkaconnector.aws.common.AWSConfigs;
import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;


/**
Expand Down