# 02 — Experiments & Results
Цель: провести базовый прогон, ablation- и sensitivity-анализ на артефактах пайплайна и сохранить результаты в `outputs/experiments` и `reports`.

In [None]:
import json
import os
import random
import subprocess
import sys
from pathlib import Path
from typing import Dict, List

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import yaml
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# --- Paths -------------------------------------------------------------------
# ROOT is the parent of the current working directory; every input and output
# location below is derived from it.
ROOT = Path("..").resolve()
CFG_PATH = ROOT / "configs" / "research.yaml"
OUT_EXP = (ROOT / "outputs" / "experiments").resolve()
OUT_EXP.mkdir(parents=True, exist_ok=True)
REPORT_FIGS = (ROOT / "reports" / "figures").resolve()
REPORT_FIGS.mkdir(parents=True, exist_ok=True)
REPORT_TABLES = (ROOT / "reports" / "tables").resolve()
REPORT_TABLES.mkdir(parents=True, exist_ok=True)

# --- Provenance --------------------------------------------------------------
# Record the commit and the Python environment next to the results.
# sys.executable is the interpreter running this notebook; a bare "python" is
# looked up on PATH and may be a different installation, which would make the
# recorded version and package list describe the wrong environment.
GIT_HASH = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=ROOT).decode().strip()
PY_VER = subprocess.check_output([sys.executable, "--version"], cwd=ROOT).decode().strip()
PIP_HEAD = (
    subprocess.check_output([sys.executable, "-m", "pip", "freeze"], cwd=ROOT)
    .decode()
    .splitlines()[:10]  # only the first ten packages are kept for display
)
print("git:", GIT_HASH)
print("python:", PY_VER)
print("pip head:", PIP_HEAD)

# --- Configuration -----------------------------------------------------------
with CFG_PATH.open("r", encoding="utf-8") as f:
    CFG: Dict = yaml.safe_load(f)

# Seed the global RNGs from the config so that later random operations
# (e.g. label shuffling) are repeatable.
random.seed(CFG["seed"]["python"])
np.random.seed(CFG["seed"]["numpy"])

# Persist the exact configuration used for this run alongside the outputs.
resolved_config_path = OUT_EXP / "config_resolved.yaml"
with resolved_config_path.open("w", encoding="utf-8") as f:
    yaml.safe_dump(CFG, f)

# Bare expression: shown as the cell result when run in a notebook.
{"git": GIT_HASH, "python": PY_VER, "config_saved": str(resolved_config_path)}

In [None]:
# Locations of the pipeline artifacts consumed by this notebook.
PIPELINE_DIR = ROOT / "outputs" / "pipeline"
clean_path = PIPELINE_DIR / "cleaned_combined.parquet"
features_path = PIPELINE_DIR / "graph_features.parquet"
metrics_path = PIPELINE_DIR / "baseline_metrics.json"

# The cleaned table is mandatory: stop immediately when it is missing.
if not clean_path.exists():
    raise FileNotFoundError("Run notebook 01 to generate cleaned_combined.parquet")
base_df = pd.read_parquet(clean_path)

# Graph features and baseline metrics are optional; an absent file yields an
# empty container instead of an error.
graph_features_df = pd.read_parquet(features_path) if features_path.exists() else pd.DataFrame()
baseline_metrics = (
    json.loads(metrics_path.read_text(encoding="utf-8")) if metrics_path.exists() else {}
)

print("Loaded", len(base_df), "rows; features shape", graph_features_df.shape)

## Research Questions
- Насколько признаки (ra/dec/redshift/mag) разделяют космологию и квантовую решетку?
- Насколько результат чувствителен к выбору k в kNN и к числу eigenvalues?
- Что происходит при отключении спектральной компоненты или embeddings (абляции из config)?

In [None]:
def expand_experiments(cfg: Dict) -> List[Dict]:
    """Expand the config into a flat list of experiment runs.

    The list always starts with a ``baseline`` run and is followed, in order,
    by one run per configured ablation, one per ``k_neighbors`` sensitivity
    value and one per ``spectral_k`` sensitivity value.

    Args:
        cfg: Parsed configuration. ``cfg["features"]`` and
            ``cfg["cosmology"]["graph"]`` are required; ``cfg["experiments"]``
            and everything below it is optional.

    Returns:
        A list of dicts with keys ``name``, ``features`` and ``graph``. Each
        run owns shallow copies of its ``features``/``graph`` dicts, so
        editing one run does not affect the others or ``cfg``.

    Raises:
        KeyError: If a required config section or an ablation ``name`` is
            missing.
    """
    base_features: Dict = cfg["features"]
    base_graph: Dict = cfg["cosmology"]["graph"]

    # A YAML key that is present but has no value loads as None, so a plain
    # ``.get(key, default)`` is not enough; ``or`` also covers that case.
    experiments = cfg.get("experiments") or {}
    sensitivity = experiments.get("sensitivity") or {}

    runs: List[Dict] = [
        {"name": "baseline", "features": dict(base_features), "graph": dict(base_graph)}
    ]

    # Ablations override individual feature switches on top of the baseline.
    for abl in experiments.get("ablations") or []:
        overrides = abl.get("features") or {}
        runs.append(
            {
                "name": f"ablation_{abl['name']}",
                "features": {**base_features, **overrides},
                "graph": dict(base_graph),
            }
        )

    # Sensitivity to the number of graph neighbours.
    for k in sensitivity.get("k_neighbors") or []:
        runs.append(
            {
                "name": f"sensitivity_k_{k}",
                "features": dict(base_features),
                "graph": {**base_graph, "k_neighbors": k},
            }
        )

    # Sensitivity to the number of spectral components.
    for k in sensitivity.get("spectral_k") or []:
        runs.append(
            {
                "name": f"sensitivity_eigs_{k}",
                "features": {**base_features, "spectral_k": k},
                "graph": dict(base_graph),
            }
        )

    return runs


# Expand the loaded configuration into the concrete list of runs to execute.
experiments_plan = expand_experiments(CFG)
# Bare expression: builds a table of the plan (one row per run) so that it is
# shown as the cell result when run in a notebook.
pd.DataFrame(experiments_plan)

In [None]:
# Columns that are always part of the feature matrix.
BASE_COLUMNS = ["ra", "dec", "redshift", "mag_g", "mag_r"]


def prepare_matrix(df: pd.DataFrame, use_spectral: bool) -> pd.DataFrame:
    """Select the model features from ``df``, optionally adding spectral ones.

    The base features are ``BASE_COLUMNS``. When ``use_spectral`` is true and
    the module-level ``graph_features_df`` has ``eig_*`` columns, the values of
    its first row are appended as extra columns, the same value for every row.

    Args:
        df: Table containing at least ``BASE_COLUMNS``. It is not modified.
        use_spectral: Whether to append the ``eig_*`` columns.

    Returns:
        A new DataFrame with the base columns followed by any spectral columns.

    Raises:
        KeyError: If ``df`` lacks one of ``BASE_COLUMNS``.
    """
    cols = list(BASE_COLUMNS)
    if not use_spectral or graph_features_df.empty:
        return df[cols]

    spectral_cols = [c for c in graph_features_df.columns if c.startswith("eig_")]
    if not spectral_cols:
        return df[cols]

    # NOTE(review): only the first row of graph_features_df is used, so each
    # eig_* column is constant across all rows of the result and cannot help
    # to tell rows apart; confirm that this broadcast is intended.
    spectral_row = graph_features_df.iloc[0].to_dict()

    # assign() returns a new frame, so the caller's DataFrame keeps its
    # original columns instead of silently gaining eig_* ones.
    enriched = df.assign(**{col: spectral_row[col] for col in spectral_cols})
    return enriched[cols + spectral_cols]


def run_logreg(df: pd.DataFrame, use_spectral: bool, test_size: float) -> Dict[str, float]:
    """Fit a logistic regression on ``df`` and return its held-out accuracy.

    The target is 1 for rows whose ``system_type`` equals ``"cosmology"`` and
    0 otherwise. Features come from ``prepare_matrix``. The split is
    stratified on the target and seeded from
    ``CFG["models"]["classification"]["random_state"]``.

    Args:
        df: Table with a ``system_type`` column and the feature columns.
        use_spectral: Forwarded to ``prepare_matrix``.
        test_size: Fraction (or count) of rows held out, as accepted by
            ``train_test_split``.

    Returns:
        ``{"accuracy": <accuracy on the held-out split>}``.
    """
    # prepare_matrix receives a copy so the caller's frame is never touched.
    X = prepare_matrix(df.copy(), use_spectral)
    y = (df["system_type"] == "cosmology").astype(int)

    # Split BEFORE scaling: fitting the scaler on all rows would let the mean
    # and variance of the test rows leak into the training features.
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=CFG["models"]["classification"]["random_state"],
        stratify=y,
    )

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    # The test split is transformed with the statistics learned on train only.
    X_test_scaled = scaler.transform(X_test)

    model = LogisticRegression(max_iter=200)
    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_test_scaled)

    # Plain float keeps the result directly JSON-serialisable.
    return {"accuracy": float(accuracy_score(y_test, y_pred))}


In [None]:
def _score_run(run: Dict) -> Dict:
    """Fit the classifier for one planned run and return its results-table row."""
    spectral_on = run["features"].get("include_spectral", True)
    scores = run_logreg(
        base_df,
        use_spectral=spectral_on,
        test_size=CFG["models"]["classification"]["test_size"],
    )
    # NOTE(review): the run's graph settings are only recorded here; they are
    # not passed to run_logreg, which receives just the spectral switch.
    return {
        "run": run["name"],
        "accuracy": scores["accuracy"],
        "use_spectral": spectral_on,
        "k_neighbors": run["graph"].get("k_neighbors"),
    }


# One row per planned run, in plan order.
results: List[Dict] = [_score_run(planned) for planned in experiments_plan]

# Persist the table for the report, then show it as the cell result.
results_df = pd.DataFrame(results)
results_path = REPORT_TABLES / "results.csv"
results_df.to_csv(results_path, index=False)
results_df

In [None]:
# Keep only the k-neighbour sensitivity runs. A prefix match is used so that
# runs which merely contain "sensitivity_k" somewhere in their name (for
# example an ablation named that way) are not mixed in.
is_k_run = results_df["run"].str.startswith("sensitivity_k_")
# Sort by k so the line is drawn left to right; in config order an unsorted
# list of k values would make the line double back on itself.
sensitivity_k = results_df[is_k_run].sort_values("k_neighbors")

fig, ax = plt.subplots(figsize=(5, 3))
ax.plot(sensitivity_k["k_neighbors"], sensitivity_k["accuracy"], marker="o")
ax.set_xlabel("k neighbors")
ax.set_ylabel("Accuracy")
ax.set_title("Sensitivity to k")
fig.tight_layout()

# Save the figure for the report and close it to release the canvas.
sensitivity_fig = REPORT_FIGS / "sensitivity_k.png"
fig.savefig(sensitivity_fig, dpi=200)
plt.close(fig)

# Bare expression: shows the saved path as the cell result.
sensitivity_fig

In [None]:
def cross_domain_distance(df: pd.DataFrame) -> float:
    """Return the Euclidean distance between the two per-domain feature means.

    For the rows labelled ``"cosmology"`` and the rows labelled ``"quantum"``
    in ``system_type``, the mean of every column in ``BASE_COLUMNS`` is taken;
    the result is the norm of the difference of those two mean vectors.
    Rows with any other label are ignored.
    """
    labels = df["system_type"]
    cosmo_centroid = df.loc[labels == "cosmology", BASE_COLUMNS].mean().to_numpy()
    quantum_centroid = df.loc[labels == "quantum", BASE_COLUMNS].mean().to_numpy()
    gap = cosmo_centroid - quantum_centroid
    return float(np.linalg.norm(gap))


# Distance between the mean feature vectors of the two system types, computed
# on the full cleaned table; kept in a variable because the summary reuses it.
distance_score = cross_domain_distance(base_df)
print("Cross-domain mean feature distance", distance_score)


In [None]:
# Negative control: randomly permute the labels (using the global NumPy RNG)
# and retrain. With the link between features and labels broken, the reported
# accuracy shows what the model achieves without real signal.
permuted_labels = np.random.permutation(base_df["system_type"].values)
shuffled = base_df.assign(system_type=permuted_labels)
sanity_metrics = run_logreg(
    shuffled,
    use_spectral=True,
    test_size=CFG["models"]["classification"]["test_size"],
)
print("Sanity (shuffled labels) accuracy", sanity_metrics["accuracy"])


In [None]:
# Row of the best run. idxmax returns the FIRST row with the maximal accuracy,
# so ties are resolved deterministically in plan order (baseline first);
# sorting with the default, unstable algorithm could pick any tied row.
best_row = results_df.loc[results_df["accuracy"].idxmax()]
# Convert NumPy scalars (e.g. numpy.bool_, numpy.int64) to native Python
# values: json.dump cannot serialise them, and whether pandas unboxes them in
# to_dict() depends on the pandas version.
best_run = {
    key: (value.item() if isinstance(value, np.generic) else value)
    for key, value in best_row.to_dict().items()
}

summary = {
    "git": GIT_HASH,
    "baseline_accuracy": float(
        results_df.loc[results_df["run"] == "baseline", "accuracy"].iloc[0]
    ),
    "best_run": best_run,
    "sensitivity_fig": str(sensitivity_fig),
    "cross_domain_distance": distance_score,
    "sanity_accuracy": float(sanity_metrics["accuracy"]),
    "results_table": str(results_path),
}

# Machine-readable summary.
summary_path = REPORT_TABLES / "summary.json"
with summary_path.open("w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2)

# Human-readable summary. The entries are written as a Markdown bullet list
# after a blank line: consecutive plain lines separated by a single newline
# would be rendered as one run-on paragraph.
summary_md = REPORT_TABLES / "summary.md"
md_lines = [
    "# Experiments Summary",
    "",
    f"- Git: {GIT_HASH}",
    f"- Baseline accuracy: {summary['baseline_accuracy']:.3f}",
    f"- Best run: {summary['best_run']}",
    f"- Cross-domain distance: {distance_score:.3f}",
    f"- Sanity (shuffled) accuracy: {sanity_metrics['accuracy']:.3f}",
    f"- Sensitivity figure: {sensitivity_fig}",
]
with summary_md.open("w", encoding="utf-8") as f:
    f.write("\n".join(md_lines) + "\n")

# Bare expression: shows the summary dict as the cell result.
summary

## Эксперименты завершены
- Таблица результатов: `reports/tables/results.csv`
- График чувствительности: `reports/figures/sensitivity_k.png`
- Сводка: `reports/tables/summary.json` и `reports/tables/summary.md`
- Исходные артефакты пайплайна: `outputs/pipeline/*`