In these labs, we move items posted by a simulator of items sold in stores from the items Kafka topic into the MongoDB items collection.
This lab targets developers and architects who want to learn how to use Kafka Connect with MongoDB as a sink.
- Lab 1 runs a Kafka 2.7 broker (Strimzi image), ZooKeeper, MongoDB and Kafka Connect locally. A simulator app sends messages to Kafka. This solution uses the Apache Camel Kafka connector.
- Lab 2 runs Kafka Connect and the simulator app on OpenShift, and uses the IBM Event Streams managed service on IBM Cloud.
The following figure illustrates the components involved in a potential production deployment: MongoDB as a service on a cloud provider such as IBM Cloud, with Kafka, the application and the Kafka Connect cluster deployed on OpenShift.
For development and quick demonstrations, we can use Docker and Docker Compose. In this case, the deployment looks like the following figure:
You will need the following:
The Apache Camel Kafka connector project provides all the Camel Kafka connectors, as source or sink connectors.
- Clone the https://github.com/apache/camel-kafka-connector project and build it.
git clone https://github.com/apache/camel-kafka-connector
cd camel-kafka-connector
# build mongodb connector
mvn package -DskipTests -Dhttp.ssl.insecure=true -Dhttp.ssl.allowall=true
This build should create an archive under camel-kafka-connector/connectors/camel-mongodb-kafka-connector/target named camel-mongodb-kafka-connector-0.10.0-SNAPSHOT-package.tar.gz.
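The Dockerfile used in the config folder is not reproduced here. The following is a minimal sketch of one way to turn that archive into a Strimzi-based Kafka Connect image. It follows the pattern from the Strimzi documentation, which copies connector plugins into /opt/kafka/plugins/. The my-plugins folder name and the default base image tag are assumptions, so match them to your Strimzi and Kafka 2.7 versions.

```shell
#!/usr/bin/env bash
# Sketch: stage the Camel MongoDB connector archive as a Kafka Connect plugin
# and write a Dockerfile based on a Strimzi Kafka image.
# Assumptions (hypothetical, adjust to your setup): the "my-plugins" folder
# name and the default base image tag used in write_dockerfile.

# stage_plugin ARCHIVE DEST_DIR
# Extracts the connector .tar.gz into DEST_DIR/<archive-name>/ so that each
# connector keeps its own sub-folder of jars.
stage_plugin() {
  local archive="$1" dest="$2" name
  name="$(basename "$archive" .tar.gz)"
  mkdir -p "$dest/$name"
  tar -xzf "$archive" -C "$dest/$name"
}

# write_dockerfile DIR [BASE_IMAGE]
# Writes DIR/Dockerfile, which copies DIR/my-plugins into /opt/kafka/plugins,
# the plugin directory of Strimzi Kafka Connect images.
write_dockerfile() {
  local dir="$1"
  local base="${2:-quay.io/strimzi/kafka:0.22.1-kafka-2.7.0}"
  cat > "$dir/Dockerfile" <<EOF
FROM ${base}
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
EOF
}
```

For example, you could stage the archive into a scratch folder such as build/my-plugins, run write_dockerfile build, and then run docker build from build/. Using a scratch folder avoids overwriting the repository's own config/Dockerfile.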
Attention: when building all the connectors, I got many errors like the following:
Could not resolve dependencies for project org.apache.camel.kafkaconnector:camel-consul-kafka-connector:jar:0.10.0-SNAPSHOT: Could not transfer artifact org.apache.camel:camel-health:jar:3.10.0-20210427.184531-134 from/to apache.snapshots (https://repository.apache.org/snapshots/): transfer failed for https://repository.apache.org/snapshots/org/apache/camel/camel-health/3.10.0-SNAPSHOT/camel-health-3.10.0-20210427.184531-134.jar: peer not authenticated
This may be due to a network communication issue. In this case, find the component that was not built and resume the build from it by adding -rf :<component-name>. For example, to resume from the consul component:
mvn package -DskipTests -Dhttp.ssl.insecure=true -Dhttp.ssl.allowall=true -rf :camel-consul-kafka-connector
- Build the Docker image for the Kafka connector, using the Strimzi Kafka image as the base image and adding the Camel jars:
# under the config folder
docker build -t quay.io/ibmcase/camel-mongo-kconnector .
- Start Kafka, ZooKeeper, Kafdrop, the store simulator and MongoDB:
docker-compose up -d
- Create the needed items topic:
./scripts/createTopics.sh
- Verify the environment:
  - the simulator on port 8080 should display a page with a Kafka backend, ready to send n records
  - Kafdrop, on port 9000, shows the Kafka broker and the items topic content
  - the Kafka Connect cluster, at http://localhost:8083/connectors, should return an empty array
- Configure the MongoDB sink connector by uploading its JSON configuration to the Kafka Connect cluster. A script pushes camel-mongo-sink.json with a curl POST:
./scripts/pushConnector.sh
Then verify, at http://localhost:8083/connectors/camel-mongodb-kafka-connector/config, that the connector is created and started.
- Use the simulator to send records: navigate to http://localhost:8080/#/simulator, select Kafka and send 3 records.
- Verify the records are in the items topic using Kafdrop: http://localhost:9000/topic/items/messages?partition=0&offset=0&count=100
- [Optional] Look at the connector trace:
docker logs kconnect
- Then verify the items collection in MongoDB:
  - Connect to the mongo container:
docker exec -ti mongo bash
  - Then, in the shell:
mongo --username root --password example
use itemdb
db.items.find()
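The verification steps above can also be scripted. The following is a minimal sketch of hypothetical bash helpers. check_url and connectors_empty cover the environment check. push_connector is only a guess at what pushConnector.sh does, since that script is not shown. It assumes the JSON file has the {"name": ..., "config": {...}} shape that the Kafka Connect REST API expects on POST /connectors. connector_state reads GET /connectors/<name>/status, which reports whether the connector is actually running, not just configured. count_items uses the legacy mongo shell with the credentials from the step above. countDocuments needs MongoDB 4.0 or later. The CONNECT_URL and MONGO_CONTAINER variables are assumptions with defaults taken from the steps above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the Lab 1 verification steps.
# Assumed defaults: Kafka Connect REST API on localhost:8083 and a MongoDB
# container named "mongo"; override with CONNECT_URL / MONGO_CONTAINER.

# check_url NAME URL: prints "NAME OK" when URL answers with a success status.
check_url() {
  if curl -sf -o /dev/null "$2"; then
    echo "$1 OK"
  else
    echo "$1 NOT READY"
    return 1
  fi
}

# connectors_empty: succeeds when Kafka Connect lists no connector yet ("[]").
connectors_empty() {
  local body
  body="$(curl -sf "${CONNECT_URL:-http://localhost:8083}/connectors")" || return 1
  [ "$(printf '%s' "$body" | tr -d '[:space:]')" = "[]" ]
}

# push_connector FILE: creates a connector from a JSON file holding
# {"name": ..., "config": {...}} through the Kafka Connect REST API.
push_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$1" "${CONNECT_URL:-http://localhost:8083}/connectors"
}

# connector_state NAME: prints the connector-level state (RUNNING, FAILED, ...).
# In the /status response the first "state" field belongs to the connector,
# and the following ones belong to its tasks.
connector_state() {
  curl -s "${CONNECT_URL:-http://localhost:8083}/connectors/$1/status" \
    | grep -o '"state": *"[A-Z_]*"' | head -n 1 | cut -d '"' -f 4
}

# count_items: counts documents in itemdb.items with the legacy mongo shell,
# authenticating the root user against the admin database.
count_items() {
  docker exec "${MONGO_CONTAINER:-mongo}" mongo --quiet \
    --username root --password example --authenticationDatabase admin \
    itemdb --eval 'db.items.countDocuments({})'
}
```

For example, check_url simulator http://localhost:8080 and connectors_empty can run before the connector is pushed. After pushing, connector_state camel-mongodb-kafka-connector should print RUNNING. After sending 3 records, count_items should report at least 3 documents.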
To stop the environment once you are done:
docker-compose down
The Lab 2 context is a little different: the Kafka Connect cluster runs on OpenShift using the Strimzi Operator, and Kafka is the IBM Event Streams managed service running on IBM Cloud.
You need the following:
- git
- jq
- OpenShift oc CLI
- Create an Event Streams managed service instance on IBM Cloud, and create credentials with the manager role. Get the bootstrap server URLs and the API_KEY.
- Connect to your OpenShift cluster using the oc CLI.
- Create a new project:
oc new-project mongo-lab2
The Confluent-verified connector comes from the MongoDB team, and its deployment guide gives all the instructions to deploy it.
This repository includes Kustomize YAML files to deploy to an OpenShift cluster.
The simulator application sends messages to Event Streams on IBM Cloud, so it needs the bootstrap server URLs.
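Besides the bootstrap servers, the simulator needs a SASL/PLAIN JAAS configuration built from the Event Streams credentials: the literal user token and the API key as password. The following is a minimal sketch of a hypothetical helper that assembles that string. It double-quotes the values, as the Kafka documentation examples for PlainLoginModule do.

```shell
#!/usr/bin/env bash
# jaas_plain_config USER PASSWORD (hypothetical helper)
# Prints a sasl.jaas.config value for SASL/PLAIN with double-quoted values.
# For Event Streams, USER is the literal "token" and PASSWORD is the API key.
jaas_plain_config() {
  printf 'org.apache.kafka.common.security.plain.PlainLoginModule required username="%s" password="%s";' "$1" "$2"
}
```

It could feed the secret as --from-literal=KAFKA_SASL_JAAS_CONFIG="$(jaas_plain_config "$KAFKA_USER" "$KAFKA_PASSWORD")". Using printf with %s keeps characters in the API key, such as %, from being interpreted.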
- Define the secret for the application with the Event Streams information and credentials:
export KAFKA_BOOTSTRAP_SERVERS=broker-0-l.cloud.ibm.com:9093,broker-4-l....eventstreams.cloud.ibm.com:9093
export KAFKA_PASSWORD=lq3....9
export KAFKA_USER=token
oc create secret generic simulator-secrets --from-literal=KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
--from-literal=KAFKA_SECURITY_PROTOCOL=SASL_SSL \
--from-literal=KAFKA_SASL_MECHANISM=PLAIN \
--from-literal=KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$KAFKA_USER\" password=\"$KAFKA_PASSWORD\";"
- Deploy the application:
oc apply -k kustomize/apps/store-simulator
- Verify the pods are running and get the route:
oc get pods
oc get route store-simulator -o jsonpath="{.spec.host}"
- Open a web browser at this URL. In the Simulator tab, select Kafka and send some messages.
- Use Kafdrop to verify the messages are in the Event Streams items topic:
./scripts/kafdrop/startKafdrop.sh
then open http://localhost:9000/topic/items/messages?partition=0&offset=0&count=100
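To avoid copying the route host by hand, a small hypothetical helper can build the simulator URL from the store-simulator route. This sketch assumes the application serves the same /#/simulator page as in Lab 1. It picks https only when the route declares TLS termination.

```shell
#!/usr/bin/env bash
# simulator_url [ROUTE] (hypothetical helper)
# Prints the simulator page URL for an OpenShift route (default: store-simulator).
# Uses https when the route declares TLS termination, http otherwise.
# The /#/simulator path is assumed from Lab 1.
simulator_url() {
  local route="${1:-store-simulator}" host tls scheme=http
  host="$(oc get route "$route" -o jsonpath='{.spec.host}')" || return 1
  tls="$(oc get route "$route" -o jsonpath='{.spec.tls.termination}')"
  if [ -n "$tls" ]; then
    scheme=https
  fi
  echo "${scheme}://${host}/#/simulator"
}
```

For example, running echo "$(simulator_url)" prints the address to paste into the browser.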




