In [None]:
import numpy as np
import scipy.stats as si
from statsmodels.tsa.seasonal import STL
import yfinance as yf
from datetime import datetime
import statsmodels.api as sm
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import RobustScaler
from typing import Dict, Callable, List


In [None]:
# Run configuration read by the cells below via cnf[...] lookups.
cnf = dict(
    # Date window handed to the price download.
    start="2020-01-01",
    end="2025-01-01",
    # Stock symbols for which features are generated.
    tickers=["NVDA", "TSLA", "LMT", "AAPL", "MSFT", "SPY"],
    # NOTE(review): `trend` / `robust` look like STL options — confirm where they are consumed.
    trend=None,
    # Passed as `period` to the STL decomposition.
    stl_period=21,
    robust=True,
    # Index symbol whose log returns serve as the market factor.
    market_ticker="^GSPC",
    # NOTE(review): presumably an annual risk-free rate — confirm where it is consumed.
    rfr=0.04,
    # Rolling regression window passed to apply_greeks.
    w_delta=21,
    # EWM span passed to get_sigma_hat.
    w_vol=21,
    # Forwarded to scale() as `method` (rolling baseline for the Delta penalty).
    delta_smooth_method="mean",
    vix_ticker="^VIX",
    # Forwarded to scale() as `l_gamma`.
    gamma_pow_transform_method="Identity",
    # Forwarded to scale() as `l_vanna`.
    vanna_log_transform_lambda=5,
    # Forwarded to scale() as `tanh_sign_alpha`.
    charm_tanh_sign_alpha=0.5,
    # Forwarded to scale() as `trend_method`.
    trend_power_transform_method="Power",
    # Forwarded to ScoringEngine.compute_weights as `cons`.
    weight_cons=None,
)

In [None]:
def get_data(stock_tickers:List[str], market_ticker:str,  vix_ticker: str, start:str, end:str):
  """Download price history for the stocks plus the market and VIX symbols.

  Returns:
    A 2-tuple ``(data, tickers)``: the object returned by
    ``yf.download(..., group_by="ticker")`` and the full list of requested
    symbols (stocks first, then the market symbol, then the VIX symbol).
  """
  extra_symbols = [market_ticker, vix_ticker]
  tickers = stock_tickers + extra_symbols
  return yf.download(tickers=tickers, start=start, end=end, group_by="ticker"), tickers

In [None]:
# Fetch prices for every configured symbol (stocks, market index, VIX).
data, tickers = get_data(
    stock_tickers=cnf["tickers"],
    market_ticker=cnf["market_ticker"],
    vix_ticker=cnf["vix_ticker"],
    start=cnf["start"],
    end=cnf["end"],
)

In [None]:

def apply_decomposition_metrics(log_returns: pd.DataFrame|pd.Series|np.ndarray,
                            period: int|None=None, trend: int|None=None,
                            robust: bool=True) -> tuple:
  """Run an STL decomposition on a return series and return its components.

  The first observation is skipped before fitting (callers pass a series built
  with ``.diff()``, whose first value is NaN).

  Args:
    log_returns: Return series; a pandas object or a NumPy array.
    period: Seasonal period forwarded to ``STL``. Required for NumPy input,
      which carries no index to infer it from.
    trend: Forwarded to ``STL`` as ``trend``.
    robust: Forwarded to ``STL`` as ``robust``.

  Returns:
    ``(seasonal, trend)`` taken from the fitted STL result.

  Raises:
    ValueError: If ``log_returns`` is not a pandas object and ``period`` is None.
  """
  is_pandas = isinstance(log_returns, (pd.Series, pd.DataFrame))
  if not is_pandas and period is None:
    raise ValueError("If log_returns is not a pandas object, period should be specified")

  # Positional slicing: `.iloc` for pandas (a bare tuple index such as `[1:,]`
  # is not valid for DataFrames), plain slicing for arrays.
  observations = log_returns.iloc[1:] if is_pandas else log_returns[1:]

  res = STL(observations, period=period, robust=robust, trend=trend).fit()

  return res.seasonal, res.trend

def compute_realized_vol(df: pd.DataFrame,tickers:
                         List[str], lambda_: int|None=None,
                         window: int|None=21) -> pd.Series:
  df = df.copy()

  for ticker in tickers:
    print(f"-----{ticker}-----")
    df[(ticker,"LogRet")] = \
        np.log(df[(ticker,'Close')]).diff()

    if lambda_:
      df[(ticker,"RealizedVol")] = \
          df[(ticker,"LogRet")].emw((1-lambda_)).std() * np.sqrt(252)

    elif window:
      df[(ticker,"RealizedVol")] = \
            df[(ticker,"LogRet")].rolling(window).std() * np.sqrt(252)

  return df

def get_estimation(x:pd.DataFrame, y:pd.Series, window:int) -> tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
    """Estimate sensitivities with a rolling-window OLS of ``y`` on ``x``.

    For each trailing window of ``window`` rows, ``y`` is regressed on ``x``
    plus a constant. Missing regressors are filled with 0 first; ``y`` is used
    as given.

    Args:
        x: Regressors; must contain the columns ``mkt``, ``mkt_sq``,
            ``mkt_interaction`` and ``vix``.
        y: Dependent series, sliced positionally in step with ``x``.
        window: Number of rows per regression.

    Returns:
        ``(delta, gamma, vanna, vega)``, each indexed by the last ``y`` label
        of its window: the ``mkt`` coefficient, twice the ``mkt_sq``
        coefficient, the ``mkt_interaction`` coefficient and the ``vix``
        coefficient.

    Raises:
        ValueError: If ``x`` and ``y`` differ in length (windows are positional,
            so a mismatch would pair the wrong rows).
    """
    if len(x) != len(y):
        raise ValueError(
            f"x and y must have the same length, got {len(x)} and {len(y)}")

    x = x.fillna(0)
    deltas, gammas, vannas, vegas, index = [], [], [], [], []

    for i in range(window, len(y)+1):
        x_window = x.iloc[i-window:i]
        y_window = y.iloc[i-window:i]

        model = sm.OLS(y_window, sm.add_constant(x_window)).fit()

        deltas.append(model.params['mkt'])
        # Second-order sensitivity is twice the coefficient on the squared term.
        gammas.append(2 * model.params['mkt_sq'])
        vannas.append(model.params['mkt_interaction'])
        vegas.append(model.params['vix'])
        index.append(y.index[i-1])

    return (
        pd.Series(deltas, index=index),
        pd.Series(gammas, index=index),
        pd.Series(vannas, index=index),
        pd.Series(vegas, index=index),
    )

def get_sigma_hat(r_i: pd.Series, window: int=21):
  """Return the exponentially weighted standard deviation of ``r_i`` with span ``window``."""
  weighted = r_i.ewm(span=window)
  return weighted.std()

def get_charm(deltas: List[float], L:int=5):
  charm = deltas.rolling(L).mean()
  return charm

def apply_greeks(market_returns: pd.Series, stock_returns: pd.Series,
                 vix: pd.Series, sigma: pd.Series, window: int= 21, L:int=5):
    """Estimate the rolling sensitivities of one stock to the market factors.

    Builds the regressor frame (market return, its square, market return times
    the change in ``sigma``, and the VIX level) and passes it to
    ``get_estimation``; the charm series is then derived from the deltas.

    Args:
        market_returns: Market return series; its index is used for the
            regressor frame.
        stock_returns: Stock return series (dependent variable).
        vix: VIX series, reindexed to ``stock_returns.index``.
        sigma: Volatility series; its first difference enters the interaction
            term.
        window: Rolling regression window.
        L: Rolling window forwarded to ``get_charm``.

    Returns:
        ``(delta, gamma, vanna, vega, charm)``.
    """
    vix = vix.reindex(stock_returns.index)
    sigma_diff = sigma.diff().fillna(0)

    X = pd.DataFrame({
        'mkt': market_returns,
        'mkt_sq': market_returns**2,
        'mkt_interaction': market_returns * sigma_diff,
        "vix": vix
    }, index=market_returns.index)

    y = stock_returns
    delta, gamma, vanna, vega = get_estimation(window=window, x=X, y=y)
    # Forward L so the caller's window is honoured rather than the helper default.
    charm = get_charm(delta, L=L)

    return delta, gamma, vanna, vega, charm

def apply_scaler(df:pd.DataFrame):
  """Return a robust-scaled copy of ``df``.

  Every column is passed through a freshly fitted ``RobustScaler``. The result
  is a new frame with the same index and columns; ``df`` itself is left
  untouched.
  """
  scaled_values = RobustScaler().fit_transform(df)
  return pd.DataFrame(scaled_values, index=df.index, columns=df.columns)

def apply_inverse_logistic_penalty(
    feature:pd.Series, a:int, window:int, method:str):
  """Map ``feature`` through a decreasing logistic curve around a rolling baseline.

  The baseline is the rolling mean or median of ``feature`` (``min_periods=1``)
  when ``method`` is ``"mean"`` or ``"median"``, and 0 for any other value.
  Returns ``1 / (1 + exp(a * (feature - baseline)))``.
  """
  if method in ("mean", "median"):
    roller = feature.rolling(window=window, min_periods=1)
    baseline = getattr(roller, method)()
  else:
    baseline = 0

  exponent = a*(feature-baseline)
  return 1/(1+np.exp(exponent))

def apply_power_transform(feature:pd.Series, method:str="Identity", power:float|int=2):
  x = np.array(feature, dtype=float)

  if method == "Identity":
    return x
  elif method == "Sqrt":
    return np.sqrt(np.maximum(x, 0))
  elif method == "Power":
    return np.power(x, power)
  elif method == "Reciprocal":
    return 1 / (x + 1e-8)
  elif method == "Log":
    return np.log(x + 1e-8)
  else:
    raise ValueError(f"Unknown transform method {method}")

def apply_tanh_transform(feature: pd.Series, alpha:float=0.5) -> pd.Series:
  """Squash ``feature`` with ``sign(x) * tanh(alpha * |x|)``."""
  direction = np.sign(feature)
  squashed = np.tanh(alpha * np.abs(feature))
  return direction * squashed

def apply_log_transform(feature: pd.Series, l:int=5) -> pd.Series:
  """Return ``log(1 + l * |feature|)``; the sign of the input is discarded."""
  scaled_magnitude = l*np.abs(feature)
  return np.log1p(scaled_magnitude)

def apply_softplus(feature: pd.Series) -> pd.Series:
  """Return ``log(1 + exp(x))`` with ``x`` clipped to ``[-50, 50]`` beforehand."""
  bounded = np.clip(feature, -50, 50)
  return np.log1p(np.exp(bounded))

def apply_signed_fractional_power_transformation(
    feature: pd.Series, power:float=0.7) -> pd.Series:
  """Return ``sign(x) * |x| ** power`` element-wise.

  Args:
    feature: Values to transform.
    power: Exponent applied to the magnitude (annotated as float; the default
      0.7 is not an int).
  """
  return np.sign(feature) * np.abs(feature)**power

def normalize(feature_df:pd.Series) -> pd.Series:
  """Winsorise a series at its 1st/99th percentiles and robust-standardise it.

  The clipped values are centred on their median and divided by their
  interquartile range. When that range is exactly 0 (e.g. a constant series) a
  unit scale is used, so the result is a finite deviation from the median
  rather than NaN/inf from a division by zero.
  """
  wins = feature_df.clip(
      lower = feature_df.quantile(0.01),
      upper = feature_df.quantile(0.99)
  )

  med = wins.median()
  iqr = wins.quantile(0.75) - wins.quantile(0.25)

  if iqr == 0:
    iqr = 1.0

  return (wins - med) / iqr

def scale(df: pd.DataFrame, a:int=1, window:int=5, method: str|None=None,
          l_gamma:str="Identity", l_vanna:int=5, tanh_sign_alpha:float=0.5,
          trend_method:str="Power") -> pd.DataFrame:
  """Robust-scale the feature frame, transform each feature, then normalise.

  Expects the columns ``Delta``, ``Gamma``, ``Vanna``, ``Vega``, ``Charm``,
  ``Season`` and ``Trend``. The input frame is not modified; a new frame is
  returned.

  Args:
    df: Feature frame for one ticker.
    a: Slope of the logistic penalty applied to ``Delta``.
    window: Rolling window of the ``Delta`` baseline.
    method: Baseline type for the ``Delta`` penalty (``"mean"``, ``"median"``,
      anything else means a baseline of 0).
    l_gamma: Transform name passed to ``apply_power_transform`` for ``Gamma``.
    l_vanna: Multiplier of the log transform, used for both ``Vanna`` and
      ``Vega``.
    tanh_sign_alpha: ``alpha`` of the tanh transform applied to ``Charm``.
    trend_method: Accepted but not used by this function.
      NOTE(review): ``Trend`` always goes through the signed fractional power
      transform — confirm whether this argument should select it.
  """
  # Work on a copy: the columns are reassigned below and the caller's frame
  # must not change underneath it.
  df_scaled = apply_scaler(df=df.copy())

  df_scaled['Delta'] = apply_inverse_logistic_penalty(feature=df_scaled['Delta'], a=a, window=window, method=method)
  df_scaled['Gamma'] = apply_power_transform(feature=df_scaled['Gamma'], method=l_gamma)
  df_scaled['Vanna'] = apply_log_transform(feature=df_scaled['Vanna'], l=l_vanna)
  df_scaled['Vega'] = apply_log_transform(feature=df_scaled['Vega'], l=l_vanna)
  df_scaled['Charm'] = apply_tanh_transform(feature=df_scaled['Charm'], alpha=tanh_sign_alpha)
  df_scaled['Season'] = apply_softplus(feature=df_scaled['Season'])
  df_scaled['Trend'] = apply_signed_fractional_power_transformation(feature=df_scaled['Trend'])

  for col in df_scaled.columns:
    df_scaled[col] = normalize(df_scaled[col])

  return df_scaled



In [None]:
# Add LogRet / RealizedVol columns for every symbol, then estimate the market's
# exponentially weighted volatility from its log returns.
data = compute_realized_vol(df=data, tickers=tickers)
sigma_hat = get_sigma_hat(
    r_i=data[cnf["market_ticker"]]["LogRet"],
    window=cnf['w_vol'],
)

In [None]:
def generate_featureset(data:pd.DataFrame, market_data: pd.DataFrame,
                 vix: pd.DataFrame, tickers:List[str], sigma: pd.Series):
  """Build the scaled feature panel for every ticker.

  For each ticker the log returns are decomposed into trend and seasonal
  parts, the rolling sensitivities against the market are estimated, rows with
  missing values are dropped and the result is passed through ``scale``.
  Window lengths and transform settings are read from the module-level ``cnf``.

  Args:
    data: Frame with ``(ticker, field)`` columns that includes ``LogRet``.
    market_data: Market frame; its ``LogRet`` column is the market factor.
    vix: VIX series.
    tickers: Symbols to process.
    sigma: Market volatility series used in the interaction term.

  Returns:
    Frame indexed by ``("Tickers", "Dates")`` with the columns ``Trend``,
    ``Season``, ``Delta``, ``Gamma``, ``Vanna``, ``Vega`` and ``Charm``.
  """
  composite_df = []
  for ticker in tickers:
    print(f"-----{ticker}-----")
    tmp_data = data[ticker]
    season, trend  = apply_decomposition_metrics(tmp_data["LogRet"],
                                                 period=cnf["stl_period"],
                                                 trend=cnf["trend"],
                                                 robust=cnf["robust"])

    df = pd.DataFrame({
        "Trend": trend,
        "Season": season
    }, index=tmp_data.index)

    # Use the arguments, not notebook globals, so the function does not depend
    # on which cells happened to run beforehand.
    greeks = apply_greeks(
        market_returns=market_data["LogRet"], stock_returns=tmp_data["LogRet"],
        vix=vix, sigma=sigma, window=cnf['w_delta']
    )

    for k, v in zip(["Delta", "Gamma", "Vanna", "Vega", "Charm"], greeks):
      df[k] = v
    df = df.dropna()
    df = scale(df=df, method=cnf["delta_smooth_method"],
              l_gamma=cnf["gamma_pow_transform_method"],
              l_vanna=cnf["vanna_log_transform_lambda"],
              tanh_sign_alpha=cnf["charm_tanh_sign_alpha"],
              trend_method = cnf["trend_power_transform_method"]
    )
    composite_df.append(df)

  output = pd.concat(
      composite_df,
      keys=tickers,
      names=["Tickers", "Dates"]
  )

  return output


In [None]:
# Split the downloaded panel into market, VIX and stock parts. The symbols come
# from cnf so that changing the configuration cannot leave stale hard-coded
# tickers behind.
market = data[cnf["market_ticker"]]
vix = data[(cnf["vix_ticker"], "Close")]
df = data.drop(columns=[cnf["market_ticker"], cnf["vix_ticker"]], level=0)

composite_df = generate_featureset(df, market, vix, sigma=sigma_hat, tickers=cnf["tickers"])


In [None]:

class ScoringEngine:
  def __init__(self, features: List[str], composite_df: pd.DataFrame):
    self.features = features
    self.composite_df = composite_df

  def compute_scores(self, coefs: np.ndarray, return_scores: bool=False):
    self.S = {}
    self.tickers = []
    for ticker in self.composite_df.index.levels[0]:
      self.tickers.append(ticker)
      df_t = self.composite_df.loc[ticker]
      self.S[ticker] = np.log1p(np.exp(df_t.values.dot(coefs)))

    print(self.S)
    if not isinstance(self.S, pd.Series):
      self.S = pd.DataFrame(self.S, index=df_t.index)
    if return_scores:
      return self.S if not self.S.empty else None

  def compute_weights(self, cons: Dict[str, float]|None=None, scores: pd.DataFrame|None=None):
    if cons is not None:
      allowed_keys = ["bounds", "vol_target"]
      for k, v in cons.items():
        if k not in allowed_keys:
          raise ValueError(f"Unknown constraint {k}")

    if self.S.empty and scores is None:
      raise ValueError("Scores not computed")

    scores = self.S if scores is None else scores
    W_i = self.S.copy()
    W_i[:] = np.nan

    for i in range(len(self.S.iloc[:,0])):
      row = scores.iloc[i]
      W_i.iloc[i] = row / np.sum(row)
    return W_i


In [None]:
# Initial feature weights, keyed by feature name and then arranged in the
# column order of composite_df. The scores are a positional dot product, so a
# plain list in any other order would pair weights with the wrong features.
# A feature column without a weight raises KeyError here.
theta_init = pd.Series({
    "Delta": 0.8,   # Delta weight
    "Gamma": 0.3,   # Gamma weight
    "Vanna": 0.4,   # Vanna weight
    "Vega": 0.5,    # Vega weight
    "Charm": 0.2,   # Charm weight
    "Trend": 1.0,   # Trend weight
    "Season": 0.6,  # Seasonality weight
})[list(composite_df.columns)].to_numpy()

In [None]:
# Build the engine on the engineered feature panel; `features` names the
# columns of composite_df rather than the raw price fields.
eng = ScoringEngine(features=list(composite_df.columns), composite_df=composite_df)
eng.compute_scores(coefs=theta_init)

In [None]:
# Score every ticker/date and turn the scores into per-date weights.
eng = ScoringEngine(features=list(composite_df.columns), composite_df=composite_df)
# return_scores=True is required for S to hold the score frame; without it the
# call returns None.
S = eng.compute_scores(coefs=theta_init, return_scores=True)
W_i = eng.compute_weights(cons=cnf["weight_cons"])