# 06 — Live Streaming Query

Query, in real time, the exchange events that the streaming pipeline has landed in ClickHouse.

**Prerequisites:** the platform must be running:
```bash
docker compose -f docker/docker-compose.yml --profile platform up -d --build
```

In [None]:
import os
import time

import clickhouse_connect
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import clear_output, display

# Connection settings are read from the environment so real credentials never
# have to be committed with the notebook. The fallbacks target a local,
# development-only ClickHouse (NOTE(review): confirm they match
# docker/docker-compose.yml).
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', 'localhost')
CLICKHOUSE_PORT = int(os.environ.get('CLICKHOUSE_PORT', '8123'))
CLICKHOUSE_USER = os.environ.get('CLICKHOUSE_USER', 'default')
CLICKHOUSE_PASSWORD = os.environ.get('CLICKHOUSE_PASSWORD', 'changeme')
CLICKHOUSE_DATABASE = os.environ.get('CLICKHOUSE_DATABASE', 'default')

client = clickhouse_connect.get_client(
    host=CLICKHOUSE_HOST,
    port=CLICKHOUSE_PORT,
    username=CLICKHOUSE_USER,
    password=CLICKHOUSE_PASSWORD,
    database=CLICKHOUSE_DATABASE,
)

print(f'Connected to ClickHouse {client.server_version}')

## 1. What data has landed so far?

In [None]:
# One row per (symbol, date): the number of events received and the smallest /
# largest raw `ts_ns` value seen for that pair.
landed_sql = """
    SELECT symbol, date, count() AS events,
           min(ts_ns) AS first_ts_ns,
           max(ts_ns) AS last_ts_ns
    FROM exchange_events
    GROUP BY symbol, date
    ORDER BY symbol, date
"""
client.query_df(landed_sql)

## 2. Event type breakdown

In [None]:
# Count of each event type per symbol, plus its percentage of that symbol's
# total: the window `sum(count()) OVER (PARTITION BY symbol)` adds up the
# grouped counts across the symbol, and the share is rounded to 2 decimals.
breakdown_sql = """
    SELECT symbol, type_name,
           count()  AS cnt,
           round(count() * 100.0 / sum(count()) OVER (PARTITION BY symbol), 2) AS pct
    FROM exchange_events
    GROUP BY symbol, type_name
    ORDER BY symbol, type_name
"""
client.query_df(breakdown_sql)

## 3. Price series (latest session per symbol)

In [None]:
# Executions from each symbol's most recent `date`, ordered by time.
# This is a plain string on purpose (not an f-string): `{sym:String}` is a
# ClickHouse server-side query parameter, so the symbol value is bound by the
# server instead of being spliced into the SQL text.
LATEST_EXECUTIONS_SQL = """
    SELECT ts_ns / 1e9 AS t_sec, price_ticks
    FROM exchange_events
    WHERE symbol = {sym:String}
      AND date = (SELECT max(date) FROM exchange_events WHERE symbol = {sym:String})
      AND type_name LIKE 'EXECUTE%'
    ORDER BY ts_ns
"""

symbols = client.query_df("""
    SELECT DISTINCT symbol FROM exchange_events ORDER BY symbol
""")['symbol'].tolist()

if not symbols:
    # plt.subplots(0, 1) raises ValueError, so report the empty table instead.
    print('exchange_events is empty — nothing to plot yet.')
else:
    fig, axes = plt.subplots(len(symbols), 1, figsize=(14, 4 * len(symbols)), squeeze=False)

    for idx, sym in enumerate(symbols):
        executions = client.query_df(LATEST_EXECUTIONS_SQL, parameters={'sym': sym})

        ax = axes[idx, 0]
        # NOTE(review): t_sec is ts_ns / 1e9; if ts_ns is an absolute epoch
        # timestamp this is epoch seconds, not seconds since session start —
        # confirm the x-axis label.
        ax.plot(executions['t_sec'], executions['price_ticks'], linewidth=0.4, alpha=0.8)
        ax.set_title(f'{sym} — latest session')
        ax.set_xlabel('Session time (s)')
        ax.set_ylabel('Price (ticks)')

    plt.tight_layout()
    plt.show()

## 4. Session summary

In [None]:
# Per-(symbol, date) statistics: event totals, execution and cancel counts,
# price range, and the span between the first and last ts_ns in seconds.
# NOTE(review): low/high are taken over every event type, not only
# executions, while the price charts in this notebook plot EXECUTE events
# only — confirm that is intended.
summary_sql = """
    SELECT
        symbol,
        date,
        count()                              AS total_events,
        countIf(type_name LIKE 'EXECUTE%')   AS executions,
        countIf(type_name LIKE 'CANCEL%')    AS cancels,
        min(price_ticks)                     AS low_ticks,
        max(price_ticks)                     AS high_ticks,
        round((max(ts_ns) - min(ts_ns)) / 1e9, 1) AS session_duration_s
    FROM exchange_events
    GROUP BY symbol, date
    ORDER BY symbol, date
"""
client.query_df(summary_sql)

## 5. 1-minute OHLC bars (latest session)

In [None]:
# 1-minute OHLC bars per symbol, built server-side from the EXECUTE events of
# each symbol's most recent `date` (open/close = price at the earliest/latest
# ts_ns in the bucket).
# minute_bucket uses intDiv so the bucketing stays in exact integer arithmetic.
# Dividing by the Float64 literal 60e9 would first convert ts_ns to Float64,
# which cannot represent integers above 2**53 exactly. If ts_ns holds epoch
# nanoseconds (~1.7e18), events right at a minute boundary could then be
# assigned to the neighbouring bucket.
ohlc = client.query_df("""
    WITH latest AS (
        SELECT symbol, max(date) AS d
        FROM exchange_events
        GROUP BY symbol
    )
    SELECT e.symbol,
           toUInt32(intDiv(e.ts_ns, 60000000000)) AS minute_bucket,
           argMin(e.price_ticks, e.ts_ns)  AS open,
           max(e.price_ticks)              AS high,
           min(e.price_ticks)              AS low,
           argMax(e.price_ticks, e.ts_ns)  AS close,
           sum(e.qty)                      AS volume
    FROM exchange_events e
    INNER JOIN latest l ON e.symbol = l.symbol AND e.date = l.d
    WHERE e.type_name LIKE 'EXECUTE%'
    GROUP BY e.symbol, minute_bucket
    ORDER BY e.symbol, minute_bucket
""")

# One two-panel figure per symbol: close line with high-low band on top,
# per-minute volume bars underneath (buckets are 1 apart, hence width=0.8).
for sym in ohlc['symbol'].unique():
    bars = ohlc[ohlc['symbol'] == sym]
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 6), sharex=True,
                                    gridspec_kw={'height_ratios': [3, 1]})
    ax1.plot(bars['minute_bucket'], bars['close'], label='Close', linewidth=1)
    ax1.fill_between(bars['minute_bucket'], bars['low'], bars['high'],
                     alpha=0.2, label='High-Low range')
    ax1.set_title(f'{sym} — 1-min OHLC (latest session)')
    ax1.set_ylabel('Price (ticks)')
    ax1.legend()

    ax2.bar(bars['minute_bucket'], bars['volume'], width=0.8, alpha=0.7)
    ax2.set_ylabel('Volume')
    ax2.set_xlabel('Minute bucket')

    plt.tight_layout()
    plt.show()

## 6. Live event counter (auto-refreshes)

Watches events accumulate in real time: the table refreshes every 10 seconds, 12 times (about 2 minutes). Interrupt the kernel to stop early.

In [None]:
# Poll per-symbol counters a fixed number of times so the cell terminates on
# its own, after roughly (NUM_REFRESHES - 1) * REFRESH_SECONDS seconds.
REFRESH_SECONDS = 10
NUM_REFRESHES = 12

try:
    for i in range(NUM_REFRESHES):
        # wait=True keeps the previous table visible until the new one is ready.
        clear_output(wait=True)
        result = client.query_df("""
            SELECT symbol,
                   uniqExact(date)  AS trading_days,
                   count()          AS total_events,
                   max(date)        AS latest_date
            FROM exchange_events
            GROUP BY symbol
            ORDER BY symbol
        """)
        print(f'Refresh {i+1}/{NUM_REFRESHES} — {time.strftime("%H:%M:%S")}')
        display(result)
        if i < NUM_REFRESHES - 1:
            time.sleep(REFRESH_SECONDS)
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to stop watching early.
    # End the cell cleanly instead of leaving a traceback under the last table.
    print('Live counter stopped.')