# ATUS Hierarchical Baseline Experiments - HPC Version

## Overview

This notebook runs the ATUS (American Time Use Survey) hierarchical baseline experiments safely on HPC systems. It includes 7 individual experiment rungs (R1-R6) that can be run independently.

## Experiment Structure

- **R1**: Region only
- **R2**: Region + Sex
- **R3**: Region + Employment
- **R4**: Region + Day Type
- **R5**: Region + Household Size Band
- **R6**: Region + Quarter
- **R7**: Region, Sex, Employment, Day Type, HH Size band, Quarter
Full routing model (Employment + Day Type + HH Size + Sex + Region + Quarter)

- all models run with and without hazard

## How to Use This Notebook

1. **Run Setup Cells**: Execute cells 1-3 to import libraries and set up the environment
2. **Check System Resources**: Run cell 4 to verify your HPC node has sufficient resources
3. **Run Individual Experiments**: Execute cells 5-11 one at a time for each rung (R1-R7)
4. **Monitor Progress**: Each cell will show detailed progress and can be interrupted safely
5. **Resume if Needed**: If interrupted, you can restart from any cell - completed experiments won't be re-run

## Expected Runtime

- **R1-R4**: 30-60 minutes each
- **R5-R6**: 60-120 minutes each  
- **R7**: 120-180 minutes (includes hazard model)
- **Total**: 6-12 hours for all experiments


## Resource Requirements

- **Memory**: At least 16GB RAM recommended
- **Storage**: At least 20GB free disk space
- **CPU**: Multi-core recommended for faster processing

## Import Required Libraries

In [1]:
# Import required libraries
from __future__ import annotations
import os
import sys
import subprocess
import time
import json
import psutil
import pandas as pd
from pathlib import Path
from datetime import datetime
import gc
import logging
import json
from typing import Optional, Dict, Tuple, List

# psutil is optional; handle gracefully
try:
    import psutil
except Exception:
    psutil = None

import numpy as np
import pandas as pd

import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

print("Libraries imported successfully")
print(f"Python version: {sys.version}")
# Set working directory
os.chdir('/ztank/scratch/user/u.rd143338/atus_analysis-main')

print(f"Working directory: {os.getcwd()}")

Libraries imported successfully
Python version: 3.11.5 (main, Sep 11 2023, 13:54:46) [GCC 11.2.0]
Working directory: /ztank/scratch/user/u.rd143338/atus_analysis-main


## Define Experiment Configuration

In [2]:
# Experiment configuration
RUNG_SPECS = {
    "R1": "region",
    "R2": "region,sex", 
    "R3": "region,employment",
    "R4": "region,day_type",
    "R5": "region,hh_size_band",
    "R6": "region, quarter",
    "R7": "employment,day_type,hh_size_band,sex,region,quarter",  # + hazard
    
    
    "R8":"sex",
    "R9":"employment",
    "R10":"day_type",
    "R11":"hh_size_band",
    "R12":"quarter",
    
    "R14": "employment,sex", 
    "R15": "employment,day_type",
    "R16": "employment, hh_size_band",
    "R17": "employment, quarter",
    
    "R18": "sex, day_type",
    "R19": "sex, hh_size_band",
    "R20": "sex, quarter",
    
    "R21": "day_type, hh_size_band",
    "R22": "day_type, quarter",
    "R23": "quarter, hh_size_band",
}

# File paths (adjust if needed)
BASE_DIR = Path(".")
SEQUENCES_FILE = "atus_analysis/data/sequences/markov_sequences.parquet"
SUBGROUPS_FILE = "atus_analysis/data/processed/subgroups.parquet"
OUTPUT_DIR = Path("atus_analysis/data/models")
PROGRESS_FILE = "experiment_progress_jupyter.json"

# Experiment settings
SEED = 2025
TEST_SIZE = 0.2
TIME_BLOCKS = "night:0-5,morning:6-11,afternoon:12-17,evening:18-23"
DWELL_BINS = "1,2,3,4,6,9,14,20,30"

# 144-slot pooling strength (keep consistent everywhere)
KAPPA_SLOT = 50

# NEW: how many CPU threads to use (set to your liking)
NUM_THREADS = min( (os.cpu_count() or 1), 170 )   # e.g., cap at 40

print("Configuration set")
print(f"Output directory: {OUTPUT_DIR}")
print(f"Number of rungs: {len(RUNG_SPECS)}")
print(f"Threads: {NUM_THREADS}")


Configuration set
Output directory: atus_analysis/data/models
Number of rungs: 22
Threads: 170


## Define Helper Functions

In [3]:


# Common-hier APIs (ensure you've replaced common_hier.py with the fixed version)
from atus_analysis.scripts.common_hier import (
    prepare_long_with_groups, pool_rare_quarter, parse_time_blocks,
    fit_b1_hier, nll_b1, save_json,
)


def _require_globals():
    missing = []
    for name in ["SEQUENCES_FILE","SUBGROUPS_FILE","OUTPUT_DIR","TIME_BLOCKS",
                 "DWELL_BINS","RUNG_SPECS","SEED","TEST_SIZE","KAPPA_SLOT"]:
        if name not in globals():
            missing.append(name)
    if missing:
        raise RuntimeError(f"Define these globals before running: {', '.join(missing)}")

def check_system_resources():
    """Check current system resources (non-fatal)."""
    print("=== System Resources ===")
    if psutil is None:
        print("psutil not available; skipping detailed checks.")
        return True

    memory = psutil.virtual_memory()
    cpu_percent = psutil.cpu_percent(interval=1)

    print(f"Memory: {memory.percent:.1f}% used, {memory.available / (1024**3):.1f}GB available")
    print(f"CPU: {cpu_percent:.1f}% usage")
    try:
        disk = psutil.disk_usage('.')
        print(f"Disk: {disk.free / (1024**3):.1f}GB free")
    except Exception:
        print("Disk: Could not check disk usage")

    warnings = []
    if memory.available < 4 * (1024**3):
        warnings.append(f"Low memory: only {memory.available / (1024**3):.1f}GB available")
    if cpu_percent > 80:
        warnings.append(f"High CPU usage: {cpu_percent:.1f}%")

    if warnings:
        print("\nWARNINGS:")
        for warning in warnings:
            print(f"  - {warning}")
    else:
        print("\n✓ System resources look good")

    return True  # non-fatal info

def _progress_file() -> Path:
    pf = globals().get("PROGRESS_FILE", None)
    if pf is None:
        # default under OUTPUT_DIR if available, else CWD
        if "OUTPUT_DIR" in globals():
            return Path(OUTPUT_DIR) / "progress.json"
        return Path.cwd() / "progress.json"
    return Path(pf)  # <-- ensure Path

def load_progress():
    """Load experiment progress from file."""
    pf = _progress_file()
    if pf.exists():
        with open(pf, 'r') as f:
            return json.load(f)
    return {'completed_rungs': [], 'failed_rungs': [], 'session_start': datetime.now().isoformat()}

def save_progress(progress):
    """Save experiment progress to file."""
    pf = _progress_file()
    progress['last_updated'] = datetime.now().isoformat()
    pf.parent.mkdir(parents=True, exist_ok=True)
    with open(pf, 'w') as f:
        json.dump(progress, f, indent=2)

def is_rung_completed(rung, progress):
    """Check if a rung has been completed successfully."""
    return rung in progress.get('completed_rungs', [])

def _make_or_load_stratified_split(long_df: pd.DataFrame, split_path: Path, test_size: float, seed: int) -> pd.DataFrame:
    """
    Create or load a respondent-level train/test split STRATIFIED by group_key,
    matching the CLI baseline scripts.
    """
    if split_path.exists():
        split = pd.read_parquet(split_path)[["TUCASEID","set"]]
        if not {"TUCASEID","set"}.issubset(split.columns):
            raise RuntimeError(f"Split file {split_path} missing required columns.")
        print(f"✓ Loaded existing split from {split_path}")
        return split

    rng = np.random.RandomState(seed)
    meta = long_df[["TUCASEID","group_key"]].drop_duplicates().copy()
    meta["rand"] = rng.rand(len(meta))
    meta["set"] = "train"

    total_test = 0
    for _, grp in meta.groupby("group_key"):
        n_test = int(round(test_size * len(grp)))
        if n_test:
            take = grp.sort_values("rand").head(n_test).index
            meta.loc[take, "set"] = "test"
            total_test += n_test

    out = meta[["TUCASEID","set"]]
    split_path.parent.mkdir(parents=True, exist_ok=True)
    out.to_parquet(split_path, index=False)
    print(f"✓ Created stratified split and saved to {split_path} "
          f"({(out['set']=='test').sum()} test / {len(out)-(out['set']=='test').sum()} train)")
    return out

def run_baseline1_hier_direct(rung, groupby, output_dir: Path, split_path: Path):
    """
    Direct (in-notebook) B1-H run that ALWAYS computes 144 per-slot (10-min) matrices.
    Writes:
      - b1h_model.json
      - b1h_slot_mats.npz  (per-slot matrices, arrays g0, g1, ...)
      - eval_b1h.json
    """
    _require_globals()
    try:
        # Optional threads limiter
        try:
            from threadpoolctl import threadpool_limits
        except Exception:
            threadpool_limits = None

        print(f"Loading data for {rung} ...")
        sequences = pd.read_parquet(SEQUENCES_FILE)
        subgroups = pd.read_parquet(SUBGROUPS_FILE)

        # Parse time blocks & group columns
        blocks = parse_time_blocks(TIME_BLOCKS)
        groupby_cols = [c.strip() for c in groupby.split(",") if c.strip()]

        # Derive quarter if needed
        if "quarter" in groupby_cols and "quarter" not in subgroups.columns:
            try:
                from __main__ import add_quarter_column
                subgroups = add_quarter_column(subgroups)
            except Exception:
                if "month" in subgroups.columns:
                    month_num = pd.to_numeric(subgroups["month"], errors="coerce")
                    qmap = {1:"Q1",2:"Q1",3:"Q1",4:"Q2",5:"Q2",6:"Q2",7:"Q3",8:"Q3",9:"Q3",10:"Q4",11:"Q4",12:"Q4"}
                    subgroups = subgroups.copy()
                    subgroups["quarter"] = month_num.map(qmap).fillna("Unknown")
                    print("quarter column derived from month (fallback)")
                else:
                    print("quarter requested but neither 'quarter' nor 'month' present; continuing without it")

        # Pool rare quarter cells if desired (0.0 == no-op)
        subgroups = pool_rare_quarter(subgroups, groupby_cols, "TUFNWGTP", threshold=0.0)

        # Build long table with groups + blocks
        long_df = prepare_long_with_groups(
            sequences, subgroups, groupby_cols, "TUFNWGTP", blocks
        )

        # Train/test split (STRATIFIED by group_key, shared file)
        split_df = _make_or_load_stratified_split(long_df, split_path, TEST_SIZE, SEED)

        # Merge split and prepare train/test
        df = long_df.merge(split_df, on="TUCASEID", how="left")
        if df["set"].isna().any():
            missing = df[df["set"].isna()]["TUCASEID"].nunique()
            raise RuntimeError(f"{missing} respondents missing train/test assignment in split")

        train_df = df[df["set"] == "train"].copy()
        test_df  = df[df["set"] == "test"].copy()
        n_states = int(df["state_id"].max()) + 1

        # Threads for BLAS/OpenMP kernels
        num_threads = int(globals().get("NUM_THREADS", os.cpu_count() or 1))
        print(f"Using up to {num_threads} threads")

        ctx = (threadpool_limits(limits=num_threads) if threadpool_limits else nullcontext())
        # Provide nullcontext() if threadpoolctl is absent
        try:
            from contextlib import nullcontext
        except Exception:
            class nullcontext:
                def __enter__(self): return None
                def __exit__(self, *args): return False

        with ctx:
            b1_model = fit_b1_hier(
                train_df, n_states, "TUFNWGTP",
                tau_block=50.0, tau_group=20.0, add_k=1.0,
                compute_slot_mats=True,                 # always build 144 matrices
                kappa_slot=float(globals().get("KAPPA_SLOT", 100.0)),
                time_blocks_spec=TIME_BLOCKS
            )

        # ---- Save model and evaluation (with NPZ sidecar for slot matrices) ----
        out_json = output_dir / "b1h_model.json"
        slot_sidecar = output_dir / "b1h_slot_mats.npz"
        b1_model_to_save = dict(b1_model)  # shallow copy

        if "slot_matrices" in b1_model_to_save:
            # Convert dict{group_key: [144 × K×K]} → per-group arrays in NPZ
            slot_dict = b1_model_to_save.pop("slot_matrices")
            groups_order = b1_model_to_save.get("meta", {}).get("groups_order", list(slot_dict.keys()))
            arrays = {f"g{gi}": np.array(slot_dict[gk], dtype=np.float32) for gi, gk in enumerate(groups_order)}
            np.savez_compressed(slot_sidecar, **arrays)  # write compressed sidecar
            # reference sidecar in JSON
            b1_model_to_save["slot_matrices_npz"] = str(slot_sidecar)
            b1_model_to_save.setdefault("meta", {})["slot_sidecar"] = str(slot_sidecar)
            b1_model_to_save["meta"]["groups_order"] = groups_order

        save_json(b1_model_to_save, out_json)
        print(f"Saved model JSON to {out_json}")
        if slot_sidecar.exists():
            print(f"Saved slot matrices sidecar: {slot_sidecar}  ({slot_sidecar.stat().st_size/1024/1024:.1f} MB)")

        # Evaluate on test (use saved dict so sidecar path is available)
        eval_result = nll_b1(test_df, b1_model_to_save, n_states, "TUFNWGTP")
        save_json(eval_result, output_dir / "eval_b1h.json")
        print("B1-H (direct) complete:", eval_result)

        return True

    except Exception as e:
        print(f"Direct execution failed: {e}")
        print("Falling back to subprocess method...")
        return False

def run_baseline1_hier_subprocess(rung, groupby, output_dir: Path, split_path: Path):
    """Run baseline1_hier via subprocess (fallback method)."""
    _require_globals()
    cmd = [
        sys.executable, "-m", "atus_analysis.scripts.baseline1_hier",
        "--sequences", str(SEQUENCES_FILE),
        "--subgroups", str(SUBGROUPS_FILE),
        "--out_dir", str(output_dir),
        "--groupby", groupby,
        "--time_blocks", TIME_BLOCKS,
        "--seed", str(SEED),
        "--test_size", str(TEST_SIZE),
        "--split_path", str(split_path),
        "--kappa_slot", str(KAPPA_SLOT),  # ensure 144 slot matrices
    ]
    print(f"Running B1-H via subprocess for {rung}...")
    print("Command:", " ".join(cmd))

    # export BLAS/OpenMP thread counts to the child process
    env = os.environ.copy()
    num_threads = str(globals().get("NUM_THREADS", os.cpu_count() or 1))
    env.update({
        "OMP_NUM_THREADS": num_threads,
        "OPENBLAS_NUM_THREADS": num_threads,
        "MKL_NUM_THREADS": num_threads,
        "NUMEXPR_NUM_THREADS": num_threads,
        "VECLIB_MAXIMUM_THREADS": num_threads,
        "OMP_WAIT_POLICY": "PASSIVE",
    })

    try:
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            universal_newlines=True, bufsize=1, env=env
        )
        for line in process.stdout:
            print(line.rstrip())
        process.wait()
        if process.returncode == 0:
            print(f"B1-H model completed successfully for {rung}")
            return True
        else:
            print(f"B1-H model failed for {rung} (return code: {process.returncode})")
            return False
    except Exception as e:
        print(f"Subprocess execution failed: {e}")
        return False

def run_baseline1_hier(rung, groupby, output_dir: Path, split_path: Path):
    """Run baseline1_hier with automatic fallback."""
    use_sub = bool(globals().get('USE_SUBPROCESS', True))
    if not use_sub:
        print("Attempting direct execution...")
        if run_baseline1_hier_direct(rung, groupby, output_dir, split_path):
            return True
    print("Using subprocess execution...")
    return run_baseline1_hier_subprocess(rung, groupby, output_dir, split_path)

def run_baseline2_hier(rung, groupby, output_dir: Path, split_path: Path):
    """Run baseline2_hier (hazard model) for a rung."""
    _require_globals()
    b1h_path = output_dir / "b1h_model.json"

    cmd = [
        sys.executable, "-m", "atus_analysis.scripts.baseline2_hier",
        "--sequences", str(SEQUENCES_FILE),
        "--subgroups", str(SUBGROUPS_FILE),
        "--out_dir", str(output_dir),
        "--groupby", groupby,
        "--time_blocks", TIME_BLOCKS,
        "--dwell_bins", DWELL_BINS,
        "--seed", str(SEED),
        "--test_size", str(TEST_SIZE),
        "--split_path", str(split_path),
        "--b1h_path", str(b1h_path),
        "--kappa_slot", str(KAPPA_SLOT),  # ensure routing inside B2 has 144 slots too
    ]
    print(f"Running B2-H (hazard) model for {rung}...")
    print("Command:", " ".join(cmd))

    env = os.environ.copy()
    num_threads = str(globals().get("NUM_THREADS", os.cpu_count() or 1))
    env.update({
        "OMP_NUM_THREADS": num_threads,
        "OPENBLAS_NUM_THREADS": num_threads,
        "MKL_NUM_THREADS": num_threads,
        "NUMEXPR_NUM_THREADS": num_threads,
        "VECLIB_MAXIMUM_THREADS": num_threads,
        "OMP_WAIT_POLICY": "PASSIVE",
    })

    try:
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            universal_newlines=True, bufsize=1, env=env
        )
        for line in process.stdout:
            print(line.rstrip())
        process.wait()
        if process.returncode == 0:
            print(f"B2-H model completed successfully for {rung}")
            return True
        else:
            print(f"B2-H model failed for {rung} (return code: {process.returncode})")
            return False
    except Exception as e:
        print(f"B2-H execution failed: {e}")
        return False

def run_single_rung(rung, include_hazard=False):
    """Run a complete experiment for a single rung; B2-H auto-enabled for R7."""
    _require_globals()
    from contextlib import suppress

    start_time = time.time()

    print("\n" + "="*60)
    print(f"STARTING RUNG {rung}")
    print("="*60)

    progress = load_progress()
    if is_rung_completed(rung, progress):
        print(f"{rung} already completed - skipping")
        return True

    groupby = RUNG_SPECS[rung]
    output_dir = OUTPUT_DIR / rung
    output_dir.mkdir(parents=True, exist_ok=True)
    split_path = OUTPUT_DIR / "fixed_split.parquet"

    print(f" Rung: {rung}")
    print(f" Groupby: {groupby}")
    print(f" Output directory: {output_dir}")
    print(f" Include hazard: {include_hazard or rung == 'R7'}")

    print("\nChecking system resources...")
    with suppress(Exception):
        check_system_resources()

    try:
        # Step 1: B1-H
        print(f"\n---  Step 1: B1-H Model for {rung} ---")
        if not run_baseline1_hier(rung, groupby, output_dir, split_path):
            progress['failed_rungs'].append(rung)
            save_progress(progress)
            return False

        # Step 2: B2-H (if requested or rung is R7)
        if include_hazard or rung == "R7":
            print(f"\n---  Step 2: B2-H Model for {rung} ---")
            if not run_baseline2_hier(rung, groupby, output_dir, split_path):
                progress['failed_rungs'].append(rung)
                save_progress(progress)
                return False

        elapsed = time.time() - start_time
        print(f"\n{rung} COMPLETED SUCCESSFULLY in {elapsed:.1f} seconds ({elapsed/60:.1f} minutes)")

        progress['completed_rungs'].append(rung)
        progress[f'{rung}_completed_at'] = datetime.now().isoformat()
        progress[f'{rung}_duration_seconds'] = elapsed
        save_progress(progress)

        with suppress(Exception):
            gc.collect()
            print("Memory cleanup completed")
        return True

    except Exception as e:
        print(f"\n{rung} FAILED with exception: {e}")
        progress['failed_rungs'].append(rung)
        progress[f'{rung}_error'] = str(e)
        save_progress(progress)
        return False

print("Helper functions defined (144-slot matrices enabled; stratified split; NPZ sidecars).")


Helper functions defined (144-slot matrices enabled; stratified split; NPZ sidecars).


In [4]:
def add_quarter_column(subgroups_df):
    """
    Add quarter column to subgroups data if missing.
    
    This is needed for R6 and R7 experiments which use quarter in their groupby.
    Derives quarter from month: Q1=Jan-Mar, Q2=Apr-Jun, Q3=Jul-Sep, Q4=Oct-Dec
    """
    if 'quarter' in subgroups_df.columns:
        print("✓ Quarter column already exists")
        return subgroups_df
    
    if 'month' not in subgroups_df.columns:
        print(" Neither quarter nor month column found!")
        return subgroups_df
    
    print("  Adding quarter column from month data...")
    df = subgroups_df.copy()
    
    # Convert month to numeric if it's string
    month_numeric = pd.to_numeric(df['month'], errors='coerce')
    
    # Create quarter mapping: Q1=1-3, Q2=4-6, Q3=7-9, Q4=10-12
    quarter_map = {
        1: 'Q1', 2: 'Q1', 3: 'Q1',
        4: 'Q2', 5: 'Q2', 6: 'Q2', 
        7: 'Q3', 8: 'Q3', 9: 'Q3',
        10: 'Q4', 11: 'Q4', 12: 'Q4'
    }
    
    df['quarter'] = month_numeric.map(quarter_map).fillna('Unknown')
    
    print(f" Quarter column added. Distribution:")
    quarter_counts = df['quarter'].value_counts().sort_index()
    for quarter, count in quarter_counts.items():
        print(f"   {quarter}: {count:,} respondents")
    
    return df

## Cell 3b: Import Baseline Scripts Directly

Instead of calling external processes, we'll import the baseline scripts directly for better integration with Jupyter.

In [5]:


# Add the project root to Python path so we can import the modules
project_root = Path('.').resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

try:
    # Import the baseline functions directly
    from atus_analysis.scripts.common_hier import (
        prepare_long_with_groups, pool_rare_quarter, 
        save_json, nll_b1, fit_b1_hier, parse_time_blocks
    )
    print(" Successfully imported baseline common functions")
    
    
except ImportError as e:
    print(f"Could not import baseline functions directly: {e}")
    print("Will fall back to subprocess calls")
    USE_SUBPROCESS = True
else:
    USE_SUBPROCESS = False

 Successfully imported baseline common functions


In [6]:
# Always use subprocesses (matches your script CLIs & avoids in-notebook state issues)
USE_SUBPROCESS = True
print("Subprocess mode:", USE_SUBPROCESS)

# sanity check the smoothing strength used for 144 slot matrices
print("KAPPA_SLOT =", KAPPA_SLOT)


Subprocess mode: True
KAPPA_SLOT = 50


## Check System Status and Prerequisites

In [7]:
print("Checking system status and prerequisites...\n")

# Check system resources
resources_ok = check_system_resources()

print("\n=== File Prerequisites ===")
# Check required files
required_files = [
    SEQUENCES_FILE,
    SUBGROUPS_FILE,
    "atus_analysis/scripts/baseline1_hier.py",
    "atus_analysis/scripts/baseline2_hier.py"
]

files_ok = True
for file_path in required_files:
    if Path(file_path).exists():
        print(f" {file_path}")
    else:
        print(f" {file_path} - NOT FOUND")
        files_ok = False

# Check output directory
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f" Output directory: {OUTPUT_DIR}")

# Load and display current progress
print("\n=== Current Progress ===")
progress = load_progress()
completed = progress.get('completed_rungs', [])
failed = progress.get('failed_rungs', [])

print(f"Completed rungs: {completed if completed else 'None'}")
print(f"Failed rungs: {failed if failed else 'None'}")
print(f"Remaining rungs: {[r for r in RUNG_SPECS.keys() if r not in completed]}")

# Overall status
print("\n=== Overall Status ===")
if resources_ok and files_ok:
    print(" READY TO START EXPERIMENTS")
    print("\nYou can now run the individual experiment cells below.")
else:
    print("PREREQUISITES NOT MET")
    if not resources_ok:
        print("   - System resources may be insufficient")
    if not files_ok:
        print("   - Required files are missing")
    print("\nPlease resolve issues before continuing.")

Checking system status and prerequisites...

=== System Resources ===
Memory: 1.1% used, 746.9GB available
CPU: 0.0% usage
Disk: 1947898.3GB free

✓ System resources look good

=== File Prerequisites ===
 atus_analysis/data/sequences/markov_sequences.parquet
 atus_analysis/data/processed/subgroups.parquet
 atus_analysis/scripts/baseline1_hier.py
 atus_analysis/scripts/baseline2_hier.py
 Output directory: atus_analysis/data/models

=== Current Progress ===
Completed rungs: ['R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9', 'R10', 'R11', 'R12']
Failed rungs: None
Remaining rungs: ['R14', 'R15', 'R16', 'R17', 'R18', 'R19', 'R20', 'R21', 'R22', 'R23']

=== Overall Status ===
 READY TO START EXPERIMENTS

You can now run the individual experiment cells below.


# Individual Experiment Cells

## Instructions for Running Experiments

**Run these cells ONE AT A TIME** in order. Each cell represents one complete experiment rung.

- ✅ **Safe to interrupt**: You can stop any cell with the stop button - progress is automatically saved
- 🔄 **Resume anytime**: If you restart the kernel, just re-run cells 1-4, then continue from where you left off
- ⏭️ **Skip completed**: Cells will automatically skip rungs that have already completed successfully
- 📊 **Monitor progress**: Each cell shows detailed progress and resource usage

---

## Cell 5: Run Experiment R1 (Region Only)

**Expected runtime: 30-60 minutes**  
**Memory usage: Low-Medium**  
**Description: Simplest model - groups by region only**

In [8]:
# Run R1 experiment
success = run_single_rung("R1", include_hazard=True)

if success:
    print("\n R1 completed successfully! You can now run R2.")
else:
    print("\n R1 failed. Check the error messages above and try again.")


STARTING RUNG R1
R1 already completed - skipping

 R1 completed successfully! You can now run R2.


## Cell 6: Run Experiment R2 (Region + Sex)

**Expected runtime: 30-60 minutes**  
**Memory usage: Low-Medium**  
**Description: Groups by region and sex**

In [9]:
# Run R2 experiment
success = run_single_rung("R2", include_hazard=True)

if success:
    print("\n R2 completed successfully! You can now run R3.")
else:
    print("\n R2 failed. Check the error messages above and try again.")


STARTING RUNG R2
R2 already completed - skipping

 R2 completed successfully! You can now run R3.


## Cell 7: Run Experiment R3 (Region + Employment)

**Expected runtime: 30-60 minutes**  
**Memory usage: Medium**  
**Description: Groups by region and employment status**

In [10]:
# Run R3 experiment
success = run_single_rung("R3", include_hazard=True)

if success:
    print("\n R3 completed successfully! You can now run R4.")
else:
    print("\n R3 failed. Check the error messages above and try again.")


STARTING RUNG R3
R3 already completed - skipping

 R3 completed successfully! You can now run R4.


## Cell 8: Run Experiment R4 (Region + Day Type)

**Expected runtime: 30-60 minutes**  
**Memory usage: Medium**  
**Description: Groups by region and day type (weekday/weekend)**

In [11]:
# Run R4 experiment
success = run_single_rung("R4", include_hazard=True)

if success:
    print("\n R4 completed successfully! You can now run R5.")
else:
    print("\n R4 failed. Check the error messages above and try again.")


STARTING RUNG R4
R4 already completed - skipping

 R4 completed successfully! You can now run R5.


## Cell 9: Run Experiment R5 (Region + Household Size)

**Expected runtime: 60-90 minutes**  
**Memory usage: Medium**  
**Description: Groups by region and household size band**

In [12]:
# Run R5 experiment
success = run_single_rung("R5", include_hazard=True)

if success:
    print("\n R5 completed successfully! You can now run R6.")
else:
    print("\n R5 failed. Check the error messages above and try again.")


STARTING RUNG R5
R5 already completed - skipping

 R5 completed successfully! You can now run R6.


## Cell 9b: Add Quarter Column to Subgroups File (Run Before R6)

**Important**: Run this cell before attempting R6 or R7 experiments.  
This will permanently add the quarter column to the subgroups.parquet file.  
**Runtime**: 1-2 minutes  
**Purpose**: Ensures R6 and R7 experiments have the required quarter column

In [13]:
# # Add quarter column permanently to subgroups.parquet file
# print("  Adding quarter column to subgroups.parquet file...")

# try:
#     # Load the current subgroups file
#     subgroups_path = Path(SUBGROUPS_FILE)
#     print(f" Loading subgroups from: {subgroups_path}")
    
#     if not subgroups_path.exists():
#         print(f" Subgroups file not found at {subgroups_path}")
#         print("Please ensure the file exists before running this cell.")
#     else:
#         # Load the data
#         subgroups_df = pd.read_parquet(subgroups_path)
#         print(f" Loaded {len(subgroups_df)} subgroup records")
#         print(f"Current columns: {list(subgroups_df.columns)}")
        
#         # Check if quarter column already exists
#         if 'quarter' in subgroups_df.columns:
#             print(" Quarter column already exists in the file!")
#             quarter_counts = subgroups_df['quarter'].value_counts().sort_index()
#             print("Current quarter distribution:")
#             for quarter, count in quarter_counts.items():
#                 print(f"   {quarter}: {count:,} respondents")
#         else:
#             print(" Quarter column not found - adding it now...")
            
#             # Add quarter column using our function
#             subgroups_with_quarter = add_quarter_column(subgroups_df)
            
#             # Create backup of original file
#             backup_path = subgroups_path.with_suffix('.parquet.backup')
#             print(f" Creating backup at: {backup_path}")
#             subgroups_df.to_parquet(backup_path, index=False)
            
#             # Save the updated file
#             print(f" Saving updated subgroups with quarter column...")
#             subgroups_with_quarter.to_parquet(subgroups_path, index=False)
            
#             # Verify the save
#             verification_df = pd.read_parquet(subgroups_path)
#             if 'quarter' in verification_df.columns:
#                 print(" Quarter column successfully added to subgroups.parquet!")
#                 quarter_counts = verification_df['quarter'].value_counts().sort_index()
#                 print("Final quarter distribution:")
#                 for quarter, count in quarter_counts.items():
#                     print(f"   {quarter}: {count:,} respondents")
                    
#                 print(f"\n Summary:")
#                 print(f"   - Original file backed up to: {backup_path}")
#                 print(f"   - Updated file saved to: {subgroups_path}")
#                 print(f"   - Quarter column added with {len(verification_df)} records")
#                 print(f"   - R6 and R7 experiments are now ready to run!")
#             else:
#                 print(" Failed to verify quarter column in saved file")
                
# except Exception as e:
#     print(f" Error adding quarter column: {e}")
#     print(f"Error type: {type(e).__name__}")
#     print("\nYou can still run R6/R7 - the quarter column will be added dynamically.")

# print("\n" + "="*60)
# print(" READY FOR R6 AND R7 EXPERIMENTS")
# print("="*60)

## Cell 10: Run Experiment R6 

**Expected runtime: 90-120 minutes**  
**Memory usage: High**  
**Description: Full complexity routing model with all demographic variables**

In [14]:
# Run R6 experiment
success = run_single_rung("R6", include_hazard=True)

if success:
    print("\n R6 completed successfully! You can now run R7.")
else:
    print("\n R6 failed. Check the error messages above and try again.")


STARTING RUNG R6
R6 already completed - skipping

 R6 completed successfully! You can now run R7.


## Cell 11: Run Experiment R7 (Full Model + Hazard)

**Expected runtime: 120-180 minutes**  
**Memory usage: High**  
**Description: Full model including hazard modeling - most computationally intensive**

In [15]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R7", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R7
R7 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [16]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R8", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R8
R8 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [17]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R9", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R9
R9 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [18]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R10", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R10
R10 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [19]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R11", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R11
R11 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [20]:
# Run R7 experiment (automatically includes hazard model)
success = run_single_rung("R12", include_hazard=True)

if success:
    print("\n R7 completed successfully! All experiments are now complete.")
else:
    print("\n R7 failed. Check the error messages above and try again.")


STARTING RUNG R12
R12 already completed - skipping

 R7 completed successfully! All experiments are now complete.


In [None]:
exps = ["R14", "R15", "R16", "R17", "R18","R19", "R20", "R21", "R22", "R23"]

for i in exps:
    success = run_single_rung(i, include_hazard=True)

    if success:
        print(f"\n {i} completed successfully! All experiments are now complete.")
    else:
        print(f"\n {i} failed. Check the error messages above and try again.")


STARTING RUNG R14
 Rung: R14
 Groupby: employment,sex
 Output directory: atus_analysis/data/models/R14
 Include hazard: True

Checking system resources...
=== System Resources ===
Memory: 1.1% used, 746.9GB available
CPU: 0.0% usage
Disk: 1947898.3GB free

✓ System resources look good

---  Step 1: B1-H Model for R14 ---
Using subprocess execution...
Running B1-H via subprocess for R14...
Command: /sw/eb/sw/Anaconda3/2023.09-0/bin/python -m atus_analysis.scripts.baseline1_hier --sequences atus_analysis/data/sequences/markov_sequences.parquet --subgroups atus_analysis/data/processed/subgroups.parquet --out_dir atus_analysis/data/models/R14 --groupby employment,sex --time_blocks night:0-5,morning:6-11,afternoon:12-17,evening:18-23 --seed 2025 --test_size 0.2 --split_path atus_analysis/data/models/fixed_split.parquet --kappa_slot 50
2025-08-30 20:02:19,799 - __main__ - INFO - STARTING B1-H (HIERARCHICAL ROUTING) MODEL TRAINING
2025-08-30 20:02:19,799 - __main__ - INFO - Sequences file: a

## Cell 12: Final Summary and Results

In [None]:

print("\n" + "="*60)
print("FINAL EXPERIMENT SUMMARY")
print("="*60)

progress = load_progress()
completed = set(progress.get('completed_rungs', []))
failed = set(progress.get('failed_rungs', []))
all_rungs = list(RUNG_SPECS.keys())

print(f"\nTotal rungs: {len(all_rungs)}")
print(f"Completed: {len(completed)} - {sorted(completed) if completed else 'None'}")
print(f"Failed: {len(failed)} - {sorted(failed) if failed else 'None'}")
not_attempted = [r for r in all_rungs if r not in completed and r not in failed]
print(f"Not attempted: {not_attempted if not_attempted else 'None'}")

# Helper: extract nll_per_weight robustly from either CLI-style or notebook-style evals
def _extract_nll(d: dict):
    # Notebook/direct save: {"nll_per_weight": ..., ...}
    if "nll_per_weight" in d:
        return d["nll_per_weight"]
    # CLI B1 eval wrapper: {"b1h": {...}}
    if "b1h" in d and isinstance(d["b1h"], dict):
        x = d["b1h"]
        if "nll_per_weight" in x:
            return x["nll_per_weight"]
        if "nll_weighted" in x and "weight_total" in x and x["weight_total"]:
            return x["nll_weighted"] / x["weight_total"]
    # Fallbacks
    if "nll_weighted" in d and "weight_total" in d and d["weight_total"]:
        return d["nll_weighted"] / d["weight_total"]
    return None

def _read_json(path: Path):
    try:
        with open(path, "r") as f:
            return json.load(f)
    except Exception:
        return None

rows = []
for rung in all_rungs:
    rung_dir = OUTPUT_DIR / rung
    status = "completed" if rung in completed else ("failed" if rung in failed else "pending")
    dur_s = progress.get(f"{rung}_duration_seconds")
    dur_min = (dur_s / 60.0) if isinstance(dur_s, (int, float)) else None

    # Files
    b1_json = rung_dir / "b1h_model.json"
    b1_npz  = rung_dir / "b1h_slot_mats.npz"
    b2_json = rung_dir / "b2h_model.json"
    eval_b1 = rung_dir / "eval_b1h.json"
    eval_b2 = rung_dir / "eval_b2h.json"

    # Metrics
    b1_nll = None
    b2_nll = None
    ej1 = _read_json(eval_b1)
    ej2 = _read_json(eval_b2)
    if ej1 is not None:
        b1_nll = _extract_nll(ej1)
        # If CLI-style eval_b1h.json, it wrapped under "b1h"
        if b1_nll is None and "b1h" in ej1:
            b1_nll = _extract_nll({"b1h": ej1["b1h"]})
    if ej2 is not None:
        # CLI baseline2 writes {"b1h": {...}, "b2h": {...}, ...}
        if "b2h" in ej2:
            b2_nll = _extract_nll({"b1h": ej2["b2h"]})
        else:
            # If someone saved just the dict (unlikely), still try
            b2_nll = _extract_nll(ej2)

    rows.append({
        "rung": rung,
        "status": status,
        "duration_min": round(dur_min, 1) if dur_min is not None else None,
        "b1h_model": b1_json.exists(),
        "b1h_slot_npz": b1_npz.exists(),
        "b2h_model": b2_json.exists(),
        "eval_b1h": eval_b1.exists(),
        "eval_b2h": eval_b2.exists(),
        "b1_nll_per_weight": (None if b1_nll is None else float(b1_nll)),
        "b2_nll_per_weight": (None if b2_nll is None else float(b2_nll)),
        "out_dir": str(rung_dir),
    })

df = pd.DataFrame(rows).set_index("rung")
# Order by rung name naturally (R1..R7) if present
df = df.loc[[r for r in all_rungs if r in df.index]]

# Pretty print summary table
with pd.option_context("display.max_columns", None, "display.width", 120):
    print("\n=== Per-rung Summary ===")
    print(df)

# Timing totals
print("\n=== Timing Information ===")
total_time = sum(progress.get(f"{r}_duration_seconds", 0.0) for r in completed)
for r in completed:
    dur = progress.get(f"{r}_duration_seconds")
    if isinstance(dur, (int, float)):
        print(f"{r}: {dur:.0f} seconds ({dur/60:.1f} minutes)")
if total_time > 0:
    print(f"\nTotal runtime: {total_time:.0f} seconds ({total_time/60:.1f} minutes, {total_time/3600:.1f} hours)")

# Output file counts (JSONs only)
print("\n=== Output Files (JSON count) ===")
for r in completed:
    rd = OUTPUT_DIR / r
    if rd.exists():
        files = list(rd.glob("*.json"))
        print(f"{r}: {len(files)} JSON files in {rd}")

# Success rate (use actual number of rungs configured)
if len(all_rungs) > 0:
    success_rate = len(completed) / len(all_rungs) * 100
    print(f"\nOverall success rate: {success_rate:.1f}%")

# Note about R6 vs R7 when hazard=True for all
if "R6" in RUNG_SPECS and "R7" in RUNG_SPECS:
    if str(RUNG_SPECS["R6"]).strip() == str(RUNG_SPECS["R7"]).strip():
        print("\nNOTE:")
        print(" R6 and R7 share the same grouping spec.")
        print(" Since you ran hazard=True for all rungs, R6 and R7 are effectively identical runs.")
        print(" You may keep only one of them for reporting.")

# Final line
if len(completed) == len(all_rungs):
    print("\nALL EXPERIMENTS COMPLETED SUCCESSFULLY!")
elif len(failed) > 0:
    print("\nSome experiments failed. Re-run the failed ones to retry.")
else:
    print(f"\n{len(all_rungs) - len(completed)} experiments remaining.")

print(f"\nProgress file saved as: {PROGRESS_FILE}")
print(f"Output directory: {OUTPUT_DIR}")


In [None]:
# # === R0 (no covariates) — build B1-H, build B2-H, dump metrics for both ===
# from pathlib import Path
# import os, sys, time, subprocess
# import numpy as np
# import pandas as pd

# # Optional thread limiter for BLAS/OpenMP
# try:
#     from threadpoolctl import threadpool_limits
# except Exception:
#     threadpool_limits = None
# from contextlib import nullcontext

# # ---------- Paths from your existing globals ----------
# REPO_ROOT  = Path(os.getcwd()).resolve()
# MODELS_DIR = (REPO_ROOT / OUTPUT_DIR).resolve()
# SEQS       = (REPO_ROOT / SEQUENCES_FILE).resolve()
# SUBS       = (REPO_ROOT / SUBGROUPS_FILE).resolve()
# SPLIT_PATH = (MODELS_DIR / "fixed_split.parquet").resolve()
# R0_DIR     = (MODELS_DIR / "R0").resolve()
# R0_DIR.mkdir(parents=True, exist_ok=True)

# t0 = time.time()
# print("→ Building fresh R0 model in:", R0_DIR)

# # ---------- 1) Prepare pooled data (single group __ALL__) ----------
# sequences = pd.read_parquet(SEQS)
# subgroups = pd.read_parquet(SUBS).copy()
# subgroups["__ALL__"] = "ALL"           # pooled
# groupby_cols = ["__ALL__"]

# blocks = parse_time_blocks(TIME_BLOCKS)
# # (threshold=0.0 is effectively a no-op but keeps consistent codepath)
# subgroups = pool_rare_quarter(subgroups, groupby_cols, "TUFNWGTP", threshold=0.0)
# long_df   = prepare_long_with_groups(sequences, subgroups, groupby_cols, "TUFNWGTP", blocks)

# # ---------- 2) Shared split (use existing if present) ----------
# if SPLIT_PATH.exists():
#     split_df = pd.read_parquet(SPLIT_PATH)[["TUCASEID","set"]]
#     print("✓ Using existing split:", SPLIT_PATH)
# else:
#     rng  = np.random.RandomState(SEED)
#     meta = long_df[["TUCASEID"]].drop_duplicates().copy()
#     meta["rand"] = rng.rand(len(meta))
#     test_n = int(round(TEST_SIZE * len(meta)))
#     meta["set"] = "train"
#     if test_n > 0:
#         take = meta.sort_values("rand").head(test_n).index
#         meta.loc[take, "set"] = "test"
#     split_df = meta[["TUCASEID","set"]]
#     SPLIT_PATH.parent.mkdir(parents=True, exist_ok=True)
#     split_df.to_parquet(SPLIT_PATH, index=False)
#     print(f"✓ Created split: {SPLIT_PATH}  (test={test_n}, train={len(meta)-test_n})")

# df       = long_df.merge(split_df, on="TUCASEID", how="left")
# train_df = df[df["set"] == "train"].copy()
# test_df  = df[df["set"] == "test"].copy()
# K        = int(df["state_id"].max()) + 1

# # ---------- 3) Train B1-H (routing) with 144 slot matrices ----------
# num_threads = int(NUM_THREADS)
# ctx = threadpool_limits(limits=num_threads) if threadpool_limits else nullcontext()
# with ctx:
#     b1 = fit_b1_hier(
#         train_df, K, "TUFNWGTP",
#         tau_block=50.0, tau_group=20.0, add_k=1.0,
#         compute_slot_mats=True,
#         kappa_slot=float(KAPPA_SLOT),
#         time_blocks_spec=TIME_BLOCKS
#     )

# # Save JSON + slot sidecar with expected keys 'g0','g1',...
# out_json_b1 = R0_DIR / "b1h_model.json"
# sidecar     = R0_DIR / "b1h_slot_mats.npz"
# to_save_b1  = dict(b1)

# if "slot_matrices" in to_save_b1:
#     slot_dict    = to_save_b1.pop("slot_matrices")   # {group_key: [144,K,K]}
#     groups_order = list(slot_dict.keys())            # for R0 → ["ALL"]
#     arrays = {f"g{gi}": np.array(slot_dict[gk], dtype=np.float32)
#               for gi, gk in enumerate(groups_order)}
#     np.savez_compressed(sidecar, **arrays)
#     meta = to_save_b1.setdefault("meta", {})
#     meta["groups_order"]  = groups_order
#     meta["slot_sidecar"]  = str(sidecar)
#     to_save_b1["slot_matrices_npz"] = str(sidecar)

# # Compatibility: some tools look for 'b1h_slot_matz.npz'
# matz = R0_DIR / "b1h_slot_matz.npz"
# try:
#     if sidecar.exists() and not matz.exists():
#         import shutil; shutil.copy2(sidecar, matz)
# except Exception:
#     pass

# save_json(to_save_b1, out_json_b1)
# print("✓ Saved:", out_json_b1)
# if sidecar.exists():
#     print(f"  Slot sidecar: {sidecar} ({sidecar.stat().st_size/1024/1024:.1f} MB)")

# # Quick eval summary for B1-H
# eval_b1 = nll_b1(test_df, to_save_b1, K, "TUFNWGTP")
# save_json(eval_b1, R0_DIR / "eval_b1h.json")
# print("✓ Saved:", R0_DIR / "eval_b1h.json")

# # ---------- 4) Train B2-H (hazard) pooled over __ALL__ ----------
# # baseline2_hier reads subgroups from disk; give it a temp parquet with __ALL__
# SUBS_R0 = R0_DIR / "_subgroups_all.parquet"
# subgroups.to_parquet(SUBS_R0, index=False)

# env = os.environ.copy()
# # normalize numexpr thread limits to avoid "MAX_THREADS" error
# max_now = int(env.get("NUMEXPR_MAX_THREADS", "64"))
# if num_threads > max_now:
#     env["NUMEXPR_MAX_THREADS"] = str(num_threads)
# env["NUMEXPR_NUM_THREADS"]     = str(min(num_threads, int(env.get("NUMEXPR_MAX_THREADS", str(num_threads)))))
# env["OMP_NUM_THREADS"]         = str(num_threads)
# env["OPENBLAS_NUM_THREADS"]    = str(num_threads)
# env["MKL_NUM_THREADS"]         = str(num_threads)
# env["VECLIB_MAXIMUM_THREADS"]  = str(num_threads)
# env["OMP_WAIT_POLICY"]         = "PASSIVE"

# cmd_b2 = [
#     sys.executable, "-u", "-m", "atus_analysis.scripts.baseline2_hier",
#     "--sequences", str(SEQS),
#     "--subgroups", str(SUBS_R0),
#     "--out_dir",   str(R0_DIR),
#     "--groupby",   "__ALL__",
#     "--time_blocks", TIME_BLOCKS,
#     "--dwell_bins",  DWELL_BINS,
#     "--seed",        str(SEED),
#     "--test_size",   str(TEST_SIZE),
#     "--split_path",  str(SPLIT_PATH),
#     "--b1h_path",    str(out_json_b1),
#     "--kappa_slot",  str(KAPPA_SLOT),
# ]
# print("\n>>>", " ".join(cmd_b2))
# res_b2 = subprocess.run(cmd_b2, cwd=str(REPO_ROOT), text=True, capture_output=True, env=env)
# print(res_b2.stdout)
# if res_b2.returncode != 0:
#     print("STDERR (B2-H):\n", res_b2.stderr)
#     raise RuntimeError("B2-H training failed (see stderr above).")
# else:
#     print("✓ R0 B2-H completed")

# # ---------- 5) Dump per-respondent metrics for BOTH variants ----------
# def dump_metrics(model_type: str) -> None:
#     cmd = [
#         sys.executable, "-u", "-m", "atus_analysis.scripts.dump_case_metrics",
#         "--model_type", model_type,
#         "--run_dir", str(R0_DIR),
#         "--sequences", str(SEQS),
#         "--subgroups", str(SUBS),
#         "--time_blocks", TIME_BLOCKS,
#         "--groupby", "",              # no covariates at scoring time
#         "--dwell_bins", DWELL_BINS,
#     ]
#     print("\n>>>", " ".join(cmd))
#     r = subprocess.run(cmd, cwd=str(REPO_ROOT), text=True, capture_output=True)
#     print(r.stdout)
#     if r.returncode != 0:
#         print("STDERR:\n", r.stderr)
#         raise RuntimeError(f"dump_case_metrics failed for {model_type}")

# dump_metrics("b1h")
# dump_metrics("b2h")

# # ---------- 6) Sanity check outputs ----------
# targets = [
#     "b1h_model.json", "b2h_model.json", "eval_b1h.json", "eval_b2h.json",
#     "b1h_slot_mats.npz", "b1h_slot_matz.npz",
#     "test_case_metrics_b1h.parquet", "test_case_metrics_b2h.parquet", "test_case_metrics.parquet"
# ]
# print("\nOutputs present:")
# for name in targets:
#     p = R0_DIR / name
#     print(f"  {name:<30} exists={p.exists()}")

# # Quick peek at metrics pairwise deltas
# try:
#     b1p = R0_DIR / "test_case_metrics_b1h.parquet"
#     b2p = R0_DIR / "test_case_metrics_b2h.parquet"
#     if b1p.exists() and b2p.exists():
#         A = pd.read_parquet(b1p)[["TUCASEID","nll_weighted","top1_correct_weight","weight_total"]].rename(
#             columns=lambda c: c+"_A" if c!="TUCASEID" else c)
#         B = pd.read_parquet(b2p)[["TUCASEID","nll_weighted","top1_correct_weight","weight_total"]].rename(
#             columns=lambda c: c+"_B" if c!="TUCASEID" else c)
#         M = A.merge(B, on="TUCASEID", how="inner")
#         d_nll = (M["nll_weighted_A"]/M["weight_total_A"]) - (M["nll_weighted_B"]/M["weight_total_B"])
#         d_top = (M["top1_correct_weight_A"]/M["weight_total_A"]) - (M["top1_correct_weight_B"]/M["weight_total_B"])
#         print("\nΔ(B1−B2) mean nll:", float(d_nll.mean()), " std:", float(d_nll.std()))
#         print("Δ(B1−B2) mean top1:", float(d_top.mean()), " std:", float(d_top.std()))
# except Exception as e:
#     print("Note: could not preview metrics:", e)

# print(f"\nR0 build & dump complete in {time.time()-t0:.1f}s")


## Add R0 with no covariates for baseline



In [22]:
RUNG_SPECS8_14_23 = {"R14": "employment,sex", 
    "R15": "employment,day_type",
    "R16": "employment, hh_size_band",
    "R17": "employment, quarter",
    
    "R18": "sex, day_type",
    "R19": "sex, hh_size_band",
    "R20": "sex, quarter",
    
    "R21": "day_type, hh_size_band",
    "R22": "day_type, quarter",
    "R23": "quarter, hh_size_band",
}

In [None]:
# Run BOTH variants (b1h and b2h) for every rung, streaming output
import subprocess


REPO_ROOT = Path(os.getcwd()).resolve()                 # already chdir'ed to repo root
MODELS_DIR = (REPO_ROOT / OUTPUT_DIR).resolve()
SEQS       = (REPO_ROOT / SEQUENCES_FILE).resolve()
SUBS       = (REPO_ROOT / SUBGROUPS_FILE).resolve()

assert MODELS_DIR.exists(), f"Models dir not found: {MODELS_DIR}"
assert SEQS.exists(),       f"Sequences parquet not found: {SEQS}"
assert SUBS.exists(),       f"Subgroups parquet not found: {SUBS}"

for rung, groupby in RUNG_SPECS8_14_23.items():
    run_dir = MODELS_DIR / rung
    if not run_dir.exists():
        print(f"[SKIP {rung}] missing directory: {run_dir}")
        continue

    for model_type in ("b1h", "b2h"):
        cmd = [
            sys.executable, "-u", "-m", "atus_analysis.scripts.dump_case_metrics",
            "--model_type", model_type,          #  explicitly run this variant
            "--run_dir", str(run_dir),
            "--sequences", str(SEQS),
            "--subgroups", str(SUBS),
            "--time_blocks", TIME_BLOCKS,
            "--groupby", groupby,               # rung-specific groupby from RUNG_SPECS
            "--dwell_bins", DWELL_BINS,
        ]
        print("\n>>>", " ".join(cmd))
        proc = subprocess.Popen(
            cmd,
            cwd=str(REPO_ROOT),                 # ensure package imports resolve
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1
        )
        for line in proc.stdout:                # live progress
            print(line, end="")
        rc = proc.wait()
        print(f"[{rung}:{model_type}] return code:", rc)

# List outputs we produced
print("\nDiscovered metrics files:")
for rung in RUNG_SPECS:
    rdir = MODELS_DIR / rung
    if rdir.exists():
        hits = sorted(str(p) for p in rdir.glob("test_case_metrics*.parquet"))
        print(f"  {rung}: {hits}")



>>> /sw/eb/sw/Anaconda3/2023.09-0/bin/python -u -m atus_analysis.scripts.dump_case_metrics --model_type b1h --run_dir /ztank/scratch/user/u.rd143338/atus_analysis-main/atus_analysis/data/models/R14 --sequences /ztank/scratch/user/u.rd143338/atus_analysis-main/atus_analysis/data/sequences/markov_sequences.parquet --subgroups /ztank/scratch/user/u.rd143338/atus_analysis-main/atus_analysis/data/processed/subgroups.parquet --time_blocks night:0-5,morning:6-11,afternoon:12-17,evening:18-23 --groupby employment,sex --dwell_bins 1,2,3,4,6,9,14,20,30
✓ [R14:b1h] wrote /ztank/scratch/user/u.rd143338/atus_analysis-main/atus_analysis/data/models/R14/test_case_metrics_b1h.parquet
  (compat) /ztank/scratch/user/u.rd143338/atus_analysis-main/atus_analysis/data/models/R14/test_case_metrics.parquet present
[R14:b1h] return code: 0

>>> /sw/eb/sw/Anaconda3/2023.09-0/bin/python -u -m atus_analysis.scripts.dump_case_metrics --model_type b2h --run_dir /ztank/scratch/user/u.rd143338/atus_analysis-main/atu