Skip to content

Commit

Permalink
Merge pull request #272 from valdar/issue/260
Browse files Browse the repository at this point in the history
  • Loading branch information
valdar committed Jun 15, 2020
2 parents 5f87a5c + a1885c3 commit 12705c1
Show file tree
Hide file tree
Showing 29 changed files with 340 additions and 310 deletions.
Expand Up @@ -28,9 +28,8 @@ public class CamelAws2s3SinkConnector extends CamelSinkConnector {
public ConfigDef config() {
return CamelAws2s3SinkConnectorConfig.conf();
}

@Override
public Class<? extends Task> taskClass() {
return CamelAws2s3SinkTask.class;
}
}
}

Large diffs are not rendered by default.

Expand Up @@ -26,16 +26,14 @@
public class CamelAws2s3SinkTask extends CamelSinkTask {

@Override
protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> props) {
protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(
Map<String, String> props) {
return new CamelAws2s3SinkConnectorConfig(props);
}

@Override
protected Map<String, String> getDefaultConfig() {
return new HashMap<String, String>() {
{
put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-s3");
}
};
return new HashMap<String, String>() {{
put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-s3");
}};
}
}
}
Expand Up @@ -28,9 +28,8 @@ public class CamelAws2s3SourceConnector extends CamelSourceConnector {
public ConfigDef config() {
return CamelAws2s3SourceConnectorConfig.conf();
}

@Override
public Class<? extends Task> taskClass() {
return CamelAws2s3SourceTask.class;
}
}
}

Large diffs are not rendered by default.

Expand Up @@ -26,16 +26,14 @@
public class CamelAws2s3SourceTask extends CamelSourceTask {

@Override
protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(Map<String, String> props) {
protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(
Map<String, String> props) {
return new CamelAws2s3SourceConnectorConfig(props);
}

@Override
protected Map<String, String> getDefaultConfig() {
return new HashMap<String, String>() {
{
put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "aws2-s3");
}
};
return new HashMap<String, String>() {{
put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "aws2-s3");
}};
}
}
}
Expand Up @@ -17,6 +17,15 @@

package org.apache.camel.kafkaconnector.aws.common;

import java.util.Iterator;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;

public final class AWSCommon {
/**
* The default SQS queue name used during the tests
Expand Down Expand Up @@ -46,4 +55,54 @@ public final class AWSCommon {
private AWSCommon() {

}

/**
* Delete an S3 bucket using the provided client. Coming from AWS documentation:
* https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
* @param s3Client the AmazonS3 client instance used to delete the bucket
* @param bucketName a String containing the bucket name
*/
public static void deleteBucket(AmazonS3 s3Client, String bucketName) {
// Delete all objects from the bucket. This is sufficient
// for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
// delete markers for all objects, but doesn't delete the object versions.
// To delete objects from versioned buckets, delete all of the object versions before deleting
// the bucket (see below for an example).
ObjectListing objectListing = s3Client.listObjects(bucketName);
while (true) {
Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator();
while (objIter.hasNext()) {
s3Client.deleteObject(bucketName, objIter.next().getKey());
}

// If the bucket contains many objects, the listObjects() call
// might not return all of the objects in the first listing. Check to
// see whether the listing was truncated. If so, retrieve the next page of objects
// and delete them.
if (objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
} else {
break;
}
}

// Delete all object versions (required for versioned buckets).
VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName));
while (true) {
Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator();
while (versionIter.hasNext()) {
S3VersionSummary vs = versionIter.next();
s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId());
}

if (versionList.isTruncated()) {
versionList = s3Client.listNextBatchOfVersions(versionList);
} else {
break;
}
}

// After all objects and object versions are deleted, delete the bucket.
s3Client.deleteBucket(bucketName);
}
}
Expand Up @@ -101,6 +101,8 @@ public void tearDown() {
}

awsKinesisClient.shutdown();

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkRecord(ConsumerRecord<String, String> record) {
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -47,9 +46,9 @@

@Testcontainers
public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {

@RegisterExtension
public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service();

private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);

private AmazonS3 awsS3Client;
Expand All @@ -75,14 +74,15 @@ public void setUp() {
}
}


@AfterEach
public void tearDown() {
try {
awsS3Client.deleteBucket(AWSCommon.DEFAULT_S3_BUCKET);
AWSCommon.deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET);
} catch (Exception e) {
LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
}

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkRecord(ConsumerRecord<String, String> record) {
Expand Down Expand Up @@ -158,8 +158,6 @@ public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, Inte
runTest(connectorPropertyFactory);
}


@Disabled("Disabled due to issue #260")
@Test
@Timeout(180)
public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
Expand All @@ -168,13 +166,15 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withConfiguration(TestS3Configuration.class.getName())
.withUrl(AWSCommon.DEFAULT_S3_BUCKET)
.append("configuration", CamelAWSS3PropertyFactory.classRef(TestS3Configuration.class.getName()))
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
.buildUrl();

runTest(connectorPropertyFactory);
}

}
Expand Up @@ -36,6 +36,7 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -70,6 +71,11 @@ public void setUp() {
received = 0;
}

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkMessages(List<Message> messages) {
for (Message message : messages) {
LOG.info("Received: {}", message.getBody());
Expand Down
Expand Up @@ -77,6 +77,7 @@ public void setUp() {

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
Expand Down
Expand Up @@ -69,6 +69,8 @@ public void setUp() {

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));

if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
Expand Down Expand Up @@ -142,12 +144,12 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();

runTest(connectorPropertyFactory);
}
Expand Down
Expand Up @@ -78,6 +78,8 @@ public void tearDown() {
if (testDataDao != null) {
testDataDao.dropTable();
}

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private void putRecords(CountDownLatch latch) {
Expand Down
Expand Up @@ -17,16 +17,20 @@

package org.apache.camel.kafkaconnector.common;

import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public abstract class AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTest.class);

@RegisterExtension
public final KafkaService kafkaService;
Expand Down Expand Up @@ -54,8 +58,16 @@ public KafkaService getKafkaService() {
return kafkaService;
}


public KafkaConnectService getKafkaConnectService() {
return kafkaConnectService;
}

protected void deleteKafkaTopic(String topic) {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.deleteTopic(topic);
} catch (Throwable t) {
LOG.warn("Topic not deleted (probably the Kafka test cluster was already shutting down?).", t);
}
}
}
Expand Up @@ -19,10 +19,13 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -37,6 +40,8 @@
* @param <V> Value type
*/
public class KafkaClient<K, V> {
private final ConsumerPropertyFactory consumerPropertyFactory;
private final ProducerPropertyFactory producerPropertyFactory;
private KafkaProducer<K, V> producer;
private KafkaConsumer<K, V> consumer;

Expand All @@ -48,8 +53,8 @@ public class KafkaClient<K, V> {
* PLAINTEXT://${address}:${port}
*/
public KafkaClient(String bootstrapServer) {
ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
ProducerPropertyFactory producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);
consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);

producer = new KafkaProducer<>(producerPropertyFactory.getProperties());
consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
Expand Down Expand Up @@ -93,5 +98,14 @@ public void produce(String topic, V message) throws ExecutionException, Interrup
future.get();
}


/**
* Delete a topic
*
* @param topic the topic to be deleted
*/
public void deleteTopic(String topic) {
Properties props = producerPropertyFactory.getProperties();
AdminClient admClient = AdminClient.create(props);
admClient.deleteTopics(Collections.singleton(topic));
}
}

0 comments on commit 12705c1

Please sign in to comment.