In [None]:
from typing import List, Tuple, Optional, Mapping
import pandas as pd
import numpy as np
from collections import defaultdict
import statsmodels.api as sm
import pickle

In [None]:
class IndicesSelector():
    """Selects the universe of ETF tickers that the pipeline works with."""

    def __init__(self):
        pass

    def select(self, df: Optional[pd.DataFrame] = None) -> List[str]:
        """Return the distinct tickers listed in ``price_df.csv``.

        Args:
            df: Accepted for interface compatibility but not used; the tickers
                are always read from ``price_df.csv`` in the working directory.

        Returns:
            The unique values of the ``ETF_Ticker`` column, in order of first
            appearance, as a plain Python list.
        """
        price_df = pd.read_csv('price_df.csv')
        # ``unique()`` yields a NumPy array; convert it so the result matches
        # the declared ``List[str]`` return type.
        return price_df['ETF_Ticker'].unique().tolist()

In [None]:
class PairsSelector():
    """Supplies the candidate ticker pairs listed in the training set file."""

    def __init__(self):
        pass

    def select(self, df: Optional[pd.DataFrame] = None) -> List[List[str]]:
        """Read ``TrainingSet.csv`` and split every distinct ``Ticker_Pair`` label.

        Args:
            df: Not used; the pairs always come from ``TrainingSet.csv`` in the
                working directory.

        Returns:
            One ``[first, second]`` list per unique label, where ``first`` and
            ``second`` are the first two underscore-separated parts of the label.
        """
        labels = pd.read_csv('TrainingSet.csv')["Ticker_Pair"].unique()
        pairs = []
        for label in labels:
            parts = label.split("_")
            pairs.append([parts[0], parts[1]])
        return pairs

In [None]:
class Predictor():
    """Common interface for prediction back-ends.

    Every method of this base class does nothing and returns ``None``;
    subclasses override them with real behaviour.
    """

    def __init__(self):
        return None

    def train(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None):
        """Training hook; a no-op in the base class."""
        return None

    def predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Prediction hook; the base class returns ``None``."""
        return None

    def periodic_train_predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Combined train-and-predict hook; the base class returns ``None``."""
        return None

In [None]:
class MockPredictor(Predictor):
    """Predictor stand-in that serves canned results from ``mock_evaluation_df.csv``."""

    def __init__(self):
        pass

    def train(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None):
        """No-op: the mock has nothing to fit."""
        return None

    def predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Return the contents of ``mock_evaluation_df.csv``; both arguments are ignored."""
        mock_predictions = pd.read_csv("mock_evaluation_df.csv")
        return mock_predictions

    def periodic_train_predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Delegate to :meth:`predict` with the same arguments."""
        mock_predictions = self.predict(data, params)
        return mock_predictions
    

In [None]:
class LinearPredictor(Predictor):
    """Serves the linear model's precomputed return-spread predictions from pickle files."""

    def __init__(self):
        pass

    def train(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None):
        """No-op: predictions are loaded from disk rather than fitted here."""
        return None

    def predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Load the pickled predictions for the requested prediction period.

        Args:
            data: Not used.
            params: Mapping with a ``'pred_period'`` entry of ``'D'``, ``'M'``
                or ``'W'``. Any other value falls back to the ``'D'`` file.
                ``None`` is treated as ``{'pred_period': 'D'}``.

        Returns:
            The object stored in the selected pickle file.

        Raises:
            KeyError: If ``params`` is given without a ``'pred_period'`` entry.
        """
        # NOTE: pickle.load executes arbitrary code from the file; only point
        # this at files produced by a trusted run of the prediction step.
        prediction_files = {
            'D': '../prediction/linear_model/predict/ReturnSpreadPredictions_d.pkl',
            'M': '../prediction/linear_model/predict/ReturnSpreadPredictions_M.pkl',
            'W': '../prediction/linear_model/predict/ReturnSpreadPredictions_w.pkl',
        }
        settings = {'pred_period': 'D'} if params is None else params
        period = settings['pred_period']
        # Unrecognised periods use the daily predictions.
        path = prediction_files.get(period, prediction_files['D'])
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    def periodic_train_predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Delegate to :meth:`predict` with the same arguments."""
        predictions = self.predict(data, params)
        return predictions
    

In [None]:
class XGBPredictor(Predictor):
    """Serves the XGBoost model's precomputed return-spread predictions from pickle files."""

    def __init__(self):
        pass

    def train(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None):
        """No-op: predictions are loaded from disk rather than fitted here."""
        pass

    def predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Load the pickled predictions for the requested prediction period.

        Args:
            data: Not used.
            params: Mapping with a ``'pred_period'`` entry of ``'D'``, ``'M'``
                or ``'W'``. Any other value falls back to the ``'D'`` file.
                ``None`` is treated as ``{'pred_period': 'D'}``.

        Returns:
            The object stored in the selected pickle file.

        Raises:
            KeyError: If ``params`` is given without a ``'pred_period'`` entry.
            FileNotFoundError: If the selected pickle file does not exist.
        """
        if params is None:
            params = {
                'pred_period': 'D'
            }
        pred_period = params['pred_period']

        base = '../prediction/xgboost_model/predict/'
        prediction_files = {
            'D': base + 'ReturnSpreadPredictions_d.pkl',
            # The 'M' branch previously called self.predict(data, params) with
            # the same arguments, which recursed until RecursionError.
            # NOTE(review): the monthly file name mirrors the linear model's
            # '_M.pkl' naming; confirm that this file is actually produced.
            'M': base + 'ReturnSpreadPredictions_M.pkl',
            'W': base + 'ReturnSpreadPredictions_w.pkl',
        }
        # Unrecognised periods use the daily predictions.
        path = prediction_files.get(pred_period, prediction_files['D'])

        # NOTE: pickle.load executes arbitrary code from the file; only point
        # this at files produced by a trusted run of the prediction step.
        with open(path, 'rb') as f:
            return pickle.load(f)

    def periodic_train_predict(self, data: Optional[pd.DataFrame] = None, params: Optional[Mapping] = None) -> pd.DataFrame:
        """Delegate to :meth:`predict` with the same arguments."""
        return self.predict(data, params)
    

In [None]:
class SignalGenerator():
    """Base class for strategies that turn pair-level signals into per-asset quantities."""

    def __init__(self):
        pass

    def pair_sig_to_asset_sig(self, price_df: pd.DataFrame, signal_df: pd.DataFrame) -> pd.DataFrame:
        """Expand pair signals into a table of per-asset quantities.

        Every signal row except the last is processed. Its ``pair`` label
        (``"<first>_<second>"``) is split into two symbols and each leg is
        sized at ``1 / price`` using the price on the signal's date.

        * ``side == True`` writes ``+qty`` for the first symbol and ``-qty``
          for the second on the signal's own row.
        * ``side == False`` writes the opposite signs.
        * The next row is pre-filled with the mirror-image quantities. A later
          signal that involves the same symbols overwrites those entries.
        * An empty or missing ``pair``, or a ``side`` that is neither true nor
          false, leaves the row untouched.

        Args:
            price_df: Prices indexed by date with one column per symbol.
            signal_df: Signals indexed by date with ``pair`` and ``side``
                columns. Each processed date must exist in ``price_df``.

        Returns:
            A DataFrame with ``signal_df``'s index and ``price_df``'s columns.
            Cells that no signal wrote to are missing values.
        """
        assets = price_df.columns
        if len(signal_df) == 0:
            # Nothing to process; the row buffer below always holds at least
            # one row and would not match an empty index.
            return pd.DataFrame(columns=assets, index=signal_df.index)

        width = len(assets)
        rows = [[None] * width]

        # Explicit loop instead of DataFrame.apply: the work happens through
        # side effects on ``rows`` and apply's result was discarded. Older
        # pandas releases documented that apply may call the function twice on
        # the first row, which would add a spurious row here.
        for date, signal in signal_df.iloc[:-1].iterrows():
            current = rows[-1]
            following = [None] * width
            rows.append(following)

            pair = signal['pair']
            if pd.isna(pair) or not pair:
                continue

            symbols = pair.split('_')
            first_symbol, second_symbol = symbols[0], symbols[1]
            first_qty = 1.0 / price_df.loc[date, first_symbol]
            second_qty = 1.0 / price_df.loc[date, second_symbol]

            side = signal['side']
            # Equality rather than identity: ``side`` may be a NumPy bool,
            # and None/NaN must match neither branch.
            if side == True:
                direction = 1.0
            elif side == False:
                direction = -1.0
            else:
                continue

            first_col = assets.get_loc(first_symbol)
            second_col = assets.get_loc(second_symbol)
            current[first_col] = direction * first_qty
            current[second_col] = -direction * second_qty
            following[first_col] = -direction * first_qty
            following[second_col] = direction * second_qty

        return pd.DataFrame(rows, columns=assets, index=signal_df.index)

    def generate(self, pairs: List[str], price_df: pd.DataFrame, \
                 predict_df: pd.DataFrame, params: Optional[Mapping] = None)\
                 -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Strategy hook; the base class does nothing and returns ``None``."""
        pass

In [None]:
class PercentileCurrent(SignalGenerator):
    """Trades the pair whose current price spread sits at the most extreme percentile.

    For each pair the spread ``price[first] - price[second]`` is ranked within
    a rolling window; the pair whose centred percentile is furthest from zero
    is selected on every ``holding_period``-th row.
    """

    def __init__(self):
        pass

    def generate(self, pairs: List[str], price_df: pd.DataFrame, \
                 predict_df: pd.DataFrame, params: Optional[Mapping] = None)\
                 -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Build per-asset quantities and the pair-level signal table.

        Args:
            pairs: Sequence of two-element ``[first, second]`` ticker pairs.
            price_df: Long-format prices with ``Date``, ``ETF_Ticker`` and
                ``ETF Price`` columns.
            predict_df: Not used by this strategy.
            params: Mapping with ``'holding_period'`` (row step between
                signals) and ``'distribution_period'`` (rolling window length).
                Defaults to 20 and 60 when ``None``.

        Returns:
            ``(asset_trades, pair_signals)``. ``asset_trades`` is the output of
            ``pair_sig_to_asset_sig``. ``pair_signals`` has a ``pair`` column
            (``''`` when no pair has a score yet) and a ``side`` column (``True``
            when the selected pair's centred percentile is negative, ``None``
            when no pair was selected).
        """
        price_pivot_df = price_df.pivot_table("ETF Price", ["Date"], columns="ETF_Ticker")
        price_pivot_df.index = pd.to_datetime(price_pivot_df.index)
        if params is None:
            params = {
                'holding_period': 20,
                'distribution_period': 60
            }
        holding_period = params["holding_period"]
        distribution_period = params["distribution_period"]

        spreads = pd.DataFrame(
            {'_'.join(pair): price_pivot_df[pair[0]] - price_pivot_df[pair[1]] for pair in pairs},
            index=price_pivot_df.index,
        )
        # Percentile rank of the latest spread within its window, centred on 0.
        # raw=True hands the window over as an ndarray, avoiding a Series
        # round-trip per window without changing the result.
        scores = spreads.rolling(distribution_period).apply(
            lambda window: pd.Series(window).rank(pct=True).iloc[-1], raw=True
        ) - 0.5
        scores = scores.iloc[::holding_period, :]

        # Warm-up rows are entirely NaN. idxmax over an all-NaN row is
        # deprecated and raises in newer pandas, so rank only scored rows.
        has_score = scores.notna().any(axis=1)
        best_pair = pd.Series('', index=scores.index, dtype=object)
        if has_score.any():
            best_pair.loc[has_score] = scores.loc[has_score].abs().idxmax(axis=1)

        # Work on a copy so the new columns are not written into a slice.
        signal_df = scores.copy()
        signal_df['pair'] = best_pair
        signal_df['side'] = [
            bool(scores.at[date, pair] < 0) if pair else None
            for date, pair in best_pair.items()
        ]

        asset_trades = self.pair_sig_to_asset_sig(price_pivot_df, signal_df)
        pair_signals = signal_df[['pair', 'side']]
        return (asset_trades, pair_signals)
        

In [None]:
class MostSpreadReturnPredict(SignalGenerator):
    """Trades, on each signal date, the pair with the largest absolute predicted spread."""

    def __init__(self):
        pass

    def generate(self, pairs: List[str], price_df: pd.DataFrame, \
                 predict_df: pd.DataFrame, params: Optional[Mapping] = None)\
                 -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Build per-asset quantities and the pair-level signal table.

        Args:
            pairs: Not used by this strategy; pairs come from ``predict_df``.
            price_df: Long-format prices with ``Date``, ``ETF_Ticker`` and
                ``ETF Price`` columns.
            predict_df: Predictions that expose ``Date``, ``pair`` and
                ``pred_spread`` once the index is reset.
            params: Mapping with ``'holding_period'`` (row step between signal
                dates). Defaults to 20 when ``None``.

        Returns:
            ``(asset_trades, pair_signals)``. ``asset_trades`` is the output of
            ``pair_sig_to_asset_sig``. ``pair_signals`` holds the ``pair`` and
            ``side`` columns, with ``side`` being ``pred_spread > 0``.
        """
        price_pivot_df = price_df.pivot_table("ETF Price", ["Date"], columns="ETF_Ticker")
        # The signal index below is converted to datetime; convert the price
        # index too so the date lookups in pair_sig_to_asset_sig line up even
        # when ``Date`` arrives as strings.
        price_pivot_df.index = pd.to_datetime(price_pivot_df.index)
        if params is None:
            params = {
                'holding_period': 20
            }
        holding_period = params["holding_period"]

        # Rows without a prediction cannot win; dropping them also keeps
        # idxmax away from all-NaN date groups.
        predictions = predict_df.reset_index().dropna(subset=['pred_spread'])
        strongest = predictions.groupby('Date')['pred_spread'].agg(lambda x: np.abs(x).idxmax())
        # idxmax returns index labels, so select by label rather than position.
        signal_df = predictions.loc[strongest.to_numpy()].set_index("Date")
        signal_df.index = pd.to_datetime(signal_df.index)
        # Copy before adding a column so the assignment is not made on a slice.
        signal_df = signal_df.iloc[::holding_period, :].copy()
        signal_df['side'] = signal_df['pred_spread'] > 0
        return self.pair_sig_to_asset_sig(price_pivot_df, signal_df), signal_df[['pair', 'side']]

In [None]:
class PairTradingPipeline():
    """Drives ticker/pair selection, prediction loading, signal generation and portfolio valuation.

    Args:
        price_df: Long-format price table. ``generate_portfolio`` pivots it on
            ``Date`` / ``ETF_Ticker`` / ``ETF Price``, so those columns must be
            present. A copy is stored.
        training_ratio: Fraction of the rows of ``price_df``, taken from the
            top, assigned to ``index_selection_df``; the remaining rows form
            ``pair_selection_df``.
    """
    def __init__(self, price_df: pd.DataFrame, training_ratio: float=0.5):
        self.price_df = price_df.copy()
        training_idx = int(len(price_df) * training_ratio)
        self.index_selection_df = self.price_df[:training_idx]
        self.pair_selection_df = self.price_df[training_idx:]
        self.selected_indices = None
        self.indicies_selector = IndicesSelector()
        self.pairs_selector = PairsSelector()
        self.selected_pairs = None
        # Registries keyed by the names passed to predict() / create_signal().
        self.predictors = {
            'linear': LinearPredictor()
        }
        self.predict_result = {
            'linear': None
        }
        self.signal_generator = {
            'percentile_current': PercentileCurrent(),
            'most_spread_rtn': MostSpreadReturnPredict()
        }

    def select_indicies(self) -> List[str]:
        """Run the indices selector on ``index_selection_df`` and cache the result."""
        self.selected_indices = self.indicies_selector.select(self.index_selection_df)
        return self.selected_indices

    def select_pairs(self) -> List[List[str]]:
        """Select the indices if needed, then run the pairs selector and cache the result."""
        if self.selected_indices is None:
            self.select_indicies()
        # The pairs selector is called without data.
        self.selected_pairs = self.pairs_selector.select()
        return self.selected_pairs

    def predict(self, predictor: str, pair: List[str], params: Optional[Mapping] = None, periodic: Optional[bool] = False):
        """Run the named predictor and cache its output in ``predict_result``.

        Args:
            predictor: Key into ``self.predictors``.
            pair: Currently unused; no data is passed to the predictor.
            params: Forwarded to the predictor (for example
                ``{'pred_period': 'W'}``). It was previously accepted but
                silently dropped.
            periodic: When true, call ``periodic_train_predict`` instead of
                ``predict``.

        Returns:
            Whatever the predictor returns.
        """
        model = self.predictors[predictor]
        run = model.periodic_train_predict if periodic else model.predict
        self.predict_result[predictor] = run(params=params)
        return self.predict_result[predictor]

    def create_signal(self, signalGenerator: str, predict_df: pd.DataFrame, params: Optional[Mapping] = None):
        """Generate signals with the named generator, selecting pairs first if needed.

        Args:
            signalGenerator: Key into ``self.signal_generator``.
            predict_df: Predictions handed to the generator.
            params: Generator parameters, passed through unchanged.

        Returns:
            The generator's ``generate`` result.
        """
        if self.selected_pairs is None:
            self.select_pairs()
        return self.signal_generator[signalGenerator].generate(self.selected_pairs, self.price_df, predict_df, params)

    def generate_portfolio(self, trade_df: pd.DataFrame):
        """Value the quantities in ``trade_df`` on every price date.

        ``trade_df`` is reindexed onto the price dates, forward-filled between
        its rows and zero-filled before the first one. It is then multiplied by
        the prices (missing prices count as 0) and summed across tickers.

        Args:
            trade_df: Per-ticker quantities indexed by date.

        Returns:
            A Series indexed by price date.
        """
        price_pivot_df = self.price_df.pivot_table("ETF Price", ["Date"], columns="ETF_Ticker")
        price_pivot_df.index = pd.to_datetime(price_pivot_df.index)
        # .ffill() replaces the deprecated fillna(method='ffill').
        held = trade_df.reindex(price_pivot_df.index).ffill().fillna(0)
        return (held * price_pivot_df.fillna(0)).sum(axis=1)


In [None]:
# Load the long-format price table and parse its dates.
price_df = pd.read_csv('price_df.csv')
price_df['Date'] = pd.to_datetime(price_df['Date'])

training_df = pd.read_csv('TrainingSet.csv')

pipeline = PairTradingPipeline(price_df)
# ``selected_pairs`` stays None until select_pairs() has run, so reading the
# attribute straight after construction always gave None. Run the selection
# explicitly so ``pairs`` holds the actual pair list.
pairs = pipeline.select_pairs()

In [None]:
# Load the linear predictor's output, turn it into trades with the
# 'most_spread_rtn' generator, value the resulting portfolio on every price
# date, and plot the per-ticker trade quantities.
linear_pred_df = pipeline.predict('linear', pairs)
trade_df, pair_trade_df = pipeline.create_signal('most_spread_rtn', linear_pred_df)
df_port = pipeline.generate_portfolio(trade_df)
trade_df.plot()

In [None]:
# Portfolio value series returned by generate_portfolio for the 'most_spread_rtn' trades.
df_port.plot()

In [None]:
# Shift the portfolio series up by one, then plot its relative change from
# each row to the next.
# NOTE(review): the +1 presumably treats df_port as P&L on one unit of
# capital; confirm.
total_ret = 1 + df_port
daily_ret = total_ret.div(total_ret.shift()).sub(1)
daily_ret.plot()

In [None]:
# Repeat the run with the 'percentile_current' generator. This overwrites
# trade_df, pair_trade_df and df_port from the earlier cells.
trade_df, pair_trade_df = pipeline.create_signal('percentile_current', linear_pred_df)
df_port = pipeline.generate_portfolio(trade_df)
trade_df.plot()

In [None]:
# Portfolio value series returned by generate_portfolio for the 'percentile_current' trades.
df_port.plot()

In [None]:
# Shift the portfolio series up by one, then plot its relative change from
# each row to the next.
# NOTE(review): the +1 presumably treats df_port as P&L on one unit of
# capital; confirm.
total_ret = 1 + df_port
daily_ret = total_ret.div(total_ret.shift()).sub(1)
daily_ret.plot()