In [3]:
import numpy as np
from scipy.optimize import minimize
from importlib import reload
import inputsControlFunctions
reload(inputsControlFunctions)
from inputsControlFunctions import (
    validate_returns_single,
    validate_returns_portfolio,
    check_position_single,
    validate_alpha,
    validate_method,
    validate_n_bootstrap_samples,
    validate_position
)

In [4]:
class FixedIncomeNprmSingle:
    """
    Models non-parametric risk measures for a single fixed-income position.
    
    Parameters:
    -----------
    returns : np.ndarray
        Historical returns of the fixed-income security.
    position : float
        Quantity held (positive = long, negative = short).
    alpha : float, default=0.05
        Significance level for VaR/ES.
    method : str, default="quantile"
        Calculation method ("quantile" or "bootstrap").
    n_bootstrap_samples : int, default=10000
        Number of bootstrap samples (used if method="bootstrap").
    """
    
    def __init__(self, returns: np.ndarray, position: float, alpha: float = 0.05,
                 method: str = "quantile", n_bootstrap_samples: int = 10000):
        # Validate inputs using external functions.
        validate_returns_single(returns)
        check_position_single(position)
        validate_alpha(alpha)
        validate_method(method)
        validate_n_bootstrap_samples(n_bootstrap_samples)
        # Initialize parameters
        self.returns = returns
        self.position = position
        self.alpha = alpha
        self.method = method
        self.n_bootstrap_samples = n_bootstrap_samples
        
        # Initialize risk measure attributes
        self.var = None
        self.es = None
        
        # Automatically compute VaR and ES.
        self.fit()
        
    def fit(self):
        """
        Compute VaR and ES using the specified method.
        
        For long positions (position > 0):
            VaR = -position * Q_alpha(returns)
            ES  = -position * E[returns | returns < Q_alpha(returns)]
        For short positions (position < 0):
            VaR = -position * Q_{1-alpha}(returns)
            ES  = -position * E[returns | returns > Q_{1-alpha}(returns)]
        
        Bootstrap Method:
            Generate n_bootstrap_samples of returns.
            For each sample, compute the corresponding quantile and ES
            based on the position type, and then average across samples.
        
        Finally, round the results to 4 decimal places and ensure non-negative values.
        """
        if self.method == "quantile":
            self._compute_quantile()
        elif self.method == "bootstrap":
            self._compute_bootstrap()
        else:
            raise ValueError("Unsupported method: {}. Choose 'quantile' or 'bootstrap'.".format(self.method))
        
        # Round results to 4 decimal places and ensure non-negative values.
        self.var = round(max(self.var, 0), 4)
        self.es = round(max(self.es, 0), 4)
    
    def _compute_quantile(self):
        """
        Compute VaR and ES using the quantile method.
        """
        # Long Position: use alpha quantile.
        if self.position > 0:
            quantile_value = np.quantile(self.returns, self.alpha)
            self.var = -self.position * quantile_value
            # Expected Shortfall is the average of returns below the alpha quantile.
            tail_losses = self.returns[self.returns < quantile_value]
            self.es = -self.position * np.mean(tail_losses) if tail_losses.size > 0 else self.var
        # Short Position: use (1 - alpha) quantile.
        else:
            quantile_value = np.quantile(self.returns, 1 - self.alpha)
            self.var = -self.position * quantile_value
            # Expected Shortfall is the average of returns above the (1-alpha) quantile.
            tail_losses = self.returns[self.returns > quantile_value]
            self.es = -self.position * np.mean(tail_losses) if tail_losses.size > 0 else self.var
    
    def _compute_bootstrap(self):
        """
        Compute VaR and ES using the bootstrap method.
        """
        boot_var_list = []
        boot_es_list = []
        n = len(self.returns)
        
        for _ in range(self.n_bootstrap_samples):
            sample = np.random.choice(self.returns, size=n, replace=True)
            # Long Position: use alpha quantile.
            if self.position > 0:
                quantile_value = np.quantile(sample, self.alpha)
                boot_var = -self.position * quantile_value
                tail_losses = sample[sample < quantile_value]
            # Short Position: use (1 - alpha) quantile.
            else:
                quantile_value = np.quantile(sample, 1 - self.alpha)
                boot_var = -self.position * quantile_value
                tail_losses = sample[sample > quantile_value]
            
            boot_var_list.append(boot_var)
            # For each bootstrap sample, compute ES as the average loss beyond VaR.
            if tail_losses.size > 0:
                boot_es_list.append(-self.position * np.mean(tail_losses))
            else:
                boot_es_list.append(boot_var)
        
        self.var = np.mean(boot_var_list)
        self.es = np.mean(boot_es_list)
    
    def summary(self):
        """
        Returns a dictionary containing key risk metrics:
        
            - var: Computed Value at Risk.
            - es: Computed Expected Shortfall.
            - maxLoss: Worst-case loss, computed as:
                For long positions: -position * min(returns)
                For short positions: -position * max(returns)
            - maxExcessLoss: Difference between maxLoss and var.
            - maxExcessLossOverVar: Ratio of maxExcessLoss to var (if var != 0).
            - esOverVar: Ratio of es to var (if var != 0).
        """
        # Compute maxLoss based on position type.
        if self.position > 0:
            maxLoss = -self.position * np.min(self.returns)
        else:
            maxLoss = -self.position * np.max(self.returns)
        
        # Compute the excess loss beyond VaR.
        maxExcessLoss = maxLoss - self.var
        
        # Compute ratios, ensuring that division by zero is avoided.
        esOverVar = self.es / self.var if self.var != 0 else None
        maxExcessLossOverVar = maxExcessLoss / self.var if self.var != 0 else None
        
        # Construct and return the dictionary.
        return {
            "var": self.var,
            "es": self.es,
            "maxLoss": round(maxLoss, 4),
            "maxExcessLoss": round(maxExcessLoss, 4),
            "maxExcessLossOverVar": round(maxExcessLossOverVar, 4) if maxExcessLossOverVar is not None else None,
            "esOverVar": round(esOverVar, 4) if esOverVar is not None else None
        }
    def evt(self, quantile_threshold: float = 0.95):
        """
        Estimate VaR and ES using Extreme Value Theory (GPD).
        
        For long positions (position > 0):
            - Define losses = -returns.
            - Use the (quantile_threshold)-th quantile of losses as threshold u.
        For short positions (position < 0):
            - Losses are defined as returns.
            - Use the (quantile_threshold)-th quantile of losses as threshold u.
        
        Steps:
          1. Extract tail losses beyond the threshold u.
          2. Compute exceedances: x = losses - u.
          3. Estimate GPD parameters (ξ and β) by minimizing the negative log-likelihood.
          4. Compute EVT VaR and ES:
             VaR = u + (β/ξ)*[((n/n_u)*(1-α))^(-ξ) - 1]
             ES  = (VaR + β - ξ*u) / (1 - ξ)
          5. Scale the results by the absolute position, round to 4 decimals, and ensure non-negativity.
        
        Returns:
        --------
        dict:
            A dictionary containing EVT estimates:
                - evt_var: EVT-based Value at Risk.
                - evt_es: EVT-based Expected Shortfall.
                - xi: Estimated shape parameter.
                - beta: Estimated scale parameter.
                - u: Threshold used.
                - n: Total number of observations.
                - n_u: Number of exceedances beyond the threshold.
        """
        # Transform returns into losses based on position type.
        if self.position > 0:
            # For long positions, losses = -returns (losses are positive when returns are negative)
            losses = -self.returns
        else:
            # For short positions, losses are simply the returns.
            losses = self.returns
        
        # Determine threshold u using the quantile_threshold.
        u = np.quantile(losses, quantile_threshold)
        
        # Extract tail losses (exceedances) beyond the threshold.
        exceedances = losses[losses > u] - u
        
        n = len(losses)
        n_u = len(exceedances)
        if n_u == 0:
            raise ValueError("No exceedances found beyond the threshold; consider lowering the quantile_threshold.")
        
        # Define the negative log-likelihood for the GPD.
        def neg_log_likelihood(params):
            xi, beta = params
            # Ensure scale parameter is positive.
            if beta <= 0:
                return np.inf
            # Check that 1 + (xi * x / beta) > 0 for all exceedances.
            if np.any(1 + xi * exceedances / beta <= 0):
                return np.inf
            # Compute negative log-likelihood.
            nll = n_u * np.log(beta) + (1 / xi + 1) * np.sum(np.log(1 + xi * exceedances / beta))
            return nll
        
        # Initial guess for [xi, beta].
        initial_guess = [0.1, np.mean(exceedances)]
        result = minimize(neg_log_likelihood, initial_guess, method='L-BFGS-B',
                          bounds=[(-np.inf, np.inf), (1e-6, np.inf)])
        
        if not result.success:
            raise RuntimeError("GPD parameter optimization did not converge: " + result.message)
        
        xi_hat, beta_hat = result.x
        
        # Compute EVT-based VaR.
        # Note: The formula below assumes xi_hat != 0.
        var_evt = u + (beta_hat / xi_hat) * (((n / n_u) * (1 - self.alpha)) ** (-xi_hat) - 1)
        
        # Compute EVT-based ES.
        es_evt = (var_evt + beta_hat - xi_hat * u) / (1 - xi_hat)
        
        # Scale the results by the absolute value of the position.
        var_evt = abs(self.position) * var_evt
        es_evt = abs(self.position) * es_evt
        
        # Round and ensure non-negative.
        var_evt = round(max(var_evt, 0), 4)
        es_evt = round(max(es_evt, 0), 4)
        
        return {
            "evt_var": var_evt,
            "evt_es": es_evt,
            "xi": xi_hat,
            "beta": beta_hat,
            "u": u,
            "n": n,
            "n_u": n_u
        }

In [5]:
class FixedIncomeNprmPort:
    """
    Models risk measures for a portfolio of fixed-income positions.
    
    Parameters:
    -----------
    returns : np.ndarray
        Matrix of returns (rows=periods, columns=securities).
    positions : list
        List of positions for each security.
    alpha : float, default=0.05
        Significance level for VaR/ES.
    method : str, default="quantile"
        Calculation method ("quantile" or "bootstrap").
    n_bootstrap_samples : int, default=10000
        Number of bootstrap samples (used if method="bootstrap").
    """
    
    def __init__(self, returns: np.ndarray, positions: list, alpha: float = 0.05,
                 method: str = "quantile", n_bootstrap_samples: int = 10000):
        # Validate inputs.
        validate_returns_portfolio(returns)
        validate_position(positions, returns)
        validate_alpha(alpha)
        validate_method(method)
        validate_n_bootstrap_samples(n_bootstrap_samples)

        self.returns = returns
        self.positions = positions
        self.alpha = alpha
        self.method = method
        self.n_bootstrap_samples = n_bootstrap_samples
        
        # Placeholders for portfolio risk metrics.
        self.portfolio_returns = None  # Aggregated returns for each period.
        self.cum_returns = None        # Cumulative portfolio returns.
        self.var = None
        self.es = None
        
        # Compute aggregated returns and portfolio risk measures.
        self.fit()
    
    def fit(self):
        """
        Compute portfolio returns and risk measures.
        
        Steps:
         1. Aggregate portfolio returns for each period using:
                Portfolio Return_t = sum_{i=1}^{n} (Position_i * Return_{t,i})
            (Implemented via element-wise multiplication and np.sum(axis=1)).
         2. Compute cumulative portfolio returns:
                cum_returns = cumprod(1 + portfolio_returns) - 1
         3. Compute VaR and ES using either the quantile or bootstrap method:
            - For a net long portfolio (net position > 0):
                  VaR = - quantile(alpha) of portfolio returns,
                  ES = - mean(portfolio returns below the α-quantile)
            - For a net short portfolio (net position < 0):
                  VaR = - quantile(1-α) of portfolio returns,
                  ES = - mean(portfolio returns above the (1-α)-quantile)
         4. Round results to 4 decimal places and ensure non-negative values.
        """
        # Convert positions list to numpy array for vectorized operations.
        positions = np.array(self.positions)
        
        # 1. Aggregate portfolio returns.
        self.portfolio_returns = np.sum(self.returns * positions, axis=1)
        
        # 2. Calculate cumulative portfolio returns via compounding.
        self.cum_returns = np.cumprod(1 + self.portfolio_returns) - 1
        
        # 3. Compute VaR and ES based on the chosen method.
        if self.method == "quantile":
            self._compute_quantile()
        elif self.method == "bootstrap":
            self._compute_bootstrap()
        else:
            raise ValueError("Unsupported method: {}. Choose 'quantile' or 'bootstrap'.".format(self.method))
    
    def _compute_quantile(self):
        """
        Compute VaR and ES using the quantile-based approach on aggregated portfolio returns.
        
        For a net long portfolio (sum(positions) > 0):
            VaR = - quantile(alpha) of portfolio returns,
            ES = - mean(portfolio returns below the α-quantile)
        For a net short portfolio (sum(positions) < 0):
            VaR = - quantile(1-α) of portfolio returns,
            ES = - mean(portfolio returns above the (1-α)-quantile)
        """
        weights = np.array(self.positions)
        net_position = np.sum(weights)
        
        if net_position > 0:
            quantile_value = np.quantile(self.portfolio_returns, self.alpha)
            var = -quantile_value
            tail_losses = self.portfolio_returns[self.portfolio_returns < quantile_value]
            es = -np.mean(tail_losses) if tail_losses.size > 0 else var
        else:
            quantile_value = np.quantile(self.portfolio_returns, 1 - self.alpha)
            var = -quantile_value
            tail_losses = self.portfolio_returns[self.portfolio_returns > quantile_value]
            es = -np.mean(tail_losses) if tail_losses.size > 0 else var
        
        self.var = round(max(var, 0), 4)
        self.es = round(max(es, 0), 4)
    
    def _compute_bootstrap(self):
        """
        Compute VaR and ES using the bootstrap approach on aggregated portfolio returns.
        
        Steps:
         - For each bootstrap sample (sampled with replacement from portfolio returns),
           compute the corresponding quantile (α or 1-α depending on net position)
           and the resulting VaR and ES.
         - The final VaR and ES are the averages across all bootstrap samples.
        """
        weights = np.array(self.positions)
        net_position = np.sum(weights)
        boot_var_list = []
        boot_es_list = []
        n = len(self.portfolio_returns)
        
        for _ in range(self.n_bootstrap_samples):
            # Sample with replacement.
            sample_indices = np.random.choice(n, size=n, replace=True)
            sample_returns = self.portfolio_returns[sample_indices]
            
            if net_position > 0:
                q = np.quantile(sample_returns, self.alpha)
                boot_var = -q
                tail_losses = sample_returns[sample_returns < q]
            else:
                q = np.quantile(sample_returns, 1 - self.alpha)
                boot_var = -q
                tail_losses = sample_returns[sample_returns > q]
            
            boot_var_list.append(boot_var)
            boot_es_list.append(-np.mean(tail_losses) if tail_losses.size > 0 else boot_var)
        
        var = np.mean(boot_var_list)
        es = np.mean(boot_es_list)
        
        self.var = round(max(var, 0), 4)
        self.es = round(max(es, 0), 4)
            
    def marg_vars(self, scale_factor: float = 0.1):
        """
        Compute Marginal VaR for each position in the portfolio.
    
        Formula:
            MVaR_i = (VaR_new - VaR_original) / ΔPosition_i
    
        Steps:
            1. Store the original portfolio VaR.
            2. For each position:
                a. Perturb the position by adding scale_factor.
                b. Recompute portfolio VaR using self.fit().
                c. Compute the difference divided by scale_factor.
                d. Restore the original position.
            3. Finally, re-run self.fit() to restore the original state.
      
        Returns:
        List of marginal VaR values, one for each position.
        """
        # Store a copy of the original positions and VaR.
        original_positions = self.positions.copy()
        original_var = self.var
    
        marginal_vars = []
        n_positions = len(self.positions)
    
        for i in range(n_positions):
            # Perturb the i-th position.
            self.positions[i] += scale_factor
        
            # Recompute portfolio risk measures (VaR and ES) with the perturbed position.
            self.fit()
            perturbed_var = self.var
        
            # Approximate the derivative of VaR with respect to the i-th position.
            mvar = (perturbed_var - original_var) / scale_factor
            marginal_vars.append(round(mvar, 4))
        
            # Restore the original position.
            self.positions[i] = original_positions[i]
    
        # Restore the original risk measures.
        self.fit()
    
        return marginal_vars