<a href="https://colab.research.google.com/github/csciulla/stress-test-dashboard/blob/main/stresstest_ntbk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import pandas as pd
import numpy as np
import yfinance as yf
from scipy.optimize import minimize

In [5]:
class Portfolio:
  def __init__(self, portfolio:list,  lower_bound:float, upper_bound:float):
    try:
      if lower_bound >= upper_bound:
        raise ValueError("Lower bound must be less than upper bound.")

      self.portfolio = portfolio
      self.weights = None
      self.dfclose = None
      self.lower_bound = lower_bound
      self.upper_bound = upper_bound

    except Exception as e:
      print(f"Error in intializer function: {e}")
      return None

  def get_data(self, period:str=None, start_date:str=None, end_date:str=None):
    """
    Downloads the portfolios adjusted closes either by 'period' or 'start_date' and 'end_date'.
    Only one method of date input should be provided.
    Data downloaded should be big enough to handle calculations.
    """
    try:
      if period and (start_date or end_date): #checks if both methods of date input are used
        raise ValueError("Provide either 'period' OR both 'start_date' and 'end_date' -- not both.")

      if period:
        period = period.strip()
        self.dfclose = yf.download(self.portfolio, period=period, progress=False, auto_adjust=False)["Adj Close"]
      elif start_date and end_date:
        start_date = start_date.strip()
        end_date = end_date.strip()
        self.dfclose = yf.download(self.portfolio, start=start_date, end=end_date, progress=False, auto_adjust=False)["Adj Close"]
      else:
        raise ValueError("You must provide either a 'period' or both 'start_date' and 'end_date'.")

      if self.dfclose.empty or self.dfclose is None:
        raise ValueError("Downloaded price data is empty or unavailable.")
      elif len(self.dfclose) <= 2:
        raise ValueError("Downloaded price data is too short.")
      elif len(self.dfclose) < 21: #average trading days in a month
        print("Warning: Limited price history may lead to unreliable metrics.")

      return self.dfclose

    except Exception as e:
      print(f"Error in get_data: {e}")
      return None

  def get_weights(self, type_weight:str):
    """
    Returns a list of weights for the portfolio.

    type_weight: Input 'eq' for equal-weighted portfolio or 'opt' for optimized weights based on sharpe-ratio
    """
    try:
      dfclose = self.dfclose
      if dfclose is None or dfclose.empty:
        raise ValueError("The portfolio's price data is missing. Please properly run 'get_data' first.")
      elif len(dfclose) <= 2:
        raise ValueError("Downloaded price data is too short.")

      #Get log returns of each asset
      log_returns = np.log(dfclose/dfclose.shift()).dropna()

      #Calculate initial portfolio metrics
      weights = np.repeat(1/len(self.portfolio), len(self.portfolio))
      expected_returns = log_returns.mean()*252
      port_returns = weights.T @ expected_returns
      cov_matrix = log_returns.cov()*252
      port_vol = np.sqrt(weights.T @ cov_matrix @ weights)
      rf = 0.045

      #Set bounds and constraints for objective function
      bounds = [(self.lower_bound, self.upper_bound) for _ in range(len(self.portfolio))]
      constraints = {"type": "eq", "fun": lambda w: np.sum(w)-1}
      def neg_sharpe(w):
        port_ret = w.T @ expected_returns
        port_std = np.sqrt(w.T @ cov_matrix @ w)
        return -((port_ret - rf)/port_std)

      if type_weight.strip().lower() == "eq":
        self.weights = [float(i) for i in weights]
      elif type_weight.strip().lower() == "opt":
        optimized_weights = minimize(neg_sharpe, weights, method="SLSQP", bounds=bounds, constraints=constraints)
        self.weights = [round(float(i),4) for i in optimized_weights.x]
      else:
        raise ValueError("Select a valid input for 'type_weight' -- either 'eq' or 'opt'.")

      return self.weights

    except Exception as e:
      print(f"Error in get_weights: {e}")
      return None


In [20]:
#test weights
test = Portfolio(["AAPL", "MSFT", "GOOG", "JNJ", "XOM"], 0.6, 0.5)
test.get_data(start_date='2025-06-25  ', end_date=' 2025-06-28')
test.get_weights("opt  ")

Error in intializer function: Lower bound must be less than upper bound.
Error in get_data: 'Portfolio' object has no attribute 'portfolio'
Error in get_weights: 'Portfolio' object has no attribute 'dfclose'


In [26]:
def monte_carlo(T:int, sims:int, vol_mult:float, df:pd.DataFrame, method:str, rand:str=None ):
  """
  Returns simulated prices for each ticker in the portfolio either using Monte Carlo or Bootstrapping.

  T: number of days in a path
  sims: number of paths
  vol_mult: multiplier that affects volatilty to inject stress
  df: dataframe of the assets adjusted closes
  method: input the string 'mc' to simulate returns assuming a t-distribution; input the string 'boot' to simulate unparamterized returns.
  rand: input the string 'yes' to return a random path, otherwise ignore
  """
  try:
    if T <= 2:
      raise ValueError("The length of each simulated path is too short.")
    elif T < 21:
      print("Warning: Limited price data may lead to unreliable metrics.")

    #Intialize dictionary to store simulated paths of T days for each ticker
    sims_prices = {ticker: np.full(shape=(sims, T), fill_value=0.0) for ticker in df.columns}

    method = method.strip().lower()

    #Gather inital metrics from historical data for each ticker
    for ticker in df.columns:
      last_price = df[ticker].iloc[-1]
      log_returns = np.log(df[ticker]/df[ticker].shift()).dropna()
      expected_return = log_returns.mean()
      target_vol = log_returns.std()*vol_mult

      #Generate paths
      for m in range(sims):
        if method == "mc":
          dailyReturns = np.random.standard_t(df=5, size=T) * target_vol + expected_return
          dailyReturns = np.clip(dailyReturns, -0.95, 0.95) #Removes extremely disoriented returns caused by fat tails
        elif method == "boot":
          dailyReturns = np.random.choice(log_returns.values, size=T, replace=True)
          boot_vol = dailyReturns.std()
          dailyReturns = (dailyReturns - dailyReturns.mean()) * (target_vol/boot_vol) + dailyReturns.mean() #Scaling bootstrapped returns 
        else:
          raise ValueError("Invaild input for 'method'. Input 'mc' or 'boot'.")
        
        cumReturns = (1+dailyReturns).cumprod()
        prices = last_price*cumReturns
        sims_prices[ticker][m:,] = prices

    #Get a random path
    if isinstance(rand, str) and rand.strip().lower() == "yes":
      random_int = np.random.randint(0,sims)
      random_sims_prices = {ticker: sims_prices[ticker][random_int] for ticker in df.columns}
      random_sims_df = pd.DataFrame(random_sims_prices)
      return random_sims_df
    elif rand != None:
      raise ValueError("Invaild input for 'rand'. Input the string 'yes' to return a random path, otherwise ignore.")
    else:
        return sims_prices

  except Exception as e:
    print(f"Error in monte_carlo: {e}")
    return None

In [125]:
df = test.get_data(period='5y')
sims = monte_carlo(3,1,2,df, " Yes")



In [7]:
def calculate_metrics(weights:list, df: pd.DataFrame):
  """
  Calculates annual portfolio volatilty, Sharpe Ratio, 95% VaR, Max Drawdown, and Beta.
  weights: list of each assets weight in the portfolio
  df: dataframe of the assets adjusted closes
  """
  try:
    if df is None or df.empty:
      raise ValueError("Price data is empty or unavailable. Make sure historical/simulated data is properly downloaded.")

    #Core calculations
    weights = np.array(weights)
    log_returns = np.log(df/df.shift()).dropna()
    expected_returns = log_returns.mean()*252
    cov_matrix = log_returns.cov()*252
    rf = 0.045
    port_returns = weights.T @ expected_returns
    port_returns_series = log_returns @ weights

    #Metrics
    port_vol = np.sqrt(weights.T @ cov_matrix @ weights)
    sharpe = (port_returns - rf)/port_vol
    VaR_95 = np.percentile(port_returns_series, 5)

    #Max Drawdown
    cum_returns = (1+port_returns_series).cumprod()
    cum_max = np.maximum.accumulate(cum_returns)
    drawdown = cum_returns/cum_max - 1
    mdd = drawdown.min() #drawdown values are negative

    #Beta
    market = yf.download("SPY", period='10y', progress=False, auto_adjust=False)["Adj Close"]
    market_returns = (np.log(market/market.shift()).dropna()).squeeze() #convert to series so that it works properly with port_returns_series
    if pd.api.types.is_integer_dtype(port_returns_series.index):
      #Simulated case: align by length
      market_returns = market_returns.tail(len(port_returns_series)).reset_index(drop=True)
      port_returns_series = port_returns_series.reset_index(drop=True)
    else:
      #Simulated historical case: align by date
      start_date = pd.to_datetime(port_returns_series.index[0])
      end_date = pd.to_datetime(port_returns_series.index[-1])
      if start_date and end_date not in market_returns.index: #first make sure that market data contains crisis event
        market = yf.download("SPY", start=start_date, end=end_date, progress=False, auto_adjust=False)["Adj Close"]
        market_returns = (np.log(market/market.shift()).dropna()).squeeze()

      #align by date for either simulated historical or historical case
      aligned_index = port_returns_series.index.intersection(market_returns.index)
      market_returns = market_returns.loc[aligned_index]
      port_returns_series = port_returns_series.loc[aligned_index]
    beta = port_returns_series.cov(market_returns) / market_returns.var()

    metrics = pd.DataFrame(data=[[port_vol, sharpe, VaR_95, mdd, beta]] ,columns=["Annual Volatilty", "Sharpe","95% VaR", "Max DD", "Beta"], index=[["Portfolio"]])
    return metrics

  except Exception as e:
    print(f"Error in calculate_metrics: {e}")
    return None


In [28]:
#test the simulated max and min sharpe metrics
df = yf.download(["AAPL", "MSFT","TSLA","INTL","GOOG"], period='10y', auto_adjust=False)["Adj Close"]
sims = monte_carlo(252,50,4.0, df, 'boot')
tickers = list(df.columns)
pathlen = len(sims[tickers[0]])
all_metrics = []
for m in range(pathlen):
  mth_df = pd.DataFrame({ticker: sims[ticker][m] for ticker in tickers})
  metrics = calculate_metrics([0.0, 0.137, 0.363, 0.5, 0.0], mth_df)
  all_metrics.append(metrics)
sharpes = [df.loc["Portfolio", "Sharpe"] for df in all_metrics]
min_idx = np.argmin(sharpes)
max_idx = np.argmax(sharpes)
all_metrics[min_idx].index = ["Worst Portfolio"]
all_metrics[max_idx].index = ["Best Portfolio"]
print(all_metrics[min_idx])
print(all_metrics[max_idx])

[*********************100%***********************]  5 of 5 completed


                 Annual Volatilty    Sharpe   95% VaR    Max DD      Beta
Worst Portfolio          0.615493 -1.329264 -0.063821 -0.734568  0.306319
                Annual Volatilty    Sharpe   95% VaR    Max DD      Beta
Best Portfolio          0.633859  0.060388 -0.054374 -0.399719  0.078192


In [113]:
def historical(df:pd.DataFrame, crisis:str):
  """
  Simulates the prices of your portfolio if a historical event were to happen again.

  df: dataframe of the assets adjusted closes
  crisis: string of the event you want to simulate

  Crisis Options:
  "DOT-COM" -- The Dot-Com bubble
  "2008 GFC" -- 2008 Global Financial Crisis
  "2011 Euro" -- 2011 Eurozone Crisis
  "COVID" -- COVID-19 Pandemic
  "2022 Inf" -- 2022 Inflation Crash
  """
  try:
    crisis_periods = {"DOT-COM": ("2000-03-01", "2002-10-01"),
                      "2008 GFC": ("2007-10-01", "2009-03-01"),
                      "2011 Euro": ("2011-07-01", "2011-12-01"),
                      "COVID": ("2020-02-14", "2020-04-15"),
                      "2022 Inf": ("2022-01-01", "2022-10-01")
                      }
    crisis = crisis.strip()
    if crisis not in crisis_periods.keys():
      raise ValueError("Input a valid crisis event.")

    tickers = list(df.columns)
    start_date = pd.to_datetime(crisis_periods[crisis][0])
    end_date = pd.to_datetime(crisis_periods[crisis][1])

    if start_date not in df.index: #check if crisis event does not exist in existing df
      dfcrisis = yf.download(tickers, start=start_date, end=end_date, progress=False, auto_adjust=False)["Adj Close"]
    else:
      dfcrisis = df.loc[start_date:end_date]

    for ticker in tickers:
      if dfcrisis[ticker].isna().sum() >= len(dfcrisis[ticker])//3: #checks if any ticker reaches NA threshold
        raise ValueError(f"{ticker} price data does not exist for crisis period.")

    last_price = df.iloc[-1]
    crisisReturns = np.log(dfcrisis/dfcrisis.shift()).dropna()
    cumReturns = (1+crisisReturns).cumprod()
    crisisPrices = last_price.mul(cumReturns)
    return crisisPrices

  except Exception as e:
    print(f" \n Error in historical: {e}")
    return None

In [139]:
df = yf.download(["AAPL", "MSFT", "GOOG", "JNJ", "XOM"], period='10y', auto_adjust=False)["Adj Close"]
hist = historical(df, "COVID")
calculate_metrics([0.0, 0.056, 0.0, 0.444, 0.5], hist)

[*********************100%***********************]  5 of 5 completed


Unnamed: 0,Annual Volatilty,Sharpe,95% VaR,Max DD,Beta
Portfolio,0.829948,-2.356406,-0.106502,-0.439156,1.097892


In [1]:
[s.strip() for s in "'2020-01-01', '2025-01-01'".split(',')]

["'2020-01-01'", "'2025-01-01'"]