Skip to content
Permalink
Browse files
AWS2 Lambda Sink Example: added steps for adding AWS creds secret
  • Loading branch information
oscerd committed Sep 23, 2020
1 parent be2005f commit 67bb58a777fd4b6651740176c212858f5d2398ae
Showing 1 changed file with 220 additions and 6 deletions.
@@ -1,23 +1,23 @@
# Camel-Kafka-connector AWS2 Lambda Sink

## Introduction

This is an example for Camel-Kafka-connector AWS2-Lambda Sink

## What is needed
## Standalone

### What is needed

- An AWS Lambda function
- The following project here: https://github.com/oscerd/lambda-ckc
## Running Kafka
### Running Kafka

```
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```

## Setting up the needed bits and running the example
### Setting up the needed bits and running the example

You'll need to setup the plugin.path property in your kafka

@@ -33,7 +33,7 @@ In this example we'll use `/home/oscerd/connectors/`
> unzip camel-aws2-sns-kafka-connector-0.5.0-package.zip
```

## Deploying the AWS Lambda function
### Deploying the AWS Lambda function

```
> git clone git@github.com:oscerd/lambda-ckc.git
@@ -91,3 +91,217 @@ In the AWS console, in the CloudWatch Log insights section under the monitoring
2020-08-07T12:32:12.325+02:00 REPORT RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Duration: 43.33 ms Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 75 MB Init Duration: 293.52 ms
```

## Openshift

### What is needed

- An AWS Lambda function
- The following project here: https://github.com/oscerd/lambda-ckc
- An Openshift instance
### Running Kafka using Strimzi Operator

First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
We need to create security objects as part of installation so it is necessary to switch to admin user.
If you use Minishift, you can do it with the following command:

[source,bash,options="nowrap"]
----
oc login -u system:admin
----

We will use OpenShift project `myproject`.
If it doesn't exist yet, you can create it using following command:

[source,bash,options="nowrap"]
----
oc new-project myproject
----

If the project already exists, you can switch to it with:

[source,bash,options="nowrap"]
----
oc project myproject
----

We can now install the Strimzi operator into this project:

[source,bash,options="nowrap",subs="attributes"]
----
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml
----

Next we will deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the Debezium connectors installed:

[source,bash,options="nowrap",subs="attributes"]
----
# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/kafka/kafka-persistent-single.yaml
# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
----

Optionally enable the possibility to instantiate Kafka Connectors through specific custom resource:
[source,bash,options="nowrap"]
----
oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
----

### Add Camel Kafka connector binaries

Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images.
We now need to build the connectors and add them to the image,
if you have built the whole project (`mvn clean package`) decompress the connectors you need in a folder (i.e. like `my-connectors/`)
so that each one is in its own subfolder
(alternatively you can download the latest officially released and packaged connectors from maven):

So we need to do something like this:

```
> cd my-connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-lambda-kafka-connector/0.5.0/camel-aws2-lambda-kafka-connector-0.5.0-package.zip
> unzip camel-aws2-lambda-kafka-connector-0.5.0-package.zip
```

Now we can start the build

[source,bash,options="nowrap"]
----
oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
----

We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
Strimzi is running Kafka Connect in a distributed mode.

To check the available connector plugins, you can run the following command:

[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
----

You should see something like this:

[source,json,options="nowrap"]
----
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
----

### Set the AWS credential as secret (optional)

You can also set the aws creds option as secret, you'll need to edit the file config/aws-s3-cred.properties with the correct credentials and then execute the following command

[source,bash,options="nowrap"]
----
oc create secret generic aws2-lambda --from-file=config/openshift/aws2-lambda-cred.properties
----

Now we need to edit KafkaConnectS2I custom resource to reference the secret. For example:

[source,bash,options="nowrap"]
----
spec:
# ...
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
#...
externalConfiguration:
volumes:
- name: aws-credentials
secret:
secretName: aws2-lambda
----

In this way the secret aws2-lambda will be mounted as volume with path /opt/kafka/external-configuration/aws-credentials/

### Create connector instance

Now we can create some instance of the AWS2 Lambda sink connector:

[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
"name": "lambda-sink-connector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"topics": "lambda-topic",
"camel.sink.path.function": "hello-ckc",
"camel.sink.endpoint.operation": "invokeFunction",
"camel.component.aws2-s3.accessKey": "xxx",
"camel.component.aws2-s3.secretKey": "xxx",
"camel.component.aws2-s3.region": "xxx"
}
}
EOF
----

Altenatively, if have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource:

[source,bash,options="nowrap"]
----
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: lambda-sink-connector
namespace: myproject
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
topics: lambda-topic
camel.sink.path.function: hello-ckc
camel.sink.endpoint.operation: invokeFunction
camel.component.aws2-s3.accessKey: xxxx
camel.component.aws2-s3.secretKey: yyyy
camel.component.aws2-s3.region: region
EOF
----

If you followed the optional step for secret credentials you can run the following command:

[source,bash,options="nowrap"]
----
oc apply -f config/openshift/aws2-lambda-sink.yaml
----

You can check the status of the connector using

[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/lambda-sink-connector/status
----

Just connect to your AWS Console and check the content of camel-kafka-connector bucket.

On a different terminal run the kafka-producer and send messages to your Kafka Broker.

```
oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic lambda-topic
Kafka message sent!
```

In the AWS console, in the CloudWatch Log insights section under the monitoring tabs, you should get the information about this invokation and you should see in the logs

```
2020-08-07T12:32:12.282+02:00 START RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Version: $LATEST
2020-08-07T12:32:12.321+02:00 Event received: Kafka message sent!
2020-08-07T12:32:12.325+02:00 END RequestId: 666d2257-1a22-406e-b112-2384ceac23a3
2020-08-07T12:32:12.325+02:00 REPORT RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Duration: 43.33 ms Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 75 MB Init Duration: 293.52 ms
```

0 comments on commit 67bb58a

Please sign in to comment.