Containerized Apache Kafka Connect
Using GoogleContainerTools/Jib to package Apache Kafka Connect Distributed Server.
This image is almost 10x smaller than popular Kafka Connect images. It only includes the Connect Runtime, no extra bloat!
The builds are automated. The version releases are not. If you find a version missing, feel free to submit a corresponding PR.
Docker Pull! 🐳
docker pull cricketeerone/apache-kafka-connect
The above image is enough for MirrorMaker2. There is also an image that includes confluent-hub
for adding a majority of third-party connectors! See section Extending with new Connectors for full usage.
docker pull cricketeerone/apache-kafka-connect:latest-confluent-hub
Alpine variants are also available. Check Docker Hub for all tags and versions.
Much like the confluentinc/cp-kafka-connect
images, this container uses environment variables starting with CONNECT_
, followed by the Kafka Connect Worker properties to be configured.
For example, these are the bare-minimum variables needed to get a Connect Distributed Server running. They assume a Kafka cluster with at least three brokers, because three is the default replication factor for the three Connect topics. When testing against fewer than three brokers, add the replication-factor variables for those topics, as described below.
CONNECT_BOOTSTRAP_SERVERS
CONNECT_GROUP_ID
CONNECT_KEY_CONVERTER
CONNECT_VALUE_CONVERTER
CONNECT_CONFIG_STORAGE_TOPIC
CONNECT_OFFSET_STORAGE_TOPIC
CONNECT_STATUS_STORAGE_TOPIC
See docker-compose.yml
for a full example of these variables' usage with the container while connected to a Kafka broker.
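The naming convention can be sketched in a few lines of shell. This is only an illustration, assuming the mapping is the one visible in the list above (the CONNECT_ prefix, upper case, dots replaced by underscores); to_connect_env is a hypothetical helper, not something shipped in the image.

```shell
# Hypothetical helper (not part of the image): derive the environment
# variable name for a Kafka Connect worker property, assuming the mapping is
# "CONNECT_" + upper-cased property name with dots replaced by underscores.
to_connect_env() {
  printf 'CONNECT_%s\n' "$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]' | tr '.' '_')"
}

to_connect_env bootstrap.servers      # prints CONNECT_BOOTSTRAP_SERVERS
to_connect_env config.storage.topic   # prints CONNECT_CONFIG_STORAGE_TOPIC
```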
Looking to build your own image? tl;dr - Clone repo, and use ./mvnw clean compile jib:dockerBuild
or MVN_BUILD_CMD='compile jib:dockerBuild' make
and you're done!
Multi-platform builds (buildx)
By default, with the above commands, an image will be built for a linux/amd64
Ubuntu-based container.
The following builds and pushes multi-platform images to your personal Docker Hub account via Docker Buildx.
BUILDX_PLATFORMS=linux/arm64,linux/amd64 DOCKER_USER=$(whoami) make
As of May 2023, Alpine variants of Eclipse Temurin Java 17 images do not support arm64.
To push to a private Docker Registry, you'll need to docker login
to that address. The following commands will push the apache-kafka-connect
image to a Docker Registry under your local username.
Feel free to change DOCKER_USER
to a custom repo name in the Registry.
$ docker login <registry-address> --username=$(whoami)
$ DOCKER_REGISTRY=<registry-address> DOCKER_USER=$(whoami) \
make
The following tutorial uses Jib to package the ConnectDistributed
class for running Kafka Connect workers in distributed mode.
The following instructions use the Bitnami Kafka images; any other Kafka Docker image should work as well.
This tutorial will roughly follow the same steps as the tutorial for Connect on Kafka's site, except using the Distributed Connect server instead.
If not using Docker, Kafka (and ZooKeeper, if not using KRaft) can be started locally using their respective start scripts. In that case, the variables for the bootstrap servers will need to be adjusted accordingly.
The following steps can be used to run this application locally outside of Docker.
# Assumes Kafka default port
export CONNECT_BOOTSTRAP_SERVERS=127.0.0.1:9092
export CONNECT_GROUP_ID=cg_connect-jib
export CONNECT_CONFIG_STORAGE_TOPIC=connect-jib_config
export CONNECT_OFFSET_STORAGE_TOPIC=connect-jib_offsets
export CONNECT_STATUS_STORAGE_TOPIC=connect-jib_status
# Cannot be higher than the number of brokers in the Kafka cluster
export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
# We're going to use ByteArrayConverter by default, and let individual connectors configure themselves
export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter
export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter
# Runs ConnectDistributed via Maven
./mvnw clean exec:java
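Before launching the worker this way, it can help to confirm that the seven minimum variables are actually set in the current shell. The following is a minimal sketch for a POSIX shell; check_connect_env is a hypothetical helper and is not provided by the repository.

```shell
# Hypothetical pre-flight check: report any of the minimum CONNECT_ variables
# that are unset or empty. Returns 0 when all are present, 1 otherwise.
check_connect_env() {
  missing=0
  for name in \
    CONNECT_BOOTSTRAP_SERVERS CONNECT_GROUP_ID \
    CONNECT_KEY_CONVERTER CONNECT_VALUE_CONVERTER \
    CONNECT_CONFIG_STORAGE_TOPIC CONNECT_OFFSET_STORAGE_TOPIC \
    CONNECT_STATUS_STORAGE_TOPIC
  do
    # Indirect lookup of the variable named in $name
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Possible usage:
#   check_connect_env && ./mvnw clean exec:java
```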
Note: The Kafka container sometimes exits during the steps below, so the consumer commands may need to be re-executed. The Connect worker should reconnect on its own.
For this exercise, we will be using three separate terminal windows, so go ahead and open those.
First, we start with getting our cluster running in the foreground. This starts Kafka listening on 9092
on the host, and 29092
within the Docker network.
Terminal 1
docker compose up kafka
We need to create the topic that data will be produced into.
Terminal 2
docker compose exec kafka \
bash -c "kafka-topics.sh --create --bootstrap-server kafka:29092 --topic input --partitions=1 --replication-factor=1"
Verify topics exist
docker compose exec kafka \
bash -c "kafka-topics.sh --list --bootstrap-server kafka:29092"
Should include input
topic in the list.
Produce the lines of lipsum.txt to the input topic.
docker compose exec kafka \
bash -c "cat /data/lipsum.txt | kafka-console-producer.sh --topic input --bootstrap-server kafka:29092"
Verify that data is there (note: hard-coding max-messages
to the number of lines of expected text)
docker compose exec kafka \
bash -c "kafka-console-consumer.sh --topic input --bootstrap-server kafka:29092 --from-beginning --max-messages=9"
Should see last line Processed a total of 9 messages
.
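Rather than hard-coding the message count, it could be derived from the input file. The sketch below assumes every line of the file ends with a newline (wc -l counts newline characters, so a missing final newline would undercount by one); count_lines is a hypothetical helper and the sample file is made up for the demonstration.

```shell
# Hypothetical helper: print the number of newline-terminated lines in a file.
# The arithmetic expansion strips the padding some wc implementations add.
count_lines() {
  echo $(( $(wc -l < "$1") ))
}

# Demonstration against a throwaway file
sample="$(mktemp)"
printf 'one\ntwo\nthree\n' > "$sample"
count_lines "$sample"    # prints 3
rm -f "$sample"

# Inside the Kafka container the same idea might look like this
# (single quotes so the substitution runs in the container):
#   docker compose exec kafka bash -c \
#     'kafka-console-consumer.sh --topic input --bootstrap-server kafka:29092 --from-beginning --max-messages=$(wc -l < /data/lipsum.txt)'
```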
Now, we can build the Kafka Connect image and start it.
./mvnw clean install
docker compose up connect-jib-1
Wait for the log line Kafka Connect Started, then post the FileSink Connector. When no file is provided, the connector tasks write data to the stdout of the Connect container (Terminal 2).
Terminal 3
Use Kafka Connect REST API to start this process
curl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type: application/json' -d '{
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "input",
"transforms": "MakeMap,AddPartition",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field" : "line",
"transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field" : "partition!",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}'
This will read from the beginning of the input
topic that had data sent into it, and begin processing it.
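To confirm that the connector and its task actually started, the Connect REST API also exposes GET /connectors/{name}/status. The sketch below pulls the state values out of such a response without requiring jq; connector_states is a hypothetical helper, and the sample payload is illustrative rather than captured from a real worker.

```shell
# Hypothetical helper: print every "state" value found in a connector status
# document read from stdin (first the connector, then each task).
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Illustrative payload shaped like a status response; the values are made up.
sample='{"name":"console-sink","connector":{"state":"RUNNING","worker_id":"connect-jib-1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect-jib-1:8083"}],"type":"sink"}'
printf '%s\n' "$sample" | connector_states

# Against a live worker:
#   curl -s http://localhost:8083/connectors/console-sink/status | connector_states
```

A state other than RUNNING (for example FAILED) is a hint to look at the worker logs before continuing.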
In the output of Terminal 2, you should see something similar to the following.
connect-jib_1 | Struct{line=Morbi eu pharetra dolor. ....,partition=0}
connect-jib_1 | Struct{line=,partition=0}
connect-jib_1 | Struct{line=Nullam mauris sapien, vestibulum ....,partition=0}
This is the toString() representation of Kafka Connect's internal Struct class. Because we added a HoistField$Value transform, each record value is wrapped in a structured object whose line field holds the Kafka message value, which is one line of the lipsum.txt file produced in the third step above. The InsertField$Value transform then adds a partition field set to the partition the record was consumed from. It is always 0 here because the topic was created with only one partition.
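The effect of the two transforms on each record can be imitated in plain shell. This is only a rough illustration of the resulting output shape, not how Kafka Connect implements HoistField or InsertField; hoist_and_insert is a hypothetical name.

```shell
# Rough imitation of the transform chain: wrap each input line in a "line"
# field and append the partition number passed as the first argument.
hoist_and_insert() {
  partition="$1"
  while IFS= read -r line || [ -n "$line" ]; do
    printf 'Struct{line=%s,partition=%s}\n' "$line" "$partition"
  done
}

# An empty input line still yields a record, with an empty "line" field.
printf 'first line\n\nthird line\n' | hoist_and_insert 0
```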
To repeat that process, we delete the connector and reset the consumer group.
curl -XDELETE http://localhost:8083/connectors/console-sink
docker compose exec kafka \
bash -c "kafka-consumer-groups.sh --bootstrap-server kafka:29092 --group connect-console-sink --reset-offsets --all-topics --to-earliest --execute"
Re-run the console-producer and curl -XPUT ... commands above. This time, more than 9 total messages will be printed, because the topic now holds the data from both producer runs.
Redo the tutorial with a new topic having more than one partition. Produce more input data to it, then increase tasks.max
of the connector.
Notice that the partition
field in the output may change (you may need to produce data multiple times to randomize the record batches).
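When choosing a task count, keep in mind that sink tasks read through a consumer group, so each partition is assigned to exactly one task and any tasks beyond the partition count sit idle. A small sketch of that arithmetic, under that assumption; active_sink_tasks is a hypothetical helper.

```shell
# Hypothetical helper: number of sink tasks that can actually receive data,
# i.e. the smaller of the topic's partition count and the connector's tasks.max.
active_sink_tasks() {
  partitions="$1"
  tasks_max="$2"
  if [ "$tasks_max" -lt "$partitions" ]; then
    echo "$tasks_max"
  else
    echo "$partitions"
  fi
}

active_sink_tasks 1 3   # prints 1: a single partition keeps only one task busy
active_sink_tasks 3 2   # prints 2: both tasks are busy, sharing three partitions
```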
Scaling the workers will require adding another container with a unique CONNECT_REST_ADVERTISED_HOST_NAME
variable. For example:
connect-jib-2:
image: *connect-image
hostname: connect-jib-2
depends_on:
- kafka
environment:
<<: *connect-vars
CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
A reverse proxy should be added in front of all instances. See an example using Traefik in docker-compose.cluster.yml
.
It can be started via docker compose -f docker-compose.cluster.yml up
and tested with curl -H Host:connect-jib.docker.localhost http://127.0.0.1/
.
Disclaimer: It is best to think of this image as a base upon which you can add your own Connectors. Below is the output of the default connector plugins, as provided by the Apache Kafka project.
Connector plugins should preferably be placed into /app/libs
, thus requiring an environment variable of CONNECT_PLUGIN_PATH="/app/libs"
. Kafka Connect plugins are often distributed as Tarballs/ZIP/JAR files that need to be extracted or added to this path, via a volume mount or downloaded with curl
.
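As a sketch of the extract-or-copy step, the helper below places a downloaded plugin archive under a plugin directory. install_plugin is a hypothetical name, the default target of /app/libs simply mirrors the path mentioned above, and the .zip branch assumes unzip is available in the image being used.

```shell
# Hypothetical helper: unpack or copy a connector archive into the plugin path.
# Usage: install_plugin <archive> [plugin_dir]
install_plugin() {
  archive="$1"
  plugin_dir="${2:-/app/libs}"
  mkdir -p "$plugin_dir" || return 1
  case "$archive" in
    *.tar.gz|*.tgz) tar -xzf "$archive" -C "$plugin_dir" ;;
    *.zip)          unzip -q -o "$archive" -d "$plugin_dir" ;;  # needs unzip
    *.jar)          cp "$archive" "$plugin_dir/" ;;
    *)              echo "unsupported archive: $archive" >&2; return 1 ;;
  esac
}

# Possible usage, with a placeholder file name:
#   install_plugin /tmp/my-connector.tar.gz /app/libs
```

The worker only scans the plugin path at startup, so the container needs a restart after new plugins are added.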
When using the confluent-hub
image tags, you can extend those images like so
FROM cricketeerone/apache-kafka-connect:latest-confluent-hub
# Example connector installation from Confluent Hub
RUN confluent-hub install --no-prompt \
--component-dir /app/libs --worker-configs /app/resources/connect-distributed.properties -- \
<connector-id>
Where <connector-id>
is copied from one of the available sources on Confluent Hub.
There is no guarantee of compatibility between the Kafka Connect base version and any version of a plugin that you install.
To reiterate, confluent-hub
is not part of the base image versions; they only include Connector classes provided by Apache Kafka.
These are limited to File Sink/Source and MirrorSource Connector (MirrorMaker 2.0). In general, you'll probably want to add your own Connectors, as above, rather than use this image by itself.
As of the 3.6.0 release, the confluent-hub
tags include the unzip
shell command for extracting other third-party connectors.
For a full example of adding plugins, and using the Confluent Schema Registry,
please refer to the schema-registry
branch.
$ curl localhost:8083/connector-plugins | jq
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "3.8.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "3.8.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.8.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.8.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.8.0"
}
]
The File Source/Sink connectors are not to be used in production, and are only really meant as a "simple, standalone example," according to the docs (emphasis added).
A simple example is included with the source code for Kafka in the
file
package. This connector is meant for use in standalone mode...
files have trivially structured data -- each line is just a string. Almost all practical connectors will need schemas with more complex data formats.
That being said, the MirrorSource connector would be a more real-world example.
Confluent documentation covers this for Basic Auth.
Create files
$ cat /tmp/connect-jaas.conf
KafkaConnect {
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
file="/tmp/connect.password";
};
$ cat /tmp/connect.password # add as many lines as needed
admin: OneCricketeer
Add environment variables and mounts (JAVA_TOOL_OPTIONS
comes from Eclipse Temurin base image)
environment:
...
# Auth
CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
JAVA_TOOL_OPTIONS: "-Djava.security.auth.login.config=/app/connect-jaas.conf"
volumes:
# Auth
- /tmp/connect-jaas.conf:/app/connect-jaas.conf:ro
- /tmp/connect.password:/tmp/connect.password:ro
docker compose up
and test it
$ curl -w'\n' http://localhost:8083
User cannot access the resource.
$ curl -w'\n' -uadmin:OneCricketeer http://localhost:8083
{"version":"3.8.0","commit":"60e845626d8a465a","kafka_cluster_id":"nA5eYC5WSrSHjaKgw1BpHg"}
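For clients that cannot use curl's -u flag, the same credentials can be sent as an explicit header, since HTTP Basic authentication is just the base64 encoding of user:password. A minimal sketch, assuming the base64 utility is installed; basic_auth_header is a hypothetical helper.

```shell
# Hypothetical helper: build the Authorization header that curl -u would send.
basic_auth_header() {
  printf 'Authorization: Basic %s\n' "$(printf '%s:%s' "$1" "$2" | base64)"
}

basic_auth_header admin OneCricketeer

# Should be equivalent to the -u form above:
#   curl -w'\n' -H "$(basic_auth_header admin OneCricketeer)" http://localhost:8083
```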
The exec:java
goal can be used to run Kafka Connect outside of Docker.
To rebuild the container, for example, run ./mvnw clean install
or make
.
docker compose rm -sf
# Clean up mounted docker volumes
docker volume ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker volume rm
# Clean up networks
docker network ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker network rm
Learn more about Jib.