In [4]:
import json
import os
import queue
import random
import sqlite3
import string
import time
from queue import Queue
from threading import Thread

In [5]:
def producer(q, count=10, min_delay=2, max_delay=6):
    """Emit synthetic ride events onto a queue as JSON strings.

    Each event is a JSON object with these keys:
      - ``ride_id``:   6 random characters drawn from A-Z and 0-9
      - ``driver_id``: one of "D1", "D2", "D3"
      - ``cost``:      random float between 5 and 50, rounded to 2 decimals
      - ``ts``:        current local time as ``YYYY-MM-DD HH:MM:SS``

    Args:
        q: Queue-like object exposing ``put``; receives one JSON string
            per event.
        count: Number of events to emit (default 10).
        min_delay: Lower bound, in seconds, of the random pause that
            follows each event (default 2).
        max_delay: Upper bound, in seconds, of that pause (default 6).
    """
    # Loop-invariant: build the ID alphabet once instead of per event.
    id_alphabet = string.ascii_uppercase + string.digits

    for _ in range(count):
        event = {
            "ride_id": ''.join(random.choices(id_alphabet, k=6)),
            "driver_id": random.choice(["D1", "D2", "D3"]),
            "cost": round(random.uniform(5, 50), 2),
            "ts": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        q.put(json.dumps(event))
        # Random pause so events arrive at irregular intervals.
        time.sleep(random.uniform(min_delay, max_delay))

In [6]:
def stream_to_sqlite(q, db_path, table, idle_timeout=60, max_runtime=60, poll_interval=1):
    """Consume JSON ride events from a queue and insert them into SQLite.

    Each message must be a JSON object containing the keys ``ride_id``,
    ``driver_id``, ``cost`` and ``ts``; every message is committed
    individually. The loop ends when either no message arrives within
    ``idle_timeout`` seconds, or, after an insert, more than
    ``max_runtime`` seconds have elapsed since the function started.
    The run-time limit is only checked after an insert, so a blocking
    wait for the next message can extend the total run beyond it.

    Args:
        q: Queue-like object whose ``get(timeout=...)`` raises
            ``queue.Empty`` on timeout.
        db_path: Path of the SQLite database file.
        table: Target table name; a plain or schema-qualified identifier
            (e.g. ``ride_events`` or ``main.ride_events``).
        idle_timeout: Seconds to wait for a message before stopping
            (default 60).
        max_runtime: Elapsed-seconds threshold checked after each insert
            (default 60).
        poll_interval: Seconds to sleep between inserts (default 1);
            values <= 0 disable the pause.

    Raises:
        ValueError: If ``table`` is not a valid identifier.
        json.JSONDecodeError: If a message is not valid JSON.
        KeyError: If a message lacks one of the required keys.
        sqlite3.Error: If the insert or commit fails.
    """
    # SQL placeholders cannot bind identifiers, so the table name has to be
    # interpolated; restrict it to identifier characters to rule out injection.
    if not isinstance(table, str) or not all(
        part.isidentifier() for part in table.split(".")
    ):
        raise ValueError(f"Invalid table name: {table!r}")
    insert_sql = f"INSERT INTO {table}(ride_id, driver_id, cost, ts) VALUES (?, ?, ?, ?)"

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
        started = time.monotonic()
        while True:
            try:
                # Block until a message arrives or the idle timeout expires.
                data = q.get(timeout=idle_timeout)
            except queue.Empty:
                print(f"No message for {idle_timeout:g}s → stopping.")
                break

            record = json.loads(data)
            cur.execute(
                insert_sql,
                (record["ride_id"], record["driver_id"], record["cost"], record["ts"])
            )
            conn.commit()
            print("Inserted:", record)

            # Stop once the total streaming time exceeds the budget.
            if time.monotonic() - started > max_runtime:
                print(f"Total run exceeded {max_runtime:g}s → stopping.")
                break

            # Optional wait before the next poll.
            if poll_interval > 0:
                time.sleep(poll_interval)
    finally:
        # Release the connection even when a malformed message or a
        # database error aborts the loop.
        conn.close()

    print("Stream processing finished.")

In [8]:
def create_table(db_path="rides.db", table="ride_events"):
    """
    Create an SQLite database and a table for ride events
    if they don't already exist.

    The table has four columns: ``ride_id`` (TEXT), ``driver_id`` (TEXT),
    ``cost`` (REAL) and ``ts`` (TEXT). Calling the function again on an
    existing table is a no-op because of ``IF NOT EXISTS``.

    Args:
        db_path: Path of the SQLite database file (default "rides.db").
        table: Name of the table to create (default "ride_events"); a
            plain or schema-qualified identifier.

    Raises:
        ValueError: If ``table`` is not a valid identifier.
        sqlite3.Error: If the database cannot be opened or the DDL fails.
    """
    # Identifiers cannot be bound as SQL parameters, so validate the name
    # before interpolating it into the statement.
    if not isinstance(table, str) or not all(
        part.isidentifier() for part in table.split(".")
    ):
        raise ValueError(f"Invalid table name: {table!r}")

    conn = sqlite3.connect(db_path)
    try:
        # Create table with simple schema
        conn.execute(f"""
            CREATE TABLE IF NOT EXISTS {table} (
                ride_id   TEXT,
                driver_id TEXT,
                cost      REAL,
                ts        TEXT
            );
        """)
        conn.commit()
    finally:
        # Close the connection even if the DDL statement fails.
        conn.close()

    print(f"Database '{db_path}' and table '{table}' ready.")

In [10]:
import queue


if __name__ == "__main__":
    # Make sure the destination table exists before any event is consumed.
    create_table()

    # Channel shared by the background producer and the foreground consumer.
    q = queue.Queue()

    # Daemon thread, so a still-sleeping producer never keeps the process alive.
    producer_thread = Thread(target=producer, args=(q,), daemon=True)
    producer_thread.start()

    # Consume in the foreground until stream_to_sqlite decides to stop.
    stream_to_sqlite(q, "rides.db", "ride_events")

Database 'rides.db' and table 'ride_events' ready.
Inserted: {'ride_id': 'TSJJWI', 'driver_id': 'D2', 'cost': 30.41, 'ts': '2025-09-16 10:57:59'}
Inserted: {'ride_id': 'T5F6DW', 'driver_id': 'D2', 'cost': 34.63, 'ts': '2025-09-16 10:58:04'}
Inserted: {'ride_id': 'K76DDD', 'driver_id': 'D1', 'cost': 17.72, 'ts': '2025-09-16 10:58:07'}
Inserted: {'ride_id': 'KU9IWR', 'driver_id': 'D1', 'cost': 24.76, 'ts': '2025-09-16 10:58:09'}
Inserted: {'ride_id': '4IWLRI', 'driver_id': 'D2', 'cost': 34.32, 'ts': '2025-09-16 10:58:13'}
Inserted: {'ride_id': '939QKO', 'driver_id': 'D2', 'cost': 36.33, 'ts': '2025-09-16 10:58:19'}
Inserted: {'ride_id': 'KNE8LT', 'driver_id': 'D2', 'cost': 45.97, 'ts': '2025-09-16 10:58:23'}
Inserted: {'ride_id': '94R3B2', 'driver_id': 'D1', 'cost': 30.94, 'ts': '2025-09-16 10:58:25'}
Inserted: {'ride_id': '9XNTIH', 'driver_id': 'D2', 'cost': 23.93, 'ts': '2025-09-16 10:58:30'}
Inserted: {'ride_id': '3WM0A0', 'driver_id': 'D2', 'cost': 10.67, 'ts': '2025-09-16 10:58:34'}