This project demonstrates how to build a Kafka producer and consumer using Python with the Confluent Kafka library.
This sample application shows a practical example of:
- Producer: Sends order messages to a Kafka topic
- Consumer: Reads and processes messages from the topic
- Configuration Management: Centralized configuration for both components
Before running this project, ensure you have:
- Python 3.7+ installed
- Docker and Docker Compose (to run Kafka locally)
- pip (Python package manager)
Install the required Python packages:

```bash
pip install confluent-kafka
```

Use Docker Compose to start a Kafka broker locally:

```bash
docker-compose up -d
```

This will start a Kafka instance on localhost:9092.

To verify Kafka is running:

```bash
docker ps
```

You should see the kafka_python container running.
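Kafka can take a few seconds to accept connections after `docker-compose up -d`. Below is a minimal sketch, using only the Python standard library, of a helper that waits until something is listening on the broker port. The function name `wait_for_broker` and its defaults are illustrative assumptions, not part of this project. An open port only shows that the broker process is listening, not that it has finished starting up.

```python
import socket
import time


def wait_for_broker(host="localhost", port=9092, timeout_s=30.0, interval_s=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: an open port does not guarantee the broker is fully ready.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # create_connection raises OSError if nothing accepts the connection
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

For example, `wait_for_broker()` can be called at the top of `producer.py` or `consumer.py` before creating the client.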
Project structure:

```
sample-python-kafka/
├── app/
│   ├── config.py        # Configuration settings
│   ├── producer.py      # Kafka producer implementation
│   └── consumer.py      # Kafka consumer implementation
├── docker-compose.yml   # Docker setup for Kafka
└── README.md            # This file
```
The config.py file contains centralized configuration for both producer and consumer:
```python
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Kafka broker address
TOPIC_NAME = "orders"  # Kafka topic name

PRODUCER_CONFIG = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
}

CONSUMER_CONFIG = {
    "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "group.id": "orders-consumer-group",  # Consumer group ID
    "auto.offset.reset": "earliest",  # Start from the beginning of the topic if the group has no committed offset
}
```

- `bootstrap.servers`: Address of the Kafka broker
- `group.id`: Identifier of the consumer group; consumers that share the same `group.id` split the topic's partitions between them and share committed offsets
- `auto.offset.reset`: Determines where a consumer starts reading when its group has no committed offset for a partition; `earliest` reads from the start of the topic
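If you need to point the sample at a different broker without editing `config.py`, one option is to let environment variables override the defaults. This is a sketch, not part of the project: the helper `load_settings` and the variable names `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_TOPIC`, and `KAFKA_GROUP_ID` are assumptions.

```python
import os


def load_settings(env=None):
    """Build (topic, producer_config, consumer_config) from the config.py defaults,
    letting environment variables override them.

    KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC and KAFKA_GROUP_ID are hypothetical names.
    """
    env = os.environ if env is None else env
    bootstrap = env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    topic = env.get("KAFKA_TOPIC", "orders")
    producer_config = {"bootstrap.servers": bootstrap}
    consumer_config = {
        "bootstrap.servers": bootstrap,
        "group.id": env.get("KAFKA_GROUP_ID", "orders-consumer-group"),
        "auto.offset.reset": "earliest",
    }
    return topic, producer_config, consumer_config
```

In `config.py` this could replace the constants with `TOPIC_NAME, PRODUCER_CONFIG, CONSUMER_CONFIG = load_settings()`, keeping both components on a single source of configuration.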
The producer generates sample order messages and sends them to the Kafka topic:
```bash
cd app
python producer.py
```

The producer:

- Creates 10 sample order messages, each with:
  - Order ID
  - Customer name
  - Amount
  - Timestamp
- Sends each message to the `orders` topic
- Includes a callback to confirm delivery
- Waits 1 second between messages

Example output:

```
Message delivered to topic=orders partition=0 offset=0
Message delivered to topic=orders partition=0 offset=1
Message delivered to topic=orders partition=0 offset=2
...
```
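The producer steps listed above can be sketched as follows. This is an illustration under assumptions, not the project's `producer.py`: the helper names `build_order` and `produce_orders` are hypothetical, the field values are inferred from the consumer's sample output, and `producer` is any object with the `produce`/`poll`/`flush` methods of `confluent_kafka.Producer`.

```python
import json
import time

TOPIC_NAME = "orders"  # mirrors config.py


def build_order(order_id):
    """Return one sample order. Field names follow the consumer's sample output;
    the values (fixed customer, amount = 25.5 + order_id) are assumptions."""
    return {
        "order_id": order_id,
        "customer": "Antonio",
        "amount": 25.5 + order_id,
        "created_at": time.time(),
    }


def produce_orders(producer, count=10, delay_s=1.0, on_delivery=None):
    """Send `count` orders keyed by order_id, then wait for all deliveries."""
    for order_id in range(count):
        kwargs = {
            "key": str(order_id),
            "value": json.dumps(build_order(order_id)).encode("utf-8"),
        }
        if on_delivery is not None:
            kwargs["on_delivery"] = on_delivery
        producer.produce(TOPIC_NAME, **kwargs)
        # Serve delivery callbacks for messages sent so far without blocking.
        producer.poll(0)
        time.sleep(delay_s)
    # Block until every outstanding message has been delivered or has failed.
    producer.flush()
```

With the real client this would be called as `produce_orders(Producer(PRODUCER_CONFIG), on_delivery=delivery_report)`, with `Producer` imported from `confluent_kafka`. Calling `flush()` at the end matters: `produce()` only queues messages locally, so exiting without it can drop unsent messages.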
The consumer listens to messages from the Kafka topic:
```bash
cd app
python consumer.py
```

The consumer:

- Subscribes to the `orders` topic
- Polls for messages continuously
- Decodes and displays:
  - Message key
  - Message value (JSON)
  - Topic name
  - Partition number
  - Offset (message position)
- Handles errors gracefully
- Closes the connection on keyboard interrupt (Ctrl+C)

Example output:

```
Waiting for messages...
Received message:
Key: 0
Value: {'order_id': 0, 'customer': 'Antonio', 'amount': 25.5, 'created_at': 1234567890.123}
Topic: orders
Partition: 0
Offset: 0
Received message:
Key: 1
Value: {'order_id': 1, 'customer': 'Antonio', 'amount': 26.5, 'created_at': 1234567891.456}
Topic: orders
Partition: 0
Offset: 1
...
```
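The consumer loop described above can be sketched like this. It is an illustration, not the project's `consumer.py`: `consume_orders` is a hypothetical name, `max_messages` is a hypothetical knob for stopping early, and `consumer` is any object with the `subscribe`/`poll`/`close` methods of `confluent_kafka.Consumer`.

```python
import json


def consume_orders(consumer, topic="orders", max_messages=None):
    """Poll `consumer`, print each decoded order, and return the decoded values.

    With max_messages=None the loop runs until Ctrl+C.
    """
    consumer.subscribe([topic])
    print("Waiting for messages...")
    received = []
    try:
        while max_messages is None or len(received) < max_messages:
            msg = consumer.poll(1.0)  # None if nothing arrived within 1 second
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            key = msg.key().decode("utf-8") if msg.key() is not None else None
            value = json.loads(msg.value().decode("utf-8"))
            received.append(value)
            print("Received message:")
            print(f"Key: {key}")
            print(f"Value: {value}")
            print(f"Topic: {msg.topic()}")
            print(f"Partition: {msg.partition()}")
            print(f"Offset: {msg.offset()}")
    except KeyboardInterrupt:
        pass
    finally:
        # Leave the group cleanly; with auto-commit enabled (the default),
        # close() also commits the final offsets.
        consumer.close()
    return received
```

With the real client: `consume_orders(Consumer(CONSUMER_CONFIG), TOPIC_NAME)`. Closing in `finally` ensures the consumer leaves its group even when the loop exits on an exception.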
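To run the producer and consumer together:

- Terminal 1 - Start the consumer:

  ```bash
  cd app
  python consumer.py
  ```

  The consumer will wait for messages.

- Terminal 2 - Run the producer:

  ```bash
  cd app
  python producer.py
  ```

  Messages will be produced and immediately received by the consumer.

To see offsets in action:

- Stop the consumer (Ctrl+C)
- Run the producer again to send new messages
- Start the consumer again. Because it keeps the same `group.id`, it resumes from the offsets its group last committed and receives the messages produced while it was stopped. `auto.offset.reset: "earliest"` applies only when the group has no committed offset (for example, on the first run or after changing `group.id`); in that case the consumer reads the topic from the beginning.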
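Since `auto.offset.reset` only takes effect for a consumer group with no committed offsets, one simple way to re-read the whole topic is to start the consumer under a brand-new `group.id`. The helper below is a hypothetical sketch of that approach; note that each run creates a new consumer group on the broker.

```python
import uuid


def replay_config(base_config):
    """Return a copy of base_config with a fresh, never-used group.id.

    A new group has no committed offsets, so auto.offset.reset ("earliest")
    applies and the consumer starts from the beginning of each partition.
    """
    config = dict(base_config)  # don't mutate the shared configuration
    config["group.id"] = f"{base_config['group.id']}-replay-{uuid.uuid4().hex[:8]}"
    config["auto.offset.reset"] = "earliest"
    return config
```

For example, `Consumer(replay_config(CONSUMER_CONFIG))` would read every message in `orders` from offset 0, while the original `orders-consumer-group` keeps its own position.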
The producer uses a callback function (`delivery_report`) to confirm when messages are successfully delivered:

```python
def delivery_report(err, msg):
    """Called once per produced message to report success or failure."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to topic={msg.topic()} partition={msg.partition()} offset={msg.offset()}")
```

- Key: Identifies the message (used for partitioning)
- Value: The actual message content (JSON in this case)
- Callback: Executed once per message after the delivery attempt succeeds or fails; callbacks are served when the producer calls `poll()` or `flush()`
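Using the key for partitioning means the partition is a deterministic function of the key, so every message with the same key lands in the same partition. The sketch below illustrates the idea with a CRC32 hash modulo the partition count. librdkafka's default partitioner (`consistent_random`) is also CRC32-based, but `partition_for_key` is a hypothetical simplification and is not guaranteed to pick the same partition as the client.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Simplified key-based partitioning: CRC32(key) modulo the partition count.

    Illustrative only; not the client library's actual partitioner code.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions
```

Because the result depends on `num_partitions`, adding partitions to an existing topic can move a key to a different partition, which breaks per-key ordering across the change.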
The consumer uses a polling mechanism to fetch messages:

```python
msg = consumer.poll(1.0)  # Wait up to 1 second for a message; returns None on timeout
```

- Partitions: Messages can be distributed across partitions for scalability
- Offsets: Track the position in the topic (enables message replay)
- Consumer Group: Multiple consumers in the same group share the load by splitting the topic's partitions between them
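How a consumer group shares the load can be pictured with a simplified round-robin model: partitions are dealt out to group members one at a time. This is a hypothetical sketch; Kafka's real assignors (such as `range`, `roundrobin`, and `cooperative-sticky`) differ in their details, but they share the property that each partition goes to exactly one member of the group.

```python
def assign_round_robin(partitions, members):
    """Deal partitions out to group members one at a time, in sorted order.

    A simplified model of consumer-group assignment, not Kafka's actual assignor.
    """
    members = sorted(members)
    assignment = {member: [] for member in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment
```

For example, `assign_round_robin([0, 1, 2], ["c1", "c2"])` gives `{"c1": [0, 2], "c2": [1]}`. With one partition and three consumers, two consumers receive nothing, so adding consumers beyond the partition count does not increase throughput.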
To stop the consumer, press Ctrl+C in the consumer terminal. To stop Kafka:

```bash
docker-compose down
```

To remove volumes as well:

```bash
docker-compose down -v
```

If the producer or consumer cannot connect:

- Ensure Kafka is running: `docker ps`
- Verify the bootstrap server address in `config.py` matches your Kafka setup
- Wait a few seconds after starting Docker Compose for Kafka to be ready
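If the consumer does not receive messages:

- Ensure the producer has finished sending (look for its delivery confirmations) before checking the consumer
- Verify both are using the same topic name (`orders`)
- Check the consumer group ID in the configuration: a group that has already committed offsets will not re-read messages it consumed before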
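To confirm that the topic exists and see how many partitions it has, a small helper can query the cluster metadata. This sketch assumes a client with `list_topics(timeout=...)`, which the confluent-kafka `Producer`, `Consumer`, and `AdminClient` provide; the helper name `topic_partition_count` is hypothetical.

```python
def topic_partition_count(client, topic, timeout_s=5.0):
    """Return the number of partitions of `topic`, or None if the cluster
    metadata does not list it (or reports an error for it)."""
    # Requesting metadata for all topics avoids triggering topic auto-creation.
    metadata = client.list_topics(timeout=timeout_s)
    topic_metadata = metadata.topics.get(topic)
    if topic_metadata is None or topic_metadata.error is not None:
        return None
    return len(topic_metadata.partitions)
```

For example, `topic_partition_count(Producer(PRODUCER_CONFIG), "orders")` returning `None` suggests the producer has not created the topic yet or the name in `config.py` differs.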
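- Kafka guarantees ordering only within a partition; messages with the same key are routed to the same partition, so they are consumed in the order they were produced
- The producer uses `order_id` as the key, so all messages for the same order stay in order. Since every order in this sample has a distinct ID, ordering across different orders holds only if the topic has a single partition (as the example output, where every message lands in partition 0, suggests)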
- confluent-kafka: Python client for Apache Kafka
  - Installation: `pip install confluent-kafka`
  - Documentation: Confluent Kafka Python
To extend this project:
- Error Handling: Add retry logic and dead-letter queues
- Schema Registry: Use Avro or Protobuf for message schemas
- Transactions: Implement exactly-once semantics
- Monitoring: Add metrics and logging
- Multiple Partitions: Configure topics with multiple partitions for better throughput
- Consumer Groups: Run multiple consumers to process messages in parallel
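As a starting point for the error-handling idea, here is a minimal retry-then-dead-letter sketch. It is not part of this project: `process_with_retry` is hypothetical, `handler` stands for your order-processing function, and `send_to_dead_letter` could, for instance, wrap `producer.produce("orders-dlq", ...)` for a hypothetical dead-letter topic named `orders-dlq`.

```python
import time


def process_with_retry(record, handler, send_to_dead_letter, max_attempts=3, backoff_s=0.5):
    """Try handler(record) up to max_attempts times; on final failure, pass the
    record and the last error to send_to_dead_letter. Returns True on success."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            handler(record)
            return True
        except Exception as exc:  # any failure counts as a failed attempt
            last_error = exc
            if attempt < max_attempts:
                time.sleep(backoff_s * attempt)  # linear backoff between attempts
    send_to_dead_letter(record, last_error)
    return False
```

Routing failures to a separate topic keeps one bad message from blocking the partition while preserving it for later inspection or reprocessing.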
This project is provided as-is for educational purposes.