# Live BTC/ETH streaming with Kafka (Producer → Topics → Consumer Analytics)

This notebook shows an end-to-end pattern for **streaming live BTC/ETH ticks** into **Kafka**, then consuming them for:
- **cleaning & normalization**
- **windowed aggregations** (OHLCV-like bars, returns, spreads)
- **basic anomaly flags** (jump detection)
- **persistence** (optional) and **visualization**

> Notes:
> - The code is written to run locally against a Kafka broker (e.g., `localhost:9092`).
> - "Live" data can come from an exchange WebSocket (e.g., Binance). If your environment blocks outbound network access, run the producer on a machine that can reach the exchange and keep the consumer local.
> - Replace the WebSocket endpoint if you prefer Coinbase, Kraken, etc.

---

## Architecture

**Producer (WebSocket)**  
`Exchange WS → parse → Kafka topic: crypto.ticks`

**Kafka**
- Topic: `crypto.ticks` (keyed by symbol)
- Optional topic: `crypto.alerts` (anomaly events)

**Consumer (Analytics)**  
`Kafka → schema validate → window aggregates → visualize/store`

---

## Dependencies

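You'll use:
- `kafka-python` for Kafka
- `websockets` for live WS ingestion (producer)
- `pydantic` (optional) for validation; the validation below is hand-rolled and does not require it
- `pandas`/`numpy`/`matplotlib` for analytics
- `pyarrow` as the Parquet engine for the optional persistence step
- `pytest` (optional) for running the tests outside the notebook

Install (one-time):
```bash
pip install kafka-python websockets pydantic pandas numpy matplotlib pyarrow pytest
```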

## Kafka quickstart (Docker)

If you don't already have Kafka running, the quickest local setup is Docker. Save this as `docker-compose.yml` and run `docker compose up -d`.

```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

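Create the topics. Run these from the directory that contains `docker-compose.yml`:
```bash
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic crypto.ticks --partitions 6 --replication-factor 1
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic crypto.alerts --partitions 1 --replication-factor 1
```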

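> Partitions: ticks are keyed by symbol, so each symbol maps to exactly one partition. BTC and ETH therefore use at most two of the six. The remaining partitions leave headroom for more symbols, and once those symbols exist, up to six consumers in one group can share the load.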
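The sketch below shows why the key matters, without needing a broker. It is a simplified stand-in. Kafka's default partitioner hashes the key bytes with murmur2, but this sketch uses `zlib.crc32` purely for illustration, so its partition numbers will not match a real cluster. The property it demonstrates does carry over: every event for a symbol lands in one partition, in send order. `partition_for` is a hypothetical helper.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

NUM_PARTITIONS = 6

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Hypothetical stand-in for Kafka's partitioner: hash the key bytes and take the
    # result modulo the partition count (real Kafka clients use murmur2, not CRC32).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Five events sent in this order; seq records the send order.
sends = ["BTCUSDT", "ETHUSDT", "BTCUSDT", "BTCUSDT", "ETHUSDT"]
by_partition: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
for seq, symbol in enumerate(sends):
    by_partition[partition_for(symbol)].append((symbol, seq))

for partition, events in sorted(by_partition.items()):
    print(f"partition {partition}: {events}")
```

With only two distinct keys, at most two of the six partitions receive data. At most two consumers in a group therefore have work, and the rest stay idle until more symbols are added.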

In [None]:
# Notebook config (adjust as needed)

KAFKA_BOOTSTRAP = "localhost:9092"
TICKS_TOPIC = "crypto.ticks"
ALERTS_TOPIC = "crypto.alerts"  # optional
SYMBOLS = ["BTCUSDT", "ETHUSDT"]

# Windowing for analytics
BAR_SECONDS = 10  # make this 60 for 1-minute bars
MAX_POINTS_TO_PLOT = 300

## Data model (tick schema)

We'll standardize all ticks into a compact JSON record:

```json
{
  "source": "binance",
  "symbol": "BTCUSDT",
  "ts_ms": 1710000000000,
  "price": 67000.12,
  "qty": 0.0031
}
```

Why standardize?
- Consumers shouldn't care about exchange-specific fields.
- Enables multiple producers feeding the same topic.
- Makes testing and validation straightforward.
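On the wire, each record is UTF-8 JSON. This matches the `json.dumps(...).encode("utf-8")` value serializer and the `json.loads(v.decode("utf-8"))` deserializer used by the producer and consumer in this notebook. The sketch below shows the encode/decode round trip. The compact `separators=(",", ":")` argument is an optional tweak that the notebook's own producer does not use, and `encode_tick`/`decode_tick` are hypothetical helper names.

```python
import json
from typing import Any, Dict

def encode_tick(tick: Dict[str, Any]) -> bytes:
    # Compact separators drop the spaces json.dumps inserts by default.
    return json.dumps(tick, separators=(",", ":")).encode("utf-8")

def decode_tick(raw: bytes) -> Dict[str, Any]:
    return json.loads(raw.decode("utf-8"))

tick = {"source": "binance", "symbol": "BTCUSDT", "ts_ms": 1710000000000, "price": 67000.12, "qty": 0.0031}
raw = encode_tick(tick)
print(raw)  # b'{"source":"binance","symbol":"BTCUSDT","ts_ms":1710000000000,"price":67000.12,"qty":0.0031}'
assert decode_tick(raw) == tick  # floats survive the round trip unchanged
```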

In [None]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional, List
import json
import time

## Unit tests first (schema + transformations)

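Following a test-first workflow, the tests below check that:
- `normalize_tick` turns a valid raw event into a normalized tick
- validation rejects malformed input
- window aggregation produces the correct OHLCV values

These tests run locally and do **not** require Kafka. They call functions defined in the implementation cell, so `run_tests()` is invoked at the end of that cell.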

In [None]:
import math

In [None]:
# --- tests ---

def test_normalize_tick_accepts_valid():
    raw = {"symbol": "BTCUSDT", "ts_ms": 1000, "price": 10.0, "qty": 0.5, "source": "x"}
    out = normalize_tick(raw)
    assert out["symbol"] == "BTCUSDT"
    assert out["ts_ms"] == 1000
    assert out["price"] == 10.0
    assert out["qty"] == 0.5
    assert out["source"] == "x"

def test_normalize_tick_rejects_missing_fields():
    try:
        normalize_tick({"symbol": "BTCUSDT", "price": 10.0})
        assert False, "Expected ValueError"
    except ValueError:
        pass

def test_aggregate_ticks_to_bar():
    ticks = [
        {"symbol":"BTCUSDT","ts_ms": 1000, "price": 10.0, "qty": 1.0, "source":"x"},
        {"symbol":"BTCUSDT","ts_ms": 2000, "price": 12.0, "qty": 2.0, "source":"x"},
        {"symbol":"BTCUSDT","ts_ms": 3000, "price": 11.0, "qty": 3.0, "source":"x"},
    ]
    bar = aggregate_to_bar("BTCUSDT", window_start_ms=0, window_end_ms=10_000, ticks=ticks)
    assert bar["open"] == 10.0
    assert bar["high"] == 12.0
    assert bar["low"] == 10.0
    assert bar["close"] == 11.0
    assert math.isclose(bar["volume_qty"], 6.0)

def run_tests():
    tests = [
        test_normalize_tick_accepts_valid,
        test_normalize_tick_rejects_missing_fields,
        test_aggregate_ticks_to_bar,
    ]
    for t in tests:
        t()
    print(f"✅ Ran {len(tests)} tests successfully.")

## Implementation (after tests)

We'll implement three core pure functions:
1. `normalize_tick` — schema checks & coercion
2. `aggregate_to_bar` — produce OHLCV-style bar for a fixed window
3. `detect_jump` — simple anomaly flag based on percent-move threshold
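For reference, the percent move that `detect_jump` compares against `threshold_pct` is `abs(price / prev_price - 1) * 100`. The worked values below use a hypothetical standalone `pct_move` helper that mirrors that formula.

```python
def pct_move(prev_price: float, price: float) -> float:
    # Absolute percent change between two consecutive tick prices.
    return abs(price / prev_price - 1.0) * 100.0

THRESHOLD_PCT = 0.5  # same default as detect_jump

for prev, cur in [(67000.0, 67400.0), (67000.0, 67100.0), (3500.0, 3480.0)]:
    move = pct_move(prev, cur)
    print(f"{prev} -> {cur}: {move:.3f}%  flagged={move >= THRESHOLD_PCT}")
```

A +400 move on 67000 is about 0.597%, so it is flagged. A +100 move is about 0.149%, so it is not. A move that lands exactly on the threshold in decimal terms, such as 3500 to 3482.5, can compute to slightly above or below 0.5 in floating point, so boundary cases make unreliable tests.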

In [None]:
def normalize_tick(obj: Dict[str, Any]) -> Dict[str, Any]:
    '''
    Validate and coerce a raw tick dict into the standard schema.

    Required keys:
      - source: str
      - symbol: str
      - ts_ms: int (epoch milliseconds)
      - price: float
      - qty: float

    Raises:
      ValueError if required fields are missing or invalid.
    '''
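    required = ["source", "symbol", "ts_ms", "price", "qty"]
    for k in required:
        # Treat an explicit None (e.g. msg.get("s") on a non-trade message) as missing;
        # otherwise str(None) would pass through as the symbol "None".
        if obj.get(k) is None:
            raise ValueError(f"Missing field: {k}")

    source = str(obj["source"])
    symbol = str(obj["symbol"])
    if not symbol:
        raise ValueError("symbol must be non-empty")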

    try:
        ts_ms = int(obj["ts_ms"])
    except Exception as e:
        raise ValueError("ts_ms must be int-like") from e

    try:
        price = float(obj["price"])
        qty = float(obj["qty"])
    except Exception as e:
        raise ValueError("price/qty must be numeric") from e

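    if ts_ms <= 0:
        raise ValueError("ts_ms must be > 0")
    # math.isfinite rejects NaN and +/-inf, which float() accepts without complaint.
    if not math.isfinite(price) or price <= 0.0:
        raise ValueError("price must be finite and > 0")
    if not math.isfinite(qty) or qty < 0.0:
        raise ValueError("qty must be finite and >= 0")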

    return {"source": source, "symbol": symbol, "ts_ms": ts_ms, "price": price, "qty": qty}


def aggregate_to_bar(
    symbol: str,
    window_start_ms: int,
    window_end_ms: int,
    ticks: List[Dict[str, Any]],
) -> Dict[str, Any]:
    '''
    Aggregate ticks to an OHLCV-like bar.

    - open: first tick price in window
    - high/low: extrema
    - close: last tick price in window
    - volume_qty: sum of qty

    Assumes ticks are within [window_start_ms, window_end_ms) and same symbol.
    '''
    if not ticks:
        raise ValueError("ticks must be non-empty")

    ticks_sorted = sorted(ticks, key=lambda t: t["ts_ms"])
    prices = [float(t["price"]) for t in ticks_sorted]
    qtys = [float(t["qty"]) for t in ticks_sorted]

    return {
        "symbol": symbol,
        "window_start_ms": int(window_start_ms),
        "window_end_ms": int(window_end_ms),
        "open": prices[0],
        "high": max(prices),
        "low": min(prices),
        "close": prices[-1],
        "volume_qty": float(sum(qtys)),
        "tick_count": int(len(ticks_sorted)),
    }


def detect_jump(prev_price: float, price: float, threshold_pct: float = 0.5) -> bool:
    '''
    Flag a "jump" if the absolute percent move is at least threshold_pct.
    Default: 0.5% between consecutive ticks (tune this for your feed granularity).
    '''
    if prev_price <= 0 or price <= 0:
        return False
    pct = abs(price / prev_price - 1.0) * 100.0
    return pct >= threshold_pct


run_tests()

## Kafka producer (live WebSocket → Kafka)

This producer connects to an exchange WebSocket, normalizes each trade tick, and publishes to Kafka.

Key Kafka producer choices:
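- **key = symbol** → all events for a symbol land in the same partition, which preserves per-symbol ordering. With `retries` enabled, this also requires `max_in_flight_requests_per_connection=1`; otherwise a retried batch can land behind a later one.
- **acks='all'** → the leader acknowledges a write only after all in-sync replicas have it (stronger durability)
- **linger_ms / batch_size** (optional) → wait briefly to fill larger batches, trading a little latency for throughput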

> If you can't reach the exchange from this notebook, run this producer as a standalone script on a machine that can. The consumer section below still works the same.
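These choices map onto `KafkaProducer` keyword arguments in kafka-python. The sketch below shows one possible configuration. The values are illustrative starting points rather than tuned recommendations, and `PRODUCER_CONFIG` is a hypothetical name. `compression_type="gzip"` is optional and needs no extra package, whereas codecs such as lz4 or snappy require additional Python packages.

```python
# kafka-python KafkaProducer keyword arguments, kept in a plain dict so they can be
# reviewed (and unit-tested) without a broker.
PRODUCER_CONFIG = {
    "acks": "all",                               # wait for all in-sync replicas
    "retries": 5,                                # resend on transient errors
    "max_in_flight_requests_per_connection": 1,  # retries cannot reorder batches
    "linger_ms": 20,                             # wait up to 20 ms to fill a batch
    "batch_size": 32 * 1024,                     # bytes per partition batch (default 16384)
    "compression_type": "gzip",                  # optional; smaller batches on the wire
}

# Usage (requires a running broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9092", **PRODUCER_CONFIG)
```

Limiting in-flight requests to one costs some throughput. That cost buys strict per-partition ordering when retries are enabled.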

In [None]:
import asyncio
import websockets
from kafka import KafkaProducer

# Binance combined-stream endpoint; each message arrives as {"stream": ..., "data": {...}}.
BINANCE_WS = "wss://stream.binance.com:9443/stream"

def make_producer() -> KafkaProducer:
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP,
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        acks="all",
        retries=5,
        max_in_flight_requests_per_connection=1,  # keep per-partition order across retries
        linger_ms=20,
    )

def parse_binance_trade(msg: Dict[str, Any]) -> Dict[str, Any]:
    '''
    Binance trade payload typically includes:
      e = event type ("trade")
      s = symbol
      T = trade time (ms)
      p = price (string)
      q = quantity (string)

    Raises ValueError for non-trade or malformed messages.
    '''
    if msg.get("e") != "trade":
        raise ValueError(f"not a trade event: {msg.get('e')!r}")
    tick = {
        "source": "binance",
        "symbol": msg.get("s"),
        "ts_ms": msg.get("T"),
        # normalize_tick coerces the string price/qty to float and validates them
        "price": msg.get("p"),
        "qty": msg.get("q"),
    }
    return normalize_tick(tick)

async def run_ws_producer(symbols: List[str], stop_after_seconds: Optional[int] = None) -> None:
    '''
    Connect to Binance WS and stream trades for the given symbols into Kafka.

    stop_after_seconds: useful for demos/tests (None => run forever)
    '''
    producer = make_producer()
    start = time.time()

    try:
        streams = "/".join(f"{s.lower()}@trade" for s in symbols)
        url = f"{BINANCE_WS}?streams={streams}"
        async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
            print(f"Connected to {url}")
            while True:
                remaining = None
                if stop_after_seconds is not None:
                    remaining = stop_after_seconds - (time.time() - start)
                    if remaining <= 0:
                        print("Stopping producer (time limit reached).")
                        break

                try:
                    # Bound the wait so the time limit is honored even when no trades arrive.
                    raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
                except asyncio.TimeoutError:
                    continue

                payload = json.loads(raw)
                data = payload.get("data", payload)  # unwrap the combined-stream envelope

                try:
                    tick = parse_binance_trade(data)
                except ValueError:
                    continue  # skip non-trade or malformed messages

                producer.send(TICKS_TOPIC, key=tick["symbol"], value=tick)
    finally:
        producer.flush(5)
        producer.close()
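`run_ws_producer` unwraps `payload.get("data", payload)` because Binance's combined-stream endpoint wraps each event in a `{"stream": ..., "data": {...}}` envelope, while a single raw stream sends the event object directly. This parsing path can be exercised offline with a hand-written message. The sample below is illustrative and was not captured from the exchange. `unwrap_trade` is a hypothetical helper, and its field names follow the `parse_binance_trade` docstring plus the `e` event-type field.

```python
import json
from typing import Any, Dict, Optional

def unwrap_trade(raw: str) -> Optional[Dict[str, Any]]:
    payload = json.loads(raw)
    # Combined streams: {"stream": ..., "data": {...}}; single raw streams: the event itself.
    data = payload.get("data", payload)
    if data.get("e") != "trade":
        return None  # e.g. a subscription response such as {"result": null, "id": 1}
    return {
        "source": "binance",
        "symbol": data["s"],
        "ts_ms": int(data["T"]),
        "price": float(data["p"]),  # price and quantity arrive as strings
        "qty": float(data["q"]),
    }

sample = json.dumps({
    "stream": "btcusdt@trade",
    "data": {"e": "trade", "s": "BTCUSDT", "T": 1710000000000, "p": "67000.12", "q": "0.0031"},
})
print(unwrap_trade(sample))
```

Captured raw messages can be replayed through the same function to build regression tests for the producer without any network access.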

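### Run the producer

The notebook kernel already runs an asyncio event loop, so `asyncio.get_event_loop().run_until_complete(...)` fails there with `RuntimeError: This event loop is already running`. In Jupyter/IPython, use top-level `await` instead. In a standalone script, use `asyncio.run(...)`. To run live for ~60 seconds:

```python
await run_ws_producer(SYMBOLS, stop_after_seconds=60)
```

In [None]:
# Example (disabled by default); uncomment the line that matches your environment
# await run_ws_producer(SYMBOLS, stop_after_seconds=60)          # Jupyter / IPython
# asyncio.run(run_ws_producer(SYMBOLS, stop_after_seconds=60))   # standalone script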

## Kafka consumer (analytics)

The consumer:
- reads from `crypto.ticks`
- maintains per-symbol in-memory state
- forms fixed-time windows (e.g., 10s bars)
- emits:
  - bars dataframe (in notebook)
  - optional anomaly events to `crypto.alerts`

Operational notes:
- **group_id** defines consumer group membership.
- **auto_offset_reset** controls start position if no committed offsets exist.
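- To replay history, use a new `group_id` together with `auto_offset_reset="earliest"` (a new group set to `"latest"` only sees new messages), or seek manually (e.g. `seek_to_beginning()`).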
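The consumer uses tumbling windows. A tick at `ts_ms` belongs to the window starting at `(ts_ms // w) * w`, where `w = window_sec * 1000`, and that window covers `[start, start + w)`. The same bars can also be computed in one batch pass, which is useful as an offline cross-check (for example, on ticks replayed from the topic). The sketch below does this; `batch_bars` and `floor_window` are hypothetical helpers. Unlike the streaming loop, which emits a bar only once a later tick opens the next window, this batch version also emits the final window.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

def floor_window(ts_ms: int, window_sec: int) -> int:
    w = window_sec * 1000
    return (ts_ms // w) * w

def batch_bars(ticks: List[Dict[str, Any]], window_sec: int) -> List[Dict[str, Any]]:
    # Group every tick by (symbol, window start), then reduce each group to OHLCV.
    groups: Dict[Tuple[str, int], List[Dict[str, Any]]] = defaultdict(list)
    for t in ticks:
        groups[(t["symbol"], floor_window(t["ts_ms"], window_sec))].append(t)

    bars = []
    for (sym, start), group in sorted(groups.items()):
        group.sort(key=lambda t: t["ts_ms"])
        prices = [t["price"] for t in group]
        bars.append({
            "symbol": sym,
            "window_start_ms": start,
            "open": prices[0],
            "high": max(prices),
            "low": min(prices),
            "close": prices[-1],
            "volume_qty": sum(t["qty"] for t in group),
            "tick_count": len(group),
        })
    return bars

ticks = [
    {"symbol": "BTCUSDT", "ts_ms": 1_000, "price": 10.0, "qty": 1.0},
    {"symbol": "BTCUSDT", "ts_ms": 9_999, "price": 12.0, "qty": 2.0},
    {"symbol": "BTCUSDT", "ts_ms": 10_000, "price": 11.0, "qty": 3.0},  # first tick of the next window
]
for bar in batch_bars(ticks, window_sec=10):
    print(bar)
```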

In [None]:
from kafka import KafkaConsumer
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
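from dataclasses import field

@dataclass
class SymbolState:
    prev_price: Optional[float] = None
    window_start_ms: Optional[int] = None
    # default_factory gives each symbol its own list (dataclasses reject a bare `= []` default)
    ticks_in_window: List[Dict[str, Any]] = field(default_factory=list)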


def make_consumer(group_id: str) -> KafkaConsumer:
    return KafkaConsumer(
        TICKS_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        group_id=group_id,
        key_deserializer=lambda k: k.decode("utf-8") if k else None,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        enable_auto_commit=True,
        auto_offset_reset="latest",  # change to "earliest" for replay
        consumer_timeout_ms=1000,     # lets loops exit periodically
        max_poll_records=1000,
    )


def floor_to_window_start(ts_ms: int, window_sec: int) -> int:
    w = window_sec * 1000
    return (ts_ms // w) * w


def consume_and_aggregate(
    run_seconds: int = 60,
    window_sec: int = BAR_SECONDS,
    jump_threshold_pct: float = 0.5,
) -> pd.DataFrame:
    '''
    Consume ticks for run_seconds and return a DataFrame of completed bars.

    A bar is emitted when a tick from a later window arrives, so each symbol's
    last, still-open window is not included in the result.

    This is notebook-friendly. In production, you'd run continuously,
    checkpoint state, and write output to durable storage.
    '''
    consumer = make_consumer(group_id=f"crypto-analytics-{int(time.time())}")
    producer_alerts = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP,
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    states: Dict[str, SymbolState] = {}
    bars: List[Dict[str, Any]] = []
    deadline = time.time() + run_seconds

    try:
        # poll() returns within timeout_ms, so the deadline is checked regularly even
        # while ticks keep flowing. Iterating `for msg in consumer` only stops after
        # consumer_timeout_ms of silence, so a busy feed would never reach the deadline.
        while time.time() < deadline:
            batches = consumer.poll(timeout_ms=500)
            for records in batches.values():
                for msg in records:
                    try:
                        tick = normalize_tick(msg.value)
                    except Exception:
                        continue  # skip malformed records

                    sym = tick["symbol"]
                    st = states.setdefault(sym, SymbolState(prev_price=None, window_start_ms=None, ticks_in_window=[]))

                    if st.prev_price is not None and detect_jump(st.prev_price, tick["price"], threshold_pct=jump_threshold_pct):
                        alert = {
                            "type": "jump",
                            "symbol": sym,
                            "ts_ms": tick["ts_ms"],
                            "prev_price": st.prev_price,
                            "price": tick["price"],
                            "threshold_pct": jump_threshold_pct,
                        }
                        producer_alerts.send(ALERTS_TOPIC, key=sym, value=alert)

                    st.prev_price = tick["price"]

                    ws = floor_to_window_start(tick["ts_ms"], window_sec)
                    if st.window_start_ms is None:
                        st.window_start_ms = ws

                    if ws != st.window_start_ms:
                        # A tick from a new window closes the previous one.
                        we = st.window_start_ms + window_sec * 1000
                        if st.ticks_in_window:
                            bars.append(aggregate_to_bar(sym, st.window_start_ms, we, st.ticks_in_window))
                        st.window_start_ms = ws
                        st.ticks_in_window = []

                    st.ticks_in_window.append(tick)

    finally:
        consumer.close()
        producer_alerts.flush(3)
        producer_alerts.close()

    df = pd.DataFrame(bars)
    if not df.empty:
        df["window_start"] = pd.to_datetime(df["window_start_ms"], unit="ms", utc=True)
        df = df.sort_values(["symbol", "window_start"]).reset_index(drop=True)
    return df

### Run analytics consumer

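This assumes the producer is already sending ticks into Kafka from a separate process, such as a second notebook, a terminal, or a script. `consume_and_aggregate` blocks the kernel while it runs, and its fresh consumer group starts at the `latest` offset, so only ticks produced during the run are counted.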

```python
df_bars = consume_and_aggregate(run_seconds=60, window_sec=10, jump_threshold_pct=0.5)
df_bars.tail()
```

In [None]:
df_bars = consume_and_aggregate(run_seconds=20, window_sec=BAR_SECONDS, jump_threshold_pct=0.5)
df_bars.tail(10)

## Visualization

We'll plot:
- BTC & ETH close prices per bar
- BTC/ETH ratio if both exist

If there are many bars, only the most recent `MAX_POINTS_TO_PLOT` rows (counted across all symbols combined) are plotted.
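Every bar's `window_start_ms` is floored to the same grid, so BTC and ETH bars for the same window share an identical key, and an exact-key join is enough to align them. A nearest-match join, by contrast, could pair a BTC bar with an ETH bar from an earlier window. The minimal stdlib sketch below uses `aligned_ratio` as a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple

def aligned_ratio(btc_bars: List[Dict[str, Any]], eth_bars: List[Dict[str, Any]]) -> List[Tuple[int, float]]:
    # Index ETH closes by window start, then keep only windows where both symbols traded.
    eth_close = {bar["window_start_ms"]: bar["close"] for bar in eth_bars}
    out = []
    for bar in sorted(btc_bars, key=lambda b: b["window_start_ms"]):
        ws = bar["window_start_ms"]
        if ws in eth_close:
            out.append((ws, bar["close"] / eth_close[ws]))
    return out

btc = [{"window_start_ms": 0, "close": 67000.0}, {"window_start_ms": 10_000, "close": 67100.0}]
eth = [{"window_start_ms": 10_000, "close": 3500.0}]  # no ETH trades in window 0
print(aligned_ratio(btc, eth))
```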

In [None]:
def plot_bars(df: pd.DataFrame) -> None:
    if df.empty:
        print("No bars to plot (df is empty). Start the producer and try again.")
        return

    d = df.sort_values("window_start")
    if len(d) > MAX_POINTS_TO_PLOT:
        d = d.iloc[-MAX_POINTS_TO_PLOT:]

    for sym in sorted(d["symbol"].unique()):
        ds = d[d["symbol"] == sym]
        plt.figure()
        plt.plot(ds["window_start"], ds["close"])
        plt.title(f"{sym} close (per {BAR_SECONDS}s bar)")
        plt.xlabel("time (UTC)")
        plt.ylabel("price")
        plt.xticks(rotation=30)
        plt.tight_layout()
        plt.show()

    syms = set(d["symbol"].unique())
    if "BTCUSDT" in syms and "ETHUSDT" in syms:
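        btc = d[d["symbol"] == "BTCUSDT"][["window_start", "close"]].rename(columns={"close": "btc"})
        eth = d[d["symbol"] == "ETHUSDT"][["window_start", "close"]].rename(columns={"close": "eth"})
        # Bars share one window grid, so join on the exact window_start. merge_asof (backward
        # match, no tolerance) could pair a BTC bar with an older ETH bar when ETH has no bar
        # in that window.
        m = pd.merge(btc, eth, on="window_start", how="inner").sort_values("window_start")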
        if not m.empty:
            m["ratio"] = (m["btc"] / m["eth"])
            plt.figure()
            plt.plot(m["window_start"], m["ratio"])
            plt.title("BTC/ETH ratio (aligned bars)")
            plt.xlabel("time (UTC)")
            plt.ylabel("BTC / ETH")
            plt.xticks(rotation=30)
            plt.tight_layout()
            plt.show()

plot_bars(df_bars)

## Persisting results (optional)

Common patterns:
- write bars to Parquet (local / S3 / GCS)
- sink to a DB (Timescale/ClickHouse/Postgres)
- Kafka Connect sinks for managed pipelines

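Below: write a local Parquet file. `to_parquet` needs a Parquet engine (`pyarrow` or `fastparquet`) installed, and each run overwrites the file.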

In [None]:
from pathlib import Path

OUT_DIR = Path("./out")
OUT_DIR.mkdir(exist_ok=True)

if not df_bars.empty:
    out_path = OUT_DIR / "crypto_bars.parquet"
    df_bars.to_parquet(out_path, index=False)
    print(f"Wrote: {out_path.resolve()}")
else:
    print("No bars to write.")

## Scaling and production notes

1. **Partitioning**
   - key by `symbol` to preserve ordering per symbol
   - scale partitions as symbols / throughput increase

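2. **Delivery semantics**
   - Kafka consumers are typically at-least-once: after a crash or rebalance, records processed since the last committed offset are redelivered, so the same bar can be emitted more than once
   - de-duplicate by `(symbol, window_start_ms)` at the sink if needed

3. **State**
   - this notebook keeps per-symbol state in memory, so open windows are lost when the consumer stops
   - production: Kafka Streams (RocksDB-backed state stores), Redis, or periodic checkpoints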

4. **Schema**
   - JSON is fine for prototypes
   - Avro/Protobuf + Schema Registry is better for evolution and governance
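The de-duplication point under **Delivery semantics** can be sketched minimally as follows: keep one bar per `(symbol, window_start_ms)` and let a later copy replace an earlier one. In a database, the equivalent is an upsert on a unique key over those two columns (for example, PostgreSQL's `INSERT ... ON CONFLICT`). `dedupe_bars` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, List, Tuple

def dedupe_bars(bars: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Last write wins per (symbol, window_start_ms), like an upsert on that key.
    latest: Dict[Tuple[str, int], Dict[str, Any]] = {}
    for bar in bars:
        latest[(bar["symbol"], bar["window_start_ms"])] = bar
    return [latest[key] for key in sorted(latest)]

bars = [
    {"symbol": "BTCUSDT", "window_start_ms": 0, "close": 10.0},
    {"symbol": "ETHUSDT", "window_start_ms": 0, "close": 2.0},
    {"symbol": "BTCUSDT", "window_start_ms": 0, "close": 10.0},  # redelivered duplicate
    {"symbol": "BTCUSDT", "window_start_ms": 10_000, "close": 11.0},
]
print(dedupe_bars(bars))
```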