# LEMMA-RCA – Data Transformation

Converts raw LEMMA data into the unified multimodal format: align metrics to 30s bins, crop to a 45-min fault window (PRE=15min, POST=30min), aggregate logs at service level, generate manifest + ground_truth JSONs.

Cropping takes LEMMA from ~2600-5500 timesteps down to ~90. PPTX timestamps are in JST (UTC+9); all output is in UTC.
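
As a quick sanity check on the timezone conversion and the window arithmetic, the sketch below reproduces the 20231207 numbers using only the standard library. It assumes JST can be treated as a fixed UTC+9 offset (Japan observes no DST); `floor_to_bin` is an illustrative helper, not part of the pipeline.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))   # assumption: fixed offset, no DST
BIN_SECONDS = 30
PRE_MIN, POST_MIN = 15, 30           # same values as PRE_/POST_FAULT_MINUTES


def floor_to_bin(ts: datetime, step: int = BIN_SECONDS) -> datetime:
    """Floor a tz-aware timestamp to the start of its `step`-second bin (UTC)."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % step, tz=timezone.utc)


# Fault time for 20231207 as stated in the PPTX (JST)
fault_jst = datetime(2023, 12, 7, 16, 53, 3, tzinfo=JST)
fault_utc = fault_jst.astimezone(timezone.utc)
fault_bin = floor_to_bin(fault_utc)

window_start = fault_bin - timedelta(minutes=PRE_MIN)
window_end = fault_bin + timedelta(minutes=POST_MIN)
# Both window ends are inclusive, hence the +1
n_bins = int((window_end - window_start).total_seconds()) // BIN_SECONDS + 1

print(fault_utc.isoformat())      # 2023-12-07T07:53:03+00:00
print(window_start.isoformat())   # 2023-12-07T07:38:00+00:00
print(window_end.isoformat())     # 2023-12-07T08:23:00+00:00
print(n_bins)                     # 91
```

The inclusive endpoints are why a 45-minute window yields 91 bins rather than 90, with the fault bin at index 30.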

```
core_metrics_tmp/{scenario}/     -> 4 metric parquets
core_logs_tmp/{scenario}/        -> service-level log parquet
core_multimodal_tmp/{scenario}/  -> manifest.json + ground_truth.json
```

**Ground truth note:** the pipeline writes ALL pods matching `root_cause_deployment`, but for training only the pod with the actual anomaly signal should be kept. Fix `ground_truth.json` manually after running:

| Scenario | Pods written by default | Pod to keep |
|----------|---------|------|
| 20231207 | both productpage pods | `productpage-v1-94d68db49-vc8ct` (10x CPU) |
| 20231221 | all ratings pods | `ratings-aggregated` (252x CPU) |
| 20240115 | single pod already | no change needed |
| 20240215 | single pod already | no change needed |
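
Every matching pod is written because root-cause pods are selected by a substring match on the deployment name. The sketch below shows that selection rule and one way the manual fix could be scripted. `keep_single_root_cause_pod` is a hypothetical helper, and the pod names containing `PLACEHOLDER` are made up for illustration; the JSON keys are the ones `ground_truth.json` uses.

```python
import json


def keep_single_root_cause_pod(ground_truth: dict, keep_pod: str) -> dict:
    """Return a copy of a ground_truth dict restricted to one root-cause pod."""
    if keep_pod not in ground_truth["root_cause_pods"]:
        raise ValueError(f"{keep_pod} is not among the root-cause pods")
    fixed = dict(ground_truth)
    fixed["root_cause_pods"] = [keep_pod]
    fixed["root_cause_pod_indices"] = [ground_truth["pod_to_idx"][keep_pod]]
    return fixed


# Toy pod list: only the middle name is real (taken from the table above).
pods = [
    "details-v1-PLACEHOLDER",
    "productpage-v1-94d68db49-vc8ct",
    "productpage-v1-PLACEHOLDER",
]
rc_deployment = "productpage-v1"

# Same selection rule as the pipeline: substring match on the deployment name.
rc_pods = [p for p in pods if rc_deployment in p]
pod_to_idx = {p: i for i, p in enumerate(pods)}
gt = {
    "root_cause_pods": rc_pods,
    "root_cause_pod_indices": [pod_to_idx[p] for p in rc_pods],
    "pod_to_idx": pod_to_idx,
}

fixed = keep_single_root_cause_pod(gt, "productpage-v1-94d68db49-vc8ct")
print(json.dumps(fixed["root_cause_pods"]))   # ["productpage-v1-94d68db49-vc8ct"]
print(fixed["root_cause_pod_indices"])        # [1]
```

To apply this to a real scenario, load `ground_truth.json` with `json.load`, pass the dict through the helper, and write the result back with `json.dump`.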


## 1. Scenario Configuration

Ground truth from PPTX/README files:

| Scenario | Fault | Root Cause | Notes |
|----------|-------|------------|-------|
| 20231207 | CPU stress | productpage | PPTX time is JST |
| 20231221 | Pod migration | ratings | PPTX time is JST |
| 20240115 | Malware injection | scenario10 | unique service |
| 20240215 | Network issue | details | reference format |


In [1]:
from pathlib import Path
import pandas as pd

BASE_DIR = Path("/root/lemm")

# ============================================================================
# CROPPING CONFIGURATION - Similar to Nezha for consistency
# ============================================================================
# LEMMA raw data has 2600-5500 timesteps (22-46 hours), but the fault signal
# is concentrated near the fault time. Cropping to a smaller window:
# - Dramatically reduces processing time (especially logs)
# - Creates more consistent sequence lengths with Nezha (22 timesteps)
# - Removes irrelevant "baseline" data far from the fault
# ============================================================================
PRE_FAULT_MINUTES = 15   # 30 bins before fault
POST_FAULT_MINUTES = 30  # 60 bins after fault
# Total: ~90 bins (45 minutes) vs original 2600-5500 bins

SCENARIO_CONFIG = {
    "20240215": {
        "fault_timestamp_raw": pd.Timestamp("2024-02-14 06:11:29", tz="UTC"),
        "root_cause_deployment": "details-v1",
        "root_cause_service": "details",
    },
    "20240115": {               
        "fault_timestamp_raw": pd.Timestamp("2024-01-14 07:26:39", tz="UTC"),
        "root_cause_deployment": "scenario10-malware-deployment",
        "root_cause_service": "scenario10",
    },
    "20231221": {
        # PPTX states 11:32:05 JST -> converted to UTC = 02:32:05
        "fault_timestamp_raw": pd.Timestamp("2023-12-20 02:32:05", tz="UTC"),
        "root_cause_deployment": "ratings",
        "root_cause_service": "ratings",
    },
    "20231207": {
        # PPTX states 16:53:03 JST -> converted to UTC = 07:53:03
        "fault_timestamp_raw": pd.Timestamp("2023-12-07 07:53:03", tz="UTC"),
        "root_cause_deployment": "productpage-v1",
        "root_cause_service": "productpage",
    },
}


## 2. Processing Functions

Metric loading/alignment, log cleaning + aggregation, and manifest generation.
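
The alignment step snaps each metric onto the reference grid by taking the nearest source sample, but only when it lies within a tolerance of half the larger step; otherwise the bin stays empty and is left to the limited forward/backward fill. The dependency-free sketch below illustrates that behaviour. It is not the pandas `reindex(method='nearest', tolerance=...)` call the pipeline uses, and `align_nearest` is a hypothetical name.

```python
from bisect import bisect_left


def align_nearest(ref_times, src_times, src_values, tolerance):
    """For each reference time, take the nearest source value within `tolerance`.

    `src_times` must be sorted. Bins with no sample close enough get None.
    """
    aligned = []
    for t in ref_times:
        i = bisect_left(src_times, t)
        # Candidates: the sample just before and the sample at/after t
        candidates = [j for j in (i - 1, i) if 0 <= j < len(src_times)]
        best = min(candidates, key=lambda j: abs(src_times[j] - t), default=None)
        if best is not None and abs(src_times[best] - t) <= tolerance:
            aligned.append(src_values[best])
        else:
            aligned.append(None)
    return aligned


ref = [0, 30, 60, 90, 120]     # 30 s reference grid, in seconds
src_t = [2, 31, 58, 121]       # slightly off-grid samples with a gap near 90
src_v = [0.10, 0.12, 0.11, 0.50]
tol = max(30, 30) / 2          # half the larger of the source and reference steps

print(align_nearest(ref, src_t, src_v, tol))
# [0.1, 0.12, 0.11, None, 0.5]
```

The `None` at 90 s corresponds to a NaN in the real pipeline, which `ffill(limit=2).bfill(limit=2)` would then fill from its neighbours.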

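Log cleaner tokenizes variable content (timestamps → `<TS>`, UUIDs → `<UUID>`, IPs → `<HOST>`, etc.) for better model generalization. Known ports (`:8080`, `:443`) are preserved. The text is lowercased after substitution, so the tokens appear as `<ts>`, `<uuid>`, `<host>` in the final output.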
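
The port-preservation trick is a protect, replace, restore sequence: whitelisted ports are wrapped in a temporary marker, generic long numbers are replaced, and the marker is then removed. A reduced single-string sketch with `re` follows. It covers only a few of the rules and a shortened port whitelist (`KEEP_PORTS` and `clean_line` are illustrative names), so it is not a drop-in replacement for `clean_log_content_vectorized`.

```python
import re

KEEP_PORTS = r"80|443|8080"   # shortened whitelist, for illustration only


def clean_line(line: str) -> str:
    """Apply a small subset of the log-cleaning rules to one line."""
    s = re.sub(r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?", "<TS>", line)
    s = re.sub(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(:\d{2,5})?", "<HOST>", s)
    # Protect: the underscores remove the word boundary around the digits,
    # so the generic number rule below cannot match them.
    s = re.sub(rf":({KEEP_PORTS})\b", r":__PRT_\1__", s)
    # Replace: any remaining standalone number with 4+ digits
    s = re.sub(r"\b\d{4,}\b", "<NUM>", s)
    # Restore: unwrap the protected ports
    s = re.sub(r"__PRT_(\d+)__", r"\1", s)
    # Lowercasing comes after substitution, so placeholders end up lowercase too
    return re.sub(r"\s+", " ", s.lower()).strip()


raw = "2023-12-07T07:53:03Z Connect to 10.0.0.5:9080 failed, retry localhost:8080 after 15000 ms"
print(clean_line(raw))
# <ts> connect to <host> failed, retry localhost:8080 after <num> ms
```

Note that a port attached to an IP address is absorbed into `<HOST>`; only ports that follow a hostname survive.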


In [2]:
import numpy as np
import pandas as pd
from pathlib import Path
import json
from collections import defaultdict, Counter

BASE_DIR = Path("/root/lemm")


def load_metric_npy(path: Path) -> tuple[pd.DatetimeIndex, pd.DataFrame]:
    """Load a metric NPY file and return a DataFrame with temporal index."""
    obj = np.load(path, allow_pickle=True).item()
    root_key = next(iter(obj.keys()))
    info = obj[root_key]

    pod_names = info.get("Pod_Name", info.get("Node_Name"))
    time_arr = info["time"]
    seq = info["Sequence"]

    if seq.ndim == 3:
        seq = np.squeeze(seq, -1)

    if seq.shape[0] == len(pod_names) and seq.shape[1] != len(pod_names):
        seq = seq.T

    T, D = seq.shape

    if D == len(pod_names) + 1:
        # FIX: Extra column is the LAST one (contains indices/timestamps, not CPU)
        # Verified: col[-1] has anomalous values (4-1019) vs col[0] has normal CPU
        seq = seq[:, :-1]
        D = seq.shape[1]

    if D != len(pod_names):
        raise ValueError(
            f"Mismatch after adjustments: seq has {D} columns, pod_names has {len(pod_names)}"
        )

    time_index = pd.to_datetime(time_arr, unit="s", utc=True)
    df = pd.DataFrame(seq, index=time_index, columns=pod_names)
    return time_index, df


METRIC_TYPES = {
    "pod_level_data_pod_cpu_usage_total": "gauge",
    "pod_level_data_pod_memory_working_set": "gauge",
    "pod_level_data_pod_network_rx_bytes": "rate",
    "pod_level_data_pod_network_tx_bytes": "rate",
}

def align_and_trim_metrics(scenario_id: str) -> tuple[Path, dict]:
    """
    Align metrics to log_frequency grid with tolerance and validation.
    
    Returns:
        trimmed_dir: Path to directory with aligned metrics
        validation: dict with validation statistics
    """
    metrics_dir = BASE_DIR / "metrics_data" / scenario_id
    aligned_dir = BASE_DIR / "aligned_metrics_tmp" / scenario_id
    trimmed_dir = BASE_DIR / "trimmed_metrics_tmp" / scenario_id
    aligned_dir.mkdir(parents=True, exist_ok=True)
    trimmed_dir.mkdir(parents=True, exist_ok=True)

    # File prefix is the MMDD part of the scenario id (e.g. "20240215" -> "0215")
    lf_name = f"{scenario_id[-4:]}_log_frequency_pod_level_removed.npy"
    
    lf_time, lf_df = load_metric_npy(metrics_dir / lf_name)
    ref_index_full = lf_df.index
    ref_step = (ref_index_full[1] - ref_index_full[0]).total_seconds() if len(ref_index_full) > 1 else 30

    metric_files = [
        "pod_level_data_pod_cpu_usage_total.npy",
        "pod_level_data_pod_memory_working_set.npy",
        "pod_level_data_pod_network_rx_bytes.npy",
        "pod_level_data_pod_network_tx_bytes.npy",
    ]
    
    all_starts = []
    all_ends = []
    for fname in metric_files:
        _, df = load_metric_npy(metrics_dir / fname)
        all_starts.append(df.index.min())
        all_ends.append(df.index.max())
    
    overlap_start = max(all_starts)
    overlap_end = min(all_ends)
    ref_index = ref_index_full[(ref_index_full >= overlap_start) & (ref_index_full <= overlap_end)]
    
    lf_df_trimmed = lf_df.loc[ref_index]
    lf_df_trimmed.to_parquet(aligned_dir / "log_frequency.parquet")
    
    validation = {"scenario": scenario_id, "ref_step_s": ref_step, "n_ref_pts": len(ref_index), "metrics": {}}

    for fname in metric_files:
        metric_name = fname.replace(".npy", "")
        metric_type = METRIC_TYPES.get(metric_name, "gauge")
        
        _, df = load_metric_npy(metrics_dir / fname)
        df = df.sort_index()
        
        if len(df) > 1:
            steps = np.diff(df.index.astype(np.int64) // 1e9)
            src_step = float(np.median(steps))
        else:
            src_step = ref_step
        tolerance = pd.Timedelta(seconds=max(src_step, ref_step) / 2)
        
        if metric_type == "rate" and src_step < ref_step:
            df_resampled = df.resample(f"{int(ref_step)}s").mean()
            df_aligned = df_resampled.reindex(ref_index, method='nearest', tolerance=tolerance)
        else:
            df_aligned = df.reindex(ref_index, method='nearest', tolerance=tolerance)
        
        nan_before = df_aligned.isna().sum().sum()
        total_cells = df_aligned.size
        
        df_aligned = df_aligned.ffill(limit=2).bfill(limit=2)
        
        nan_after = df_aligned.isna().sum().sum()
        nan_ratio = nan_after / total_cells if total_cells > 0 else 0
        imputed_ratio = (nan_before - nan_after) / total_cells if total_cells > 0 else 0
        
        orig_times = df.index
        aligned_times = ref_index[ref_index.isin(df_aligned.dropna(how='all').index)]
        if len(aligned_times) > 0 and len(orig_times) > 0:
            drifts = []
            for t in aligned_times[:100]:
                idx = orig_times.get_indexer([t], method='nearest')[0]
                if idx >= 0 and idx < len(orig_times):
                    drift = abs((t - orig_times[idx]).total_seconds())
                    drifts.append(drift)
            max_drift = max(drifts) if drifts else 0
        else:
            max_drift = 0
        
        validation["metrics"][metric_name] = {
            "src_step_s": src_step,
            "tolerance_s": tolerance.total_seconds(),
            "max_drift_s": max_drift,
            "nan_ratio": nan_ratio,
            "imputed_ratio": imputed_ratio,
            "type": metric_type,
        }
        
        out_name = fname.replace(".npy", ".parquet")
        df_aligned.to_parquet(aligned_dir / out_name)

    parquet_files = sorted(aligned_dir.glob("*.parquet"))
    dfs = {f.stem: pd.read_parquet(f) for f in parquet_files}
    starts = {name: df.first_valid_index() for name, df in dfs.items() if df.first_valid_index() is not None}
    global_start = max(starts.values()) if starts else ref_index[0]

    for name, df in dfs.items():
        trimmed_df = df.loc[global_start:]
        trimmed_df.to_parquet(trimmed_dir / f"{name}.parquet")

    return trimmed_dir, validation


def select_core_metrics(trimmed_dir: Path, scenario_id: str) -> Path:
    """
    Copy 4 core metrics and create 3 empty extras for unified schema.
    
    Output files (7 metrics):
        - pod_cpu_usage_total.parquet         [CORE - with data]
        - pod_memory_working_set.parquet      [CORE - with data]
        - pod_network_rx_bytes.parquet        [CORE - with data]
        - pod_network_tx_bytes.parquet        [CORE - with data]
        - pod_latency_server_p95.parquet      [EXTRA - NaN for LEMMA]
        - pod_latency_client_p95.parquet      [EXTRA - NaN for LEMMA]
        - pod_workload_ops.parquet            [EXTRA - NaN for LEMMA]
    
    metrics_mask is NOT written here; it is generated later in the training
    pipeline from the actual values (NaN vs. data).
    """
    import shutil
    
    core_dir = BASE_DIR / "core_metrics_tmp" / scenario_id
    core_dir.mkdir(parents=True, exist_ok=True)

    # Mapping: old LEMMA names -> new unified names
    core_metric_mapping = {
        "pod_level_data_pod_cpu_usage_total.parquet": "pod_cpu_usage_total.parquet",
        "pod_level_data_pod_memory_working_set.parquet": "pod_memory_working_set.parquet",
        "pod_level_data_pod_network_rx_bytes.parquet": "pod_network_rx_bytes.parquet",
        "pod_level_data_pod_network_tx_bytes.parquet": "pod_network_tx_bytes.parquet",
    }

    # Copy and rename CORE metrics
    for old_name, new_name in core_metric_mapping.items():
        src = trimmed_dir / old_name
        dst = core_dir / new_name
        shutil.copy2(src, dst)
    
    # Read one core metric to get shape (time_index, pods)
    ref_df = pd.read_parquet(core_dir / "pod_cpu_usage_total.parquet")
    
    # Create EXTRA metrics (empty - all NaN) with same shape
    extra_metrics = [
        "pod_latency_server_p95.parquet",
        "pod_latency_client_p95.parquet", 
        "pod_workload_ops.parquet",
    ]
    
    for fname in extra_metrics:
        empty_df = pd.DataFrame(
            np.nan, 
            index=ref_df.index, 
            columns=ref_df.columns
        )
        empty_df.to_parquet(core_dir / fname)
    
    # NOTE: metrics_mask is generated later in the training pipeline,
    # based on the actual values (NaN vs. data) of each timestep/pod
    
    print(f"  [metrics] Created 7 parquets in {core_dir}")

    return core_dir


def build_pod_to_service_from_pods(pods: list[str]) -> dict[str, str]:
    """Extract service name from pod name."""
    pod_to_service = {}
    for pod in pods:
        parts = pod.split("-")
        if len(parts) >= 1:
            pod_to_service[pod] = parts[0]
    return pod_to_service


import re

# Whitelist of common ports to preserve (not replaced by <NUM>)
_WHITELIST_PORTS = r'80|443|8080|8443|3000|3306|5432|5631|5672|5900|6379|7001|7070|8081|9000|9001|9090|9200|9300|9411|9999'


def clean_log_content_vectorized(series: pd.Series) -> pd.Series:
    """
    Vectorized version of clean_log_content using pandas str methods.
    """
    s = series.fillna('')
    s = s.str.replace(r'\x1b\[[0-9;]*m', '', regex=True)
    s = s.str.replace(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?', '<TS>', regex=True)
    s = s.str.replace(r'[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}', '<UUID>', regex=True)
    s = s.str.replace(r'\b[a-fA-F0-9]{24,}\b', '<HEX>', regex=True)
    s = s.str.replace(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(:\d{2,5})?', '<HOST>', regex=True)
    s = s.str.replace(r'(?<=[a-zA-Z0-9])-[a-z0-9]{6,12}-[a-z0-9]{4,6}\b', '-<RAND>', regex=True)
    s = s.str.replace(r'(?<=/)\d{4,}(?=/|$|\s|")', ':id', regex=True)
    s = s.str.replace(r'(?i)(\bport\b|listen(?:ing)?(?:\s+on)?|bind(?:ing)?(?:\s+to)?|socket)[:\s]+(\d{2,5})', r'\1 __PRT_\2__', regex=True)
    s = s.str.replace(rf':({_WHITELIST_PORTS})\b', r':__PRT_\1__', regex=True)
    s = s.str.replace(r'(?i)(https?|status|code)[:\s]*([1-5]\d\d)\b', r'\1 __HTTP_\2__', regex=True)
    s = s.str.replace(r'\bv?(\d+(?:\.\d+){1,3})\b', r'__VER_\1__', regex=True)
    s = s.str.replace(r'\b\d{4,}\b', '<NUM>', regex=True)
    s = s.str.replace(r'__PRT_(\d+)__', r'\1', regex=True)
    s = s.str.replace(r'__HTTP_(\d+)__', r'\1', regex=True)
    s = s.str.replace(r'__VER_([^_]+)__', r'\1', regex=True)
    s = s.str.lower()
    s = s.str.replace(r'\s+', ' ', regex=True).str.strip()
    return s


def _process_chunk(chunk_data):
    """
    Process a data chunk with early exit optimization.
    
    Returns:
        tuple: (results_dict, status)
        - results_dict: {(time_bin, content): count}
        - status: 'ok', 'skip' (chunk before window), 'stop' (chunk after window)
    """
    chunk, time_start, time_end = chunk_data
    
    # Parse timestamps first (fast check for early exit)
    chunk["Time"] = pd.to_datetime(chunk["Time"], format='ISO8601', utc=True, errors='coerce')
    chunk = chunk.dropna(subset=["Time"])
    
    if len(chunk) == 0:
        return {}, 'skip'
    
    # Early exit: if entire chunk is AFTER time_end, signal to stop reading
    chunk_min_time = chunk["Time"].min()
    chunk_max_time = chunk["Time"].max()
    
    if chunk_min_time > time_end:
        return {}, 'stop'  # All subsequent chunks will also be after time_end
    
    # Early exit: if entire chunk is BEFORE time_start, skip it
    if chunk_max_time < time_start:
        return {}, 'skip'
    
    # Filter to window (copy so the column assignments below do not hit a view)
    chunk = chunk[(chunk["Time"] >= time_start) & (chunk["Time"] <= time_end)].copy()
    if len(chunk) == 0:
        return {}, 'ok'
    
    chunk["TimeBin"] = chunk["Time"].dt.floor("30s")
    chunk["CleanContent"] = clean_log_content_vectorized(chunk["Content"])
    grouped = chunk.groupby(["TimeBin", "CleanContent"]).size()
    return {(t_bin, content): count for (t_bin, content), count in grouped.items() if content}, 'ok'


def _process_single_log_file(args):
    """
    Process a CSV file with early termination optimization.
    
    If logs are sorted by time (common), we stop reading when we pass time_end.
    This can reduce read time by 95%+ when cropping to a small window.
    """
    csv_path, time_start, time_end = args
    CHUNK_SIZE = 500_000  # Smaller chunks for faster early termination check
    
    pod_name = csv_path.name.replace("_messages_structured.csv", "").replace("_structured.csv", "")
    parts = pod_name.split("-")
    service = parts[0] if parts else pod_name
    
    local_bins = {}
    chunks_read = 0
    chunks_skipped = 0
    
    try:
        chunk_iter = pd.read_csv(
            csv_path,
            encoding_errors='ignore',
            chunksize=CHUNK_SIZE,
            usecols=['Time', 'Content'],
            dtype={'Time': str, 'Content': str},
            engine='c',
            low_memory=True,
        )
    except Exception:
        return {}
    
    for chunk in chunk_iter:
        chunks_read += 1
        result, status = _process_chunk((chunk, time_start, time_end))
        
        if status == 'stop':
            # Early termination: all remaining data is after our window
            break
        elif status == 'skip':
            chunks_skipped += 1
            continue
        
        # Merge results
        for (t_bin, content), count in result.items():
            key = (service, t_bin, content)
            local_bins[key] = local_bins.get(key, 0) + count
    
    return local_bins


def aggregate_logs_to_services(scenario_id: str, time_index: pd.DatetimeIndex, services: list[str]) -> Path:
    """Aggregate pod logs to service level using up to 20 worker processes."""
    from concurrent.futures import ProcessPoolExecutor, as_completed
    import os
    
    log_base = BASE_DIR / "log_data" / scenario_id / "log_data" / "pod"
    out_dir = BASE_DIR / "core_logs_tmp" / scenario_id
    out_dir.mkdir(parents=True, exist_ok=True)

    # Sort by size DESCENDING: large files first for better parallelization
    all_structured = sorted(log_base.glob("*_structured.csv"), 
                            key=lambda p: p.stat().st_size, 
                            reverse=True)
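    total_files = len(all_structured)
    if total_files == 0:
        # Fail with a clear message instead of an IndexError on the next line
        raise FileNotFoundError(f"No *_structured.csv files found in {log_base}")
    print(f"[INFO] Largest: {all_structured[0].name} ({all_structured[0].stat().st_size / 1e9:.1f} GB)")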
    time_start = time_index[0]
    time_end = time_index[-1]
    
    # Prepare arguments for each file
    args_list = [(csv_path, time_start, time_end) for csv_path in all_structured]
    
    # Process in parallel (optimized for Ryzen 9 5900X: 24 threads)
    n_workers = min(os.cpu_count() or 4, 20)
    print(f"[INFO] Processing {total_files} files with {n_workers} CPU workers...")
    
    service_bins = defaultdict(lambda: defaultdict(Counter))
    completed = 0
    
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = {executor.submit(_process_single_log_file, args): args[0] for args in args_list}
        
        for future in as_completed(futures):
            completed += 1
            if completed % 20 == 0:
                print(f"  Processing {completed}/{total_files} files...")
            
            result = future.result()
            # Combine results
            for (service, t_bin, tmpl), count in result.items():
                service_bins[service][t_bin][tmpl] += count

    print(f"[OK] Processed {total_files} files")

    # Build DataFrame of texts per service (column by column, more efficient)
    time_bins = time_index.floor("30s")  # Vectorized
    
    # Pre-build columns dict
    columns_data = {}
    for svc in services:
        svc_bins = service_bins[svc]
        col = []
        for ts_bin in time_bins:
            templates = svc_bins.get(ts_bin, {})
            if templates:
                text = " | ".join(f"{t}:x{c}" for t, c in templates.most_common(10))
            else:
                text = ""
            col.append(text)
        columns_data[svc] = col
    
    logs_df = pd.DataFrame(columns_data, index=time_index)
    out_path = out_dir / "logs_service_texts.parquet"
    logs_df.to_parquet(out_path)

    return out_path


def build_manifest_and_ground_truth(scenario_id: str, core_metrics_dir: Path, logs_texts_path: Path) -> Path:
    """Generate manifest.json and ground_truth.json for the scenario."""
    out_dir = BASE_DIR / "core_multimodal_tmp" / scenario_id
    out_dir.mkdir(parents=True, exist_ok=True)

    cfg = SCENARIO_CONFIG[scenario_id]
    fault_ts = cfg["fault_timestamp_raw"]
    rc_deployment = cfg["root_cause_deployment"]
    rc_service = cfg["root_cause_service"]

    cpu_df = pd.read_parquet(core_metrics_dir / "pod_cpu_usage_total.parquet")
    pods = list(cpu_df.columns)
    time_index = cpu_df.index

    pod_to_service = build_pod_to_service_from_pods(pods)
    services = sorted(set(pod_to_service.values()))
    service_to_idx = {s: i for i, s in enumerate(services)}
    pod_to_idx = {p: i for i, p in enumerate(pods)}

    # Find root cause pods
    rc_pods = [p for p in pods if rc_deployment in p]
    rc_pod_indices = [pod_to_idx[p] for p in rc_pods]
    rc_service_idx = service_to_idx.get(rc_service, -1)

    # Calculate fault_bin and fault_time_idx
    fault_bin = fault_ts.floor("30s")
    time_index_floored = time_index.floor("30s")
    
    if fault_bin in time_index_floored:
        fault_time_idx = int(time_index_floored.get_loc(fault_bin))
    else:
        fault_time_idx = int(np.searchsorted(time_index_floored, fault_bin))
        fault_time_idx = min(fault_time_idx, len(time_index) - 1)

    # Build pod_to_service_idx: array where pod_to_service_idx[pod_i] = service_idx
    # This enables per-pod log fusion in the model
    pod_to_service_idx = []
    for pod in pods:
        svc = pod_to_service.get(pod, pod.split("-")[0])  # fallback to first part of this pod's name
        svc_idx = service_to_idx.get(svc, 0)
        pod_to_service_idx.append(svc_idx)

    # Unified 7-metric schema (LEMMA + Nezha compatible)
    all_metric_files = [
        "pod_cpu_usage_total.parquet",
        "pod_memory_working_set.parquet",
        "pod_network_rx_bytes.parquet",
        "pod_network_tx_bytes.parquet",
        "pod_latency_server_p95.parquet",
        "pod_latency_client_p95.parquet",
        "pod_workload_ops.parquet",
    ]
    
    manifest = {
        "scenario_id": scenario_id,
        "dataset": "lemma",  # Identifies source dataset
        "time_start": str(time_index[0]),
        "time_end": str(time_index[-1]),
        "n_timesteps": len(time_index),
        "n_pods": len(pods),
        "n_services": len(services),
        "n_metrics": 7,  # Unified schema: 4 core + 3 extras
        "window_T": "30s",
        "metrics_files": [str(core_metrics_dir / f) for f in all_metric_files],
        # metrics_mask is generated later in the training pipeline
        "logs_texts_file": str(logs_texts_path),
        "pods": pods,
        "services": services,
        "service_to_idx": service_to_idx,
        "pod_to_service_idx": pod_to_service_idx,  
    }

    ground_truth = {
        "scenario_id": scenario_id,
        "fault_timestamp_raw": str(fault_ts),
        "fault_bin": str(fault_bin),
        "fault_time_idx": fault_time_idx,
        "root_cause_service": rc_service,
        "root_cause_service_idx": rc_service_idx,
        "root_cause_deployment": rc_deployment,
        "root_cause_pods": rc_pods,
        "root_cause_pod_indices": rc_pod_indices,
        "pod_to_idx": pod_to_idx,
    }

    with open(out_dir / "manifest.json", "w") as f:
        json.dump(manifest, f, indent=2)

    with open(out_dir / "ground_truth.json", "w") as f:
        json.dump(ground_truth, f, indent=2)

    return out_dir


def crop_metrics_to_fault_window(
    core_metrics_dir: Path, 
    scenario_id: str,
    pre_minutes: int = PRE_FAULT_MINUTES,
    post_minutes: int = POST_FAULT_MINUTES
) -> tuple[pd.DatetimeIndex, int]:
    """
    Crop all metric parquets to a window around the fault time.
    
    This dramatically reduces:
    - Data size: from 2600-5500 to ~90 timesteps
    - Log processing time: only processes logs in the cropped window
    - Memory usage during training
    
    Args:
        core_metrics_dir: Path to directory with metric parquets
        scenario_id: Scenario identifier
        pre_minutes: Minutes before fault to include
        post_minutes: Minutes after fault to include
        
    Returns:
        cropped_time_index: The new time index after cropping
        fault_time_idx: Index of fault bin in cropped window
    """
    cfg = SCENARIO_CONFIG[scenario_id]
    fault_ts = cfg["fault_timestamp_raw"]
    fault_bin = fault_ts.floor("30s")
    
    # Calculate window bounds
    window_start = fault_bin - pd.Timedelta(minutes=pre_minutes)
    window_end = fault_bin + pd.Timedelta(minutes=post_minutes)
    
    # Get all metric parquet files
    metric_files = list(core_metrics_dir.glob("*.parquet"))
    
    # Read reference to get original shape
    ref_df = pd.read_parquet(core_metrics_dir / "pod_cpu_usage_total.parquet")
    original_len = len(ref_df)
    
    # Crop each parquet file
    cropped_time_index = None
    for parquet_file in metric_files:
        df = pd.read_parquet(parquet_file)
        
        # Crop to window
        mask = (df.index >= window_start) & (df.index <= window_end)
        df_cropped = df.loc[mask]
        
        if cropped_time_index is None:
            cropped_time_index = df_cropped.index
        
        # Overwrite with cropped version
        df_cropped.to_parquet(parquet_file)
    
    # Calculate fault_time_idx in cropped window
    time_index_floored = cropped_time_index.floor("30s")
    if fault_bin in time_index_floored:
        fault_time_idx = int(time_index_floored.get_loc(fault_bin))
    else:
        fault_time_idx = int(np.searchsorted(time_index_floored, fault_bin))
        fault_time_idx = min(fault_time_idx, len(cropped_time_index) - 1)
    
    cropped_len = len(cropped_time_index)
    print(f"  [crop] {original_len} -> {cropped_len} timesteps ({cropped_len/original_len*100:.1f}%)")
    print(f"  [crop] Window: {window_start} to {window_end}")
    print(f"  [crop] Fault at index {fault_time_idx}/{cropped_len}")
    
    return cropped_time_index, fault_time_idx


## 3. Execute Pipeline

Runs align → crop → log aggregation → manifest generation for each scenario.

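Raw LEMMA has 2600-5500 timesteps (~22-46h). Each scenario is cropped to 45 min around the fault (15 pre + 30 post → ~90 bins). Log processing is parallelized across up to 20 workers and stops reading a file once a chunk lies entirely past `time_end`, which relies on the logs being sorted by time. Together these cut log processing time by roughly 95-99%.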
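
The early-termination logic can be sketched without pandas as a three-way decision per chunk. The example assumes time-sorted input and uses plain integer timestamps; `classify_chunk` and `read_window` are illustrative names that mirror the `skip` / `stop` / `ok` statuses of `_process_chunk`, not the pipeline's own functions.

```python
def classify_chunk(chunk_min, chunk_max, time_start, time_end):
    """Decide what to do with a chunk given the time span it covers."""
    if chunk_min > time_end:
        return "stop"   # sorted input: every later chunk is also past the window
    if chunk_max < time_start:
        return "skip"   # entirely before the window
    return "ok"         # overlaps the window: filter rows and aggregate


def read_window(chunks, time_start, time_end):
    """Collect in-window rows from time-sorted chunks, stopping as early as possible."""
    kept, chunks_read = [], 0
    for chunk in chunks:
        chunks_read += 1
        # Sorted chunk: first and last rows give its min and max time
        status = classify_chunk(chunk[0], chunk[-1], time_start, time_end)
        if status == "stop":
            break
        if status == "skip":
            continue
        kept.extend(t for t in chunk if time_start <= t <= time_end)
    return kept, chunks_read


# Toy "file": timestamps in seconds, split into sorted chunks of 4 rows
chunks = [
    [0, 10, 20, 30],
    [40, 50, 60, 70],
    [80, 90, 100, 110],
    [120, 130, 140, 150],
    [160, 170, 180, 190],
]
kept, chunks_read = read_window(chunks, time_start=50, time_end=100)
print(kept)          # [50, 60, 70, 80, 90, 100]
print(chunks_read)   # 4 (the fifth chunk is never read)
```

If a log file is not sorted by time, the `stop` branch can drop in-window rows, so unsorted files would need the early break disabled.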


In [3]:
def prepare_scenario(scenario_id: str) -> tuple[Path, dict]:
    """
    Prepare a complete scenario and return alignment validation.
    
    Pipeline:
    1. align_and_trim_metrics() - Align to 30s grid
    2. select_core_metrics() - Copy 4 core metrics + create 3 empty extras
    3. crop_metrics_to_fault_window() - Crop to PRE/POST window around fault
    4. aggregate_logs_to_services() - Process logs (only cropped window)
    5. build_manifest_and_ground_truth() - Generate JSON files
    """
    print(f"  [1/5] Aligning metrics to 30s grid...")
    trimmed_dir, validation = align_and_trim_metrics(scenario_id)
    
    print(f"  [2/5] Selecting core metrics...")
    core_metrics_dir = select_core_metrics(trimmed_dir, scenario_id)

    # CROP metrics to fault window BEFORE processing logs
    # This dramatically reduces log processing time
    print(f"  [3/5] Cropping to fault window...")
    cropped_time_index, fault_time_idx = crop_metrics_to_fault_window(
        core_metrics_dir, scenario_id
    )
    
    # Use cropped time_index for subsequent processing
    metrics_ref = pd.read_parquet(core_metrics_dir / "pod_cpu_usage_total.parquet")
    time_index = metrics_ref.index  # Now cropped
    pods = list(metrics_ref.columns)
    pod_to_service = build_pod_to_service_from_pods(pods)
    services = sorted(set(pod_to_service.values()))

    print(f"  [4/5] Aggregating logs to services...")
    logs_texts_path = aggregate_logs_to_services(scenario_id, time_index, services)
    
    print(f"  [5/5] Building manifest and ground truth...")
    multimodal_dir = build_manifest_and_ground_truth(scenario_id, core_metrics_dir, logs_texts_path)
    
    # Save validation info
    val_path = multimodal_dir / "alignment_validation.json"
    with open(val_path, "w") as f:
        json.dump(validation, f, indent=2)

    return multimodal_dir, validation


def print_validation_report(validations: list[dict]):
    """Print alignment validation report."""
    print("\n" + "="*90)
    print("METRIC ALIGNMENT VALIDATION REPORT")
    print("="*90)
    print(f"{'Scenario':<12} {'Metric':<35} {'src_s':>6} {'ref_s':>6} {'tol_s':>6} {'drift':>6} {'nan%':>6} {'imp%':>6}")
    print("-"*90)
    for v in validations:
        sid = v["scenario"]
        ref_step = v["ref_step_s"]
        for mname, m in v["metrics"].items():
            mshort = mname.replace("pod_level_data_", "")[:30]
            print(f"{sid:<12} {mshort:<35} {m['src_step_s']:>6.0f} {ref_step:>6.0f} {m['tolerance_s']:>6.0f} {m['max_drift_s']:>6.1f} {m['nan_ratio']*100:>5.1f}% {m['imputed_ratio']*100:>5.1f}%")
    print("="*90)


all_validations = []
for sid in ["20231207", "20231221", "20240115", "20240215"]:
    print(f"\n{'='*60}")
    print(f"Processing {sid}...")
    print(f"{'='*60}")
    out_dir, validation = prepare_scenario(sid)
    all_validations.append(validation)
    print(f"[OK] {sid} -> {out_dir}")

print_validation_report(all_validations)



Processing 20231207...
  [1/5] Aligning metrics to 30s grid...
  [2/5] Selecting core metrics...
  [metrics] Created 7 parquets in /root/lemm/core_metrics_tmp/20231207
  [3/5] Cropping to fault window...
  [crop] 2642 -> 91 timesteps (3.4%)
  [crop] Window: 2023-12-07 07:38:00+00:00 to 2023-12-07 08:23:00+00:00
  [crop] Fault at index 30/91
  [4/5] Aggregating logs to services...
[INFO] Largest: currencyservice-696d6df76c-xdz9b_messages_structured.csv (1.2 GB)
[INFO] Processing 195 files with 20 CPU workers...
  Processing 20/195 files...
  Processing 40/195 files...
  Processing 60/195 files...
  Processing 80/195 files...
  Processing 100/195 files...
  Processing 120/195 files...
  Processing 140/195 files...
  Processing 160/195 files...
  Processing 180/195 files...
[OK] Processed 195 files
  [5/5] Building manifest and ground truth...
[OK] 20231207 -> /root/lemm/core_multimodal_tmp/20231207

Processing 20231221...
  [1/5] Aligning metrics to 30s grid...
  [2/5] Selecting core me