In [1]:
# Kafka Consumer → SQLite
# ------------------------
# This script consumes basket data from Kafka and stores it in an SQLite database
# for later analysis (e.g., clustering, prediction, web dashboard).

import sqlite3
import json
import time
from kafka import KafkaConsumer
from datetime import datetime, timezone

# SQLite setup
# Opens (or creates) the local database file and makes sure the two tables
# used by the consumer exist before any message is processed.
conn = sqlite3.connect("baskets.db")

# SQLite ignores FOREIGN KEY clauses unless enforcement is switched on for
# each connection; without this the basket_items -> baskets reference declared
# below is never checked. Must run before any transaction is opened.
conn.execute("PRAGMA foreign_keys = ON")

cursor = conn.cursor()

# Create tables if they don't exist
# baskets: one row per consumed basket, with its ISO-8601 timestamp.
cursor.execute("""
CREATE TABLE IF NOT EXISTS baskets (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT
);
""")

# basket_items: one row per item, linked to its parent basket by basket_id.
cursor.execute("""
CREATE TABLE IF NOT EXISTS basket_items (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    basket_id INTEGER,
    item TEXT,
    FOREIGN KEY(basket_id) REFERENCES baskets(id)
);
""")
conn.commit()

# Kafka consumer setup
def _decode_json(raw):
    """Turn a UTF-8 encoded message payload into the Python object it encodes."""
    return json.loads(raw.decode('utf-8'))


# Subscribe to the 'baskets' topic as part of the 'basket-db-writer' group;
# with no committed offset for the group, reading starts at the earliest
# available message.
consumer = KafkaConsumer(
    'baskets',
    group_id='basket-db-writer',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=_decode_json,
)

print("📡 Listening for basket data from Kafka...")

def _parse_basket(payload):
    """Validate one decoded Kafka payload and extract what gets stored.

    Args:
        payload: The deserialized message value. Expected to be a dict with
            an optional "items" list and an optional numeric "timestamp"
            (seconds since the epoch).

    Returns:
        A ``(timestamp, items)`` tuple where ``timestamp`` is an ISO-8601
        string in UTC and ``items`` is a list of scalar values. A missing or
        null "timestamp" falls back to the current time; a missing or null
        "items" becomes an empty list.

    Raises:
        ValueError: If the payload is not a dict, "items" is not a list,
            an item is itself a list/dict, or the timestamp cannot be
            converted to a datetime.
    """
    if not isinstance(payload, dict):
        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")

    items = payload.get("items")
    if items is None:
        items = []
    elif not isinstance(items, (list, tuple)):
        # A bare string here would otherwise be iterated character by
        # character and stored as one row per character.
        raise ValueError(f"'items' must be a list, got {type(items).__name__}")

    for item in items:
        # Nested containers cannot be bound as SQLite parameters.
        if isinstance(item, (dict, list)):
            raise ValueError(f"unsupported item value {item!r}")

    raw_ts = payload.get("timestamp")
    if raw_ts is None:
        raw_ts = time.time()
    try:
        stamp = datetime.fromtimestamp(raw_ts, timezone.utc).isoformat()
    except (TypeError, ValueError, OverflowError, OSError) as exc:
        raise ValueError(f"invalid timestamp {raw_ts!r}") from exc

    return stamp, list(items)


try:
    for message in consumer:
        # A single malformed payload should not take the whole consumer down:
        # report it and move on to the next message.
        try:
            timestamp, items = _parse_basket(message.value)
        except ValueError as exc:
            print(f"⚠️ Skipping malformed message: {exc}")
            continue

        # The connection context manager commits when the block succeeds and
        # rolls back if either insert raises, so a basket row is never kept
        # without its items. Database errors still propagate and stop the run.
        with conn:
            # Insert basket and get its ID
            cursor.execute("INSERT INTO baskets (timestamp) VALUES (?)", (timestamp,))
            basket_id = cursor.lastrowid

            # Insert items
            cursor.executemany(
                "INSERT INTO basket_items (basket_id, item) VALUES (?, ?)",
                [(basket_id, item) for item in items]
            )

        print(f"✅ Stored basket with {len(items)} items @ {timestamp}")

except KeyboardInterrupt:
    print("⛔ Consumer stopped.")

finally:
    # Nested so the database handle is released even if closing the Kafka
    # consumer raises.
    try:
        consumer.close()
    finally:
        conn.close()


📡 Listening for basket data from Kafka...
✅ Stored basket with 4 items @ 2025-05-29T09:04:30.373390+00:00
✅ Stored basket with 3 items @ 2025-05-29T09:04:31.502320+00:00
✅ Stored basket with 1 items @ 2025-05-29T09:04:32.504750+00:00
✅ Stored basket with 4 items @ 2025-05-29T09:04:33.506624+00:00
✅ Stored basket with 4 items @ 2025-05-29T09:04:34.510239+00:00
✅ Stored basket with 5 items @ 2025-05-29T09:04:35.511985+00:00
✅ Stored basket with 1 items @ 2025-05-29T09:04:36.514248+00:00
✅ Stored basket with 5 items @ 2025-05-29T09:04:37.516573+00:00
✅ Stored basket with 1 items @ 2025-05-29T09:04:38.517461+00:00
✅ Stored basket with 2 items @ 2025-05-29T09:04:39.518453+00:00
✅ Stored basket with 5 items @ 2025-05-29T09:04:40.519610+00:00
✅ Stored basket with 9 items @ 2025-05-29T09:04:41.520903+00:00
✅ Stored basket with 1 items @ 2025-05-29T09:04:42.524303+00:00
✅ Stored basket with 3 items @ 2025-05-29T09:04:43.527026+00:00
✅ Stored basket with 2 items @ 2025-05-29T09:04:44.527836+00:0