
# 🏭 AI + Kafka Production Patterns (Colab Demo)

This notebook builds on the first demo and introduces **production-ready concepts** for AI + Kafka pipelines:

1. **Schema Registry (contracts with Avro/JSON-Schema)**  
2. **Dead-letter Queues (DLQ) for failed events**  
3. **Consumer Groups & Scaling**  
4. **Metrics & Monitoring Hooks**  

Just like before, you can run in two modes:
- `LOCAL_SIM` *(default)* — no Kafka account required (in-memory queue).
- `KAFKA` — connect to Confluent Cloud with credentials.



In [1]:

# @title Install dependencies
!pip -q install confluent-kafka==2.4.0                   fastavro==1.9.7                   prometheus-client==0.21.0                   faker==26.0.0                   pandas matplotlib


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m32.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m30.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.7/54.7 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:

# @title Configuration
from dataclasses import dataclass

MODE = "LOCAL_SIM"  # @param ["LOCAL_SIM", "KAFKA"]
INPUT_TOPIC = "events.raw"
OUTPUT_TOPIC = "events.scored"
DLQ_TOPIC = "events.dlq"

@dataclass
class Config:
    mode: str
    input_topic: str = INPUT_TOPIC
    output_topic: str = OUTPUT_TOPIC
    dlq_topic: str = DLQ_TOPIC
    n_events: int = 50

CFG = Config(mode=MODE)
CFG


Config(mode='LOCAL_SIM', input_topic='events.raw', output_topic='events.scored', dlq_topic='events.dlq', n_events=50)

In [3]:

# @title Schema Registry (simulated with Avro)
from fastavro import parse_schema, schemaless_writer, schemaless_reader
import io

# Define schema
event_schema = {
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "text", "type": "string"},
        {"name": "lang", "type": "string"},
    ],
}
parsed_schema = parse_schema(event_schema)

def serialize_avro(record: dict) -> bytes:
    buf = io.BytesIO()
    schemaless_writer(buf, parsed_schema, record)
    return buf.getvalue()

def deserialize_avro(payload: bytes) -> dict:
    buf = io.BytesIO(payload)
    return schemaless_reader(buf, parsed_schema)

# Test it
sample = {"event_id":"123","user_id":"42","text":"hello world","lang":"en"}
ser = serialize_avro(sample)
print("Serialized bytes:", ser[:20], "...")
print("Deserialized:", deserialize_avro(ser))


Serialized bytes: b'\x06123\x0442\x16hello world\x04' ...
Deserialized: {'event_id': '123', 'user_id': '42', 'text': 'hello world', 'lang': 'en'}


In [4]:

# @title Local simulated topics (for LOCAL_SIM mode)
import asyncio

class LocalQueue:
    def __init__(self):
        self.q = asyncio.Queue()
    async def produce(self, value: bytes):
        await self.q.put(value)
    async def consume(self, timeout=0.2):
        try:
            return await asyncio.wait_for(self.q.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return None


In [5]:

# @title Producer with Avro encoding
import uuid, random
from faker import Faker
fake = Faker()

async def producer(cfg: Config, topic_q, n=10):
    for i in range(n):
        ev = {
            "event_id": str(uuid.uuid4()),
            "user_id": str(random.randint(1,100)),
            "text": random.choice(["great app","bad service","love it","hate it"]),
            "lang": random.choice(["en","hi"]),
        }
        await topic_q.produce(serialize_avro(ev))
    print("[Producer] Done")


In [6]:

# @title Worker with DLQ (if inference fails)
import random

async def worker(cfg: Config, in_q, out_q, dlq_q):
    processed, failed = 0,0
    while True:
        msg = await in_q.consume()
        if msg is None:
            break
        try:
            ev = deserialize_avro(msg)
            # Simulate failure
            if random.random() < 0.2:
                raise ValueError("Simulated inference error")
            # Enrich with fake sentiment
            ev["sentiment"] = random.choice(["POSITIVE","NEGATIVE"])
            await out_q.produce(serialize_avro(ev))
            processed += 1
        except Exception as e:
            await dlq_q.produce(msg)
            failed += 1
    print(f"[Worker] processed={processed}, failed={failed}")


In [7]:

# @title Sink with Prometheus-style metrics
from prometheus_client import Counter, CollectorRegistry, generate_latest

registry = CollectorRegistry()
c_processed = Counter("events_processed","Events processed",registry=registry)
c_failed = Counter("events_failed","Events failed",registry=registry)

async def sink(cfg: Config, out_q, dlq_q):
    processed, failed = [],[]
    while True:
        msg = await out_q.consume()
        if msg:
            ev = deserialize_avro(msg)
            processed.append(ev)
            c_processed.inc()
        msg2 = await dlq_q.consume()
        if msg2:
            ev2 = deserialize_avro(msg2)
            failed.append(ev2)
            c_failed.inc()
        if not msg and not msg2:
            break
    print("[Sink] Final metrics:")
    print(generate_latest(registry).decode())
    return processed, failed


In [8]:

# @title Run the demo pipeline (LOCAL_SIM)
async def main(cfg: Config):
    in_q = LocalQueue()
    out_q = LocalQueue()
    dlq_q = LocalQueue()

    await producer(cfg, in_q, cfg.n_events)
    await worker(cfg, in_q, out_q, dlq_q)
    results, failures = await sink(cfg, out_q, dlq_q)

    import pandas as pd
    df = pd.DataFrame(results)
    print("Processed:", df.head())
    print("Failures:", len(failures))

import nest_asyncio, asyncio
nest_asyncio.apply()
await main(CFG)


[Producer] Done
[Worker] processed=37, failed=13
[Sink] Final metrics:
# HELP events_processed_total Events processed
# TYPE events_processed_total counter
events_processed_total 37.0
# HELP events_processed_created Events processed
# TYPE events_processed_created gauge
events_processed_created 1.7565722780219715e+09
# HELP events_failed_total Events failed
# TYPE events_failed_total counter
events_failed_total 13.0
# HELP events_failed_created Events failed
# TYPE events_failed_created gauge
events_failed_created 1.756572278022097e+09

Processed:                                event_id user_id         text lang
0  d7ccf351-2549-4f61-9019-5bb7eeb92424      38  bad service   en
1  78b5c6c3-b5ff-4c36-9a17-3347e0fdaeb7      46      love it   en
2  4f7d19ee-994e-405e-a656-b08974c03853       6      love it   hi
3  6ec7b4e4-dcce-4337-af74-c38bb173c4af      81  bad service   hi
4  9ddcce45-bb93-472c-88a8-2a2f3faa8d73      30      hate it   en
Failures: 13



## 🧰 What You Learned

- **Schema Registry**: We used Avro schemas to serialize/deserialize events → prevents schema drift.  
- **DLQ**: Failed events go to `events.dlq` so you don’t lose data.  
- **Consumer Groups**: (not simulated here, but in real Kafka multiple workers can share a topic → scale out).  
- **Metrics**: Exposed Prometheus counters for observability.  

In production, you’d also add:
- **Retries before DLQ**
- **Alerting on DLQ growth**
- **Tracing with OpenTelemetry**
- **Dashboards with Grafana**

