# Python for Optimization in Finance

---

**A comprehensive analysis of portfolio optimization strategies using Python**

*Financial Engineering & Quantitative Analysis*

---

## Data Overview

### Available Datasets

| **Dataset** | **Description** | **Count** |
|-------------|-----------------|-----------|
| **ETFs** | Anonymized ETF data since 2019 | **105** |
| **Financial Assets** | Historical prices of main asset classes | **14** |

### Main Financial Assets Categories

- **Regional Sovereign Bonds**
- **High Yield (HY) Bonds** 
- **Commodities**
- **Dollar Index**

### Allocation Strategies

 **Two distinct allocation methodologies will be analyzed:**

1. **Fixed Allocation Strategy**
   - Static allocation among the 105 ETFs
   - Constant throughout the entire period

2. **Dynamic Allocation Strategy**
   - Variable allocation among the 105 ETFs
   - Changes over time but with low frequency

---

## Project Objectives

### Key Research Questions

1. **ETF Classification Analysis**
   - Classify ETFs based on various risk metrics, relationships, and performance criteria
   - Identify patterns and clustering among the 105 ETFs

2. **Asset Class Relationship Mapping**
   - Establish relationships between ETFs and main asset classes
   - Identify each ETF's correlation with Regional Sovereign Bonds, HY Bonds, Commodities, and Dollar Index

3. **Risk Assessment**
   - Determine the risk ranges among the ETFs
   - Analyze risk distribution and identify outliers

4. **Mystery Allocation Analysis**
   - Identify the composition of both fixed and dynamic mystery allocations
   - Quantify uncertainty in the allocation results
   - Compare performance of both allocation strategies

5. **Critical Analysis & Validation**
   - Comment on results and methodology
   - Identify potential biases in the analysis
   - Assess coverage of asset classes among the ETFs
   - Highlight any forgotten or underrepresented asset classes

---

In [64]:
# ┌─────────────────────────────────────┐
# │    INSTALLATION DES DÉPENDANCES     │ 
# └─────────────────────────────────────┘

import subprocess
import sys

# Liste des bibliothèques à installer
packages = ['numpy', 'pandas', 'matplotlib', 'scipy']

# Installation des packages
for package in packages:
    try:
        __import__(package)
        print(f"✓ {package} déjà installé")
    except ImportError:
        print(f"Installation de {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

print("Installation terminée!")

✓ numpy déjà installé
✓ pandas déjà installé
✓ matplotlib déjà installé
✓ scipy déjà installé
Installation terminée!


In [66]:
# ┌─────────────────────────────────────┐
# │    IMPORTATION DES BIBLIOTHÈQUES    │ 
# └─────────────────────────────────────┘

import os
import math
import json
import pathlib
import datetime as dt
from typing import Tuple, Dict, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.optimize import minimize, LinearConstraint, Bounds
from dataclasses import dataclass

In [67]:
# ┌─────────────────────────────────────┐
# │     CONFIGURATION GLOBALE           │ 
# └─────────────────────────────────────┘

# Configuration du répertoire de sortie
OUTPUT_DIR = "results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"Répertoire de sortie configuré: {os.path.abspath(OUTPUT_DIR)}")

Répertoire de sortie configuré: c:\Users\emman\Documents\GitHub\Python-for-Optimization-in-Finance\results


In [68]:
# ┌─────────────────────────────────────┐
# │      FONCTIONS UTILITAIRES I/O      │ 
# └─────────────────────────────────────┘

def load_csv_safely(path: str, parse_dates: bool = True, skip_rows: int = None) -> Optional[pd.DataFrame]:
    """
    Charge un CSV de façon robuste :
    - détection automatique des lignes à ignorer (métadonnées)
    - détection de la colonne 'Date' (ou première colonne si doute)
    - conversion en datetime + index
    - enlève colonnes vides / dupliquées
    
    Args:
        path: chemin vers le fichier CSV
        parse_dates: si True, convertit la première colonne en datetime
        skip_rows: nombre de lignes à ignorer (si None, détection automatique)
    """
    if not os.path.exists(path):
        print(f"[WARN] Fichier introuvable : {path}")
        return None
    
    # Détection automatique des lignes à ignorer
    if skip_rows is None:
        # Lecture des premières lignes pour détecter le header
        with open(path, 'r') as f:
            lines = [f.readline().strip() for _ in range(10)]
        
        # Recherche de la ligne contenant "Date" ou des dates
        for i, line in enumerate(lines):
            if 'date' in line.lower() or 'time' in line.lower():
                skip_rows = i
                break
        else:
            skip_rows = 0
    
    # Chargement du fichier
    df = pd.read_csv(path, skiprows=skip_rows)
    
    # Nettoyage des colonnes vides au début
    df = df.dropna(how='all', axis=1)
    df = df.dropna(how='all', axis=0)
    
    # Tentative de détection de la colonne date
    date_col_candidates = [c for c in df.columns if 'date' in c.lower() or 'time' in c.lower()]
    
    if parse_dates:
        if date_col_candidates:
            date_col = date_col_candidates[0]
        else:
            date_col = df.columns[0]  # par défaut
        
        # Conversion en datetime
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce", dayfirst=True)
        df = df.dropna(subset=[date_col]).sort_values(date_col).set_index(date_col)
        df = df[~df.index.duplicated(keep="first")]
    
    # Nettoyage final
    df = df.loc[:, ~df.columns.duplicated()]
    df = df.dropna(how="all", axis=1)
    
    return df

In [69]:
# ┌─────────────────────────────────────┐
# │       CALCUL DES RENDEMENTS         │ 
# └─────────────────────────────────────┘

def to_returns(prices: pd.DataFrame,
               kind: str = "log",
               periods: int = 1,
               dropna: object = True,
               coerce_numeric: bool = True) -> pd.DataFrame:
    """
    Calcule les rendements à partir d'une table de prix/NAV.

    Paramètres:
    - prices: DataFrame indexé (date) avec les prix/NAV
    - kind: 'log' ou 'simple'
    - periods: nombre de périodes pour le shift (1 = t / t-1)
    - dropna: True (drop rows all-NaN), False (ne rien faire), or 'any'/'all' to pass to dropna(how=...)
    - coerce_numeric: si True, convertit les colonnes en numériques (non-convertibles -> NaN puis supprimées)

    Retourne un DataFrame de rendements avec les mêmes colonnes numériques et index trié.
    """
    # validations basiques
    if kind not in ("log", "simple"):
        raise ValueError("kind must be 'log' or 'simple'")
    if not isinstance(prices, pd.DataFrame):
        raise TypeError("prices must be a pandas DataFrame")
    if not (isinstance(periods, int) and periods >= 1):
        raise ValueError("periods must be a positive integer")

    # trier l'index pour l'ordre temporel
    px = prices.sort_index()

    # conversion/coercion des colonnes en numérique si demandé
    if coerce_numeric:
        px = px.apply(pd.to_numeric, errors="coerce")
        # supprimer les colonnes devenues entièrement NaN
        px = px.loc[:, px.notna().any(axis=0)]
    else:
        non_numeric = [c for c in px.columns if not np.issubdtype(px[c].dtype, np.number)]
        if non_numeric:
            raise TypeError(f"Non-numeric columns present: {non_numeric}")

    # si pas assez d'observations pour calculer les rendements, renvoyer une structure vide cohérente
    if px.shape[0] <= periods:
        rets = px.iloc[0:0].astype(float)
        return rets

    # pour les rendements log, remplacer les prix <= 0 par NaN (log indéfini) et prévenir
    if kind == "log":
        mask_nonpos = (px <= 0)
        if mask_nonpos.any().any():
            import warnings
            warnings.warn("Zero or negative price(s) found; corresponding log returns will be NaN.")
            px = px.mask(mask_nonpos)
        rets = np.log(px / px.shift(periods))
    else:
        rets = px.pct_change(periods=periods)

    # gestion flexible du dropna
    if dropna is True:
        rets = rets.dropna(how="all")
    elif dropna in ("any", "all"):
        rets = rets.dropna(how=dropna)
    # si dropna est False ou None, on renvoie tout tel quel

    return rets

In [84]:
# ┌─────────────────────────────────────┐
# │    ANNUALISATION DES STATISTIQUES   │ 
# └─────────────────────────────────────┘

def annualize_mean_vol(rets: pd.DataFrame, periods_per_year: int = 252) -> Tuple[pd.Series, pd.DataFrame]:
    """
    Annualise la moyenne et la matrice de covariance des rendements.

    Args:
        rets: DataFrame des rendements (doit être des rendements simples, pas des log-returns)
        periods_per_year: nombre de périodes par an (252 pour les jours ouvrables)

    Returns:
        mu: Series des rendements moyens annualisés
        cov: DataFrame de la matrice de covariance annualisée
    """
    if not isinstance(rets, pd.DataFrame):
        raise TypeError("rets must be a pandas DataFrame")
    if rets.empty:
        return pd.Series(dtype=float), pd.DataFrame(dtype=float)

    # Calculs (en ignorant les colonnes entièrement NaN)
    rets_clean = rets.loc[:, rets.notna().any(axis=0)]
    mu = rets_clean.mean() * periods_per_year
    cov = rets_clean.cov() * periods_per_year

    # Avertir si des NaN subsistent
    if mu.isna().any() or cov.isna().any().any():
        import warnings
        warnings.warn("NaN values present in annualized mean or covariance; consider filtering assets.")

    return mu, cov

In [71]:
# ┌─────────────────────────────────────┐
# │       CONVERSION NUMÉRIQUE          │ 
# └─────────────────────────────────────┘

def ensure_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convertit toutes les colonnes possibles en numérique.
    
    Args:
        df: DataFrame à convertir
    
    Returns:
        DataFrame avec colonnes converties en numérique (NaN pour les valeurs non convertibles)
    """
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    return df

In [72]:
# ┌─────────────────────────────────────┐
# │      MÉTRIQUES DE PERFORMANCE       │ 
# └─────────────────────────────────────┘

@dataclass
class PortfolioReport:
    """
    Contient des métriques standard d'évaluation d'un portefeuille.
    
    Attributes:
        cagr: Taux de croissance annuel composé
        vol: Volatilité annualisée
        sharpe: Ratio de Sharpe
        maxdd: Maximum Drawdown
        var_95: Value at Risk à 95%
        es_95: Expected Shortfall à 95%
    """
    cagr: float
    vol: float
    sharpe: float
    maxdd: float
    var_95: float
    es_95: float

In [73]:
# ┌─────────────────────────────────────┐
# │      CALCUL DES STATISTIQUES        │ 
# └─────────────────────────────────────┘

def compute_performance_stats(rets: pd.Series,
                              rf: float = 0.0,
                              periods_per_year: int = 252) -> PortfolioReport:
    """
    Calcule des statistiques classiques sur une série de rendements (Series).
    
    Args:
        rets: Série de rendements
        rf: Taux sans risque (par défaut 0.0)
        periods_per_year: Nombre de périodes par an (252 pour les jours ouvrables)
    
    Returns:
        PortfolioReport avec toutes les métriques calculées
    
    Métriques calculées:
        - CAGR: Taux de croissance annuel composé
        - Volatilité annualisée
        - Sharpe: (mu - rf) / vol
        - Max Drawdown: Perte maximale depuis un pic
        - VaR et ES 95%: Value at Risk et Expected Shortfall historiques
    """
    rets = rets.dropna()
    if rets.empty:
        return PortfolioReport(np.nan, np.nan, np.nan, np.nan, np.nan, np.nan)

    # CAGR approximé via cumprod des rendements
    # Formule robuste : cumprod(1+r)^(annualisation) - 1
    cum = (1 + rets).cumprod()
    n_years = len(rets) / periods_per_year
    cagr = cum.iloc[-1] ** (1.0 / n_years) - 1.0 if n_years > 0 else np.nan

    # Volatilité annualisée
    vol = rets.std() * np.sqrt(periods_per_year)

    # Ratio de Sharpe (rendement ajusté du risque)
    mu_ann = rets.mean() * periods_per_year
    sharpe = (mu_ann - rf) / vol if vol > 0 else np.nan

    # Maximum Drawdown
    rolling_max = cum.cummax()
    drawdown = cum / rolling_max - 1.0
    maxdd = drawdown.min()

    # VaR / ES 95% (approche historique)
    q = rets.quantile(0.05)
    var_95 = -q
    es_95 = -rets[rets <= q].mean() if (rets <= q).any() else np.nan

    return PortfolioReport(cagr, vol, sharpe, maxdd, var_95, es_95)

In [74]:
# ┌─────────────────────────────────────┐
# │    OPTIMISATION MARKOWITZ (MVO)     │ 
# └─────────────────────────────────────┘

def _min_var_given_return(mu, cov, target_ret, w_bounds, allow_short=False):
    """
    Résout le portefeuille de variance minimale pour un rendement cible donné.
    
    Problème d'optimisation: min w'Σw  s.c.  μ'w = target_ret, sum w=1, bounds
    
    Args:
        mu: vecteur des rendements moyens
        cov: matrice de covariance
        target_ret: rendement cible
        w_bounds: tuple (lower_bounds, upper_bounds)
        allow_short: permet les positions courtes (non utilisé actuellement)
        
    Returns:
        Résultat d'optimisation scipy avec attributs success, x, fun, etc.
    """
    n = len(mu)
    mu = np.asarray(mu)
    cov = np.asarray(cov)
    
    def obj(w): 
        return w @ cov @ w
    
    # Contraintes: somme = 1 et rendement = target
    cons = [
        {"type": "eq", "fun": lambda w: np.sum(w) - 1.0},
        {"type": "eq", "fun": lambda w: mu @ w - target_ret},
    ]
    
    lb, ub = w_bounds
    bounds = Bounds(lb, ub)
    w0 = np.full(n, 1.0 / n)  # Poids équipondérés comme point de départ
    
    res = minimize(obj, w0, method="SLSQP", bounds=bounds, constraints=cons)
    return res

In [75]:
# ┌─────────────────────────────────────┐
# │      OPTIMISATION MAX SHARPE        │ 
# └─────────────────────────────────────┘

def _max_sharpe(mu, cov, rf, w_bounds):
    """
    Résout le portefeuille de ratio de Sharpe maximal.
    
    Problème d'optimisation: max (μ'w - rf) / sqrt(w'Σw) <=> min -Sharpe
    s.c. sum w=1, bounds
    
    Args:
        mu: vecteur des rendements moyens
        cov: matrice de covariance  
        rf: taux sans risque
        w_bounds: tuple (lower_bounds, upper_bounds)
        
    Returns:
        Résultat d'optimisation scipy avec attributs success, x, fun, etc.
    """
    n = len(mu)
    mu = np.asarray(mu)
    cov = np.asarray(cov)

    def neg_sharpe(w):
        ret = mu @ w
        vol = math.sqrt(max(w @ cov @ w, 0))  # Protection contre variance négative
        return -(ret - rf) / vol if vol > 0 else 1e9

    cons = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
    lb, ub = w_bounds
    bounds = Bounds(lb, ub)
    w0 = np.full(n, 1.0 / n)
    
    res = minimize(neg_sharpe, w0, method="SLSQP", bounds=bounds, constraints=cons)
    return res

In [76]:
# ┌─────────────────────────────────────┐
# │   PORTEFEUILLE VARIANCE MINIMALE    │ 
# └─────────────────────────────────────┘

def _global_min_var(cov, w_bounds):
    """
    Résout le portefeuille de variance globale minimale (GMV).
    
    Problème d'optimisation: min w'Σw  s.c. sum w=1, bounds
    
    Args:
        cov: matrice de covariance
        w_bounds: tuple (lower_bounds, upper_bounds)
        
    Returns:
        Résultat d'optimisation scipy avec attributs success, x, fun, etc.
    """
    n = cov.shape[0]
    
    def obj(w): 
        return w @ cov @ w
    
    cons = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
    lb, ub = w_bounds
    bounds = Bounds(lb, ub)
    w0 = np.full(n, 1.0 / n)
    
    res = minimize(obj, w0, method="SLSQP", bounds=bounds, constraints=cons)
    return res

In [77]:
# ┌─────────────────────────────────────┐
# │        FRONTIÈRE EFFICIENTE         │ 
# └─────────────────────────────────────┘

def efficient_frontier(mu, cov, w_bounds, n_pts=50) -> pd.DataFrame:
    """
    Construit la frontière efficiente en balayant des rendements cibles.
    
    La frontière efficiente est l'ensemble des portefeuilles optimaux offrant
    le rendement maximal pour chaque niveau de risque donné.
    
    Args:
        mu: vecteur des rendements moyens
        cov: matrice de covariance
        w_bounds: tuple (lower_bounds, upper_bounds)
        n_pts: nombre de points sur la frontière (défaut: 50)
        
    Returns:
        DataFrame avec colonnes: target_ret, ret, vol, weights
    """
    # Point GMV pour la borne inférieure de la frontière
    gmv_res = _global_min_var(cov, w_bounds)
    if not gmv_res.success:
        raise RuntimeError("Échec optimisation GMV.")
    
    # Bornes des rendements cibles
    mu_min = float(np.dot(mu, gmv_res.x))  # Rendement du GMV
    mu_max = float(np.max(mu))  # Rendement de l'actif le plus performant
    targets = np.linspace(mu_min, mu_max, n_pts)

    rows = []
    for t in targets:
        res = _min_var_given_return(mu, cov, t, w_bounds, allow_short=False)
        if res.success:
            w = res.x
            ret = float(mu @ w)
            vol = float(np.sqrt(max(w @ cov @ w, 0)))  # Protection contre variance négative
            rows.append({
                "target_ret": t, 
                "ret": ret, 
                "vol": vol, 
                "weights": w
            })
    
    return pd.DataFrame(rows)  

In [78]:
# ┌─────────────────────────────────────┐
# │      CONTRIBUTIONS DE RISQUE        │
# └─────────────────────────────────────┘

def risk_contributions(w, cov):
    """
    Calcule les contributions de risque de chaque actif dans un portefeuille.
    
    La contribution de risque d'un actif i est définie comme:
    RC_i = w_i * (Σw)_i = w_i * MRC_i
    
    où MRC_i est la contribution de risque marginale (Marginal Risk Contribution).
    
    Args:
        w: vecteur des poids du portefeuille
        cov: matrice de covariance des rendements
        
    Returns:
        rc: vecteur des contributions de risque
        port_var: variance totale du portefeuille
    """
    w = np.asarray(w)
    port_var = w @ cov @ w
    mrc = cov @ w  # marginal risk contribution
    rc = w * mrc
    return rc, port_var

In [79]:
# ┌─────────────────────────────────────┐
# │    EQUAL RISK CONTRIBUTION (ERC)    │ 
# └─────────────────────────────────────┘

def solve_erc(cov, w_bounds, tol=1e-8):
    """
    Résout le portefeuille Equal Risk Contribution (ERC) ou Risk Parity.
    
    Le portefeuille ERC cherche à égaliser les contributions de risque de tous les actifs.
    Problème d'optimisation: min Σᵢⱼ (RC_i - RC_j)²  s.c. Σwᵢ = 1, bounds
    
    où RC_i est la contribution de risque de l'actif i au risque total du portefeuille.
    
    Args:
        cov: matrice de covariance des rendements
        w_bounds: tuple (lower_bounds, upper_bounds) pour les contraintes de poids
        tol: tolérance de convergence (défaut: 1e-8)
        
    Returns:
        Résultat d'optimisation scipy avec attributs success, x, fun, etc.
    """
    n = cov.shape[0]
    lb, ub = w_bounds
    bounds = Bounds(lb, ub)
    cons = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
    w0 = np.full(n, 1.0 / n)  # Poids équipondérés comme point de départ

    def obj(w):
        rc, _ = risk_contributions(w, cov)
        if np.any(w < 0):  # sécurité contre les poids négatifs
            return 1e6
        avg = np.mean(rc)
        return np.sum((rc - avg) ** 2)  # Minimise la variance des contributions de risque

    res = minimize(obj, w0, method="SLSQP", bounds=bounds, constraints=cons, 
                   options={"ftol": tol, "maxiter": 500})
    return res

In [80]:
# ┌─────────────────────────────────────┐
# │   RÉPLICATION PORTEFEUILLE CIBLE    │ 
# └─────────────────────────────────────┘

def replicate_target_portfolio(etf_rets: pd.DataFrame,
                               target_rets: pd.Series,
                               long_only: bool = True,
                               sum_to_one: bool = True) -> Tuple[np.ndarray, dict]:
    """
    Résout le problème de réplication d'un portefeuille cible (Mystery Allocation).
    
    Cette fonction trouve les poids optimaux des ETFs pour répliquer au mieux
    les rendements d'un portefeuille cible en minimisant l'erreur quadratique.
    
    Problème d'optimisation: min_w ||R_ETF * w - R_target||²
    s.c. (optionnel) w ≥ 0, Σw = 1
    
    Args:
        etf_rets: DataFrame des rendements des ETFs (dates x ETFs)
        target_rets: Série des rendements du portefeuille cible à répliquer
        long_only: si True, impose des poids positifs uniquement (défaut: True)
        sum_to_one: si True, impose que la somme des poids = 1 (défaut: True)
        
    Returns:
        Tuple contenant:
        - w: vecteur des poids optimaux (ou poids équipondérés si échec)
        - info_optim: dictionnaire avec success, message, fun (erreur résiduelle)
    """
    # Alignement des données sur les mêmes dates
    XR, yR = etf_rets.align(target_rets, join="inner", axis=0)
    X = XR.values  # Matrice des rendements ETFs
    y = yR.values  # Vecteur des rendements cibles

    n = X.shape[1]  # Nombre d'ETFs
    
    def obj(w):
        """Fonction objectif: somme des carrés des résidus"""
        resid = X @ w - y
        return resid @ resid

    # Configuration des contraintes
    cons = []
    if sum_to_one:
        cons.append({"type": "eq", "fun": lambda w: np.sum(w) - 1.0})

    # Configuration des bornes
    if long_only:
        lb, ub = np.zeros(n), np.ones(n)
    else:
        lb, ub = -np.inf * np.ones(n), np.inf * np.ones(n)
    
    bounds = Bounds(lb, ub)
    w0 = np.full(n, 1.0 / n)  # Point de départ équipondéré

    # Optimisation
    res = minimize(obj, w0, method="SLSQP", bounds=bounds, constraints=cons)
    
    # Retour des résultats
    optimal_weights = res.x if res.success else w0
    optimization_info = {
        "success": res.success, 
        "message": res.message, 
        "fun": res.fun
    }
    
    return optimal_weights, optimization_info

In [81]:
# ┌─────────────────────────────────────┐
# │      CORRECTION D'ERREUR ALIGN      │ 
# └─────────────────────────────────────┘

# Force la re-définition de la fonction corrigée
def replicate_target_portfolio_fixed(etf_rets: pd.DataFrame,
                                     target_rets: pd.Series,
                                     long_only: bool = True,
                                     sum_to_one: bool = True) -> Tuple[np.ndarray, dict]:
    """
    Version corrigée de replicate_target_portfolio avec axis=0 spécifié.
    """
    # Alignement des données sur les mêmes dates avec axis=0 explicite
    common_idx = etf_rets.index.intersection(target_rets.index)
    X = etf_rets.loc[common_idx].values  # Matrice des rendements ETFs
    y = target_rets.loc[common_idx].values  # Vecteur des rendements cibles

    n = X.shape[1]  # Nombre d'ETFs
    
    def obj(w):
        """Fonction objectif: somme des carrés des résidus"""
        resid = X @ w - y
        return resid @ resid

    # Configuration des contraintes
    cons = []
    if sum_to_one:
        cons.append({"type": "eq", "fun": lambda w: np.sum(w) - 1.0})

    # Configuration des bornes
    if long_only:
        lb, ub = np.zeros(n), np.ones(n)
    else:
        lb, ub = -np.inf * np.ones(n), np.inf * np.ones(n)
    
    bounds = Bounds(lb, ub)
    w0 = np.full(n, 1.0 / n)  # Point de départ équipondéré

    # Optimisation
    res = minimize(obj, w0, method="SLSQP", bounds=bounds, constraints=cons)
    
    # Retour des résultats
    optimal_weights = res.x if res.success else w0
    optimization_info = {
        "success": res.success, 
        "message": res.message, 
        "fun": res.fun
    }
    
    return optimal_weights, optimization_info

# Remplace la fonction originale
replicate_target_portfolio = replicate_target_portfolio_fixed
print("✅ Fonction replicate_target_portfolio corrigée et mise à jour!")

✅ Fonction replicate_target_portfolio corrigée et mise à jour!


In [82]:
# ┌─────────────────────────────────────┐
# │        PIPELINE PRINCIPAL           │
# └─────────────────────────────────────┘

def main():
    """
    Pipeline principal d'analyse et d'optimisation de portefeuille.
    
    Ce script orchestre l'ensemble du processus d'analyse financière :
    1. Chargement et préparation des données (ETFs, classes d'actifs, allocations mystères)
    2. Calcul des rendements et statistiques descriptives
    3. Optimisation de portefeuilles (Markowitz, Risk Parity)
    4. Construction de la frontière efficiente
    5. Mapping vers les classes d'actifs
    6. Réplication des allocations mystères
    7. Export des résultats et visualisations
    
    Données requises:
        - Anonymized ETFs.csv: Prix historiques des 105 ETFs
        - Main Asset Classes.csv: Mapping ETFs vers classes d'actifs
        - Mystery Allocation 1.csv: Allocation mystère fixe
        - Mystery Allocation 2.csv: Allocation mystère dynamique
    
    Sorties générées:
        - CSV: statistiques, poids, performances, frontière efficiente
        - PNG: graphiques de la frontière efficiente et réplications
        - Excel: agrégation par classe d'actifs
    """
    # ---------- 6.1) Chargement des données ----------
    print("Chargement des données...")
    etf_path = "Anonymized ETFs.csv"
    classes_path = "Main Asset Classes.csv"
    mystery1_path = "Mystery Allocation 1.csv"
    mystery2_path = "Mystery Allocation 2.csv"

    etf_prices = load_csv_safely(etf_path)
    classes_map = load_csv_safely(classes_path, parse_dates=False)
    myst1_prices = load_csv_safely(mystery1_path)
    myst2_prices = load_csv_safely(mystery2_path)

    if etf_prices is None or etf_prices.empty:
        print("[ERREUR] Anonymized ETFs introuvable ou vide. Abandon.")
        sys.exit(1)

    # Harmonisation numérique
    etf_prices = ensure_numeric(etf_prices)
    if myst1_prices is not None:
        myst1_prices = ensure_numeric(myst1_prices)
    if myst2_prices is not None:
        myst2_prices = ensure_numeric(myst2_prices)

    # ---------- 6.2) Calcul des rendements ----------
    print("Calcul des rendements...")
    etf_returns = to_returns(etf_prices, kind="simple").dropna(how="all")

    # Rendements des allocations mystères (si disponibles)
    myst1_returns = to_returns(myst1_prices, kind="simple").iloc[:, 0] if myst1_prices is not None and myst1_prices.shape[1] >= 1 else None
    myst2_returns = to_returns(myst2_prices, kind="simple").iloc[:, 0] if myst2_prices is not None and myst2_prices.shape[1] >= 1 else None

    # ---------- 6.3) Statistiques descriptives ----------
    print("Calcul des statistiques descriptives...")
    periods = 252  # Fréquence quotidienne (jours ouvrables)
    mu_ann, cov_ann = annualize_mean_vol(etf_returns, periods_per_year=periods)

    # Export des statistiques de base par ETF
    desc = pd.DataFrame({
        "mu_ann": mu_ann,
        "vol_ann": np.sqrt(np.diag(cov_ann)),
    })
    desc["sharpe_ann_rf0"] = desc["mu_ann"] / desc["vol_ann"]
    desc.to_csv(os.path.join(OUTPUT_DIR, "stats_assets.csv"))

    # ---------- 6.4) Optimisation Markowitz (MVO) ----------
    print("Optimisation Markowitz...")
    tickers = list(mu_ann.index)
    n = len(tickers)
    # Contraintes : long-only, somme=1, poids max 30% par actif
    max_w = 0.30 if n >= 4 else 1.0
    w_bounds = (np.zeros(n), np.full(n, max_w))

    # Portefeuille de variance minimale globale (GMV)
    gmv_res = _global_min_var(cov_ann.values, w_bounds)
    gmv_w = gmv_res.x if gmv_res.success else np.full(n, 1.0 / n)

    # Portefeuille de Sharpe maximal (rf=0)
    ms_res = _max_sharpe(mu_ann.values, cov_ann.values, rf=0.0, w_bounds=w_bounds)
    ms_w = ms_res.x if ms_res.success else np.full(n, 1.0 / n)

    weights_gmv = pd.Series(gmv_w, index=tickers, name="GMV")
    weights_ms = pd.Series(ms_w, index=tickers, name="MaxSharpe")

    weights = pd.concat([weights_gmv, weights_ms], axis=1)
    weights.to_csv(os.path.join(OUTPUT_DIR, "weights_mvo.csv"))

    # Calcul des performances in-sample pour GMV & MaxSharpe
    portfolios = {
        "GMV": (etf_returns @ weights_gmv),
        "MaxSharpe": (etf_returns @ weights_ms),
    }

    perf_rows = []
    for name, rets in portfolios.items():
        rep = compute_performance_stats(rets, rf=0.0, periods_per_year=periods)
        perf_rows.append({
            "portfolio": name, "CAGR": rep.cagr, "Vol": rep.vol,
            "Sharpe": rep.sharpe, "MaxDD": rep.maxdd,
            "VaR95": rep.var_95, "ES95": rep.es_95
        })
    pd.DataFrame(perf_rows).to_csv(os.path.join(OUTPUT_DIR, "stats_portfolios.csv"), index=False)

    # ---------- 6.5) Construction de la frontière efficiente ----------
    print("Construction de la frontière efficiente...")
    ef = efficient_frontier(mu_ann.values, cov_ann.values, w_bounds, n_pts=40)
    ef[["ret", "vol"]].to_csv(os.path.join(OUTPUT_DIR, "efficient_frontier.csv"), index=False)
    
    # Visualisation de la frontière efficiente
    fig = plt.figure(figsize=(6.2, 4.2))
    plt.plot(ef["vol"], ef["ret"], marker="o", linestyle="-", linewidth=1)
    plt.scatter(np.sqrt(weights_gmv.values @ cov_ann.values @ weights_gmv.values),
                float(mu_ann.values @ weights_gmv.values),
                label="GMV", s=40)
    plt.scatter(np.sqrt(weights_ms.values @ cov_ann.values @ weights_ms.values),
                float(mu_ann.values @ weights_ms.values),
                label="Max Sharpe", s=40)
    plt.xlabel("Volatilité annualisée")
    plt.ylabel("Rendement annualisé")
    plt.title("Frontière Efficiente")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, "efficient_frontier.png"))
    plt.close(fig)

    # ---------- 6.6) Optimisation Risk Parity (ERC) ----------
    print("Optimisation Risk Parity...")
    erc_res = solve_erc(cov_ann.values, w_bounds)
    erc_w = erc_res.x if erc_res.success else np.full(n, 1.0 / n)
    weights_erc = pd.Series(erc_w, index=tickers, name="ERC")
    weights_erc.to_csv(os.path.join(OUTPUT_DIR, "weights_erc.csv"))
    portfolios["ERC"] = etf_returns @ weights_erc
    rep_erc = compute_performance_stats(portfolios["ERC"], rf=0.0, periods_per_year=periods)

    # Ajout d'ERC aux statistiques
    df_stats = pd.read_csv(os.path.join(OUTPUT_DIR, "stats_portfolios.csv"))
    df_stats = pd.concat([df_stats, pd.DataFrame([{
        "portfolio": "ERC", "CAGR": rep_erc.cagr, "Vol": rep_erc.vol,
        "Sharpe": rep_erc.sharpe, "MaxDD": rep_erc.maxdd,
        "VaR95": rep_erc.var_95, "ES95": rep_erc.es_95
    }])], ignore_index=True)
    df_stats.to_csv(os.path.join(OUTPUT_DIR, "stats_portfolios.csv"), index=False)

    # ---------- 6.7) Mapping vers classes d'actifs ----------
    print("Mapping vers classes d'actifs...")
    if classes_map is not None and not classes_map.empty:
        cm = classes_map.copy()
        cm.columns = [c.strip() for c in cm.columns]
        # Détection automatique des colonnes pertinentes
        tick_col = next((c for c in cm.columns if "tick" in c.lower()), None)
        class_col = next((c for c in cm.columns if "class" in c.lower()), None)
        if tick_col and class_col:
            asset_map = cm.set_index(tick_col)[class_col].to_dict()
            # Agrégation des poids par classe d'actifs
            def agg_by_class(w: pd.Series) -> pd.Series:
                tmp = pd.DataFrame({"Ticker": w.index, "Weight": w.values})
                tmp["AssetClass"] = tmp["Ticker"].map(asset_map).fillna("Unknown")
                return tmp.groupby("AssetClass")["Weight"].sum().sort_values(ascending=False)
            
            agg = {
                "GMV": agg_by_class(weights_gmv),
                "MaxSharpe": agg_by_class(weights_ms),
                "ERC": agg_by_class(weights_erc)
            }
            # Export vers Excel
            with pd.ExcelWriter(os.path.join(OUTPUT_DIR, "weights_by_asset_class.xlsx")) as xw:
                for name, s in agg.items():
                    s.to_frame("Weight").to_excel(xw, sheet_name=name)
        else:
            print("[INFO] Colonnes de mapping non détectées (Ticker/AssetClass).")

    # ---------- 6.8) Réplication des allocations mystères ----------
    print("Réplication des allocations mystères...")
    def replicate_and_report(target_prices: Optional[pd.DataFrame], tag: str):
        if target_prices is None or target_prices.empty:
            print(f"[INFO] {tag}: pas de données cibles.")
            return
        target_rets = to_returns(target_prices, kind="simple").iloc[:, 0]
        # Alignement des données sur les mêmes dates
        er = etf_returns.copy()
        common_idx = er.index.intersection(target_rets.index)
        er = er.loc[common_idx]
        tr = target_rets.loc[common_idx]

        w_rep, info = replicate_target_portfolio(er, tr, long_only=True, sum_to_one=True)
        w_ser = pd.Series(w_rep, index=er.columns, name=f"Replicated_{tag}")
        w_ser.to_csv(os.path.join(OUTPUT_DIR, f"weights_replication_{tag}.csv"))

        # Métriques de qualité d'ajustement
        fitted = (er @ w_ser).rename(f"Rep_{tag}")
        te = (fitted - tr).std() * np.sqrt(periods)  # Tracking Error annualisé
        r2 = 1 - ((fitted - tr).var() / tr.var()) if tr.var() > 0 else np.nan

        # Statistiques de performance du portefeuille répliqué
        rep_stats = compute_performance_stats(fitted, rf=0.0, periods_per_year=periods)
        out = {
            "Target": tag,
            "Rep_success": info.get("success", False),
            "Rep_message": info.get("message", ""),
            "TrackingError_ann": te,
            "R2": r2,
            "CAGR": rep_stats.cagr,
            "Vol": rep_stats.vol,
            "Sharpe": rep_stats.sharpe,
            "MaxDD": rep_stats.maxdd,
        }
        dfout = pd.DataFrame([out])
        csv_path = os.path.join(OUTPUT_DIR, f"replication_report_{tag}.csv")
        if os.path.exists(csv_path):
            prev = pd.read_csv(csv_path)
            dfout = pd.concat([prev, dfout], ignore_index=True)
        dfout.to_csv(csv_path, index=False)

        # Visualisation des performances cumulées
        fig = plt.figure(figsize=(6.6, 4))
        (1 + tr).cumprod().plot(label=f"Target {tag}")
        (1 + fitted).cumprod().plot(label=f"Replicated {tag}")
        plt.title(f"Réplication — {tag}")
        plt.legend()
        plt.tight_layout()
        plt.savefig(os.path.join(OUTPUT_DIR, f"replication_{tag}.png"))
        plt.close(fig)

    replicate_and_report(myst1_prices, "Mystery1")
    replicate_and_report(myst2_prices, "Mystery2")

    print(f"\nAnalyse terminée! Résultats exportés dans: {os.path.abspath(OUTPUT_DIR)}")


# Point d'entrée principal
if __name__ == "__main__":
    main()

Chargement des données...
Calcul des rendements...
Calcul des statistiques descriptives...
Optimisation Markowitz...
Calcul des rendements...
Calcul des statistiques descriptives...
Optimisation Markowitz...
Construction de la frontière efficiente...
Construction de la frontière efficiente...
Optimisation Risk Parity...
Mapping vers classes d'actifs...
[INFO] Colonnes de mapping non détectées (Ticker/AssetClass).
Réplication des allocations mystères...
Optimisation Risk Parity...
Mapping vers classes d'actifs...
[INFO] Colonnes de mapping non détectées (Ticker/AssetClass).
Réplication des allocations mystères...

Analyse terminée! Résultats exportés dans: c:\Users\emman\Documents\GitHub\Python-for-Optimization-in-Finance\results

Analyse terminée! Résultats exportés dans: c:\Users\emman\Documents\GitHub\Python-for-Optimization-in-Finance\results
