# Analytics

## periodic perf

In [7]:
import pandas as pd
import yfinance as yf

PERIODS = ["1D", "1W", "1M", "YTD", "1Y", "3Y", "5Y"]

def _to_ts(d):
    if d is None:
        return None
    return pd.Timestamp(d).normalize()

def _nearest_prev_close(close: pd.Series, target_date: pd.Timestamp):
    s = close.loc[:target_date]
    if s.empty:
        return None, None
    return float(s.iloc[-1]), s.index[-1]

def yahoo_perf_asof(ticker: str, asof=None, auto_adjust=True) -> pd.DataFrame:
    """
    asof: str|date|datetime|Timestamp|None
      - None => prend le dernier close dispo
      - sinon => prend le dernier close <= asof
    """
    t = yf.Ticker(ticker)

    # On prend large pour couvrir 5Y m√™me si asof est dans le pass√©
    # (si asof est tr√®s vieux, augmente period ou utilise start/end)
    hist = t.history(period="10y", interval="1d", auto_adjust=auto_adjust)
    if hist.empty:
        raise ValueError(f"No data for ticker '{ticker}'")

    close = hist["Close"].dropna()
    close.index = pd.to_datetime(close.index).tz_localize(None)

    asof_ts = _to_ts(asof)

    # Last price √† asOf (dernier close <= asof)
    if asof_ts is None:
        last_price = float(close.iloc[-1])
        last_date = close.index[-1]
    else:
        last_price, last_date = _nearest_prev_close(close, asof_ts)
        if last_price is None:
            raise ValueError(f"No data on or before asof={asof_ts.date()} for '{ticker}'")

    # Dates cibles relatives √† last_date (qui est le trading day retenu)
    targets = {
        "1D": last_date - pd.Timedelta(days=1),
        "1W": last_date - pd.Timedelta(days=7),
        "1M": last_date - pd.Timedelta(days=30),
        "YTD": pd.Timestamp(year=last_date.year, month=1, day=1),
        "1Y": last_date - pd.Timedelta(days=365),
        "3Y": last_date - pd.Timedelta(days=365 * 3),
        "5Y": last_date - pd.Timedelta(days=365 * 5),
    }

    out = {
        "Ticker": ticker,
        "AsOfRequested": None if asof_ts is None else asof_ts.date(),
        "AsOfUsed": last_date.date(),        # trading day effectivement utilis√©
        "Last": last_price,
    }

    for k, d in targets.items():
        past_price, past_date = _nearest_prev_close(close, d)
        if past_price is None or past_price == 0:
            out[k] = None
        else:
            out[k] = (last_price / past_price - 1) * 100.0

    return pd.DataFrame([out])

def yahoo_perf_table_asof(tickers, asof=None, auto_adjust=True) -> pd.DataFrame:
    dfs = [yahoo_perf_asof(t, asof=asof, auto_adjust=auto_adjust) for t in tickers]
    df = pd.concat(dfs, ignore_index=True)

    pct_cols = ["1D", "1W", "1M", "YTD", "1Y", "3Y", "5Y"]
    df["Last"] = df["Last"].round(6)
    df[pct_cols] = df[pct_cols].round(2)
    return df


In [10]:
df = yahoo_perf_table_asof(
    ["AAPL", "SPY", "AIR.PA", "IWDA.AS"],
    asof="2025-11-01"  # <= tu changes √ßa quand tu veux
)
pct_cols = ["1D", "1W", "1M", "YTD", "1Y", "3Y", "5Y"]
df.style.format({**{"Last":"{:.2f}"}, **{c:"{:+.2f}%" for c in pct_cols}})

Unnamed: 0,Ticker,AsOfRequested,AsOfUsed,Last,1D,1W,1M,YTD,1Y,3Y,5Y
0,AAPL,2025-11-01,2025-10-31,270.11,-0.38%,+2.87%,+5.84%,+8.35%,+20.24%,+82.30%,+155.33%
1,SPY,2025-11-01,2025-10-31,680.05,+0.33%,+0.71%,+2.04%,+17.40%,+21.40%,+84.83%,+124.05%
2,AIR.PA,2025-11-01,2025-10-31,213.4,+0.33%,+2.37%,+7.19%,+40.97%,+55.68%,+100.10%,+264.89%
3,IWDA.AS,2025-11-01,2025-10-31,111.79,-0.12%,+1.09%,+3.49%,+7.32%,+14.59%,+54.26%,+108.89%


## Returns

In [32]:
import pandas as pd

def arithmetic_return_from_series(
    prices: pd.Series,
    start_date: str,
    end_date: str
) -> dict:
    """
    Calcule un rendement arithm√©tique √† partir d'une s√©rie de prix (index√©e par date).

    prices : pd.Series
        S√©rie de prix (id√©alement cl√¥ture ajust√©e), index = datetime
    start_date / end_date : YYYY-MM-DD
    """

    if prices.empty:
        raise ValueError("Price series is empty.")

    prices = prices.dropna().copy()
    prices.index = pd.to_datetime(prices.index).tz_localize(None)

    start_ts = pd.Timestamp(start_date)
    end_ts = pd.Timestamp(end_date)

    if start_ts > end_ts:
        raise ValueError("start_date must be <= end_date")

    start_slice = prices.loc[:start_ts]
    end_slice = prices.loc[:end_ts]

    if start_slice.empty:
        raise ValueError(f"No price data on or before {start_ts.date()}")
    if end_slice.empty:
        raise ValueError(f"No price data on or before {end_ts.date()}")

    start_price = float(start_slice.iloc[-1])
    end_price = float(end_slice.iloc[-1])

    return {
        "StartDateRequested": start_ts.date(),
        "EndDateRequested": end_ts.date(),
        "StartDateUsed": start_slice.index[-1].date(),
        "EndDateUsed": end_slice.index[-1].date(),
        "StartPrice": start_price,
        "EndPrice": end_price,
        "ArithmeticReturn": end_price / start_price - 1,
    }


In [33]:
import pandas as pd

def _price_on_or_near(
    prices: pd.Series,
    target: pd.Timestamp,
    direction: str = "backward"  # backward | forward
):
    """
    Retourne (price: float, date_used: Timestamp)
    Compatible pandas >= 2.x (no FutureWarning)
    """

    if direction == "backward":
        s = prices.loc[:target]
        if not s.empty:
            price = s.iloc[-1].item()   # üëà SCALAIRE GARANTI
            return float(price), s.index[-1]

    elif direction == "forward":
        s = prices.loc[target:]
        if not s.empty:
            price = s.iloc[0].item()    # üëà SCALAIRE GARANTI
            return float(price), s.index[0]

    else:
        raise ValueError("direction must be 'backward' or 'forward'")

    return None, None




def arithmetic_return_from_series(
    prices: pd.Series,
    start_date: str,
    end_date: str,
    fallback_start: str = "forward",   # üëà cl√© ici
    fallback_end: str = "backward"
) -> dict:
    prices = prices.dropna().copy()
    prices.index = pd.to_datetime(prices.index).tz_localize(None)

    start_ts = pd.Timestamp(start_date)
    end_ts = pd.Timestamp(end_date)

    if start_ts > end_ts:
        raise ValueError("start_date must be <= end_date")

    # --- start price ---
    start_price, start_used = _price_on_or_near(
        prices, start_ts, direction="backward"
    )
    if start_price is None and fallback_start == "forward":
        start_price, start_used = _price_on_or_near(
            prices, start_ts, direction="forward"
        )

    # --- end price ---
    end_price, end_used = _price_on_or_near(
        prices, end_ts, direction="backward"
    )
    if end_price is None and fallback_end == "forward":
        end_price, end_used = _price_on_or_near(
            prices, end_ts, direction="forward"
        )

    if start_price is None:
        raise ValueError(f"No price data around start_date={start_date}")
    if end_price is None:
        raise ValueError(f"No price data around end_date={end_date}")

    return {
        "StartDateRequested": start_ts.date(),
        "EndDateRequested": end_ts.date(),
        "StartDateUsed": start_used.date(),
        "EndDateUsed": end_used.date(),
        "StartPrice": start_price,
        "EndPrice": end_price,
        "ArithmeticReturn": end_price / start_price - 1,
    }


In [34]:
import yfinance as yf

def yahoo_adjusted_return(
    ticker: str,
    start_date: str,
    end_date: str
) -> dict:
    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False
    )

    if df.empty:
        raise ValueError(f"No data returned for ticker {ticker}")

    prices = df["Close"]

    res = arithmetic_return_from_series(
        prices=prices,
        start_date=start_date,
        end_date=end_date,
        fallback_start="forward",   # üëà crucial
        fallback_end="backward"
    )

    res["Ticker"] = ticker
    res["PriceType"] = "Adjusted Close"

    return res


In [43]:
yahoo_adjusted_return(
    ticker="PUST.PA",
    start_date="2025-01-01",
    end_date="2025-12-25"
)


{'StartDateRequested': datetime.date(2025, 1, 1),
 'EndDateRequested': datetime.date(2025, 12, 25),
 'StartDateUsed': datetime.date(2025, 1, 2),
 'EndDateUsed': datetime.date(2025, 12, 24),
 'StartPrice': 82.56999969482422,
 'EndPrice': 87.1500015258789,
 'ArithmeticReturn': 0.05546811006397245,
 'Ticker': 'PUST.PA',
 'PriceType': 'Adjusted Close'}

## Cumulative returns

In [44]:
import pandas as pd

def cumulative_return_from_prices(
    prices: pd.Series,
    start_date: str | None = None,
    end_date: str | None = None
) -> pd.Series:
    """
    Calcule le cumulative return √† partir d'une s√©rie de prix ajust√©s.

    prices : pd.Series (index datetime, valeurs = adjusted close)
    start_date / end_date : optionnels (YYYY-MM-DD)

    Retourne une Series de cumulative returns (0 au point de d√©part).
    """

    prices = prices.dropna().copy()
    prices.index = pd.to_datetime(prices.index).tz_localize(None)

    if start_date:
        prices = prices.loc[pd.Timestamp(start_date):]
    if end_date:
        prices = prices.loc[:pd.Timestamp(end_date)]

    if len(prices) < 2:
        raise ValueError("Not enough data to compute cumulative return")

    base_price = prices.iloc[0].item()

    cumret = prices / base_price - 1
    cumret.name = "CumulativeReturn"

    return cumret


In [45]:
import yfinance as yf

def yahoo_cumulative_return(
    ticker: str,
    start_date: str,
    end_date: str
) -> pd.Series:
    """
    Cumulative return bas√© sur la cl√¥ture ajust√©e Yahoo Finance.
    """

    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,   # <<< cl√©
        progress=False
    )

    if df.empty:
        raise ValueError(f"No data returned for {ticker}")

    prices = df["Close"]

    return cumulative_return_from_prices(
        prices=prices,
        start_date=start_date,
        end_date=end_date
    )


In [57]:
cumret = yahoo_cumulative_return(
    ticker="PUST.PA",
    start_date="2023-11-01",
    end_date="2025-12-26"
)

cumret.tail()


Ticker,PUST.PA
Date,Unnamed: 1_level_1
2025-12-18,0.560821
2025-12-19,0.573529
2025-12-22,0.576416
2025-12-23,0.578431
2025-12-24,0.582244


## Rolling Returns

In [58]:
import pandas as pd

ROLLING_WINDOWS = {
    "1M": 21,
    "3M": 63,
    "12M": 252,
}

def rolling_returns_from_prices(
    prices: pd.Series,
    windows: dict[str, int] = ROLLING_WINDOWS
) -> pd.DataFrame:
    """
    prices : pd.Series (Adjusted Close, index datetime)
    retourne un DataFrame avec une colonne par fen√™tre
    """

    prices = prices.dropna().copy()
    prices.index = pd.to_datetime(prices.index).tz_localize(None)

    out = pd.DataFrame(index=prices.index)

    for label, n in windows.items():
        out[f"RollingReturn_{label}"] = prices / prices.shift(n) - 1

    return out


In [59]:
import yfinance as yf

def yahoo_rolling_returns(
    ticker: str,
    start_date: str,
    end_date: str
) -> pd.DataFrame:
    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False
    )

    if df.empty:
        raise ValueError(f"No data returned for {ticker}")

    prices = df["Close"]

    rr = rolling_returns_from_prices(prices)
    rr["Ticker"] = ticker

    return rr


In [60]:
def rolling_return_distribution(
    rolling_returns: pd.DataFrame,
    quantiles=(0.05, 0.25, 0.5, 0.75, 0.95)
) -> pd.DataFrame:
    """
    rolling_returns : DataFrame issu de rolling_returns_from_prices
    """

    stats = {}

    for col in rolling_returns.columns:
        if not col.startswith("RollingReturn_"):
            continue

        s = rolling_returns[col].dropna()

        q = s.quantile(quantiles)

        stats[col] = {
            "Median": q.loc[0.5],
            "Q25": q.loc[0.25],
            "Q75": q.loc[0.75],
            "IQR": q.loc[0.75] - q.loc[0.25],
            "P05": q.loc[0.05],
            "P95": q.loc[0.95],
            "Min": s.min(),
            "Max": s.max(),
            "Obs": len(s),
        }

    return pd.DataFrame(stats).T


In [63]:
rr = yahoo_rolling_returns(
    ticker="AAPL",
    start_date="2018-01-01",
    end_date="2024-01-01"
)

dist = rolling_return_distribution(rr)

rr, dist


(            RollingReturn_1M  RollingReturn_3M  RollingReturn_12M Ticker
 Date                                                                    
 2018-01-02               NaN               NaN                NaN   AAPL
 2018-01-03               NaN               NaN                NaN   AAPL
 2018-01-04               NaN               NaN                NaN   AAPL
 2018-01-05               NaN               NaN                NaN   AAPL
 2018-01-08               NaN               NaN                NaN   AAPL
 ...                      ...               ...                ...    ...
 2023-12-22          0.011970          0.100949           0.437299   AAPL
 2023-12-26          0.016213          0.124124           0.468116   AAPL
 2023-12-27          0.017704          0.134803           0.472999   AAPL
 2023-12-28          0.016702          0.135597           0.497054   AAPL
 2023-12-29          0.016687          0.126007           0.536069   AAPL
 
 [1509 rows x 4 columns],
          

## Drawdown Analysis

In [68]:
# =========================
# Drawdown analysis (robuste)
# =========================

import pandas as pd


def _as_price_series(prices) -> pd.Series:
    """
    Accepte pd.Series ou pd.DataFrame (yfinance, MultiIndex inclus)
    et retourne UNE Series de prix.
    """
    if isinstance(prices, pd.Series):
        s = prices
    elif isinstance(prices, pd.DataFrame):
        if prices.shape[1] == 1:
            s = prices.iloc[:, 0]
        else:
            # fallback: premi√®re colonne
            s = prices.iloc[:, 0]
    else:
        raise TypeError("prices must be a pandas Series or DataFrame")

    s = s.dropna().copy()
    s.index = pd.to_datetime(s.index).tz_localize(None)
    return s


def drawdown_series(prices) -> pd.DataFrame:
    """
    Retourne un DataFrame avec :
      - Price
      - RunningMax
      - Drawdown
    """
    p = _as_price_series(prices)

    running_max = p.cummax()
    drawdown = p / running_max - 1.0

    return pd.DataFrame({
        "Price": p,
        "RunningMax": running_max,
        "Drawdown": drawdown
    })


def drawdown_metrics(prices) -> dict:
    """
    Calcule :
      - Max drawdown
      - Current drawdown
      - Nombre d'√©pisodes
      - Dur√©e moyenne / max des drawdowns (jours de trading)
    """
    dd_df = drawdown_series(prices)
    dd = dd_df["Drawdown"]

    max_dd = float(dd.min())
    current_dd = float(dd.iloc[-1].item())

    # Identification des √©pisodes de drawdown
    in_dd = dd < 0
    episode_id = (in_dd != in_dd.shift(1, fill_value=False)).cumsum()

    durations = []
    troughs = []

    for _, block in dd_df[in_dd].groupby(episode_id[in_dd]):
        durations.append(len(block))                    # jours de trading
        troughs.append(float(block["Drawdown"].min()))  # trough de l'√©pisode

    return {
        "MaxDrawdown": max_dd,
        "CurrentDrawdown": current_dd,
        "NumDrawdownEpisodes": int(len(durations)),
        "AvgDrawdownLength_trading_days": float(pd.Series(durations).mean()) if durations else 0.0,
        "MaxDrawdownLength_trading_days": int(max(durations)) if durations else 0,
        "WorstEpisodeTrough": float(min(troughs)) if troughs else 0.0,
    }


def max_drawdown_path(prices) -> dict:
    """
    Dates du pire drawdown :
      - Peak
      - Trough
      - Recovery (si atteint)
    """
    dd_df = drawdown_series(prices)

    p = dd_df["Price"]
    dd = dd_df["Drawdown"]

    trough_date = dd.idxmin()
    trough_dd = float(dd.loc[trough_date])

    peak_date = p.loc[:trough_date].idxmax()
    peak_price = float(p.loc[peak_date].item())

    # Recovery = retour au niveau du peak
    after = p.loc[trough_date:]
    rec = after[after >= peak_price]
    recovery_date = rec.index[0] if not rec.empty else None

    return {
        "PeakDate": peak_date.date(),
        "TroughDate": trough_date.date(),
        "RecoveryDate": None if recovery_date is None else recovery_date.date(),
        "MaxDrawdown": trough_dd,
    }


In [142]:
import yfinance as yf

df = yf.download("AAPL", start="2025-01-01", end="2025-12-25", auto_adjust=True, progress=False)
prices = df["Close"]

dd_df = drawdown_series(prices)
metrics = drawdown_metrics(prices)
path = max_drawdown_path(prices)

metrics, path, dd_df


({'MaxDrawdown': -0.30222577832657394,
  'CurrentDrawdown': -0.043257992163255765,
  'NumDrawdownEpisodes': 12,
  'AvgDrawdownLength_trading_days': 18.5,
  'MaxDrawdownLength_trading_days': 144,
  'WorstEpisodeTrough': -0.30222577832657394},
 {'PeakDate': datetime.date(2025, 2, 24),
  'TroughDate': datetime.date(2025, 4, 8),
  'RecoveryDate': datetime.date(2025, 9, 22),
  'MaxDrawdown': -0.30222577832657394},
                  Price  RunningMax  Drawdown
 Date                                        
 2025-01-02  242.752090  242.752090  0.000000
 2025-01-03  242.264297  242.752090 -0.002009
 2025-01-06  243.896927  243.896927  0.000000
 2025-01-07  241.119476  243.896927 -0.011388
 2025-01-08  241.607269  243.896927 -0.009388
 ...                ...         ...       ...
 2025-12-18  272.190002  286.190002 -0.048919
 2025-12-19  273.670013  286.190002 -0.043747
 2025-12-22  270.970001  286.190002 -0.053181
 2025-12-23  272.359985  286.190002 -0.048325
 2025-12-24  273.809998  286.190002

## Realized Volatility

In [96]:
import pandas as pd
import numpy as np

def realized_volatility(
    returns: pd.Series,
    freq: str = "daily",     # daily | weekly
    annualize: bool = True
) -> dict:
    """
    Calcule la volatilit√© r√©alis√©e (daily / weekly) et annualis√©e.

    returns : pd.Series de returns simples
    """

    r = returns.dropna().copy()
    if len(r) < 10:
        raise ValueError("Not enough observations to compute volatility")

    vol = r.std(ddof=1).item()

    if freq == "daily":
        ann_factor = np.sqrt(252)
    elif freq == "weekly":
        ann_factor = np.sqrt(52)
    else:
        raise ValueError("freq must be 'daily' or 'weekly'")

    vol_ann = vol * ann_factor if annualize else None

    return {
        "Observations": int(len(r)),
        "Volatility": float(vol),
        "AnnualizedVolatility": float(vol_ann) if annualize else None,
        "Frequency": freq,
    }


In [97]:
import yfinance as yf

def yahoo_realized_volatility(
    ticker: str,
    start_date: str,
    end_date: str,
    freq: str = "daily"
) -> dict:
    """
    Realized volatility depuis Yahoo Finance (Adjusted Close).
    """

    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False
    )

    if df.empty:
        raise ValueError(f"No data returned for {ticker}")

    prices = df["Close"].dropna()

    if freq == "daily":
        returns = prices.pct_change()

    elif freq == "weekly":
        weekly_prices = prices.resample("W-FRI").last()
        returns = weekly_prices.pct_change()

    else:
        raise ValueError("freq must be 'daily' or 'weekly'")

    out = realized_volatility(
        returns=returns,
        freq=freq,
        annualize=True
    )

    out["Ticker"] = ticker
    out["PriceType"] = "Adjusted Close"

    return out


In [103]:
yahoo_realized_volatility(
    ticker="CAC.PA",
    start_date="2025-01-01",
    end_date="2025-12-25",
    freq="daily"
)


{'Observations': 251,
 'Volatility': 0.009858624844537691,
 'AnnualizedVolatility': 0.15650081764637727,
 'Frequency': 'daily',
 'Ticker': 'CAC.PA',
 'PriceType': 'Adjusted Close'}

## Expected shortfall (more complete integrate already VaR)

In [145]:
import numpy as np
import pandas as pd
import yfinance as yf
from scipy.stats import norm

def _as_price_series(x) -> pd.Series:
    """
    Force une sortie yfinance (Series/DataFrame, colonnes simples ou MultiIndex)
    en Series 1D de prix.
    """
    if isinstance(x, pd.Series):
        s = x
    elif isinstance(x, pd.DataFrame):
        # 1 colonne -> OK, sinon on prend la 1√®re
        s = x.iloc[:, 0]
    else:
        raise TypeError("Expected pandas Series or DataFrame")

    s = s.dropna().copy()
    s.index = pd.to_datetime(s.index).tz_localize(None)
    return s


def _as_return_series(returns) -> pd.Series:
    """
    Assure que returns est une Series 1D.
    """
    if isinstance(returns, pd.Series):
        s = returns
    elif isinstance(returns, pd.DataFrame):
        s = returns.iloc[:, 0]
    else:
        raise TypeError("Expected pandas Series or DataFrame")
    return s.dropna().copy()


# ---------- VaR ----------

def var_historical(returns: pd.Series, alpha: float) -> float:
    r = _as_return_series(returns).to_numpy()
    q = np.quantile(r, 1 - alpha)
    return float(-q)  # perte positive


def var_gaussian(returns: pd.Series, alpha: float) -> float:
    r = _as_return_series(returns)
    mu = r.mean().item()
    sigma = r.std(ddof=1).item()
    z = norm.ppf(1 - alpha)
    return float(-(mu + z * sigma))


def var_cornish_fisher(returns: pd.Series, alpha: float) -> float:
    r = _as_return_series(returns)
    mu = r.mean().item()
    sigma = r.std(ddof=1).item()
    S = r.skew().item()
    K = r.kurt().item()  # excess kurtosis

    z = norm.ppf(1 - alpha)
    z_cf = (
        z
        + (1/6)  * (z**2 - 1) * S
        + (1/24) * (z**3 - 3*z) * K
        - (1/36) * (2*z**3 - 5*z) * (S**2)
    )
    return float(-(mu + z_cf * sigma))


# ---------- ES / CVaR ----------

def es_historical(returns: pd.Series, alpha: float) -> float:
    r = _as_return_series(returns).to_numpy()
    q = np.quantile(r, 1 - alpha)      # seuil en return (queue gauche)
    tail = r[r <= q]
    return float(-np.mean(tail)) if tail.size else 0.0  # perte positive


def es_gaussian(returns: pd.Series, alpha: float) -> float:
    r = _as_return_series(returns)
    mu = r.mean().item()
    sigma = r.std(ddof=1).item()

    z_left = norm.ppf(1 - alpha)       # n√©gatif
    phi = norm.pdf(z_left)

    # ES (return) c√¥t√© gauche sous normalit√©
    es_return = mu - sigma * (phi / (1 - alpha))
    return float(-es_return)           # perte positive


def es_cf_empirical_tail(returns: pd.Series, alpha: float) -> float:
    """
    Pratique : seuil VaR Cornish-Fisher, puis moyenne empirique des returns <= seuil.
    """
    r = _as_return_series(returns).to_numpy()
    var_cf_loss = var_cornish_fisher(pd.Series(r), alpha)
    threshold_return = -var_cf_loss

    tail = r[r <= threshold_return]
    return float(-np.mean(tail)) if tail.size else 0.0


def es_summary(returns: pd.Series, confidence_levels=(0.95, 0.99)) -> pd.DataFrame:
    r = _as_return_series(returns)

    rows = []
    for alpha in confidence_levels:
        rows.append({
            "ConfidenceLevel": alpha,
            "VaR_Historical": var_historical(r, alpha),
            "ES_Historical": es_historical(r, alpha),
            "VaR_Gaussian": var_gaussian(r, alpha),
            "ES_Gaussian": es_gaussian(r, alpha),
            "VaR_CornishFisher": var_cornish_fisher(r, alpha),
            "ES_CF_EmpiricalTail": es_cf_empirical_tail(r, alpha),
        })
    return pd.DataFrame(rows)


def yahoo_es(ticker: str, start_date: str, end_date: str, confidence_levels=(0.95, 0.99)) -> pd.DataFrame:
    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False
    )
    if df.empty:
        raise ValueError(f"No data for {ticker}")

    prices = _as_price_series(df["Close"])
    returns = prices.pct_change()

    out = es_summary(returns, confidence_levels)
    out["Ticker"] = ticker
    out["ReturnType"] = "SimpleReturns"
    out["Horizon"] = "1D"
    return out


In [146]:
yahoo_es("AAPL", "2025-01-01", "2025-12-25")


Unnamed: 0,ConfidenceLevel,VaR_Historical,ES_Historical,VaR_Gaussian,ES_Gaussian,VaR_CornishFisher,ES_CF_EmpiricalTail,Ticker,ReturnType,Horizon
0,0.95,0.032191,0.045673,0.033205,0.041819,0.020356,0.037666,AAPL,SimpleReturns,1D
1,0.99,0.049229,0.071721,0.047253,0.054238,0.087784,0.092456,AAPL,SimpleReturns,1D


## Tail Risk Metrics

In [128]:
import numpy as np
import pandas as pd
import yfinance as yf

# ---------- Helpers ----------

def _as_price_series(x) -> pd.Series:
    if isinstance(x, pd.Series):
        s = x
    elif isinstance(x, pd.DataFrame):
        s = x.iloc[:, 0]
    else:
        raise TypeError("Expected pandas Series or DataFrame")

    s = s.dropna().copy()
    s.index = pd.to_datetime(s.index).tz_localize(None)
    return s

def _as_return_series(x) -> pd.Series:
    if isinstance(x, pd.Series):
        s = x
    elif isinstance(x, pd.DataFrame):
        s = x.iloc[:, 0]
    else:
        raise TypeError("Expected pandas Series or DataFrame")
    s = s.dropna().copy()
    s.index = pd.to_datetime(s.index).tz_localize(None)
    return s

# ---------- 1) Worst N daily returns ----------

def worst_n_returns(returns: pd.Series, n: int = 10) -> pd.DataFrame:
    r = _as_return_series(returns)
    worst = r.nsmallest(n)
    return pd.DataFrame({
        "Date": worst.index.date,
        "Return": worst.values
    }).reset_index(drop=True)

# ---------- 2) Tail quantiles ----------

def tail_quantiles(returns: pd.Series, qs=(0.01, 0.05)) -> dict:
    r = _as_return_series(returns).to_numpy()
    out = {}
    for q in qs:
        out[f"Q{int(q*100):02d}"] = float(np.quantile(r, q))  # return quantile (left tail)
    return out

# ---------- Drawdown series & episodes ----------

def drawdown_series(prices) -> pd.DataFrame:
    p = _as_price_series(prices)
    running_max = p.cummax()
    dd = p / running_max - 1.0
    return pd.DataFrame({"Price": p, "RunningMax": running_max, "Drawdown": dd})

def drawdown_episodes(prices) -> pd.DataFrame:
    """
    Retourne un DF par √©pisode de drawdown (peak->trough->recovery),
    avec la profondeur max et la dur√©e (jours de trading).
    """
    dd_df = drawdown_series(prices)
    dd = dd_df["Drawdown"]
    p = dd_df["Price"]

    in_dd = dd < 0
    episode_id = (in_dd != in_dd.shift(1, fill_value=False)).cumsum()

    episodes = []
    for eid, block in dd_df[in_dd].groupby(episode_id[in_dd]):
        start_date = block.index[0]
        trough_date = block["Drawdown"].idxmin()
        depth = float(block["Drawdown"].min())  # n√©gatif
        length = int(len(block))                # trading days in drawdown

        # peak date = dernier max avant l'√©pisode
        peak_date = p.loc[:start_date].idxmax()

        # recovery = premi√®re date apr√®s la fin du block o√π Price >= Price@peak
        peak_price = float(p.loc[peak_date].item())
        after = p.loc[block.index[-1]:]
        rec = after[after >= peak_price]
        recovery_date = rec.index[0] if not rec.empty else None
        recovered = recovery_date is not None

        episodes.append({
            "EpisodeId": int(eid),
            "PeakDate": peak_date.date(),
            "StartDate": start_date.date(),
            "TroughDate": trough_date.date(),
            "RecoveryDate": None if recovery_date is None else recovery_date.date(),
            "Recovered": bool(recovered),
            "MaxDrawdown": depth,     # n√©gatif
            "Length_trading_days": length
        })

    return pd.DataFrame(episodes)

# ---------- 3) Drawdown tail distribution ----------

def drawdown_tail_distribution(prices, qs=(0.01, 0.05, 0.10)) -> dict:
    ep = drawdown_episodes(prices)
    if ep.empty:
        return {"NumEpisodes": 0}

    depths = ep["MaxDrawdown"].to_numpy()  # n√©gatif
    out = {"NumEpisodes": int(len(depths))}
    for q in qs:
        out[f"DD_Q{int(q*100):02d}"] = float(np.quantile(depths, q))  # plus n√©gatif => pire
    out["DD_Min"] = float(np.min(depths))
    out["DD_Median"] = float(np.median(depths))
    return out

# ---------- 4) Tail concentration risk ----------

def tail_concentration_risk(returns: pd.Series, n: int = 10) -> dict:
    """
    Mesure combien les pires jours expliquent la perte totale.
    - SumLoss: somme des returns n√©gatifs en absolu (approx, sans compounding)
    - TopN_LossShare: part des pires N jours dans la perte totale
    - HHI_TopN: concentration des pires N jours (plus haut = plus concentr√©)
    """
    r = _as_return_series(returns)

    neg = r[r < 0]
    if neg.empty:
        return {"SumLoss": 0.0, "TopN_LossShare": 0.0, "HHI_TopN": 0.0, "NumNegativeDays": 0}

    losses = (-neg).to_numpy()  # positive losses
    sum_loss = float(losses.sum())

    worst = (-r.nsmallest(n)).to_numpy()  # pires N jours, pertes positives
    worst = worst[worst > 0]

    topn_share = float(worst.sum() / sum_loss) if sum_loss > 0 and worst.size else 0.0

    # HHI (Herfindahl-Hirschman Index) sur les pires N pertes
    # HHI proche de 1 => un jour explique quasi tout le tail
    if worst.size:
        w = worst / worst.sum()
        hhi = float(np.sum(w**2))
    else:
        hhi = 0.0

    return {
        "NumNegativeDays": int(len(neg)),
        "SumLoss": sum_loss,
        "TopN_LossShare": topn_share,
        "HHI_TopN": hhi,
        "N": int(n),
    }

# ---------- Orchestrator ----------

def tail_risk_report_from_prices(prices: pd.Series, worst_n: int = 10) -> dict:
    p = _as_price_series(prices)
    rets = p.pct_change()

    return {
        "TailQuantiles": tail_quantiles(rets, qs=(0.01, 0.05)),
        "WorstNReturns": worst_n_returns(rets, n=worst_n),
        "DrawdownTail": drawdown_tail_distribution(p, qs=(0.01, 0.05, 0.10)),
        "TailConcentration": tail_concentration_risk(rets, n=worst_n),
        "Observations": int(rets.dropna().shape[0]),
    }

def yahoo_tail_risk_report(ticker: str, start_date: str, end_date: str, worst_n: int = 10) -> dict:
    df = yf.download(
        ticker,
        start=start_date,
        end=pd.to_datetime(end_date) + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False
    )
    if df.empty:
        raise ValueError(f"No data for {ticker}")

    prices = _as_price_series(df["Close"])
    out = tail_risk_report_from_prices(prices, worst_n=worst_n)
    out["Ticker"] = ticker
    out["PriceType"] = "Adjusted Close"
    out["ReturnType"] = "SimpleReturns"
    out["Horizon"] = "1D"
    return out


In [130]:
report = yahoo_tail_risk_report("AAPL", "2018-01-01", "2024-01-01", worst_n=10)

report["TailQuantiles"]          # 1% / 5% quantiles
report["WorstNReturns"]          # tableau des pires jours
report["DrawdownTail"]           # quantiles des drawdowns par √©pisode
report["TailConcentration"]      # concentration des pertes


{'NumNegativeDays': 701,
 'SumLoss': 9.701534544721548,
 'TopN_LossShare': 0.08321573907662265,
 'HHI_TopN': 0.10673227190536694,
 'N': 10}

In [147]:
report["TailQuantiles"]

{'Q01': -0.05231255924898922, 'Q05': -0.030711761178462582}

In [148]:
report["WorstNReturns"]

Unnamed: 0,Date,Return
0,2020-03-16,-0.128647
1,2019-01-03,-0.099607
2,2020-03-12,-0.098755
3,2020-09-03,-0.080061
4,2020-03-09,-0.079092
5,2020-09-08,-0.067295
6,2018-11-02,-0.066331
7,2020-02-27,-0.065368
8,2020-03-20,-0.063485
9,2022-09-13,-0.058679


In [149]:
report["DrawdownTail"]

{'NumEpisodes': 65,
 'DD_Q01': -0.33979171601705876,
 'DD_Q05': -0.2002024890930363,
 'DD_Q10': -0.125500314359433,
 'DD_Min': -0.3851591466877471,
 'DD_Median': -0.016196975332883334}

In [150]:
report["TailConcentration"]

{'NumNegativeDays': 701,
 'SumLoss': 9.701534544721548,
 'TopN_LossShare': 0.08321573907662265,
 'HHI_TopN': 0.10673227190536694,
 'N': 10}