In [1]:
import pandas as pd
import numpy as np
import random
from datetime import timedelta

In [2]:
# Expand tickers to 50 and separate rating into its own column
#
# Builds a synthetic daily quote panel: for every calendar day and every
# rating, 10-20 of the 50 tickers "trade", each with four Gaussian features,
# a random expiry 30-3650 days out, and a yield that is linear in
# feature_A / feature_B plus Gaussian noise.

# Seeded, local generators so that Restart & Run All reproduces the same frame
# without touching the global `random` / `np.random` state.
SEED = 42
py_rng = random.Random(SEED)
np_rng = np.random.default_rng(SEED)

# Create 50 fake tickers
base_tickers = [f"TICK{i:03}" for i in range(1, 51)]
ratings = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B']
date_range = pd.date_range(start="2024-01-01", end="2024-03-31", freq="D")

# Generate the full universe: all combinations of ticker and rating
universe = [(ticker, rating) for ticker in base_tickers for rating in ratings]

records = []
for current_date in date_range:
    # For each rating, randomly pick 10–20 tickers to simulate trading on that day
    for rating in ratings:
        traded_today = py_rng.sample(base_tickers, k=py_rng.randint(10, 20))
        for ticker in traded_today:
            expiry_date = current_date + timedelta(days=py_rng.randint(30, 3650))
            features = np_rng.normal(loc=0, scale=1, size=4)
            # Only the first two features drive the yield; C and D are pure noise columns.
            noise = np_rng.normal(0, 0.5)
            yield_value = 5 + features[0]*1 + features[1]*2 + noise
            records.append({
                'date': current_date,
                'ticker': f"{ticker} {rating}",
                'rating': rating,
                'feature_A': features[0],
                'feature_B': features[1],
                'feature_C': features[2],
                'feature_D': features[3],
                'expiry': expiry_date,
                'yield': yield_value
            })

# One DataFrame construction from the accumulated records (no row-by-row growth).
df_large = pd.DataFrame(records)


In [28]:
import pandas as pd
from typing import Type
from abc import ABC, abstractmethod
from quantbullet.utils.consolidator import Consolidator

class DataHandler:
    def __init__(self, df: pd.DataFrame, date_level_name: str = 'date'):
        if date_level_name not in df.index.names:
            raise ValueError(f"'{date_level_name}' must be in the index names of the DataFrame.")
        
        self._date_level = date_level_name
        self.df = df.sort_index()
        self.all_dates = self.df.index.get_level_values(self._date_level).unique().sort_values()
        self.return_flat = False

    def get_window(self, end_date, window_size: int = None, use_full_history: bool = False) -> pd.DataFrame:
        
        end_date = Consolidator.to_time_stamp(end_date)
        if end_date not in self.all_dates:
            raise ValueError(f"End date {end_date} not in available dates.")

        end_idx = self.all_dates.get_loc(end_date)

        if use_full_history:
            window_dates = self.all_dates[: end_idx + 1]
        else:
            if window_size is None:
                raise ValueError("You must specify a window_size if use_full_history is False.")
            if end_idx + 1 < window_size:
                raise ValueError("Not enough data for the requested window size.")
            window_dates = self.all_dates[end_idx + 1 - window_size : end_idx + 1]

        result = self.df.loc[self.df.index.get_level_values(self._date_level).isin(window_dates)]
        return self._format_return(result)


    def get_next_day_data(self, current_date) -> pd.DataFrame | None:
        current_date = Consolidator.to_time_stamp(current_date)
        idx = self.all_dates.get_loc(current_date)
        if idx + 1 >= len(self.all_dates):
            return None
        next_day = self.all_dates[idx + 1]
        result = self.df.loc[self.df.index.get_level_values(self._date_level) == next_day]
        return self._format_return(result)
    
    def _format_return(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.reset_index() if self.return_flat else df
    
    @classmethod
    def from_flat( cls, df: pd.DataFrame, date_col: str = 'date'):
        if date_col not in df.columns:
            raise ValueError(f"'{date_col}' must be a column in the DataFrame.")
        
        df_sorted = df.sort_values(by=date_col)
        df_indexed = df_sorted.set_index(date_col)
        
        instance = cls(df_indexed, date_level_name=date_col)
        instance.return_flat = True
        return instance

class Model(ABC):
    """Abstract fit/predict interface; concrete models must override both methods."""

    @abstractmethod
    def fit(self, X, y):
        """Train the model on features ``X`` and targets ``y``."""

    @abstractmethod
    def predict(self, X):
        """Produce predictions for features ``X``."""

class SimpleMeanModel(Model):
    """Baseline that predicts, for each security, the mean target seen in training.

    Both methods read the security identifier from the ``'security'`` level of
    ``X``'s index; the feature values themselves are not used.
    """

    def fit(self, X, y):
        """Store the mean of ``y`` per security in ``self.mean_return``."""
        securities = X.index.get_level_values('security')
        self.mean_return = y.groupby(securities).mean()

    def predict(self, X):
        """Look up each row's security in the stored means and return the values as an array."""
        securities = X.index.get_level_values('security')
        return securities.map(self.mean_return).values

class ResultLogger:
    """Collects one record per (date, security) prediction and exports them as a DataFrame."""

    # Column order of the exported frame; also guarantees the columns exist
    # when nothing has been logged yet.
    COLUMNS = ("date", "security", "prediction", "actual")

    def __init__(self):
        # List of dicts, one per logged prediction, in insertion order.
        self.predictions = []

    def log(self, date, security, prediction, actual):
        """Append a single prediction record."""
        self.predictions.append({
            "date": date,
            "security": security,
            "prediction": prediction,
            "actual": actual
        })

    def to_frame(self):
        """Return all logged records as a DataFrame.

        The frame always has the columns in ``COLUMNS``, even when empty, so
        callers can select columns without special-casing an empty log.
        """
        return pd.DataFrame(self.predictions, columns=list(self.COLUMNS))

class Backtester:
    """Walk-forward backtest over a date-indexed panel.

    For every available date that has a full trailing window *and* a following
    date, a fresh ``model_class()`` is fitted on the window and used to predict
    the next date's rows. Each prediction is logged together with the realised
    target value.

    The input frame must be indexed by a ``'date'`` level (required by
    ``DataHandler``) and a ``'security'`` level (used to label predictions).
    """

    def __init__(self, data: pd.DataFrame, model_class: Type[Model], window_size: int,
                 feature_cols: list[str] | None = None, target_col: str = 'target'):
        """
        Args:
            data: panel indexed by ``'date'`` and ``'security'``.
            model_class: ``Model`` subclass, instantiated with no arguments at every step.
            window_size: number of available dates in each training window.
            feature_cols: columns passed to the model as ``X``; defaults to ``['feature']``.
            target_col: column passed to the model as ``y``; defaults to ``'target'``.

        Raises:
            ValueError: if ``window_size`` is not positive, or (from ``DataHandler``)
                if ``data`` has no ``'date'`` index level.
        """
        if window_size < 1:
            raise ValueError("window_size must be a positive integer.")

        self.data_handler = DataHandler(data)
        self.model_class = model_class
        self.window_size = window_size
        if feature_cols is None:
            feature_cols = ['feature']
        elif isinstance(feature_cols, str):
            # A bare string would otherwise be split into characters by list().
            feature_cols = [feature_cols]
        self.feature_cols = list(feature_cols)
        self.target_col = target_col
        self.logger = ResultLogger()

    def run(self):
        """Execute the backtest and return the logged predictions as a DataFrame.

        Records are keyed by the date on which the prediction was made (the last
        date of the training window), not the date being predicted.
        """
        # Start from a clean log so that calling run() again does not duplicate rows.
        self.logger = ResultLogger()

        all_dates = self.data_handler.all_dates
        # The first date with a complete window sits at position window_size - 1;
        # the last date is excluded because it has no following day to predict.
        first_idx = self.window_size - 1
        for current_date in all_dates[first_idx:-1]:
            window_data = self.data_handler.get_window(current_date, self.window_size)
            next_day_data = self.data_handler.get_next_day_data(current_date)

            X_train = window_data[self.feature_cols]
            y_train = window_data[self.target_col]
            X_test = next_day_data[self.feature_cols]
            y_test = next_day_data[self.target_col]

            model = self.model_class()
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)

            securities = X_test.index.get_level_values('security')
            for sec, pred, actual in zip(securities, y_pred, y_test):
                self.logger.log(current_date, sec, pred, actual)

        return self.logger.to_frame()


In [31]:
# Exercise the three construction paths: a (date, ticker) MultiIndex, a plain
# date index, and a flat frame. Only the last binding of `dh` survives.
for index_cols in (['date', 'ticker'], ['date']):
    dh = DataHandler(df_large.set_index(index_cols))
dh = DataHandler.from_flat(df_large, date_col='date')

In [32]:
# Rows for the five available dates ending at (and including) 2024-01-20.
dh.get_window(end_date='2024-01-20', window_size=5)

Unnamed: 0,date,ticker,rating,feature_A,feature_B,feature_C,feature_D,expiry,yield
0,2024-01-16,TICK013 BBB,BBB,-0.822462,-0.808287,1.322948,-0.431289,2029-07-25,3.313102
1,2024-01-16,TICK042 BBB,BBB,0.393233,-1.110280,-0.200273,0.085599,2026-03-11,3.254585
2,2024-01-16,TICK040 BB,BB,0.191740,1.010309,-0.422885,0.686635,2031-11-15,6.344249
3,2024-01-16,TICK009 BB,BB,-1.679160,-1.899144,-0.674185,0.509269,2028-01-09,-0.949719
4,2024-01-16,TICK035 BB,BB,-1.752920,-0.046660,-1.116742,0.133428,2026-06-12,2.743380
...,...,...,...,...,...,...,...,...,...
419,2024-01-20,TICK034 AA,AA,-0.952065,0.085421,-0.690457,-2.687656,2030-02-01,3.775494
420,2024-01-20,TICK005 A,A,-0.488983,-0.831111,-0.607724,-1.509591,2024-03-31,2.584561
421,2024-01-20,TICK007 A,A,-0.753803,0.078284,-1.811138,-0.482287,2032-07-05,4.006690
422,2024-01-20,TICK032 AA,AA,0.267818,-1.603792,0.477184,1.779072,2024-05-24,0.837745


In [27]:
# all_dates = self.df.index.get_level_values("date").unique().sort_values()
# end_idx = all_dates.get_loc(end_date)
# if end_idx + 1 < window_size:
#     raise ValueError("Not enough data for the requested window size.")

# # Get the rolling window of dates
# window_dates = all_dates[end_idx + 1 - window_size : end_idx + 1]
# return self.df.loc[self.df.index.get_level_values("date").isin(window_dates)]

In [7]:
from datetime import date

In [11]:
# Scratch check of the date-position lookup on a (date, ticker) MultiIndex.
df = df_large.set_index(['date', 'ticker'])
date_level = df.index.get_level_values("date")
all_dates = date_level.unique().sort_values()
end_idx = all_dates.get_loc("2024-01-02")