In [1]:
from __future__ import annotations

import re
import warnings
from typing import Dict, List, Union

import apimoex
import investpy
import numpy
import pandas
import requests
import yfinance
from scipy import stats

<section style="font-family:system-ui,Segoe UI,Roboto,Arial,sans-serif; line-height:1.5; color:#111;">
  <h1 style="margin:0 0 8px;">Market Risk • VaR Backtesting Mini-Project</h1>
  <p style="margin:0 0 16px; color:#444;">
    Портфельная валидация VaR для акций, FX и долларовой облигации (UST) с рублёвой отчётностью. 
    Реализованы исторический, нормальный и модифицированный (Cornish–Fisher) VaR, а также тесты Купьице/Кристоферсена.
  </p>

  <h3 style="margin:16px 0 6px;">Цель</h3>
  <ul style="margin:0 0 12px 18px;">
    <li>Построить и сравнить <em>скользящие</em> оценки VaR на уровне 95–99% для разных классов активов.</li>
    <li>Валидировать модели VaR тестами <strong>Kupiec UC</strong>, <strong>Christoffersen IND</strong> и <strong>Conditional Coverage</strong>.</li>
    <li>Для UST в RUB учесть <strong>двойной риск-фактор</strong>: сдвиг доходности (duration) и движение USD/RUB.</li>
  </ul>

  <h3 style="margin:16px 0 6px;">Данные и метод</h3>
  <ul style="margin:0 0 12px 18px;">
    <li>Загрузка цен/курсов из Yahoo/Мосбиржи, нормализация колонок, базовая очистка.</li>
    <li>Скользящее окно <code>W=250</code> для оценок квантилей и параметров распределения.</li>
    <li>UST→RUB: линейная аппроксимация PnL: 
      <code>ΔV<sub>RUB</sub> ≈ −S·P·D<sub>mod</sub>·Δy + P·ΔS</code>.
    </li>
  </ul>

  <h3 style="margin:16px 0 6px;">Ключевые итоги</h3>
  <ul style="margin:0 0 12px 18px;">
    <li>Исторический VaR на акциях обычно проходит UC, но проваливает IND (кластеризация хвостов).</li>
    <li>Нормальный VaR чувствителен к «толстым хвостам» и часто проваливает UC/CC.</li>
    <li>Модифицированный VaR устойчивее, но тоже страдает при режимных сдвигах волатильности.</li>
    <li>В UST (в RUB) существенна FX-составляющая риска; duration-шок и FX-шок необходимо учитывать совместно.</li>
  </ul>

  <h3 style="margin:16px 0 6px;">Ограничения</h3>
  <ul style="margin:0 0 12px 18px;">
    <li>Backtest без <code>shift(1)</code> даёт look-ahead bias; для честной валидации VaR нужно смещение на один день.</li>
    <li>В текущей версии масштабы duration/DV01 требуют правки (см. заметки в код-ревью).</li>
    <li>Кросс-терм <code>ΔP·ΔS</code> опущен (при больших шоках может влиять).</li>
  </ul>

  <p style="margin:14px 0 0; color:#444;">
    Репозиторий демонстрирует «сквозной» пайплайн: загрузка данных → оценка VaR → рублёвая трансляция риска → backtesting.
  </p>
</section>


## Теоретический фреймворк и формулы

### VaR (однодневный)
- **Historical VaR (левый хвост)**:  
  $$VaR_\alpha = -\,q_\alpha(r_t),$$  
  где $r_t=\ln(P_t/P_{t-1})$. Квантиль оценивается на скользящем окне $W$.

- **Parametric Normal VaR**:  
  $$VaR_\alpha = -(\mu\,\Delta t + z_\alpha\,\sigma \sqrt{\Delta t}), \quad z_\alpha=\Phi^{-1}(\alpha).$$

- **Modified VaR (Cornish–Fisher)**:  
  $$
  z_\alpha^{CF}=z_\alpha+\frac{1}{6}(z_\alpha^2-1)\gamma
  +\frac{1}{24}(z_\alpha^3-3z_\alpha)\kappa
  -\frac{1}{36}(2z_\alpha^3-5z_\alpha)\gamma^2,
  $$
  $$
  VaR_\alpha=-\big(\mu\,\Delta t + z_\alpha^{CF}\,\sigma\sqrt{\Delta t}\big),
  $$
  где $\gamma$ — асимметрия, $\kappa$ — **excess kurtosis** на окне.

---

### Облигация: цена, DV01 и дюрация
- Цена купонной облигации (nominal $N$, ставка доходности $y$, купон $c$, $m$ куп. в год):  
  $$
  P = \sum_{k=1}^{mT}\frac{N\cdot c/m}{(1+y/m)^k}+\frac{N}{(1+y/m)^{mT}}.
  $$

- Определения:  
  $$
  DV01 \approx -\frac{\partial P}{\partial y}\cdot 0.0001,\qquad
  D_\text{mod} = -\frac{1}{P}\frac{\partial P}{\partial y}=\frac{DV01}{P\cdot 0.0001}.
  $$

- Линейный ценовой эффект:  
  $$
  \Delta P \approx -P\cdot D_\text{mod}\cdot \Delta y = -DV01\cdot \frac{\Delta y}{0.0001}.
  $$

---

### UST в рублях (двухфакторная линейная аппроксимация)
Пусть $S_t$ — USD/RUB, $P_t$ — цена облигации в USD. Тогда рублёвый PnL:
$$
\boxed{\Delta V_{RUB} \approx -\,S_t P_t D_\text{mod}\,\Delta y_t \;+\; P_t\,\Delta S_t}
$$


---

### Backtesting (UC, IND, CC)
- **Kupiec UC**: проверяет частоту пробоев $x$ vs $p=1-\alpha$: $LR_{uc}\sim\chi^2_1$.  
- **Christoffersen IND**: проверяет независимость пробоев через матрицу переходов (00,01,10,11): $LR_{ind}\sim\chi^2_1$.  
- **Conditional Coverage**: $LR_{cc}=LR_{uc}+LR_{ind}\sim\chi^2_2$.



In [2]:
class DataFactory:
    def downloadDataFromYahooFinance(
            self,
            ticker: str,
            startDate: str,
            endDate: str,
            autoAdjust: bool = True) -> pandas.DataFrame:
        dataFrame = yfinance.download(ticker,
                                      start=startDate,
                                      end=endDate,
                                      auto_adjust=autoAdjust,
                                      progress=False)
        if dataFrame.empty:
            return dataFrame
        dataFrame = self._flattenYahooColumns(dataFrame, [ticker])
        return dataFrame

    def downloadBondFromYahooFinance(
            self,
            ticker: str,
            startDate: str,
            endDate: str,
            autoAdjust: bool = False) -> pandas.DataFrame:
        dataFrame = yfinance.download(ticker,
                                      start=startDate,
                                      end=endDate,
                                      auto_adjust=autoAdjust,
                                      progress=False)
        if dataFrame.empty:
            return dataFrame
        dataFrame = self._flattenYahooColumns(dataFrame, [ticker])
        return dataFrame

    def downloadFxFromYahooFinance(self, pair: str, startDate: str,
                                   endDate: str) -> pandas.DataFrame:
        yahooPair = self._normalizeFxTicker(pair)
        dataFrame = yfinance.download(yahooPair,
                                      start=startDate,
                                      end=endDate,
                                      auto_adjust=True,
                                      progress=False)
        if dataFrame.empty:
            return dataFrame
        dataFrame = self._flattenYahooColumns(dataFrame, [yahooPair])
        normalizedPair = yahooPair.replace("=X", "")
        dataFrame.columns = [
            column.replace(yahooPair, normalizedPair)
            for column in dataFrame.columns
        ]
        return dataFrame

    def downloadDataFromMoex(self, ticker: str, startDate: str,
                             endDate: str) -> pandas.DataFrame:
        with requests.Session() as session:
            rawData = apimoex.get_board_history(session,
                                                ticker,
                                                start=startDate,
                                                end=endDate)
        dataFrame = pandas.DataFrame(rawData)
        if dataFrame.empty:
            return dataFrame
        if 'TRADEDATE' not in dataFrame.columns:
            return pandas.DataFrame()
        dataFrame = dataFrame.set_index('TRADEDATE')
        dataFrame.index = pandas.to_datetime(dataFrame.index, errors='coerce')
        dataFrame.index.name = 'Date'
        keepColumns = [
            column for column in ['CLOSE', 'VOLUME', 'VALUE']
            if column in dataFrame.columns
        ]
        dataFrame = dataFrame[keepColumns].copy()
        tickerUpper = ticker.upper()
        renameMap = {}
        if 'CLOSE' in keepColumns:
            renameMap['CLOSE'] = f'{tickerUpper} Close'
        if 'VOLUME' in keepColumns:
            renameMap['VOLUME'] = f'{tickerUpper} Volume'
        if 'VALUE' in keepColumns:
            renameMap['VALUE'] = f'{tickerUpper} Value'
        dataFrame = dataFrame.rename(columns=renameMap)
        dataFrame = dataFrame.sort_index()
        return dataFrame

    def downloadBondFromInvestingCom(self, bond: str, startDate: str,
                                     endDate: str) -> pandas.DataFrame:
        startDateDt = pandas.to_datetime(startDate)
        endDateDt = pandas.to_datetime(endDate)
        startStr = startDateDt.strftime('%d/%m/%Y')
        endStr = endDateDt.strftime('%d/%m/%Y')
        dataFrame = investpy.bonds.get_bond_historical_data(bond=bond,
                                                            from_date=startStr,
                                                            to_date=endStr)
        if dataFrame is None or dataFrame.empty:
            return pandas.DataFrame()
        dataFrame.index.name = 'Date'
        renamed = dataFrame.rename(
            columns={
                'Open': f'{bond} Open',
                'High': f'{bond} High',
                'Low': f'{bond} Low',
                'Close': f'{bond} Close',
                'Volume': f'{bond} Volume'
            })
        keep = [
            c for c in renamed.columns
            if c.endswith(' Close') or c.endswith(' Volume') or
            c.endswith(' Open') or c.endswith(' High') or c.endswith(' Low')
        ]
        return renamed[keep].sort_index()

    def downloadBondYieldFromYahooFinance(self,
                                          code: str,
                                          startDate: str,
                                          endDate: str,
                                          autoAdjust: bool = True
                                         ) -> pandas.DataFrame:
        mapping = {'US10Y': '^TNX', 'US30Y': '^TYX', 'US5Y': '^FVX'}
        ticker = mapping.get(code.upper(), code)
        dataFrame = yfinance.download(ticker,
                                      start=startDate,
                                      end=endDate,
                                      auto_adjust=autoAdjust,
                                      progress=False)
        if dataFrame is None or dataFrame.empty:
            return pandas.DataFrame()
        dataFrame.index.name = 'Date'
        dataFrame = dataFrame.rename(
            columns={c: f'{code.upper()} {c}' for c in dataFrame.columns})
        dataFrame = self._flattenYahooColumns(dataFrame, [ticker])
        return dataFrame.sort_index()

    def getQuotesForPortfolio(self,
                              items: List[Union[str, Dict[str, str]]],
                              startDate: str,
                              endDate: str,
                              priceField: str = 'Close',
                              joinHow: str = 'inner') -> pandas.DataFrame:
        collectedFrames: List[pandas.DataFrame] = []
        for item in items:
            meta = self._coerceItem(item)
            ticker = meta['ticker']
            assetClass = meta.get('assetClass') or self._inferAssetClass(ticker)
            try:
                if assetClass == 'fx':
                    frame = self.downloadFxFromYahooFinance(
                        ticker, startDate, endDate)
                elif assetClass == 'moex':
                    frame = self.downloadDataFromMoex(ticker, startDate,
                                                      endDate)
                elif assetClass == 'bond':
                    frame = self.downloadBondFromYahooFinance(
                        ticker, startDate, endDate)
                elif assetClass == 'bondYield':
                    frame = self.downloadBondYieldFromYahooFinance(
                        ticker, startDate, endDate)
                else:
                    frame = self.downloadDataFromYahooFinance(
                        ticker, startDate, endDate)

                if frame is None or frame.empty:
                    warnings.warn(
                        f"No data returned for {ticker} ({assetClass}). Skipping."
                    )
                    continue

                targetColumns = [
                    column for column in frame.columns
                    if column.endswith(f' {priceField}')
                ]
                if not targetColumns:
                    if priceField in frame.columns:
                        frame = frame[[priceField]].copy()
                        frame = frame.rename(
                            columns={priceField: f'{ticker} {priceField}'})
                    else:
                        fuzzyColumns = [
                            column for column in frame.columns
                            if priceField.lower() in column.lower()
                        ]
                        if fuzzyColumns:
                            frame = frame[[fuzzyColumns[0]]].copy()
                        else:
                            numericColumns = frame.select_dtypes(
                                include=[numpy.number]).columns.tolist()
                            if not numericColumns:
                                warnings.warn(
                                    f"No numeric columns for {ticker}. Skipping."
                                )
                                continue
                            frame = frame[[numericColumns[0]]].copy()
                        frame.columns = [
                            f'{self._extractTickerFromColumn(frame.columns[0])} {priceField}'
                        ]
                else:
                    frame = frame[targetColumns].copy()

                collectedFrames.append(frame)

            except Exception as exception:
                warnings.warn(
                    f"Error fetching {ticker} ({assetClass}): {exception}")

        if not collectedFrames:
            return pandas.DataFrame()

        result = pandas.concat(collectedFrames, axis=1,
                               join=joinHow).sort_index()
        return result

    def _flattenYahooColumns(self, dataFrame: pandas.DataFrame,
                             tickers: List[str]) -> pandas.DataFrame:
        result = dataFrame.copy()
        if isinstance(result.columns, pandas.MultiIndex):
            level0Values = list(map(str, result.columns.get_level_values(0)))
            level1Values = list(map(str, result.columns.get_level_values(1)))
            level0Set = set(level0Values)
            level1Set = set(level1Values)
            tickersSet = set(map(str, tickers))
            if tickersSet & level0Set and not (tickersSet & level1Set):
                result.columns = [
                    f"{level0} {level1}" for (level0, level1) in result.columns
                ]
            elif tickersSet & level1Set and not (tickersSet & level0Set):
                result.columns = [
                    f"{level1} {level0}" for (level0, level1) in result.columns
                ]
            else:
                result.columns = [
                    ' '.join(map(str, columnTuple)).strip()
                    for columnTuple in result.columns
                ]
        else:
            ticker = str(tickers[0])
            result.columns = [f"{ticker} {column}" for column in result.columns]
        result.index.name = 'Date'
        return result

    def _normalizeFxTicker(self, pair: str) -> str:
        upperPair = pair.strip().upper()
        if upperPair.endswith('=X'):
            return upperPair
        cleaned = re.sub(r'[^A-Z]', '', upperPair)
        if len(cleaned) < 6:
            raise ValueError(
                f"FX pair '{pair}' is ambiguous. Use forms like 'EURUSD' or 'EUR/USD'."
            )
        base = cleaned[:3]
        quote = cleaned[3:6]
        return f"{base}{quote}=X"

    def _inferAssetClass(self, ticker: str) -> str:
        upperTicker = ticker.strip().upper()
        if ('=X' in upperTicker) or ('/' in upperTicker) or (re.fullmatch(
                r'[A-Z]{6}', upperTicker) is not None):
            return 'fx'
        if upperTicker.endswith('.ME') or upperTicker.startswith(
                'MOEX:') or upperTicker.endswith(':MOEX'):
            return 'moex'
        commonBondTickers = {
            'TLT', 'IEF', 'BND', 'LQD', 'HYG', '^TNX', '^TYX', '^FVX', '^IRX'
        }
        if upperTicker in commonBondTickers:
            return 'bond'
        return 'stock'

    def _coerceItem(self, item: Union[str, Dict[str, str]]) -> Dict[str, str]:
        if isinstance(item, str):
            return {'ticker': item}
        if isinstance(item, dict) and 'ticker' in item:
            return item
        raise ValueError(
            "Each item must be a ticker string or dict with key 'ticker'.")

    def _extractTickerFromColumn(self, columnName: str) -> str:
        parts = columnName.split()
        if len(parts) >= 2:
            return ' '.join(parts[:-1])
        return parts[0]

    @staticmethod
    def makeDataframeRub(dataFrame_: pandas.DataFrame,usdRubColumnIndex:int, changeColumnsIndexes: List[int]):
        dataFrame = dataFrame_.copy()
        for column in changeColumnsIndexes:
            dataFrame.iloc[:, column] = dataFrame.iloc[:, column] * dataFrame.iloc[:, usdRubColumnIndex]
        return dataFrame

In [3]:
class VarCalculator:
    def varHist(self, data, alpha=0.05):
        return (numpy.percentile(data, alpha * 100))

    def varNormal(self, data, alpha=0.05, t=1):
        return numpy.mean(data) * t + stats.norm.ppf(alpha) * numpy.std(data) * t**.5

    def varModified(self, data, alpha=0.05, t=1):
        return numpy.mean(data) * t + self.z_corrected(data, alpha,
                                               t) * numpy.std(data) * t**.5

    @staticmethod
    def z_corrected(data, alpha=0.05, t=1):
        return (stats.norm.ppf(alpha) + 1 / 6 * (stats.norm.ppf(alpha)**2 - 1) *
                (1 / t**.5) * stats.skew(data) + 1 / 24 *
                (stats.norm.ppf(alpha)**3 - 3 * stats.norm.ppf(alpha)) *
                (1 / t) * stats.kurtosis(data) - 1 / 36 *
                (2 * stats.norm.ppf(alpha)**3 - 5 * stats.norm.ppf(alpha)) *
                (1 / t) * stats.skew(data)**2)

    def bondPriceFromYield(self, yieldRate, tenorYears=10.0, couponRate=None, nominal=100.0, paymentsPerYear=2):
        if couponRate is None:
            couponRate = yieldRate
        numberOfPayments = int(round(paymentsPerYear * tenorYears))
        growthPerPeriod = 1.0 + yieldRate / paymentsPerYear
        cashCoupon = nominal * couponRate / paymentsPerYear
        discountFactors = growthPerPeriod ** (-numpy.arange(1, numberOfPayments + 1))
        price = cashCoupon * discountFactors.sum() + nominal * growthPerPeriod ** (-numberOfPayments)
        return price

    def dollarValueOfOneBasisPointAndModifiedDuration(self, yieldRate, tenorYears=10.0, couponRate=None, nominal=100.0, paymentsPerYear=2, bumpInBasisPoints=1.0):
        bumpAsDecimal = bumpInBasisPoints / 10000.0
        price = self.bondPriceFromYield(yieldRate, tenorYears, couponRate, nominal, paymentsPerYear)
        priceUp = self.bondPriceFromYield(yieldRate + bumpAsDecimal, tenorYears, couponRate, nominal, paymentsPerYear)
        priceDown = self.bondPriceFromYield(yieldRate - bumpAsDecimal, tenorYears, couponRate, nominal, paymentsPerYear)
        dollarValueOfOneBasisPoint = (priceDown - priceUp) / 2.0
        modifiedDuration = dollarValueOfOneBasisPoint / (price * 0.0001)
        convexity = (priceUp + priceDown - 2.0 * price) / (price * bumpAsDecimal ** 2)
        return price, dollarValueOfOneBasisPoint, modifiedDuration, convexity

    def usualDurationFromModified(self, yieldRate, modifiedDuration, paymentsPerYear=2):
        return modifiedDuration * (1.0 + yieldRate / paymentsPerYear)   

    def kupiecUnconditionalCoverageLr(self, violations, confidenceLevel):
        violations = numpy.asarray(violations, dtype=int)
        total = violations.size
        hits = violations.sum()
        probability = 1.0 - confidenceLevel
        epsilon = 1e-12
        hitRatio = numpy.clip(hits / max(total, 1), epsilon, 1.0 - epsilon)
        probability = numpy.clip(probability, epsilon, 1.0 - epsilon)
        logLikelihoodUnrestricted = (total - hits) * numpy.log(1.0 - hitRatio) + hits * numpy.log(hitRatio)
        logLikelihoodRestricted = (total - hits) * numpy.log(1.0 - probability) + hits * numpy.log(probability)
        statistic = -2.0 * (logLikelihoodRestricted - logLikelihoodUnrestricted)
        degreesOfFreedom = 1
        pValue = None
        try:
            from scipy.stats import chi2
            pValue = 1.0 - chi2.cdf(statistic, degreesOfFreedom)
        except Exception:
            pValue = None
        return statistic, degreesOfFreedom, pValue, hits, total

    def christoffersenIndependenceLr(self, violations):
        violations = numpy.asarray(violations, dtype=int)
        if violations.size < 2:
            return 0.0, 1, None, 0, 0, 0, 0
        v0 = violations[:-1]
        v1 = violations[1:]
        n00 = int(((v0 == 0) & (v1 == 0)).sum())
        n01 = int(((v0 == 0) & (v1 == 1)).sum())
        n10 = int(((v0 == 1) & (v1 == 0)).sum())
        n11 = int(((v0 == 1) & (v1 == 1)).sum())
        epsilon = 1e-12
        totalZeroStarts = n00 + n01
        totalOneStarts = n10 + n11
        pi0 = numpy.clip(n01 / max(totalZeroStarts, 1), epsilon, 1.0 - epsilon)
        pi1 = numpy.clip(n11 / max(totalOneStarts, 1), epsilon, 1.0 - epsilon)
        totalTransitions = totalZeroStarts + totalOneStarts
        pi = numpy.clip((n01 + n11) / max(totalTransitions, 1), epsilon, 1.0 - epsilon)
        logLikelihoodRestricted = n00 * numpy.log(1.0 - pi) + n01 * numpy.log(pi) + n10 * numpy.log(1.0 - pi) + n11 * numpy.log(pi)
        logLikelihoodUnrestricted = n00 * numpy.log(1.0 - pi0) + n01 * numpy.log(pi0) + n10 * numpy.log(1.0 - pi1) + n11 * numpy.log(pi1)
        statistic = -2.0 * (logLikelihoodRestricted - logLikelihoodUnrestricted)
        degreesOfFreedom = 1
        pValue = None
        try:
            from scipy.stats import chi2
            pValue = 1.0 - chi2.cdf(statistic, degreesOfFreedom)
        except Exception:
            pValue = None
        return statistic, degreesOfFreedom, pValue, n00, n01, n10, n11

    def christoffersenConditionalCoverageLr(self, violations, confidenceLevel):
        statisticUnconditional, dfUnconditional, pUnconditional, hits, total = self.kupiecUnconditionalCoverageLr(
            violations=violations, confidenceLevel=confidenceLevel
        )
        statisticIndependence, dfIndependence, pIndependence, n00, n01, n10, n11 = self.christoffersenIndependenceLr(
            violations=violations
        )
        statistic = statisticUnconditional + statisticIndependence
        degreesOfFreedom = dfUnconditional + dfIndependence
        pValue = None
        try:
            from scipy.stats import chi2
            pValue = 1.0 - chi2.cdf(statistic, degreesOfFreedom)
        except Exception:
            pValue = None
        details = {
            "hits": hits,
            "total": total,
            "n00": n00,
            "n01": n01,
            "n10": n10,
            "n11": n11,
            "lrUnconditional": statisticUnconditional,
            "lrIndependence": statisticIndependence,
        }
        return statistic, degreesOfFreedom, pValue, details

    def printBacktestsSummary(self, violations, confidenceLevel=0.99, significanceLevel=0.05):
        if isinstance(violations, pandas.Series):
            v = violations.astype(int).to_numpy()
        else:
            v = numpy.asarray(violations, dtype=int)

        total = int(v.size)
        hits = int(v.sum())
        probability = 1.0 - confidenceLevel
        expectedHits = probability * total

        kupiecStat, kupiecDf, kupiecPvalue, _, _ = self.kupiecUnconditionalCoverageLr(
            violations=v, confidenceLevel=confidenceLevel
        )
        indStat, indDf, indPvalue, n00, n01, n10, n11 = self.christoffersenIndependenceLr(
            violations=v
        )
        combStat, combDf, combPvalue, details = self.christoffersenConditionalCoverageLr(
            violations=v, confidenceLevel=confidenceLevel
        )

        def passFail(p):
            return "PASS ✅" if (p is not None and p >= significanceLevel) else ("FAIL ❌" if p is not None else "N/A")

        print("=== VaR Backtesting Summary ===")
        print(f"Observations: {total}")
        print(f"Violations: {hits}  |  Expected: {expectedHits:.2f}  (p = {probability:.4f}, confidence = {confidenceLevel:.2%})")
        print()

        print("[Kupiec Unconditional Coverage]")
        print(f"LR: {kupiecStat:.4f} | dof: {kupiecDf} | p-value: {('%.4f' % kupiecPvalue) if kupiecPvalue is not None else 'None'} | {passFail(kupiecPvalue)}")

        print("[Christoffersen Independence]")
        print(f"Transitions n00={n00}, n01={n01}, n10={n10}, n11={n11}")
        print(f"LR: {indStat:.4f} | dof: {indDf} | p-value: {('%.4f' % indPvalue) if indPvalue is not None else 'None'} | {passFail(indPvalue)}")

        print("[Conditional Coverage (Kupiec + Christoffersen)]")
        print(f"LR: {combStat:.4f} | dof: {combDf} | p-value: {('%.4f' % combPvalue) if combPvalue is not None else 'None'} | {passFail(combPvalue)}")

In [4]:
factory = DataFactory()

items = [
    "AAPL",
    "USDRUB", 
    {"ticker": "SBER", "assetClass": "moex"}, 
    {
    'ticker': 'US10Y',
    'assetClass': 'bondYield'
}                
]
quotes = factory.getQuotesForPortfolio(items, "2020-01-01", "2025-01-01", priceField="Close", joinHow="inner")
quotes = quotes.dropna()

quotes

Unnamed: 0_level_0,AAPL Close,USDRUB Close,SBER Close,^TNX Close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-01-03,71.833313,61.694199,255.00,1.788
2020-01-06,72.405655,62.029999,253.90,1.811
2020-01-08,73.224388,61.930901,259.15,1.874
2020-01-09,74.779755,61.261902,257.99,1.858
2020-01-10,74.948807,61.266102,258.19,1.825
...,...,...,...,...
2024-12-23,254.367035,101.842270,264.90,4.599
2024-12-24,257.286682,101.223656,264.34,4.591
2024-12-26,258.103729,99.985092,269.56,4.579
2024-12-27,254.685883,99.761086,271.20,4.619


In [5]:
quotesRub = factory.makeDataframeRub(quotes, 1, [0])

In [6]:
varCalculator = VarCalculator()

In [8]:
def printAssetHeader(assetName: str):
    line = "═" * max(20, len(assetName) + 24)
    print(f"\n╔{line}╗")
    print(f"║   BACKTEST: {assetName} ".ljust(len(line) + 1) + "║")
    print(f"╚{line}╝")

def printVarTestHeader(testName: str):
    line = "─" * max(12, len(testName) + 14)
    print(f"\n┌{line}┐")
    print(f"│   VaR method: {testName} ".ljust(len(line) + 1) + "│")
    print(f"└{line}┘")

# Карта имён серий с VaR
varTests = [
    "Historical Var",
    "Normal Var",
    "Modified Var"
]
mapVarTest = {
    "Historical Var": "VarHist",
    "Normal Var": "VarNormal",
    "Modified Var": "VarModified"
}

# Основной цикл по активам/колонкам
for columnId in range(len(quotes.columns)):
    assetName = str(quotes.columns[columnId])
    printAssetHeader(assetName)

    # Подготовка данных
    observedSeries = quotes.iloc[:, columnId].to_frame(name="price")
    observedSeries["return"] = numpy.log(observedSeries["price"] / observedSeries["price"].shift(1))
    observedSeries = observedSeries.dropna()

    # Расчёт VaR
    if columnId in (0, 2):
        # Акции (VaR в относительных величинах доходности)
        observedSeries["VarHist"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varHist, raw=True)
        observedSeries["VarNormal"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varNormal, raw=True)
        observedSeries["VarModified"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varModified, raw=True)
    elif columnId == 1:
        # FX (преобразуем относительный VaR в денежный VaR на текущую цену)
        observedSeries["VarHist"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varHist, raw=True) * observedSeries["price"]
        observedSeries["VarNormal"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varNormal, raw=True) * observedSeries["price"]
        observedSeries["VarModified"] = observedSeries["return"].rolling(window=30).apply(varCalculator.varModified, raw=True) * observedSeries["price"]
    elif columnId == 3:
        observedSeries["yieldDecimal"] = observedSeries["price"] / 100.0
        observedSeries["deltaYield"]   = observedSeries["yieldDecimal"].diff()
    
        fxSeries = quotes.iloc[:, 1].to_frame(name="fxPrice") 
        observedSeries = observedSeries.join(fxSeries, how="inner")
        observedSeries["deltaFx"] = observedSeries["fxPrice"].diff()
    
        priceList = []
        dmodList  = []
        for y in observedSeries["yieldDecimal"]:
            if numpy.isnan(y):
                priceList.append(numpy.nan)
                dmodList.append(numpy.nan)
            else:
                price_t, dv01_t, dmod_t, _ = varCalculator.dollarValueOfOneBasisPointAndModifiedDuration(
                    yieldRate=y,
                    tenorYears=10.0,
                    couponRate=None,
                    nominal=100.0,
                    paymentsPerYear=2,
                    bumpInBasisPoints=1.0
                )
                priceList.append(price_t)
                dmodList.append(dmod_t)
    
        observedSeries["bondPriceParLike"] = numpy.array(priceList)      # P_t in USD per 100 nominal
        observedSeries["modifiedDuration"] = numpy.array(dmodList)       # D_mod in years
    
        observedSeries["rubLossRates"] = - observedSeries["fxPrice"] * observedSeries["bondPriceParLike"] * observedSeries["modifiedDuration"] * observedSeries["deltaYield"]
        observedSeries["rubLossFx"]    =   observedSeries["bondPriceParLike"] * observedSeries["deltaFx"]
    
        observedSeries["rubLossTotal"] = observedSeries["rubLossRates"] + observedSeries["rubLossFx"]
    
        observedSeries["VarHist"]     = observedSeries["rubLossTotal"].rolling(window=30).apply(varCalculator.varHist,     raw=True)
        observedSeries["VarNormal"]   = observedSeries["rubLossTotal"].rolling(window=30).apply(varCalculator.varNormal,   raw=True)
        observedSeries["VarModified"] = observedSeries["rubLossTotal"].rolling(window=30).apply(varCalculator.varModified, raw=True)
    
        observedSeries = observedSeries.dropna(subset=["VarHist","VarNormal","VarModified","rubLossTotal"])
            

    for testId in range(3):
        testVarName = varTests[testId]
        varColumn = mapVarTest[testVarName]
        printVarTestHeader(testVarName)

        if columnId in (0, 2):
            violations = observedSeries["return"] < observedSeries[varColumn]
        elif columnId == 1:
            moneyLoss = observedSeries["return"] * observedSeries["price"]
            violations = moneyLoss < observedSeries[varColumn]
        elif columnId == 3:
            violations = observedSeries["rubLossTotal"] < observedSeries[varColumn]

        print(f"• Window: 250 obs | Confidence: 95% | Significance: 5%")
        varCalculator.printBacktestsSummary(
            violations=violations,
            confidenceLevel=0.95,
            significanceLevel=0.05
        )



╔══════════════════════════════════╗
║   BACKTEST: AAPL Close           ║
╚══════════════════════════════════╝

┌────────────────────────────┐
│   VaR method: Historical Var │
└────────────────────────────┘
• Window: 250 obs | Confidence: 95% | Significance: 5%
=== VaR Backtesting Summary ===
Observations: 1201
Violations: 78  |  Expected: 60.05  (p = 0.0500, confidence = 95.00%)

[Kupiec Unconditional Coverage]
LR: 5.1828 | dof: 1 | p-value: 0.0228 | FAIL ❌
[Christoffersen Independence]
Transitions n00=1051, n01=71, n10=71, n11=7
LR: 0.7626 | dof: 1 | p-value: 0.3825 | PASS ✅
[Conditional Coverage (Kupiec + Christoffersen)]
LR: 5.9454 | dof: 2 | p-value: 0.0512 | PASS ✅

┌────────────────────────┐
│   VaR method: Normal Var │
└────────────────────────┘
• Window: 250 obs | Confidence: 95% | Significance: 5%
=== VaR Backtesting Summary ===
Observations: 1201
Violations: 57  |  Expected: 60.05  (p = 0.0500, confidence = 95.00%)

[Kupiec Unconditional Coverage]
LR: 0.1658 | dof: 1 | p-va