In [29]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as dates
import pandas_datareader.data as web
from tqdm import tqdm
import yfinance as yf
import os
from typing import List, Tuple
from collections import namedtuple

In [30]:
stocks_tickers = []
futures_tickers = []

start = '2018-9-28'
end = '2025-4-10'

In [None]:
def load_cleaned_price_csv(filepath):
    df = pd.read_csv(filepath, header=[0, 1], index_col=0)
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    ticker = df.columns.levels[1][0]
    df = df.xs(ticker, axis=1, level=1)
    df.columns.name = None
    df.index = pd.to_datetime(df.index)
    return df

def load_price_data(folder_path):
    price_data = {}
    for filename in os.listdir(folder_path):
        if filename.endswith('.csv'):
            ticker = os.path.splitext(filename)[0]
            filepath = os.path.join(folder_path, filename)
            df = load_cleaned_price_csv(filepath=filepath)
            price_data[ticker] = df.dropna()
    return price_data

stock_data = load_price_data("/home/arnabdey/../My Codes/Risk Aware Portfolio Stocks")
futures_data = load_price_data("/home/arnabdey/../My Codes/Risk Aware Portfolio Futures")

In [32]:
print(len(stock_data))
print(len(futures_data))

80
14


In [33]:
print(stock_data['AAPL'].head())

                Close       High        Low       Open     Volume
Date                                                             
2018-09-28  53.656433  53.680200  53.247604  53.430623   91717600
2018-10-01  54.017723  54.531137  53.801427  54.181731   94403200
2018-10-02  54.497860  54.668998  53.867979  54.015347   99152800
2018-10-03  55.161030  55.493796  54.616715  54.680892  114619200
2018-10-04  54.191242  55.227576  53.891749  54.854399  128168000


In [34]:
class Particle:
    def __init__(self, uei_indices: np.ndarray, notionals: np.ndarray, velocity: np.ndarray):
        self.position = np.concatenate([uei_indices, notionals])
        self.velocity = velocity
        self.best_position = self.position.copy()
        self.best_fitness = np.inf
class RATSOptimiser:
    def __init__(self, portfolio, uei_pool, fitness_function, constraints, m=10, n_particles = 30):
        self.portfolio = portfolio
        self.uei_pool = uei_pool
        self.fitness_function = fitness_function
        self.constraints = constraints
        self.m = m
        self.n_particles = n_particles
        self.particles = []
        self.global_best_position = None
        self.global_best_fitness = np.inf
        self.initialize_particles()
    
    def initialize_particles(self):
        for _ in range(self.n_particles):
            uei_indices = np.random.choice(len(self.uei_pool), self.m, replace=False)
            notionals = np.random.randint(-10, 11, size=self.m)
            velocity = np.random.uniform(-1, 1, size=2 * self.m)
            self.particles.append(Particle(uei_indices, notionals, velocity))

    def optimize(self, n_iterations=100, w=0.5, c1=1.5, c2=1.5):
        for iteration in tqdm(range(n_iterations)):
            for particle in self.particles:
                position = particle.position
                uei_indices = position[:self.m].astype(int)
                notionals = position[self.m:].astype(int)
                fitness =self.fitness_function(uei_indices, notionals, self.portfolio, self.uei_pool)

                fitness += sum([penalty(uei_indices, notionals, self.portfolio, self.uei_pool) for penalty in self.constraints])

                if fitness < particle.best_fitness:
                    particle.best_position = particle.position.copy()
                    particle.best_fitness = fitness
                if fitness < self.global_best_fitness:
                    self.global_best_position = particle.position.copy()
                    self.global_best_fitness = fitness
            for particle in self.particles:
                r1, r2 = np.random.rand(2)
                cognitive = c1 * r1 * (particle.best_position - particle.position)
                social = c2 * r2 * (self.global_best_position - particle.position)
                particle.velocity = w * particle.velocity + cognitive + social
                particle.position = np.round(particle.position)

        return self.global_best_position, self.global_best_fitness
        

In [35]:
class UEI:
    def __init__(self, ticker, instrument_type, data):
        self.ticker = ticker
        self.instrument_type = instrument_type
        self.data = data
uei_pool = []

for ticker, df in stock_data.items():
    uei_pool.append(UEI(ticker=ticker, instrument_type='stock', data=df))
for ticker, df in futures_data.items():
    uei_pool.append(UEI(ticker=ticker, instrument_type='future', data=df))

In [36]:
Holding = namedtuple('Holding', ['uei', 'notional'])

initial_portfolio = [
    Holding(uei=uei_pool[0], notional=100),
    Holding(uei=uei_pool[3], notional=-50),
    Holding(uei=uei_pool[7], notional=75)
]

In [None]:
def compute_pnl(ueis: List, notionals: List[int], window: int = 250):
    pnl = np.zeros(window - 1)
    for uei, notional in zip(ueis, notionals):
        prices = uei.data.iloc[-window:]['Close'].values
        if len(prices) < 2:
            continue
        returns = np.diff(prices)

        if len(returns) < len(pnl):
            padded_returns = np.zeros_like(pnl)
            padded_returns[-len(returns):] = returns
        else:
            padded_returns = returns[-len(pnl):]
        
        pnl += notional * padded_returns
    return pnl

def compute_var(ueis: List, notionals: List[int], window: int = 250, confidence: float = 0.99):
    pnl = compute_pnl(ueis, notionals, window)
    sorted_pnl = np.sort(pnl)
    index = int((1 - confidence) * len(sorted_pnl))
    return sorted_pnl[index]

def compute_cost(ueis: List, notionals: List[int]):
    cost = 0.0
    for uei, notional in zip(ueis, notionals):
        try:
            price = uei.data['Close'].iloc[-1]
        except:
            price = 1.0

        spread = 0.001 if uei.instrument_type == 'stock' else 0.0005
        cost += abs(notional) * price * spread
    return cost


def compute_fitness(uei_indices, notionals, portfolio, uei_pool):

    H = [uei_pool[i] for i in uei_indices]
    h = notionals

    T_uei = [h.uei for h in portfolio] + H
    T_notionals = [h.notional for h in portfolio] + list(h)

    pnl_T = compute_pnl(T_uei, T_notionals)
    var_T = compute_var(T_uei, T_notionals)
    pnl_P = compute_pnl([h.uei for h in portfolio], [h.notional for h in portfolio])
    cost_H = compute_cost(H, h)

    denom = var_T - cost_H
    denom = denom if abs(denom) > 1e-6 else 1e-6

    return (np.mean(pnl_T) - np.mean(pnl_P) - cost_H) / denom
def get_delta(uei):
    return 1.0 if uei.instrument_type == 'stock' else 0.0

def get_vega(uei):
    return 0.2 if uei.instrument_type == 'option' else 0.0

def get_gamma(uei):
    return 0.1 if uei.instrument_type == 'option' else 0.0

def delta_constraint(uei_indices, notionals, portfolio, uei_pool, max_ratio=0.2):

    delta_H = sum(get_delta(uei_pool[i]) * h for i, h in zip(uei_indices, notionals))
    delta_P = sum(get_delta(h.uei) * h.notional for h in portfolio)
    
    return max(0, abs(delta_H) - max_ratio * abs(delta_P))
def vega_constraint(uei_indices, notionals, portfolio, uei_pool, max_ratio=0.2):
    
    vega_H = sum(get_vega(uei_pool[i]) * h for i, h in zip(uei_indices, notionals))
    vega_P = sum(get_vega(h.uei) * h.notional for h in portfolio)
    
    return max(0, abs(vega_H) - max_ratio * abs(vega_P))

def gamma_constraint(uei_indices, notionals, portfolio, uei_pool, max_ratio=0.2):
    
    gamma_H = sum(get_gamma(uei_pool[i]) * h for i, h in zip(uei_indices, notionals))
    gamma_P = sum(get_gamma(h.uei) * h.notional for h in portfolio)
    
    return max(0, abs(gamma_H) - max_ratio * abs(gamma_P))
def visualize_global_best(global_best_position, uei_pool, m):
    uei_indices = global_best_position[:m].astype(int)
    notionals = global_best_position[m:].astype(int)

    print(f"{'Index':<6} {'Ticker':<10} {'Type':<8} {'Notional':<10}")
    print("-" * 40)

    for i, (uei_idx, amount) in enumerate(zip(uei_indices, notionals)):
        if uei_idx < 0 or uei_idx >= len(uei_pool):
            print(f"{i:<6} INVALID INDEX {uei_idx}")
            continue

        uei = uei_pool[uei_idx]
        print(f"{i:<6} {uei.ticker:<10} {uei.instrument_type:<8} {amount:<10}")


Use the Optimiser from Paper

In [38]:
rats = RATSOptimiser(
    portfolio=initial_portfolio,
    uei_pool=uei_pool,
    fitness_function=compute_fitness,
    constraints=[delta_constraint, vega_constraint, gamma_constraint],
    m = 6,
    n_particles=50
)

z, best_score = rats.optimize(n_iterations=100)
visualize_global_best(z, uei_pool, m=6)

100%|██████████| 100/100 [00:10<00:00,  9.49it/s]

Index  Ticker     Type     Notional  
----------------------------------------
0      CL=F       future   8         
1      TSLA       stock    1         
2      ZN=F       future   -9        
3      NFLX       stock    1         
4      MSFT       stock    6         
5      GC=F       future   9         



