# Flink Data Trace + Integration Test Notebook

This notebook validates pipeline correctness by querying data in strict order allowing one to confirm outputs from the test run:

1. Raw ingress (`streaming_events`, DLQ, quarantine)
2. Typed tables (`ai_stream_status`, `stream_trace_events`, `ai_stream_events`, `stream_ingest_metrics`)
3. Silver projections (`fact_stream_*`)
4. Stateful lifecycle facts (`fact_workflow_*`)
5. Rollups (`agg_*`)
6. API views (`v_api_*`)

It also runs SQL assertion suites and scenario-candidate checks used by the CLI harness.

**Quick navigation**
- Run top setup cells first.
- Use the in-notebook `Mode`/`06-09` toggles for compact vs debug rendering.
- See `Executive Summary + API Readiness Assertions` for a verdict-first snapshot.
- Use drill-down sections only when a summary row is WARN/FAIL.

### Refreshing the Notebook Outputs
This notebook reads data from the database specified below and can be refreshed by re-running the test fixtures.  See the project testing readme regarding the scenario integration harness and associated commands.

In [1]:
# If needed once (also see uv project commands for environment setup instead):
# %pip install clickhouse-connect pandas


In [2]:
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from pathlib import Path
import sys
_nb_helper_loaded = False
for _candidate in [Path.cwd().resolve(), *Path.cwd().resolve().parents]:
    _nb_dir = _candidate / 'tests' / 'python' / 'notebooks'
    if (_nb_dir / 'notebook_shared.py').is_file():
        if str(_nb_dir) not in sys.path:
            sys.path.insert(0, str(_nb_dir))
        _nb_helper_loaded = True
        break
if not _nb_helper_loaded:
    raise FileNotFoundError('Could not locate tests/python/notebooks/notebook_shared.py from current working directory')

import clickhouse_connect
import pandas as pd
from notebook_shared import resolve_repo_root, resolve_sql_dir, resolve_validation_window
CH_HOST = os.getenv('CH_HOST', 'localhost')
CH_PORT = int(os.getenv('CH_PORT', '8123'))
CH_DATABASE = os.getenv('CH_DATABASE', 'livepeer_analytics')
CH_USER = os.getenv('CH_USER', 'analytics_user')
CH_PASSWORD = os.getenv('CH_PASSWORD', 'analytics_password')
CH_SECURE = os.getenv('CH_SECURE', '').lower() in {'1', 'true', 'yes'}
VALIDATION_LOOKBACK_HOURS = int(os.getenv('VALIDATION_LOOKBACK_HOURS', '24'))
LIMIT_PER_SCENARIO = int(os.getenv('LIMIT_PER_SCENARIO', '5'))
SHOW_DEBUG = os.getenv('NB_SHOW_DEBUG', '0').strip().lower() in {'1', 'true', 'yes'}
SHOW_0609_DEBUG = os.getenv('NB_SHOW_0609_DEBUG', '0').strip().lower() in {'1', 'true', 'yes'}
REPO_ROOT = resolve_repo_root(Path.cwd().resolve())
SQL_DIR = resolve_sql_dir(REPO_ROOT)
FROM_TS, TO_TS, WINDOW_SOURCE = resolve_validation_window(REPO_ROOT, VALIDATION_LOOKBACK_HOURS)
print(f'Repo root: {REPO_ROOT}')
print(f'SQL dir: {SQL_DIR}')
print(f'Window source: {WINDOW_SOURCE}')
print(f'Window UTC: {FROM_TS.isoformat()} -> {TO_TS.isoformat()}')
print(f'Debug mode: SHOW_DEBUG={SHOW_DEBUG}, SHOW_0609_DEBUG={SHOW_0609_DEBUG}')
import ipywidgets as widgets
from IPython.display import Markdown, display

# In-notebook display controls (button-first; env vars remain defaults).
MODE_WIDGET = widgets.ToggleButtons(
    options=[('Compact', 'compact'), ('Debug', 'debug')],
    value='debug' if SHOW_DEBUG else 'compact',
    description='Mode:',
)
MODE_0609_WIDGET = widgets.ToggleButtons(
    options=[('Auto 06-09', 'auto'), ('Force Debug 06-09', 'force')],
    value='force' if SHOW_0609_DEBUG else 'auto',
    description='06-09:',
)

def _apply_display_mode(*_):
    global SHOW_DEBUG, SHOW_0609_DEBUG
    SHOW_DEBUG = MODE_WIDGET.value == 'debug'
    SHOW_0609_DEBUG = MODE_0609_WIDGET.value == 'force'

_apply_display_mode()
MODE_WIDGET.observe(_apply_display_mode, names='value')
MODE_0609_WIDGET.observe(_apply_display_mode, names='value')
display(widgets.HBox([MODE_WIDGET, MODE_0609_WIDGET]))
display(Markdown('Use these toggles, then rerun sections below as needed.'))
if CH_HOST not in {'localhost', '127.0.0.1'}:
    print(f"WARNING: Using non-local ClickHouse host: {CH_HOST}:{CH_PORT}")



Repo root: /home/julian/Documents/development/spe-work/livepeer-naap-analytics
SQL dir: /home/julian/Documents/development/spe-work/livepeer-naap-analytics/tests/integration/sql
Window source: manifest /home/julian/Documents/development/spe-work/livepeer-naap-analytics/tests/integration/fixtures/manifest.json
Window UTC: 2026-02-25T16:08:39.871000+00:00 -> 2026-02-25T21:43:55.262000+00:00
Debug mode: SHOW_DEBUG=False, SHOW_0609_DEBUG=False


HBox(children=(ToggleButtons(description='Mode:', options=(('Compact', 'compact'), ('Debug', 'debug')), value=â€¦

Use these toggles, then rerun sections below as needed.

### Configure the Notebook Database Connection and Test Window

In [3]:
client = clickhouse_connect.get_client(
    host=CH_HOST,
    port=CH_PORT,
    username=CH_USER,
    password=CH_PASSWORD,
    database=CH_DATABASE,
    secure=CH_SECURE,
)

QUERY_SETTINGS = {
    'max_execution_time': int(os.getenv('CH_MAX_EXECUTION_TIME_SEC', '30')),
    'max_threads': int(os.getenv('CH_MAX_THREADS', '4')),
    'max_rows_to_read': int(os.getenv('CH_MAX_ROWS_TO_READ', '5000000')),
    'read_overflow_mode': 'break',
}

params = {
    'from_ts': FROM_TS.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
    'to_ts': TO_TS.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
    'limit_per_scenario': LIMIT_PER_SCENARIO,
}


def query_df(sql: str, parameters: dict | None = None) -> pd.DataFrame:
    result = client.query(sql, parameters=parameters or {}, settings=QUERY_SETTINGS)
    return pd.DataFrame(result.result_rows, columns=result.column_names)


def parse_blocks(path: str | Path, marker: str) -> list[tuple[str, str]]:
    blocks = []
    current_name = None
    current_lines = []
    for raw_line in Path(path).read_text(encoding='utf-8').splitlines():
        if raw_line.startswith(marker):
            if current_name and current_lines:
                sql = '\n'.join(current_lines).strip().rstrip(';')
                if sql:
                    blocks.append((current_name, sql))
            current_name = raw_line.split(':', 1)[1].strip()
            current_lines = []
            continue
        if current_name is not None:
            current_lines.append(raw_line)
    if current_name and current_lines:
        sql = '\n'.join(current_lines).strip().rstrip(';')
        if sql:
            blocks.append((current_name, sql))
    return blocks

def style_status_table(df: pd.DataFrame, status_col: str = 'status'):
    if status_col not in df.columns:
        return df
    palette = {
        'PASS': '#dcfce7',
        'FAIL': '#fee2e2',
        'WARN': '#fef3c7',
        'INFO': '#dbeafe',
    }
    def _color_status(col: pd.Series):
        return [
            f"background-color: {palette.get(str(v), '#f3f4f6')}; font-weight: 700; color: #111827"
            for v in col
        ]
    return df.style.apply(_color_status, subset=[status_col])


# Integration Test Run Breakdown

The marks the begining of the test run analysis.

## Harness Artifacts Review

This section reads `artifacts/test-runs/<run_id>` outputs when present and surfaces failures in the test run stages before interactive exploration.


In [4]:
import json

HARNESS_ROOT = REPO_ROOT / 'artifacts' / 'test-runs'
HARNESS_RUN_ID = os.getenv('HARNESS_RUN_ID', '').strip()

def _latest_harness_run(root: Path) -> Path | None:
    if not root.exists():
        return None
    runs = sorted([p for p in root.iterdir() if p.is_dir()])
    return runs[-1] if runs else None

def _load_json(path: Path) -> dict | None:
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding='utf-8'))

selected_run = (HARNESS_ROOT / HARNESS_RUN_ID) if HARNESS_RUN_ID else _latest_harness_run(HARNESS_ROOT)

if not selected_run or not selected_run.exists():
    print('No harness artifacts found. Run tests/python/scripts/run_scenario_test_harness.py first or set HARNESS_RUN_ID.')
else:
    print(f'Using harness run: {selected_run.name}')
    summary = _load_json(selected_run / 'summary.json')
    if summary:
        stage_df = pd.DataFrame(summary.get('results', []))
        if not stage_df.empty and {'stage','status','duration_sec'}.issubset(stage_df.columns):
            display(style_status_table(stage_df[['stage', 'status', 'duration_sec', 'error']]))

    assertion_files = ['assert_raw_typed.json', 'assert_pipeline.json', 'assert_api.json', 'assert_scenarios.json']
    failure_rows = []
    for file_name in assertion_files:
        payload = _load_json(selected_run / 'stages' / file_name)
        if not payload:
            continue
        for r in payload.get('results', []):
            if not r.get('passed', False):
                failure_rows.append({
                    'suite': file_name.replace('.json', ''),
                    'test_name': r.get('name', ''),
                    'failed_rows': r.get('failed_rows', 1),
                    'error': r.get('error', ''),
                    'diagnostics': r.get('diagnostics', {}),
                })

    if failure_rows:
        failures_df = pd.DataFrame(failure_rows)
        display(failures_df[['suite', 'test_name', 'failed_rows', 'error', 'diagnostics']])
        print(f'Harness assertion failures: {len(failures_df)}')
    else:
        print('Harness assertions: PASS (or assertion JSON files not present for this run).')


Using harness run: 20260226T191221Z


Unnamed: 0,stage,status,duration_sec,error
0,stack_up,PASS,114.195467,
1,schema_apply,PASS,1.357803,
2,pipeline_ready,PASS,50.441008,
3,replay_events,PASS,3.677835,
4,pipeline_wait,PASS,20.000626,
5,query_pack,PASS,4.955169,
6,assert_raw_typed,PASS,1.485951,
7,assert_pipeline,PASS,6.940014,
8,assert_api,PASS,2.775702,
9,assert_scenarios,PASS,0.924042,


Harness assertions: PASS (or assertion JSON files not present for this run).


### Ordered Pipeline Trace (Raw -> API)
- Default mode: compact verdicts and explainability checks.
- Debug mode: use the notebook toggle buttons at the top (`Mode`, `06-09`) to expand detailed tables.
- Optional startup defaults: `NB_SHOW_DEBUG=1` and `NB_SHOW_0609_DEBUG=1`.

#### Trace Pack: End-to-End Flow
- What this checks: Raw ingest, typed tables, silver/stateful facts, rollups, and API views in one ordered pass.
- Why it matters: Confirms data traverses each architecture layer for the selected window.
- How to read failures: Empty stages or timestamp gaps usually point to readiness, filtering window, or projection issues.


In [5]:
flow_file = SQL_DIR / 'trace_pipeline_flow.sql'
flow_blocks = parse_blocks(flow_file, '-- QUERY:')
print(f'Loaded {len(flow_blocks)} ordered queries from {flow_file}')

section_titles = {
    '01_raw_ingest': '01 Raw Ingest',
    '02_typed_tables': '02 Typed Tables',
    '03_silver_projection_counts': '03 Silver Projection Counts',
    '04_stateful_fact_counts': '04 Stateful Fact Counts',
    '05_reliability_and_swap_summary': '05 Reliability and Swap Summary',
    '06_rollup_population': '06 Rollup Population',
    '07_view_population': '07 View Population',
    '08_gpu_view_parity': '08 GPU View Parity',
    '09_sla_view_parity': '09 SLA View Parity',
    '10_network_demand_view_parity': '10 Network Demand View Parity',
}
section_guidance = {
    '01_raw_ingest': ('Confirm raw ingress is populated and DLQ/quarantine are not unexpectedly growing.', 'Rows present in streaming_events, and quality side tables stable.'),
    '02_typed_tables': ('Confirm parser output exists for expected event families.', 'Typed tables have rows where corresponding raw events exist.'),
    '03_silver_projection_counts': ('Confirm typed-to-silver projections are populated.', 'Silver rows align with typed presence for enabled event families.'),
    '04_stateful_fact_counts': ('Confirm lifecycle fact row shapes and latest-session alignment.', 'Latest sessions and segment/session relationships are not contradictory.'),
    '05_reliability_and_swap_summary': ('Summarize startup and swap outcomes at latest-session grain.', 'No contradictory startup/swap semantics; version drift is explained.'),
    '06_rollup_population': ('Confirm rollup tables have rows for the window.', 'Rollup rows exist and parity checks below explain non-1:1 counts.'),
    '07_view_population': ('Confirm API view rows exist at their serving grains.', 'Counts are explainable by grain and parity diagnostics.'),
    '08_gpu_view_parity': ('Validate GPU view keys/values against source rollups.', 'Failure mode PASS or explainable overlap diagnostics.'),
    '09_sla_view_parity': ('Validate SLA view counts against latest-session recompute.', 'Failure mode PASS with zero aggregate diffs.'),
    '10_network_demand_view_parity': ('Validate demand view against recomputed rollup/session aggregates.', 'Failure mode PASS with zero key and value diffs.'),
}

flow_results: dict[str, pd.DataFrame] = {}
for name, sql in flow_blocks:
    df = query_df(sql, params)
    flow_results[name] = df
    print(f"\n=== {section_titles.get(name, name)} ===")
    if name in section_guidance:
        checks, pass_criteria = section_guidance[name]
        display(Markdown(f"**What This Checks**: {checks}  \n**Pass Criteria**: {pass_criteria}"))

    if name == '04_stateful_fact_counts':
        display(Markdown('**Interpretation**: this section now shows comparable grains plus status flags so unexpected session/segment mismatches stand out.'))
        display(df.head(20) if SHOW_DEBUG else df.head(10))

        stateful_diag_sql = '''
WITH
  sessions_raw AS
  (
    SELECT count() AS value
    FROM livepeer_analytics.fact_workflow_sessions
    WHERE session_start_ts >= {from_ts:DateTime64(3)}
      AND session_start_ts < {to_ts:DateTime64(3)}
  ),
  sessions_latest AS
  (
    SELECT count() AS value
    FROM
    (
      SELECT workflow_session_id,
             row_number() OVER (PARTITION BY workflow_session_id ORDER BY version DESC, session_start_ts DESC, session_end_ts DESC) AS rn
      FROM livepeer_analytics.fact_workflow_sessions FINAL
      WHERE session_start_ts >= {from_ts:DateTime64(3)}
        AND session_start_ts < {to_ts:DateTime64(3)}
    )
    WHERE rn = 1
  ),
  segment_session_ids AS
  (
    SELECT uniqExact(workflow_session_id) AS value
    FROM livepeer_analytics.fact_workflow_session_segments FINAL
    WHERE segment_start_ts >= {from_ts:DateTime64(3)}
      AND segment_start_ts < {to_ts:DateTime64(3)}
  ),
  segment_rows AS
  (
    SELECT count() AS value
    FROM livepeer_analytics.fact_workflow_session_segments
    WHERE segment_start_ts >= {from_ts:DateTime64(3)}
      AND segment_start_ts < {to_ts:DateTime64(3)}
  )
SELECT
  check_name,
  lhs,
  rhs,
  delta,
  status,
  expectation
FROM
(
  SELECT
    'latest_sessions_vs_segment_session_ids' AS check_name,
    (SELECT value FROM sessions_latest) AS lhs,
    (SELECT value FROM segment_session_ids) AS rhs,
    (SELECT value FROM sessions_latest) - (SELECT value FROM segment_session_ids) AS delta,
    multiIf((SELECT value FROM segment_session_ids) > (SELECT value FROM sessions_latest), 'FAIL',
            (SELECT value FROM sessions_latest) - (SELECT value FROM segment_session_ids) <= 1, 'PASS',
            'WARN') AS status,
    'segment session ids should be <= latest sessions (gap <= 1 usually expected)' AS expectation

  UNION ALL

  SELECT
    'raw_session_rows_vs_latest_sessions' AS check_name,
    (SELECT value FROM sessions_raw) AS lhs,
    (SELECT value FROM sessions_latest) AS rhs,
    (SELECT value FROM sessions_raw) - (SELECT value FROM sessions_latest) AS delta,
    multiIf((SELECT value FROM sessions_raw) < (SELECT value FROM sessions_latest), 'FAIL',
            (SELECT value FROM sessions_raw) = (SELECT value FROM sessions_latest), 'PASS',
            'INFO') AS status,
    'raw rows >= latest sessions; positive delta indicates multiple versions in backend' AS expectation

  UNION ALL

  SELECT
    'segment_rows_vs_segment_session_ids' AS check_name,
    (SELECT value FROM segment_rows) AS lhs,
    (SELECT value FROM segment_session_ids) AS rhs,
    (SELECT value FROM segment_rows) - (SELECT value FROM segment_session_ids) AS delta,
    multiIf((SELECT value FROM segment_rows) < (SELECT value FROM segment_session_ids), 'FAIL', 'INFO') AS status,
    'segment rows are event-level and can exceed distinct session ids' AS expectation
)
ORDER BY check_name
'''
        stateful_diag_df = query_df(stateful_diag_sql, params)
        display(style_status_table(stateful_diag_df))
        continue

    if name == '05_reliability_and_swap_summary':
        display(Markdown('**Interpretation**: KPI summary below uses latest-per-session rows to avoid versioning drift in totals.'))

        latest_summary_sql = '''
WITH fs_latest AS
(
  SELECT *
  FROM
  (
    SELECT *,
           row_number() OVER (PARTITION BY workflow_session_id ORDER BY version DESC, session_start_ts DESC, session_end_ts DESC) AS rn
    FROM livepeer_analytics.fact_workflow_sessions FINAL
    WHERE session_start_ts >= {from_ts:DateTime64(3)}
      AND session_start_ts < {to_ts:DateTime64(3)}
  )
  WHERE rn = 1
)
SELECT
  count() AS sessions,
  countIf(known_stream = 1) AS known_stream_sessions,
  countIf(startup_success = 1) AS startup_success_sessions,
  countIf(startup_excused = 1) AS startup_excused_sessions,
  countIf(startup_unexcused = 1) AS startup_unexcused_sessions,
  countIf(confirmed_swap_count > 0) AS confirmed_swapped_sessions,
  countIf(inferred_orchestrator_change_count > 0) AS inferred_orchestrator_change_sessions,
  countIf(swap_count > 0) AS swapped_sessions,
  if(countIf(known_stream = 1) = 0, 0.0, countIf(startup_unexcused = 1) / toFloat64(countIf(known_stream = 1))) AS unexcused_rate
FROM fs_latest
'''
        latest_summary_df = query_df(latest_summary_sql, params)
        display(latest_summary_df)

        version_diag_sql = '''
SELECT
  count() AS raw_session_rows,
  uniqExact(workflow_session_id) AS distinct_workflow_sessions,
  countIf(version_rows > 1) AS sessions_with_multiple_versions,
  countIf(known_stream_values > 1) AS sessions_with_known_stream_transitions
FROM
(
  SELECT
    workflow_session_id,
    count() AS version_rows,
    uniqExact(known_stream) AS known_stream_values
  FROM livepeer_analytics.fact_workflow_sessions
  WHERE session_start_ts >= {from_ts:DateTime64(3)}
    AND session_start_ts < {to_ts:DateTime64(3)}
  GROUP BY workflow_session_id
)
'''
        version_diag_df = query_df(version_diag_sql, params)
        display(version_diag_df)

        version_status_df = pd.DataFrame([
            {
                'check_name': 'mixed_known_stream_versions',
                'value': int(version_diag_df.iloc[0]['sessions_with_known_stream_transitions']) if not version_diag_df.empty else 0,
                'status': 'WARN' if (not version_diag_df.empty and int(version_diag_df.iloc[0]['sessions_with_known_stream_transitions']) > 0) else 'PASS',
                'expectation': 'ideally 0; >0 means the same session changed known_stream across versions'
            }
        ])
        display(style_status_table(version_status_df))

        if not version_diag_df.empty and int(version_diag_df.iloc[0]['sessions_with_known_stream_transitions']) > 0:
            display(Markdown('**Reader note**: `mixed_known_stream_versions > 0` usually means a session was initially emitted with unresolved identity (`known_stream=0`) and later corrected (`known_stream=1`) in a newer version. Review the per-version rows below to confirm transitions are one-way (`0 -> 1`) and not regressions (`1 -> 0`).'))
            session_version_changes_sql = '''
WITH mixed_ids AS
(
  SELECT workflow_session_id
  FROM livepeer_analytics.fact_workflow_sessions
  WHERE session_start_ts >= {from_ts:DateTime64(3)}
    AND session_start_ts < {to_ts:DateTime64(3)}
  GROUP BY workflow_session_id
  HAVING uniqExact(known_stream) > 1
)
SELECT
  fs.workflow_session_id,
  fs.version,
  fs.session_start_ts,
  fs.session_end_ts,
  fs.known_stream,
  fs.stream_id,
  fs.request_id,
  fs.startup_success,
  fs.startup_excused,
  fs.startup_unexcused,
  fs.swap_count,
  fs.confirmed_swap_count,
  fs.inferred_orchestrator_change_count
FROM livepeer_analytics.fact_workflow_sessions fs
INNER JOIN mixed_ids m
  ON m.workflow_session_id = fs.workflow_session_id
WHERE fs.session_start_ts >= {from_ts:DateTime64(3)}
  AND fs.session_start_ts < {to_ts:DateTime64(3)}
ORDER BY fs.workflow_session_id, fs.version ASC, fs.session_start_ts ASC
'''
            session_changes_df = query_df(session_version_changes_sql, params)
            if SHOW_DEBUG:
                display(session_changes_df)
            else:
                display(Markdown('_Detailed per-version rows hidden in compact mode. Switch to Debug mode to expand._'))

            tracked_cols = [
                'known_stream',
                'stream_id',
                'request_id',
                'startup_success',
                'startup_excused',
                'startup_unexcused',
                'swap_count',
                'confirmed_swap_count',
                'inferred_orchestrator_change_count',
                'session_end_ts',
            ]
            diff_rows = []
            for workflow_session_id, g in session_changes_df.groupby('workflow_session_id', sort=False):
                g = g.sort_values(['version', 'session_start_ts']).reset_index(drop=True)
                for i in range(1, len(g)):
                    prev = g.iloc[i - 1]
                    curr = g.iloc[i]
                    changed = []
                    for col in tracked_cols:
                        prev_val = prev.get(col)
                        curr_val = curr.get(col)
                        if pd.isna(prev_val) and pd.isna(curr_val):
                            continue
                        if str(prev_val) != str(curr_val):
                            changed.append(f"{col}: {prev_val} -> {curr_val}")
                    if changed:
                        diff_rows.append({
                            'workflow_session_id': workflow_session_id,
                            'from_version': int(prev['version']),
                            'to_version': int(curr['version']),
                            'changed_fields': '; '.join(changed),
                            'status': 'WARN' if ('known_stream: 1 -> 0' in '; '.join(changed)) else 'INFO',
                        })

            if diff_rows:
                display(Markdown('**Per-version field diffs (for mixed known_stream sessions)**'))
                display(style_status_table(pd.DataFrame(diff_rows)))
        continue

    if name == '06_rollup_population':
        display(Markdown('**06-09 Grain Map + Reconciliation Panel**: these sections are not 1:1 with raw/stateful counts; validate them by grain-aligned parity checks sourced from assertion SQL.'))

        grain_map_df = pd.DataFrame([
            {
                'section': '06_rollup_population',
                'object': 'agg_reliability_1h / agg_stream_performance_1m',
                'grain': 'time-bucket + serving dimensions (not per-session rows)',
                'validation_source': 'assertions_pipeline.sql::gpu_view_matches_rollup'
            },
            {
                'section': '07_view_population',
                'object': 'v_api_* views',
                'grain': 'API serving grain (view-specific)',
                'validation_source': 'assertions_pipeline.sql::network_demand_view_matches_rollup + assertions_pipeline.sql::sla_view_matches_session_fact'
            },
            {
                'section': '08_gpu_view_parity',
                'object': 'v_api_gpu_metrics parity',
                'grain': 'join-key intersection at GPU metrics grain',
                'validation_source': 'assertions_pipeline.sql::gpu_view_matches_rollup'
            },
            {
                'section': '09_network_demand_view_parity',
                'object': 'v_api_network_demand parity',
                'grain': 'join-key intersection at network-demand grain',
                'validation_source': 'assertions_pipeline.sql::network_demand_view_matches_rollup'
            },
            {
                'section': '09_sla_view_parity',
                'object': 'v_api_sla_compliance parity',
                'grain': 'join-key intersection at SLA grain',
                'validation_source': 'assertions_pipeline.sql::sla_view_matches_session_fact'
            },
        ])
        display(
            grain_map_df.style.set_properties(**{
                'white-space': 'normal',
                'text-align': 'left',
            }).set_table_styles([
                {'selector': 'th', 'props': [('text-align', 'left'), ('white-space', 'normal')]},
                {'selector': 'td', 'props': [('max-width', '520px')]},
            ])
        )

        pipeline_test_blocks = dict(parse_blocks(SQL_DIR / 'assertions_pipeline.sql', '-- TEST:'))

        recon_test_names = [

            'agg_stream_performance_1m_matches_status_samples',

            'gpu_view_matches_rollup',

            'network_demand_view_matches_rollup',

            'sla_view_matches_session_fact',

            'gpu_count_delta_explained_by_key_overlap',

            'network_demand_counts_aligned_to_rollup',

            'gpu_view_covers_healthy_attributed_session_keys',

            'demand_has_rows_for_all_session_hours',

            'sla_counts_aligned_to_raw_latest_sessions',

            'view_count_grain_ordering',

        ]

        recon_rows = []

        for test_name in recon_test_names:

            sql = pipeline_test_blocks.get(test_name)

            if not sql:

                recon_rows.append({

                    'test_name': test_name,

                    'status': 'WARN',

                    'failed_rows': 1,

                    'note': 'Missing from assertions_pipeline.sql',

                })

                continue

            try:

                result_df = query_df(sql, params)

            except Exception as exc:

                recon_rows.append({

                    'test_name': test_name,

                    'status': 'FAIL',

                    'failed_rows': 1,

                    'error': str(exc),

                })

                continue

            if result_df.empty:

                recon_rows.append({

                    'test_name': test_name,

                    'status': 'FAIL',

                    'failed_rows': 1,

                    'error': 'No rows returned',

                })

                continue

            row = result_df.iloc[0].to_dict()

            failed = int(row.get('failed_rows', 1))

            out = {

                'test_name': test_name,

                'status': 'PASS' if failed == 0 else 'FAIL',

                'failed_rows': failed,

            }

            for k in [

                'failure_mode',

                'status',

                'raw_rows',

                'rollup_rows',

                'view_rows',

                'joined_rows',

                'raw_only_keys',

                'rollup_only_keys',

                'view_only_keys',

                'expected_rows',

                'expected_only_keys',

                'total_diff_sessions',

                'total_diff_streams',

                'total_diff_samples',

                'max_abs_diff_avg_output_fps',

                'mean_abs_diff_fps',

                'max_abs_diff_fps',

                'mean_abs_diff_minutes',

                'max_abs_diff_minutes',

                'total_diff_known_sessions',

                'total_diff_served_sessions',

                'total_diff_unserved_sessions',

                'total_diff_unexcused_sessions',

                'total_diff_swapped_sessions',

                'total_known_diff',

                'total_unexcused_diff',

                'total_swapped_diff',

                'row_delta',

                'overlap_delta',

                'eligible_session_keys',

                'gpu_view_keys',

                'missing_gpu_keys',

                'missing_key_examples',

                'session_hours',

                'demand_hours',

                'missing_demand_hours',

                'missing_hours',

                'demand_rows',

                'gpu_rows',

                'sla_rows',

                'note',

                'error',

            ]:

                if k in row and k not in out:

                    out[k] = row.get(k)

            recon_rows.append(out)



        recon_df = pd.DataFrame(recon_rows)



        # Compact verdict-first block for 06-10 readability.

        verdict_cols = [

            'test_name', 'status', 'failure_mode', 'joined_rows', 'rollup_rows', 'view_rows',

            'max_abs_diff_fps', 'max_abs_diff_minutes', 'total_diff_sessions', 'total_known_diff', 'total_unexcused_diff', 'total_swapped_diff'

        ]

        verdict_df = recon_df.reindex(columns=[c for c in verdict_cols if c in recon_df.columns]).copy()

        verdict_df['status_flag'] = verdict_df['status'].map(lambda s: 'PASS' if s == 'PASS' else 'FAIL')

        display(Markdown('**06-10 Verdict (Assertion-Sourced Quick Read)**'))

        display(style_status_table(verdict_df))



        explainability_names = [

            'gpu_count_delta_explained_by_key_overlap',

            'network_demand_counts_aligned_to_rollup',

            'gpu_view_covers_healthy_attributed_session_keys',

            'demand_has_rows_for_all_session_hours',

            'sla_counts_aligned_to_raw_latest_sessions',

            'view_count_grain_ordering',

        ]

        explainability_df = recon_df[recon_df['test_name'].isin(explainability_names)]

        display(Markdown('**06-10 Explainability (Assertion-Sourced)**'))

        display(style_status_table(explainability_df))

        any_fail = bool((recon_df.get('status') != 'PASS').any()) if 'status' in recon_df.columns else True
        show_debug = bool(globals().get('SHOW_0609_DEBUG', False) or globals().get('SHOW_DEBUG', False))
        if any_fail or show_debug:
            display(Markdown('**06-09 Debug Details**'))
            display(style_status_table(recon_df))
        else:
            display(Markdown('_Debug details hidden on PASS. Use top toggles (`Mode=Debug` or `06-09=Force Debug 06-09`) to expand._'))

        display(df.head(20) if SHOW_DEBUG else df.head(10))
        continue

    display(df.head(20) if SHOW_DEBUG else df.head(10))


Loaded 10 ordered queries from /home/julian/Documents/development/spe-work/livepeer-naap-analytics/tests/integration/sql/trace_pipeline_flow.sql

=== 01 Raw Ingest ===


**What This Checks**: Confirm raw ingress is populated and DLQ/quarantine are not unexpectedly growing.  
**Pass Criteria**: Rows present in streaming_events, and quality side tables stable.

Unnamed: 0,object_name,rows_window,min_ts,max_ts
0,streaming_events_dlq,0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000
1,streaming_events_quarantine,0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000
2,streaming_events,134,2026-02-25 16:08:39.871,2026-02-25 20:43:55.262



=== 02 Typed Tables ===


**What This Checks**: Confirm parser output exists for expected event families.  
**Pass Criteria**: Typed tables have rows where corresponding raw events exist.

Unnamed: 0,object_name,rows_window,min_ts,max_ts
0,ai_stream_events,26,2026-02-25 18:13:43.807,2026-02-25 20:43:55.262
1,ai_stream_status,24,2026-02-25 18:13:01.117,2026-02-25 20:43:52.031
2,network_capabilities,668,2026-02-25 16:08:39.871,2026-02-25 20:42:49.295
3,stream_ingest_metrics,0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000
4,stream_trace_events,66,2026-02-25 18:12:50.697,2026-02-25 20:43:55.262



=== 03 Silver Projection Counts ===


**What This Checks**: Confirm typed-to-silver projections are populated.  
**Pass Criteria**: Silver rows align with typed presence for enabled event families.

Unnamed: 0,object_name,rows_window,min_ts,max_ts
0,fact_stream_ingest_samples,0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000
1,fact_stream_status_samples,24,2026-02-25 18:13:01.117,2026-02-25 20:43:52.031
2,fact_stream_trace_edges,66,2026-02-25 18:12:50.697,2026-02-25 20:43:55.262



=== 04 Stateful Fact Counts ===


**What This Checks**: Confirm lifecycle fact row shapes and latest-session alignment.  
**Pass Criteria**: Latest sessions and segment/session relationships are not contradictory.

**Interpretation**: this section now shows comparable grains plus status flags so unexpected session/segment mismatches stand out.

Unnamed: 0,object_name,rows_window,min_ts,max_ts
0,fact_workflow_param_updates,0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000
1,fact_workflow_session_segments,16,2026-02-25 18:12:50.697,2026-02-25 20:43:21.927
2,fact_workflow_sessions,22,2026-02-25 18:12:50.697,2026-02-25 20:43:11.880


Unnamed: 0,check_name,lhs,rhs,delta,status,expectation
0,latest_sessions_vs_segment_session_ids,12,12,0,PASS,segment session ids should be <= latest sessions (gap <= 1 usually expected)
1,raw_session_rows_vs_latest_sessions,22,12,10,INFO,raw rows >= latest sessions; positive delta indicates multiple versions in backend
2,segment_rows_vs_segment_session_ids,16,12,4,INFO,segment rows are event-level and can exceed distinct session ids



=== 05 Reliability and Swap Summary ===


**What This Checks**: Summarize startup and swap outcomes at latest-session grain.  
**Pass Criteria**: No contradictory startup/swap semantics; version drift is explained.

**Interpretation**: KPI summary below uses latest-per-session rows to avoid versioning drift in totals.

Unnamed: 0,sessions,known_stream_sessions,startup_success_sessions,startup_excused_sessions,startup_unexcused_sessions,confirmed_swapped_sessions,inferred_orchestrator_change_sessions,swapped_sessions,unexcused_rate
0,12,12,7,5,0,2,0,2,0.0


Unnamed: 0,raw_session_rows,distinct_workflow_sessions,sessions_with_multiple_versions,sessions_with_known_stream_transitions
0,12,12,9,8


Unnamed: 0,check_name,value,status,expectation
0,mixed_known_stream_versions,8,WARN,ideally 0; >0 means the same session changed known_stream across versions


**Reader note**: `mixed_known_stream_versions > 0` usually means a session was initially emitted with unresolved identity (`known_stream=0`) and later corrected (`known_stream=1`) in a newer version. Review the per-version rows below to confirm transitions are one-way (`0 -> 1`) and not regressions (`1 -> 0`).

_Detailed per-version rows hidden in compact mode. Switch to Debug mode to expand._

**Per-version field diffs (for mixed known_stream sessions)**

Unnamed: 0,workflow_session_id,from_version,to_version,changed_fields,status
0,aiJobTesterStream-1772045489601127007|2e3bdf2a,1,15,known_stream: 0 -> 1; startup_success: 0 -> 1; swap_count: 0 -> 1; confirmed_swap_count: 0 -> 1; session_end_ts: 2026-02-25 18:51:45.044000 -> 2026-02-25 18:52:19.216000,INFO
1,aiJobTesterStream-1772047761182996484|bd1cd871,2,16,known_stream: 0 -> 1; startup_success: 0 -> 1; swap_count: 0 -> 1; confirmed_swap_count: 0 -> 1; session_end_ts: 2026-02-25 19:29:45.965000 -> 2026-02-25 19:30:38.660000,INFO
2,aiJobTesterStream-1772051277781396655|2a377429,4,12,known_stream: 0 -> 1; startup_success: 0 -> 1; session_end_ts: 2026-02-25 20:28:43.843000 -> 2026-02-25 20:28:46.719000,INFO
3,aiJobTesterStream-1772052050242033096|7d208f6e,4,12,known_stream: 0 -> 1; startup_success: 0 -> 1; session_end_ts: 2026-02-25 20:41:35.045000 -> 2026-02-25 20:41:40.442000,INFO
4,aiJobTesterStream-1772052113991767774|0815334c,2,5,known_stream: 0 -> 1; startup_excused: 0 -> 1,INFO
5,aiJobTesterStream-1772052121879441760|ea6b821a,2,5,known_stream: 0 -> 1; startup_excused: 0 -> 1,INFO
6,aiJobTesterStream-1772052129765367540|471cba71,1,2,session_end_ts: 2026-02-25 20:42:20.116000 -> 2026-02-25 20:42:25.448000,INFO
7,aiJobTesterStream-1772052129765367540|471cba71,2,13,known_stream: 0 -> 1; startup_success: 0 -> 1; session_end_ts: 2026-02-25 20:42:25.448000 -> 2026-02-25 20:42:59.885000,INFO
8,aiJobTesterStream-1772052186479599343|ebeb78b7,2,12,known_stream: 0 -> 1; startup_success: 0 -> 1; session_end_ts: 2026-02-25 20:43:32.033000 -> 2026-02-25 20:43:55.262000,INFO



=== 06 Rollup Population ===


**What This Checks**: Confirm rollup tables have rows for the window.  
**Pass Criteria**: Rollup rows exist and parity checks below explain non-1:1 counts.

**06-09 Grain Map + Reconciliation Panel**: these sections are not 1:1 with raw/stateful counts; validate them by grain-aligned parity checks sourced from assertion SQL.

Unnamed: 0,section,object,grain,validation_source
0,06_rollup_population,agg_reliability_1h / agg_stream_performance_1m,time-bucket + serving dimensions (not per-session rows),assertions_pipeline.sql::gpu_view_matches_rollup
1,07_view_population,v_api_* views,API serving grain (view-specific),assertions_pipeline.sql::network_demand_view_matches_rollup + assertions_pipeline.sql::sla_view_matches_session_fact
2,08_gpu_view_parity,v_api_gpu_metrics parity,join-key intersection at GPU metrics grain,assertions_pipeline.sql::gpu_view_matches_rollup
3,09_network_demand_view_parity,v_api_network_demand parity,join-key intersection at network-demand grain,assertions_pipeline.sql::network_demand_view_matches_rollup
4,09_sla_view_parity,v_api_sla_compliance parity,join-key intersection at SLA grain,assertions_pipeline.sql::sla_view_matches_session_fact


**06-10 Verdict (Assertion-Sourced Quick Read)**

Unnamed: 0,test_name,status,failure_mode,joined_rows,rollup_rows,view_rows,max_abs_diff_fps,max_abs_diff_minutes,total_diff_sessions,total_known_diff,total_unexcused_diff,total_swapped_diff,status_flag
0,agg_stream_performance_1m_matches_status_samples,PASS,PASS,7.0,7.0,,,,0.0,,,,PASS
1,gpu_view_matches_rollup,PASS,PASS,7.0,7.0,9.0,0.0,,,,,,PASS
2,network_demand_view_matches_rollup,PASS,PASS,5.0,5.0,5.0,0.0,0.0,0.0,,,,PASS
3,sla_view_matches_session_fact,PASS,PASS,9.0,,9.0,,,,0.0,0.0,0.0,PASS
4,gpu_count_delta_explained_by_key_overlap,PASS,,,7.0,9.0,,,,,,,PASS
5,network_demand_counts_aligned_to_rollup,PASS,,,5.0,5.0,,,,,,,PASS
6,gpu_view_covers_healthy_attributed_session_keys,PASS,,,,,,,,,,,PASS
7,demand_has_rows_for_all_session_hours,PASS,,,,,,,,,,,PASS
8,sla_counts_aligned_to_raw_latest_sessions,PASS,,,,9.0,,,,,,,PASS
9,view_count_grain_ordering,PASS,,,,,,,,,,,PASS


**06-10 Explainability (Assertion-Sourced)**

Unnamed: 0,test_name,status,failed_rows,failure_mode,rollup_rows,joined_rows,rollup_only_keys,expected_rows,expected_only_keys,total_diff_sessions,total_diff_streams,total_diff_samples,max_abs_diff_avg_output_fps,view_rows,view_only_keys,mean_abs_diff_fps,max_abs_diff_fps,mean_abs_diff_minutes,max_abs_diff_minutes,total_diff_known_sessions,total_diff_served_sessions,total_diff_unserved_sessions,total_diff_unexcused_sessions,total_diff_swapped_sessions,raw_rows,raw_only_keys,total_known_diff,total_unexcused_diff,total_swapped_diff,row_delta,overlap_delta,eligible_session_keys,gpu_view_keys,missing_gpu_keys,missing_key_examples,session_hours,demand_hours,missing_demand_hours,missing_hours,demand_rows,gpu_rows,sla_rows
4,gpu_count_delta_explained_by_key_overlap,PASS,0,,7.0,,0.0,,,,,,,9.0,2.0,,,,,,,,,,,,,,,2.0,2.0,,,,,,,,,,,
5,network_demand_counts_aligned_to_rollup,PASS,0,,5.0,,0.0,,,,,,,5.0,0.0,,,,,,,,,,,,,,,,,,,,,,,,,,,
6,gpu_view_covers_healthy_attributed_session_keys,PASS,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,7.0,9.0,0.0,[],,,,,,,
7,demand_has_rows_for_all_session_hours,PASS,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,3.0,3.0,0.0,[],,,
8,sla_counts_aligned_to_raw_latest_sessions,PASS,0,,,,,,,,,,,9.0,0.0,,,,,,,,,,9.0,0.0,,,,,,,,,,,,,,,,
9,view_count_grain_ordering,PASS,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,5.0,9.0,9.0


_Debug details hidden on PASS. Use top toggles (`Mode=Debug` or `06-09=Force Debug 06-09`) to expand._

Unnamed: 0,object_name,rows_window
0,agg_reliability_1h,12
1,agg_stream_performance_1m,7



=== 07 View Population ===


**What This Checks**: Confirm API view rows exist at their serving grains.  
**Pass Criteria**: Counts are explainable by grain and parity diagnostics.

Unnamed: 0,object_name,rows_window
0,v_api_gpu_metrics,9
1,v_api_network_demand,5
2,v_api_sla_compliance,9



=== 08 GPU View Parity ===


**What This Checks**: Validate GPU view keys/values against source rollups.  
**Pass Criteria**: Failure mode PASS or explainable overlap diagnostics.

Unnamed: 0,failure_mode,rollup_rows,view_rows,joined_rows,rollup_only_keys,view_only_keys,rollup_empty_orch_rows,rollup_empty_gpu_rows,rollup_empty_region_rows,view_empty_orch_rows,view_empty_gpu_rows,view_empty_region_rows,mean_abs_diff_fps,max_abs_diff_fps
0,PASS,7,9,7,0,2,0,0,7,0,0,9,0.0,0.0



=== 09 SLA View Parity ===


**What This Checks**: Validate SLA view counts against latest-session recompute.  
**Pass Criteria**: Failure mode PASS with zero aggregate diffs.

Unnamed: 0,failure_mode,raw_rows,view_rows,joined_rows,raw_only_keys,view_only_keys,total_known_diff,total_unexcused_diff,total_swapped_diff
0,PASS,9,9,9,0,0,0,0,0



=== 10 Network Demand View Parity ===


**What This Checks**: Validate demand view against recomputed rollup/session aggregates.  
**Pass Criteria**: Failure mode PASS with zero key and value diffs.

Unnamed: 0,failure_mode,rollup_rows,view_rows,joined_rows,rollup_only_keys,view_only_keys,mean_abs_diff_fps,max_abs_diff_fps,mean_abs_diff_minutes,max_abs_diff_minutes,total_diff_sessions,total_diff_streams,total_diff_known_sessions,total_diff_served_sessions,total_diff_unserved_sessions,total_diff_unexcused_sessions,total_diff_swapped_sessions
0,PASS,5,5,5,0,0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0


## Integration Assertions (CI-aligned)

In [6]:
def run_assertion_file(path: str | Path) -> pd.DataFrame:
    tests = parse_blocks(path, '-- TEST:')
    rows = []
    for name, sql in tests:
        try:
            df = query_df(sql, params)
        except Exception as exc:
            rows.append({'test_name': name, 'failed_rows': 1, 'status': 'FAIL', 'error': str(exc)})
            continue

        if df.empty:
            rows.append({'test_name': name, 'failed_rows': 1, 'status': 'FAIL', 'error': 'No rows'})
            continue

        row = df.iloc[0].to_dict()
        failed = int(row.get('failed_rows', 1))
        row['test_name'] = name
        row['status'] = 'PASS' if failed == 0 else 'FAIL'
        rows.append(row)

    out = pd.DataFrame(rows)
    cols = ['test_name', 'status', 'failed_rows'] + [c for c in out.columns if c not in {'test_name', 'status', 'failed_rows'}]
    return out[cols]


### Raw -> Typed Assertions
- What this checks: Accounting parity from raw events into typed tables, plus network capabilities fanout guard.
- Why it matters: Confirms parser output volume is consistent with accepted raw input (after DLQ/quarantine).
- How to read failures: Look at `raw_rows`, `accepted_rows_est`, `typed_rows`, and `typed_distinct_source_events` diagnostics first.


### Pipeline Contract Assertions
- What this checks: Presence, projection integrity, session semantics, and rollup/view parity contracts.
- Why it matters: Validates correctness-critical transformations owned by Flink and serving parity in ClickHouse.
- How to read failures: Treat as regressions unless explicitly informational; inspect failing test diagnostics first.


In [7]:
raw_typed_assertions = run_assertion_file(SQL_DIR / 'assertions_raw_typed.sql')
raw_typed_overview = raw_typed_assertions[['test_name', 'status', 'failed_rows']].copy() if not raw_typed_assertions.empty else raw_typed_assertions
display(style_status_table(raw_typed_overview))
raw_typed_failures = raw_typed_assertions[raw_typed_assertions['status'] == 'FAIL']
print(f'Raw->typed assertion failures: {len(raw_typed_failures)}')
if SHOW_DEBUG:
    display(Markdown('**Raw -> Typed Debug Details**'))
    display(style_status_table(raw_typed_assertions))

pipeline_assertions = run_assertion_file(SQL_DIR / 'assertions_pipeline.sql')
pipeline_overview = pipeline_assertions[['test_name', 'status', 'failed_rows']].copy() if not pipeline_assertions.empty else pipeline_assertions
display(style_status_table(pipeline_overview))

pipeline_failures = pipeline_assertions[pipeline_assertions['status'] == 'FAIL']
print(f'Pipeline assertion failures: {len(pipeline_failures)}')
if SHOW_DEBUG:
    display(Markdown('**Pipeline Debug Details**'))
    display(style_status_table(pipeline_assertions))


Unnamed: 0,test_name,status,failed_rows
0,raw_typed_accepted_estimate_nonnegative,PASS,0.0
1,raw_typed_no_dlq_or_quarantine_for_core_types,PASS,0.0
2,raw_typed_core_1to1_parity,PASS,0.0
3,raw_typed_network_capabilities_expected_in_window,PASS,0.0
4,raw_typed_network_capabilities_fanout_guard,PASS,0.0


Raw->typed assertion failures: 0


Unnamed: 0,test_name,status,failed_rows
0,raw_events_present,PASS,0
1,capability_dimension_mvs_present,PASS,0
2,capability_dimensions_projecting,PASS,0
3,session_fact_present,PASS,0
4,core_raw_to_silver_gold_nonempty,PASS,0
5,network_capabilities_raw_and_typed_present,PASS,0
6,status_raw_to_silver_projection,PASS,0
7,trace_raw_to_silver_projection,PASS,0
8,ingest_raw_to_silver_projection,PASS,0
9,session_final_uniqueness,PASS,0


Pipeline assertion failures: 0


## Executive Summary + API Readiness Assertions
- This section generates the notebook-level Executive Summary table after assertions run.
- What this checks: Serving view presence, key fields, ratio math, hourly grain, and recompute parity.
- Why it matters: Ensures downstream API consumers see coherent and contract-safe metrics.
- How to read failures: Focus on view-level formula drift and dimension nullability first.


In [8]:
api_readiness_assertions = run_assertion_file(SQL_DIR / 'assertions_api_readiness.sql')
api_readiness_overview = api_readiness_assertions[['test_name', 'status', 'failed_rows']].copy() if not api_readiness_assertions.empty else api_readiness_assertions
display(style_status_table(api_readiness_overview))

api_readiness_failures = api_readiness_assertions[api_readiness_assertions['status'] == 'FAIL']
print(f'API readiness assertion failures: {len(api_readiness_failures)}')
if SHOW_DEBUG:
    display(Markdown('**API Readiness Debug Details**'))
    display(style_status_table(api_readiness_assertions))

def _count_failures(df: pd.DataFrame) -> int:
    if df is None or df.empty or 'status' not in df.columns:
        return 0
    return int((df['status'] == 'FAIL').sum())

summary_rows = [
    {
        'layer': 'Raw -> Typed Assertions',
        'tests': int(len(raw_typed_assertions)),
        'failures': _count_failures(raw_typed_assertions),
    },
    {
        'layer': 'Pipeline Contract Assertions',
        'tests': int(len(pipeline_assertions)),
        'failures': _count_failures(pipeline_assertions),
    },
    {
        'layer': 'API Readiness Assertions',
        'tests': int(len(api_readiness_assertions)),
        'failures': _count_failures(api_readiness_assertions),
    },
]
summary_df = pd.DataFrame(summary_rows)
summary_df['status'] = summary_df['failures'].map(lambda x: 'PASS' if int(x) == 0 else 'FAIL')
summary_df['window_utc'] = f"{FROM_TS.isoformat()} -> {TO_TS.isoformat()}"
summary_df['db_target'] = f"{CH_HOST}:{CH_PORT}/{CH_DATABASE}"
display(Markdown('## Executive Summary'))
display(Markdown('**Status Legend**: PASS = contract met; WARN = expected soft anomaly; FAIL = contract violation; INFO = no data to evaluate.'))
display(style_status_table(summary_df[['layer', 'tests', 'failures', 'status', 'window_utc', 'db_target']]))


Unnamed: 0,test_name,status,failed_rows
0,api_views_present,PASS,0
1,gpu_metrics_keys_not_null,PASS,0
2,gpu_metrics_rollup_fields_consistent,PASS,0
3,gpu_metrics_latency_fields_nonnegative,PASS,0
4,gpu_metrics_startup_seconds_matches_ms,PASS,0
5,network_demand_hourly_grain,PASS,0
6,network_demand_by_gpu_hourly_grain,PASS,0
7,network_demand_by_gpu_required_columns_present,PASS,0
8,network_demand_by_gpu_capacity_fields_nonnegative,PASS,0
9,network_demand_additive_fields_nonnegative,PASS,0


API readiness assertion failures: 0


## Executive Summary

**Status Legend**: PASS = contract met; WARN = expected soft anomaly; FAIL = contract violation; INFO = no data to evaluate.

Unnamed: 0,layer,tests,failures,status,window_utc,db_target
0,Raw -> Typed Assertions,5,0,PASS,2026-02-25T16:08:39.871000+00:00 -> 2026-02-25T21:43:55.262000+00:00,localhost:8123/livepeer_analytics
1,Pipeline Contract Assertions,31,0,PASS,2026-02-25T16:08:39.871000+00:00 -> 2026-02-25T21:43:55.262000+00:00,localhost:8123/livepeer_analytics
2,API Readiness Assertions,14,0,PASS,2026-02-25T16:08:39.871000+00:00 -> 2026-02-25T21:43:55.262000+00:00,localhost:8123/livepeer_analytics


## API Readiness Failure Drill-Down
This section shows offending rows for failed API readiness checks so failures are explainable (contract bug vs sparse telemetry).


In [9]:
# Drill-down queries for common readiness failures
readiness_checks = {
    'gpu_metrics_keys_not_null': '''
        SELECT *
        FROM livepeer_analytics.v_api_gpu_metrics
        WHERE window_start >= {from_ts:DateTime64(3)}
          AND window_start < {to_ts:DateTime64(3)}
          AND (
            orchestrator_address = ''
            OR pipeline = ''
            OR ifNull(gpu_id, '') = ''
            OR window_start IS NULL
          )
        ORDER BY window_start DESC
        LIMIT 50
    ''',
    'gpu_metrics_rollup_fields_consistent': '''
        SELECT
          window_start,
          orchestrator_address,
          pipeline,
          model_id,
          gpu_id,
          known_sessions,
          unexcused_sessions,
          swapped_sessions,
          failure_rate,
          ifNull(unexcused_sessions / nullIf(known_sessions, 0), 0) AS recomputed_failure_rate,
          swap_rate,
          ifNull(swapped_sessions / nullIf(known_sessions, 0), 0) AS recomputed_swap_rate
        FROM livepeer_analytics.v_api_gpu_metrics
        WHERE window_start >= {from_ts:DateTime64(3)}
          AND window_start < {to_ts:DateTime64(3)}
          AND (
            abs(failure_rate - ifNull(unexcused_sessions / nullIf(known_sessions, 0), 0)) > 0.000001
            OR abs(swap_rate - ifNull(swapped_sessions / nullIf(known_sessions, 0), 0)) > 0.000001
          )
        ORDER BY window_start DESC
        LIMIT 50
    ''',
    'network_demand_by_gpu_hourly_grain': '''
        SELECT *
        FROM livepeer_analytics.v_api_network_demand_by_gpu
        WHERE window_start >= {from_ts:DateTime64(3)}
          AND window_start < {to_ts:DateTime64(3)}
          AND toMinute(window_start) != 0
        ORDER BY window_start DESC
        LIMIT 50
    ''',
    'network_demand_by_gpu_capacity_fields_nonnegative': '''
        SELECT *
        FROM livepeer_analytics.v_api_network_demand_by_gpu
        WHERE window_start >= {from_ts:DateTime64(3)}
          AND window_start < {to_ts:DateTime64(3)}
          AND (
            inference_minutes_by_gpu_type < 0
            OR used_inference_minutes < 0
            OR available_capacity_minutes < 0
            OR capacity_rate < 0
            OR capacity_rate > 1.5
          )
        ORDER BY window_start DESC
        LIMIT 50
    ''',
    'sla_compliance_rollup_safe': '''
        SELECT
          window_start,
          orchestrator_address,
          pipeline,
          model_id,
          gpu_id,
          known_sessions,
          unexcused_sessions,
          swapped_sessions,
          success_ratio,
          ifNull(1 - (unexcused_sessions / nullIf(known_sessions, 0)), 0) AS recomputed_success_ratio,
          no_swap_ratio,
          ifNull(1 - (swapped_sessions / nullIf(known_sessions, 0)), 0) AS recomputed_no_swap_ratio
        FROM livepeer_analytics.v_api_sla_compliance
        WHERE window_start >= {from_ts:DateTime64(3)}
          AND window_start < {to_ts:DateTime64(3)}
          AND (
            abs(success_ratio - ifNull(1 - (unexcused_sessions / nullIf(known_sessions, 0)), 0)) > 0.000001
            OR abs(no_swap_ratio - ifNull(1 - (swapped_sessions / nullIf(known_sessions, 0)), 0)) > 0.000001
          )
        ORDER BY window_start DESC
        LIMIT 50
    '''
}

if 'api_readiness_assertions' in globals():
    failed_tests = api_readiness_assertions.loc[api_readiness_assertions['status'] == 'FAIL', 'test_name'].tolist()
    print('Failed readiness tests:', failed_tests)
    for t in failed_tests:
        sql = readiness_checks.get(t)
        if not sql:
            print(f'\n{t}: no drill-down query registered')
            continue
        print(f'\n=== {t} offending rows ===')
        df = query_df(sql, params)
        display(df)
else:
    print('Run API readiness assertions first.')


Failed readiness tests: []


## Serving Schema Sanity
Quick schema contract check for serving views to catch column naming drift before API integration.


In [10]:
if 'api_readiness_assertions' in globals() and api_readiness_assertions is not None and not api_readiness_assertions.empty:
    schema_tests = [
        'api_views_present',
        'network_demand_by_gpu_required_columns_present',
    ]
    schema_assertions = api_readiness_assertions[api_readiness_assertions['test_name'].isin(schema_tests)].copy()
    display(Markdown('**API View Schema Contract Checks (Assertion-Sourced)**'))
    display(style_status_table(schema_assertions))

    failed_schema_tests = schema_assertions.loc[schema_assertions['status'] == 'FAIL', 'test_name'].tolist()
    if failed_schema_tests:
        describe_targets = {
            'api_views_present': ['v_api_gpu_metrics', 'v_api_network_demand', 'v_api_network_demand_by_gpu', 'v_api_sla_compliance'],
            'network_demand_by_gpu_required_columns_present': ['v_api_network_demand_by_gpu'],
        }
        displayed = set()
        for test_name in failed_schema_tests:
            for view_name in describe_targets.get(test_name, []):
                if view_name in displayed:
                    continue
                displayed.add(view_name)
                print(f'\n=== {view_name} columns ===')
                display(query_df(f"DESCRIBE TABLE livepeer_analytics.{view_name}"))
else:
    print('Run API readiness assertions first.')


**API View Schema Contract Checks (Assertion-Sourced)**

Unnamed: 0,test_name,status,failed_rows,missing_views,rows_checked,missing_columns,low_sample_windows,joined_rows
0,api_views_present,PASS,0,[],,,,
7,network_demand_by_gpu_required_columns_present,PASS,0,,,[],,


## Raw -> Silver Correlation Checks

These rows show that raw typed records are carried into silver facts using `source_event_uid` correlation.

In [11]:
projection_checks = pipeline_assertions[pipeline_assertions['test_name'].str.contains('_raw_to_silver_projection', na=False)]
display(style_status_table(projection_checks[['test_name', 'status', 'failed_rows', 'typed_rows', 'projected_rows', 'missing_in_silver']]))


Unnamed: 0,test_name,status,failed_rows,typed_rows,projected_rows,missing_in_silver
6,status_raw_to_silver_projection,PASS,0,24.0,24.0,0.0
7,trace_raw_to_silver_projection,PASS,0,66.0,66.0,0.0
8,ingest_raw_to_silver_projection,PASS,0,0.0,0.0,0.0


## Contract Coverage Profile (Telemetry Completeness)
Profiles key sparsity signals to separate telemetry gaps from serving-model bugs.


In [12]:
coverage_sql = '''
SELECT *
FROM
(
  SELECT
    'v_api_gpu_metrics' AS object_name,
    count() AS rows_window,
    countIf(pipeline = '') AS empty_pipeline_rows,
    countIf(ifNull(gpu_id, '') = '') AS empty_gpu_rows,
    countIf(status_samples = 0) AS zero_status_sample_rows,
    CAST(NULL AS Nullable(UInt64)) AS empty_gateway_rows
  FROM livepeer_analytics.v_api_gpu_metrics
  WHERE window_start >= {from_ts:DateTime64(3)}
    AND window_start < {to_ts:DateTime64(3)}

  UNION ALL

  SELECT
    'v_api_network_demand' AS object_name,
    count() AS rows_window,
    countIf(pipeline = '') AS empty_pipeline_rows,
    CAST(NULL AS Nullable(UInt64)) AS empty_gpu_rows,
    CAST(NULL AS Nullable(UInt64)) AS zero_status_sample_rows,
    countIf(gateway = '') AS empty_gateway_rows
  FROM livepeer_analytics.v_api_network_demand
  WHERE window_start >= {from_ts:DateTime64(3)}
    AND window_start < {to_ts:DateTime64(3)}
)
ORDER BY object_name
'''

display(query_df(coverage_sql, params))

hourly_sparsity_sql = '''
SELECT
  toStartOfInterval(sample_ts, INTERVAL 1 HOUR) AS window_start,
  count() AS status_rows,
  countIf(output_fps > 0) AS fps_positive_rows,
  countIf(output_fps = 0) AS fps_zero_rows,
  countIf(gateway = '') AS empty_gateway_rows
FROM livepeer_analytics.fact_stream_status_samples
WHERE sample_ts >= {from_ts:DateTime64(3)}
  AND sample_ts < {to_ts:DateTime64(3)}
GROUP BY window_start
ORDER BY window_start DESC
'''

print('\nStatus sample sparsity by hour:')
display(query_df(hourly_sparsity_sql, params))


Unnamed: 0,object_name,rows_window,empty_pipeline_rows,empty_gpu_rows,zero_status_sample_rows,empty_gateway_rows
0,v_api_gpu_metrics,9,0,0.0,2.0,
1,v_api_network_demand,5,0,,,0.0



Status sample sparsity by hour:


Unnamed: 0,window_start,status_rows,fps_positive_rows,fps_zero_rows,empty_gateway_rows
0,2026-02-25 20:00:00,16,9,7,0
1,2026-02-25 19:00:00,2,1,1,0
2,2026-02-25 18:00:00,6,3,3,0


## Scenario Candidate Discovery
### Coverage Guardrails
- `scenario_3_success_with_swap`: require non-zero candidates before running blocking scenario assertions; discovery accepts current swap fields plus legacy `swap_count` fallback.
- `scenario_4_success_with_param_updates`: track as data-availability signal; production can legitimately return zero until `fact_workflow_param_updates` is populated.
- `scenario_5_out_of_category_baseline`: sampled out-of-category sessions used to mirror notebook `fallout_df` coverage (non-blocking).
- Relationship to `fallout_df`: scenario_5 session ids should be a subset of fallout sessions for the same window.
### Fixture Source Contract
- Replay fixtures are raw-first: scenario JSONL replay rows are exported from `streaming_events` only.
- Scenario discovery still uses typed/fact tables; capability context can include rows before the scenario window.
- If scenario assertions fail while raw replay is present, inspect candidate availability/window selection before parser logic.


In [13]:
from IPython.display import Markdown

scenario_blocks = parse_blocks(SQL_DIR / 'scenario_candidates.sql', '-- QUERY:')
scenario_candidates: dict[str, pd.DataFrame] = {}
scenario_review_guidance = {
    'scenario_1_clean_success_no_swap_fps_gt_12': (
        '- Goal: confirm clean successful sessions with sustained output FPS and no swaps.\n'
        '- Validate: `avg_output_fps > 12`, `segment_orchestrators` is stable (typically 1), and IDs map to expected fixture rows.'
    ),
    'scenario_2_no_orchestrator_then_closed': (
        '- Goal: confirm startup failure path where gateway cannot find orchestrators and stream closes.\n'
        '- Validate: `startup_success=0`, `has_no_orch=1`, `has_close=1`, and timestamps/IDs are unique per row.'
    ),
    'scenario_3_success_with_swap': (
        '- Goal: confirm successful sessions with explicit, derived, or legacy swap evidence.\n'
        '- Validate buckets explicitly: `confirmed_swap_count > 0` (explicit), `inferred_orchestrator_change_count > 0` or `segment_orchestrators > 1` (derived), and `swap_count > 0` (legacy fallback), plus matching stream/request/session IDs.'
    ),
    'scenario_4_success_with_param_updates': (
        '- Goal: confirm successful sessions with parameter updates.\n'
        '- Validate: non-empty rows when data exists; if empty, treat as data-availability signal rather than parser failure.'
    ),
}
scenario_rows = []
for name, sql in scenario_blocks:
    df = query_df(sql, params)
    scenario_candidates[name] = df
    scenario_rows.append((name, df))

# Fallout table: sessions in window not covered by primary scenarios (1-4).
primary_scenarios = [
    'scenario_1_clean_success_no_swap_fps_gt_12',
    'scenario_2_no_orchestrator_then_closed',
    'scenario_3_success_with_swap',
    'scenario_4_success_with_param_updates',
]
classified_ids_primary = set()
for name in primary_scenarios:
    df = scenario_candidates.get(name)
    if df is not None and not df.empty and 'workflow_session_id' in df.columns:
        classified_ids_primary.update(df['workflow_session_id'].astype(str).tolist())

scenario5_df = scenario_candidates.get('scenario_5_out_of_category_baseline', pd.DataFrame())
scenario5_ids = set(scenario5_df['workflow_session_id'].astype(str).tolist()) if (scenario5_df is not None and not scenario5_df.empty and 'workflow_session_id' in scenario5_df.columns) else set()

fallout_sql = """
WITH fs_latest AS
(
  SELECT
    workflow_session_id,
    stream_id,
    request_id,
    session_start_ts,
    session_end_ts,
    known_stream,
    startup_success,
    startup_excused,
    startup_unexcused,
    confirmed_swap_count,
    inferred_orchestrator_change_count,
    swap_count,
    orchestrator_address,
    gpu_id,
    model_id,
    pipeline
  FROM
  (
    SELECT
      *,
      row_number() OVER (
        PARTITION BY workflow_session_id
        ORDER BY version DESC, session_start_ts DESC, session_end_ts DESC
      ) AS rn
    FROM livepeer_analytics.fact_workflow_sessions FINAL
    WHERE session_start_ts >= {from_ts:DateTime64(3)}
      AND session_start_ts < {to_ts:DateTime64(3)}
  )
  WHERE rn = 1
)
SELECT
  f.workflow_session_id AS workflow_session_id,
  f.stream_id AS stream_id,
  f.request_id AS request_id,
  f.session_start_ts AS session_start_ts,
  f.session_end_ts AS session_end_ts,
  ifNull(s.avg_output_fps, 0) AS avg_output_fps,
  f.known_stream AS known_stream,
  f.startup_success AS startup_success,
  f.startup_excused AS startup_excused,
  f.startup_unexcused AS startup_unexcused,
  f.confirmed_swap_count AS confirmed_swap_count,
  f.inferred_orchestrator_change_count AS inferred_orchestrator_change_count,
  f.swap_count AS swap_count,
  f.orchestrator_address AS orchestrator_address,
  f.gpu_id AS gpu_id,
  f.model_id AS model_id,
  f.pipeline AS pipeline,
  ifNull(tf.has_no_orch, 0) AS has_no_orch,
  ifNull(tf.has_close, 0) AS has_close
FROM fs_latest f
LEFT JOIN
(
  SELECT workflow_session_id, avg(output_fps) AS avg_output_fps
  FROM livepeer_analytics.fact_stream_status_samples
  GROUP BY workflow_session_id
) s USING (workflow_session_id)
LEFT JOIN
(
  SELECT
    workflow_session_id,
    max(toUInt8(trace_type = 'gateway_no_orchestrators_available')) AS has_no_orch,
    max(toUInt8(trace_type = 'gateway_ingest_stream_closed')) AS has_close
  FROM livepeer_analytics.fact_stream_trace_edges
  GROUP BY workflow_session_id
) tf USING (workflow_session_id)
ORDER BY f.session_start_ts DESC
"""

all_sessions_df = query_df(fallout_sql, params)
if 'workflow_session_id' not in all_sessions_df.columns:
    all_sessions_df = pd.DataFrame(columns=[
        'workflow_session_id', 'stream_id', 'request_id', 'session_start_ts', 'session_end_ts',
        'avg_output_fps', 'known_stream', 'startup_success', 'startup_excused', 'startup_unexcused',
        'confirmed_swap_count', 'inferred_orchestrator_change_count', 'swap_count',
        'orchestrator_address', 'gpu_id', 'model_id', 'pipeline', 'has_no_orch', 'has_close'
    ])
fallout_df = all_sessions_df.copy()
if classified_ids_primary and 'workflow_session_id' in fallout_df.columns:
    fallout_df = fallout_df[~fallout_df['workflow_session_id'].astype(str).isin(classified_ids_primary)].copy()

# Defensive debug: if scenario tables are populated but fallout source query is empty,
# surface a direct fact-session count for the same window to catch stale state quickly.
if len(classified_ids_primary) > 0 and len(all_sessions_df) == 0:
    direct_count_df = query_df("""
    SELECT countDistinct(workflow_session_id) AS sessions_in_window
    FROM livepeer_analytics.fact_workflow_sessions
    WHERE session_start_ts >= {from_ts:DateTime64(3)}
      AND session_start_ts < {to_ts:DateTime64(3)}
    """, params)
    direct_count = int(direct_count_df.iloc[0]['sessions_in_window']) if not direct_count_df.empty else 0
    print('Warning: fallout source query returned 0 rows while scenario tables are non-empty.')
    print(f'Direct distinct session count in window: {direct_count}')

print(f"Notebook DB target: {CH_HOST}:{CH_PORT}/{CH_DATABASE} (secure={CH_SECURE})")
print(f"Sessions in window (latest-per-id): {len(all_sessions_df)}")
print(f"Primary classified ids (scenarios 1-4): {len(classified_ids_primary)}")
print(f"Fallout sessions (outside scenarios 1-4): {len(fallout_df)}")
print(f"Scenario 5 sampled ids: {len(scenario5_ids)}")

display(Markdown("### Classified Sessions"))

for name, df in scenario_rows:
    print(f'\n{name}: {len(df)} candidate rows')
    display(Markdown("<br>"))
    display(Markdown(f"**Review guidance**\n{scenario_review_guidance.get(name, '- Goal: validate scenario row integrity.')}"))
    display(df.head(10))

fallout_display_cols = [
    'workflow_session_id', 'stream_id', 'request_id', 'session_start_ts', 'session_end_ts',
    'avg_output_fps', 'known_stream', 'startup_success', 'startup_excused', 'startup_unexcused',
    'confirmed_swap_count', 'inferred_orchestrator_change_count', 'swap_count',
    'orchestrator_address', 'gpu_id', 'model_id', 'pipeline', 'has_no_orch', 'has_close'
]

display(Markdown("### Fallout + Scenario 5 Coverage"))
covered_by_s5 = len(fallout_df[fallout_df['workflow_session_id'].astype(str).isin(scenario5_ids)]) if (len(fallout_df) and scenario5_ids) else 0
residual_fallout_df = fallout_df[~fallout_df['workflow_session_id'].astype(str).isin(scenario5_ids)].copy() if len(fallout_df) else fallout_df
coverage_df = pd.DataFrame([
    {
        'fallout_total_sessions': int(len(fallout_df)),
        'scenario5_sampled_sessions': int(len(scenario5_ids)),
        'fallout_covered_by_scenario5': int(covered_by_s5),
        'residual_fallout_sessions': int(len(residual_fallout_df)),
        'note': 'scenario_5 is a sampled subset of fallout (non-blocking)'
    }
])
display(coverage_df)

if scenario5_df is not None and not scenario5_df.empty:
    display(Markdown("**Scenario 5 Sample (Out-of-Category Baseline)**"))
    display(scenario5_df.reindex(columns=fallout_display_cols).head(50))

if len(residual_fallout_df) > 0:
    display(Markdown("**Residual Fallout (Not in Scenario 5 sample)**"))
    display(residual_fallout_df.reindex(columns=fallout_display_cols).head(50))
else:
    display(Markdown("**Residual Fallout**: none after scenario_5 sampling in current window."))


Notebook DB target: localhost:8123/livepeer_analytics (secure=False)
Sessions in window (latest-per-id): 12
Primary classified ids (scenarios 1-4): 8
Fallout sessions (outside scenarios 1-4): 4
Scenario 5 sampled ids: 4


### Classified Sessions


scenario_1_clean_success_no_swap_fps_gt_12: 3 candidate rows


<br>

**Review guidance**
- Goal: confirm clean successful sessions with sustained output FPS and no swaps.
- Validate: `avg_output_fps > 12`, `segment_orchestrators` is stable (typically 1), and IDs map to expected fixture rows.

Unnamed: 0,scenario_name,workflow_session_id,stream_id,request_id,session_start_ts,session_end_ts,avg_output_fps,segment_orchestrators
0,scenario_1_clean_success_no_swap_fps_gt_12,aiJobTesterStream-1772052186479599343|ebeb78b7,aiJobTesterStream-1772052186479599343,ebeb78b7,2026-02-25 20:43:11.605,2026-02-25 20:43:55.262,12.115748,1
1,scenario_1_clean_success_no_swap_fps_gt_12,aiJobTesterStream-1772052050242033096|7d208f6e,aiJobTesterStream-1772052050242033096,7d208f6e,2026-02-25 20:40:54.932,2026-02-25 20:41:40.442,13.609556,1
2,scenario_1_clean_success_no_swap_fps_gt_12,aiJobTesterStream-1772051277781396655|2a377429,aiJobTesterStream-1772051277781396655,2a377429,2026-02-25 20:28:02.242,2026-02-25 20:28:46.719,16.509273,1



scenario_2_no_orchestrator_then_closed: 3 candidate rows


<br>

**Review guidance**
- Goal: confirm startup failure path where gateway cannot find orchestrators and stream closes.
- Validate: `startup_success=0`, `has_no_orch=1`, `has_close=1`, and timestamps/IDs are unique per row.

Unnamed: 0,scenario_name,workflow_session_id,stream_id,request_id,session_start_ts,session_end_ts,startup_success,startup_excused,startup_unexcused,has_no_orch,has_close
0,scenario_2_no_orchestrator_then_closed,aiJobTesterStream-1772052178586381030|a06463a9,aiJobTesterStream-1772052178586381030,a06463a9,2026-02-25 20:43:03.709,2026-02-25 20:43:05.993,0,1,0,1,1
1,scenario_2_no_orchestrator_then_closed,aiJobTesterStream-1772052121879441760|ea6b821a,aiJobTesterStream-1772052121879441760,ea6b821a,2026-02-25 20:42:06.939,2026-02-25 20:42:09.437,0,1,0,1,1
2,scenario_2_no_orchestrator_then_closed,aiJobTesterStream-1772052113991767774|0815334c,aiJobTesterStream-1772052113991767774,0815334c,2026-02-25 20:41:58.525,2026-02-25 20:41:58.628,0,1,0,1,1



scenario_3_success_with_swap: 2 candidate rows


<br>

**Review guidance**
- Goal: confirm successful sessions with explicit, derived, or legacy swap evidence.
- Validate buckets explicitly: `confirmed_swap_count > 0` (explicit), `inferred_orchestrator_change_count > 0` or `segment_orchestrators > 1` (derived), and `swap_count > 0` (legacy fallback), plus matching stream/request/session IDs.

Unnamed: 0,scenario_name,workflow_session_id,stream_id,request_id,session_start_ts,session_end_ts,confirmed_swap_count,inferred_orchestrator_change_count,swap_count,segment_orchestrators,has_explicit_swap_signal,has_derived_swap_signal,has_legacy_swap_signal,swap_signal_source
0,scenario_3_success_with_swap,aiJobTesterStream-1772047761182996484|bd1cd871,aiJobTesterStream-1772047761182996484,bd1cd871,2026-02-25 19:29:25.640,2026-02-25 19:30:38.660,1,0,1,2,1,1,1,explicit
1,scenario_3_success_with_swap,aiJobTesterStream-1772045489601127007|2e3bdf2a,aiJobTesterStream-1772045489601127007,2e3bdf2a,2026-02-25 18:51:34.969,2026-02-25 18:52:19.216,1,0,1,2,1,1,1,explicit



scenario_4_success_with_param_updates: 0 candidate rows


<br>

**Review guidance**
- Goal: confirm successful sessions with parameter updates.
- Validate: non-empty rows when data exists; if empty, treat as data-availability signal rather than parser failure.


scenario_5_out_of_category_baseline: 4 candidate rows


<br>

**Review guidance**
- Goal: validate scenario row integrity.

Unnamed: 0,scenario_name,workflow_session_id,stream_id,request_id,session_start_ts,session_end_ts,avg_output_fps,has_no_orch,has_close,segment_orchestrators,updates
0,scenario_5_out_of_category_baseline,aiJobTesterStream-1772052129765367540|471cba71,aiJobTesterStream-1772052129765367540,471cba71,2026-02-25 20:42:14.758,2026-02-25 20:42:59.885,0.0,0,1,1,0
1,scenario_5_out_of_category_baseline,aiJobTesterStream-1772051405698156244|6806f180,aiJobTesterStream-1772051405698156244,6806f180,2026-02-25 20:30:10.114,2026-02-25 20:30:12.385,0.0,0,1,0,0
2,scenario_5_out_of_category_baseline,aiJobTesterStream-1772051396589037968|20e03d91,aiJobTesterStream-1772051396589037968,20e03d91,2026-02-25 20:30:01.006,2026-02-25 20:30:03.154,0.0,0,1,0,0
3,scenario_5_out_of_category_baseline,aiJobTesterStream-1772043162850646866|c40084e7,aiJobTesterStream-1772043162850646866,c40084e7,2026-02-25 18:12:50.697,2026-02-25 18:13:43.807,8.072754,0,1,1,0


### Fallout + Scenario 5 Coverage

Unnamed: 0,fallout_total_sessions,scenario5_sampled_sessions,fallout_covered_by_scenario5,residual_fallout_sessions,note
0,4,4,4,0,scenario_5 is a sampled subset of fallout (non...


**Scenario 5 Sample (Out-of-Category Baseline)**

Unnamed: 0,workflow_session_id,stream_id,request_id,session_start_ts,session_end_ts,avg_output_fps,known_stream,startup_success,startup_excused,startup_unexcused,confirmed_swap_count,inferred_orchestrator_change_count,swap_count,orchestrator_address,gpu_id,model_id,pipeline,has_no_orch,has_close
0,aiJobTesterStream-1772052129765367540|471cba71,aiJobTesterStream-1772052129765367540,471cba71,2026-02-25 20:42:14.758,2026-02-25 20:42:59.885,0.0,,,,,,,,,,,,0,1
1,aiJobTesterStream-1772051405698156244|6806f180,aiJobTesterStream-1772051405698156244,6806f180,2026-02-25 20:30:10.114,2026-02-25 20:30:12.385,0.0,,,,,,,,,,,,0,1
2,aiJobTesterStream-1772051396589037968|20e03d91,aiJobTesterStream-1772051396589037968,20e03d91,2026-02-25 20:30:01.006,2026-02-25 20:30:03.154,0.0,,,,,,,,,,,,0,1
3,aiJobTesterStream-1772043162850646866|c40084e7,aiJobTesterStream-1772043162850646866,c40084e7,2026-02-25 18:12:50.697,2026-02-25 18:13:43.807,8.072754,,,,,,,,,,,,0,1


**Residual Fallout**: none after scenario_5 sampling in current window.

### Scenario Coverage Assertions
- What this checks: Whether selected windows include expected scenario categories used for fixture generation.
- Why it matters: Protects against blind spots in scenario-based testing.
- How to read failures: Usually means sparse production windows or missing event classes, not necessarily pipeline breakage.


In [14]:
display(Markdown("**Review guidance**\n- Goal: verify each scenario class has candidate coverage in the current window (except explicitly informational checks).\n- Important: assertion `candidates` values are not always full discovery counts; several tests are existence-oriented (e.g., `LIMIT 1`). Use the side-by-side table below for interpretation."))

scenario_assertions = run_assertion_file(SQL_DIR / 'assertions_scenario_candidates.sql')
display(style_status_table(scenario_assertions))

# Compare assertion-reported candidates vs discovery table counts to avoid confusion.
assert_count_map = {}
if scenario_assertions is not None and not scenario_assertions.empty:
    for _, r in scenario_assertions.iterrows():
        name = str(r.get('test_name', ''))
        if name.startswith('scenario_') and name.endswith('_exists'):
            scenario_name = name[len('scenario_'):-len('_exists')]
            # Map back to query block naming convention.
            scenario_name = 'scenario_' + scenario_name
            assert_count_map[scenario_name] = int(r.get('candidates', 0) if pd.notna(r.get('candidates', 0)) else 0)

discovery_rows = []
for scenario_name, df in scenario_candidates.items():
    discovery_count = int(len(df)) if df is not None else 0
    assertion_count = int(assert_count_map.get(scenario_name, 0))
    note = 'ok'
    if assertion_count in (0, 1) and discovery_count > 1:
        note = 'assertion is existence-style; discovery shows sampled rows'
    discovery_rows.append({
        'scenario_name': scenario_name,
        'discovery_rows': discovery_count,
        'assertion_candidates': assertion_count,
        'interpretation': note,
    })

comparison_df = pd.DataFrame(discovery_rows).sort_values('scenario_name')
display(comparison_df)

scenario_failures = scenario_assertions[scenario_assertions['status'] == 'FAIL']
print(f'Scenario assertion failures: {len(scenario_failures)}')


**Review guidance**
- Goal: verify each scenario class has candidate coverage in the current window (except explicitly informational checks).
- Important: assertion `candidates` values are not always full discovery counts; several tests are existence-oriented (e.g., `LIMIT 1`). Use the side-by-side table below for interpretation.

Unnamed: 0,test_name,status,failed_rows,candidates,explicit_candidates,derived_candidates,legacy_candidates
0,scenario_1_clean_success_no_swap_fps_gt_12_exists,PASS,0,1,,,
1,scenario_2_no_orchestrator_then_closed_exists,PASS,0,1,,,
2,scenario_3_success_with_swap_exists,PASS,0,2,2.0,2.0,2.0
3,scenario_4_success_with_param_updates_exists,PASS,0,0,,,
4,scenario_5_out_of_category_baseline_exists,PASS,0,1,,,


Unnamed: 0,scenario_name,discovery_rows,assertion_candidates,interpretation
0,scenario_1_clean_success_no_swap_fps_gt_12,3,1,assertion is existence-style; discovery shows ...
1,scenario_2_no_orchestrator_then_closed,3,1,assertion is existence-style; discovery shows ...
2,scenario_3_success_with_swap,2,2,ok
3,scenario_4_success_with_param_updates,0,0,ok
4,scenario_5_out_of_category_baseline,4,1,assertion is existence-style; discovery shows ...


Scenario assertion failures: 0


## Interactive Session Edge Explorer

Select a scenario candidate session and render raw event/edge timeline dynamically.

In [15]:
# Reactive session explorer: changing the dropdown refreshes all correlated outputs below.

from IPython.display import Markdown

# Build unique option keys to ensure widget change events always fire.
session_lookup = {}
session_options = []
for scenario_name, df in scenario_candidates.items():
    if df is None or df.empty or 'workflow_session_id' not in df.columns:
        continue
    for row_idx, row in df.reset_index(drop=True).iterrows():
        sid = str(row['workflow_session_id'])
        stream_id = str(row.get('stream_id', '') or '')
        request_id = str(row.get('request_id', '') or '')
        display_stream = stream_id if stream_id else '(no_stream_id)'
        display_request = request_id if request_id else '(no_request_id)'
        label = (
            f"{scenario_name} | row={row_idx} | stream={display_stream} | "
            f"request={display_request}"
        )
        key = f"{scenario_name}::{row_idx}::{sid}::{stream_id}::{request_id}"
        session_lookup[key] = {
            'scenario_name': str(scenario_name),
            'row_idx': int(row_idx),
            'sid': sid,
            'stream_id': stream_id,
            'request_id': request_id,
        }
        session_options.append((label, key))

session_picker = widgets.Dropdown(
    options=session_options,
    description='Session:',
    layout=widgets.Layout(width='95%')
)

reactive_out = {
    'diagnostics': widgets.Output(),
    'gpu_observed': widgets.Output(),
    'capability_corr': widgets.Output(),
    'timeline': widgets.Output(),
    'audit': widgets.Output(),
    'hourly': widgets.Output(),
}


def _selected_context() -> dict | None:
    selected_key = getattr(session_picker, 'value', None)
    if not selected_key:
        return None
    if selected_key in session_lookup:
        return session_lookup[selected_key]
    return None


def _selected_sid() -> str | None:
    ctx = _selected_context()
    if not ctx:
        return None
    return ctx['sid']


def _render_selected_session() -> None:
    selected_ctx = _selected_context()
    sid = selected_ctx['sid'] if selected_ctx else None
    for out in reactive_out.values():
        out.clear_output(wait=True)

    if not sid:
        with reactive_out['diagnostics']:
            print('No scenario candidates in current window.')
        return

    diagnostics_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(version, version) AS latest_version,
          argMax(session_start_ts, version) AS session_start_ts,
          argMax(session_end_ts, version) AS session_end_ts,
          argMax(stream_id, version) AS stream_id,
          argMax(request_id, version) AS request_id,
          argMax(orchestrator_address, version) AS latest_orchestrator_address,
          argMax(known_stream, version) AS known_stream,
          argMax(startup_success, version) AS startup_success,
          argMax(startup_excused, version) AS startup_excused,
          argMax(startup_unexcused, version) AS startup_unexcused,
          argMax(swap_count, version) AS fact_swap_count,
          argMax(error_count, version) AS fact_error_count,
          argMax(excusable_error_count, version) AS fact_excusable_error_count
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      trace_counts AS
      (
        SELECT
          countIf(trace_type = 'orchestrator_swap') AS explicit_swap_edges,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS trace_orchestrators_seen
        FROM livepeer_analytics.fact_stream_trace_edges
        WHERE workflow_session_id = sid
      ),
      typed_trace_counts AS
      (
        SELECT
          countIf(trace_type = 'orchestrator_swap') AS typed_explicit_swap_edges,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS typed_trace_orchestrators_seen
        FROM livepeer_analytics.stream_trace_events
        WHERE
        (
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND request_id = (SELECT request_id FROM latest_session)
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) = ''
            AND request_id = (SELECT request_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) = ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
        )
          AND event_timestamp >= {from_ts:DateTime64(3)}
          AND event_timestamp < {to_ts:DateTime64(3)}
      ),
      segment_counts AS
      (
        SELECT
          count() AS segment_rows,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS segment_orchestrators_seen
        FROM livepeer_analytics.fact_workflow_session_segments
        WHERE workflow_session_id = sid
      ),
      ai_counts AS
      (
        SELECT
          countIf(event_type = 'error') AS raw_error_events,
          countIf(event_type = 'params_update') AS raw_params_update_events
        FROM livepeer_analytics.ai_stream_events
        WHERE
        (
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND request_id = (SELECT request_id FROM latest_session)
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) = ''
            AND request_id = (SELECT request_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) = ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
        )
          AND event_timestamp >= {from_ts:DateTime64(3)}
          AND event_timestamp < {to_ts:DateTime64(3)}
      ),
      param_fact AS
      (
        SELECT count() AS fact_param_update_rows
        FROM livepeer_analytics.fact_workflow_param_updates
        WHERE workflow_session_id = sid
      )
    SELECT
      latest_session.latest_version,
      latest_session.session_start_ts,
      latest_session.session_end_ts,
      latest_session.stream_id,
      latest_session.request_id,
      latest_session.latest_orchestrator_address,
      latest_session.known_stream,
      latest_session.startup_success,
      latest_session.startup_excused,
      latest_session.startup_unexcused,
      latest_session.fact_swap_count,
      trace_counts.explicit_swap_edges,
      trace_counts.trace_orchestrators_seen,
      typed_trace_counts.typed_explicit_swap_edges,
      typed_trace_counts.typed_trace_orchestrators_seen,
      segment_counts.segment_rows,
      segment_counts.segment_orchestrators_seen,
      ai_counts.raw_error_events,
      latest_session.fact_error_count,
      latest_session.fact_excusable_error_count,
      ai_counts.raw_params_update_events,
      param_fact.fact_param_update_rows
    FROM latest_session
    CROSS JOIN trace_counts
    CROSS JOIN typed_trace_counts
    CROSS JOIN segment_counts
    CROSS JOIN ai_counts
    CROSS JOIN param_fact
    """

    gpu_observed_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(stream_id, version) AS stream_id,
          argMax(request_id, version) AS request_id,
          argMax(orchestrator_address, version) AS orchestrator_address,
          argMax(pipeline, version) AS pipeline,
          argMax(model_id, version) AS model_id,
          argMax(gpu_id, version) AS gpu_id,
          argMax(session_start_ts, version) AS session_start_ts,
          argMax(session_end_ts, version) AS session_end_ts
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      segment_gpus AS
      (
        SELECT
          groupUniqArrayIf(ifNull(gpu_id, ''), ifNull(gpu_id, '') != '') AS segment_gpu_ids,
          groupUniqArrayIf(ifNull(model_id, ''), ifNull(model_id, '') != '') AS segment_model_ids
        FROM livepeer_analytics.fact_workflow_session_segments
        WHERE workflow_session_id = sid
      ),
      param_gpus AS
      (
        SELECT
          groupUniqArrayIf(ifNull(gpu_id, ''), ifNull(gpu_id, '') != '') AS param_update_gpu_ids,
          groupUniqArrayIf(ifNull(model_id, ''), ifNull(model_id, '') != '') AS param_update_model_ids
        FROM livepeer_analytics.fact_workflow_param_updates
        WHERE workflow_session_id = sid
      )
    SELECT
      sid AS workflow_session_id,
      latest_session.stream_id,
      latest_session.request_id,
      latest_session.orchestrator_address,
      latest_session.pipeline,
      latest_session.model_id,
      latest_session.gpu_id AS session_gpu_id,
      (SELECT segment_gpu_ids FROM segment_gpus) AS segment_gpu_ids,
      (SELECT param_update_gpu_ids FROM param_gpus) AS param_update_gpu_ids,
      (SELECT segment_model_ids FROM segment_gpus) AS segment_model_ids,
      (SELECT param_update_model_ids FROM param_gpus) AS param_update_model_ids,
      latest_session.session_start_ts,
      latest_session.session_end_ts
    FROM latest_session
    """

    capability_corr_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(orchestrator_address, version) AS orchestrator_address,
          argMax(pipeline, version) AS pipeline,
          argMax(model_id, version) AS model_id,
          argMax(session_start_ts, version) AS session_start_ts,
          argMax(session_end_ts, version) AS session_end_ts
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      observed AS
      (
        SELECT
          groupUniqArrayIf(ifNull(gpu_id, ''), ifNull(gpu_id, '') != '') AS gpu_ids,
          groupUniqArrayIf(ifNull(model_id, ''), ifNull(model_id, '') != '') AS model_ids
        FROM livepeer_analytics.fact_workflow_session_segments
        WHERE workflow_session_id = sid
      )
    SELECT
      d.snapshot_ts,
      d.orchestrator_address,
      d.orchestrator_proxy_address,
      d.pipeline,
      d.model_id,
      d.gpu_id,
      d.gpu_name,
      d.runner_version,
      d.region,
      toUInt8(d.pipeline = (SELECT pipeline FROM latest_session)) AS pipeline_match,
      toUInt8(ifNull(d.model_id, '') = ifNull((SELECT model_id FROM latest_session), '') OR has((SELECT model_ids FROM observed), ifNull(d.model_id, ''))) AS model_match,
      toUInt8(has((SELECT gpu_ids FROM observed), ifNull(d.gpu_id, ''))) AS gpu_match
    FROM livepeer_analytics.dim_orchestrator_capability_snapshots d
    WHERE d.orchestrator_address = (SELECT orchestrator_address FROM latest_session)
      AND d.snapshot_ts >= (SELECT session_start_ts FROM latest_session) - INTERVAL 24 HOUR
      AND d.snapshot_ts <= coalesce((SELECT session_end_ts FROM latest_session), now64(3, 'UTC')) + INTERVAL 24 HOUR
      AND
      (
        d.pipeline = (SELECT pipeline FROM latest_session)
        OR ifNull(d.model_id, '') = ifNull((SELECT model_id FROM latest_session), '')
        OR has((SELECT model_ids FROM observed), ifNull(d.model_id, ''))
        OR has((SELECT gpu_ids FROM observed), ifNull(d.gpu_id, ''))
      )
    ORDER BY d.snapshot_ts DESC
    LIMIT 200
    """

    timeline_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(stream_id, version) AS stream_id,
          argMax(request_id, version) AS request_id
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      session_window AS
      (
        SELECT
          coalesce(argMax(session_start_ts, version), {from_ts:DateTime64(3)}) - INTERVAL 1 HOUR AS from_ts,
          coalesce(argMax(session_end_ts, version), argMax(session_start_ts, version), {to_ts:DateTime64(3)}) + INTERVAL 1 HOUR AS to_ts
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      )
    SELECT
      ts,
      source,
      detail,
      stream_id,
      request_id,
      orchestrator_address,
      data_timestamp,
      raw_event_timestamp
    FROM
    (
      SELECT edge_ts AS ts, 'fact_stream_trace_edges' AS source, trace_type AS detail, stream_id, request_id, orchestrator_address,
             edge_ts AS data_timestamp, CAST(NULL AS Nullable(DateTime64(3, 'UTC'))) AS raw_event_timestamp
      FROM livepeer_analytics.fact_stream_trace_edges
      WHERE workflow_session_id = sid

      UNION ALL

      SELECT coalesce(data_timestamp, event_timestamp) AS ts, 'raw_stream_trace_events' AS source, trace_type AS detail,
             stream_id, request_id, orchestrator_address, data_timestamp, event_timestamp AS raw_event_timestamp
      FROM livepeer_analytics.stream_trace_events
      WHERE
      (
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND request_id = (SELECT request_id FROM latest_session)
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) = ''
          AND request_id = (SELECT request_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) = ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
      )
        AND event_timestamp >= (SELECT from_ts FROM session_window)
        AND event_timestamp < (SELECT to_ts FROM session_window)

      UNION ALL

      SELECT event_timestamp AS ts, 'raw_ai_stream_events' AS source, event_type AS detail,
             stream_id, request_id, '' AS orchestrator_address,
             CAST(NULL AS Nullable(DateTime64(3, 'UTC'))) AS data_timestamp,
             event_timestamp AS raw_event_timestamp
      FROM livepeer_analytics.ai_stream_events
      WHERE
      (
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND request_id = (SELECT request_id FROM latest_session)
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) = ''
          AND request_id = (SELECT request_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) = ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
      )
        AND event_timestamp >= (SELECT from_ts FROM session_window)
        AND event_timestamp < (SELECT to_ts FROM session_window)
    ) t
    ORDER BY ts
    """

    audit_sql = """
    WITH {sid:String} AS sid
    SELECT
      argMax(stream_id, version) AS stream_id,
      argMax(request_id, version) AS request_id,
      argMax(orchestrator_address, version) AS orchestrator_address,
      argMax(pipeline, version) AS pipeline,
      argMax(model_id, version) AS model_id,
      argMax(gpu_id, version) AS gpu_id,
      argMax(session_start_ts, version) AS session_start_ts,
      argMax(session_end_ts, version) AS session_end_ts
    FROM livepeer_analytics.fact_workflow_sessions
    WHERE workflow_session_id = sid
    """

    hourly_sql = """
    WITH
      {sid:String} AS sid,
      session_hour_keys AS
      (
        SELECT DISTINCT
          toStartOfInterval(sample_ts, INTERVAL 1 HOUR) AS window_start,
          ifNull(orchestrator_address, '') AS orchestrator_address,
          ifNull(pipeline, '') AS pipeline,
          ifNull(model_id, '') AS model_id,
          ifNull(gpu_id, '') AS gpu_id
        FROM
        (
          SELECT sample_ts, orchestrator_address, pipeline, model_id, gpu_id
          FROM livepeer_analytics.fact_stream_status_samples
          WHERE workflow_session_id = sid
          UNION ALL
          SELECT sample_ts, orchestrator_address, pipeline, model_id, gpu_id
          FROM livepeer_analytics.fact_workflow_latency_samples
          WHERE workflow_session_id = sid
        ) x
      )
    SELECT
      k.window_start,
      nullIf(k.orchestrator_address, '') AS orchestrator_address,
      nullIf(k.pipeline, '') AS pipeline,
      nullIf(k.model_id, '') AS model_id,
      nullIf(k.gpu_id, '') AS gpu_id,
      g.status_samples,
      g.avg_output_fps,
      g.prompt_to_first_frame_ms,
      g.startup_time_ms,
      g.e2e_latency_ms,
      g.known_sessions
    FROM session_hour_keys k
    LEFT JOIN livepeer_analytics.v_api_gpu_metrics g
      ON g.window_start = k.window_start
     AND ifNull(g.orchestrator_address, '') = k.orchestrator_address
     AND ifNull(g.pipeline, '') = k.pipeline
     AND ifNull(g.model_id, '') = k.model_id
     AND ifNull(g.gpu_id, '') = k.gpu_id
    ORDER BY k.window_start, k.orchestrator_address, k.pipeline, k.model_id, k.gpu_id
    """

    try:
        with reactive_out['diagnostics']:
            display(Markdown('### Session Diagnostics (auto-refreshed)'))
            if selected_ctx:
                print(
                    'Selected from scenario table:',
                    f"scenario={selected_ctx['scenario_name']}",
                    f"row={selected_ctx['row_idx']}",
                    f"stream_id={selected_ctx['stream_id']}",
                    f"request_id={selected_ctx['request_id']}",
                    f"workflow_session_id={selected_ctx['sid']}"
                )
            display(Markdown('- Goal: validate the selected session lifecycle summary and identity consistency.\n- Validate: stream/request/session IDs match picker, swap/error counters align with timeline evidence, and orchestrator presence is expected for this scenario.'))
            display(query_df(diagnostics_sql, {**params, 'sid': sid}))

        with reactive_out['gpu_observed']:
            display(Markdown('### Session GPU Correlation (Observed)'))
            display(Markdown('- Goal: validate GPU/model attribution carried in session, segment, and param-update facts.\n- Validate: `session_gpu_id`/model align with segment arrays; blanks indicate unattributed sessions or mapping gaps.'))
            display(query_df(gpu_observed_sql, {**params, 'sid': sid}))

        with reactive_out['capability_corr']:
            display(Markdown('### Session GPU Correlation (Capabilities)'))
            display(Markdown('- Goal: validate capability snapshots overlap the selected session context.\n- Validate: `pipeline_match`, `model_match`, and `gpu_match` flags; check recent `snapshot_ts` near session time for expected orchestrator.'))
            capability_corr_df = query_df(capability_corr_sql, {**params, 'sid': sid})
            print(f'Capability correlation rows: {len(capability_corr_df)}')
            display(capability_corr_df)

        with reactive_out['timeline']:
            display(Markdown('### Interactive Session Edge Explorer (Timeline)'))
            display(Markdown('- Goal: validate event ordering from raw traces to silver edges for the selected session.\n- Validate: expected lifecycle sequence, consistent stream/request IDs, and swap/error evidence where applicable.'))
            display(query_df(timeline_sql, {**params, 'sid': sid}).head(400))

        with reactive_out['audit']:
            display(Markdown('### Bronze -> Silver -> Gold Audit'))
            display(Markdown('- Goal: validate the canonical gold row keys produced for this session.\n- Validate: orchestrator/pipeline/model/gpu keys are populated as expected and match scenario intent.'))
            display(query_df(audit_sql, {**params, 'sid': sid}))

        with reactive_out['hourly']:
            display(Markdown('### Gold Row Drill-Down (Hourly GPU View)'))
            display(Markdown('- Goal: validate hourly GPU serving rows backing this session.\n- Validate: key join columns (`window_start`, orchestrator/pipeline/model/gpu) map to non-null metrics where attribution exists.'))
            display(query_df(hourly_sql, {**params, 'sid': sid}))
    except Exception as exc:
        with reactive_out['diagnostics']:
            print(f'Error rendering session outputs: {exc}')


def _on_session_change(change):
    if change.get('name') == 'value':
        _render_selected_session()


session_picker.observe(_on_session_change, names='value')

_display_items = [session_picker]
_display_items.extend([
    reactive_out['diagnostics'],
    reactive_out['gpu_observed'],
    reactive_out['capability_corr'],
    reactive_out['timeline'],
    reactive_out['audit'],
    reactive_out['hourly'],
])

display(widgets.VBox(_display_items))

# Initial render
_render_selected_session()



VBox(children=(Dropdown(description='Session:', layout=Layout(width='95%'), options=(('scenario_1_clean_succesâ€¦

## Session Diagnostics (Swaps, Errors, Param Updates)

This section compares confirmed and inferred swap signals for each session.

`swap_count` now tracks confirmed swaps only. Inferred orchestrator changes are tracked separately via `inferred_orchestrator_change_count`.
- confirmed swaps: explicit trace edge `orchestrator_swap`
- inferred changes: canonical orchestrator identity changes observed within the session

In [16]:
def resolve_selected_session() -> tuple[str | None, str | None]:
    if 'session_picker' in globals():
        selected = getattr(session_picker, 'value', None)
        if selected:
            if isinstance(selected, tuple) and len(selected) >= 3:
                scenario_name, row_idx, sid = selected[:3]
                return str(sid), f"interactive:{scenario_name}:row={row_idx}"
            return str(selected), 'interactive'

    for name, df in scenario_candidates.items():
        if not df.empty and 'workflow_session_id' in df.columns:
            return str(df.iloc[0]['workflow_session_id']), name

    fallback = query_df("""
    SELECT argMax(workflow_session_id, version) AS workflow_session_id
    FROM livepeer_analytics.fact_workflow_sessions
    WHERE session_start_ts >= {from_ts:DateTime64(3)}
      AND session_start_ts < {to_ts:DateTime64(3)}
    """, params)
    if fallback.empty or not fallback.iloc[0]['workflow_session_id']:
        return None, None
    return str(fallback.iloc[0]['workflow_session_id']), 'fallback_latest_session'

selected_session_id, selected_source = resolve_selected_session()
print('Selected session:', selected_session_id, 'from', selected_source)


Selected session: scenario_1_clean_success_no_swap_fps_gt_12::0::aiJobTesterStream-1772052186479599343|ebeb78b7::aiJobTesterStream-1772052186479599343::ebeb78b7 from interactive


In [17]:
if selected_session_id:
    diagnostics_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(version, version) AS latest_version,
          argMax(session_start_ts, version) AS session_start_ts,
          argMax(session_end_ts, version) AS session_end_ts,
          argMax(stream_id, version) AS stream_id,
          argMax(request_id, version) AS request_id,
          argMax(orchestrator_address, version) AS latest_orchestrator_address,
          argMax(known_stream, version) AS known_stream,
          argMax(startup_success, version) AS startup_success,
          argMax(startup_excused, version) AS startup_excused,
          argMax(startup_unexcused, version) AS startup_unexcused,
          argMax(swap_count, version) AS fact_swap_count,
          argMax(error_count, version) AS fact_error_count,
          argMax(excusable_error_count, version) AS fact_excusable_error_count
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      trace_counts AS
      (
        SELECT
          countIf(trace_type = 'orchestrator_swap') AS explicit_swap_edges,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS trace_orchestrators_seen
        FROM livepeer_analytics.fact_stream_trace_edges
        WHERE workflow_session_id = sid
      ),
      typed_trace_counts AS
      (
        SELECT
          countIf(trace_type = 'orchestrator_swap') AS typed_explicit_swap_edges,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS typed_trace_orchestrators_seen
        FROM livepeer_analytics.stream_trace_events
        WHERE
        (
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND request_id = (SELECT request_id FROM latest_session)
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) = ''
            AND request_id = (SELECT request_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) = ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
        )
          AND event_timestamp >= {from_ts:DateTime64(3)}
          AND event_timestamp < {to_ts:DateTime64(3)}
      ),
      segment_counts AS
      (
        SELECT
          count() AS segment_rows,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS segment_orchestrators_seen
        FROM livepeer_analytics.fact_workflow_session_segments
        WHERE workflow_session_id = sid
      ),
      ai_counts AS
      (
        SELECT
          countIf(event_type = 'error') AS raw_error_events,
          countIf(event_type = 'params_update') AS raw_params_update_events
        FROM livepeer_analytics.ai_stream_events
        WHERE
        (
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND request_id = (SELECT request_id FROM latest_session)
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) != ''
            AND (SELECT stream_id FROM latest_session) = ''
            AND request_id = (SELECT request_id FROM latest_session)
          )
          OR
          (
            (SELECT request_id FROM latest_session) = ''
            AND (SELECT stream_id FROM latest_session) != ''
            AND stream_id = (SELECT stream_id FROM latest_session)
          )
        )
          AND event_timestamp >= {from_ts:DateTime64(3)}
          AND event_timestamp < {to_ts:DateTime64(3)}
      ),
      param_fact AS
      (
        SELECT count() AS fact_param_update_rows
        FROM livepeer_analytics.fact_workflow_param_updates
        WHERE workflow_session_id = sid
      ),
      session_versions AS
      (
        SELECT
          count() AS session_rows_all_versions,
          uniqExactIf(orchestrator_address, orchestrator_address != '') AS session_orchestrators_seen_across_versions
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      )
    SELECT
      latest_session.latest_version,
      latest_session.session_start_ts,
      latest_session.session_end_ts,
      latest_session.stream_id,
      latest_session.request_id,
      latest_session.latest_orchestrator_address,
      latest_session.known_stream,
      latest_session.startup_success,
      latest_session.startup_excused,
      latest_session.startup_unexcused,
      latest_session.fact_swap_count,
      trace_counts.explicit_swap_edges,
      trace_counts.trace_orchestrators_seen,
      typed_trace_counts.typed_explicit_swap_edges,
      typed_trace_counts.typed_trace_orchestrators_seen,
      segment_counts.segment_rows,
      segment_counts.segment_orchestrators_seen,
      ai_counts.raw_error_events,
      latest_session.fact_error_count,
      latest_session.fact_excusable_error_count,
      ai_counts.raw_params_update_events,
      param_fact.fact_param_update_rows,
      session_versions.session_rows_all_versions,
      session_versions.session_orchestrators_seen_across_versions
    FROM latest_session
    CROSS JOIN trace_counts
    CROSS JOIN typed_trace_counts
    CROSS JOIN segment_counts
    CROSS JOIN ai_counts
    CROSS JOIN param_fact
    CROSS JOIN session_versions
    """

    diagnostics_df = query_df(diagnostics_sql, {**params, 'sid': selected_session_id})
    display(diagnostics_df)
else:
    print('No session available in this window.')


Unnamed: 0,latest_version,session_start_ts,session_end_ts,stream_id,request_id,latest_orchestrator_address,known_stream,startup_success,startup_excused,startup_unexcused,...,typed_trace_orchestrators_seen,segment_rows,segment_orchestrators_seen,raw_error_events,fact_error_count,fact_excusable_error_count,raw_params_update_events,fact_param_update_rows,session_rows_all_versions,session_orchestrators_seen_across_versions
0,0,1970-01-01,,,,,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Session GPU Correlation (Stream -> Session -> Capability)
For the selected session, this section shows:
- GPU IDs observed in session facts/segments/param updates
- capability correlation candidates using orchestrator + pipeline/model


In [18]:
# This section now mirrors the reactive dropdown outputs from the Interactive Session Edge Explorer.
if 'reactive_out' in globals() and isinstance(reactive_out, dict):
    display(reactive_out.get('gpu_observed'))
    display(reactive_out.get('capability_corr'))
else:
    print('Run the "Interactive Session Edge Explorer" cell first to enable reactive outputs.')



Output(outputs=({'output_type': 'display_data', 'data': {'text/plain': '<IPython.core.display.Markdown object>â€¦

Output(outputs=({'output_type': 'display_data', 'data': {'text/plain': '<IPython.core.display.Markdown object>â€¦

In [19]:
if selected_session_id:
    timeline_sql = """
    WITH
      {sid:String} AS sid,
      latest_session AS
      (
        SELECT
          argMax(stream_id, version) AS stream_id,
          argMax(request_id, version) AS request_id
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      ),
      session_window AS
      (
        SELECT
          coalesce(argMax(session_start_ts, version), {from_ts:DateTime64(3)}) - INTERVAL 1 HOUR AS from_ts,
          coalesce(argMax(session_end_ts, version), argMax(session_start_ts, version), {to_ts:DateTime64(3)}) + INTERVAL 1 HOUR AS to_ts
        FROM livepeer_analytics.fact_workflow_sessions
        WHERE workflow_session_id = sid
      )
    SELECT
      ts,
      source,
      detail,
      stream_id,
      request_id,
      orchestrator_address,
      data_timestamp,
      raw_event_timestamp
    FROM
    (
      SELECT
        edge_ts AS ts,
        'fact_stream_trace_edges' AS source,
        trace_type AS detail,
        stream_id,
        request_id,
        orchestrator_address,
        edge_ts AS data_timestamp,
        CAST(NULL AS Nullable(DateTime64(3, 'UTC'))) AS raw_event_timestamp
      FROM livepeer_analytics.fact_stream_trace_edges
      WHERE workflow_session_id = sid

      UNION ALL

      SELECT
        coalesce(data_timestamp, event_timestamp) AS ts,
        'raw_stream_trace_events' AS source,
        trace_type AS detail,
        stream_id,
        request_id,
        orchestrator_address,
        data_timestamp,
        event_timestamp AS raw_event_timestamp
      FROM livepeer_analytics.stream_trace_events
      WHERE
      (
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND request_id = (SELECT request_id FROM latest_session)
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) = ''
          AND request_id = (SELECT request_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) = ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
      )
        AND event_timestamp >= (SELECT from_ts FROM session_window)
        AND event_timestamp < (SELECT to_ts FROM session_window)

      UNION ALL

      SELECT
        event_timestamp AS ts,
        'raw_ai_stream_events' AS source,
        event_type AS detail,
        stream_id,
        request_id,
        '' AS orchestrator_address,
        CAST(NULL AS Nullable(DateTime64(3, 'UTC'))) AS data_timestamp,
        event_timestamp AS raw_event_timestamp
      FROM livepeer_analytics.ai_stream_events
      WHERE
      (
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND request_id = (SELECT request_id FROM latest_session)
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) != ''
          AND (SELECT stream_id FROM latest_session) = ''
          AND request_id = (SELECT request_id FROM latest_session)
        )
        OR
        (
          (SELECT request_id FROM latest_session) = ''
          AND (SELECT stream_id FROM latest_session) != ''
          AND stream_id = (SELECT stream_id FROM latest_session)
        )
      )
        AND event_timestamp >= (SELECT from_ts FROM session_window)
        AND event_timestamp < (SELECT to_ts FROM session_window)
    ) t
    ORDER BY ts
    """

    timeline_df = query_df(timeline_sql, {**params, 'sid': selected_session_id})
    display(timeline_df.head(400))
else:
    print('No session available in this window.')


## Bronze -> Silver -> Gold Audit (Serving Validation)
For the selected session, this section traces counts and time bounds from bronze/raw events to silver facts and gold API views.
Use it to quickly verify that serving rows are grounded in source data.


In [20]:
# This section now mirrors the reactive dropdown output from the Interactive Session Edge Explorer.
if 'reactive_out' in globals() and isinstance(reactive_out, dict):
    display(reactive_out.get('audit'))
else:
    print('Run the "Interactive Session Edge Explorer" cell first to enable reactive outputs.')



Output(outputs=({'output_type': 'display_data', 'data': {'text/plain': '<IPython.core.display.Markdown object>â€¦

## Gold Row Drill-Down (Hourly GPU View)
Pick a `v_api_gpu_metrics` hourly row for the selected session key and inspect the underlying silver inputs side-by-side.


In [21]:
# This section now mirrors the reactive dropdown output from the Interactive Session Edge Explorer.
if 'reactive_out' in globals() and isinstance(reactive_out, dict):
    display(reactive_out.get('hourly'))
else:
    print('Run the "Interactive Session Edge Explorer" cell first to enable reactive outputs.')



Output(outputs=({'output_type': 'display_data', 'data': {'text/plain': '<IPython.core.display.Markdown object>â€¦

## CLI Harness Commands

Run the same checks outside notebook:

```bash
python tests/python/scripts/run_clickhouse_query_pack.py --lookback-hours 24
python tests/python/scripts/run_clickhouse_data_tests.py --sql-file tests/integration/sql/assertions_pipeline.sql --lookback-hours 24
python tests/python/scripts/run_clickhouse_data_tests.py --sql-file tests/integration/sql/assertions_scenario_candidates.sql --lookback-hours 720
```

Export production fixtures for your four scenarios:

```bash
python tests/python/scripts/export_scenario_fixtures.py \
  --from-ts 2026-01-01T00:00:00Z \
  --to-ts 2026-02-16T00:00:00Z \
  --limit-per-scenario 3
```