In [6]:
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import numpy as np
from tqdm import tqdm

# ---------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------
# Root folder; it is walked recursively looking for *.csv trace files.
BASE_DIR = Path("../data/owd/tor/")

# Number of worker threads used to process files; adjust for the machine.
MAX_WORKERS = 8

# When True, compute_stats discards samples outside the IQR fence.
REMOVE_OUTLIERS = True

# Destination file for the per-call summary table.
OUT_CSV = "owd_call_stats.csv"

# ---------------------------------------------------------------------
# Helper: compute stats for one numeric array
# ---------------------------------------------------------------------
def compute_stats(values, remove_outliers=None, iqr_k=1.5):
    """Summarise a collection of numbers.

    Parameters
    ----------
    values : array-like
        Numbers to summarise; converted to a flat float array.  NaN and
        +/-inf entries are discarded before anything else is computed.
    remove_outliers : bool or None
        Apply the IQR (Tukey) fence before computing the statistics.
        ``None`` (the default) means "use the module-level
        ``REMOVE_OUTLIERS`` setting", looked up at call time so that changing
        the config takes effect without re-defining this function.
    iqr_k : float
        Fence multiplier: values outside ``[q1 - k*IQR, q3 + k*IQR]`` are
        dropped.  1.5 is the conventional choice and the previous hard-coded
        value.

    Returns
    -------
    dict
        ``count`` (int), ``mean``, ``std`` (population std, ddof=0) and
        ``p99`` (99th percentile, numpy's default linear interpolation).
        If no values survive, ``count`` is 0 and the other fields are NaN.
    """
    if remove_outliers is None:
        remove_outliers = REMOVE_OUTLIERS

    values = np.asarray(values, dtype=float).ravel()
    # Drop non-finite samples first: a single NaN turns both quartiles into
    # NaN, every comparison against a NaN fence is False, and the whole
    # sample would otherwise be silently discarded.
    values = values[np.isfinite(values)]

    if remove_outliers and values.size:
        q1, q3 = np.percentile(values, [25, 75])
        iqr = q3 - q1
        low = q1 - iqr_k * iqr
        high = q3 + iqr_k * iqr
        values = values[(values >= low) & (values <= high)]

    if values.size == 0:
        return dict(count=0, mean=np.nan, std=np.nan, p99=np.nan)

    return dict(
        count=int(values.size),
        mean=float(values.mean()),
        std=float(values.std()),
        p99=float(np.percentile(values, 99)),
    )

# ---------------------------------------------------------------------
# Worker: process one CSV file and return two rows of stats
# ---------------------------------------------------------------------
def process_file(file_path):
    """Parse one OWD trace file and summarise each direction.

    The file is read as two whitespace-separated, header-less columns
    (``seq``, ``owd_ms``).  The sign of ``owd_ms`` selects the direction:
    positive samples are attributed to caller→callee, and negative samples
    (taken by absolute value) to callee→caller.  Zero and non-numeric
    samples are excluded from both directions.

    Returns
    -------
    list of dict
        Two rows, one per direction.  Each holds ``call_id``, ``direction``
        and the keys produced by ``compute_stats``.  Returns an empty list
        if the file could not be processed; the failure is reported on the
        console and the batch continues.
    """
    try:
        df = pd.read_csv(
            file_path,
            sep=r"\s+",                # works for tabs or spaces
            names=["seq", "owd_ms"],
            header=None,
            engine="python",
        )
        # Keep the cleaned values as a standalone Series.  Assigning
        # ``.dropna()`` back into the frame re-aligns on the index and
        # reinserts the dropped rows as NaN, so the drop would never happen.
        owd = pd.to_numeric(df["owd_ms"], errors="coerce").dropna()

        caller = owd[owd > 0].to_numpy()
        callee = np.abs(owd[owd < 0].to_numpy())

        # File name minus extension is a convenient call-id.
        # NOTE(review): files sharing a name in different sub-directories
        # end up with the same call_id; confirm names are unique.
        call_id = Path(file_path).stem

        return [
            {"call_id": call_id, "direction": "caller→callee", **compute_stats(caller)},
            {"call_id": call_id, "direction": "callee→caller", **compute_stats(callee)},
        ]

    except Exception as e:
        # Deliberate best-effort boundary: one malformed file must not abort
        # the whole batch.  tqdm.write prints without corrupting an active
        # tqdm progress bar.
        tqdm.write(f"Failed to process {file_path}: {e}")
        return []

# ---------------------------------------------------------------------
# Gather all CSVs and crunch in parallel
# ---------------------------------------------------------------------
if not BASE_DIR.is_dir():
    # os.walk silently yields nothing for a missing path, which would
    # otherwise produce an empty summary with no hint of what went wrong.
    raise FileNotFoundError(f"BASE_DIR is not an existing directory: {BASE_DIR}")

# Sorted so that the row order of the summary is reproducible run to run.
all_csv = sorted(
    os.path.join(root, f)
    for root, _, files in os.walk(BASE_DIR)
    for f in files if f.endswith(".csv")
)
if not all_csv:
    print(f"No .csv files found under {BASE_DIR}")

# NOTE(review): parsing uses pandas' pure-Python engine, which holds the GIL,
# so threads may give limited speed-up; measure before raising MAX_WORKERS.
results = {}
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futures = {ex.submit(process_file, fp): fp for fp in all_csv}
    # as_completed keeps the progress bar live; results are keyed by path so
    # they can be reassembled in the sorted input order below.
    for fut in tqdm(as_completed(futures), total=len(futures), desc="Calls processed"):
        results[futures[fut]] = fut.result()

rows = [row for fp in all_csv for row in results[fp]]

# ---------------------------------------------------------------------
# Save summary to CSV
# ---------------------------------------------------------------------
# When every file failed (or none were found), ``rows`` is empty and
# pd.DataFrame(rows) has no columns, so the CSV would lack even a header.
# Supply the expected columns only in that case, so any future stats keys
# still flow through automatically when rows exist.
SUMMARY_COLUMNS = ["call_id", "direction", "count", "mean", "std", "p99"]
summary_df = pd.DataFrame(rows) if rows else pd.DataFrame(columns=SUMMARY_COLUMNS)
summary_df.to_csv(OUT_CSV, index=False)
print(f"✔  Per-call statistics written to {OUT_CSV}")


Calls processed: 100%|███████████████████| 23471/23471 [02:26<00:00, 160.49it/s]

✔  Per-call statistics written to owd_call_stats.csv



