# Post-run telemetry ETL

Merge action/message cognitive traces with MetricTracker exports to audit steering coverage by trait and alpha bucket. Configure the paths below to point at a simulation dump (Parquet folders) and the associated `metrics/` tracker outputs.

## Paths

Set `DUMP_DIR` to the folder containing the `actions/`, `messages/`, and `metrics_snapshots/` Parquet dumps. `METRIC_DIR` defaults to `DUMP_DIR / "metrics"`; if the MetricTracker output lives elsewhere (e.g., a shared tracker output cache), override `METRIC_DIR` as well.

In [None]:
from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import Dict, Iterable, Optional, Tuple

import pandas as pd
import pyarrow.parquet as pq

In [None]:
# Root folders for a single run. METRIC_DIR points at the tracker output nested
# inside the dump by default; repoint it if the tracker wrote somewhere else.
DUMP_DIR = Path("artifacts/demo_dump")
METRIC_DIR = Path(DUMP_DIR, "metrics")

## Loaders

Helper utilities to load Parquet shards, flatten nested fields, and pick up the MetricTracker JSONL summary (plus its companion Parquet aggregates if present).

In [None]:
def _read_parquet_dir(directory: Path) -> pd.DataFrame:
    '''Load every Parquet file in a directory into a single DataFrame.'''
    frames = []
    if not directory.exists():
        return pd.DataFrame()
    for file_path in sorted(directory.glob('*.parquet')):
        frames.append(pq.read_table(file_path).to_pandas())
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def _coerce_json_record(value: object) -> dict:
    '''Turn one nested cell into a dict that ``pd.json_normalize`` accepts.'''
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            return {}
    # None/NaN cells and non-object JSON contribute no flattened keys.
    return value if isinstance(value, dict) else {}


def _flatten_json_column(values: pd.Series, prefix: str) -> pd.DataFrame:
    '''Expand a Series of nested objects into ``prefix``-named columns (RangeIndex, one row per value).'''
    records = [_coerce_json_record(value) for value in values]
    return pd.json_normalize(records, sep='.').add_prefix(prefix)


def load_action_logs(dump_dir: Path) -> pd.DataFrame:
    '''Load ``<dump_dir>/actions`` and flatten its nested ``info`` / ``plan_metadata`` fields.

    The original cell values are kept verbatim in ``info_raw`` /
    ``plan_metadata_raw`` so later joins can use the untouched objects. Their
    keys are expanded with ``pd.json_normalize`` into ``info.<key>`` /
    ``plan_metadata.<key>`` columns (nested keys joined with ``.``).

    JSON-encoded string cells are decoded before flattening; missing or
    non-object cells contribute no flattened values. If a source column is
    absent, its ``*_raw`` column is all ``None`` and no flattened columns are
    added.
    '''
    raw = _read_parquet_dir(dump_dir / 'actions')
    if raw.empty:
        return raw
    # The flattened frames come back with a RangeIndex, so give the base frame
    # one too; pd.concat(axis=1) then aligns rows positionally.
    df = raw.reset_index(drop=True)
    flattened = []
    for column in ('info', 'plan_metadata'):
        df[f'{column}_raw'] = df.get(column)
        if column in df.columns:
            flattened.append(_flatten_json_column(df.pop(column), prefix=f'{column}.'))
    return pd.concat([df, *flattened], axis=1)


def load_message_logs(dump_dir: Path) -> pd.DataFrame:
    '''Load every message-log shard stored under ``<dump_dir>/messages``.'''
    messages_dir = dump_dir / 'messages'
    return _read_parquet_dir(messages_dir)


def load_metric_snapshots(dump_dir: Path) -> pd.DataFrame:
    '''Load every per-tick metric snapshot shard under ``<dump_dir>/metrics_snapshots``.'''
    snapshots_dir = dump_dir / 'metrics_snapshots'
    return _read_parquet_dir(snapshots_dir)


def _read_tracker_summary(jsonl_path: Path) -> Optional[dict]:
    '''Return the ``summary`` object from the first line of a tracker JSONL file, if any.'''
    with jsonl_path.open('r', encoding='utf-8') as handle:
        first_line = handle.readline().strip()
    if not first_line:
        return None
    try:
        payload = json.loads(first_line)
    except json.JSONDecodeError:
        return None
    # A non-object first line (e.g. a bare list) carries no summary.
    summary = payload.get('summary') if isinstance(payload, dict) else None
    return summary if isinstance(summary, dict) else None


def _pick_tracker_parquet(metrics_dir: Path, run_stem: Optional[str], suffix: str) -> Optional[Path]:
    '''Prefer ``<run_stem><suffix>``; otherwise fall back to the lexicographically last ``run_*<suffix>``.'''
    if run_stem is not None:
        companion = metrics_dir / f'{run_stem}{suffix}'
        if companion.exists():
            return companion
    candidates = sorted(metrics_dir.glob(f'run_*{suffix}'))
    return candidates[-1] if candidates else None


def load_metric_tracker(
    metrics_dir: Path,
) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], Optional[dict]]:
    '''Return ``(agent_metrics, trait_metrics, summary_dict)`` for the latest tracker run.

    The run is the lexicographically last ``run_*.jsonl`` file. Its summary is
    the ``summary`` object on the file's first JSON line (``None`` if that line
    is blank, not valid JSON, or not an object holding a ``summary`` object).

    The run's companion ``<run>_agents.parquet`` and
    ``<run>_trait_aggregates.parquet`` files are used when present. Otherwise
    the lexicographically last matching Parquet file is used, so a directory
    with Parquet output but no JSONL still loads. Only the chosen file is read.

    ``agent_metrics`` is always a DataFrame (empty when nothing was found).
    ``trait_metrics`` and ``summary_dict`` are ``None`` when unavailable.
    '''
    if not metrics_dir.exists():
        return pd.DataFrame(), None, None

    jsonl_candidates = sorted(metrics_dir.glob('run_*.jsonl'))
    latest_jsonl = jsonl_candidates[-1] if jsonl_candidates else None
    summary = _read_tracker_summary(latest_jsonl) if latest_jsonl is not None else None
    run_stem = latest_jsonl.stem if latest_jsonl is not None else None

    agent_path = _pick_tracker_parquet(metrics_dir, run_stem, '_agents.parquet')
    trait_path = _pick_tracker_parquet(metrics_dir, run_stem, '_trait_aggregates.parquet')
    agent_metrics = pd.read_parquet(agent_path) if agent_path is not None else pd.DataFrame()
    trait_metrics = pd.read_parquet(trait_path) if trait_path is not None else None
    return agent_metrics, trait_metrics, summary

## Cognitive-trace joins

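Line up action and message logs so every decision is paired with the steering snapshot that actually hit the model. Actions are joined to the messages their agent sent in the same `(run_id, tick)`. Each joined row records a `join_basis` for disambiguating multi-action ticks: `prompt_hash` when the action carries one, otherwise a deterministic signature of its plan metadata, otherwise `tick_agent`.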

In [None]:
# Action names treated as cooperative (not referenced by any cell visible here).
COOP_ACTIONS = {
    'cite',
    'gift',
    'research',
    'scan',
    'submit_report',
    'talk',
    'work',
}


def _normalize_snapshot(snapshot: object) -> Dict[str, float]:
    if isinstance(snapshot, str):
        try:
            snapshot = json.loads(snapshot)
        except json.JSONDecodeError:
            return {}
    return snapshot or {}


def _plan_signature(plan_metadata: object) -> Optional[str]:
    if isinstance(plan_metadata, dict):
        try:
            return json.dumps(plan_metadata, sort_keys=True)
        except TypeError:
            return None
    return None


def _is_missing_scalar(value: object) -> bool:
    '''Return True for None, ``pd.NA`` or a float NaN; containers never count as missing.'''
    return value is None or value is pd.NA or (isinstance(value, float) and value != value)


def _action_steering_snapshot(row: pd.Series) -> object:
    '''Return the steering snapshot an action recorded in its ``info`` payload.

    ``pd.json_normalize`` explodes dict-valued snapshots into
    ``info.steering_snapshot.<key>`` columns, so an ``info.steering_snapshot``
    column only holds non-dict values (e.g. JSON strings). For every other row,
    read the snapshot from the preserved ``info_raw`` payload instead.
    '''
    flattened = row.get('info.steering_snapshot')
    if not _is_missing_scalar(flattened):
        return flattened
    info = row.get('info_raw')
    if isinstance(info, str):
        try:
            info = json.loads(info)
        except json.JSONDecodeError:
            return None
    return info.get('steering_snapshot') if isinstance(info, dict) else None


def align_actions_messages(actions: pd.DataFrame, messages: pd.DataFrame) -> pd.DataFrame:
    '''Pair each action with the message(s) its agent emitted in the same tick.

    Actions are inner-joined to messages on ``(run_id, tick, agent_id)`` ==
    ``(run_id, tick, from_agent)``. Column names present on both sides get
    ``_action`` / ``_msg`` suffixes. Added columns:

    - ``plan_signature``: ``_plan_signature`` of ``plan_metadata_raw`` (None if absent).
    - ``join_basis``: ``'prompt_hash'`` when the action has a prompt hash, else
      ``'plan_metadata'`` when it has a plan signature, else ``'tick_agent'``.
    - ``steering_match``: whether the action's and the message's steering
      snapshots are equal after ``_normalize_snapshot``.

    When both sides carry a ``prompt_hash``, pairs whose hashes are both
    present but differ are dropped, so same-tick actions are not cross-joined
    to each other's messages. The plan signature is only recorded; it is not
    used to filter, since messages are not assumed to carry plan metadata.

    Returns an empty DataFrame when either input is empty.
    '''
    if actions.empty or messages.empty:
        return pd.DataFrame()
    work = actions.copy()
    if 'plan_metadata_raw' in work.columns:
        work['plan_signature'] = work['plan_metadata_raw'].apply(_plan_signature)
    else:
        work['plan_signature'] = None

    if 'prompt_hash' in work.columns:
        has_hash = work['prompt_hash'].notna()
    else:
        has_hash = pd.Series(False, index=work.index)
    # Signatures are str or None; test presence explicitly because a NaN
    # (e.g. from a reindexed empty column) would be truthy.
    has_signature = work['plan_signature'].notna()
    work['join_basis'] = 'tick_agent'
    work.loc[has_signature, 'join_basis'] = 'plan_metadata'
    work.loc[has_hash, 'join_basis'] = 'prompt_hash'

    merged = pd.merge(
        work,
        messages,
        left_on=['run_id', 'tick', 'agent_id'],
        right_on=['run_id', 'tick', 'from_agent'],
        suffixes=('_action', '_msg'),
        how='inner',
    )

    # Both frames had a prompt_hash column, so the merge suffixed them.
    if {'prompt_hash_action', 'prompt_hash_msg'} <= set(merged.columns):
        action_hash = merged['prompt_hash_action']
        message_hash = merged['prompt_hash_msg']
        conflicting = action_hash.notna() & message_hash.notna() & (action_hash != message_hash)
        merged = merged.loc[~conflicting].reset_index(drop=True)

    message_col = 'steering_snapshot_msg' if 'steering_snapshot_msg' in merged.columns else 'steering_snapshot'
    # A list build (rather than apply(axis=1)) also behaves on a zero-row merge.
    merged['steering_match'] = [
        _normalize_snapshot(_action_steering_snapshot(row)) == _normalize_snapshot(row.get(message_col))
        for _, row in merged.iterrows()
    ]
    return merged

## Metric rollups

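Macro metrics come from `metrics_snapshots/` (trait cohorts are already baked in). To capture steering intensity alongside the trait-band view, each alpha bucket is weighted per tick. Within each (tick, trait) pair, a bucket's weight is its share of that trait's actions on that tick, and each bucket's metric is the weight-averaged value across ticks.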

In [None]:
def trait_band_macro(metrics_df: pd.DataFrame) -> pd.DataFrame:
    '''Average the macro metrics over all snapshot rows of each trait band.

    Rows without a ``trait_key`` are ignored. The result has one row per
    ``trait_key`` (sorted) plus the mean ``cooperation_rate``,
    ``gini_wealth`` and ``polarization_modularity``. An empty input yields an
    empty DataFrame.
    '''
    if metrics_df.empty:
        return pd.DataFrame()
    macro_fields = ['cooperation_rate', 'gini_wealth', 'polarization_modularity']
    banded_rows = metrics_df[metrics_df['trait_key'].notna()]
    band_means = banded_rows.groupby('trait_key')[macro_fields].mean()
    return band_means.reset_index()


def alpha_bucket_macro(actions_df: pd.DataFrame, metrics_df: pd.DataFrame) -> pd.DataFrame:
    '''Weight per-tick trait metrics by each alpha bucket's share of actions.

    For every (tick, trait) pair, an alpha bucket's weight is the fraction of
    that trait's actions on that tick that fall into the bucket. Each metric is
    then averaged over ticks per (alpha_bucket, trait_key) with those weights.

    Ticks where a metric is NaN are excluded from both that metric's weighted
    sum and its weight total, so missing values do not drag the average toward
    zero. A bucket with no observed value for a metric gets NaN.

    Args:
        actions_df: flattened action logs with ``tick``, ``info.trait_key`` and
            ``info.alpha_bucket`` columns.
        metrics_df: metric snapshots with ``tick``, ``trait_key``,
            ``cooperation_rate``, ``gini_wealth`` and ``polarization_modularity``.

    Returns:
        Columns ``alpha_bucket``, ``trait_key`` and one column per metric.
        Empty when either input is empty, the actions carry no trait/alpha
        annotation columns, or no (tick, trait) pair overlaps.
    '''
    fields = ['cooperation_rate', 'gini_wealth', 'polarization_modularity']
    bucket_cols = ['info.trait_key', 'info.alpha_bucket']
    if actions_df.empty or metrics_df.empty:
        return pd.DataFrame()
    if not set(bucket_cols).issubset(actions_df.columns):
        # Nothing to weight by: no steering annotations were flattened out of `info`.
        return pd.DataFrame()

    action_buckets = (
        actions_df.dropna(subset=bucket_cols)
        .groupby(['tick', *bucket_cols])
        .size()
        .reset_index(name='action_count')
    )
    tick_totals = action_buckets.groupby(['tick', 'info.trait_key'])['action_count'].transform('sum')
    action_buckets['weight'] = action_buckets['action_count'] / tick_totals

    weighted = metrics_df.merge(
        action_buckets,
        left_on=['tick', 'trait_key'],
        right_on=['tick', 'info.trait_key'],
        how='inner',
    )
    if weighted.empty:
        return pd.DataFrame()

    sum_cols = []
    for col in fields:
        observed = weighted[col].notna()
        weighted[f'weighted::{col}'] = weighted[col] * weighted['weight']
        # Only ticks where the metric was observed count toward its denominator.
        weighted[f'weight::{col}'] = weighted['weight'].where(observed, 0.0)
        sum_cols += [f'weighted::{col}', f'weight::{col}']

    aggregated = (
        weighted.groupby(['info.alpha_bucket', 'trait_key'])[sum_cols]
        .sum()
        .reset_index()
        .rename(columns={'info.alpha_bucket': 'alpha_bucket'})
    )
    for col in fields:
        # 0 / 0 -> NaN when the bucket never observed this metric.
        aggregated[col] = aggregated[f'weighted::{col}'] / aggregated[f'weight::{col}']
    return aggregated[['alpha_bucket', 'trait_key', *fields]]

## Run the ETL

Execute this block after pointing the paths at a real run. The outputs are:

- the per-action steering alignment join,
- trait band macro metrics, and
- alpha-bucket-weighted macro metrics.

In [None]:
# Load every artifact for the configured run.
actions = load_action_logs(DUMP_DIR)
messages = load_message_logs(DUMP_DIR)
metrics_snapshots = load_metric_snapshots(DUMP_DIR)
agent_metrics, trait_metrics, tracker_summary = load_metric_tracker(METRIC_DIR)

counts_line = (
    f'actions: {len(actions)}, '
    f'messages: {len(messages)}, '
    f'metrics_snapshots: {len(metrics_snapshots)}'
)
print(counts_line)
if tracker_summary:
    summary_keys = sorted(tracker_summary.keys())
    print('metric tracker summary keys:', summary_keys)

# Per-action steering alignment; report 0 agreement when nothing joined.
joined = align_actions_messages(actions, messages)
agreement = 0 if joined.empty else joined["steering_match"].mean()
print(f'aligned rows: {len(joined)}, steering agreement: {agreement:.3f}')

band_view = trait_band_macro(metrics_snapshots)
alpha_view = alpha_bucket_macro(actions, metrics_snapshots)

# The cell's last expression is what the notebook renders.
band_view

In [None]:
# Display the alpha-bucket-weighted macro metrics built by the previous cell.
alpha_view