In [None]:
# IPython autoreload in mode 2: imported modules are re-imported before each
# cell executes, so edits to the local `pyquant` package are picked up live.
%load_ext autoreload
%autoreload 2

from enum import Enum
from typing import Tuple, Optional, Literal
from abc import ABC, abstractmethod

import pandas as pd
import numpy as np
import numba as nb
import torch
from plotly.subplots import make_subplots
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from tqdm import tqdm

from pyquant.heston_sim import *
from pyquant.heston import *
from pyquant.common import *
from pyquant.barrier import price_barrier_option
from pyquant.lsm import price_american_put_lsm

In [None]:
# --- Notebook-wide configuration -------------------------------------------
RANDOM_SEED = 90909090
CALIBR_DATA_DIR = 'calibration_data'

# Seed both random number sources used below so reruns are repeatable.
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the GPU when torch can see one; tensors created without an explicit
# device are then allocated on DEVICE.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
torch.set_default_device(DEVICE)

In [None]:
class InstrumentType(Enum):
    """Integer codes identifying the kind of instrument in a market-data row.

    Within this notebook only ``ASSET`` is referenced: ``Backtester.run``
    compares its value against the ``instrument_type`` column of the market
    data to pick out the spot price.
    """
    FUTURE = 1
    OPTION = 2
    FUTURE_COMBO = 3
    OPTION_COMBO = 4
    CALL_OPTION = 5
    PUT_OPTION = 6
    ASSET = 7


class OptType(Enum):
    """Option side encoded as a boolean: ``PUT`` is False, ``CALL`` is True.

    ``BarrierOption.get_premium`` forwards the raw ``.value`` to
    ``price_barrier_option``; ``VanillaOption.get_premium`` compares against
    ``OptType.PUT`` to choose the payoff sign.
    """
    PUT = False
    CALL = True


class BaseOption(ABC):
    """Shared state and interface for options priced from simulated paths.

    An instance stores the contract terms given at construction (strike,
    expiry timestamp, put/call side) and a market snapshot that is supplied
    later through :meth:`set_params`. Concrete subclasses implement
    :meth:`get_premium`.
    """

    def __init__(self, strike: float, expiry_ts: int, opt_type: OptType):
        """
        Args:
            strike: Strike price of the option.
            expiry_ts: Expiration timestamp in nanoseconds.
            opt_type: Side of the option (put or call).
        """
        # Contract terms.
        self._strike: float = strike
        self._expiry_ts: int = expiry_ts
        self._type = opt_type
        # Market snapshot; stays empty until set_params() is called.
        self._ts: Optional[int] = None
        self._rate: Optional[float] = None
        self._time_to_expiry: Optional[float] = None
        self._are_params_set = False

    @property
    def time_to_expiry(self):
        """Time to expiry computed by the last ``set_params`` call, else None."""
        return self._time_to_expiry

    @property
    def expiry_ts(self):
        """Expiration timestamp passed to the constructor."""
        return self._expiry_ts

    @property
    def strike(self):
        """Strike price passed to the constructor."""
        return self._strike

    @property
    def get_type(self) -> OptType:
        """Option side passed to the constructor (a property despite the name)."""
        return self._type

    def set_params(self, ts: int, rate: float):
        """Stores the current timestamp and rate and refreshes the time to expiry.

        Args:
            ts: Timestamp in nanoseconds
            rate: Forward yield.
        """
        self._ts, self._rate = ts, rate
        nanos_left = self._expiry_ts - ts
        # YEAR_NANOS is not defined in this notebook; presumably the number of
        # nanoseconds per year provided by a pyquant star-import - TODO confirm.
        self._time_to_expiry = nanos_left / YEAR_NANOS
        self._are_params_set = True

    @abstractmethod
    def get_premium(self, paths, dt) -> torch.Tensor:
        """Prices the option from simulated paths (implemented by subclasses).

        The base implementation performs only a precondition check; subclasses
        invoke it through ``super()`` before doing their own pricing.

        Raises:
            ValueError: If ``set_params`` has not been called yet.
        """
        if self._are_params_set:
            return
        raise ValueError('Cannot calculate premium because `self.set_params()` was never called')


class VanillaOption(BaseOption):
    """Put or call whose payoff depends only on the last column of ``paths``."""

    def __init__(self, strike: float, expiry_ts: int, opt_type: OptType):
        super().__init__(strike, expiry_ts, opt_type)

    def get_premium(self, paths, dt):
        """Returns the discounted mean payoff over the simulated paths.

        Args:
            paths: Tensor indexed as ``paths[:, -1]``; assumed to be laid out
                as (path, time step) - TODO confirm against ``generate_heston``.
            dt: Length of one time step; the pricing horizon is ``dt`` times
                the number of steps (``paths.shape[1] - 1``).

        Raises:
            ValueError: If ``set_params`` has not been called yet.
        """
        super().get_premium(paths, dt)

        terminal = paths[:, -1]
        # +1 pays on (S - K) for a call, -1 pays on (K - S) for a put.
        direction = -1 if self._type == OptType.PUT else 1
        payoffs = torch.maximum(direction * (terminal - self._strike), torch.zeros_like(terminal))

        step_count = paths.shape[1] - 1
        horizon = dt * step_count
        discount = torch.exp(torch.tensor(-self._rate * horizon))
        return discount * torch.mean(payoffs)


class BarrierOption(BaseOption):
    """Barrier option whose pricing is delegated to ``price_barrier_option``."""

    def __init__(
        self,
        strike: float,
        expiry_ts: int,
        opt_type: OptType,
        barrier_price: float,
        barrier_type: Literal['up-in', 'up-out', 'down-in', 'down-out']
    ):
        """
        Args:
            strike: Strike price of the option.
            expiry_ts: Expiration timestamp in nanoseconds.
            opt_type: Side of the option (put or call).
            barrier_price: Barrier level, forwarded to the pricer.
            barrier_type: One of 'up-in', 'up-out', 'down-in', 'down-out'.
        """
        super().__init__(strike, expiry_ts, opt_type)
        self._barrier_type = barrier_type
        self._barrier_price = barrier_price

    def get_premium(self, paths, dt):
        """Prices the option from ``paths`` via ``price_barrier_option``.

        The horizon handed to the pricer is ``dt`` times the number of steps
        (``paths.shape[1] - 1``); the option side is passed as the raw boolean
        value of ``OptType``.

        Raises:
            ValueError: If ``set_params`` has not been called yet.
        """
        super().get_premium(paths, dt)
        step_count = paths.shape[1] - 1
        horizon = dt * step_count
        side_flag = self.get_type.value
        return price_barrier_option(
            paths,
            self._strike,
            horizon,
            self._rate,
            self._barrier_price,
            self._barrier_type,
            side_flag,
        )


class AmericanOption(BaseOption):
    """Placeholder for an American-style option; pricing is not implemented yet.

    The class can be constructed and receives market parameters like any other
    option, but :meth:`get_premium` raises instead of silently returning
    ``None`` (which would only fail later, inside
    ``HestonPortfolio.calculate_greeks``, with an unrelated-looking error).
    """
    # TODO: implement. `price_american_put_lsm` is imported at the top of the
    # notebook and is presumably the intended pricer - its signature is not
    # visible here, so confirm before wiring it in.

    def __init__(self, strike: float, expiry_ts: int, opt_type: OptType):
        super().__init__(strike, expiry_ts, opt_type)

    def get_premium(self, paths, dt):
        """Not implemented.

        Raises:
            ValueError: If ``set_params`` has not been called yet.
            NotImplementedError: Always, once the parameter check has passed.
        """
        super().get_premium(paths, dt)
        raise NotImplementedError('AmericanOption.get_premium() is not implemented yet')


class HestonPortfolio:
    """Prices a set of options on Heston Monte Carlo paths and differentiates them.

    Options that share a time to expiry are priced off one common simulation.
    For every option the premium, its derivative with respect to the spot
    (stored as delta) and its derivative with respect to the initial variance
    ``v0`` (stored as vega) are obtained with ``torch.autograd``.
    """

    def __init__(
        self,
        options: Tuple[BaseOption],
        n_paths: int,
        max_dt: float,
        min_steps: int,
        minimum_var: float = 0.005
    ):
        """
        Args:
            options: Options to price; results are reported in the same order.
            n_paths: Number of simulated paths per distinct expiry.
            max_dt: Upper bound on the simulation time step, in the units of
                the options' ``time_to_expiry``.
            min_steps: Minimum number of time steps per simulation.
            minimum_var: Forwarded unchanged as the last argument of
                ``generate_heston``.
        """
        self._options: Tuple[BaseOption] = options
        self.n_paths: int = n_paths
        self.max_dt: float = max_dt
        self.min_steps: int = min_steps
        self.minimum_var: float = minimum_var

        # Market snapshot, filled in by set_params().
        self.ts: Optional[int] = None
        self.spot: Optional[float] = None
        self.heston_params: HestonParams = None

        # Simulation grid. `dt` and `n_steps` are indexed by position in
        # `unique_tte` (NOT by option position); `tte_idxs_inverse[k]` maps
        # option k to its position in `unique_tte`.
        self.times_to_expiry: Optional[list[float]] = None
        self.unique_tte: Optional[np.ndarray[float]] = None
        self.unique_tte_idxs: Optional[np.ndarray[int]] = None
        self.tte_idxs_inverse: Optional[np.ndarray[int]] = None
        self.dt: Optional[list[float]] = None
        self.n_steps: Optional[list[int]] = None

        # Results of the last calculate_greeks() call, indexed by option position.
        self._premiums: Optional[list[float]] = None
        self._deltas: Optional[list[float]] = None
        self._vegas: Optional[list[float]] = None

    def set_params(
        self,
        ts: int,
        spot: float,
        heston_params: HestonParams,
    ):
        """Sets current market parameters and Heston model parameters.

        Also pushes ``ts`` and ``heston_params.r`` to every option and rebuilds
        the per-expiry simulation grid (``dt`` / ``n_steps``).

        Args:
            ts: Timestamp in nanoseconds.
            spot: Spot price.
            heston_params: Heston parameters.
        """
        self.ts = ts
        self.spot = spot
        self.heston_params = heston_params

        for option in self._options:
            option.set_params(self.ts, self.heston_params.r)

        self.times_to_expiry = [opt.time_to_expiry for opt in self._options]
        self.unique_tte, self.unique_tte_idxs, self.tte_idxs_inverse = \
            np.unique(self.times_to_expiry, return_index=True, return_inverse=True)

        self.dt = []
        self.n_steps = []

        for tte in self.unique_tte:
            if tte <= 0:
                # Already expired: nothing to simulate, keep the slots aligned.
                self.dt.append(None)
                self.n_steps.append(None)
                continue

            # Start from exactly `min_steps` steps; if that makes the step too
            # coarse, cap the step at `max_dt` and use more steps instead.
            dt = tte / self.min_steps
            if dt > self.max_dt:
                dt = self.max_dt
                # int(...) guarantees a plain Python int even when `tte` is a
                # NumPy scalar.
                n_steps = int(round(tte / dt))
            else:
                n_steps = self.min_steps
            self.dt.append(dt)
            self.n_steps.append(n_steps)

    def calculate_greeks(self):
        """Simulates paths per distinct expiry and fills premiums, deltas and vegas.

        Options whose time to expiry is not positive are skipped and keep a
        premium, delta and vega of zero.

        Raises:
            ValueError: If ``set_params`` has not been called yet.
        """
        if self.unique_tte is None:
            raise ValueError('Cannot calculate greeks because `self.set_params()` was never called')

        n_options = len(self._options)
        self._premiums = [0.0] * n_options
        self._deltas = [0.0] * n_options
        self._vegas = [0.0] * n_options

        for tte_idx, tte in enumerate(self.unique_tte):
            if tte <= 0:
                continue

            # Fresh leaf tensors per simulation so gradients are taken with
            # respect to this run's initial spot and initial variance.
            S0 = torch.tensor(self.spot, requires_grad=True)
            V0 = torch.tensor(self.heston_params.v0, requires_grad=True)
            init_price = S0 * torch.ones(self.n_paths)
            init_var = V0 * torch.ones(self.n_paths)
            dt = self.dt[tte_idx]
            paths, _ = generate_heston(
                self.n_paths,
                self.n_steps[tte_idx],
                dt,
                init_price,
                init_var,
                torch.tensor([self.heston_params.kappa]),
                torch.tensor([self.heston_params.theta]),
                torch.tensor([self.heston_params.eps]),
                torch.tensor([self.heston_params.rho]),
                torch.tensor([self.heston_params.r]),
                self.minimum_var
            )

            opt_idxs = np.where(self.tte_idxs_inverse == tte_idx)[0]
            for opt_idx in opt_idxs:
                # The step size belongs to the expiry bucket, so it is looked
                # up with `tte_idx`; indexing `self.dt` by option position
                # would pick another bucket's step (or run out of range).
                premium = self._options[opt_idx].get_premium(paths, dt)
                # retain_graph: several options may differentiate through the
                # same simulated paths.
                delta, vega = torch.autograd.grad(premium, [S0, V0], retain_graph=True)
                self._premiums[opt_idx] = premium.item()
                self._deltas[opt_idx] = delta.item()
                self._vegas[opt_idx] = vega.item()

    @property
    def options(self):
        """Options given to the constructor."""
        return self._options

    @property
    def premiums(self):
        """Premiums from the last ``calculate_greeks`` call, one per option."""
        return self._premiums

    @property
    def deltas(self):
        """Spot derivatives from the last ``calculate_greeks`` call, one per option."""
        return self._deltas

    @property
    def vegas(self):
        """Initial-variance derivatives from the last ``calculate_greeks`` call."""
        return self._vegas


class Backtester:
    def __init__(
        self,
        portfolio: HestonPortfolio,
        market_data: pd.DataFrame,
        calibr_data: pd.DataFrame,
        fee: float,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ):
        self.portfolio = portfolio
    
        if start_date is not None:
            ts_mask1 = calibr_data['timestamp'] >= pd.to_datetime(start_date)
        else:
            ts_mask1 = np.full(len(calibr_data), True)
        if end_date is not None:
            ts_mask2 = calibr_data['timestamp'] <= (pd.to_datetime(end_date) + pd.to_timedelta('1 day'))
        else:
            ts_mask2 = np.full(len(calibr_data), True)

        self.calibr_data = calibr_data[ts_mask1 & ts_mask2]
        self.market_data = market_data[market_data['sample_idx'].isin(self.calibr_data['sample_idx'])]
        self.fee = fee
        self.results = None

    def run(self):
        self.results = {'ts': [], 'spot': []}
        for i in range(len(self.portfolio.options)):
            self.results[f'{i}_premium'] = []
            self.results[f'{i}_delta'] = []
            self.results[f'{i}_vega'] = []

        for i in tqdm(range(len(self.calibr_data))):
            curr_cal = self.calibr_data.iloc[i]
            curr_md = self.market_data[self.market_data['sample_idx'] == curr_cal['sample_idx']]

            ts = curr_cal['timestamp'].value
            spot = curr_md[curr_md['instrument_type'] == InstrumentType.ASSET.value]['price'].item()
            heston_params = HestonParams(
                Variance(curr_cal['v0']),
                VarReversion(curr_cal['kappa']),
                AverageVar(curr_cal['theta']),
                VolOfVar(curr_cal['eps']),
                Correlation(curr_cal['rho']),
                FlatForwardYield(curr_cal['flat_yield'])
            )
            self.portfolio.set_params(ts, spot, heston_params)
            self.portfolio.calculate_greeks()

            portf_delta = sum(self.portfolio.deltas)
            portf_vega = sum(self.portfolio.vegas)

            self.results['ts'].append(ts)
            self.results['spot'].append(spot)
            for i in range(len(self.portfolio.options)):
                self.results[f'{i}_premium'].append(self.portfolio.premiums[i])
                self.results[f'{i}_delta'].append(self.portfolio.deltas[i])
                self.results[f'{i}_vega'].append(self.portfolio.vegas[i])

    def get_results(self) -> pd.DataFrame:
        res = pd.DataFrame(self.results)
        res['ts'] = pd.to_datetime(res['ts'])
        return res

In [None]:
# --- Simulation settings -----------------------------------------------------
N_PATHS = 200_000
MAX_DT = 1 / 365
MIN_STEPS = 20
MINIMUM_VAR = 0.005

# --- Backtest window and fee -------------------------------------------------
FEE = 0.0003  # https://www.deribit.com/kb/fees
START_DATE = '2024-01-02'
END_DATE = '2024-01-12'

# --- Portfolio under test ----------------------------------------------------
# Expiries are passed as nanosecond timestamps (`Timestamp.value`).
OPTIONS = [
    VanillaOption(2600, pd.to_datetime('2024-01-12').value, OptType.CALL),
    # Disabled example of a second leg:
    # BarrierOption(2300, pd.to_datetime('2024-01-27').value, OptType.CALL, 2600, 'up-in')
]

portfolio = HestonPortfolio(
    options=OPTIONS,
    n_paths=N_PATHS,
    max_dt=MAX_DT,
    min_steps=MIN_STEPS,
    minimum_var=MINIMUM_VAR,
)

# --- Input data --------------------------------------------------------------
MARKET_DATA = pd.read_parquet(f'{CALIBR_DATA_DIR}/market_data.parquet')
CALIBR_DATA = pd.read_parquet(f'{CALIBR_DATA_DIR}/calibr_data.parquet')

bt = Backtester(
    portfolio=portfolio,
    market_data=MARKET_DATA,
    calibr_data=CALIBR_DATA,
    fee=FEE,
    start_date=START_DATE,
    end_date=END_DATE,
)

In [None]:
# Re-price the portfolio at every calibration row in the selected window;
# the per-option premium, delta and vega are collected in `bt.results`.
bt.run()

In [None]:
# Spot-check of a single sample: market rows for sample_idx 2502 whose
# instrument_type is 7, i.e. InstrumentType.ASSET (the row `Backtester.run`
# reads the spot price from).
bt.market_data.query('sample_idx == 2502 and instrument_type == 7')

# Backtest results

In [None]:
def plot_results(bt_results: pd.DataFrame):
    """Plots every backtest series against time, one stacked subplot per column.

    The number of subplot rows follows the number of value columns, so the
    figure works for any number of options in the portfolio (each option
    contributes a premium, a delta and a vega column next to ``spot``).

    Args:
        bt_results: Frame with a ``ts`` column used as the shared x axis; every
            other column becomes one subplot, in column order.

    Returns:
        The plotly figure.
    """
    # Select value columns by name rather than by position, so the frame does
    # not have to keep `ts` as its first column.
    column_names = [col for col in bt_results.columns.to_list() if col != 'ts']

    # make_subplots needs at least one row, even for a frame holding only `ts`.
    n_rows = max(len(column_names), 1)
    fig = make_subplots(rows=n_rows, cols=1, shared_xaxes=True)

    for row, col in enumerate(column_names, start=1):
        label = col.capitalize()
        fig.add_trace(
            go.Scatter(x=bt_results['ts'], y=bt_results[col], name=label),
            row=row, col=1
        )
        fig.update_yaxes(title_text=label, row=row, col=1)

    # Shared x axes hide tick labels on all but the bottom subplot by default;
    # switch them back on for every row.
    fig.update_xaxes(showticklabels=True)
    fig.update_traces(connectgaps=True)
    return fig

In [None]:
# Build the figure from the recorded backtest results and set the layout
# height to 1000 so the stacked subplots are not squeezed together.
fig = plot_results(bt.get_results())
fig.update_layout(height=1000)