# OC-SVM v2 (Runtime-Capped) — AIS Anomaly
**Objetivo**: maximizar CPU/RAM y limitar runtime con:
- Subsampling controlado para HP search (FAST/FULL).
- RobustScaler (menos sensible a outliers).
- (Opcional) PCA 32D para acelerar kernel sin perder señal.
- HP search paralela (joblib) con grid acotado eficaz.
- Evaluación por lotes **reanudable** (memmap) + métricas y @k.

In [1]:
# === Config dinámica consciente de RAM (reemplaza tu Celda 2) ===
import os, sys, json, getpass, socket, psutil
from pathlib import Path

print("User:", getpass.getuser())
print("Host:", socket.gethostname())
print("Python:", sys.executable)
print("CWD:", os.getcwd())

# MODO por defecto (rápido y seguro). Si todo va bien, puedes subir a "FULL_SAFE".
MODE = "FAST_SAFE"   # "FAST_SAFE" o "FULL_SAFE"

RAM_GB = psutil.virtual_memory().total / 1e9
N_CPU  = os.cpu_count()

MODES = {
    "FAST_SAFE": dict(
        max_train_samples   = 120_000,   # OC-SVM kernelizado no escala más sin cluster.
        max_search_samples  = 60_000,
        kfold_splits        = 3,
        eval_batch_size     = 2_000_000,
        use_pca             = True,
        pca_components      = 16,        # reducimos para acelerar kernel
        svm_nu_grid         = [0.02, 0.03, 0.05, 0.08],
        svm_gamma_grid      = ["auto", "scale", 0.1, 0.03],
    ),
    "FULL_SAFE": dict(
        max_train_samples   = 180_000,
        max_search_samples  = 100_000,
        kfold_splits        = 4,
        eval_batch_size     = 2_000_000,
        use_pca             = True,
        pca_components      = 24,
        svm_nu_grid         = [0.01, 0.02, 0.03, 0.05, 0.08],
        svm_gamma_grid      = ["auto", "scale", 0.3, 0.1, 0.03, 0.01],
    )
}

# Hilos BLAS (no procesos) para compartir memoria y evitar forks gigantes:
N_THREADS = max(1, N_CPU - 1)
os.environ["OMP_NUM_THREADS"] = str(N_THREADS)
os.environ["OPENBLAS_NUM_THREADS"] = str(N_THREADS)
os.environ["MKL_NUM_THREADS"] = str(N_THREADS)
os.environ["NUMEXPR_NUM_THREADS"] = str(N_THREADS)

# Rutas
EXTERNAL_DATA_DIR = Path("/teamspace/studios/this_studio/data").resolve()
OUT_DIR = Path("data/ocsvm_runs").resolve(); OUT_DIR.mkdir(parents=True, exist_ok=True)

CFG = {
    "external_data_dir": str(EXTERNAL_DATA_DIR),
    "out_dir": str(OUT_DIR),
    "artifact_prefix": "ocsvm_v2",
    "kernel": "rbf",
    **MODES[MODE],
}

# Tamaño de caché por modelo (MB) controlado por RAM y n_splits (no por #CPU):
# Usamos ~6% de RAM total repartida entre folds en paralelo; límite superior de 2048 MB por modelo.
per_model_cache_mb = int(min(2048, (RAM_GB * 0.06 * 1024) / CFG["kfold_splits"]))
CFG["per_model_cache_mb"] = max(256, per_model_cache_mb)  # no menos de 256MB

# Concurrencia: paralelizamos sobre FOLDS (no sobre parámetros), para acotar el peak de RAM.
# n_jobs_folds <= kfold_splits y <= 6 para no saturar.
CFG["n_jobs_folds"] = int(min(CFG["kfold_splits"], 6))

print("Mode:", MODE, "| Cores:", N_CPU, "| Threads BLAS:", N_THREADS, "| RAM:", f"{RAM_GB:.1f} GB")
print(json.dumps(CFG, indent=2))

User: erickdsuarez10
Host: computeinstance-e00xe50gspktqg7q39
Python: /home/zeus/miniconda3/envs/cloudspace/bin/python
CWD: /teamspace/studios/this_studio
Mode: FAST_SAFE | Cores: 32 | Threads BLAS: 31 | RAM: 135.1 GB
{
  "external_data_dir": "/teamspace/studios/this_studio/data",
  "out_dir": "/teamspace/studios/this_studio/data/ocsvm_runs",
  "artifact_prefix": "ocsvm_v2",
  "kernel": "rbf",
  "max_train_samples": 120000,
  "max_search_samples": 60000,
  "kfold_splits": 3,
  "eval_batch_size": 2000000,
  "use_pca": true,
  "pca_components": 16,
  "svm_nu_grid": [
    0.02,
    0.03,
    0.05,
    0.08
  ],
  "svm_gamma_grid": [
    "auto",
    "scale",
    0.1,
    0.03
  ],
  "per_model_cache_mb": 2048,
  "n_jobs_folds": 3
}


In [2]:
import numpy as np, pandas as pd, gc, pyarrow.parquet as pq

DATA_DIR = Path(CFG["external_data_dir"])
assert DATA_DIR.exists(), f"No existe: {DATA_DIR}"

def is_valid_parquet(p: Path):
    try: pq.ParquetFile(p); return True
    except: return False

def read_min(p: Path) -> pd.DataFrame:
    df = pd.read_parquet(p, engine="pyarrow")
    for c in df.columns:
        if pd.api.types.is_float_dtype(df[c]): df[c] = df[c].astype(np.float32)
        elif pd.api.types.is_integer_dtype(df[c]) and df[c].max() <= np.iinfo(np.int32).max:
            df[c] = df[c].astype(np.int32)
    return df

def detect_col(df, cands):
    for c in cands:
        if c in df.columns: return c
    return None

def pick(base: Path, *names):
    # exactos
    for n in names:
        if "*" not in n:
            p = base / n
            if p.exists() and is_valid_parquet(p): return p
    # patrones
    for patt in names:
        if "*" in patt:
            ms = sorted(base.glob(patt), key=lambda x: x.stat().st_size if x.exists() else 0, reverse=True)
            for m in ms:
                if is_valid_parquet(m): return m
    return None

# TRAIN (normal windows)
train_path = pick(DATA_DIR,
                  "windows_aligned_normal.parquet",
                  "norm_windows_flat.parquet",
                  "*windows_with_labels_aligned_norm*.parquet",
                  "*eval_windows_aligned_norm*.parquet")
assert train_path, "No encontré TRAIN normal."
df_tr = read_min(train_path)
gcol_tr = detect_col(df_tr, ["mmsi","group","ship_id"])

drop_common = {"lat","lon","idx","idx_end","window_id"}
feat_tr = [c for c in df_tr.columns if c not in set([gcol_tr]) | drop_common]
X_train = df_tr[feat_tr].to_numpy(dtype=np.float32)
groups_train = df_tr[gcol_tr].to_numpy() if gcol_tr else None
print("TRAIN ->", train_path.name, "| X:", X_train.shape)

# EVAL (preferimos single alineado con labels embebidas; si no, tomamos el mayor aligned)
eval_single = pick(DATA_DIR,
    "windows_with_labels_aligned.parquet",
    "*windows_with_labels_aligned*.parquet",
    "eval_windows_aligned.parquet",
    "*eval_windows_aligned*.parquet",
)
assert eval_single, "No encontré EVAL aligned."
df_ev = read_min(eval_single)
gcol_ev = detect_col(df_ev, ["mmsi","group","ship_id"])
ycol_ev = detect_col(df_ev, ["is_suspicious","label","y","target"])  # puede no existir

feat_ev = [c for c in df_ev.columns if c not in set([gcol_ev, ycol_ev]) | drop_common if c is not None]
X_eval = df_ev[feat_ev].to_numpy(dtype=np.float32)
groups_eval = df_ev[gcol_ev].to_numpy() if gcol_ev else None

# y_eval: usaremos labels_anom por JOIN (más confiable); para métricas
labels_anom = pick(DATA_DIR, "labels_anom.parquet", "*labels_anom*.parquet")
assert labels_anom, "No encontré labels_anom.parquet (necesario para métricas)."

print("EVAL ->", eval_single.name, "| X:", X_eval.shape, "| y from:", labels_anom.name)
del df_tr; gc.collect()

TRAIN -> windows_aligned_normal.parquet | X: (27789660, 19)
EVAL -> windows_with_labels_aligned.parquet | X: (27789660, 19) | y from: labels_anom.parquet


0

In [3]:
# --- Preprocesamiento RAM-friendly con imputación robusta + RobustScaler + (opcional) PCA ---
import numpy as np, gc
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA

def naninf_to_nan(arr):
    """Reemplaza ±Inf por NaN in-place (float32 friendly)."""
    if not np.issubdtype(arr.dtype, np.floating):
        arr = arr.astype(np.float32, copy=False)
    # ±Inf -> NaN
    mask_inf = ~np.isfinite(arr)
    if mask_inf.any():
        arr[mask_inf] = np.nan
    return arr

def sample_by_group(n_max, X, groups):
    if (n_max is None) or (X.shape[0] <= n_max):
        idx = np.arange(X.shape[0]); return X, (groups if groups is not None else None), idx
    rng = np.random.default_rng(42)
    if groups is None:
        idx = rng.choice(X.shape[0], n_max, replace=False); return X[idx], None, idx
    uniq = np.unique(groups); per_g = max(1, n_max // len(uniq)); take=[]
    for g in uniq:
        gi = np.where(groups==g)[0]
        take.extend(rng.choice(gi, min(per_g, gi.size), replace=False).tolist())
    take = np.array(take)
    if take.size > n_max: take = rng.choice(take, n_max, replace=False)
    return X[take], groups[take], take

# 0) Subset para HP search
X_train_s, groups_train_s, idx_train_s = sample_by_group(CFG["max_search_samples"], X_train, groups_train)
print("Search subset:", X_train_s.shape)

# 1) Limpieza de ±Inf -> NaN (subset y full)
X_train_s = naninf_to_nan(X_train_s.astype(np.float32, copy=False))
X_train    = naninf_to_nan(X_train.astype(np.float32, copy=False))

# 2) Imputación por mediana (fit en subset para ser eficiente/estable)
imputer = SimpleImputer(strategy="median")
X_train_s = imputer.fit_transform(X_train_s)
X_train_i = imputer.transform(X_train)        # full train imputado

# 3) RobustScaler (fit en subset imputado; apply a todo)
scaler = RobustScaler(with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0))
X_train_s = scaler.fit_transform(X_train_s)
X_train_sc = scaler.transform(X_train_i)      # full train escalado

# 4) (Opcional) PCA para acelerar kernel
USE_PCA = bool(CFG.get("use_pca", True))
if USE_PCA:
    # Ajuste automático al máximo permitido (no más de n_features)
    n_comp = min(int(CFG["pca_components"]), X_train_s.shape[1])
    if n_comp < 2:
        USE_PCA = False
        print("⚠️ PCA deshabilitado: menos de 2 features útiles.")
    else:
        pca = PCA(n_components=n_comp, svd_solver="auto", random_state=42, whiten=False)
        X_train_s = pca.fit_transform(X_train_s)
        X_train_sc = pca.transform(X_train_sc)
        print(f"PCA aplicado: {n_comp} componentes  →  {X_train_sc.shape[1]} features")
else:
    print("PCA desactivado.")
# 5) Transformador por lotes para EVAL (misma imputación/escala/PCA)
def transform_eval_batches(X, batch_size:int):
    n = X.shape[0]
    for s in range(0, n, batch_size):
        e = min(s+batch_size, n)
        Xe = X[s:e].astype(np.float32, copy=False)
        Xe = naninf_to_nan(Xe)          # ±Inf -> NaN
        Xe = imputer.transform(Xe)      # imputación con mediana
        Xe = scaler.transform(Xe)       # robust scaling
        if USE_PCA:
            Xe = pca.transform(Xe)      # reducción
        yield s, e, Xe.astype(np.float32, copy=False)

print("Train (scaled):", X_train_sc.shape)
gc.collect()

Search subset: (59950, 19)
PCA aplicado: 16 componentes  →  16 features
Train (scaled): (27789660, 16)


0

In [4]:
# === HP search estable (paralelismo por FOLDS; memoria acotada) — reemplaza tu Celda 5 ===
import numpy as np, pandas as pd
from sklearn.model_selection import GroupKFold, KFold, ParameterGrid
from sklearn.svm import OneClassSVM
from joblib import Parallel, delayed, parallel_backend
import time, math, gc

param_grid = list(ParameterGrid({"nu": CFG["svm_nu_grid"], "gamma": CFG["svm_gamma_grid"]}))
X_search = X_train_s
groups_search = groups_train_s

# Splitter (group-aware si procede)
if (groups_search is not None) and (len(np.unique(groups_search)) >= CFG["kfold_splits"]):
    splitter = GroupKFold(n_splits=CFG["kfold_splits"])
    split_args = dict(X=X_search, y=None, groups=groups_search)
else:
    splitter = KFold(n_splits=CFG["kfold_splits"], shuffle=True, random_state=42)
    split_args = dict(X=X_search, y=None)

target_rate = 0.035  # ~3.5% prior
cache_mb = int(CFG["per_model_cache_mb"])
n_jobs_folds = int(CFG["n_jobs_folds"])

def out_rate(pred):
    return float((pred == -1).mean())

def eval_one_fold(p, tr, va):
    # Entrena un OCSVM y devuelve outlier_rate en va
    m = OneClassSVM(
        kernel=CFG["kernel"],
        nu=p["nu"],
        gamma=p["gamma"],
        cache_size=cache_mb,   # cache por modelo controlado
        tol=1e-3,
        shrinking=True
    )
    m.fit(X_search[tr])
    pred = m.predict(X_search[va])
    return out_rate(pred)

def eval_param_stable(p):
    t0 = time.time()
    folds = list(splitter.split(**split_args))
    # Paralelismo solo sobre folds, con backend "threads" para compartir memoria
    with parallel_backend("threading", n_jobs=n_jobs_folds):
        rates = Parallel(verbose=0)(
            delayed(eval_one_fold)(p, tr, va) for (tr, va) in folds
        )
    rmean, rstd = float(np.mean(rates)), float(np.std(rates))
    obj = abs(rmean - target_rate) + rstd
    dt = time.time() - t0
    return {"params": p, "rate_mean": rmean, "rate_std": rstd, "obj": obj, "secs": dt}

rows = []
for i, p in enumerate(param_grid, 1):
    row = eval_param_stable(p)
    rows.append(row)
    print(f"[{i}/{len(param_grid)}] {row['params']} | rate_mean={row['rate_mean']:.4f} "
          f"| rate_std={row['rate_std']:.4f} | obj={row['obj']:.4f} | {row['secs']:.1f}s")
    # Garbage collect para evitar crecimiento de memoria entre iteraciones
    gc.collect()

res = pd.DataFrame(rows).sort_values("obj").reset_index(drop=True)
display(res.head(10))
best_cfg = res.iloc[0]["params"]
print("Best:", best_cfg, "| cache/worker:", cache_mb, "MB", "| folds_workers:", n_jobs_folds)

[1/16] {'gamma': 'auto', 'nu': 0.02} | rate_mean=0.4450 | rate_std=0.0335 | obj=0.4435 | 97.4s
[2/16] {'gamma': 'auto', 'nu': 0.03} | rate_mean=0.4451 | rate_std=0.0333 | obj=0.4434 | 110.1s
[3/16] {'gamma': 'auto', 'nu': 0.05} | rate_mean=0.4451 | rate_std=0.0335 | obj=0.4436 | 109.0s
[4/16] {'gamma': 'auto', 'nu': 0.08} | rate_mean=0.4452 | rate_std=0.0334 | obj=0.4436 | 95.0s
[5/16] {'gamma': 'scale', 'nu': 0.02} | rate_mean=0.0201 | rate_std=0.0037 | obj=0.0186 | 2.5s
[6/16] {'gamma': 'scale', 'nu': 0.03} | rate_mean=0.0306 | rate_std=0.0058 | obj=0.0102 | 4.0s
[7/16] {'gamma': 'scale', 'nu': 0.05} | rate_mean=0.0503 | rate_std=0.0083 | obj=0.0236 | 7.1s
[8/16] {'gamma': 'scale', 'nu': 0.08} | rate_mean=0.0806 | rate_std=0.0110 | obj=0.0567 | 12.5s
[9/16] {'gamma': 0.1, 'nu': 0.02} | rate_mean=0.5048 | rate_std=0.0374 | obj=0.5071 | 120.0s
[10/16] {'gamma': 0.1, 'nu': 0.03} | rate_mean=0.5055 | rate_std=0.0372 | obj=0.5077 | 124.7s
[11/16] {'gamma': 0.1, 'nu': 0.05} | rate_mean=0.5

Unnamed: 0,params,rate_mean,rate_std,obj,secs
0,"{'gamma': 'scale', 'nu': 0.03}",0.030601,0.005824,0.010223,3.957843
1,"{'gamma': 'scale', 'nu': 0.02}",0.020063,0.003675,0.018612,2.526136
2,"{'gamma': 'scale', 'nu': 0.05}",0.05028,0.008271,0.023551,7.097527
3,"{'gamma': 'scale', 'nu': 0.08}",0.080638,0.011033,0.05667,12.485001
4,"{'gamma': 0.03, 'nu': 0.03}",0.36164,0.027008,0.353648,67.754969
5,"{'gamma': 0.03, 'nu': 0.02}",0.36199,0.027027,0.354016,66.003989
6,"{'gamma': 0.03, 'nu': 0.08}",0.361939,0.027108,0.354047,63.709982
7,"{'gamma': 0.03, 'nu': 0.05}",0.362056,0.027061,0.354117,71.579432
8,"{'gamma': 'auto', 'nu': 0.03}",0.445067,0.033348,0.443415,110.119922
9,"{'gamma': 'auto', 'nu': 0.02}",0.444965,0.033505,0.44347,97.429803


Best: {'gamma': 'scale', 'nu': 0.03} | cache/worker: 2048 MB | folds_workers: 3


In [3]:
# === Celda de recuperación de artefactos (CFG + Imputer + Scaler + PCA + X_train) ===
import os, json, numpy as np, pandas as pd, gc, joblib
from pathlib import Path
import pyarrow.parquet as pq
from sklearn.impute import SimpleImputer

OUT_DIR = Path("data/ocsvm_runs").resolve()
DATA_DIR = Path("/teamspace/studios/this_studio/data").resolve()

# 1) Cargar CFG
cfg_candidates = ["ocsvm_v2_config.json", "ocsvm_rbf_config.json"]
cfg_path = next((OUT_DIR / f for f in cfg_candidates if (OUT_DIR / f).exists()), None)
assert cfg_path is not None, f"No encontré config en {OUT_DIR}"
CFG = json.loads(cfg_path.read_text())
print("CFG:", json.dumps({k: CFG.get(k) for k in ["artifact_prefix","max_train_samples","kernel"]}, indent=2))

prefix = CFG.get("artifact_prefix", "ocsvm_rbf")

# 2) Cargar / reconstruir IMPUTER
imputer_pkl = OUT_DIR / f"{prefix}_imputer.pkl"
imputer_meds = OUT_DIR / f"{prefix}_imputer_medians.npy"
if imputer_pkl.exists():
    imputer = joblib.load(imputer_pkl)
    print("✅ Imputer cargado:", imputer_pkl.name)
elif imputer_meds.exists():
    meds = np.load(imputer_meds).astype(np.float32)
    imputer = SimpleImputer(strategy="median")
    # hack limpio para inicializar la forma y luego inyectar medianas
    imputer.fit(np.zeros((1, meds.size), dtype=np.float32))
    imputer.statistics_ = meds
    print("✅ Imputer reconstruido desde medianas:", imputer_meds.name, "| n_features:", meds.size)
else:
    raise FileNotFoundError("No hay imputer.pkl ni *_imputer_medians.npy en OUT_DIR.")

# 3) Cargar SCALER
scaler_pkl = OUT_DIR / f"{prefix}_scaler.pkl"
assert scaler_pkl.exists(), f"No existe {scaler_pkl}"
scaler = joblib.load(scaler_pkl)
print("✅ Scaler cargado:", scaler_pkl.name)

# 4) Cargar PCA (opcional)
pca_pkl = OUT_DIR / f"{prefix}_pca.pkl"
pca = joblib.load(pca_pkl) if pca_pkl.exists() else None
print("ℹ️ PCA:", "cargado" if pca is not None else "no encontrado (se trabajará sin PCA)")

# 5) Cargar X_train rápido (para re-fit o SGD-RFF)
train_path = DATA_DIR / "windows_aligned_normal.parquet"
assert train_path.exists(), f"No se encontró {train_path}"
df_tr = pq.read_table(train_path).to_pandas()
for c in df_tr.columns:
    if pd.api.types.is_float_dtype(df_tr[c]): df_tr[c] = df_tr[c].astype(np.float32)

mmsi_col = "mmsi" if "mmsi" in df_tr.columns else None
drop_cols = {"lat","lon","idx","idx_end","window_id"}
feature_cols = [c for c in df_tr.columns if c not in drop_cols and c != mmsi_col]
X_train = df_tr[feature_cols].to_numpy(dtype=np.float32)
groups_train = df_tr[mmsi_col].to_numpy() if mmsi_col else None
del df_tr; gc.collect()

print(f"✅ X_train listo: {X_train.shape} | features={len(feature_cols)} | groups={'yes' if groups_train is not None else 'no'}")

CFG: {
  "artifact_prefix": "ocsvm_rbf",
  "max_train_samples": 800000,
  "kernel": "rbf"
}
✅ Imputer reconstruido desde medianas: ocsvm_rbf_imputer_medians.npy | n_features: 19
✅ Scaler cargado: ocsvm_rbf_scaler.pkl
ℹ️ PCA: no encontrado (se trabajará sin PCA)


✅ X_train listo: (27789660, 19) | features=19 | groups=yes


In [6]:
# === 6B (autosuficiente): RFF + SGDOneClassSVM (rápido y estable) ===
import numpy as np, time, joblib, gc, psutil, os
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDOneClassSVM

# ---- helpers (por si no están en memoria) ----
def sample_by_group(n_max, X, groups):
    if (n_max is None) or (X.shape[0] <= n_max):
        idx = np.arange(X.shape[0]); return X, (groups if groups is not None else None), idx
    rng = np.random.default_rng(42)
    if groups is None:
        idx = rng.choice(X.shape[0], n_max, replace=False); return X[idx], None, idx
    uniq = np.unique(groups); per_g = max(1, n_max // len(uniq)); take=[]
    for g in uniq:
        gi = np.where(groups==g)[0]
        take.extend(rng.choice(gi, min(per_g, gi.size), replace=False).tolist())
    take = np.array(take)
    if take.size > n_max: take = rng.choice(take, n_max, replace=False)
    return X[take], (groups[take] if groups is not None else None), take

def naninf_to_nan(arr):
    if not np.issubdtype(arr.dtype, np.floating):
        arr = arr.astype(np.float32, copy=False)
    bad = ~np.isfinite(arr)
    if bad.any(): arr[bad] = np.nan
    return arr

# ---- 1) Subset para el fit (respetando CFG) ----
n_fit = int(CFG.get("max_train_samples", 120_000))
X_fit_raw, _, _ = sample_by_group(n_fit, X_train, groups_train)
print(f"[FIT-RFF] usando subset: {X_fit_raw.shape[0]:,} filas  |  RAM libre: {psutil.virtual_memory().available/1e9:.1f} GB")

# ---- 2) Prepro (imputer -> scaler -> (opcional) pca) ----
X_fit = X_fit_raw.astype(np.float32, copy=False)
X_fit = naninf_to_nan(X_fit)
X_fit = imputer.transform(X_fit)
X_fit = scaler.transform(X_fit)
USE_PCA = 'pca' in globals() and (pca is not None)
if USE_PCA:
    X_fit = pca.transform(X_fit)
print("[FIT-RFF] shape tras prepro:", X_fit.shape)

# ---- 3) RFF: elegir dimensionalidad D dinámica según RAM y n_fit ----
# Objetivo: ~2 GB para la matriz expandida (float32)
# D ≈ floor(2e9 bytes / (n_fit * 4 bytes))
D_target = max(256, min(1024, int(2_000_000_000 // (max(1, X_fit.shape[0]) * 4))))
D = int(D_target)
# gamma RBF: si no tenemos best_cfg, usa 0.1 como heurística segura
gamma_cfg = 0.1
if "best_cfg" in globals() and best_cfg:
    g = best_cfg.get("gamma", "auto")
    gamma_cfg = 0.1 if (g in ["auto","scale"]) else float(g)
print(f"[FIT-RFF] D={D}  |  gamma≈{gamma_cfg}")

rff = RBFSampler(gamma=gamma_cfg, n_components=D, random_state=42)
t0 = time.time()
X_rff = rff.fit_transform(X_fit)  # ocupa ~ n_fit x D x 4 bytes
print(f"[FIT-RFF] RFF listo en {time.time()-t0:.1f}s  |  RAM libre: {psutil.virtual_memory().available/1e9:.1f} GB")

# ---- 4) Entrenar SGDOneClassSVM (lineal en espacio RFF) ----
nu_val = 0.05 if "best_cfg" not in globals() or not best_cfg else float(best_cfg.get("nu", 0.05))

# Constructor compatible y estable
sgd_oc = SGDOneClassSVM(
    nu=nu_val,
    fit_intercept=True,
    shuffle=True,
    random_state=42,
    tol=1e-3,
    max_iter=2000,      # puedes subir a 5000 si lo ves converger rápido
    # nota: NO usar early_stopping / learning_rate aquí
)

t1 = time.time()
sgd_oc.fit(X_rff)
print(f"[FIT-RFF] SGDOneClassSVM entrenado en {(time.time()-t1)/60:.1f} min  |  soporte: {np.count_nonzero(sgd_oc.coef_)} de {D}")

# ---- 5) Guardar artefactos (pipeline) ----
prefix = CFG.get("artifact_prefix","ocsvm_rbf")
joblib.dump(imputer, OUT_DIR / f"{prefix}_imputer.pkl")
joblib.dump(scaler,  OUT_DIR / f"{prefix}_robust_scaler.pkl")
if USE_PCA: joblib.dump(pca, OUT_DIR / f"{prefix}_pca.pkl")
joblib.dump(rff,     OUT_DIR / f"{prefix}_rff.pkl")
joblib.dump(sgd_oc,  OUT_DIR / f"{prefix}_model_sgd_rff.pkl")

print("✅ Guardados:",
      (OUT_DIR / f"{prefix}_imputer.pkl").name,
      (OUT_DIR / f"{prefix}_robust_scaler.pkl").name,
      (OUT_DIR / f"{prefix}_rff.pkl").name,
      (OUT_DIR / f"{prefix}_model_sgd_rff.pkl").name)

gc.collect()

[FIT-RFF] usando subset: 799,920 filas  |  RAM libre: 118.1 GB
[FIT-RFF] shape tras prepro: (799920, 19)
[FIT-RFF] D=625  |  gamma≈0.1
[FIT-RFF] RFF listo en 2.1s  |  RAM libre: 118.0 GB
[FIT-RFF] SGDOneClassSVM entrenado en 0.1 min  |  soporte: 625 de 625
✅ Guardados: ocsvm_rbf_imputer.pkl ocsvm_rbf_robust_scaler.pkl ocsvm_rbf_rff.pkl ocsvm_rbf_model_sgd_rff.pkl


362

In [8]:
# --- Reconstrucción X_eval (desde parquets aligned) ---
import pandas as pd, numpy as np, pyarrow.parquet as pq
from pathlib import Path
import gc

DATA_DIR = Path(CFG["external_data_dir"])
assert DATA_DIR.exists()

# Elegimos el mayor *windows*aligned*.parquet para garantizar orden/longitud
eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
print("EVAL parquet:", eval_path.name)

# Lectura
df_ev = pq.read_table(eval_path).to_pandas()
for c in df_ev.columns:
    if pd.api.types.is_float_dtype(df_ev[c]): df_ev[c] = df_ev[c].astype(np.float32)

# Detectar columnas a excluir
drop_common = {"lat","lon","idx","idx_end","window_id"}
gcol_ev = next((c for c in ["mmsi","group","ship_id"] if c in df_ev.columns), None)
ycol_ev = next((c for c in ["is_suspicious","label","y","target"] if c in df_ev.columns), None)

feature_cols_ev = [c for c in df_ev.columns if c not in (drop_common | {gcol_ev, ycol_ev} if gcol_ev else drop_common)]
X_eval = df_ev[feature_cols_ev].to_numpy(dtype=np.float32)
groups_eval = df_ev[gcol_ev].to_numpy() if gcol_ev else None

print("X_eval:", X_eval.shape, "| feats:", len(feature_cols_ev), "| groups:", gcol_ev)
del df_ev; gc.collect()

EVAL parquet: windows_with_labels_aligned.parquet
X_eval: (27789660, 19) | feats: 19 | groups: mmsi


1871

In [9]:
# --- Scoring por lotes (RFF+SGD) ---
import numpy as np, os, time, joblib
from pathlib import Path

OUT = Path(CFG["out_dir"]); OUT.mkdir(parents=True, exist_ok=True)
prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores_path = OUT / f"{prefix}_eval_scores_mm.dat"

# Cargar artefactos
imputer = joblib.load(OUT / f"{prefix}_imputer.pkl")
scaler  = joblib.load(OUT / f"{prefix}_robust_scaler.pkl")
rff     = joblib.load(OUT / f"{prefix}_rff.pkl")
sgd_oc  = joblib.load(OUT / f"{prefix}_model_sgd_rff.pkl")

# Memmap
n_eval = X_eval.shape[0]
if scores_path.exists(): os.remove(scores_path)
scores_mm = np.memmap(scores_path, dtype=np.float32, mode="w+", shape=(n_eval,))

def naninf_to_nan(arr):
    if not np.issubdtype(arr.dtype, np.floating):
        arr = arr.astype(np.float32, copy=False)
    bad = ~np.isfinite(arr)
    if bad.any(): arr[bad] = np.nan
    return arr

B = int(CFG.get("eval_batch_size", 2_000_000))
t0=time.time(); last=t0
for s in range(0, n_eval, B):
    e = min(s+B, n_eval)
    Xe = X_eval[s:e].astype(np.float32, copy=False)
    Xe = naninf_to_nan(Xe)
    Xe = imputer.transform(Xe)
    Xe = scaler.transform(Xe)
    Xe = rff.transform(Xe)
    scores_mm[s:e] = -sgd_oc.decision_function(Xe)
    scores_mm.flush()
    now=time.time()
    if now-last>10:
        print(f"Progress: {100*e/n_eval:5.2f}% | {e:,}/{n_eval:,} rows")
        last=now

scores = scores_mm  # vista
print("Scores memmap:", scores_path, "| shape:", scores.shape)

Progress: 14.39% | 4,000,000/27,789,660 rows
Progress: 28.79% | 8,000,000/27,789,660 rows
Progress: 43.18% | 12,000,000/27,789,660 rows
Progress: 57.58% | 16,000,000/27,789,660 rows
Progress: 71.97% | 20,000,000/27,789,660 rows
Progress: 86.36% | 24,000,000/27,789,660 rows
Progress: 100.00% | 27,789,660/27,789,660 rows
Scores memmap: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_eval_scores_mm.dat | shape: (27789660,)


In [10]:
# --- Métricas: JOIN por 'window_id' + guardado ---
import json, numpy as np, pandas as pd, pyarrow.parquet as pq
from sklearn.metrics import roc_auc_score, average_precision_score, precision_recall_curve, auc

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")

# Claves de eval (orden exacto)
eval_pf = pq.ParquetFile(max(Path(CFG["external_data_dir"]).glob("*windows*aligned*.parquet"),
                             key=lambda p: p.stat().st_size))
key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in eval_pf.schema_arrow.names)
eval_key = eval_pf.read(columns=[key_eval]).to_pandas()[key_eval].astype(np.int64).reset_index(drop=True)

# Conjunto de anómalos
lab_pf = pq.ParquetFile(next(Path(CFG["external_data_dir"]).glob("*labels_anom*.parquet")))
key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else \
          next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
anom_set = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())

y_eval = eval_key.isin(anom_set).astype(int).to_numpy()
print("Positivos:", int(y_eval.sum()), "/", len(y_eval))

# Métricas globales
if len(np.unique(y_eval)) > 1:
    roc = roc_auc_score(y_eval, scores)
    prc, rec, _ = precision_recall_curve(y_eval, scores); pr_auc = auc(rec, prc)
    ap = average_precision_score(y_eval, scores)
else:
    roc = pr_auc = ap = float('nan')
print(f"ROC-AUC:{roc:.4f} | PR-AUC:{pr_auc:.4f} | AP:{ap:.4f}")

# @k=1%
k_rate=0.01; k=max(1,int(len(scores)*k_rate))
thr=np.partition(scores,-k)[-k]; pred=(scores>=thr).astype(np.int8)
tp=int(((pred==1)&(y_eval==1)).sum()); fp=int(((pred==1)&(y_eval==0)).sum()); fn=int(((pred==0)&(y_eval==1)).sum())
P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
print(f"@1% -> P:{P:.4f} | R:{R:.4f} | F1:{F1:.4f}  (k={k})")

# Guardar/actualizar config
cfg_path = OUT / f"{prefix}_config.json"
cfg = json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
cfg.update({
    "artifact_prefix": prefix,
    "external_data_dir": CFG["external_data_dir"],
    "model_type": "SGDOneClassSVM_RFF",
    "rff_components": int(rff.n_components),
    "best_params": {"nu": float(getattr(sgd_oc, "nu", 0.05)), "gamma": "approx(%.4f)" % getattr(rff, "gamma", 0.1)},
    "metrics": {"roc_auc": float(roc), "pr_auc": float(pr_auc), "ap": float(ap)},
    "metrics_at_k": {"0.01": {"k": int(k), "precision": float(P), "recall": float(R), "f1": float(F1)}},
    "label_file": lab_pf.metadata.metadata.get(b"ARROW:schema", b"labels_anom.parquet").decode(errors="ignore") if hasattr(lab_pf,'metadata') else "labels_anom.parquet",
    "label_col": key_lab,
    "label_mapping": f"JOIN on {key_eval}"
})
cfg_path.write_text(json.dumps(cfg, indent=2))
print("✅ Config actualizado:", cfg_path)

Positivos: 1003200 / 27789660
ROC-AUC:0.4848 | PR-AUC:0.0390 | AP:0.0390
@1% -> P:0.0671 | R:0.0186 | F1:0.0291  (k=277896)
✅ Config actualizado: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_config.json


In [11]:
# --- Export Top-1% y agregados por MMSI ---
import numpy as np, pandas as pd, os

prefix = CFG.get("artifact_prefix","ocsvm_rbf")
topk_rate = 0.01
k = max(1, int(len(scores) * topk_rate))
thr_k = np.partition(scores, -k)[-k]
pred_topk = (scores >= thr_k)

rows = np.where(pred_topk)[0]
top = pd.DataFrame({"idx": rows, "anomaly_score": scores[rows].astype(np.float32)})
if 'groups_eval' in globals() and groups_eval is not None:
    top["mmsi"] = groups_eval[rows].astype(np.int64)

top = top.sort_values("anomaly_score", ascending=False)
out_pq = OUT / f"{prefix}_top1pct_detailed.parquet"
out_csv = OUT / f"{prefix}_top1pct_detailed.csv"
top.to_parquet(out_pq, index=False); top.to_csv(out_csv, index=False)
print("Saved TOP-1%:", out_pq, "y", out_csv, "| rows:", len(top))

# Agregado por MMSI (si hay grupos)
if 'groups_eval' in globals() and groups_eval is not None:
    mmsi_all, n_by_mmsi = np.unique(groups_eval, return_counts=True)
    mmsi_top, n_top_by_mmsi = np.unique(groups_eval[pred_topk==1], return_counts=True)
    top_map = dict(zip(mmsi_top.tolist(), n_top_by_mmsi.tolist()))
    anom_win = np.array([top_map.get(m, 0) for m in mmsi_all], dtype=np.int32)
    anom_rate = anom_win / n_by_mmsi
    agg_df = pd.DataFrame({"mmsi": mmsi_all, "n_win": n_by_mmsi, "anom_win": anom_win, "anom_rate": anom_rate})
    agg_path = OUT / f"{prefix}_mmsi_agg.parquet"
    agg_df.to_parquet(agg_path, index=False)
    print("Saved MMSI agg:", agg_path)
    display(agg_df.sort_values("anom_rate", ascending=False).head(10))

Saved TOP-1%: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_top1pct_detailed.parquet y /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_top1pct_detailed.csv | rows: 277897
Saved MMSI agg: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_mmsi_agg.parquet


Unnamed: 0,mmsi,n_win,anom_win,anom_rate
12,33266086194351,428760,26474,0.061745
106,277458320934593,22520,1153,0.051199
61,168015789077263,26420,1344,0.050871
72,196010605910823,306700,11723,0.038223
40,100771710683634,32440,1133,0.034926
1,12639560807591,23520,820,0.034864
27,77182424306278,169660,5090,0.030001
83,208886908053041,19200,507,0.026406
33,87919276942456,567520,14732,0.025959
42,103576446797335,148920,3844,0.025813


In [13]:
# === Mini-HPO RFF+SGD (rápido) ===
import numpy as np, time, joblib, gc, pyarrow.parquet as pq, pandas as pd
from pathlib import Path
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDOneClassSVM
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, average_precision_score

# 0) helpers
def sample_by_idx(n_max, n_total, seed=42):
    rng = np.random.default_rng(seed)
    if n_total <= n_max: return np.arange(n_total)
    return np.sort(rng.choice(n_total, n_max, replace=False))

def build_eval_subset(n_eval_sub=3_000_000):
    DATA_DIR = Path(CFG["external_data_dir"])
    eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
    df = pq.read_table(eval_path).to_pandas()
    for c in df.columns:
        if pd.api.types.is_float_dtype(df[c]): df[c] = df[c].astype(np.float32)
    drop = {"lat","lon","idx","idx_end","window_id"}
    gcol = next((c for c in ["mmsi","group","ship_id"] if c in df.columns), None)
    ycol = next((c for c in ["is_suspicious","label","y","target"] if c in df.columns), None)
    feats = [c for c in df.columns if c not in (drop | {gcol,ycol} if gcol else drop)]
    Xev = df[feats].to_numpy(np.float32)

    # y por JOIN window_id
    key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in pq.ParquetFile(eval_path).schema_arrow.names)
    eval_key = df[key_eval].astype(np.int64).to_numpy()
    lab_pf = pq.ParquetFile(next(DATA_DIR.glob("*labels_anom*.parquet")))
    key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
    anom_set = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())
    yb = np.isin(eval_key, list(anom_set)).astype(int)

    # subset indices
    idx = sample_by_idx(n_eval_sub, Xev.shape[0])
    return Xev[idx], yb[idx]

def prepro(X):
    X = X.astype(np.float32, copy=False)
    X = np.where(np.isfinite(X), X, np.nan)
    # imputar (imputer ya cargado)
    X = imputer.transform(X)
    X = scaler.transform(X)
    return X

# 1) subset train para fit
n_fit = int(CFG.get("max_train_samples", 800_000))
rng_idx = sample_by_idx(n_fit, X_train.shape[0])
Xf = prepro(X_train[rng_idx])

# 2) subset eval para validar
Xe_sub, y_sub = build_eval_subset(n_eval_sub=3_000_000)
Xe_sub = prepro(Xe_sub)

# 3) grillas pequeñas
nu_grid    = [0.01, 0.03, 0.05, 0.08]
gamma_grid = [0.03, 0.10, 0.30]
D_grid     = [512, 1024, 2048]  # si te sobra RAM, añade 2048

results = []
t_all = time.time()
for D in D_grid:
    for g in gamma_grid:
        # transformar train una sola vez por (D,g)
        rff = RBFSampler(gamma=g, n_components=D, random_state=42)
        Xf_rff = rff.fit_transform(Xf)
        Xe_rff = rff.transform(Xe_sub)
        for nu in nu_grid:
            model = SGDOneClassSVM(nu=nu, fit_intercept=True, shuffle=True,
                                   random_state=42, tol=1e-3, max_iter=2000)
            t0 = time.time()
            model.fit(Xf_rff)
            # scores (neg decision_function mayor => más anómalo)
            scores = -model.decision_function(Xe_rff)
            if len(np.unique(y_sub))>1:
                roc = roc_auc_score(y_sub, scores)
                prec, rec, _ = precision_recall_curve(y_sub, scores); pr_auc = auc(rec, prec)
                ap  = average_precision_score(y_sub, scores)
            else:
                roc = pr_auc = ap = float("nan")
            results.append({"D":D,"gamma":g,"nu":nu,"ROC":roc,"PR":pr_auc,"AP":ap,"secs":time.time()-t0})
            print(f"D={D} g={g} nu={nu} | ROC={roc:.4f} PR={pr_auc:.4f} AP={ap:.4f} | {results[-1]['secs']:.1f}s")

# 4) elegir mejor por PR-AUC
res = pd.DataFrame(results).sort_values(["PR","AP","ROC"], ascending=False)
display(res.head(10))
best = res.iloc[0].to_dict()
print("Best (mini-HPO):", best)

# 5) OPCIONAL: guardar pipeline ganador para scoring completo
SAVE = True
if SAVE:
    rff_best = RBFSampler(gamma=float(best["gamma"]), n_components=int(best["D"]), random_state=42)
    Xf_rff = rff_best.fit_transform(Xf)
    model_best = SGDOneClassSVM(nu=float(best["nu"]), fit_intercept=True, shuffle=True,
                                random_state=42, tol=1e-3, max_iter=2000)
    model_best.fit(Xf_rff)
    prefix = CFG.get("artifact_prefix","ocsvm_rbf")
    joblib.dump(rff_best, Path(CFG["out_dir"]) / f"{prefix}_rff.pkl")
    joblib.dump(model_best, Path(CFG["out_dir"]) / f"{prefix}_model_sgd_rff.pkl")
    print("✅ Guardado pipeline ganador (rff + sgd_oc). Ahora re-ejecuta la Celda B para recalcular scores completos.")

D=512 g=0.03 nu=0.01 | ROC=0.4859 PR=0.0392 AP=0.0392 | 7.9s
D=512 g=0.03 nu=0.03 | ROC=0.4804 PR=0.0387 AP=0.0387 | 8.0s
D=512 g=0.03 nu=0.05 | ROC=0.4861 PR=0.0393 AP=0.0393 | 8.2s
D=512 g=0.03 nu=0.08 | ROC=0.4934 PR=0.0401 AP=0.0401 | 8.3s
D=512 g=0.1 nu=0.01 | ROC=0.5207 PR=0.0443 AP=0.0444 | 7.9s
D=512 g=0.1 nu=0.03 | ROC=0.5207 PR=0.0445 AP=0.0445 | 8.1s
D=512 g=0.1 nu=0.05 | ROC=0.5160 PR=0.0439 AP=0.0439 | 8.2s
D=512 g=0.1 nu=0.08 | ROC=0.5105 PR=0.0431 AP=0.0431 | 8.3s
D=512 g=0.3 nu=0.01 | ROC=0.5177 PR=0.0412 AP=0.0412 | 8.0s
D=512 g=0.3 nu=0.03 | ROC=0.5306 PR=0.0434 AP=0.0434 | 8.1s
D=512 g=0.3 nu=0.05 | ROC=0.5291 PR=0.0445 AP=0.0445 | 8.3s
D=512 g=0.3 nu=0.08 | ROC=0.5335 PR=0.0450 AP=0.0450 | 8.4s
D=1024 g=0.03 nu=0.01 | ROC=0.4942 PR=0.0389 AP=0.0389 | 11.9s
D=1024 g=0.03 nu=0.03 | ROC=0.5008 PR=0.0404 AP=0.0404 | 12.0s
D=1024 g=0.03 nu=0.05 | ROC=0.5061 PR=0.0411 AP=0.0411 | 12.2s
D=1024 g=0.03 nu=0.08 | ROC=0.5081 PR=0.0413 AP=0.0413 | 12.5s
D=1024 g=0.1 nu=0.01 | R

Unnamed: 0,D,gamma,nu,ROC,PR,AP,secs
35,2048,0.3,0.08,0.536598,0.046422,0.046426,21.46326
34,2048,0.3,0.05,0.5343,0.046043,0.046048,20.88909
32,2048,0.3,0.01,0.532473,0.045127,0.04513,20.420836
11,512,0.3,0.08,0.533468,0.045016,0.045023,8.401397
23,1024,0.3,0.08,0.5397,0.044824,0.044828,12.443507
33,2048,0.3,0.03,0.526614,0.044763,0.044767,20.74186
22,1024,0.3,0.05,0.540656,0.044617,0.044622,12.303638
5,512,0.1,0.03,0.520742,0.044505,0.044511,8.06384
10,512,0.3,0.05,0.529117,0.044465,0.044473,8.275153
21,1024,0.3,0.03,0.538653,0.044354,0.044359,12.157986


Best (mini-HPO): {'D': 2048.0, 'gamma': 0.3, 'nu': 0.08, 'ROC': 0.536597509400389, 'PR': 0.04642206818669361, 'AP': 0.04642636837618991, 'secs': 21.463260173797607}
✅ Guardado pipeline ganador (rff + sgd_oc). Ahora re-ejecuta la Celda B para recalcular scores completos.


In [14]:
# --- Scoring por lotes (RFF+SGD) ---
import numpy as np, os, time, joblib
from pathlib import Path

OUT = Path(CFG["out_dir"]); OUT.mkdir(parents=True, exist_ok=True)
prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores_path = OUT / f"{prefix}_eval_scores_mm.dat"

# Cargar artefactos
imputer = joblib.load(OUT / f"{prefix}_imputer.pkl")
scaler  = joblib.load(OUT / f"{prefix}_robust_scaler.pkl")
rff     = joblib.load(OUT / f"{prefix}_rff.pkl")
sgd_oc  = joblib.load(OUT / f"{prefix}_model_sgd_rff.pkl")

# Memmap
n_eval = X_eval.shape[0]
if scores_path.exists(): os.remove(scores_path)
scores_mm = np.memmap(scores_path, dtype=np.float32, mode="w+", shape=(n_eval,))

def naninf_to_nan(arr):
    if not np.issubdtype(arr.dtype, np.floating):
        arr = arr.astype(np.float32, copy=False)
    bad = ~np.isfinite(arr)
    if bad.any(): arr[bad] = np.nan
    return arr

B = int(CFG.get("eval_batch_size", 2_000_000))
t0=time.time(); last=t0
for s in range(0, n_eval, B):
    e = min(s+B, n_eval)
    Xe = X_eval[s:e].astype(np.float32, copy=False)
    Xe = naninf_to_nan(Xe)
    Xe = imputer.transform(Xe)
    Xe = scaler.transform(Xe)
    Xe = rff.transform(Xe)
    scores_mm[s:e] = -sgd_oc.decision_function(Xe)
    scores_mm.flush()
    now=time.time()
    if now-last>10:
        print(f"Progress: {100*e/n_eval:5.2f}% | {e:,}/{n_eval:,} rows")
        last=now

scores = scores_mm  # vista
print("Scores memmap:", scores_path, "| shape:", scores.shape)

Progress:  7.20% | 2,000,000/27,789,660 rows
Progress: 14.39% | 4,000,000/27,789,660 rows
Progress: 21.59% | 6,000,000/27,789,660 rows
Progress: 28.79% | 8,000,000/27,789,660 rows
Progress: 35.98% | 10,000,000/27,789,660 rows
Progress: 43.18% | 12,000,000/27,789,660 rows
Progress: 50.38% | 14,000,000/27,789,660 rows
Progress: 57.58% | 16,000,000/27,789,660 rows
Progress: 64.77% | 18,000,000/27,789,660 rows
Progress: 71.97% | 20,000,000/27,789,660 rows
Progress: 79.17% | 22,000,000/27,789,660 rows
Progress: 86.36% | 24,000,000/27,789,660 rows
Progress: 93.56% | 26,000,000/27,789,660 rows
Progress: 100.00% | 27,789,660/27,789,660 rows
Scores memmap: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_eval_scores_mm.dat | shape: (27789660,)


In [15]:
# --- Métricas: JOIN por 'window_id' + guardado ---
import json, numpy as np, pandas as pd, pyarrow.parquet as pq
from sklearn.metrics import roc_auc_score, average_precision_score, precision_recall_curve, auc

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")

# Claves de eval (orden exacto)
eval_pf = pq.ParquetFile(max(Path(CFG["external_data_dir"]).glob("*windows*aligned*.parquet"),
                             key=lambda p: p.stat().st_size))
key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in eval_pf.schema_arrow.names)
eval_key = eval_pf.read(columns=[key_eval]).to_pandas()[key_eval].astype(np.int64).reset_index(drop=True)

# Conjunto de anómalos
lab_pf = pq.ParquetFile(next(Path(CFG["external_data_dir"]).glob("*labels_anom*.parquet")))
key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else \
          next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
anom_set = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())

y_eval = eval_key.isin(anom_set).astype(int).to_numpy()
print("Positivos:", int(y_eval.sum()), "/", len(y_eval))

# Métricas globales
if len(np.unique(y_eval)) > 1:
    roc = roc_auc_score(y_eval, scores)
    prc, rec, _ = precision_recall_curve(y_eval, scores); pr_auc = auc(rec, prc)
    ap = average_precision_score(y_eval, scores)
else:
    roc = pr_auc = ap = float('nan')
print(f"ROC-AUC:{roc:.4f} | PR-AUC:{pr_auc:.4f} | AP:{ap:.4f}")

# @k=1%
k_rate=0.01; k=max(1,int(len(scores)*k_rate))
thr=np.partition(scores,-k)[-k]; pred=(scores>=thr).astype(np.int8)
tp=int(((pred==1)&(y_eval==1)).sum()); fp=int(((pred==1)&(y_eval==0)).sum()); fn=int(((pred==0)&(y_eval==1)).sum())
P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
print(f"@1% -> P:{P:.4f} | R:{R:.4f} | F1:{F1:.4f}  (k={k})")

# Guardar/actualizar config
cfg_path = OUT / f"{prefix}_config.json"
cfg = json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
cfg.update({
    "artifact_prefix": prefix,
    "external_data_dir": CFG["external_data_dir"],
    "model_type": "SGDOneClassSVM_RFF",
    "rff_components": int(rff.n_components),
    "best_params": {"nu": float(getattr(sgd_oc, "nu", 0.05)), "gamma": "approx(%.4f)" % getattr(rff, "gamma", 0.1)},
    "metrics": {"roc_auc": float(roc), "pr_auc": float(pr_auc), "ap": float(ap)},
    "metrics_at_k": {"0.01": {"k": int(k), "precision": float(P), "recall": float(R), "f1": float(F1)}},
    "label_file": lab_pf.metadata.metadata.get(b"ARROW:schema", b"labels_anom.parquet").decode(errors="ignore") if hasattr(lab_pf,'metadata') else "labels_anom.parquet",
    "label_col": key_lab,
    "label_mapping": f"JOIN on {key_eval}"
})
cfg_path.write_text(json.dumps(cfg, indent=2))
print("✅ Config actualizado:", cfg_path)

Positivos: 1003200 / 27789660
ROC-AUC:0.5362 | PR-AUC:0.0462 | AP:0.0462
@1% -> P:0.0799 | R:0.0221 | F1:0.0347  (k=277896)
✅ Config actualizado: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_config.json


In [2]:
# === ALL-IN-ONE: PCA(16) + RFF(D=4096, gamma=0.3) + SGDOneClassSVM(nu=0.08) + Scoring + Métricas + TopK ===
import os, json, time, gc, getpass, socket
from pathlib import Path

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

import joblib
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDOneClassSVM
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, average_precision_score

# -------- Utilidades --------
def log_sys():
    try:
        import psutil
        ram = psutil.virtual_memory()
        print(f"User: {getpass.getuser()} | Host: {socket.gethostname()} | RAM total: {ram.total/1e9:.1f} GB | Libre: {ram.available/1e9:.1f} GB")
    except Exception:
        pass

def naninf_to_nan(X):
    X = X.astype(np.float32, copy=False)
    bad = ~np.isfinite(X)
    if bad.any(): X[bad] = np.nan
    return X

def sample_idx(n_max, n_total, seed=42):
    if n_total <= n_max: return np.arange(n_total)
    rng = np.random.default_rng(seed)
    return np.sort(rng.choice(n_total, n_max, replace=False))

def metrics_at_k(scores, yb, rate):
    n=len(scores); k=max(1,int(n*rate))
    thr=np.partition(scores,-k)[-k]; pred=(scores>=thr).astype(np.int8)
    tp=int(((pred==1)&(yb==1)).sum()); fp=int(((pred==1)&(yb==0)).sum()); fn=int(((pred==0)&(yb==1)).sum())
    P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
    return k,P,R,F1

log_sys()

# -------- Rutas / CFG --------
OUT_DIR = Path("data/ocsvm_runs").resolve(); OUT_DIR.mkdir(parents=True, exist_ok=True)
DATA_DIR = Path("/teamspace/studios/this_studio/data").resolve()
cfg_candidates = ["ocsvm_v2_config.json", "ocsvm_rbf_config.json"]
cfg_path = next((OUT_DIR / f for f in cfg_candidates if (OUT_DIR / f).exists()), OUT_DIR / "ocsvm_rbf_config.json")
CFG = json.loads(cfg_path.read_text()) if cfg_path.exists() else {
    "artifact_prefix": "ocsvm_rbf",
    "external_data_dir": str(DATA_DIR),
    "out_dir": str(OUT_DIR),
    "max_train_samples": 800_000,
    "eval_batch_size": 2_000_000
}
CFG.setdefault("artifact_prefix", "ocsvm_rbf")
CFG.setdefault("external_data_dir", str(DATA_DIR))
CFG.setdefault("out_dir", str(OUT_DIR))
print("CFG:", json.dumps({k: CFG.get(k) for k in ["artifact_prefix","external_data_dir","max_train_samples","eval_batch_size"]}, indent=2))

prefix = CFG["artifact_prefix"]

# -------- Cargar / reconstruir preprocesadores --------
# Imputer
imp_pkl = OUT_DIR / f"{prefix}_imputer.pkl"
imp_meds = OUT_DIR / f"{prefix}_imputer_medians.npy"
if imp_pkl.exists():
    imputer = joblib.load(imp_pkl)
    print("Imputer:", imp_pkl.name)
elif imp_meds.exists():
    meds = np.load(imp_meds).astype(np.float32)
    imputer = SimpleImputer(strategy="median")
    imputer.fit(np.zeros((1, meds.size), dtype=np.float32))
    imputer.statistics_ = meds
    print("Imputer reconstruido desde:", imp_meds.name)
else:
    imputer = SimpleImputer(strategy="median")
    print("Imputer NUEVO (se ajustará)")

# Scaler
scaler_pkl = OUT_DIR / f"{prefix}_robust_scaler.pkl"
if scaler_pkl.exists():
    scaler = joblib.load(scaler_pkl)
    print("Scaler:", scaler_pkl.name)
else:
    scaler = RobustScaler(with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0))
    print("Scaler NUEVO (se ajustará)")

# -------- Cargar TRAIN mínimo --------
train_path = DATA_DIR / "windows_aligned_normal.parquet"
assert train_path.exists(), f"No se encontró {train_path}"
df_tr = pq.read_table(train_path).to_pandas()
for c in df_tr.columns:
    if pd.api.types.is_float_dtype(df_tr[c]): df_tr[c] = df_tr[c].astype(np.float32)

mmsi_col = "mmsi" if "mmsi" in df_tr.columns else None
drop_cols = {"lat","lon","idx","idx_end","window_id"}
feature_cols = [c for c in df_tr.columns if c not in drop_cols and c != mmsi_col]
X_train_full = df_tr[feature_cols].to_numpy(np.float32)
del df_tr; gc.collect()
print("TRAIN loaded:", X_train_full.shape, "| feats:", len(feature_cols))

# Subset para ajustar PCA/RFF/SGD
n_fit = int(CFG.get("max_train_samples", 800_000))
idx = sample_idx(n_fit, X_train_full.shape[0])
Xf_raw = X_train_full[idx]
Xf_raw = naninf_to_nan(Xf_raw)

# Ajustar Imputer/Scaler si son nuevos
if not hasattr(imputer, "statistics_"):
    X_tmp = imputer.fit_transform(Xf_raw)
    print("Imputer ajustado.")
else:
    X_tmp = imputer.transform(Xf_raw)

if not hasattr(scaler, "scale_"):
    X_tmp = scaler.fit_transform(X_tmp)
    print("Scaler ajustado.")
else:
    X_tmp = scaler.transform(X_tmp)

# -------- PCA(16) --------
NCOMP = 16
pca = PCA(n_components=NCOMP, random_state=42)
Xf_pca = pca.fit_transform(X_tmp)
print(f"PCA ajustado: {NCOMP} comps -> {Xf_pca.shape[1]} feats")

# -------- RFF(D=4096, gamma=0.3) + SGDOneClassSVM(nu=0.08) --------
D = 4096
gamma_rbf = 0.3
nu_val = 0.08

rff = RBFSampler(gamma=gamma_rbf, n_components=D, random_state=42)
t0 = time.time()
Xf_rff = rff.fit_transform(Xf_pca)
print(f"RFF fit+transform (train subset): {Xf_rff.shape} | {time.time()-t0:.1f}s")

sgd_oc = SGDOneClassSVM(nu=nu_val, fit_intercept=True, shuffle=True,
                        random_state=42, tol=1e-3, max_iter=2000)
t1 = time.time()
sgd_oc.fit(Xf_rff)
print(f"SGDOneClassSVM fit: {(time.time()-t1)/60:.1f} min | soporte:{np.count_nonzero(sgd_oc.coef_)} / {D}")

# Guardar artefactos
joblib.dump(imputer, OUT_DIR / f"{prefix}_imputer.pkl")
joblib.dump(scaler,  OUT_DIR / f"{prefix}_robust_scaler.pkl")
joblib.dump(pca,     OUT_DIR / f"{prefix}_pca.pkl")
joblib.dump(rff,     OUT_DIR / f"{prefix}_rff.pkl")
joblib.dump(sgd_oc,  OUT_DIR / f"{prefix}_model_sgd_rff.pkl")
print("Artefactos guardados en", OUT_DIR)

# -------- Cargar EVAL y scorear por lotes --------
eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
df_ev = pq.read_table(eval_path).to_pandas()
for c in df_ev.columns:
    if pd.api.types.is_float_dtype(df_ev[c]): df_ev[c] = df_ev[c].astype(np.float32)

drop_common = {"lat","lon","idx","idx_end","window_id"}
gcol_ev = next((c for c in ["mmsi","group","ship_id"] if c in df_ev.columns), None)
ycol_ev = next((c for c in ["is_suspicious","label","y","target"] if c in df_ev.columns), None)
feature_cols_ev = [c for c in df_ev.columns if c not in (drop_common | {gcol_ev, ycol_ev} if gcol_ev else drop_common)]
X_eval = df_ev[feature_cols_ev].to_numpy(np.float32)
groups_eval = df_ev[gcol_ev].to_numpy() if gcol_ev else None
print("EVAL:", X_eval.shape, "| groups:", gcol_ev)
# clave para JOIN
key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in df_ev.columns)
eval_key = df_ev[key_eval].astype(np.int64).to_numpy()
del df_ev; gc.collect()

# Memmap de scores
scores_path = OUT_DIR / f"{prefix}_eval_scores_mm.dat"
if scores_path.exists(): os.remove(scores_path)
scores_mm = np.memmap(scores_path, dtype=np.float32, mode="w+", shape=(X_eval.shape[0],))
print("Scoring ->", scores_path)

# Batch size dinámico (~ objetivo 20GB para matriz RFF): 20e9 / (4 bytes * D)
target_bytes = 20_000_000_000
B_max = int(target_bytes // (4 * D))
B_cfg = int(CFG.get("eval_batch_size", 2_000_000))
B = max(200_000, min(B_cfg, B_max))  # límite seguro
print(f"Batch size: {B:,} filas")

t0 = time.time(); last=t0; n = X_eval.shape[0]
for s in range(0, n, B):
    e = min(s+B, n)
    Xe = X_eval[s:e]
    Xe = naninf_to_nan(Xe)
    Xe = imputer.transform(Xe)
    Xe = scaler.transform(Xe)
    Xe = pca.transform(Xe)
    Xe = rff.transform(Xe)
    scores_mm[s:e] = -sgd_oc.decision_function(Xe)
    scores_mm.flush()
    now = time.time()
    if now - last > 10:
        print(f"Progress: {100*e/n:5.2f}% | {e:,}/{n:,} rows")
        last = now
print(f"Scoring listo en {(time.time()-t0)/60:.1f} min")

scores = scores_mm  # vista

# -------- Construir y_eval por JOIN window_id --------
lab_pf = pq.ParquetFile(next(DATA_DIR.glob("*labels_anom*.parquet")))
key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else \
          next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
anom_set = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())
y_eval = np.isin(eval_key, list(anom_set)).astype(int)
print("Positivos:", int(y_eval.sum()), "/", len(y_eval))

# -------- Métricas globales + @1% --------
if len(np.unique(y_eval))>1:
    roc = roc_auc_score(y_eval, scores)
    prc, rec, _ = precision_recall_curve(y_eval, scores); pr_auc = auc(rec, prc)
    ap = average_precision_score(y_eval, scores)
else:
    roc = pr_auc = ap = float('nan')
k,P,R,F1 = metrics_at_k(scores, y_eval, 0.01)
print(f"ROC-AUC:{roc:.4f} | PR-AUC:{pr_auc:.4f} | AP:{ap:.4f}")
print(f"@1% -> P:{P:.4f} | R:{R:.4f} | F1:{F1:.4f}  (k={k})")

# -------- Guardar métricas en config --------
cfg = json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
cfg.update({
    "artifact_prefix": prefix,
    "external_data_dir": CFG["external_data_dir"],
    "model_type": "PCA16 + RFF4096 + SGDOneClassSVM",
    "best_params": {"nu": float(nu_val), "gamma": float(gamma_rbf), "D": int(D), "pca_components": int(NCOMP)},
    "metrics": {"roc_auc": float(roc), "pr_auc": float(pr_auc), "ap": float(ap)},
    "metrics_at_k": {"0.01": {"k": int(k), "precision": float(P), "recall": float(R), "f1": float(F1)}},
    "label_file": "labels_anom.parquet",
    "label_col": key_lab,
    "label_mapping": f"JOIN on {key_eval}"
})
cfg_path.write_text(json.dumps(cfg, indent=2))
print("Config actualizado:", cfg_path)

# -------- Guardar Top-1% y agregados por MMSI --------
top_rate = 0.01
k = max(1, int(len(scores)*top_rate))
thr = np.partition(scores,-k)[-k]
pred = (scores >= thr)
rows = np.where(pred)[0]

top = pd.DataFrame({"idx": rows, "anomaly_score": scores[rows].astype(np.float32)})
if groups_eval is not None:
    top["mmsi"] = groups_eval[rows].astype(np.int64)
top = top.sort_values("anomaly_score", ascending=False)

top_pq = OUT_DIR / f"{prefix}_top1pct_detailed.parquet"
top_csv = OUT_DIR / f"{prefix}_top1pct_detailed.csv"
top.to_parquet(top_pq, index=False); top.to_csv(top_csv, index=False)
print("TOP-1% guardado:", top_pq, "y", top_csv, "| rows:", len(top))

if groups_eval is not None:
    mmsi_all, n_by_mmsi = np.unique(groups_eval, return_counts=True)
    mmsi_top, n_top_by_mmsi = np.unique(groups_eval[pred==1], return_counts=True)
    top_map = dict(zip(mmsi_top.tolist(), n_top_by_mmsi.tolist()))
    anom_win = np.array([top_map.get(m, 0) for m in mmsi_all], dtype=np.int32)
    anom_rate = anom_win / n_by_mmsi
    agg_df = pd.DataFrame({"mmsi": mmsi_all, "n_win": n_by_mmsi, "anom_win": anom_win, "anom_rate": anom_rate})
    agg_path = OUT_DIR / f"{prefix}_mmsi_agg.parquet"
    agg_df.to_parquet(agg_path, index=False)
    print("MMSI agg guardado:", agg_path)
    display(agg_df.sort_values("anom_rate", ascending=False).head(10))

gc.collect()

User: erickdsuarez10 | Host: computeinstance-e00vebkmv7kfvb90pk | RAM total: 135.1 GB | Libre: 128.7 GB
CFG: {
  "artifact_prefix": "ocsvm_rbf",
  "external_data_dir": "/teamspace/studios/this_studio/data",
  "max_train_samples": 800000,
  "eval_batch_size": 2000000
}
Imputer: ocsvm_rbf_imputer.pkl
Scaler: ocsvm_rbf_robust_scaler.pkl
TRAIN loaded: (27789660, 19) | feats: 19
PCA ajustado: 16 comps -> 16 feats
RFF fit+transform (train subset): (800000, 4096) | 12.7s
SGDOneClassSVM fit: 0.5 min | soporte:4096 / 4096
Artefactos guardados en /teamspace/studios/this_studio/data/ocsvm_runs
EVAL: (27789660, 19) | groups: mmsi
Scoring -> /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_eval_scores_mm.dat
Batch size: 1,220,703 filas
Progress:  4.39% | 1,220,703/27,789,660 rows
Progress:  8.79% | 2,441,406/27,789,660 rows
Progress: 13.18% | 3,662,109/27,789,660 rows
Progress: 17.57% | 4,882,812/27,789,660 rows
Progress: 21.96% | 6,103,515/27,789,660 rows
Progress: 26.36% | 7,324,218/27,78

Unnamed: 0,mmsi,n_win,anom_win,anom_rate
61,168015789077263,26420,3135,0.11866
1,12639560807591,23520,1526,0.064881
108,281060477973977,36160,2276,0.062942
42,103576446797335,148920,7814,0.052471
106,277458320934593,22520,1109,0.049245
12,33266086194351,428760,19839,0.046271
27,77182424306278,169660,7718,0.045491
40,100771710683634,32440,1473,0.045407
83,208886908053041,19200,736,0.038333
56,149117408188942,36040,1147,0.031826


0

In [3]:
# === Multi-k (0.5/1/2/5%) + CSV ===
import numpy as np, pandas as pd, json
from pathlib import Path
from sklearn.metrics import precision_recall_curve, auc, average_precision_score, roc_auc_score

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores = np.memmap(OUT / f"{prefix}_eval_scores_mm.dat", dtype=np.float32, mode="r")

# Reconstruir y_eval si no está en memoria (reusa el JOIN de tu celda C si hace falta)
try:
    yb = y_eval
except NameError:
    import pyarrow.parquet as pq
    DATA_DIR = Path(CFG["external_data_dir"])
    eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
    pf_eval = pq.ParquetFile(eval_path)
    key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in pf_eval.schema_arrow.names)
    eval_key = pf_eval.read(columns=[key_eval]).to_pandas()[key_eval].astype(np.int64).to_numpy()
    lab_pf = pq.ParquetFile(next(DATA_DIR.glob("*labels_anom*.parquet")))
    key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else \
              next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
    anom = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())
    yb = np.isin(eval_key, list(anom)).astype(int)

def metrics_at_k(scores, yb, rate):
    n=len(scores); k=max(1,int(n*rate))
    thr=np.partition(scores,-k)[-k]; pred=(scores>=thr).astype(np.int8)
    tp=int(((pred==1)&(yb==1)).sum()); fp=int(((pred==1)&(yb==0)).sum()); fn=int(((pred==0)&(yb==1)).sum())
    P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
    return {"k":k,"precision":P,"recall":R,"f1":F1}

roc = roc_auc_score(yb, scores)
prec, rec, _ = precision_recall_curve(yb, scores); pr_auc = auc(rec, prec); ap = average_precision_score(yb, scores)

rows=[]
for r in [0.005, 0.01, 0.02, 0.05]:
    m=metrics_at_k(scores, yb, r); m["k_rate"]=r; rows.append(m)

summary = pd.DataFrame(rows)[["k_rate","k","precision","recall","f1"]]
summary.insert(0,"AP",ap); summary.insert(0,"PR-AUC",pr_auc); summary.insert(0,"ROC-AUC",roc)
csv_path = OUT / f"{prefix}_summary_metrics.csv"
summary.to_csv(csv_path, index=False)
print("Guardado:", csv_path)
display(summary.style.format({"ROC-AUC":"{:.4f}","PR-AUC":"{:.4f}","AP":"{:.4f}","precision":"{:.4f}","recall":"{:.4f}","f1":"{:.4f}"}))

Guardado: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_summary_metrics.csv


Unnamed: 0,ROC-AUC,PR-AUC,AP,k_rate,k,precision,recall,f1
0,0.53,0.0464,0.0464,0.005,138948,0.0865,0.012,0.021
1,0.53,0.0464,0.0464,0.01,277896,0.0865,0.024,0.0375
2,0.53,0.0464,0.0464,0.02,555793,0.0834,0.0462,0.0595
3,0.53,0.0464,0.0464,0.05,1389483,0.0704,0.0975,0.0817


In [4]:
# === Suavizado por MMSI (rolling 3) y nueva @1% ===
import numpy as np, pandas as pd
from pathlib import Path

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores = np.memmap(OUT / f"{prefix}_eval_scores_mm.dat", dtype=np.float32, mode="r")
assert 'groups_eval' in globals() and groups_eval is not None, "Necesito groups_eval (mmsi)."

df = pd.DataFrame({"mmsi": groups_eval, "score": np.asarray(scores, dtype=np.float32)})
df["score_smooth"] = df.groupby("mmsi")["score"].transform(lambda s: s.rolling(3, min_periods=1, center=True).mean())
scores_s = df["score_smooth"].to_numpy(np.float32)

# @1%
n=len(scores_s); k=max(1,int(n*0.01))
thr=np.partition(scores_s,-k)[-k]; pred=(scores_s>=thr).astype(np.int8)
tp=int(((pred==1)&(y_eval==1)).sum()); fp=int(((pred==1)&(y_eval==0)).sum()); fn=int(((pred==0)&(y_eval==1)).sum())
P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
print(f"[scores_smooth] @1% -> P:{P:.4f} | R:{R:.4f} | F1:{F1:.4f}")

[scores_smooth] @1% -> P:0.1060 | R:0.0294 | F1:0.0460


In [5]:
# === RFF(D=8192) + SGD re-fit (usando PCA ya guardado) y re-scoring ===
import joblib, time, numpy as np, os
from pathlib import Path
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import SGDOneClassSVM

OUT=Path(CFG["out_dir"]); prefix=CFG.get("artifact_prefix","ocsvm_rbf")
imputer = joblib.load(OUT / f"{prefix}_imputer.pkl")
scaler  = joblib.load(OUT / f"{prefix}_robust_scaler.pkl")
pca     = joblib.load(OUT / f"{prefix}_pca.pkl")

# subset de train para refit
n_fit = int(CFG.get("max_train_samples", 800_000))
rng = np.random.default_rng(42); idx = np.sort(rng.choice(X_train.shape[0], n_fit, replace=False))
Xf = imputer.transform(X_train[idx].astype(np.float32)); Xf = scaler.transform(Xf); Xf = pca.transform(Xf)

D=8192; gamma_rbf=0.3; nu_val=0.08
rff = RBFSampler(gamma=gamma_rbf, n_components=D, random_state=42)
t0=time.time(); Xf_rff = rff.fit_transform(Xf); print("RFF(8192) listo en", f"{time.time()-t0:.1f}s")
sgd_oc = SGDOneClassSVM(nu=nu_val, fit_intercept=True, shuffle=True, random_state=42, tol=1e-3, max_iter=2000)
t1=time.time(); sgd_oc.fit(Xf_rff); print("SGD fit en", f"{(time.time()-t1)/60:.1f} min")

# guardar
joblib.dump(rff, OUT / f"{prefix}_rff.pkl")
joblib.dump(sgd_oc, OUT / f"{prefix}_model_sgd_rff.pkl")
print("✅ Reemplazado RFF+SGD. Re-ejecuta la celda de scoring por lotes (B) y métricas (C).")

NameError: name 'X_train' is not defined

In [6]:
# === Re-scoring completo con el NUEVO RFF=8192 + SGD guardados ===
import numpy as np, os, time, joblib, pyarrow.parquet as pq, pandas as pd
from pathlib import Path

OUT = Path(CFG["out_dir"]); OUT.mkdir(parents=True, exist_ok=True)
prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores_path = OUT / f"{prefix}_eval_scores_mm.dat"

# Cargar artefactos actualizados
imputer = joblib.load(OUT / f"{prefix}_imputer.pkl")
scaler  = joblib.load(OUT / f"{prefix}_robust_scaler.pkl")
pca     = joblib.load(OUT / f"{prefix}_pca.pkl")
rff     = joblib.load(OUT / f"{prefix}_rff.pkl")              # << ahora D=8192
sgd_oc  = joblib.load(OUT / f"{prefix}_model_sgd_rff.pkl")    # << reentrenado con D=8192

# X_eval ya en memoria; si no, reconstruir como antes:
try:
    X_eval
except NameError:
    DATA_DIR = Path(CFG["external_data_dir"])
    eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
    df = pq.read_table(eval_path).to_pandas()
    for c in df.columns:
        if pd.api.types.is_float_dtype(df[c]): df[c] = df[c].astype(np.float32)
    drop={"lat","lon","idx","idx_end","window_id"}
    gcol = next((c for c in ["mmsi","group","ship_id"] if c in df.columns), None)
    ycol = next((c for c in ["is_suspicious","label","y","target"] if c in df.columns), None)
    feats=[c for c in df.columns if c not in (drop | {gcol,ycol} if gcol else drop)]
    X_eval = df[feats].to_numpy(np.float32)
    groups_eval = df[gcol].to_numpy() if gcol else None
    del df

# Memmap
n_eval = X_eval.shape[0]
if scores_path.exists(): os.remove(scores_path)
scores_mm = np.memmap(scores_path, dtype=np.float32, mode="w+", shape=(n_eval,))

def naninf_to_nan(arr):
    arr = arr.astype(np.float32, copy=False)
    bad = ~np.isfinite(arr)
    if bad.any(): arr[bad] = np.nan
    return arr

B = int(CFG.get("eval_batch_size", 2_000_000))
t0 = time.time(); last=t0
for s in range(0, n_eval, B):
    e = min(s+B, n_eval)
    Xe = naninf_to_nan(X_eval[s:e])
    Xe = imputer.transform(Xe)
    Xe = scaler.transform(Xe)
    Xe = pca.transform(Xe)
    Xe = rff.transform(Xe)
    scores_mm[s:e] = -sgd_oc.decision_function(Xe)
    scores_mm.flush()
    now=time.time()
    if now-last>10:
        print(f"Progress: {100*e/n_eval:5.2f}% | {e:,}/{n_eval:,}")
        last=now

scores = scores_mm
print("Scores memmap:", scores_path, "| shape:", scores.shape)

Progress:  7.20% | 2,000,000/27,789,660
Progress: 14.39% | 4,000,000/27,789,660
Progress: 21.59% | 6,000,000/27,789,660
Progress: 28.79% | 8,000,000/27,789,660
Progress: 35.98% | 10,000,000/27,789,660
Progress: 43.18% | 12,000,000/27,789,660
Progress: 50.38% | 14,000,000/27,789,660
Progress: 57.58% | 16,000,000/27,789,660
Progress: 64.77% | 18,000,000/27,789,660
Progress: 71.97% | 20,000,000/27,789,660
Progress: 79.17% | 22,000,000/27,789,660
Progress: 86.36% | 24,000,000/27,789,660
Progress: 93.56% | 26,000,000/27,789,660
Progress: 100.00% | 27,789,660/27,789,660
Scores memmap: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_eval_scores_mm.dat | shape: (27789660,)


In [7]:
# === Métricas RAW + SMOOTH y persistencia en config ===
import json, numpy as np, pandas as pd, pyarrow.parquet as pq
from pathlib import Path
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, average_precision_score

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores = np.memmap(OUT / f"{prefix}_eval_scores_mm.dat", dtype=np.float32, mode="r")

# JOIN para y_eval
DATA_DIR = Path(CFG["external_data_dir"])
eval_path = max(DATA_DIR.glob("*windows*aligned*.parquet"), key=lambda p: p.stat().st_size)
pf_eval = pq.ParquetFile(eval_path)
key_eval = next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in pf_eval.schema_arrow.names)
eval_key = pf_eval.read(columns=[key_eval]).to_pandas()[key_eval].astype(np.int64).to_numpy()

lab_pf = pq.ParquetFile(next(DATA_DIR.glob("*labels_anom*.parquet")))
key_lab = key_eval if key_eval in lab_pf.schema_arrow.names else \
          next(k for k in ["window_id","idx","idx_end","row","row_id"] if k in lab_pf.schema_arrow.names)
anom = set(lab_pf.read(columns=[key_lab]).to_pandas()[key_lab].astype(np.int64).tolist())
y_eval = np.isin(eval_key, list(anom)).astype(int)
print("Positivos:", int(y_eval.sum()), "/", len(y_eval))

def global_metrics(s, y):
    roc = roc_auc_score(y, s)
    prc, rec, _ = precision_recall_curve(y, s); pr_auc = auc(rec, prc); ap = average_precision_score(y, s)
    return roc, pr_auc, ap

def at_k(s, y, rate):
    n=len(s); k=max(1,int(n*rate))
    thr=np.partition(s, -k)[-k]; pred=(s>=thr).astype(np.int8)
    tp=int(((pred==1)&(y==1)).sum()); fp=int(((pred==1)&(y==0)).sum()); fn=int(((pred==0)&(y==1)).sum())
    P=tp/(tp+fp) if (tp+fp)>0 else 0.0; R=tp/(tp+fn) if (tp+fn)>0 else 0.0; F1=2*P*R/(P+R) if (P+R)>0 else 0.0
    return k,P,R,F1

# RAW
roc_raw, pr_raw, ap_raw = global_metrics(scores, y_eval)
k1, P1, R1, F1 = at_k(scores, y_eval, 0.01)
print(f"[RAW] ROC:{roc_raw:.4f} PR-AUC:{pr_raw:.4f} AP:{ap_raw:.4f} | @1% P:{P1:.4f} R:{R1:.4f} F1:{F1:.4f}")

# SMOOTH por MMSI (rolling=3)
if 'groups_eval' not in globals() or groups_eval is None:
    # reconstruir groups_eval si no está
    df_tmp = pf_eval.read(columns=[key_eval,"mmsi"]).to_pandas() if "mmsi" in pf_eval.schema_arrow.names else None
    assert df_tmp is not None, "No hallé mmsi en eval para suavizado."
    groups_eval = df_tmp["mmsi"].to_numpy()

df = pd.DataFrame({"mmsi": groups_eval, "score": np.asarray(scores, dtype=np.float32)})
df["score_smooth"] = df.groupby("mmsi")["score"].transform(lambda s: s.rolling(3, min_periods=1, center=True).mean())
scores_s = df["score_smooth"].to_numpy(np.float32)

roc_s, pr_s, ap_s = global_metrics(scores_s, y_eval)
k1s, P1s, R1s, F1s = at_k(scores_s, y_eval, 0.01)
print(f"[SMOOTH] ROC:{roc_s:.4f} PR-AUC:{pr_s:.4f} AP:{ap_s:.4f} | @1% P:{P1s:.4f} R:{R1s:.4f} F1:{F1s:.4f}")

# Persistir en config
cfg_path = OUT / f"{prefix}_config.json"
cfg = json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
cfg.setdefault("metrics", {})
cfg.setdefault("metrics_at_k", {})
cfg["metrics"]["raw"]   = {"roc_auc": float(roc_raw), "pr_auc": float(pr_raw), "ap": float(ap_raw)}
cfg["metrics"]["smooth"]= {"roc_auc": float(roc_s),   "pr_auc": float(pr_s),   "ap": float(ap_s)}
cfg["metrics_at_k"]["raw@0.01"]    = {"k": int(k1),  "precision": float(P1),  "recall": float(R1),  "f1": float(F1)}
cfg["metrics_at_k"]["smooth@0.01"] = {"k": int(k1s), "precision": float(P1s), "recall": float(R1s), "f1": float(F1s)}
cfg["best_params"] = {"nu": 0.08, "gamma": 0.3, "D": int(getattr(rff, "n_components", 0)), "pca_components": 16}
cfg["model_type"]  = "PCA16 + RFF8192 + SGDOneClassSVM"
cfg["label_file"]  = "labels_anom.parquet"
cfg["label_col"]   = key_lab
cfg["label_mapping"] = f"JOIN on {key_eval}"
cfg_path.write_text(json.dumps(cfg, indent=2))
print("✅ Config actualizado con RAW y SMOOTH:", cfg_path)

Positivos: 1003200 / 27789660
[RAW] ROC:0.5300 PR-AUC:0.0464 AP:0.0464 | @1% P:0.0865 R:0.0240 F1:0.0375
[SMOOTH] ROC:0.5463 PR-AUC:0.0497 AP:0.0497 | @1% P:0.1060 R:0.0294 F1:0.0460
✅ Config actualizado con RAW y SMOOTH: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_config.json


In [8]:
# === Top-1% suavizado por MMSI (rolling=3) ===
import numpy as np, pandas as pd, os
from pathlib import Path

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")
scores = np.memmap(OUT / f"{prefix}_eval_scores_mm.dat", dtype=np.float32, mode="r")
df = pd.DataFrame({"mmsi": groups_eval, "score": np.asarray(scores, dtype=np.float32)})
df["score_smooth"] = df.groupby("mmsi")["score"].transform(lambda s: s.rolling(3, min_periods=1, center=True).mean())
scores_s = df["score_smooth"].to_numpy(np.float32)

n=len(scores_s); k=max(1, int(n*0.01))
thr=np.partition(scores_s, -k)[-k]; pred=(scores_s>=thr)
rows = np.where(pred)[0]
top = pd.DataFrame({"idx": rows, "anomaly_score_smooth": scores_s[rows].astype(np.float32)})
top["mmsi"] = groups_eval[rows].astype(np.int64)
top = top.sort_values("anomaly_score_smooth", ascending=False)

out_pq = OUT / f"{prefix}_top1pct_smooth.parquet"
out_csv = OUT / f"{prefix}_top1pct_smooth.csv"
top.to_parquet(out_pq, index=False); top.to_csv(out_csv, index=False)
print("✅ TOP-1% SMOOTH guardado:", out_pq, "y", out_csv, "| rows:", len(top))

✅ TOP-1% SMOOTH guardado: /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_top1pct_smooth.parquet y /teamspace/studios/this_studio/data/ocsvm_runs/ocsvm_rbf_top1pct_smooth.csv | rows: 277897


In [9]:
# === Snapshot final de artefactos y reportes ===
import shutil, datetime, json
from pathlib import Path

OUT = Path(CFG["out_dir"]); prefix = CFG.get("artifact_prefix","ocsvm_rbf")
stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")
dest = OUT / f"final_run_{stamp}"
dest.mkdir(parents=True, exist_ok=True)

# Archivos clave
to_copy = [
    f"{prefix}_config.json",
    f"{prefix}_imputer.pkl",
    f"{prefix}_robust_scaler.pkl",
    f"{prefix}_pca.pkl",
    f"{prefix}_rff.pkl",
    f"{prefix}_model_sgd_rff.pkl",
    f"{prefix}_eval_scores_mm.dat",
    f"{prefix}_summary_metrics.csv",
    f"{prefix}_top1pct_detailed.parquet",
    f"{prefix}_top1pct_detailed.csv",
    f"{prefix}_top1pct_smooth.parquet",
    f"{prefix}_top1pct_smooth.csv",
    f"{prefix}_mmsi_agg.parquet",
]
for name in to_copy:
    p = OUT / name
    if p.exists(): shutil.copy2(p, dest / name)

# Model card mínimo
cfg = json.loads((OUT / f"{prefix}_config.json").read_text())
model_card = {
    "model": cfg.get("model_type", "PCA16 + RFF8192 + SGDOneClassSVM"),
    "hyperparams": cfg.get("best_params", {}),
    "data": {"eval_parquet": "windows_with_labels_aligned.parquet", "labels": "labels_anom.parquet"},
    "metrics": cfg.get("metrics", {}),
    "metrics_at_k": cfg.get("metrics_at_k", {}),
    "notes": [
        "scores_smooth = rolling mean (w=3) por MMSI",
        "usar top1pct_smooth para priorización operativa"
    ]
}
( dest / "MODEL_CARD.json" ).write_text(json.dumps(model_card, indent=2))
print("📦 Snapshot final en:", dest)

📦 Snapshot final en: /teamspace/studios/this_studio/data/ocsvm_runs/final_run_20251027_0238
