# ChatBot System Design (Streaming Similar to ChatGPT)

## Technologies Used
- **Frontend:** ReactJS
- **Backend:** Python (FastAPI)
- **Streaming/Reactive Programming:** React Programming
- **Message Broker:** Kafka

## High-Level Design Steps

### 1. User Query Input
- Users can ask queries to the ChatBot through a user-friendly interface built with ReactJS.

### 2. Request Handling
- The user’s query will be sent to the backend where Python's FastAPI will handle the request.
- Implement streaming or reactive programming concepts to process the incoming messages.
- The message should then be published to a Kafka topic.

### 3. Kafka High-Level Design

#### Data Storage and Retrieval
- Use Kafka topics to store incoming queries and system-generated responses.
- Consider using separate topics for different stages of the message lifecycle:
  - `user_queries`
  - `system_responses`

### 4. Generate System Response
- Once a query is received in the Kafka topic, a consumer service will read the message and generate an appropriate response.
- This response will then be stored in another Kafka topic (`system_responses`).

### 5. Response Delivery
- A Kafka producer will read the system-generated response from the `system_responses` topic.
- The producer will use Python's FastAPI to push the response back to the ReactJS front-end to be displayed to the user in real-time.

## High-Level Flow Diagram

```plaintext
+---------------+        +----------------+      +------------------+      +-------------------+
|               |        |                |      |                  |      |                   |
|  User Query   +------->+  FastAPI        +----->+  Kafka Topic      +----->+  Consumer Service  |
|   (ReactJS)   |        |   (Backend)     |      |  (user_queries)  |      |  (Generate Response)|
|               |        |                |      |                  |      |                   |
+---------------+        +----------------+      +------------------+      +-------------------+
                                                                            |
                                                                            v
+---------------+        +----------------+      +------------------+      +-------------------+
|               |        |                |      |                  |      |                   |
| System Response+<-------+  FastAPI        +<-----+  Kafka Topic      +<-----+  Producer Service |
|   (ReactJS)   |        |   (Backend)     |      |  (system_responses)|      |   (Push Response) |
|               |        |                |      |                  |      |                   |
+---------------+        +----------------+      +------------------+      +-------------------+


# ReactJS - User Input and Display

## src/App.js

In [None]:
import React, { useState } from 'react';
import './App.css';

function App() {
  const [query, setQuery] = useState('');
  const [response, setResponse] = useState('');

  const handleQueryChange = (e) => {
    setQuery(e.target.value);
  };

  const handleQuerySubmit = async () => {
    const res = await fetch('http://localhost:8000/query', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ query })
    });
    const data = await res.json();
    setResponse(data.response);
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>ChatBot</h1>
        <input 
          type="text" 
          value={query} 
          onChange={handleQueryChange} 
          placeholder="Ask your question..." 
        />
        <button onClick={handleQuerySubmit}>Submit</button>
        <p>Response: {response}</p>
      </header>
    </div>
  );
}

export default App;


### `src/index.css`

In [None]:
.App {
  text-align: center;
}

.App-logo {
  height: 40vmin;
  pointer-events: none;
}

.App-header {
  background-color: #282c34;
  min-height: 100vh;
  display: flex;
  flex-direction: column;
  align-items: center;
  justify-content: center;
  font-size: calc(10px + 2vmin);
  color: white;
}


<h1>Python FastAPI - Backend to Handle Query and Kafka Integration</h1>

<h2>app/main.py</h2>

In [None]:
from fastapi import FastAPI, Request
from pydantic import BaseModel
from kafka import KafkaProducer, KafkaConsumer
import json

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

class Query(BaseModel):
    query: str

@app.post("/query")
async def create_query(query: Query):
    producer.send('user_queries', {'query': query.query})
    producer.flush()
    
    consumer = KafkaConsumer(
        'system_responses',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='consumer-group-a'
    )
    
    for message in consumer:
        response = json.loads(message.value.decode('utf-8'))
        if response['query'] == query.query:
            return {"response": response['response']}


<h1>Kafka Consumer - System Response Generator</h1>

<h2>consumer.py</h2>

In [None]:
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid

consumer = KafkaConsumer(
    'user_queries',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='consumer-group-b'
)

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    query = json.loads(message.value.decode('utf-8'))['query']
    response = f"Generated response for '{query}'"
    producer.send('system_responses', {'query': query, 'response': response})
    producer.flush()


# Create the Producer Service
<b>`main.py`</b> - FastAPI Application:

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from confluent_kafka import Producer, KafkaError

app = FastAPI()

# Kafka configuration
kafka_config = {
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 10
}
kafka_topic = 'system_responses'

# Initialize Kafka producer
producer = Producer(kafka_config)

class ResponseMessage(BaseModel):
    user_id: int
    message: str

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

@app.post("/produce/")
async def produce_message(response_message: ResponseMessage):
    try:
        # Produce message to Kafka
        producer.produce(
            kafka_topic,
            key=str(response_message.user_id),
            value=response_message.message,
            callback=delivery_report
        )
        producer.poll(0)
        producer.flush()
        return {"status": "message sent"}
    except KafkaError as e:
        raise HTTPException(status_code=500, detail=f"Kafka error: {e}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)


## Modularize Kafka implementation

<b>Project Structure</b>

In [None]:
kafka_producer_service/
├── main.py
├── api/
│   ├── __init__.py
│   └── producer.py
└── kafka/
    ├── __init__.py
    └── config.py


1. <b>`kafka/config.py`</b> -<h3> Kafka Configuration</h3

In [None]:
from confluent_kafka import Producer

def get_kafka_producer():
    kafka_config = {
        'bootstrap.servers': 'localhost:9092',
        'linger.ms': 10
    }
    return Producer(kafka_config)


2. <b>`api/producer.py`<b> - <h3>Producer API</h3

In [None]:
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from confluent_kafka import KafkaError
from kafka.config import get_kafka_producer

router = APIRouter()

# Initialize Kafka producer
producer = get_kafka_producer()
kafka_topic = 'system_responses'

class ResponseMessage(BaseModel):
    user_id: int
    message: str

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

@router.post("/produce/")
async def produce_message(response_message: ResponseMessage):
    try:
        # Produce message to Kafka
        producer.produce(
            kafka_topic,
            key=str(response_message.user_id),
            value=response_message.message,
            callback=delivery_report
        )
        producer.poll(0)
        producer.flush()
        return {"status": "message sent"}
    except KafkaError as e:
        raise HTTPException(status_code=500, detail=f"Kafka error: {e}")


3. <b>`main.py`</b> - <h3>FastAPI Application Entry Point</h3>

In [None]:
from fastapi import FastAPI
from api.producer import router as producer_router

app = FastAPI()

# Include the producer router
app.include_router(producer_router, prefix="/api")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)


<h1>Setup Kafka Topics</h1>

<p>Make sure to create the necessary Kafka topics (<code>user_queries</code> and <code>system_responses</code>) in your Kafka instance.</p>

<p>This is a skeletal structure to get you started. You can extend it by adding more robust error handling, optimizations, and additional features as per your requirements.</p>


# WebSocketss D generator;


graph TD;
    A[ReactJS (Frontend)] -->|WebSocket Connection| B[WebSocket Server];
    B -->|Produce Message| C[(Kafka)];
    C -->|Consume Message| D[System Response Generator];
    D -->|Produce Response| C;
    C -->|Consume Response| B;
    B -->|Send Response| A;

    classDef react fill:#f9f,stroke:#333,stroke-width:2px;
    classDef ws fill:#6f9,stroke:#333,stroke-width:2px;
    classDef kafka fill:#9cf,stroke:#333,stroke-width:2px;
    classDef generator fill:#fc9,stroke:#333,stroke-width:2px;

    class A react;
    class B ws;
    class C kafka;
    class D generator;

<h1>Design Overview</h1>

<ul>
  <li><strong>ReactJS (Frontend):</strong>
    <ul>
      <li>User inputs and displays the response.</li>
      <li>Communicates with the backend via WebSocket.</li>
    </ul>
  </li>
  <li><strong>WebSocket Server (Backend):</strong>
    <ul>
      <li>Handles incoming queries from the frontend.</li>
      <li>Produces messages to Kafka.</li>
      <li>Consumes messages from Kafka and sends responses back to the frontend.</li>
    </ul>
  </li>
  <li><strong>Kafka:</strong>
    <ul>
      <li>Acts as a message broker between the WebSocket server and the system response generator.</li>
    </ul>
  </li>
  <li><strong>Consumer for System Response Generator:</strong>
    <ul>
      <li>Consumes user queries from Kafka.</li>
      <li>Generates responses and produces them back to Kafka.</li>
    </ul>
  </li>
</ul>

<h1>Implementation</h1>

<h2>1. ReactJS</h2>


In [None]:
import React, { useState, useEffect } from 'react';
import './App.css';

function App() {
  const [query, setQuery] = useState('');
  const [response, setResponse] = useState('');
  const [socket, setSocket] = useState(null);

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8000');
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setResponse(data.response);
    };
    setSocket(ws);

    return () => {
      ws.close();
    };
  }, []);

  const handleQueryChange = (e) => {
    setQuery(e.target.value);
  };

  const handleQuerySubmit = () => {
    socket.send(JSON.stringify({ query }));
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>ChatBot</h1>
        <input 
          type="text" 
          value={query} 
          onChange={handleQueryChange} 
          placeholder="Ask your question..." 
        />
        <button onClick={handleQuerySubmit}>Submit</button>
        <p>Response: {response}</p>
      </header>
    </div>
  );
}

export default App;


<h2>2. WebSocket Server</h2>

<p>First, install the necessary libraries:</p>


In [None]:
pip install websockets kafka-python


<h2>app/server.py</h2>


In [None]:
import asyncio
import websockets
import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

async def handle_client(websocket, path):
    consumer = KafkaConsumer(
        'system_responses',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=str(uuid.uuid4())  # Unique group_id for each connection
    )

    async for message in websocket:
        query_data = json.loads(message)
        producer.send('user_queries', query_data)
        producer.flush()
        
        for msg in consumer:
            response_data = json.loads(msg.value.decode('utf-8'))
            if response_data['query'] == query_data['query']:
                await websocket.send(json.dumps({'response': response_data['response']}))
                break

start_server = websockets.serve(handle_client, "localhost", 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()


<h2>3. Kafka Consumer for System Response Generator</h2>

<h3>consumer.py</h3>


In [None]:
from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer(
    'user_queries',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='consumer-group-b'
)

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    query = json.loads(message.value.decode('utf-8'))['query']
    response = f"Generated response for '{query}'"
    producer.send('system_responses', {'query': query, 'response': response})
    producer.flush()


<h2>Setup Kafka Topics</h2>

<p>Make sure to create the necessary Kafka topics (<code>user_queries</code> and <code>system_responses</code>) in your Kafka instance.</p>

<p>This setup allows you to communicate from the React app to the WebSocket server, which interacts with Kafka and retrieves/generates responses dynamically. The components are decoupled and can scale independently.</p>


# Code Coverage or Test Coverage

1. <b>`tests/test_messages.py`</b>
This file will contain all of our test cases:

In [None]:
import pytest
from fastapi.testclient import TestClient
from main import app
from unittest.mock import patch, MagicMock
from datetime import datetime
from io import BytesIO
import pytz

client = TestClient(app)

@pytest.fixture
def mock_get_user():
    return {"id": 1, "name": "Test User"}

@pytest.fixture
def mock_session():
    session = MagicMock()
    yield session
    session.close()

@pytest.fixture
def mock_audio_file():
    return UploadFile(
        filename="test_audio.mp3",
        content_type="audio/mpeg",
        file=BytesIO(b"dummy audio contents")
    )

@patch("main.get_user", side_effect=mock_get_user)
@patch("main.get_session", side_effect=mock_session)
def test_send_message_without_audio(mock_get_user, mock_get_session):
    response = client.post("/messages", json={"question": "What is the weather today?"})
    assert response.status_code == 200
    assert "messages" in response.json()

@patch("main.get_user", side_effect=mock_get_user)
@patch("main.get_session", side_effect=mock_session)
def test_send_message_with_empty_audio(mock_get_user, mock_get_session):
    empty_audio = UploadFile(
        filename="test_audio.mp3",
        content_type="audio/mpeg",
        file=BytesIO(b"")
    )
    response = client.post("/messages", files={"audio": empty_audio.file})
    assert response.status_code == 400
    assert response.json() == {"detail": "Uploaded audio file is empty"}

@patch("main.get_user", side_effect=mock_get_user)
@patch("main.get_session", side_effect=mock_session)
def test_send_message_with_audio(mock_get_user, mock_get_session, mock_audio_file):
    response = client.post("/messages", files={"audio": mock_audio_file.file})
    assert response.status_code == 200
    assert "messages" in response.json()

@patch("main.get_user", side_effect=mock_get_user)
@patch("main.get_session", side_effect=mock_session)
def test_send_message_with_conversation_id(mock_get_user, mock_get_session):
    conversation_id = 1234
    response = client.post("/messages", json={"conversation_id": conversation_id, "question": "Tell me a joke."})
    assert response.status_code == 200
    assert "messages" in response.json()

@patch("main.get_user", side_effect=mock_get_user)
@patch("main.get_session", side_effect=mock_session)
def test_send_message_without_question_and_audio(mock_get_user, mock_get_session):
    response = client.post("/messages")
    assert response.status_code == 400
    assert response.json() == {"detail": "Text or audio file is required"}

# Add more tests as needed


In [None]:
import pytest
from fastapi.testclient import TestClient
from main import app
from unittest.mock import patch, MagicMock
from datetime import datetime
from io import BytesIO
import pytz
from fastapi import UploadFile  # Fix: Add this missing import

client = TestClient(app)

@pytest.fixture
def mock_get_user():
    return {"id": 1, "name": "Test User"}

@pytest.fixture
def mock_session():
    session = MagicMock()
    yield session
    session.close()

@pytest.fixture
def mock_audio_file():
    return UploadFile(
        filename="test_audio.mp3",
        content_type="audio/mpeg",
        file=BytesIO(b"dummy audio contents")
    )

@pytest.fixture(autouse=True)
def override_dependencies(mock_get_user, mock_session):
    with patch("main.get_user", return_value=mock_get_user), \
         patch("main.get_session", return_value=mock_session):
        yield

def test_send_message_without_audio():
    response = client.post("/messages", json={"question": "What is the weather today?"})
    assert response.status_code == 200
    assert "messages" in response.json()

def test_send_message_with_empty_audio():
    empty_audio = UploadFile(
        filename="test_audio.mp3",
        content_type="audio/mpeg",
        file=BytesIO(b"")
    )
    response = client.post("/messages", files={"audio": ("test_audio.mp3", empty_audio.file, "audio/mpeg")})
    assert response.status_code == 400
    assert response.json() == {"detail": "Uploaded audio file is empty"}

def test_send_message_with_audio(mock_audio_file):
    response = client.post("/messages", files={"audio": (mock_audio_file.filename, mock_audio_file.file, mock_audio_file.content_type)})
    assert response.status_code == 200
    assert "messages" in response.json()

def test_send_message_with_conversation_id():
    conversation_id = 1234
    response = client.post("/messages", json={"conversation_id": conversation_id, "question": "Tell me a joke."})
    assert response.status_code == 200
    assert "messages" in response.json()

def test_send_message_without_question_and_audio():
    response = client.post("/messages")
    assert response.status_code == 400
    assert response.json() == {"detail": "Text or audio file is required"}


### Explanation

**Fixtures:** Used to mock user data and session objects that will be passed to the endpoint.

**Mocking Dependencies:** We patch the dependencies like `get_user` and `get_session` to return controlled data for our tests.

### Test Cases

- **test_send_message_without_audio:** Tests sending a message without an audio file.
- **test_send_message_with_empty_audio:** Tests handling of an empty audio file.
- **test_send_message_with_audio:** Tests sending a message with a valid audio file.
- **test_send_message_with_conversation_id:** Tests sending a message with a specific conversation ID.
- **test_send_message_without_question_and_audio:** Tests handling when both question and audio are missing.

You can expand your tests by adding more scenarios and edge cases as needed.

**Run the tests using:** 
```bash
pytest


******************************************************

## consumer_service.py: Consuming and Producing with Transactions

In [None]:
import asyncio
from confluent_kafka import Consumer, KafkaException
from producer_service import create_producer

async def consume_and_produce(query_string):
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-consumer-group',
        'enable.auto.commit': False,
        'isolation.level': 'read_committed'
    }

    consumer = Consumer(consumer_config)
    consumer.subscribe(['input_topic'])

    # Producer configuration with transactions
    producer = create_producer()
    producer.init_transactions()

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            
            if msg.error():
                raise KafkaException(msg.error())

            key = msg.key().decode('utf-8')
            value = msg.value().decode('utf-8')

            if query_string in value:
                processed_value = value.upper()  # Example processing logic

                try:
                    producer.begin_transaction()
                    producer.produce('output_topic', key=key.encode('utf-8'), value=processed_value.encode('utf-8'))
                    producer.send_offsets_to_transaction(
                        [msg],
                        consumer_config['group.id']
                    )
                    producer.commit_transaction()

                    consumer.commit(asynchronous=False)
                    return {'key': key, 'value': processed_value}

                except KafkaException as e:
                    producer.abort_transaction()
                    print(f"Transaction aborted due to error: {e}")
                    raise
    finally:
        producer.flush()
        consumer.close()


## Create a new file named consumer.py and define your route there using FastAPI's APIRouter.

In [None]:
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from consumer_service import consume_and_produce

router = APIRouter()

class MessageResponse(BaseModel):
    key: str
    value: str

@router.get('/consume', response_model=MessageResponse)
async def consume(query_string: str = Query(..., min_length=1)):
    try:
        result = await consume_and_produce(query_string)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


<b>Now, import the router from consumer.py and include it in your FastAPI app in main.py.</b>

In [None]:
from fastapi import FastAPI
import uvicorn
from consumer import router as consumer_router

app = FastAPI()

# Include the router
app.include_router(consumer_router, prefix="/api")

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)


*******************************************************

html
<h2>Database Transaction Decorator</h2>
<p>This decorator ensures that any function it wraps is executed within a database transaction. If the function raises an exception, the transaction is rolled back; otherwise, it is committed.</p>

In [None]:
from sqlalchemy.orm import Session
from contextlib import contextmanager

@contextmanager
def transactional(session: Session):
    try:
        yield
        session.commit()
    except Exception as e:
        session.rollback()
        raise e

def transaction_decorator(func):
    def wrapper(*args, **kwargs):
        session = kwargs.get('session')
        if not session:
            raise ValueError("Session is required")
        with transactional(session):
            return func(*args, **kwargs)
    return wrapper


html
<h2>Kafka Consumer Decorator</h2>
<p>This decorator consumes messages from a Kafka topic and passes them to the wrapped function.</p>

In [None]:
from confluent_kafka import Consumer, KafkaException

def kafka_consumer_decorator(consumer_conf, topics):
    def decorator(func):
        def wrapper(*args, **kwargs):
            consumer = Consumer(consumer_conf)
            consumer.subscribe(topics)

            try:
                while True:
                    msg = consumer.poll(timeout=1.0)
                    if msg is None:
                        continue
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            continue
                        else:
                            raise KafkaException(msg.error())
                    func(msg.value(), *args, **kwargs)
            finally:
                consumer.close()
        
        return wrapper
    
    return decorator


html
<h2>Kafka Producer Decorator</h2>
<p>This decorator sends messages to a Kafka topic after the wrapped function executes successfully.</p>

In [None]:
from confluent_kafka import Producer

producer_instance = None

def get_producer(producer_conf):
    global producer_instance
    if not producer_instance:
        producer_instance = Producer(producer_conf)
    return producer_instance

def kafka_producer_decorator(producer_conf, topic):
    def decorator(func):
        def wrapper(*args, **kwargs):
            producer = get_producer(producer_conf)
            result = func(*args, **kwargs)
            producer.produce(topic, key=None, value=result)
            producer.flush()
            return result
        return wrapper
    return decorator


html
<h2>Example Usage</h2>
<p>Here’s how you can use these decorators in your code:</p>

In [None]:
from fastapi import FastAPI, Depends

# Assume that get_db_session is a dependency that provides a database session
from dependencies import get_db_session 

app = FastAPI()

# Example of usage in a route with SQLAlchemy transactions
@app.post('/transactional-endpoint')
@transaction_decorator
async def transactional_endpoint(data: dict, session: Session = Depends(get_db_session)):
    # Your business logic here
    ...

# Example Kafka Consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

# Example Kafka Producer configuration
producer_config = {
    'bootstrap.servers': 'localhost:9092'
}

# Creating a function using Kafka consumer
@kafka_consumer_decorator(consumer_config, ['my-topic'])
def consume_kafka_message(message, *args, **kwargs):
    print(f"Consumed message: {message}")

# Using Kafka Producer in a function
@kafka_producer_decorator(producer_config, 'output-topic')
def process_data():
    data = "Processed Data"
    return data

# Call the function to start consuming Kafka messages
consume_kafka_message()

# To produce a message
process_data()


************************************************

html
<h2>Kafka Transaction Decorator</h2>
<p>Creating a Kafka transaction decorator for both producer and consumer operations requires handling transactions in such a way that ensures message delivery guarantees (exactly-once semantics). This involves creating a decorator that manages transactional producers and consumers effectively.</p>
<p>We'll use the <code>confluent_kafka</code> library, which supports Kafka's transactions. Below is an example showing how you can create these decorators.</p>

<h3>Producer Transaction Decorator</h3>

In [None]:
from confluent_kafka import Producer, KafkaException

def kafka_producer_transaction_decorator(producer_conf):
    def decorator(func):
        def wrapper(*args, **kwargs):
            producer = Producer(producer_conf)
            try:
                producer.init_transactions()
                producer.begin_transaction()
                result = func(producer, *args, **kwargs)
                producer.commit_transaction()
            except KafkaException as e:
                producer.abort_transaction()
                raise e
            finally:
                producer.flush()
            return result
        return wrapper
    return decorator


## Consumer Transaction Decorator

In [None]:
from confluent_kafka import Consumer, KafkaError, KafkaException

def kafka_consumer_transaction_decorator(consumer_conf, topics):
    def decorator(func):
        def wrapper(*args, **kwargs):
            consumer = Consumer(consumer_conf)
            consumer.subscribe(topics)

            try:
                while True:
                    msg = consumer.poll(timeout=1.0)
                    if msg is None:
                        continue
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            continue
                        else:
                            raise KafkaException(msg.error())
                    
                    producer = kwargs.get('producer')
                    if not producer:
                        raise ValueError("Producer is required to pass to the decorated function")

                    try:
                        producer.init_transactions()
                        producer.begin_transaction()
                        func(msg.value(), producer, *args, **kwargs)
                        producer.send_offsets_to_transaction(
                            {msg.topic(): [(msg.partition(), msg.offset() + 1)]},
                            consumer.consumer_group_metadata()
                        )
                        producer.commit_transaction()
                    except KafkaException as e:
                        producer.abort_transaction()
                        raise e
            finally:
                consumer.close()
        
        return wrapper

    return decorator


### Example Usage
Here's how you can use these decorators in your code:

In [None]:
# Example Kafka Producer configuration
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-producer'
}

# Example Kafka Consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed'  # Important for transactions
}

@kafka_producer_transaction_decorator(producer_config)
def produce_message(producer, some_data):
    producer.produce('output-topic', key=None, value=some_data)

@kafka_consumer_transaction_decorator(consumer_config, ['input-topic'])
def consume_and_process_message(message, producer):
    processed_data = f"Processed {message}"
    produce_message(producer, processed_data)

# Start consuming messages
consume_and_process_message()


***************************************

html
<h2>Combined Transaction Decorators</h2>
<p>Yes, we can create composite decorators that handle both database transactions and Kafka transactions. By nesting decorators, you can ensure that both types of transactions are managed properly within the scope of a single function.</p>
<p>Below is an example implementation showing how you could combine database transaction management (using SQLAlchemy as an example) and Kafka transaction management in decorators:</p>

<h3>Database Transaction Decorator</h3>
<p>We'll use <code>SQLAlchemy</code> for this example. Ensure you have it installed (<code>pip install sqlalchemy</code>).</p>

In [None]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine('sqlite:///example.db')  # Example with SQLite
Session = sessionmaker(bind=engine)

def db_transaction_decorator(func):
    def wrapper(*args, **kwargs):
        session = Session()
        try:
            result = func(session, *args, **kwargs)
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()
        return result
    return wrapper


### Kafka Producer Transaction Decorator

In [None]:
from confluent_kafka import Producer, KafkaException

def kafka_producer_transaction_decorator(producer_conf):
    def decorator(func):
        def wrapper(session, *args, **kwargs):
            producer = Producer(producer_conf)
            try:
                producer.init_transactions()
                producer.begin_transaction()
                result = func(session, producer, *args, **kwargs)
                producer.commit_transaction()
            except KafkaException as e:
                producer.abort_transaction()
                raise e
            finally:
                producer.flush()
            return result
        return wrapper
    return decorator


### Combined Usage
Now we will combine these decorators and implement a function that uses both:

In [None]:
# Kafka Producer configuration
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-producer'
}

@db_transaction_decorator
@kafka_producer_transaction_decorator(producer_config)
def complex_operation(session, producer, data):
    # DB operations
    session.add(data)  # Assuming `data` is an instance of an ORM-mapped class
    
    # Kafka operations
    producer.produce('output-topic', key=None, value=str(data))

# Example usage
if __name__ == "__main__":
    from my_models import MyDataClass  # Import your SQLAlchemy models

    new_data = MyDataClass(attribute='value')
    complex_operation(new_data)


html
<h2>Explanation</h2>

<h3>Database Transaction Decorator (<code>db_transaction_decorator</code>):</h3>
<ul>
    <li>Manages a SQLAlchemy session.</li>
    <li>Begins a transaction at the start and commits if the wrapped function succeeds.</li>
    <li>Rolls back the transaction if any exception occurs and closes the session.</li>
</ul>

<h3>Kafka Producer Transaction Decorator (<code>kafka_producer_transaction_decorator</code>):</h3>
<ul>
    <li>Manages Kafka producer transactions.</li>
    <li>Initializes and begins a transaction before calling the wrapped function.</li>
    <li>Commits the Kafka transaction on success and aborts it on failure.</li>
</ul>

<h3>Combined Usage:</h3>
<p>The <code>complex_operation</code> function is decorated with both decorators. It first interacts with the database using the SQLAlchemy session. Then it performs Kafka operations within the same transactional context. The nested decorators ensure that either all operations complete successfully or none do, maintaining consistency across both the database and Kafka.</p>

<h3>Important Considerations:</h3>
<ul>
    <li>Ensure proper error handling to avoid partial updates in either system.</li>
    <li>Make sure both decorators work cohesively to manage the transactions effectively.</li>
    <li>This solution assumes that the potential side effects and performance overhead introduced by transactional management are acceptable for your specific scenario.</li>
</ul>