# Kafka Connect

**Kafka Connect** is a tool that helps you **move data in and out of Apache Kafka** without writing code: you only need to supply configuration.

You can think of it as a **bridge** that connects Kafka with:

- Databases like **MySQL, PostgreSQL, MongoDB**
- Cloud services like **S3**
- File systems like **local files or HDFS**
- Message queues, APIs, and more


**Kafka Connect Modes: Standalone vs Distributed**

Kafka Connect supports two modes of operation:

- **Standalone** - Best for local development and simple use cases. All connectors run in a single process.
- **Distributed** - Best for production. Connectors and their tasks are spread across a Kafka Connect **cluster** of workers, which use Kafka topics to store connector configurations, offsets, and status.

Start with **standalone mode** while you're learning or testing locally.

**Connectors**

Kafka Connect uses **connectors** which are like plugins. There are two types:

- **Source Connector**: Pulls data *into* Kafka (e.g., from a MySQL table)
- **Sink Connector**: Pushes data *out of* Kafka (e.g., to a JSON file)


**Why Should You Use Kafka Connect?**

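- **No coding needed**: connectors are configured, not programmed
- **Scalable**: run in distributed mode to spread connectors and tasks across several workers
- **Automatic**: once started, a connector keeps syncing data as new records or rows arrive
- **Great for data pipelines** and **ETL workflows**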


#### How Kafka Connect Works

`[ External System ] ←→ [ Kafka Connect ] ←→ [ Apache Kafka ]`

You configure connectors using simple `.properties` files or REST API calls. Let's begin by configuring Kafka Connect in standalone mode.

#### Setting the `plugin.path`

Kafka Connect loads connector JARs from a directory called the **plugin path**. You must set this in your Connect worker config file, especially in **standalone mode**.

Open `config/connect-standalone.properties` in your favourite text editor and uncomment the `plugin.path` line. Then append the path to the `libs` folder of your Kafka installation (`kafka/libs`), which contains the JAR files needed to run the built-in connectors. For example, using Kafka 3.9.0 installed in the home directory of a user called `admin`, the line would be:

`plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,/home/admin/kafka_2.12-3.9.0/libs`
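If you would rather script this edit, here is a minimal Python sketch that appends a directory to the `plugin.path` entry in the text of a worker properties file, uncommenting the line if necessary. It assumes `plugin.path` sits on a single line (no backslash continuations). The function name `add_to_plugin_path` is hypothetical.

```python
import re

# Matches an active or commented-out plugin.path line, e.g. "#plugin.path=/opt/connectors"
PLUGIN_PATH_LINE = re.compile(r"^\s*(#?)\s*plugin\.path\s*=(.*)$")


def add_to_plugin_path(properties_text: str, new_dir: str) -> str:
    """Return properties_text with new_dir appended to the plugin.path entry."""
    lines = properties_text.splitlines()
    matches = [(i, m) for i, m in ((i, PLUGIN_PATH_LINE.match(l)) for i, l in enumerate(lines)) if m]
    # Prefer an active (uncommented) line; otherwise fall back to the first commented one
    active = [(i, m) for i, m in matches if not m.group(1)]
    chosen = (active or matches or [None])[0]
    if chosen is None:
        # No plugin.path line at all: add one at the end
        lines.append(f"plugin.path={new_dir}")
    else:
        i, m = chosen
        # Split the existing comma-separated entries, dropping empty ones
        entries = [p.strip() for p in m.group(2).split(",") if p.strip()]
        if new_dir not in entries:
            entries.append(new_dir)
        lines[i] = "plugin.path=" + ",".join(entries)
    return "\n".join(lines) + "\n"
```

For example, calling `add_to_plugin_path(text, "/home/admin/kafka_2.12-3.9.0/libs")` on the contents of `connect-standalone.properties` returns the updated text, which you can then write back to the file.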

### Configuring connectors

Let's look at two examples: a sink connector that writes data from Kafka to your file system, and a source connector that reads data from a PostgreSQL database. To configure a connector, you create a Java `.properties` file containing key-value pairs that define the connector's configuration.

#### Writing Kafka Data to a JSON File (Sink)

Let's say you're working on a user tracking system. You publish JSON events to a Kafka topic called `user-events-topic`, and now you want to archive that data to a local JSON file for backup or analytics.

**Create the Connector Configuration**

Make a directory called `sink-configs` in your `~/kafka/config` folder to store your connector configs.

Inside that folder, create a file called `json_file_sink.properties` and add the following connector configuration:

name=user-json-file-sink
connector.class=FileStreamSink
tasks.max=1
# Topic or topics that will send their data to the file sink
topics=user-events-topic
# File where topic data will be saved, remember to create the folders if they don't already exist
file=/home/ubuntu/topic_data/user_events.json

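# Save all keys as strings
key.converter=org.apache.kafka.connect.storage.StringConverter
# Pass values through as plain strings so the JSON text sent by the producer is written unchanged.
# (With JsonConverter and schemas disabled, each value becomes a Java Map and the file sink
# writes it in {key=value} form, which is not valid JSON.)
value.converter=org.apache.kafka.connect.storage.StringConverter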

Now start Kafka Connect so that it loads this configuration. Run the following command from your Kafka folder. The worker runs in the foreground, so leave this terminal open while you work through the next steps.

bin/connect-standalone.sh config/connect-standalone.properties config/sink-configs/json_file_sink.properties
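Once the worker has started, you can confirm the connector is running through the worker's REST API. Standalone workers expose this API too, on port 8083 by default. The following is a minimal sketch using only the Python standard library. It assumes the worker listens on `http://localhost:8083` and calls the `GET /connectors/<name>/status` endpoint. The helper names `fetch_status` and `is_healthy` are hypothetical.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Call GET /connectors/<name>/status on a running Connect worker (assumed URL)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """True if the connector and every one of its tasks report the RUNNING state."""
    states = [status["connector"]["state"]] + [t["state"] for t in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

With the worker running, `is_healthy(fetch_status("user-json-file-sink"))` should return `True`. A task in the `FAILED` state usually includes a `trace` field describing the error.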

Using the `kafka-python` library, we can now send some data to the topic and check that it arrives in the `user_events.json` file.

from kafka import KafkaProducer
import json
import time

# Create Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Adjust if your Kafka is on a different host/port
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Send JSON-encoded messages
)

# Example data to send
messages = [
    {"user": "alice", "action": "login"},
    {"user": "bob", "action": "logout"},
    {"user": "charlie", "action": "purchase"}
]

# Send messages to the 'user-events-topic' topic
for msg in messages:
    producer.send('user-events-topic', value=msg)
    print(f"Sent: {msg}")
    time.sleep(0.5)  # Optional delay for clarity

# Wait for all messages to be delivered
producer.flush()
producer.close()

Opening `user_events.json` in the `topic_data` folder, you should see one line per message:

{"user": "alice", "action": "login"}
{"user": "bob", "action": "logout"}
{"user": "charlie", "action": "purchase"}

As more data is sent to the topic, the connector keeps appending the new messages to `user_events.json`.
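The file sink writes one record per line, so the archive can be read back as JSON Lines for analytics. Here is a minimal sketch. It assumes each line holds the JSON text the producer sent, as with the messages above. The function names `read_sink_file` and `count_actions` are hypothetical.

```python
import json
from collections import Counter
from pathlib import Path


def read_sink_file(path):
    """Parse a file written by the file sink, treating each non-empty line as one JSON record."""
    text = Path(path).read_text(encoding="utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def count_actions(records):
    """Tally how often each action appears across the archived events."""
    return Counter(r["action"] for r in records)
```

For example, `count_actions(read_sink_file("/home/ubuntu/topic_data/user_events.json"))` gives a quick summary of the user events archived so far.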

### PostgreSQL Source Connector – Reading from a Database

Let's begin by creating the test database that Kafka Connect will read from. Open the PostgreSQL client of your choice (for example, `psql` or pgAdmin) and create a new database and a table with sample data.

CREATE DATABASE user_data;

-- Connect to user_data before creating the table (in psql: \c user_data)
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);

INSERT INTO users (name, email) VALUES
('Ada Lovelace', 'ada@techmail.com'),
('Alan Turing', 'alan@enigma.org');

Now we need to configure a source connector that pulls rows from the database and sends them to a topic as new rows are added. The JDBC source connector is not included with Apache Kafka, so first download Confluent's JDBC connector (`kafka-connect-jdbc`) and place it in a directory listed in your `plugin.path`. Then create a new folder called `source-config` in your Kafka `config` folder, create a file inside it called `postgres_source.properties`, and add the configuration below. The main setting to change is `connection.url`, which must contain the connection details for your database.

name=pg-user-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# Connection string for your database. Update it with the connection details for your PostgreSQL database.
connection.url=jdbc:postgresql://localhost:5432/user_data?user=postgres&password=yourpassword
# Table to extract data from and send to a topic
table.whitelist=users
# Only fetch new rows based on the id column
mode=incrementing
incrementing.column.name=id
# Prefix for the destination topic. The topic name is the prefix followed by the table name, so rows from
# the users table are sent to the topic pg-users
topic.prefix=pg-
# How often to poll the database for new rows, in milliseconds
poll.interval.ms=5000

# Save all keys as strings 
key.converter=org.apache.kafka.connect.storage.StringConverter
# Save all values as JSON entries 
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
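The following simplified Python simulation illustrates what `mode=incrementing` does. It assumes the connector remembers the largest `id` it has already published and, on each poll, fetches only rows with a larger `id`. This is an illustration of that idea, not the connector's actual implementation. The table is modelled as a list of dictionaries, and the function `poll_incrementing` and the extra row are hypothetical.

```python
def poll_incrementing(rows, last_id):
    """Return (new_rows, new_last_id) for rows whose id is greater than last_id.

    Roughly equivalent to: SELECT * FROM users WHERE id > last_id ORDER BY id
    """
    new_rows = sorted((r for r in rows if r["id"] > last_id), key=lambda r: r["id"])
    new_last_id = new_rows[-1]["id"] if new_rows else last_id
    return new_rows, new_last_id


# Hypothetical in-memory copy of the users table
users = [
    {"id": 1, "name": "Ada Lovelace", "email": "ada@techmail.com"},
    {"id": 2, "name": "Alan Turing", "email": "alan@enigma.org"},
]

# First poll: nothing has been published yet, so both rows are returned
batch, offset = poll_incrementing(users, last_id=-1)
print([r["name"] for r in batch], offset)  # ['Ada Lovelace', 'Alan Turing'] 2

# An update to an existing row is NOT detected, because its id does not change
users[0]["email"] = "ada@example.com"
# A newly inserted (hypothetical) row is detected on the next poll
users.append({"id": 3, "name": "Grace Hopper", "email": "grace@example.com"})
batch, offset = poll_incrementing(users, offset)
print([r["name"] for r in batch], offset)  # ['Grace Hopper'] 3
```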

The JDBC source connector supports the following `mode` options:

- **bulk**: Pulls the **entire table** on every poll.
- **incrementing**: Uses a **strictly increasing column** (like `id`) to fetch **new rows** only. Updates and deletions of existing rows are not detected.
- **timestamp**: Uses a **timestamp column** (like `last_updated`) to fetch new and updated rows.
- **timestamp+incrementing**: Uses **both** a timestamp column and an incrementing column, so new and updated rows are detected and rows that share the same timestamp can still be told apart.
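The difference between `timestamp` and `timestamp+incrementing` is easiest to see with rows that share a timestamp. Below is a simplified Python simulation. It assumes the connector stores the last (timestamp, id) pair it published, then selects rows that are either newer, or have the same timestamp and a larger id. The column name `last_updated`, the sample rows, and both helper functions are hypothetical, and the real connector's queries differ in detail.

```python
from datetime import datetime


def poll_timestamp_incrementing(rows, last_ts, last_id):
    """timestamp+incrementing: a row is new if its timestamp is later than last_ts,
    or if it has the same timestamp but a larger id than last_id."""
    def is_new(r):
        ts = r["last_updated"]
        return ts > last_ts or (ts == last_ts and r["id"] > last_id)

    batch = sorted((r for r in rows if is_new(r)), key=lambda r: (r["last_updated"], r["id"]))
    if batch:
        last_ts, last_id = batch[-1]["last_updated"], batch[-1]["id"]
    return batch, last_ts, last_id


def poll_timestamp_only(rows, last_ts):
    """timestamp: a row is new only if its timestamp is strictly later than last_ts."""
    batch = sorted((r for r in rows if r["last_updated"] > last_ts), key=lambda r: r["last_updated"])
    return batch, (batch[-1]["last_updated"] if batch else last_ts)


t0 = datetime(2024, 1, 1, 12, 0, 0)
t1 = datetime(2024, 1, 1, 12, 0, 5)
rows = [{"id": 1, "last_updated": t0}, {"id": 2, "last_updated": t1}]

# First poll publishes ids 1 and 2; the stored position becomes (t1, 2)
_, ts, last_id = poll_timestamp_incrementing(rows, datetime.min, -1)

# A row with the same timestamp as the last published row shows up afterwards
rows.append({"id": 3, "last_updated": t1})
print([r["id"] for r in poll_timestamp_incrementing(rows, ts, last_id)[0]])  # [3]
print([r["id"] for r in poll_timestamp_only(rows, ts)[0]])  # [] (the late row is missed)
```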

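Now start Kafka Connect from the Kafka folder with the source configuration. If the standalone worker from the file sink example is still running, stop it first (Ctrl+C), because two standalone workers on the same machine would both try to use the default REST port 8083. Alternatively, pass both connector files to a single worker by listing them one after another on the command line.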

bin/connect-standalone.sh config/connect-standalone.properties config/source-config/postgres_source.properties

Kafka Connect should then begin pulling rows from the `users` table and writing them to the `pg-users` topic. Let's check the results using the kafka-python client.

from kafka import KafkaConsumer
import json

# Connect to Kafka and subscribe to the topic
consumer = KafkaConsumer(
    'pg-users',  # Replace with your topic name
    bootstrap_servers='localhost:9092',  # Kafka broker address
    group_id='postgres-reader-group',  # Consumer group name
    auto_offset_reset='earliest',  # Start from beginning if no offset is stored
    enable_auto_commit=True,  # Commit consumed offsets automatically in the background
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # Parse JSON values
)

print("Listening for messages on topic 'pg-users'...\n")

# Read and print messages continuously
for message in consumer:
    print("Received message:")
    print(message.value)  # This will be a Python dictionary

You should now see each row of the `users` table printed to your terminal as a Python dictionary with `id`, `name`, and `email` keys.
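This example sets `value.converter.schemas.enable=false`, so each message value is a plain JSON object. If schemas are enabled instead, the JsonConverter wraps every value in an envelope with `schema` and `payload` fields. The small helper sketched below handles both shapes, assuming the envelope has exactly those two top-level keys. The function name `extract_row` is hypothetical.

```python
import json


def extract_row(raw_value: bytes) -> dict:
    """Decode a message value produced by the JsonConverter.

    With schemas.enable=true the value looks like {"schema": {...}, "payload": {...}};
    with schemas.enable=false it is the row object itself.
    """
    value = json.loads(raw_value.decode("utf-8"))
    if isinstance(value, dict) and set(value) == {"schema", "payload"}:
        return value["payload"]
    return value
```

To use it, drop the `value_deserializer` argument from the consumer above (so `message.value` arrives as raw bytes) and call `extract_row(message.value)` inside the loop.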

## Where to Go Next with Kafka Connect

Now that you've seen how to use Kafka Connect with PostgreSQL and local files, you're ready to explore a wide range of connectors and build more powerful data pipelines.

### Other Common Kafka Connectors

Here are some popular connectors used in real-world projects:

#### Source Connectors (pull data into Kafka)

- **Debezium** (PostgreSQL, MySQL, MongoDB) – captures changes (CDC) in real time
- **JDBC Source** – pulls data from any relational database that has a JDBC driver, using SQL queries
- **Kafka Connect File Source** – reads a text file line by line (for example, CSV or JSON Lines files), producing one record per line
- **MQTT Source** – pulls IoT sensor data from MQTT brokers
- **S3 Source** – reads files from AWS S3 buckets

#### Sink Connectors (push data out of Kafka)

- **JDBC Sink** – write Kafka data to PostgreSQL, MySQL, SQL Server, etc.
- **Elasticsearch Sink** – index data into Elasticsearch for search and analytics
- **MongoDB Sink** – send JSON records to MongoDB collections
- **S3 Sink** – archive Kafka data to AWS S3 as JSON, Avro, or Parquet
- **BigQuery Sink** – stream Kafka data into Google BigQuery
- **Kafka Connect File Sink** – write each record as a line in a local file


### Tips for Connecting External Systems

- Check connector compatibility  
  Always ensure the connector version matches your Kafka Connect version and Kafka cluster.

- Set the `plugin.path` correctly  
  Place each connector in its own subfolder under a plugin directory, and list that directory in `plugin.path` in your worker configuration.

- Understand schemas  
  Some converters, notably the Avro, Protobuf, and JSON Schema converters, require a Schema Registry to store and look up schema definitions.

- Use the REST API in distributed mode  
  For production deployments, the Kafka Connect REST API is the preferred way to create, monitor, and delete connectors programmatically.
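As a starting point, here is a minimal Python sketch that submits the file sink configuration from earlier to a worker's `POST /connectors` endpoint, using only the standard library. The request body wraps the same key-value pairs used in the `.properties` file in a JSON object with `name` and `config` fields. It assumes the worker's REST API listens on `http://localhost:8083`, the default port. The helper names `build_create_request` and `create_connector` are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default REST listener on the local worker


def build_create_request(name: str, config: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a POST /connectors request that creates a connector with the given config."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(name: str, config: dict, base_url: str = CONNECT_URL) -> dict:
    """Send the request and return the worker's JSON response."""
    with urllib.request.urlopen(build_create_request(name, config, base_url)) as resp:
        return json.load(resp)


# Same settings as json_file_sink.properties; converter settings can be added the same way
sink_config = {
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "topics": "user-events-topic",
    "file": "/home/ubuntu/topic_data/user_events.json",
}
```

With a worker running, `create_connector("user-json-file-sink", sink_config)` should return the created connector's name, config, and tasks. A `409 Conflict` response (raised by `urlopen` as an `HTTPError`) typically means a connector with that name already exists.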

## Key Takeaways

- Kafka Connect is a framework for integrating Kafka with external systems using configurable source and sink connectors.
- Use standalone mode for local testing and distributed mode for scalable, production-grade deployments.
- The `plugin.path` in your worker config tells Kafka Connect where to find installed connectors.
- Source connectors bring data into Kafka; sink connectors send data out to databases, files, cloud storage, and more.
- Use converters (`key.converter` and `value.converter`) to define how Kafka data is serialized (e.g., String, JSON, Avro).
- Kafka Connect supports built-in and community connectors for systems like PostgreSQL, MySQL, S3, Elasticsearch, and MongoDB.