# In [17]:
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt


class PortfolioRisk:
    """
    Calculate risk metrics (VaR, ES, stress, drawdown) for a portfolio from historical prices.

    All VaR / ES figures are expressed as *returns* (log returns), so losses
    are negative numbers. The portfolio return series is the weighted sum of
    the asset log returns, which is the approximation used consistently by
    every metric in this class.

    Parameters
    ----------
    file_path : str or path-like or file-like
        Price file with dates in the first column (used as the index) and one
        column per symbol. Paths ending in ``.xls``/``.xlsx``/``.xlsm`` are read
        with ``pandas.read_excel``; anything else is read with
        ``pandas.read_csv``.
    confidence_level : float, optional
        Confidence level for risk metrics, strictly between 0 and 1
        (default is 0.95).
    weights : array-like, optional
        Portfolio asset weights summing to 1, one per price column and in
        column order. If None, equal weights are assigned.
    time_horizon : int, optional
        Number of periods (days) to scale risk metrics to; must be positive
        (default is 1).

    Raises
    ------
    ValueError
        If ``confidence_level`` is not in (0, 1), ``time_horizon`` is not
        positive, the number of weights does not match the number of assets,
        or the weights do not sum to 1.
    """

    # Default shock applied by ``stress_test``; also echoed in summary/plot labels.
    DEFAULT_STRESS_FACTOR = -0.10
    # File suffixes routed to ``pd.read_excel``; every other input goes to ``pd.read_csv``.
    _EXCEL_SUFFIXES = (".xls", ".xlsx", ".xlsm")
    # Row label used for the portfolio entry appended to every per-asset result.
    _PORTFOLIO_LABEL = "Portfolio"

    def __init__(self, file_path, confidence_level=0.95, weights=None, time_horizon=1):
        # Validate scalar settings first so bad input fails before any file I/O.
        if not 0 < confidence_level < 1:
            raise ValueError("confidence_level must be strictly between 0 and 1")
        if time_horizon <= 0:
            raise ValueError("time_horizon must be positive")

        self.file_path = file_path
        self.confidence_level = confidence_level
        self.time_horizon = time_horizon
        self.data = None
        self.returns = None
        self.weights = None
        self.load_data()

        # Assign and validate weights
        n_assets = len(self.returns.columns)
        if weights is None:
            self.weights = np.ones(n_assets) / n_assets
        else:
            self.weights = np.asarray(weights, dtype=float)
            # Require a flat vector: a 2-D array would silently turn the
            # portfolio return series into a DataFrame.
            if self.weights.shape != (n_assets,):
                raise ValueError(f"Expected {n_assets} weights, got {self.weights.size}")
            if not np.isclose(np.sum(self.weights), 1.0, atol=1e-6):
                raise ValueError("Weights must sum to 1")

        # Compute portfolio returns (weighted sum of asset log returns)
        self.portfolio_returns = self.returns.dot(self.weights)

    @classmethod
    def _with_portfolio(cls, asset_values, portfolio_value):
        """Return ``asset_values`` with a trailing "Portfolio" entry appended."""
        return pd.concat([asset_values,
                          pd.Series(portfolio_value, index=[cls._PORTFOLIO_LABEL])])

    def _stress_label(self):
        """Build the column/legend label describing the default stress scenario."""
        return (f"Stress Test ({self.time_horizon}-Day "
                f"{self.DEFAULT_STRESS_FACTOR * 100:.0f}% Drop)")

    def load_data(self):
        """
        Load price data from a CSV or Excel file and calculate log returns.

        The reader is chosen from the file suffix: Excel suffixes use
        ``pd.read_excel``, everything else (including file-like objects) uses
        ``pd.read_csv``. The first column becomes the (date-parsed) index.
        Rows containing any missing return are dropped.

        Returns
        -------
        None
            Populates ``self.data`` (prices) and ``self.returns`` (log returns).
        """
        is_excel = str(self.file_path).lower().endswith(self._EXCEL_SUFFIXES)
        reader = pd.read_excel if is_excel else pd.read_csv
        self.data = reader(self.file_path, index_col=0, parse_dates=True)
        self.returns = np.log(self.data / self.data.shift(1)).dropna()

    def historical_var(self):
        """
        Compute Historical VaR based on empirical quantiles, scaled by time horizon.

        Returns
        -------
        pd.Series
            VaR (as a return, so losses are negative) for each asset and the
            portfolio.
        """
        tail_prob = 1 - self.confidence_level
        # Scale by square root of time (assumes i.i.d. returns)
        scaling_factor = np.sqrt(self.time_horizon)
        var_assets = self.returns.quantile(tail_prob) * scaling_factor
        var_portfolio = self.portfolio_returns.quantile(tail_prob) * scaling_factor
        return self._with_portfolio(var_assets, var_portfolio)

    def parametric_var(self):
        """
        Compute Parametric VaR assuming normal distribution, scaled by time horizon.

        The mean is scaled linearly and the standard deviation by the square
        root of the horizon.

        Returns
        -------
        pd.Series
            VaR for each asset and the portfolio.
        """
        horizon = self.time_horizon
        root_horizon = np.sqrt(horizon)
        z_score = stats.norm.ppf(1 - self.confidence_level)
        var_assets = (self.returns.mean() * horizon
                      + z_score * self.returns.std() * root_horizon)
        var_portfolio = (self.portfolio_returns.mean() * horizon
                         + z_score * self.portfolio_returns.std() * root_horizon)
        return self._with_portfolio(var_assets, var_portfolio)

    def monte_carlo_var(self, simulations=10000, random_state=None):
        """
        Compute Monte Carlo VaR using vectorized multivariate normal simulation.

        Parameters
        ----------
        simulations : int, optional
            Number of simulation runs (default is 10000).
        random_state : int, numpy.random.Generator or None, optional
            Seed or generator for reproducible draws. When None (default) the
            global NumPy random state is used, so ``np.random.seed`` still
            controls the result as before.

        Returns
        -------
        pd.Series
            VaR for each asset and the portfolio over the specified time horizon.
        """
        mean = (self.returns.mean() * self.time_horizon).to_numpy()
        # Scale covariance by time horizon
        cov = (self.returns.cov() * self.time_horizon).to_numpy()
        sampler = np.random if random_state is None else np.random.default_rng(random_state)
        # Vectorized simulation for all assets at once: shape (simulations, n_assets)
        sim_returns = sampler.multivariate_normal(mean, cov, size=simulations)
        sim_portfolio_returns = sim_returns @ self.weights
        percentile = (1 - self.confidence_level) * 100
        var_assets = pd.Series(np.percentile(sim_returns, percentile, axis=0),
                               index=self.returns.columns)
        var_portfolio = np.percentile(sim_portfolio_returns, percentile)
        return self._with_portfolio(var_assets, var_portfolio)

    def expected_shortfall(self):
        """
        Compute Expected Shortfall (Conditional VaR) from the historical tail.

        ES is the mean of the one-period returns at or below the one-period
        historical VaR quantile, scaled by the square root of the time horizon
        so that it is directly comparable with ``historical_var``.

        Returns
        -------
        pd.Series
            ES for each asset and the portfolio.
        """
        tail_prob = 1 - self.confidence_level
        scaling_factor = np.sqrt(self.time_horizon)

        # Use the unscaled quantiles directly as cut-offs; dividing a scaled
        # VaR back down can lose the boundary observation to rounding.
        asset_cutoffs = self.returns.quantile(tail_prob)
        in_tail = self.returns.le(asset_cutoffs, axis="columns")
        es_assets = self.returns.where(in_tail).mean() * scaling_factor

        portfolio_cutoff = self.portfolio_returns.quantile(tail_prob)
        portfolio_tail = self.portfolio_returns[self.portfolio_returns <= portfolio_cutoff]
        es_portfolio = portfolio_tail.mean() * scaling_factor
        return self._with_portfolio(es_assets, es_portfolio)

    def stress_test(self, stress_factor=DEFAULT_STRESS_FACTOR):
        """
        Simulate a stress test with a specified return shock over the time horizon.

        The same shock is applied to every asset and compounded over the
        horizon: ``(1 + stress_factor) ** time_horizon - 1``.

        Parameters
        ----------
        stress_factor : float, optional
            Hypothetical daily return shock (default is -0.10 for a 10% drop).

        Returns
        -------
        pd.Series
            Loss for each asset and the portfolio under the stress scenario.
        """
        # Compound stress factor over time horizon
        compounded_stress = (1 + stress_factor) ** self.time_horizon - 1
        # Uniform shock; built from the column labels so it works with no return rows
        stress_assets = pd.Series(compounded_stress, index=self.returns.columns, dtype=float)
        stress_portfolio = float(np.dot(stress_assets.to_numpy(), self.weights))
        return self._with_portfolio(stress_assets, stress_portfolio)

    @staticmethod
    def _max_drawdown_from_log_returns(log_returns):
        """Return the most negative peak-to-trough decline implied by log returns."""
        # Log returns compound additively, so wealth relative to the start is exp(cumsum).
        wealth = np.exp(log_returns.cumsum())
        # Floor the running peak at the starting wealth of 1.0 so that a loss
        # in the very first period counts as a drawdown.
        peak = wealth.cummax().clip(lower=1.0)
        return (wealth / peak - 1).min()

    def max_drawdown(self):
        """
        Compute maximum historical drawdown for stress analysis.

        Returns
        -------
        pd.Series
            Maximum drawdown (a non-positive fraction) for each asset and the
            portfolio. The portfolio path compounds the weighted log returns.
        """
        return self._with_portfolio(
            self._max_drawdown_from_log_returns(self.returns),
            self._max_drawdown_from_log_returns(self.portfolio_returns),
        )

    def summary(self):
        """
        Summarize all risk metrics in a DataFrame as percentages.

        Returns
        -------
        pd.DataFrame
            Table of risk metrics for assets and portfolio over the time horizon, formatted as percentages (e.g., "10.34%").
        """
        # Calculate raw metrics
        metrics = {
            "Historical VaR": self.historical_var(),
            "Parametric VaR": self.parametric_var(),
            "Monte Carlo VaR": self.monte_carlo_var(),
            "Expected Shortfall": self.expected_shortfall(),
            self._stress_label(): self.stress_test(),
            "Max Drawdown": self.max_drawdown(),
        }

        # Convert to percentage strings with 2 decimal places
        formatted_metrics = {
            name: values.map(lambda val: f"{val * 100:.2f}%")
            for name, values in metrics.items()
        }

        return pd.DataFrame(formatted_metrics)

    def plot_risk_metrics(self):
        """
        Plot the portfolio returns distribution with VaR, ES, and stress test overlays.

        Returns
        -------
        None
            Displays the plot.
        """
        # NOTE(review): the histogram shows one-period returns while the
        # overlays are scaled to ``time_horizon``; for horizons > 1 the lines
        # are on a different scale than the bars - confirm this is intended.
        portfolio = self._PORTFOLIO_LABEL
        var = self.historical_var()[portfolio]
        es = self.expected_shortfall()[portfolio]
        stress = self.stress_test()[portfolio]
        confidence_pct = f"{self.confidence_level*100:.0f}%"

        plt.figure(figsize=(10, 6))
        self.portfolio_returns.hist(bins=50, alpha=0.7, color='blue', label='Portfolio Returns')
        plt.axvline(var, color='red', linestyle='--',
                    label=f'Historical VaR ({confidence_pct})')
        plt.axvline(es, color='purple', linestyle='--',
                    label=f'Expected Shortfall ({confidence_pct})')
        plt.axvline(stress, color='orange', linestyle='--',
                    label=self._stress_label())
        plt.title(
            f'Portfolio Returns Distribution (Time Horizon: {self.time_horizon} Day{"s" if self.time_horizon > 1 else ""})')
        plt.xlabel('Returns')
        plt.ylabel('Frequency')
        plt.legend()
        plt.grid(True)
        plt.show()