In [1]:
# -*- coding: utf-8 -*-
# ================================================================
# AMF Stress Test — workflow JOUR PAR JOUR (base = exposures)
# ================================================================
# Ce script :
# 1) charge le mapping (POS + scenario_paths),
# 2) charge 'exposures' (greeks) + nettoie/contrôle qualité (QC),
# 3) restreint les scénarios au périmètre d'exposures (par Identifier),
# 4) applique les chocs pour un jour donné (day_step_apply),
# 5) renvoie un 'exposures_next' prêt à être MODIFIÉ avant le jour suivant.
# ------------------------------------------------
# CLÉ DE MERGE / AGRÉGATION = Identifier  (ISIN conservé pour info)
# ------------------------------------------------

from pathlib import Path
import pandas as pd
import numpy as np

# =======================
# CONFIG — à adapter
# =======================
EXCEL_MAPPING_PATH = r"C:\Users\abenjelloun\OneDrive - Cooperactions\GAM-E-Risk Perf - RMP\1.PROD\1.REGLEMENTAIRE\14.Stress Test AMF (JB)\Production\Périmètre et positions\Matrices correspondance.xlsx"
SHEET_POS = "POS avec transpa"   # entêtes ligne 2 -> header=1
SHEET_SCEN = "scenario_paths"
POS_HEADER_ROW = 1               # 0-based

EXPOSURES_PATH = r"C:\Users\abenjelloun\OneDrive - Cooperactions\GAM-E-Risk Perf - RMP\1.PROD\1.REGLEMENTAIRE\14.Stress Test AMF (JB)\Production\Périmètre et positions\Exposures-Results-sans transpa-Test_AMF_VDE-2025-03-31.xlsx"
TRIOPTIMA_PATH = r"C\Users\abenjelloun\OneDrive - Cooperactions\GAM-E-Risk Perf - RMP\1.PROD\1.REGLEMENTAIRE\14.Stress Test AMF (JB)\Production\Périmètre et positions\search_groupama-am_2025-03-31.xlsx"

# Jours possibles (doivent exister dans scenario_paths)
DAYS = ["Day 1","Day 2","Day 3","Day 4","Day 5","Day 10"]

# Règles de méthode
PREFER_RATE_PV01 = True          # Taux: RateDelta1bp × shock_bps ; sinon fallback TV×Duration×shock_dec (−)
INCLUDE_OTHER_INFLATION_SWAP = True

# =======================
# Utils colonnes / texte
# =======================
def _clean_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Nettoie des colonnes Excel (espaces, 'Unnamed', points)."""
    new_cols = []
    for c in df.columns:
        if c is None or (isinstance(c, str) and c.lower().startswith("unnamed")):
            new_cols.append(None); continue
        s = str(c).strip().replace("\u00A0", " ")
        s = " ".join(s.split())
        s = s.replace(". ", " ").replace(".", " ")
        new_cols.append(s)
    df.columns = new_cols
    return df

def _norm_str(x):
    """Normalise légèrement une chaîne (pour Market / Variable)."""
    if pd.isna(x): return np.nan
    s = str(x).strip().replace("\u00A0", " ")
    s = " ".join(s.split()).replace(". ", " ").replace(".", " ")
    return s

# =======================
# Chargement mapping (POS + scen)
# =======================
POS_BASE_COLS = [
    "Identifier","ISIN","Description","Currency","AssetType","Sector1","Seniority",
    "CompositeBroadRating","MaturityDate","Maturity","Maturity Band","EffectiveMaturityDate",
    "LiquidityScore","Country","{Class_Rating}"
]
POS_MV_PAIRS = [(f"Market {i}", f"Variable {i}") for i in range(1, 7)]
SCEN_BASE_COLS = ["Market","Variable","Comment","Type","Unit","T0",
                  "Day 1","Day 2","Day 3","Day 4","Day 5","Day 10"]

def load_mapping(path_excel: str|Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Lit la feuille POS et scenario_paths."""
    xls = pd.ExcelFile(path_excel)
    pos  = pd.read_excel(xls, SHEET_POS, header=POS_HEADER_ROW)
    scen = pd.read_excel(xls, SHEET_SCEN)
    pos  = _clean_cols(pos)
    scen = _clean_cols(scen)
    return pos, scen

def melt_pos(pos: pd.DataFrame) -> pd.DataFrame:
    """Transforme POS en format long : 1 ligne par (Market k, Variable k) non vide."""
    pos = pos.copy()
    for mk, vk in POS_MV_PAIRS:
        if mk in pos.columns: pos[mk] = pos[mk].apply(_norm_str)
        if vk in pos.columns: pos[vk] = pos[vk].apply(_norm_str)

    base_cols = [c for c in POS_BASE_COLS if c in pos.columns]
    mv_pairs_present = [(mk,vk) for mk,vk in POS_MV_PAIRS if mk in pos.columns and vk in pos.columns]

    rows = []
    for _, row in pos.iterrows():
        base = {c: row.get(c, np.nan) for c in base_cols}
        for mk, vk in mv_pairs_present:
            market, variable = row[mk], row[vk]
            if pd.notna(market) and str(market) != "":
                rows.append({**base, "Market": market, "Variable": (variable if pd.notna(variable) else np.nan)})
    out = pd.DataFrame(rows)
    if "Identifier" not in out.columns: out["Identifier"] = np.arange(len(out))
    return out

def prepare_scenarios(scen: pd.DataFrame) -> pd.DataFrame:
    """Nettoie la table scenario_paths & conserve les colonnes utiles."""
    scen = scen.copy()
    scen = scen[[c for c in SCEN_BASE_COLS if c in scen.columns]]
    scen["Market"] = scen["Market"].apply(_norm_str)
    if "Variable" in scen.columns: scen["Variable"] = scen["Variable"].apply(_norm_str)
    if "Type" in scen.columns:     scen["Type"]     = scen["Type"].apply(lambda x: str(x).strip().lower() if pd.notna(x) else x)
    if "Unit" in scen.columns:     scen["Unit"]     = scen["Unit"].apply(lambda x: str(x).strip().lower() if pd.notna(x) else x)
    return scen

def merge_pos_scen(pos_long: pd.DataFrame, scen: pd.DataFrame) -> pd.DataFrame:
    """Relie POS (long) aux scénarios : jointure (Market,Variable) ; si Variable vide -> jointure sur Market seul."""
    left_mv = pos_long.dropna(subset=["Market","Variable"]) if "Variable" in pos_long.columns else pos_long.copy()
    mv_merge = left_mv.merge(scen, on=["Market","Variable"], how="left")
    if "Variable" in pos_long.columns: left_m = pos_long[pos_long["Variable"].isna()].copy()
    else:                               left_m = pd.DataFrame(columns=pos_long.columns)
    if not left_m.empty:
        m_merge = left_m.merge(scen.drop(columns=["Variable"], errors="ignore"), on="Market", how="left")
        return pd.concat([mv_merge, m_merge], ignore_index=True)
    return mv_merge

def available_days(scen: pd.DataFrame) -> list[str]:
    """Retourne la liste des colonnes 'Day n' disponibles."""
    return [c for c in scen.columns if isinstance(c, str) and c.lower().startswith("day")]

# =======================
# Limiter les scénarios au périmètre d'exposures (clé = Identifier)
# =======================
def restrict_scenarios_to_exposures(merged_mapping: pd.DataFrame,
                                    exposures: pd.DataFrame,
                                    key_col: str = "Identifier") -> pd.DataFrame:
    """Garde uniquement les lignes de mapping dont la clé existe dans exposures."""
    if key_col not in merged_mapping.columns or key_col not in exposures.columns:
        return merged_mapping
    keys = exposures[key_col].astype(str).unique()
    return merged_mapping[merged_mapping[key_col].astype(str).isin(keys)].copy()

# =======================
# Standardisation des chocs
# =======================
def standardize_shock(value, unit: str|None) -> float|None:
    """Choc standardisé en décimal: 50 bps -> 0.005 ; -10% -> -0.10 ; 2 p.p -> 0.02."""
    if pd.isna(value): return None
    try: val = float(value)
    except: return None
    if unit is None: return val
    u = unit.lower()
    if u in ["bp","bps"]: return val / 10_000.0
    if u in ["%","percent","percentage","p.p","pp","ppt","percentage point","percentage points"]:
        return val / 100.0
    return val

def _to_bps(shock_std: float, unit: str|None) -> float:
    """Convertit un choc standardisé en bps numériques si besoin (pour PV01 / CS01 / Infl01)."""
    if shock_std is None or pd.isna(shock_std) or unit is None: return np.nan
    u = unit.lower()
    if u in ["bp","bps"]: return shock_std * 10_000.0
    if u in ["%","percent","percentage","p.p","pp","ppt","percentage point","percentage points"]:
        return shock_std * 10_000.0
    return np.nan

def build_daily_shocks(merged: pd.DataFrame, day_col: str) -> pd.DataFrame:
    """Prépare les chocs d'un jour (inclut Identifier & ISIN si présents)."""
    if day_col not in merged.columns:
        raise ValueError(f"Jour '{day_col}' introuvable. Jours dispo: {available_days(merged)}")
    out = merged.copy()
    out["shock_raw"] = out[day_col]
    out["shock_std"] = [standardize_shock(v, u) for v, u in zip(out["shock_raw"], out.get("Unit", pd.Series([None]*len(out))))]
    keep = ["Identifier","ISIN","Market","Variable","Type","Unit","T0", day_col, "shock_std","Comment"]
    keep = [c for c in keep if c in out.columns]
    return out[keep]

# =======================
# Chargement exposures + QC
# =======================
def load_exposures(path_excel: str|Path, return_qc: bool = True):
    """Charge le fichier d'expositions, nettoie et met des valeurs par défaut pour éviter les NaN."""
    df = pd.read_excel(path_excel)
    df = _clean_cols(df)

    qc = {"path": str(path_excel)}
    qc["rows_raw"] = int(df.shape[0])

    # Convertir en numérique les colonnes clés si elles existent
    num_cols = [
        "TV","MacaulayDuration","Duration","RateConvexity","DollarRateConvexity1pc",
        "RateDelta1bp","RateVega","SpreadDelta1bp","CreditVega",
        "EquityDelta","EquityGamma","EquityVega","FXDelta","FXVega","InflationDelta1bp",
        "Nominal","TVPercent"
    ]
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # Enlever les lignes agrégées 'TOTAL' si jamais elles existent
    removed_total = 0
    if "AssetID" in df.columns:
        mask_total = df["AssetID"].astype(str).str.upper().eq("TOTAL")
        removed_total = int(mask_total.sum())
        df = df[~mask_total].copy()
    qc["rows_removed_total"] = removed_total
    qc["rows_after_filter"] = int(df.shape[0])

    # Sensi NaN -> 0 (évite les effets NaN)
    sensi_zero = [
        "RateDelta1bp","SpreadDelta1bp","InflationDelta1bp",
        "EquityDelta","FXDelta","RateVega","CreditVega","EquityVega","FXVega","EquityGamma"
    ]
    qc["filled_zero"] = {}
    for c in sensi_zero:
        if c in df.columns:
            n = int(df[c].isna().sum())
            df[c] = df[c].fillna(0.0)
            qc["filled_zero"][c] = n

    # Duration NaN -> 0
    if "Duration" in df.columns:
        qc["duration_filled_zero"] = int(df["Duration"].isna().sum())
        df["Duration"] = df["Duration"].fillna(0.0)
    else:
        qc["duration_missing_col"] = True
        df["Duration"] = 0.0

    # RateConvexity : approx via DollarRateConvexity1pc / TV si possible, sinon 0
    if "RateConvexity" not in df.columns:
        df["RateConvexity"] = np.nan
        qc["rateconvexity_missing_col"] = True
    else:
        qc["rateconvexity_missing_col"] = False

    approx_count, fill_zero_count = 0, 0
    if "DollarRateConvexity1pc" in df.columns and "TV" in df.columns:
        need_conv = df["RateConvexity"].isna()
        valid_tv  = df["TV"].notna() & (df["TV"] != 0.0)
        mask = need_conv & df["DollarRateConvexity1pc"].notna() & valid_tv
        with np.errstate(divide='ignore', invalid='ignore'):
            approx = df.loc[mask, "DollarRateConvexity1pc"] / df.loc[mask, "TV"]
        df.loc[mask, "RateConvexity"] = approx.replace([np.inf, -np.inf], np.nan)
        approx_count = int(mask.sum())
    still_nan = df["RateConvexity"].isna()
    fill_zero_count = int(still_nan.sum())
    df.loc[still_nan, "RateConvexity"] = 0.0

    qc["rateconvexity_approximated"] = approx_count
    qc["rateconvexity_filled_zero"] = fill_zero_count

    return (df, qc) if return_qc else df

def print_qc_report(qc: dict):
    """Affiche un petit rapport de nettoyage des expositions."""
    print(f"📄 Fichier : {qc.get('path','')}")
    print(f"📦 Lignes brutes : {qc['rows_raw']}")
    print(f"🧹 Lignes 'TOTAL' supprimées : {qc['rows_removed_total']}")
    print(f"✅ Lignes après filtre : {qc['rows_after_filter']}")
    print("\n🔧 NaN → 0 (sensis) :")
    for k, v in qc["filled_zero"].items():
        print(f"  - {k:<22}: {v}")
    if "duration_filled_zero" in qc:
        print(f"\n⏱  Duration NaN → 0 : {qc['duration_filled_zero']}")
    if qc.get("duration_missing_col"):
        print("⚠️  Colonne 'Duration' manquante → créée à 0")
    if qc.get("rateconvexity_missing_col"):
        print("⚠️  Colonne 'RateConvexity' manquante → créée")
    print(f"\n📐 RateConvexity approximée (DollarRateConvexity1pc/TV) : {qc['rateconvexity_approximated']}")
    print(f"0️⃣  RateConvexity NaN restants mis à 0 : {qc['rateconvexity_filled_zero']}")

def load_counterparty_mapping(path_excel: str|Path, id_col='FREE_TEXT_1', cp_col='CP') -> pd.DataFrame:
    """Lit le fichier Trioptima et renvoie un mapping Identifier→Counterparty."""
    df = pd.read_excel(path_excel)
    df = _clean_cols(df)
    if id_col not in df.columns or cp_col not in df.columns:
        raise KeyError(f"Colonnes '{id_col}' ou '{cp_col}' manquantes dans {path_excel}")
    return (df[[id_col, cp_col]]
            .dropna(subset=[id_col])
            .rename(columns={id_col: 'Identifier', cp_col: 'Counterparty'})
            .drop_duplicates('Identifier'))

# =======================
# Étape "un jour" — base = exposures, clé = Identifier
# =======================
def day_step_apply(
    exposures: pd.DataFrame,
    merged_mapping: pd.DataFrame,
    day_col: str = "Day 1",
    prefer_rate_pv01: bool = True,
    include_other_inflation_swap: bool = True,
    update_duration: bool = True,   # Duration_next = Duration_t + RateConvexity × shock_rates_dec
    update_tv: bool = True,         # TV_day = TV - TotalEffect
    return_pivot: bool = True,
    key_col: str = "Identifier",    # clé de jointure
):
    """
    Applique les chocs d'un jour et renvoie:
      - detailed       : lignes (Identifier × Market × Variable) avec Effect/Method (+ ISIN si dispo)
      - per_id         : agrégat par Identifier (ISIN=first), TV_day, (Duration_next si update_duration)
      - exposures_next : copie de exposures avec TV/Duration mises à jour (selon flags)
      - pivot          : (optionnel) large des Effects ('Market :: Variable'), index = Identifier
    """

    # 1) Chocs du jour (standardisés en décimal)
    shocks_day = build_daily_shocks(merged_mapping, day_col=day_col)  # contient Identifier, ISIN (POS), Market, Variable, shock_std...

    # 2) Merge en LEFT depuis exposures (la base de calcul) par Identifier
    cols_needed = [
        key_col, "AssetClass","Country","AssetID","Nominal","TVPercent","TV",
        "MacaulayDuration","Duration","RateConvexity","DollarRateConvexity1pc","RateDelta1bp","RateVega",
        "SpreadDelta1bp","CreditVega","EquityDelta","EquityGamma","EquityVega","FXDelta","FXVega","InflationDelta1bp"
    ]
    cols_needed = [c for c in cols_needed if c in exposures.columns]
    base = exposures[cols_needed].copy()
    if key_col not in base.columns or key_col not in shocks_day.columns:
        raise KeyError(f"Clé '{key_col}' absente de exposures ou du mapping de scénarios.")

    df = base.merge(shocks_day, on=key_col, how="left")  # garde les instruments sans mapping (effet=0)

    # 3) Calcul des effets par ligne (Identifier × Market × Variable)
    effects, methods = [], []
    for _, r in df.iterrows():
        market_raw = r.get("Market")
        market = market_raw.lower() if isinstance(market_raw, str) else ""
        variable = (r.get("Variable") or "")
        unit     = r.get("Unit")
        shock_std = r.get("shock_std", np.nan)

        shock_bps = _to_bps(shock_std, unit)  # pour PV01/CS01/Infl01 (en bps numériques)
        shock_dec = shock_std                 # décimal (ex: +50 bps -> +0.005)

        effect = np.nan
        method = None

        if market == "equity":
            eq_delta = r.get("EquityDelta", np.nan)
            if pd.notna(eq_delta) and pd.notna(shock_dec):
                effect = eq_delta * shock_dec
                method = "EquityDelta × shock_dec"

        elif market == "interest rates":
            if prefer_rate_pv01 and pd.notna(r.get("RateDelta1bp")) and pd.notna(shock_bps):
                effect = r["RateDelta1bp"] * shock_bps
                method = "RateDelta1bp × shock_bps"
            else:
                tv, duration = r.get("TV", np.nan), r.get("Duration", np.nan)
                if pd.notna(tv) and pd.notna(duration) and pd.notna(shock_dec):
                    effect = tv * duration * shock_dec * (-1.0)  # signe - (duration)
                    method = "TV × Duration × shock_dec (approx)"

        elif "spread" in market:  # couvre Gov Spreads / Corp Spreads (peu importe la casse)
            sp01 = r.get("SpreadDelta1bp", np.nan)
            if pd.notna(sp01) and pd.notna(shock_bps):
                effect = sp01 * shock_bps
                method = "SpreadDelta1bp × shock_bps"

        elif market == "fx":
            fx_delta = r.get("FXDelta", np.nan)
            if pd.notna(fx_delta) and pd.notna(shock_dec):
                effect = fx_delta * shock_dec
                method = "FXDelta × shock_dec"

        elif market == "inflation":
            infl01 = r.get("InflationDelta1bp", np.nan)
            if pd.notna(infl01) and pd.notna(shock_bps):
                effect = infl01 * shock_bps
                method = "InflationDelta1bp × shock_bps"

        elif market == "other":
            # Cas spécifique demandé: Other + Variable = Inflation Swap
            if isinstance(variable, str) and "inflation swap" in variable.lower():
                infl01 = r.get("InflationDelta1bp", np.nan)
                if pd.notna(infl01) and pd.notna(shock_bps):
                    effect = infl01 * shock_bps
                    method = "InflationDelta1bp × shock_bps (Other/Inflation Swap)"

        effects.append(effect)
        methods.append(method)

    df["Effect"] = effects
    df["Method"] = methods
    df["Effect"] = df["Effect"].fillna(0.0)  # pas de mapping -> effet 0

    # 4) Agrégat par Identifier (on conserve ISIN = first pour info)
    agg = {
        "TV": ("TV","first"),
        "Duration_t": ("Duration","first"),
        "RateConvexity": ("RateConvexity","first"),
        "TotalEffect": ("Effect","sum"),
    }
    if "ISIN" in df.columns:
        agg["ISIN"] = ("ISIN","first")

    per_id = df.groupby(key_col, as_index=False).agg(**agg)
    per_id["TV_day"] = per_id["TV"] - per_id["TotalEffect"]

    # 5) (option) mise à jour de la Duration : Duration_next = Duration_t + RateConvexity × (∑choc_rates_dec)
    if update_duration:
        rates_mask = df["Market"].fillna("").str.lower().eq("interest rates")
        rates_choc_dec = (df.loc[rates_mask]
                            .groupby(key_col, as_index=False)["shock_std"]
                            .sum()
                            .rename(columns={"shock_std":"shock_rates_dec"}))
        per_id = per_id.merge(rates_choc_dec, on=key_col, how="left")
        per_id["shock_rates_dec"] = per_id["shock_rates_dec"].fillna(0.0)
        per_id["RateConvexity"]   = per_id["RateConvexity"].fillna(0.0)
        per_id["Duration_t"]      = per_id["Duration_t"].fillna(0.0)
        per_id["Duration_next"]   = per_id["Duration_t"] + per_id["RateConvexity"] * per_id["shock_rates_dec"]

    # 6) Construire exposures_next (mise à jour TV/Duration par Identifier)
    exposures_next = exposures.copy()
    tv_map = per_id.set_index(key_col)["TV_day"]
    exposures_next["TV"] = exposures_next[key_col].map(tv_map).fillna(exposures_next["TV"])
    if update_duration and "Duration_next" in per_id.columns:
        dur_map = per_id.set_index(key_col)["Duration_next"]
        exposures_next["Duration"] = exposures_next[key_col].map(dur_map).fillna(exposures_next["Duration"])

    # 7) Pivot (optionnel) des effets par 'Market :: Variable'
    pivot = None
    if return_pivot:
        mv_key = df["Market"].fillna("").astype(str)
        if "Variable" in df.columns:
            mv_key = mv_key + " :: " + df["Variable"].fillna("").astype(str)
        df["_MV_"] = mv_key
        pivot = (df.pivot_table(index=key_col, columns="_MV_", values="Effect", aggfunc="sum")
                   .reset_index())
        pivot.columns.name = None

    return df, per_id, exposures_next, pivot


# ... puis Day 3, Day 4, Day 5, Day 10 de la même manière.


In [None]:
def process_pv_after_day1(exposures_next: pd.DataFrame,
                          future_classes=("Bond Future", "Equity Index Future", "FX Future"),
                          cp_col="Counterparty", port_col="Portefeuille") -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Aggregate TV after Day 1 and reset PV for selected futures classes.

    Parameters
    ----------
    exposures_next : pd.DataFrame
        DataFrame returned by ``day_step_apply`` for Day 1.
    future_classes : iterable[str]
        Asset classes whose TV must be summed then reset to 0.
    cp_col, port_col : str
        Column names for counterparty and portfolio.

    Returns
    -------
    updated : pd.DataFrame
        ``exposures_next`` with TV set to 0 for the specified futures classes.
    futures_tv : pd.DataFrame
        Sum of TV by AssetClass before reset (column ``TV_before_reset``).
    cp_port_tv : pd.DataFrame
        Sum of TV by Counterparty and Portfolio for remaining asset classes before reset (column ``TV_before_reset``).
    """
    df = exposures_next.copy()
    if "AssetClass" not in df.columns:
        raise KeyError("Column 'AssetClass' missing in exposures_next")
    mask = df["AssetClass"].isin(future_classes)
    futures_tv = (
        df.loc[mask]
          .groupby("AssetClass", as_index=False)["TV"].sum()
          .rename(columns={"TV": "TV_before_reset"})
          .sort_values("AssetClass")
    )
    other_mask = ~mask
    group_cols = [c for c in (cp_col, port_col) if c in df.columns]
    cp_port_tv = (
        df.loc[other_mask]
          .groupby(group_cols, as_index=False)["TV"].sum()
          .rename(columns={"TV": "TV_before_reset"})
          .sort_values(group_cols)
        if len(group_cols) == 2 else pd.DataFrame(columns=group_cols + ["TV_before_reset"])
    )
    df.loc[mask, "TV"] = 0.0
    return df, futures_tv, cp_port_tv


In [2]:
# =======================
# EXEMPLE D'UTILISATION
# =======================
# 1) Charger mapping & scénarios
pos_raw, scen_raw = load_mapping(EXCEL_MAPPING_PATH)
scen_raw=scen_raw.iloc[:28,:]
pos_long = melt_pos(pos_raw)
scen     = prepare_scenarios(scen_raw)
merged   = merge_pos_scen(pos_long, scen)          # mapping POS ↔ scénarios

# 2) Charger exposures (greeks) + QC
exposures, qc = load_exposures(EXPOSURES_PATH, return_qc=True)
print_qc_report(qc)

# Ajouter les contreparties depuis Trioptima
cp_map = load_counterparty_mapping(TRIOPTIMA_PATH)
exposures = exposures.merge(cp_map, on='Identifier', how='left')


# 3) Restreindre le mapping au périmètre d'exposures (par Identifier)
merged = restrict_scenarios_to_exposures(merged, exposures, key_col="Identifier")

# 4) DAY 1
d1_det, d1_id, exp_d2, d1_pivot = day_step_apply(
    exposures=exposures,
    merged_mapping=merged,
    day_col="Day 1",
    prefer_rate_pv01=PREFER_RATE_PV01,
    include_other_inflation_swap=INCLUDE_OTHER_INFLATION_SWAP,
    update_duration=True,
    update_tv=True,
    return_pivot=True,
    key_col="Identifier",
)
# Agrégation des TV et remise à zéro pour certains futures
exp_d2, tv_futures, tv_cp_port = process_pv_after_day1(exp_d2)
print("TV avant remise à zéro par AssetClass (futures):")
print(tv_futures)
print("TV avant remise à zéro par Counterparty/Portefeuille pour les autres classes d'actifs:")
print(tv_cp_port)
# 👉 exp_d2 est ta base pour Day 2 (tu peux la MODIFIER maintenant)

# 5) (EXEMPLE) Modifs entre Day 1 et Day 2
# exp_d2.loc[exp_d2["Identifier"]=="ID_À_VENDRE","TV"] *= 0.95
# exp_d2 = exp_d2[exp_d2["Identifier"]!="ID_A_SORTIR"]
# exp_d2.loc[exp_d2["Identifier"]=="NOUVEL_ID","Duration"] = 6.8
# (si tu as recalculé des sensis overnight, écrase les colonnes correspondantes dans exp_d2)

# 6) DAY 2 (sur la base modifiée)
d2_det, d2_id, exp_d3, d2_pivot = day_step_apply(
    exposures=exp_d2,
    merged_mapping=merged,
    day_col="Day 2",
    prefer_rate_pv01=PREFER_RATE_PV01,
    include_other_inflation_swap=INCLUDE_OTHER_INFLATION_SWAP,
    update_duration=True,
    update_tv=True,
    return_pivot=True,
    key_col="Identifier",
)

PermissionError: [Errno 13] Permission denied: 'C:\\Users\\abenjelloun\\OneDrive - Cooperactions\\GAM-E-Risk Perf - RMP\\1.PROD\\1.REGLEMENTAIRE\\14.Stress Test AMF (JB)\\Production\\Périmètre et positions\\Matrices correspondance.xlsx'

In [65]:
shocks_day = build_daily_shocks(merged, day_col='Day 1')

In [70]:
cols_needed = [
    'Identifier', "AssetClass","Country","AssetID","Nominal","TVPercent","TV",
        "MacaulayDuration","Duration","RateConvexity","DollarRateConvexity1pc","RateDelta1bp","RateVega",
        "SpreadDelta1bp","CreditVega","EquityDelta","EquityGamma","EquityVega","FXDelta","FXVega","InflationDelta1bp"
    ]
cols_needed = [c for c in cols_needed if c in exposures.columns]
base = exposures[cols_needed].copy()
    

In [74]:
if 'Identifier' not in base.columns or 'Identifier' not in shocks_day.columns:
        raise KeyError(f"Clé '{'Identifier'}' absente de exposures ou du mapping de scénarios.")

df = base.merge(shocks_day, on='Identifier', how="left")  # garde les instruments sans mapping (effet=0)

In [81]:
shocks_day
df.loc[df['Market'].isna()]

Unnamed: 0,Identifier,AssetClass,Country,AssetID,Nominal,TVPercent,TV,MacaulayDuration,Duration,RateConvexity,...,InflationDelta1bp,ISIN,Market,Variable,Type,Unit,T0,Day 1,shock_std,Comment
5478,FXF11702,Currency Forward,Not Classified,BNP_PARIBAS_FR EUR/CAD 20250414,-2.400000e+07,-5.884931e-02,-2.397696e+07,0.038356,0.038356,0.001471,...,0.0,,,,,,,,,
5479,FXF11703,Currency Forward,Not Classified,NATIXIS_FR EUR/CAD 20250414,-2.700000e+07,-6.620548e-02,-2.697408e+07,0.038356,0.038356,0.001471,...,0.0,,,,,,,,,
5480,FXF11704,Currency Forward,Not Classified,NATIXIS_FR EUR/CAD 20250414,-1.300000e+07,-3.187671e-02,-1.298752e+07,0.038356,0.038356,0.001471,...,0.0,,,,,,,,,
5481,FXF11702,Currency Forward,Not Classified,BNP_PARIBAS_FR EUR/CAD 20250414,2.396180e+07,5.875309e-02,2.393775e+07,0.038356,0.038356,0.001471,...,0.0,,,,,,,,,
5482,FXF11703,Currency Forward,Not Classified,NATIXIS_FR EUR/CAD 20250414,2.695702e+07,6.609722e-02,2.692997e+07,0.038356,0.038356,0.001471,...,0.0,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8430,CSH_MXN_DB,Cash,Not Classified,MXN,9.590226e+03,2.353836e-05,9.590226e+03,0.000000,0.000000,0.000000,...,0.0,,,,,,,,,
8431,CSH_NZD_DB,Cash,Not Classified,NZD,5.387351e+03,1.322277e-05,5.387351e+03,0.000000,0.000000,0.000000,...,0.0,,,,,,,,,
8432,CSH_ZAR_DB,Cash,Not Classified,ZAR,4.709060e+03,1.155797e-05,4.709060e+03,0.000000,0.000000,0.000000,...,0.0,,,,,,,,,
8433,CSH_ILS_DB,Cash,Not Classified,ILS,1.563890e+03,3.838430e-06,1.563890e+03,0.000000,0.000000,0.000000,...,0.0,,,,,,,,,


In [79]:
scen

Unnamed: 0,Market,Variable,Comment,Type,Unit,T0,Day 1,Day 2,Day 3,Day 4,Day 5,Day 10
0,Equity,EU Equities,...,relative,%,0,-13.0,-25,-31.0,-30,-28.0,-25
1,Equity,North American Equities,,relative,%,0,-10.0,-20,-25.0,-24,-22.0,-20
2,Equity,Japan Equities,,relative,%,0,-7.0,-13,-17.0,-16,-15.0,-13
3,Equity,Equity market volatilities,,absolute,p.p.,0,15.0,30,37.5,36,33.0,30
4,Interest rates,EU rates,Parallell shift,absolute,bps,0,25.0,50,62.5,60,55.0,50
5,Interest rates,RoW rates,Parallell shift,absolute,bps,0,12.5,25,31.25,30,27.5,25
6,Interest rates,Swaption implied volatility,Parallell shift to the vol. cube (normal vol.),absolute,bps,0,17.5,35,43.75,42,38.5,35
7,Interest rates,Base Repo Rate,,absolute,bps,0,20.0,40,50.0,48,44.0,40
8,Gov Spreads,Medium risk countries,Spread vs. swap rate,absolute,bps,0,20.0,40,50.0,48,44.0,40
9,Gov Spreads,Low risk countries,Spread vs. swap rate,absolute,bps,0,15.0,30,37.5,36,33.0,30
