# Integrating RabbitMQ and Kafka with FastAPI

In this tutorial, we'll explore how to integrate **RabbitMQ** and **Apache Kafka** with **FastAPI** to build scalable, robust, and asynchronous applications. We'll cover the fundamental concepts of message brokers, how to set up RabbitMQ and Kafka, and demonstrate how to use them within a FastAPI application. By the end of this tutorial, you'll be able to leverage these message brokers to enhance the performance and reliability of your FastAPI applications.

## Table of Contents

1. [Introduction](#1-introduction)
2. [Prerequisites](#2-prerequisites)
3. [Understanding Message Brokers](#3-understanding-message-brokers)
   - [3.1. What is RabbitMQ?](#31-what-is-rabbitmq)
   - [3.2. What is Apache Kafka?](#32-what-is-apache-kafka)
   - [3.3. RabbitMQ vs. Kafka](#33-rabbitmq-vs-kafka)
4. [Setting Up the Environment](#4-setting-up-the-environment)
5. [Integrating RabbitMQ with FastAPI](#5-integrating-rabbitmq-with-fastapi)
   - [5.1. Installing Dependencies](#51-installing-dependencies)
   - [5.2. Setting Up RabbitMQ](#52-setting-up-rabbitmq)
   - [5.3. Publishing Messages](#53-publishing-messages)
   - [5.4. Consuming Messages](#54-consuming-messages)
6. [Integrating Kafka with FastAPI](#6-integrating-kafka-with-fastapi)
   - [6.1. Installing Dependencies](#61-installing-dependencies)
   - [6.2. Setting Up Kafka](#62-setting-up-kafka)
   - [6.3. Producing Messages](#63-producing-messages)
   - [6.4. Consuming Messages](#64-consuming-messages)
7. [Comparing RabbitMQ and Kafka Integration](#7-comparing-rabbitmq-and-kafka-integration)
8. [Implementing Asynchronous Tasks](#8-implementing-asynchronous-tasks)
9. [Error Handling and Retries](#9-error-handling-and-retries)
10. [Testing the Application](#10-testing-the-application)
11. [Conclusion](#11-conclusion)
12. [References](#12-references)

## 1. Introduction

Message brokers like **RabbitMQ** and **Apache Kafka** play a crucial role in building distributed systems by enabling asynchronous communication between services. They help decouple services, improve scalability, and handle high-throughput data streams.

In this tutorial, we'll:

- Understand what RabbitMQ and Kafka are and their use cases.
- Set up RabbitMQ and Kafka in your development environment.
- Integrate RabbitMQ and Kafka with a FastAPI application.
- Implement message publishing and consuming.
- Compare the integration processes and use cases for RabbitMQ and Kafka.
- Handle errors and test the application effectively.

## 2. Prerequisites

Before we begin, ensure you have the following:

- **Python 3.7+** installed.
- Basic knowledge of **Python** and **FastAPI**.
- Familiarity with concepts like **asynchronous programming** and **message queues**.
- **Docker** installed to run RabbitMQ and Kafka in containers.

## 3. Understanding Message Brokers

### 3.1. What is RabbitMQ?

**RabbitMQ** is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It supports multiple messaging protocols and can be used for asynchronous communication between microservices, handling background tasks, and more.

**Key Features:**

- **Reliability**: Persistent messages, acknowledgments, and publisher confirms.
- **Flexible Routing**: Exchanges can route messages based on patterns.
- **Clustering and Federation**: For high availability and scalability.
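To make the routing feature more concrete, here is a minimal sketch of how topic-style binding patterns are matched against routing keys: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The function name `topic_matches` is hypothetical and the code is a plain-Python illustration of the matching rule, not RabbitMQ's implementation.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic-style binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words of the routing key
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            # '*' or a literal word consumes exactly one word
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for pattern, key in [
        ("orders.*", "orders.created"),
        ("orders.*", "orders.created.eu"),
        ("orders.#", "orders.created.eu"),
    ]:
        print(pattern, key, topic_matches(pattern, key))
```

A queue bound with `orders.*` would therefore receive `orders.created` but not `orders.created.eu`, while a binding of `orders.#` would receive both.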

### 3.2. What is Apache Kafka?

**Apache Kafka** is a distributed streaming platform used for building real-time data pipelines and streaming applications. It's designed for high-throughput, low-latency handling of real-time data feeds.

**Key Features:**

- **Scalability**: Handles high volumes of data with horizontal scalability.
- **Durability**: Data is persisted on disk and replicated within the cluster.
- **Stream Processing**: Supports real-time stream processing.
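Kafka scales horizontally by splitting a topic into partitions, and messages that share a key are sent to the same partition. The sketch below illustrates that idea under stated assumptions: `pick_partition` and `append_all` are hypothetical helpers, and `zlib.crc32` is only a stand-in for the hash a real client uses (Kafka's default partitioner uses a murmur2 hash).

```python
import zlib
from collections import defaultdict


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition. crc32 is a stand-in hash for illustration."""
    return zlib.crc32(key) % num_partitions


def append_all(events, num_partitions: int):
    """Append (key, value) events to in-memory partitions, keyed by partition id."""
    partitions = defaultdict(list)
    for key, value in events:
        partitions[pick_partition(key.encode(), num_partitions)].append((key, value))
    return partitions


events = [
    ("user-1", "login"),
    ("user-2", "login"),
    ("user-1", "purchase"),
    ("user-1", "logout"),
]
partitions = append_all(events, num_partitions=3)

if __name__ == "__main__":
    for partition_id, records in sorted(partitions.items()):
        print(partition_id, records)
```

Because every `user-1` event lands in one partition, a consumer of that partition sees them in the order they were produced, even though there is no ordering guarantee across partitions.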

### 3.3. RabbitMQ vs. Kafka

| Feature            | RabbitMQ                                     | Apache Kafka                                          |
|--------------------|----------------------------------------------|-------------------------------------------------------|
| Use Case           | Message queueing, short-lived messages       | Event streaming, long-lived data                      |
| Message Model      | Queue-based, broker pushes to consumers      | Log-based, consumers pull from offsets                |
| Delivery Semantics | At-most-once or at-least-once                | At-least-once (exactly-once with extra configurations)|
| Ordering           | Per-queue ordering                           | Partition-level ordering                              |
| Scalability        | Good for moderate loads                      | Designed for high throughput and scalability          |

## 4. Setting Up the Environment

Create a new project directory and set up a virtual environment.

```bash
# Create project directory
mkdir fastapi-messaging-tutorial
cd fastapi-messaging-tutorial

# Set up virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```


## 5. Integrating RabbitMQ with FastAPI

### 5.1. Installing Dependencies

Install the required packages:

```bash
pip install fastapi "uvicorn[standard]" aio-pika  # quotes keep shells like zsh from expanding the brackets
```

- **fastapi**: The web framework.
- **uvicorn**: ASGI server.
- **aio-pika**: Asynchronous Python client for RabbitMQ.

### 5.2. Setting Up RabbitMQ

We'll use Docker to run RabbitMQ:

```bash
docker run -d --hostname rabbitmq --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```

- **Ports**:
  - **5672**: AMQP port.
  - **15672**: Management UI port.

Access the RabbitMQ Management UI at `http://localhost:15672` (the default username and password are both `guest`).

### 5.3. Publishing Messages

**main.py**

```python
from fastapi import FastAPI, BackgroundTasks
import aio_pika

app = FastAPI()

async def get_rabbitmq_connection():
    return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

@app.on_event("startup")
async def startup_event():
    app.state.rabbitmq_connection = await get_rabbitmq_connection()

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.rabbitmq_connection.close()

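async def publish_message(message: str):
    connection = app.state.rabbitmq_connection
    # Open a short-lived channel on the shared connection
    channel = await connection.channel()
    # Declaring is idempotent; durable=True lets the queue survive a broker restart
    queue = await channel.declare_queue("task_queue", durable=True)
    # The default exchange routes a message to the queue named by routing_key
    await channel.default_exchange.publish(
        aio_pika.Message(
            body=message.encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        ),
        routing_key=queue.name,
    )
    await channel.close()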

@app.post("/tasks")
async def create_task(message: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(publish_message, message)
    return {"message": "Task received"}
```

**Explanation**:

- **get_rabbitmq_connection**: Establishes a connection to RabbitMQ.
- **publish_message**: Publishes a message to the `task_queue`.
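- **BackgroundTasks**: Runs `publish_message` after the response has been sent, so a failed publish is not reported to the client.
- **Delivery Mode**: Set to `PERSISTENT` so the broker writes messages to disk; combined with the durable queue, this lets them survive a broker restart.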

### 5.4. Consuming Messages

**worker.py**

```python
import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    queue_name = "task_queue"

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, durable=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(f"Received message: {message.body.decode()}")
                    await asyncio.sleep(1)  # Simulate work

if __name__ == "__main__":
    asyncio.run(main())
```

**Explanation**:

- **worker.py**: A worker script that consumes messages from `task_queue`.
- **message.process()**: A context manager that acknowledges the message when the block exits normally and rejects it if the block raises an exception.
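The acknowledge-on-success, reject-on-error behaviour of such a context manager can be sketched without a broker. `FakeMessage` below is a hypothetical stand-in written for this illustration; it is not the aio-pika message class and only imitates the pattern.

```python
import asyncio


class _Processing:
    """Async context manager that records the outcome on the message."""

    def __init__(self, message):
        self._message = message

    async def __aenter__(self):
        return self._message

    async def __aexit__(self, exc_type, exc, tb):
        # Ack on a clean exit, reject if the block raised
        self._message.state = "acked" if exc_type is None else "rejected"
        return False  # do not swallow the exception


class FakeMessage:
    """Hypothetical stand-in for an incoming message."""

    def __init__(self, body: bytes):
        self.body = body
        self.state = "unacked"

    def process(self):
        return _Processing(self)


async def handle(message: FakeMessage, fail: bool = False) -> None:
    async with message.process():
        if fail:
            raise ValueError("processing failed")


async def demo():
    ok, bad = FakeMessage(b"ok"), FakeMessage(b"bad")
    await handle(ok)
    try:
        await handle(bad, fail=True)
    except ValueError:
        pass
    return ok.state, bad.state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The practical consequence is that any exception escaping the `async with` block in the worker decides the fate of the message, so the processing code should let real failures propagate rather than hide them.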

## 6. Integrating Kafka with FastAPI

### 6.1. Installing Dependencies

Install the required packages:

```bash
pip install fastapi "uvicorn[standard]" aiokafka  # quotes keep shells like zsh from expanding the brackets
```

- **aiokafka**: Asynchronous Python client for Apache Kafka.

### 6.2. Setting Up Kafka

We'll use Docker Compose to set up Kafka along with ZooKeeper.

**docker-compose.yml**

```yaml
version: '2'
services:
  zookeeper:
    image: 'confluentinc/cp-zookeeper:7.4.0'  # pinned; this setup relies on ZooKeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: 'confluentinc/cp-kafka:7.4.0'  # pinned; newer major releases drop ZooKeeper mode
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

Start Kafka and ZooKeeper:

```bash
docker-compose up -d
```
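Before starting the application, it can help to confirm that the containers are accepting connections. The following is a minimal sketch using only the standard library; `port_is_open` is a hypothetical helper, and the ports are the ones mapped in the Docker commands in this tutorial. An open TCP port only shows that something is listening, not that the broker has finished starting up.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    for name, port in (("RabbitMQ (AMQP)", 5672), ("Kafka", 9092)):
        status = "reachable" if port_is_open("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {status}")
```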

### 6.3. Producing Messages

**main.py**

```python
from fastapi import FastAPI, BackgroundTasks
from aiokafka import AIOKafkaProducer

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # Create the producer inside the running event loop; recent aiokafka
    # releases no longer accept a `loop` argument.
    app.state.producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await app.state.producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    # Flush pending messages and close the connection
    await app.state.producer.stop()

async def send_message(topic: str, message: str):
    # send_and_wait returns once the broker has acknowledged the message
    await app.state.producer.send_and_wait(topic, message.encode())

@app.post("/tasks")
async def create_task(message: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_message, "task_topic", message)
    return {"message": "Task received"}
```

**Explanation**:

- **AIOKafkaProducer**: Initializes a Kafka producer.
- **send_message**: Sends a message to the specified Kafka topic.
- **BackgroundTasks**: Used to run `send_message` in the background.

### 6.4. Consuming Messages

**consumer.py**

```python
import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer(
        'task_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"Received message: {msg.value.decode()}")
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume())
```

**Explanation**:

- **AIOKafkaConsumer**: Initializes a Kafka consumer.
- **async for msg in consumer**: Asynchronously consumes messages from the topic.


## 7. Comparing RabbitMQ and Kafka Integration

**Similarities**:

- Both use asynchronous clients (`aio-pika`, `aiokafka`).
- Both require running a separate service (RabbitMQ or Kafka).
- Message publishing and consuming involve similar patterns.

**Differences**:

- **RabbitMQ**: More suitable for task queues, supports complex routing, and acknowledgments are managed per message.
- **Kafka**: Designed for high-throughput, works with streams of data, and consumers track offsets.

## 8. Implementing Asynchronous Tasks

Let's create a more practical example where FastAPI handles incoming requests and offloads heavy computations to a background worker.

### 8.1. Example with RabbitMQ

**main.py** (updated)

```python
import json  # add alongside the other imports at the top of main.py

@app.post("/process-data")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    message = json.dumps(data)
    background_tasks.add_task(publish_message, message)
    return {"message": "Data received and processing started"}
```

**worker.py** (updated)

```python
import json  # add alongside the other imports at the top of worker.py

async def main():
    # ... existing code ...

                    # Simulate heavy computation
                    data = json.loads(message.body.decode())
                    result = heavy_computation(data)
                    print(f"Processed data: {result}")
```

### 8.2. Example with Kafka

**main.py** (updated)

```python
import json  # add alongside the other imports at the top of main.py

@app.post("/process-data")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    message = json.dumps(data)
    background_tasks.add_task(send_message, "task_topic", message)
    return {"message": "Data received and processing started"}
```

**consumer.py** (updated)

```python
import json  # add alongside the other imports at the top of consumer.py

async def consume():
    # ... existing code ...

            data = json.loads(msg.value.decode())
            result = heavy_computation(data)
            print(f"Processed data: {result}")
```

**Note**: Replace `heavy_computation` with your actual computation logic.
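One thing to watch for: if `heavy_computation` is an ordinary synchronous function, calling it directly inside the consumer coroutine blocks the event loop while it runs. A minimal sketch of one way around this, assuming a placeholder `heavy_computation` and a hypothetical `handle_payload` helper, is to hand the call to the loop's default thread pool with `run_in_executor`.

```python
import asyncio
import json


def heavy_computation(data: dict) -> dict:
    """Hypothetical placeholder for real CPU-heavy or blocking work."""
    return {"value": data["value"], "squared": data["value"] ** 2}


async def handle_payload(body: bytes) -> dict:
    """Decode a message body and run the computation off the event loop."""
    data = json.loads(body.decode())
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor
    return await loop.run_in_executor(None, heavy_computation, data)


if __name__ == "__main__":
    print(asyncio.run(handle_payload(b'{"value": 42}')))
```

A thread keeps the event loop responsive but does not give pure-Python CPU-bound code real parallelism; for that, a `concurrent.futures.ProcessPoolExecutor` passed as the first argument is a common alternative.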

## 9. Error Handling and Retries

### 9.1. RabbitMQ Error Handling

- Use **Dead Letter Exchanges** to handle failed messages.
- Implement **Retry Mechanisms** using message headers.

**worker.py** (handling exceptions)

```python
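# Use this in place of `async with message.process():`, which already
# acknowledges or rejects the message on its own.
try:
    ...  # Process message
    await message.ack()
except Exception as e:
    print(f"Error processing message: {e}")
    # With requeue=False the broker drops the message or dead-letters it
    await message.nack(requeue=False)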
```
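The header-based retry idea mentioned above can be sketched as a small decision function. Everything here is an assumption made for illustration: the `x-retry-count` header name, the `MAX_RETRIES` limit, and the `plan_retry` helper are hypothetical and not part of RabbitMQ or aio-pika.

```python
from typing import Dict, Tuple

MAX_RETRIES = 3  # assumed limit for illustration


def plan_retry(
    headers: Dict[str, int], max_retries: int = MAX_RETRIES
) -> Tuple[str, Dict[str, int]]:
    """Decide what to do with a failed message based on a retry counter header.

    Returns an action ("republish" or "dead-letter") and the headers to use.
    """
    attempts = int(headers.get("x-retry-count", 0))
    if attempts >= max_retries:
        # Retry budget exhausted: hand the message to the dead letter path
        return "dead-letter", dict(headers)
    updated = dict(headers)
    updated["x-retry-count"] = attempts + 1
    return "republish", updated


if __name__ == "__main__":
    print(plan_retry({}))
    print(plan_retry({"x-retry-count": 3}))
```

In a worker, "republish" would typically mean publishing a copy of the message with the updated headers and acknowledging the original, while "dead-letter" would mean rejecting it without requeueing so that a configured dead letter exchange can pick it up.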

### 9.2. Kafka Error Handling

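- Disable auto-commit (`enable_auto_commit=False`) and call `consumer.commit()` after processing, so that an offset is committed only once its message has been handled.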
- Implement **Retry Logic** in the consumer application.

**consumer.py** (handling exceptions)

```python
try:
    ...  # Process message
except Exception as e:
    print(f"Error processing message: {e}")
    # Decide whether to continue or halt
```
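Retry logic in the consumer application could look like the following minimal sketch, which retries a failing handler with exponential backoff before giving up. `process_with_retries`, its parameters, and the `flaky` handler are hypothetical names chosen for this example.

```python
import asyncio


async def process_with_retries(handler, payload, max_attempts=3, base_delay=0.1):
    """Call handler(payload); on failure, wait and retry with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(payload)
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: let the caller decide (log, skip, or halt)
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    calls = {"count": 0}

    async def flaky(payload):
        # Fails on the first two calls, then succeeds
        calls["count"] += 1
        if calls["count"] < 3:
            raise RuntimeError("temporary failure")
        return payload.upper()

    result = await process_with_retries(flaky, "ok", base_delay=0.01)
    return result, calls["count"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Retrying in place delays every later message in the same partition, so a bounded number of attempts followed by publishing the failed message to a separate topic is a common compromise.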

## 10. Testing the Application

### 10.1. Testing RabbitMQ Integration

Start the FastAPI app:

```bash
uvicorn main:app --reload
```

Run the worker:

```bash
python worker.py
```

Send a request:

```bash
# Requires HTTPie; value:=42 sends the JSON body {"value": 42}
http POST http://localhost:8000/process-data value:=42
```

Check the worker output for processed messages.

### 10.2. Testing Kafka Integration

Start the FastAPI app:

```bash
uvicorn main:app --reload
```

Run the consumer:

```bash
python consumer.py
```

Send a request:

```bash
# Requires HTTPie; value:=42 sends the JSON body {"value": 42}
http POST http://localhost:8000/process-data value:=42
```

Check the consumer output for processed messages.
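The manual checks above need running brokers. For quick feedback on the encoding and processing logic alone, one option is to exercise it against an in-memory `asyncio.Queue` that stands in for the broker. This is only a sketch: `producer`, `consumer`, and `run_pipeline` are hypothetical names, and doubling the value is a placeholder for the real computation.

```python
import asyncio
import json


async def producer(queue: asyncio.Queue, payloads) -> None:
    """Encode each payload as JSON bytes, as the API endpoints do."""
    for payload in payloads:
        await queue.put(json.dumps(payload).encode())
    await queue.put(None)  # sentinel: no more messages


async def consumer(queue: asyncio.Queue) -> list:
    """Decode each message and apply a placeholder computation."""
    results = []
    while True:
        body = await queue.get()
        if body is None:
            break
        results.append(json.loads(body.decode())["value"] * 2)
    return results


async def run_pipeline(payloads) -> list:
    queue = asyncio.Queue()
    _, results = await asyncio.gather(producer(queue, payloads), consumer(queue))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([{"value": 1}, {"value": 21}])))
```

A test like this does not replace the end-to-end checks, since it says nothing about connection handling, acknowledgments, or offsets.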

## 11. Conclusion

In this tutorial, we've:

- Understood the roles of RabbitMQ and Apache Kafka as message brokers.
- Set up RabbitMQ and Kafka using Docker.
- Integrated both RabbitMQ and Kafka with a FastAPI application.
- Implemented message publishing and consuming for both systems.
- Compared the integration processes and use cases for RabbitMQ and Kafka.
- Handled errors and tested the application.

By integrating RabbitMQ or Kafka with FastAPI, you can build scalable, asynchronous applications capable of handling high loads and complex workflows.

## 12. References

- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [RabbitMQ Official Documentation](https://www.rabbitmq.com/documentation.html)
- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)
- [aio-pika Documentation](https://aio-pika.readthedocs.io/en/latest/)
- [aiokafka Documentation](https://aiokafka.readthedocs.io/en/stable/)
- [Asynchronous Programming in Python](https://docs.python.org/3/library/asyncio.html)
- [Message Queue Concepts](https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html)