<a href="https://colab.research.google.com/github/eonlabs-research/rsr-fsa/blob/main/sr_mdd_equity_returns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
!pip install plotly

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.offline as pyo
import plotly.subplots as sp
import time
from copy import deepcopy
from multiprocessing import Pool, Manager



In [23]:
NUM_DAYS = 365 * 3
# Risk-free rate (annual)
ANNUAL_RISK_FREE_RATE = 0.01

# Target values for optimization
TARGET_SHARPE = 4
TARGET_DRAWDOWN = 0.05

# Optimization runs
NUM_RUNS = 1

# PSO Parameters
NUM_PARTICLES = 150
MAX_ITERATIONS = 400
INERTIA = 0.5
PERSONAL_BEST_WEIGHT = 1.5
GLOBAL_BEST_WEIGHT = 1.5

# Dynamic Adjustment Parameters
INERTIA_START = 0.9
INERTIA_END = 0.4
PERSONAL_BEST_START = 2.5
PERSONAL_BEST_END = 0.5
GLOBAL_BEST_START = 0.5
GLOBAL_BEST_END = 2.5

# Plotting parameters
CHART_HEIGHT = 900

In [24]:
def calculate_max_drawdown(nav_values):
    if np.isnan(nav_values).any() or np.isinf(nav_values).any():
        return np.inf
    # Safeguard against overflow
    safe_nav_values = np.clip(nav_values, -1e10, 1e10)
    return np.max(1 - safe_nav_values / np.maximum.accumulate(safe_nav_values))

def safe_cumprod(returns):
    # Cap the cumulative returns to avoid overflow
    capped_returns = np.clip(returns, -0.99, 1e10)
    return (capped_returns + 1).cumprod()

def new_sharpe_ratio(returns, annual_rf):
    if np.isnan(returns).any() or np.isinf(returns).any():
        return -np.inf
    excess_returns = returns - annual_rf/365
    return np.mean(excess_returns) / np.std(excess_returns, ddof=1) * np.sqrt(365)

# Particle definition for PSO v5
class Particle_v5:
    def __init__(self, num_days, annual_rf):
        # Adjust the initialization
        self.position = np.clip(np.random.normal(annual_rf/365, 0.01, num_days), -0.05, 0.05)  # Random initialization based on NUM_DAYS
        # Adjust the velocity range
        self.velocity = np.clip(np.random.uniform(-0.01, 0.01, num_days), -0.01, 0.01)
        self.best_position = np.copy(self.position)
        self.best_score = -np.inf

    def update(self, global_best_position, inertia, personal_best_weight, global_best_weight):
        inertia_component = inertia * self.velocity
        personal_best_component = personal_best_weight * np.random.random() * (self.best_position - self.position)
        global_best_component = global_best_weight * np.random.random() * (global_best_position - self.position)
        self.velocity = np.clip(inertia_component + personal_best_component + global_best_component, -0.01, 0.01) # Clip velocities again
        self.position += self.velocity


def safe_cumprod(returns):
    safe_returns = np.clip(returns + 1, -1e10, 1e10)
    return safe_returns.cumprod()

def pso_fitness_function_v2(returns, annual_rf, target_sharpe, target_drawdown):
    sharpe = new_sharpe_ratio(returns, annual_rf)
    equity_curve = safe_cumprod(returns)
    max_drawdown = calculate_max_drawdown(equity_curve)
    sharpe_penalty = -abs(sharpe - target_sharpe) * 10
    drawdown_penalty = -abs(max_drawdown - target_drawdown) * 10
    return sharpe_penalty + drawdown_penalty

def particle_swarm_optimization_v5(annual_rf, target_sharpe=4, target_drawdown=0.033, num_days=365):
    particles = [Particle_v5(num_days, annual_rf) for _ in range(NUM_PARTICLES)]
    global_best_position = np.random.normal(annual_rf/365, 0.01, num_days)
    global_best_score = -np.inf

    # Early Stopping Variables
    no_improvement_count = 0
    EARLY_STOPPING_THRESHOLD = 50
    previous_best = -np.inf

    for iteration in range(MAX_ITERATIONS):

        # Dynamic adjustments
        inertia = INERTIA_START - (iteration/MAX_ITERATIONS) * (INERTIA_START - INERTIA_END)
        personal_best_weight = PERSONAL_BEST_START - (iteration/MAX_ITERATIONS) * (PERSONAL_BEST_START - PERSONAL_BEST_END)
        global_best_weight = GLOBAL_BEST_START + (iteration/MAX_ITERATIONS) * (GLOBAL_BEST_END - GLOBAL_BEST_START)

        for particle in particles:
            particle.update(global_best_position, inertia, personal_best_weight, global_best_weight)

            fitness = pso_fitness_function_v2(particle.position, annual_rf, target_sharpe, target_drawdown)
            if fitness > particle.best_score:
                particle.best_score = fitness
                particle.best_position = deepcopy(particle.position)
            if fitness > global_best_score:
                global_best_score = fitness
                global_best_position = deepcopy(particle.position)

        # Early stopping check
        if global_best_score > previous_best:
            previous_best = global_best_score
            no_improvement_count = 0
        else:
            no_improvement_count += 1

        if no_improvement_count >= EARLY_STOPPING_THRESHOLD:
            break

        for particle in particles:
            particle.update(global_best_position, inertia, personal_best_weight, global_best_weight)


    return global_best_position, (global_best_position + 1).cumprod()

In [25]:
# Step 1: Create a function to check for overlap
def has_overlap(equity_curve_1, equity_curve_2, tolerance=1e-5):
    """Check if two equity curves overlap."""
    return np.all(np.abs(equity_curve_1 - equity_curve_2) < tolerance)

# Step 2: Modify diagnostic_run_pso
def diagnostic_run_pso(_):
    MAX_ATTEMPTS = 5
    attempts = 0
    while attempts < MAX_ATTEMPTS:
        returns_strict, equity_curve_strict = particle_swarm_optimization_v5(ANNUAL_RISK_FREE_RATE, TARGET_SHARPE, TARGET_DRAWDOWN, NUM_DAYS)
        sharpe_strict = new_sharpe_ratio(returns_strict, ANNUAL_RISK_FREE_RATE)
        mdd_strict = calculate_max_drawdown(equity_curve_strict)
        meets_criteria = (TARGET_SHARPE - 0.1 <= sharpe_strict <= TARGET_SHARPE + 0.1) and (TARGET_DRAWDOWN - 0.002 <= mdd_strict <= TARGET_DRAWDOWN + 0.002)

        # Check for overlap with previous results
        overlap = any(has_overlap(equity_curve_strict, previous_curve[1]) for previous_curve in list(strict_results_final))

        if meets_criteria and not overlap:
            return (returns_strict, equity_curve_strict, sharpe_strict, mdd_strict)

        attempts += 1
    return None

In [26]:
# Begin main execution
start_time = time.time()

# Step 3: Initialize the Manager and the shared list
manager = Manager()
strict_results_final = manager.list()

with Pool(processes=4) as pool:
    results = pool.map(diagnostic_run_pso, range(NUM_RUNS))

# Initialize our chest of treasures (a list) to store our unique equity curves
strict_results_final = []

# Adjust the main loop
while len(strict_results_final) < NUM_RUNS:
    with Pool(processes=4) as pool:
        results = pool.map(diagnostic_run_pso, range(NUM_RUNS - len(strict_results_final)))

    # Filter out None results and add to our chest of treasures
    for res in results:
        if res is not None:
            overlap = any(has_overlap(res[1], previous_curve[1]) for previous_curve in list(strict_results_final))
            if not overlap:
                strict_results_final.append(res)

success_rate = len(strict_results_final) / NUM_RUNS
print(f"Success rate of meeting criteria: {success_rate * 100:.2f}%")

print("Time taken:", time.time() - start_time, "seconds")

from datetime import datetime, timedelta

# Calculate the start date which is NUM_DAYS prior from today
start_date = datetime.today() - timedelta(days=NUM_DAYS)

# Step 4: Save the aggregated data for each run in the CSV format
for i, (returns, equity_curve, _, _) in enumerate(strict_results_final):
    data = {
        'Date': pd.date_range(start=start_date, periods=NUM_DAYS, freq='D'),
        'PnL': returns,
        'NAV': equity_curve
    }
    df = pd.DataFrame(data)
    # Saving to Colab's storage
    df.to_csv(f'run_{i+1}_data.csv', index=False)

# Step 6: Function to read the saved data from a CSV
def read_data_from_csv(run_number):
    df = pd.read_csv(f'run_{run_number}_data.csv')
    return df['Date'], df['PnL'], df['NAV']

# Step 7: Unit Test to validate the read-in data
for i in range(NUM_RUNS):
    date, pnl, nav = read_data_from_csv(i+1)
    original_returns, original_equity_curve, _, _ = strict_results_final[i]
    assert np.allclose(pnl, original_returns), f"Data mismatch for PnL in run {i+1}"
    assert np.allclose(nav, original_equity_curve), f"Data mismatch for NAV in run {i+1}"

print("🚀 Yeehaw! All data is saved and validated successfully. You're good to go, partner!")

# Step 8: Function to calculate MDD and Sharpe from the CSV
def calculate_metrics_from_csv(run_number):
    _, pnl, nav = read_data_from_csv(run_number)

    mdd = calculate_max_drawdown(nav)
    sharpe = new_sharpe_ratio(pnl, ANNUAL_RISK_FREE_RATE)

    return mdd, sharpe

# Step 9: Loop through each CSV and calculate metrics
for i in range(NUM_RUNS):
    mdd, sharpe = calculate_metrics_from_csv(i+1)
    print(f"🌵 For Run {i+1} 🌵")
    print(f"Max Drawdown: {mdd:.4f}")
    print(f"Sharpe Ratio: {sharpe:.4f}")
    print("-" * 40)  # Just to make it look spiffy


Success rate of meeting criteria: 100.00%
Time taken: 25.995115756988525 seconds
🚀 Yeehaw! All data is saved and validated successfully. You're good to go, partner!
🌵 For Run 1 🌵
Max Drawdown: 0.0500
Sharpe Ratio: 4.0000
----------------------------------------


In [27]:
fig_strict_final = go.Figure()

# Define the buttons for updating the y-axis type
buttons = [
    dict(args=[{"yaxis2.type": "linear"}], label="Linear", method="relayout"),
    dict(args=[{"yaxis2.type": "log"}], label="Log", method="relayout")
]

# Add the equity curve traces
for i in range(NUM_RUNS):
    date, pnl, nav = read_data_from_csv(i+1)
    fig_strict_final.add_trace(
        go.Scatter(
            x=date,  # We're using the date as the x-values now
            y=nav,
            mode='lines',
            name=f"Run {i+1}",
            xaxis="x2",
            yaxis="y2"
        )
    )

# Preparing data for the table
header_values = ["PSO Run", "Avg Daily PnL", "Cumulative PnL", "Annualized Sharpe Ratio", "Fractional MDD"]
rows = []
for i in range(NUM_RUNS):
    _, pnl, nav = read_data_from_csv(i+1)
    sharpe = new_sharpe_ratio(pnl, ANNUAL_RISK_FREE_RATE)
    mdd = calculate_max_drawdown(nav)
    rows.append([f"Run {i+1}", f"{np.mean(pnl):.7f}", f"{nav.iloc[-1]-1:.7f}", f"{sharpe:.7f}", f"{mdd:.4f}"])

# Creating a table at the top
fig_strict_final.add_trace(
    go.Table(
        domain=dict(x=[0, 1], y=[0.8, 1]),
        header=dict(values=header_values),
        cells=dict(values=np.transpose(rows))
    )
)

# Styling the layout
fig_strict_final.update_layout(
    template="plotly_dark",
    title="Equity Curves with Parameterized Settings",
    xaxis_title="Date",
    yaxis_title="Equity Value",
    font=dict(size=15, family='Roboto Mono, monospace'),
    hovermode="x unified",
    height=CHART_HEIGHT,
    xaxis2=dict(anchor="y2", domain=[0, 1]),
    yaxis2=dict(anchor="x2", domain=[0, 0.8]),
    updatemenus=[
        dict(
            type="buttons",
            showactive=True,
            y=0.05,
            x=0.95,
            xanchor="right",
            yanchor="bottom",
            buttons=buttons
        )
    ]
)

fig_strict_final.show()
