# Production Behavior Manifold & Forecast

Обновлённый ноутбук объединяет основные шаги пайплайна PBM в единое рабочее место:
1. Загрузка и очистка сырых ежемесячных отчётов.
2. Предобработка профилей (Step 1) и сохранение результатов.
3. Построение дескрипторов, manifold-embedding и кластеризация.
4. Генерация отчётов по сегментации.
5. Построение прогнозов по префиксу (20 → 100): по первым 20 месяцам профиля прогнозируется его продолжение до 100-го месяца.

Ниже каждая секция изолирована функциями и настройками, что упрощает повторный запуск и адаптацию под другие проекты.


## 0. Импорт, конфигурация и утилиты


In [None]:
from __future__ import annotations

import os
import warnings
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd
from IPython.display import display

# Suppress every warning for the rest of the session.
warnings.filterwarnings("ignore")
# NOTE(review): presumably opts SciPy into its array-API mode; it is set before
# the project tools are imported further down — confirm it is still required.
os.environ["SCIPY_ARRAY_API"] = "1"

# Limits for how pandas prints DataFrames.
pd.options.display.max_columns = 50
pd.options.display.width = 120


In [None]:
# Base paths and run parameters
DATA_DIR = Path("data/wells")
FORECAST_EXPORT_DIR = Path("reports/forecast_exports")
PBM_REPORT_DIR = Path("reports/pbm_report_exports")

# Limits for quick runs (None -> use everything)
MAX_CSV_FILES: Optional[int] = None   # e.g. 5 to take only the first files
SAMPLE_WELLS: Optional[int] = None    # e.g. 500 to cap the number of wells

# Switches for the individual pipeline steps
RUN_MANIFOLD = True
RUN_CLUSTERING = True
RUN_PBM_REPORT = False  # the report needs matplotlib/hdbscan and can be heavy
RUN_FORECAST = True

# Passed as T_pref to run_forecast_pipeline.
FORECAST_PREFIX = 20
# Shared seed: used for ManifoldConfig.random_state and the forecast rng_seed.
RANDOM_SEED = 59

# Collects the output of every step, keyed by step name.
pipeline_state: Dict[str, Any] = {}


In [None]:
def discover_csv_files(data_dir: Path, limit: Optional[int] = None) -> list[Path]:
    """Return the ``*.csv`` files located directly in *data_dir*, sorted by path.

    Args:
        data_dir: Directory to scan (non-recursive).
        limit: When given, only the first ``limit`` sorted files are kept.

    Returns:
        The selected CSV paths.

    Raises:
        FileNotFoundError: If *data_dir* does not exist, or if no CSV file
            remains after applying *limit*.
    """
    if data_dir.exists():
        found = sorted(data_dir.glob("*.csv"))
        selected = found if limit is None else found[:limit]
        if selected:
            return selected
        raise FileNotFoundError(f"В каталоге {data_dir} нет csv-файлов.")
    raise FileNotFoundError(f"Каталог {data_dir} не найден.")

# Raw report column name -> short column name used by the rest of the notebook
# (applied by clean_well_data).
RENAME_MAP = {
    "BBLS_OIL_COND": "oil",
    "MCF_GAS": "gas",
    "BBLS_WTR": "water",
    "API_WellNo": "well_name",
    "RptDate": "date",
    "DAYS_PROD": "days_prod",
}

def load_raw_well_data(data_dir: Path, limit_files: Optional[int] = None) -> pd.DataFrame:
    """Read the CSV files found in *data_dir* and stack them into one DataFrame.

    Args:
        data_dir: Directory holding the raw CSV reports.
        limit_files: Optional cap on how many files are read.

    Returns:
        The row-wise concatenation of all files with a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: Propagated from ``discover_csv_files``.
    """
    paths = discover_csv_files(data_dir, limit_files)
    tables = list(map(pd.read_csv, paths))
    combined = pd.concat(tables, ignore_index=True)
    print(f"Считано файлов: {len(paths)} — строк: {len(combined):,}")
    return combined

def clean_well_data(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the raw report table.

    Renames columns via ``RENAME_MAP``, parses ``date``, drops the
    ``Lease_Unit`` / ``Formation`` columns when present, keeps only rows whose
    ``oil``, ``gas`` and ``water`` values are all ``>= 0`` and sorts the result
    by ``well_name`` then ``date``.

    Args:
        df: Raw table; it is not modified.

    Returns:
        A new, re-indexed DataFrame.
    """
    renamed = df.rename(columns=RENAME_MAP)
    renamed["date"] = pd.to_datetime(renamed["date"])

    unused = [name for name in ("Lease_Unit", "Formation") if name in renamed.columns]
    if unused:
        renamed = renamed.drop(columns=unused)

    # Rows where any of the three comparisons is False (incl. NaN) are dropped.
    oil_ok = renamed["oil"] >= 0
    gas_ok = renamed["gas"] >= 0
    water_ok = renamed["water"] >= 0
    kept = renamed[oil_ok & gas_ok & water_ok].copy()

    ordered = kept.sort_values(by=["well_name", "date"])
    return ordered.reset_index(drop=True)

def subset_wells(df: pd.DataFrame, n_wells: Optional[int] = None) -> pd.DataFrame:
    """Keep only the rows of the first *n_wells* wells (by sorted ``well_name``).

    Args:
        df: Table with a ``well_name`` column.
        n_wells: Number of distinct wells to keep; ``None`` disables sampling.

    Returns:
        *df* itself when ``n_wells`` is ``None``; otherwise a copy restricted
        to the selected wells.
    """
    if n_wells is None:
        return df

    distinct_names = df["well_name"].dropna().drop_duplicates()
    selected = distinct_names.sort_values().head(n_wells)
    keep_row = df["well_name"].isin(selected)
    return df.loc[keep_row].copy()

def prepare_dataset(data_dir: Path, limit_files: Optional[int] = None, sample_wells: Optional[int] = None) -> pd.DataFrame:
    """Load, clean and optionally down-sample the well data in one call.

    Args:
        data_dir: Directory with the raw CSV reports.
        limit_files: Optional cap on the number of CSV files read.
        sample_wells: Optional cap on the number of wells kept.

    Returns:
        The cleaned (and possibly reduced) DataFrame.
    """
    dataset = subset_wells(
        clean_well_data(load_raw_well_data(data_dir, limit_files)),
        sample_wells,
    )
    row_count = dataset.shape[0]
    well_count = dataset["well_name"].nunique()
    print(f"Итоговый датафрейм: {row_count:,} строк, {well_count} скважин")
    return dataset


In [None]:
# Build the working dataset. A missing directory / missing CSV files is only
# reported, and df is set to None so that the following cells can skip their work.
try:
    df = prepare_dataset(DATA_DIR, MAX_CSV_FILES, SAMPLE_WELLS)
except FileNotFoundError as exc:
    print(f"⚠️ {exc}")
    df = None
else:
    # Only a successfully loaded dataset is recorded in the pipeline state.
    pipeline_state["dataframe"] = df
    display(df.head())


## 1. Предобработка профилей (PBM Step 1)


In [None]:
from tools.preprocessing import PreprocConfig, preprocess_profiles

def run_preprocessing(df: pd.DataFrame, cfg: Optional[PreprocConfig] = None) -> Dict[str, Any]:
    """Run ``preprocess_profiles`` on *df* and show a short preview.

    Args:
        df: Cleaned well table with a ``well_name`` column.
        cfg: Preprocessing settings; a default ``PreprocConfig()`` is used
            when omitted.

    Returns:
        The dictionary returned by ``preprocess_profiles``, with its
        ``"config"`` entry replaced by a plain ``dict`` copy (empty when the
        entry was absent).

    Raises:
        ValueError: If *df* is ``None``.
    """
    if df is None:
        raise ValueError("Нет исходных данных — убедитесь, что предыдущая ячейка выполнена успешно.")
    if not cfg:
        cfg = PreprocConfig()

    well_count = df["well_name"].nunique()
    print(f"Запуск preprocess_profiles для {well_count} скважин (T={cfg.T})")

    result = preprocess_profiles(df, cfg)
    long_panel = result["panel_long"]
    print("panel_long shape:", long_panel.shape)
    print("tensor shape:", result["X"].shape)
    display(long_panel.head(12))

    # Store the config as an independent plain dict.
    result["config"] = dict(result.get("config", {}))
    return result

# Run Step 1 only when the dataset was loaded; otherwise record None.
preproc_cfg = PreprocConfig()
preproc_out = run_preprocessing(df, preproc_cfg) if df is not None else None
pipeline_state["preprocessing"] = preproc_out
out = preproc_out  # kept for compatibility with older scripts


## 2–3. Компактные признаки и manifold


In [None]:
# Guarded import: a failure is stored in MANIFOLD_IMPORT_ERROR instead of being
# raised here; run_manifold turns it into a RuntimeError when it is called.
try:
    from tools.feature import compute_side_features, scale_features
    from tools.manifold import ManifoldConfig, embed_umap_euclid, embed_umap_fastdtw
    MANIFOLD_IMPORT_ERROR = None
except Exception as exc:
    MANIFOLD_IMPORT_ERROR = exc

def run_manifold(out_dict: Dict[str, Any], cfg: Optional[ManifoldConfig] = None, sample_size: Optional[int] = None) -> Dict[str, Any]:
    """Compute side features and two UMAP embeddings from the preprocessing output.

    Args:
        out_dict: Preprocessing result; the keys ``panel_long``, ``X``,
            ``wells_used`` and ``tensor_channels`` are read, plus ``config["T"]``
            when present (falling back to ``X.shape[1]``).
        cfg: Manifold settings; a default ``ManifoldConfig()`` is used when omitted.
        sample_size: Forwarded unchanged to ``embed_umap_fastdtw``.

    Returns:
        Dict with keys ``cfg``, ``features``, ``features_scaled``, ``scaler``,
        ``Z_euclid``, ``Z_dtw``, ``sub_idx``, ``dist_matrix``, ``info`` and
        ``embedding_df`` (well name plus the first two DTW embedding columns).

    Raises:
        ValueError: If *out_dict* is ``None``.
        RuntimeError: If the manifold tools could not be imported.
    """
    if out_dict is None:
        raise ValueError("Нет результатов предобработки.")
    if MANIFOLD_IMPORT_ERROR is not None:
        raise RuntimeError(f"Не удалось импортировать инструменты manifold: {MANIFOLD_IMPORT_ERROR}")
    if not cfg:
        cfg = ManifoldConfig()

    long_panel = out_dict["panel_long"]
    tensor = out_dict["X"]
    well_names = out_dict["wells_used"]
    channel_names = out_dict["tensor_channels"]
    horizon = int(out_dict.get("config", {}).get("T", tensor.shape[1]))

    # Side features.
    side_feats = compute_side_features(long_panel, T=horizon)
    side_feats_scaled, feat_scaler = scale_features(side_feats)
    print("Side features shape:", side_feats_scaled.shape)

    # Euclidean UMAP; the second returned value is not used further.
    euclid_kwargs = dict(
        tensor_channels=channel_names,
        channels=cfg.channels,
        n_neighbors=cfg.n_neighbors,
        min_dist=cfg.min_dist,
        n_components=cfg.n_components,
        random_state=cfg.random_state,
    )
    euclid_xy, _euclid_model = embed_umap_euclid(tensor, **euclid_kwargs)
    print("UMAP (euclid) shape:", euclid_xy.shape)

    # FastDTW UMAP; ``picked`` indexes the wells that received an embedding.
    dtw_xy, picked, dtw_dist, dtw_info = embed_umap_fastdtw(
        tensor,
        tensor_channels=channel_names,
        channels=cfg.channels,
        cfg=cfg,
        sample_size=sample_size,
    )
    picked_wells = np.array(well_names)[picked]
    coords = pd.DataFrame({
        "well_name": picked_wells,
        "x": dtw_xy[:, 0],
        "y": dtw_xy[:, 1],
    })
    display(coords.head())

    return dict(
        cfg=cfg,
        features=side_feats,
        features_scaled=side_feats_scaled,
        scaler=feat_scaler,
        Z_euclid=euclid_xy,
        Z_dtw=dtw_xy,
        sub_idx=picked,
        dist_matrix=dtw_dist,
        info=dtw_info,
        embedding_df=coords,
    )

# Manifold step driver: runs only when enabled and preprocessing succeeded.
manifold_cfg = None
manifold_out = None
if RUN_MANIFOLD and preproc_out is not None:
    # ManifoldConfig is undefined when the guarded import failed, so report the
    # stored import error here instead of hitting an opaque NameError below.
    if MANIFOLD_IMPORT_ERROR is not None:
        raise RuntimeError(
            f"Не удалось импортировать инструменты manifold: {MANIFOLD_IMPORT_ERROR}"
        ) from MANIFOLD_IMPORT_ERROR
    manifold_cfg = ManifoldConfig(channels=("r_oil_norm", "wc"), random_state=RANDOM_SEED)
    manifold_out = run_manifold(preproc_out, manifold_cfg)
pipeline_state["manifold"] = manifold_out


## 4. Кластеризация и поиск аномалий


In [None]:
# Guarded import: a failure is stored in CLUSTER_IMPORT_ERROR instead of being
# raised here; run_clustering turns it into a RuntimeError when it is called.
try:
    from tools.clustering import (
        ClusterConfig,
        assign_anomaly_scores,
        build_cluster_prototypes,
        cluster_hdbscan,
        summarize_clusters,
    )
    CLUSTER_IMPORT_ERROR = None
except Exception as exc:
    CLUSTER_IMPORT_ERROR = exc

def run_clustering(out_dict: Dict[str, Any], manifold_dict: Dict[str, Any], cfg: Optional[ClusterConfig] = None) -> Optional[Dict[str, Any]]:
    """Cluster the embedding, score anomalies and build cluster prototypes.

    The DTW embedding (``Z_dtw`` with ``sub_idx``) is used when both are
    available; otherwise the Euclidean embedding of all wells is used.

    Args:
        out_dict: Preprocessing result (``wells_used``, ``panel_long`` and
            ``config["T"]`` are read).
        manifold_dict: Result of the manifold step.
        cfg: Clustering settings; defaults to
            ``ClusterConfig(min_cluster_size=45, min_samples=12)``.

    Returns:
        ``None`` when either input is ``None``; otherwise a dict with keys
        ``cfg``, ``result``, ``df_map``, ``summary`` and ``prototypes``.

    Raises:
        RuntimeError: If the clustering tools could not be imported.
    """
    if out_dict is None or manifold_dict is None:
        print("Кластеризация пропущена — нет данных manifold.")
        return None
    if CLUSTER_IMPORT_ERROR is not None:
        raise RuntimeError(f"Не удалось импортировать инструменты кластеризации: {CLUSTER_IMPORT_ERROR}")
    if not cfg:
        cfg = ClusterConfig(min_cluster_size=45, min_samples=12)

    coords = manifold_dict.get("Z_dtw")
    picked = manifold_dict.get("sub_idx")
    if coords is None or picked is None:
        # Fall back to the Euclidean embedding, which covers every well.
        coords = manifold_dict["Z_euclid"]
        picked = np.arange(coords.shape[0])
    picked_wells = np.array(out_dict["wells_used"])[picked]

    hdb = cluster_hdbscan(coords, picked_wells.tolist(), cfg)
    well_map = assign_anomaly_scores(hdb["df_map"], coords, hdb["labels"], lof_k=35)

    cluster_summary = summarize_clusters(well_map)
    display(cluster_summary)

    protos = build_cluster_prototypes(
        out_dict["panel_long"],
        well_map,
        channels=("r_oil_s", "wc", "gor", "r_oil_norm"),
        T=int(out_dict["config"]["T"]),
        method="auto",
    )
    print(f"Silhouette={hdb['silhouette']:.3f}, DBCV={hdb['dbcv']:.3f}")

    return dict(
        cfg=cfg,
        result=hdb,
        df_map=well_map,
        summary=cluster_summary,
        prototypes=protos,
    )

# Clustering step driver: runs only when enabled and the manifold step produced output.
cluster_cfg = None
cluster_out = None
if RUN_CLUSTERING and manifold_out is not None:
    # ClusterConfig is undefined when the guarded import failed, so report the
    # stored import error here instead of hitting an opaque NameError below.
    if CLUSTER_IMPORT_ERROR is not None:
        raise RuntimeError(
            f"Не удалось импортировать инструменты кластеризации: {CLUSTER_IMPORT_ERROR}"
        ) from CLUSTER_IMPORT_ERROR
    cluster_cfg = ClusterConfig(min_cluster_size=45, min_samples=12)
    cluster_out = run_clustering(preproc_out, manifold_out, cluster_cfg)
pipeline_state["clustering"] = cluster_out


## 5. Экспорт отчёта PBM


In [None]:
# Guarded import: a failure is stored in REPORT_IMPORT_ERROR instead of being
# raised here; build_pbm_report turns it into a RuntimeError when it is called.
try:
    from tools.make_reports import (
        build_html_report,
        export_csv_summaries,
        save_cluster_distribution_plot,
        save_cluster_prototype_plots,
        save_pbm_map,
    )
    REPORT_IMPORT_ERROR = None
except Exception as exc:
    REPORT_IMPORT_ERROR = exc

def build_pbm_report(preproc_dict: Dict[str, Any], manifold_dict: Dict[str, Any], cluster_dict: Dict[str, Any], out_dir: Path) -> Dict[str, Any]:
    """Export the PBM plots, CSV summaries and HTML report into *out_dir*.

    Args:
        preproc_dict: Preprocessing result (``panel_long`` and ``config["T"]``
            are read).
        manifold_dict: Manifold result; ``Z_dtw`` is preferred, ``Z_euclid`` is
            the fallback.
        cluster_dict: Clustering result (``df_map``, ``prototypes``, ``summary``).
        out_dir: Target directory; created if missing.

    Returns:
        An empty dict when *cluster_dict* is ``None``; otherwise a dict with
        keys ``out_dir``, ``map_png``, ``sizes_png``, ``proto_pngs``,
        ``csv_paths`` and ``report_path``.

    Raises:
        RuntimeError: If the report generators could not be imported.
    """
    if cluster_dict is None:
        print("Отчёт пропущен — кластеризация не выполнена.")
        return {}
    if REPORT_IMPORT_ERROR is not None:
        raise RuntimeError(f"Не удалось импортировать генераторы отчётов: {REPORT_IMPORT_ERROR}")

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    target = str(out_dir)  # the report helpers are given the directory as a string

    coords = manifold_dict.get("Z_dtw")
    if coords is None:
        coords = manifold_dict["Z_euclid"]
    well_map = cluster_dict["df_map"]

    map_file = save_pbm_map(coords, well_map, target)
    sizes_file = save_cluster_distribution_plot(well_map, target)

    horizon = int(preproc_dict["config"]["T"])
    proto_files = save_cluster_prototype_plots(
        preproc_dict["panel_long"],
        well_map,
        cluster_dict["prototypes"],
        channels=("r_oil_s", "wc", "gor", "r_oil_norm"),
        T=horizon,
        out_dir=target,
    )

    cluster_summary = cluster_dict["summary"]
    csv_files = export_csv_summaries(well_map, cluster_summary, target, top_anoms=50)
    html_file = build_html_report(
        target,
        map_file,
        sizes_file,
        proto_files,
        well_map,
        cluster_summary,
        title="PBM Report",
    )
    print("Отчёт сохранён:", html_file)

    return dict(
        out_dir=out_dir,
        map_png=map_file,
        sizes_png=sizes_file,
        proto_pngs=proto_files,
        csv_paths=csv_files,
        report_path=html_file,
    )

# Report step driver: runs only when enabled and clustering produced output.
pbm_report = None
if RUN_PBM_REPORT and cluster_out is not None:
    pbm_report = build_pbm_report(preproc_out, manifold_out, cluster_out, PBM_REPORT_DIR)
pipeline_state["pbm_report"] = pbm_report


## 6. Прогноз профиля (20 → 100)


In [None]:
from datetime import datetime

from tools.forecast import (
    build_prefix_scaled_channel,
    evaluate_forecasts,
    knn_forecast,
    make_matrices,
    multioutput_forecast,
)

def _write_wide_csv(values: np.ndarray, columns: list[str], wells: Any, path: Path) -> Path:
    """Write one row per well (``well_name`` first, then *columns*) to *path*."""
    frame = pd.DataFrame(values, columns=columns)
    frame.insert(0, "well_name", wells)
    frame.to_csv(path, index=False)
    return path


def _finite_or_nan(values: Any) -> np.ndarray:
    """Return *values* as a float array with every non-finite entry set to NaN."""
    arr = np.asarray(values, dtype=float)
    return np.where(np.isfinite(arr), arr, np.nan)


def _build_long_forecast_frame(
    wells: Any,
    y_true: np.ndarray,
    y_knn: np.ndarray,
    y_enet: np.ndarray,
    T: int,
    T_pref: int,
) -> pd.DataFrame:
    """Build the long table: one row per (well, step), ordered well-major."""
    n_wells = len(wells)
    step = np.tile(np.arange(T), n_wells)

    def flat(values: np.ndarray) -> np.ndarray:
        return _finite_or_nan(values)[:n_wells, :T].ravel()

    return pd.DataFrame({
        "well_name": np.repeat(np.asarray(wells), T),
        "t": step,
        "y_true": flat(y_true),
        "y_pred_knn": flat(y_knn),
        "y_pred_elasticnet": flat(y_enet),
        # Steps before T_pref are the observed prefix, the rest is forecast.
        "segment": np.where(step < T_pref, "observed", "forecast"),
    })


def _plot_forecast_example(
    idx: int,
    title_prefix: str,
    y_true: np.ndarray,
    y_pred: np.ndarray,
    wells: Any,
    T: int,
    T_pref: int,
    output_dir: Path,
) -> Path:
    """Save a true-vs-predicted line plot for well *idx* and return the PNG path."""
    import matplotlib.pyplot as plt  # imported lazily: only needed when plotting

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(range(T), y_true[idx], label="true")
    ax.plot(range(T), y_pred[idx], label="pred")
    ax.axvline(T_pref - 1, linestyle="--")  # marks the last prefix step
    ax.set_title(f"{title_prefix} — {wells[idx]}")
    ax.set_xlabel("month index")
    ax.set_ylabel("oil rate (r_oil_s)")
    ax.legend()
    path = output_dir / f"{title_prefix.replace(' ', '_').lower()}_{idx}_full.png"
    fig.tight_layout()
    fig.savefig(path, dpi=140)
    plt.close(fig)
    return path


def _render_forecast_html(
    T: int,
    T_pref: int,
    metrics_knn: Dict[str, Any],
    metrics_lr: Dict[str, Any],
    file_names: list[str],
    image_names: list[str],
) -> str:
    """Render the HTML report (metrics table, file list, example images)."""
    from datetime import datetime, timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow(); tzinfo is
    # dropped so the text keeps the "<naive ISO timestamp>Z" form.
    generated = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    file_items = "\n".join(f"  <li>{name}</li>" for name in file_names)
    gallery = "".join(
        f"<img src='{name}' style='max-width:640px;display:block;margin-bottom:10px;'/>"
        for name in image_names
    )
    return f"""
<html><head><meta charset='utf-8'><title>Forecast Report</title></head><body>
<h2>Forecast evaluation (prefix {T_pref} → total {T})</h2>
<p>Generated: {generated}Z</p>
<table border='1' cellspacing='0' cellpadding='6'>
<tr><th>Model</th><th>RMSE</th><th>sMAPE</th><th>N eval wells</th></tr>
<tr><td>KNN</td><td>{metrics_knn['rmse']:.4f}</td><td>{metrics_knn['smape']:.4f}</td><td>{metrics_knn['n_eval']}</td></tr>
<tr><td>ElasticNet</td><td>{metrics_lr['rmse']:.4f}</td><td>{metrics_lr['smape']:.4f}</td><td>{metrics_lr['n_eval']}</td></tr>
</table>
<h3>Files</h3>
<ul>
{file_items}
</ul>
<h3>Full-series examples</h3>
{gallery}
</body></html>
"""


def run_forecast_pipeline(out_dict: Dict[str, Any], T_pref: int, output_dir: Path, rng_seed: int = 59) -> Dict[str, Any]:
    """Forecast the profile suffix from its prefix and export all artifacts.

    Two models are run (``knn_forecast`` and ``multioutput_forecast``), both
    are scored with ``evaluate_forecasts`` and the following files are written
    to *output_dir*: ``metrics.csv``, ``Y_suffix_true.npy``, ``Y_pred_knn.npy``,
    ``Y_pred_enet.npy``, ``pred_<model>.csv``, ``pred_full_<model>.csv``,
    ``pred_long.csv``, up to six example PNGs per model and
    ``forecast_report.html``.

    Args:
        out_dict: Preprocessing result; ``panel_long``, ``wells_used`` and
            ``config["T"]`` are read.
        T_pref: Number of leading steps treated as observed; must satisfy
            ``0 < T_pref < T``.
        output_dir: Target directory; created if missing.
        rng_seed: Seed for choosing which wells are plotted as examples.

    Returns:
        Dict with keys ``metrics``, ``metrics_path``, ``suffix_paths``,
        ``full_paths``, ``long_path``, ``example_imgs``, ``report_path`` and
        ``info``.

    Raises:
        ValueError: If *out_dict* is ``None`` or *T_pref* is outside ``1..T-1``.
    """
    if out_dict is None:
        raise ValueError("Нет результатов предобработки.")

    T = int(out_dict["config"]["T"])
    # Without at least one prefix step and one suffix step there is nothing to forecast.
    if not 0 < T_pref < T:
        raise ValueError(f"T_pref должен быть в диапазоне 1..{T - 1}, получено {T_pref}.")

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Work on a copy so the caller's panel is not handed to the scaling helper.
    panel_long = out_dict["panel_long"].copy()
    wells_used = out_dict["wells_used"]

    panel_long = build_prefix_scaled_channel(
        panel_long,
        wells_used,
        T=T,
        T_pref=T_pref,
        q=0.90,
        rate_col="r_oil_s",
        out_col="r_oil_pref_norm",
    )
    X_pref, Y_suffix_true, Y_full = make_matrices(
        panel_long,
        wells_used,
        T=T,
        T_pref=T_pref,
        channel="r_oil_pref_norm",
        target_col="r_oil_s",
    )

    Y_pred_knn, knn_info = knn_forecast(X_pref, Y_full, T_pref=T_pref, K=15)
    Y_pred_lr, lr_info = multioutput_forecast(panel_long, wells_used, T=T, T_pref=T_pref, Y_full=Y_full, random_state=43)

    metrics_knn = evaluate_forecasts(Y_suffix_true, Y_pred_knn)
    metrics_lr = evaluate_forecasts(Y_suffix_true, Y_pred_lr)
    metrics_df = pd.DataFrame([
        {"model": "knn", **metrics_knn},
        {"model": "elasticnet", **metrics_lr},
    ])
    metrics_path = output_dir / "metrics.csv"
    metrics_df.to_csv(metrics_path, index=False)

    np.save(output_dir / "Y_suffix_true.npy", Y_suffix_true)
    np.save(output_dir / "Y_pred_knn.npy", Y_pred_knn)
    np.save(output_dir / "Y_pred_enet.npy", Y_pred_lr)

    predictions = {"knn": Y_pred_knn, "elasticnet": Y_pred_lr}

    # Suffix-only predictions: columns m(T_pref+1) .. mT.
    suffix_cols = [f"m{t}" for t in range(T_pref + 1, T + 1)]
    suffix_paths: Dict[str, Path] = {}
    for name, pred in predictions.items():
        suffix_paths[name] = _write_wide_csv(pred, suffix_cols, wells_used, output_dir / f"pred_{name}.csv")

    # Full series: the true values with the suffix overwritten by the forecast.
    full_cols = [f"m{t}" for t in range(1, T + 1)]
    full_predictions: Dict[str, np.ndarray] = {}
    full_paths: Dict[str, Path] = {}
    for name, pred in predictions.items():
        stitched = Y_full.copy()
        stitched[:, T_pref:T] = pred
        full_predictions[name] = stitched
        full_paths[name] = _write_wide_csv(stitched, full_cols, wells_used, output_dir / f"pred_full_{name}.csv")

    long_df = _build_long_forecast_frame(
        wells_used,
        Y_full,
        full_predictions["knn"],
        full_predictions["elasticnet"],
        T,
        T_pref,
    )
    long_path = output_dir / "pred_long.csv"
    long_df.to_csv(long_path, index=False)

    # Plot a reproducible random sample of wells whose KNN forecast is fully finite.
    rng = np.random.default_rng(rng_seed)
    valid_idx = np.where(np.isfinite(Y_pred_knn).all(axis=1))[0]
    example_imgs: list[Path] = []
    if valid_idx.size:
        chosen = rng.choice(valid_idx, size=min(6, valid_idx.size), replace=False)
        for idx in chosen:
            for title_prefix, name in (("knn_example", "knn"), ("enet_example", "elasticnet")):
                example_imgs.append(
                    _plot_forecast_example(
                        idx, title_prefix, Y_full, full_predictions[name],
                        wells_used, T, T_pref, output_dir,
                    )
                )

    html = _render_forecast_html(
        T,
        T_pref,
        metrics_knn,
        metrics_lr,
        file_names=[
            metrics_path.name,
            suffix_paths["knn"].name,
            suffix_paths["elasticnet"].name,
            full_paths["knn"].name,
            full_paths["elasticnet"].name,
            long_path.name,
        ],
        image_names=[img.name for img in example_imgs],
    )
    report_path = output_dir / "forecast_report.html"
    report_path.write_text(html, encoding="utf-8")

    print("KNN   → RMSE={rmse:.4f}, sMAPE={smape:.4f}, N={n_eval}".format(**metrics_knn))
    print("ENet  → RMSE={rmse:.4f}, sMAPE={smape:.4f}, N={n_eval}".format(**metrics_lr))
    print("Артефакты сохранены в:", output_dir)

    return {
        "metrics": metrics_df,
        "metrics_path": metrics_path,
        "suffix_paths": suffix_paths,
        "full_paths": full_paths,
        "long_path": long_path,
        "example_imgs": example_imgs,
        "report_path": report_path,
        "info": {"knn": knn_info, "enet": lr_info},
    }

# Forecast step driver: runs only when enabled and preprocessing succeeded.
forecast_artifacts = None
if RUN_FORECAST and preproc_out is not None:
    forecast_artifacts = run_forecast_pipeline(
        preproc_out,
        T_pref=FORECAST_PREFIX,
        output_dir=FORECAST_EXPORT_DIR,
        rng_seed=RANDOM_SEED,
    )
pipeline_state["forecast"] = forecast_artifacts


## 7. Сводка состояния пайплайна


In [None]:
def summarize_state(state: Dict[str, Any]) -> pd.DataFrame:
    """Tabulate the pipeline state, one row per recorded step.

    Args:
        state: Mapping of step name to that step's output (``None`` when the
            step produced nothing).

    Returns:
        DataFrame with columns ``step``, ``available`` (output is not ``None``)
        and ``type`` (class name of the output); empty for an empty *state*.
    """
    records = [
        {
            "step": step_name,
            "available": payload is not None,
            "type": type(payload).__name__,
        }
        for step_name, payload in state.items()
    ]
    return pd.DataFrame(records)

# Show the per-step summary, or a hint when no step has recorded anything yet.
if pipeline_state:
    display(summarize_state(pipeline_state))
else:
    print("pipeline_state пуст — проверьте выполнение предыдущих ячеек.")
