
Camel-Kafka-connector AWS2 S3 Sink

This is an example of the Camel-Kafka-connector AWS2-S3 sink connector. Incoming Kafka messages are aggregated into a single zip file, which is then uploaded to an S3 bucket.

Standalone

What is needed

  • An AWS S3 bucket

Running Kafka

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
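
Optionally, you can verify that the topic was created:

$KAFKA_HOME/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic mytopic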

Setting up the needed bits and running the example

You’ll need to set up the plugin.path property in your Kafka installation.

Open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your chosen location.
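
A minimal sketch, assuming /home/oscerd/connectors as the plugin location (the same path used later in this example):

# $KAFKA_HOME/config/connect-standalone.properties
plugin.path=/home/oscerd/connectors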

You’ll need to build your connector starting from an archetype:

> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.11.5
[INFO] Using property: camel-kafka-connector-version = 0.11.5
Confirm properties configuration:
groupId: org.apache.camel.ckc
artifactId: aws2s3
version: 1.0-SNAPSHOT
package: org.apache.camel.ckc
camel-kafka-connector-version: 0.11.5
 Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.11.5
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.ckc
[INFO] Parameter: artifactId, Value: aws2s3
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.apache.camel.ckc
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/ckc
[INFO] Parameter: package, Value: com.github.oscerd
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.apache.camel.ckc
[INFO] Parameter: camel-kafka-connector-version, Value: 0.11.5
[INFO] Parameter: artifactId, Value: aws2s3
[INFO] Project created from Archetype in dir: /home/workspace/miscellanea/aws2s3
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  30.084 s
[INFO] Finished at: 2020-08-26T11:08:21+02:00
[INFO] ------------------------------------------------------------------------
> cd /home/workspace/miscellanea/aws2s3

Now we need to edit the POM

  .
  .
  .
  <version>1.0-SNAPSHOT</version>

  <name>A Camel Kafka Connector extended</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <kafka-version>2.7.0</kafka-version>
    <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version>
  </properties>

    <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <scope>provided</scope>
      <version>${kafka-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-transforms</artifactId>
      <scope>provided</scope>
      <version>${kafka-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.kafkaconnector</groupId>
      <artifactId>camel-kafka-connector</artifactId>
      <version>0.11.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.kafkaconnector</groupId>
      <artifactId>camel-aws2-s3-kafka-connector</artifactId>
      <version>0.11.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-zipfile</artifactId>
      <version>3.9.0</version>
    </dependency>
  </dependencies>
  .
  .
  .

In the dependencies section you’ll need to uncomment the aws2-s3 connector dependency and add the camel-zipfile component, as shown above.

Now we need to build the connector:

> mvn clean package

In this example we’ll use /home/oscerd/connectors/ as plugin.path, and we’ll need the package generated by the previous build:

> cd /home/oscerd/connectors/
> cp /home/workspace/miscellanea/aws2s3/target/aws2s3-1.0-SNAPSHOT-package.zip .
> unzip aws2s3-1.0-SNAPSHOT-package.zip

Now it’s time to set up the connector.

Open the AWS2 S3 configuration file (config/CamelAWS2S3SinkConnector.properties):

name=CamelAWS2S3SinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.sink.path.bucketNameOrArn=camel-kafka-connector

camel.component.aws2-s3.accessKey=xxxx
camel.component.aws2-s3.secretKey=yyyy
camel.component.aws2-s3.region=eu-west-1

camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip

camel.beans.aggregate=#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy
camel.aggregation.size=10
camel.aggregation.timeout=5000

and add the correct credentials for AWS. The connector aggregates up to camel.aggregation.size messages into a single zip archive, or flushes whatever it has collected once camel.aggregation.timeout milliseconds have elapsed.

Now you can run the example

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2S3SinkConnector.properties
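
If something doesn’t work, you can query the Kafka Connect REST API (default port 8083) for the connector status; the connector name matches the name property set above:

curl -s http://localhost:8083/connectors/CamelAWS2S3SinkConnector/status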

Just connect to your AWS Console and check the content of the camel-kafka-connector bucket.
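
Alternatively, if you have the AWS CLI installed and configured, you can list the bucket contents from a terminal:

aws s3 ls s3://camel-kafka-connector/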

In a different terminal, run the Kafka console producer and send messages to your Kafka broker:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic
Kafka to S3 message 1
Kafka to S3 message 2
Kafka to S3 message 3
Kafka to S3 message 4
Kafka to S3 message 5

After the timeout (or the aggregation size) has been reached, you should see a file named date-exchangeId.zip in the bucket, containing one file per message:

Kafka to S3 message 1
Kafka to S3 message 2
Kafka to S3 message 3
Kafka to S3 message 4
Kafka to S3 message 5

Openshift

What is needed

  • An AWS S3 bucket

  • An Openshift instance

Running Kafka using Strimzi Operator

First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. We need to create security objects as part of the installation, so it is necessary to switch to an admin user. If you use Minishift, you can do that with the following command:

oc login -u system:admin

We will use the OpenShift project myproject. If it doesn’t exist yet, you can create it using the following command:

oc new-project myproject

If the project already exists, you can switch to it with:

oc project myproject

We can now install the Strimzi operator into this project:

oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml

Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connector installed:

# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml

# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml

Optionally, enable the possibility to instantiate Kafka Connectors through a dedicated custom resource:

oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true

Add Camel Kafka connector binaries

Strimzi uses Source2Image (S2I) builds to allow users to add their own connectors to the existing Strimzi Docker images. We now need to build the connectors and add them to the image. If you have built the whole project (mvn clean package), decompress the connectors you need into a folder (e.g. my-connectors/) so that each one sits in its own subfolder. Alternatively, you can download the latest officially released and packaged connectors from Maven.

In this case we need to extend the existing connector and add a ZipAggregationStrategy, so we leverage the archetype:

> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.11.5
[INFO] Using property: camel-kafka-connector-version = 0.11.5
Confirm properties configuration:
groupId: org.apache.camel.ckc
artifactId: aws2s3
version: 1.0-SNAPSHOT
package: org.apache.camel.ckc
camel-kafka-connector-version: 0.11.5
 Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.11.5
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.ckc
[INFO] Parameter: artifactId, Value: aws2s3
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.apache.camel.ckc
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/ckc
[INFO] Parameter: package, Value: com.github.oscerd
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.apache.camel.ckc
[INFO] Parameter: camel-kafka-connector-version, Value: 0.11.5
[INFO] Parameter: artifactId, Value: aws2s3
[INFO] Project created from Archetype in dir: /home/workspace/miscellanea/aws2s3
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  30.084 s
[INFO] Finished at: 2020-08-26T11:08:21+02:00
[INFO] ------------------------------------------------------------------------
> cd /home/workspace/miscellanea/aws2s3

Now we need to edit the POM

  .
  .
  .
  <version>1.0-SNAPSHOT</version>

  <name>A Camel Kafka Connector extended</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <kafka-version>2.5.0</kafka-version>
    <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version>
  </properties>

    <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <scope>provided</scope>
      <version>${kafka-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-transforms</artifactId>
      <scope>provided</scope>
      <version>${kafka-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.kafkaconnector</groupId>
      <artifactId>camel-kafka-connector</artifactId>
      <version>0.11.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.kafkaconnector</groupId>
      <artifactId>camel-aws2-s3-kafka-connector</artifactId>
      <version>0.11.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-zipfile</artifactId>
      <version>3.5.0</version>
    </dependency>
  </dependencies>
  .
  .
  .

Now we need to build the connector:

> mvn clean package

Then copy the zip package from the target folder into the my-connectors folder and unzip it.
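
A sketch of those steps, assuming the project path used earlier:

> mkdir -p my-connectors
> cp /home/workspace/miscellanea/aws2s3/target/aws2s3-1.0-SNAPSHOT-package.zip my-connectors/
> cd my-connectors/
> unzip aws2s3-1.0-SNAPSHOT-package.zip
> rm aws2s3-1.0-SNAPSHOT-package.zip
> cd ..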

Now we can start the build

oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow

We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. Once it is done, we can check that the connectors are available in our Kafka Connect cluster. Strimzi is running Kafka Connect in a distributed mode.

To check the available connector plugins, you can run the following command:

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins

You should see something like this:

[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector","type":"source","version":"0.11.5"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

Set the AWS credentials as a secret (optional)

You can also provide the AWS credentials as a secret. You’ll need to edit the file config/openshift/aws2-s3-cred.properties with the correct credentials (a sketch of the expected layout follows) and then create the secret with the oc command below.
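
The key names in this sketch are illustrative; use whatever names your connector configuration references:

accessKey=xxxx
secretKey=yyyy
region=eu-west-1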

oc create secret generic aws2-s3 --from-file=config/openshift/aws2-s3-cred.properties

Now we need to edit the KafkaConnectS2I custom resource to reference the secret. For example:

spec:
  # ...
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  #...
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws2-s3

In this way the secret aws2-s3 will be mounted as a volume at /opt/kafka/external-configuration/aws-credentials/.
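
The connector configuration can then read values from the mounted file through the FileConfigProvider syntax ${file:<path>:<key>}. The key names below are illustrative and must match the entries in your aws2-s3-cred.properties:

camel.component.aws2-s3.accessKey=${file:/opt/kafka/external-configuration/aws-credentials/aws2-s3-cred.properties:accessKey}
camel.component.aws2-s3.secretKey=${file:/opt/kafka/external-configuration/aws-credentials/aws2-s3-cred.properties:secretKey}
camel.component.aws2-s3.region=${file:/opt/kafka/external-configuration/aws-credentials/aws2-s3-cred.properties:region}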

Create connector instance

Now we can create an instance of the AWS2 S3 sink connector:

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "s3-topic",
    "camel.sink.path.bucketNameOrArn": "camel-kafka-connector",
    "camel.sink.endpoint.keyName": "${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.tar.gz",
    "camel.beans.aggregate": "#class:org.apache.camel.processor.aggregate.tar.gzfile.ZipAggregationStrategy",
    "camel.aggregation.size": "10",
    "camel.aggregation.timeout": "5000",
    "camel.component.aws2-s3.accessKey": "xxx",
    "camel.component.aws2-s3.secretKey": "xxx",
    "camel.component.aws2-s3.region": "xxx"
  }
}
EOF

Alternatively, if you have enabled use-connector-resources, you can create the connector instance by creating a specific custom resource:

oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: s3-topic
    camel.sink.path.bucketNameOrArn: camel-kafka-connector
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip
    camel.beans.aggregate: "#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 5000
    camel.component.aws2-s3.accessKey: xxxx
    camel.component.aws2-s3.secretKey: yyyy
    camel.component.aws2-s3.region: region
EOF
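
If you created the connector through the custom resource, you can also check it via the resource itself:

oc get kafkaconnector s3-sink-connector -n myproject -o yaml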

If you followed the optional step for secret credentials you can run the following command:

oc apply -f config/openshift/aws2-s3-sink-with-zip-aggregation.yaml

You can check the status of the connector using:

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/s3-sink-connector/status

Just connect to your AWS Console and check the content of the camel-kafka-connector bucket.

In a different terminal, run the Kafka console producer and send messages to your Kafka broker:

oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic s3-topic
Kafka to S3 message 1
Kafka to S3 message 2
Kafka to S3 message 3
Kafka to S3 message 4
Kafka to S3 message 5

After the timeout (or the aggregation size) has been reached, you should see a file named date-exchangeId.zip in the bucket, containing one file per message:

Kafka to S3 message 1
Kafka to S3 message 2
Kafka to S3 message 3
Kafka to S3 message 4
Kafka to S3 message 5