In [1]:
import inspect
from math import sqrt
import warnings
import numpy as np
from scipy import linalg

class Strategy:
    def __init__(self):
        self.stock_prices = []
        self.market_prices = []
        self.risk_free_rates = []
        pairs = [(4,5),(0,1)]
        self.strategies = []
        for pair in pairs:
            self.strategies.append((pair,PairTrader(pair)))

    def allocate_portfolio(self, stock_price, market_price, risk_free_rate):
        self.stock_prices.append(list(stock_price))
        self.market_prices.append(market_price)
        self.risk_free_rates.append(risk_free_rate)
        
        total_weights = np.array([0 for _ in range(10)])
        
        for strategy in self.strategies:
            pair = strategy[0]
            px,py = stock_price[pair[0]], stock_price[pair[1]]
            trader = strategy[1]
            total_weights = total_weights + np.array(trader.handleData(px,py))
        
        return total_weights

class PairTrader:
    def __init__(self,pair):
        self.pair = pair
        
        self.x_obs = [0]            
        self.y_obs = [0]
        
        self.kf_x = kf_x = KalmanFilter(transition_matrices=[1],observation_matrices=[1],initial_state_mean=0,initial_state_covariance=1,observation_covariance=1,transition_covariance=.01)
        self.state_mean_x = 0
        self.state_covariance_x = 1   
        
        self.kf_y = KalmanFilter(transition_matrices=[1],observation_matrices=[1],initial_state_mean=0,initial_state_covariance=1,observation_covariance=1,transition_covariance=.01)
        self.state_mean_y = 0
        self.state_covariance_y = 1
        
        self.spreadHist = []
        
        self.long = False
        self.short = False
        
        self.halflife = 8
        
        self.entry_Z = 2
        self.exit_Z = -1
        
        self.weights = [0 for _ in range(10)]
        
    def handleData(self,price_x,price_y):
        self.x_obs.append(price_x)
        self.y_obs.append(price_y)
        
        if(len(self.x_obs)>25):
            self.x_obs = self.x_obs[1:]
            self.y_obs = self.y_obs[1:]
        
        # filtered close price
        self.state_mean_x, self.state_covariance_x = (self.kf_x.filter_update(self.state_mean_x,self.state_covariance_x,price_x)) 
        self.state_mean_y, self.state_covariance_y = (self.kf_y.filter_update(self.state_mean_y,self.state_covariance_y,price_y))    
    
        # Calculate the hedge ratio
        state_means = KalmanFilterRegression(self.x_obs,self.y_obs)
        hr = - state_means[:,0][-1]  
        # Calculate the spread
        spread = price_y + hr*price_x
    
        # halflife spread
        self.spreadHist.append(spread)
        if len(self.spreadHist) <= self.halflife:
            return self.weights.copy()
        else:
            self.spreadHist = self.spreadHist[1:]
    
        # Calculate Z score
        mean_Z = np.mean(self.spreadHist)
        std_Z = np.std(self.spreadHist)    
        Z = (spread - mean_Z)/std_Z
    
        if Z>=self.exit_Z and self.long:
            self.long = False
            self.short = False
            new_weights = [0 for _ in range(10)]
            self.weights = new_weights
            
        if Z<=-self.exit_Z and self.short:
            self.long = False
            self.short = False
            new_weights = [0 for _ in range(10)]
            self.weights = new_weights
            
        if Z<=-self.entry_Z and not self.long:
            self.long = True
            self.short = False
            new_weights = [0 for _ in range(10)]
            new_weights[self.pair[1]] = 1
            new_weights[self.pair[0]] = hr
            self.weights = new_weights
        
        # short position: short y and long hr*x
        if Z>=self.entry_Z and not self.short: 
            self.short = True
            self.long = False
            new_weights = [0 for _ in range(10)]
            new_weights[self.pair[1]] = -1
            new_weights[self.pair[0]] = -hr
            self.weights = new_weights
        
        return self.weights.copy()

    def KalmanFilterAverage(x):
        # Construct a Kalman filter
        kf = KalmanFilter(transition_matrices = [1],
        observation_matrices = [1],
        initial_state_mean = 0,
        initial_state_covariance = 1,
        observation_covariance=1,
        transition_covariance=.01)
        # Use the observed values of the price to get a rolling mean
        state_means, _ = kf.filter(x.values)
        state_means = pd.Series(state_means.flatten(), index=x.index)
        return state_means

    # Kalman filter regression
    def KalmanFilterRegression(x,y):
        delta = 1e-3
        trans_cov = delta / (1 - delta) * np.eye(2) # How much random walk wiggles
        obs_mat = np.expand_dims(np.vstack([[x], [np.ones(len(x))]]).T, axis=1)
        kf = KalmanFilter(n_dim_obs=1, n_dim_state=2, # y is 1-dimensional, (alpha, beta) is 2-dimensional
        initial_state_mean=[0,0],
        initial_state_covariance=np.ones((2, 2)),
        transition_matrices=np.eye(2),
        observation_matrices=obs_mat,
        observation_covariance=2,
        transition_covariance=trans_cov)
        # Use the observations y to get running estimates and errors for the state parameters
        state_means, state_covs = kf.filter(y.values)
        return state_means




########################## Kalman Filter Codes Below ###############################

def array1d(X, dtype=None, order=None):
    return np.asarray(np.atleast_1d(X), dtype=dtype, order=order)

def array2d(X, dtype=None, order=None):
    return np.asarray(np.atleast_2d(X), dtype=dtype, order=order)

def check_random_state(seed):
    if seed is None or seed is np.random:
        return np.random.mtrand._rand
    if isinstance(seed, (int, np.integer)):
        return np.random.RandomState(seed)
    if isinstance(seed, np.random.RandomState):
        return seed
    raise ValueError('{0} cannot be used to seed a numpy.random.RandomState'
                     + ' instance').format(seed)

def get_params(obj):
    try:
        # get names of every variable in the argument
        args = inspect.getargspec(obj.__init__)[0]
        args.pop(0)   # remove "self"

        # get values for each of the above in the object
        argdict = dict([(arg, obj.__getattribute__(arg)) for arg in args])
        return argdict
    except:
        raise ValueError("object has no __init__ method")


def preprocess_arguments(argsets, converters):
    result = {}
    for argset in argsets:
        for (argname, argval) in argset.items():
            # check that this argument is necessary
            if not argname in converters:
                raise ValueError("Unrecognized argument: {0}".format(argname))

            # potentially use this argument
            if argname not in result and argval is not None:
                # convert to right type
                argval = converters[argname](argval)

                # save
                result[argname] = argval

    # check that all arguments are covered
    if not len(converters.keys()) == len(result.keys()):
        missing = set(converters.keys()) - set(result.keys())
        s = "The following arguments are missing: {0}".format(list(missing))
        raise ValueError(s)

    return result

# Dimensionality of each Kalman Filter parameter for a single time step
DIM = {
    'transition_matrices': 2,
    'transition_offsets': 1,
    'observation_matrices': 2,
    'observation_offsets': 1,
    'transition_covariance': 2,
    'observation_covariance': 2,
    'initial_state_mean': 1,
    'initial_state_covariance': 2,
}


def _determine_dimensionality(variables, default):
    # gather possible values based on the variables
    candidates = []
    for (v, converter, idx) in variables:
        if v is not None:
            v = converter(v)
            candidates.append(v.shape[idx])

    # also use the manually specified default
    if default is not None:
        candidates.append(default)

    # ensure consistency of all derived values
    if len(candidates) == 0:
        return 1
    else:
        if not np.all(np.array(candidates) == candidates[0]):
            raise ValueError(
                "The shape of all " +
                "parameters is not consistent.  " +
                "Please re-check their values."
            )
        return candidates[0]
    
def _last_dims(X, t, ndims=2):
    X = np.asarray(X)
    if len(X.shape) == ndims + 1:
        return X[t]
    elif len(X.shape) == ndims:
        return X
    else:
        raise ValueError(("X only has %d dimensions when %d" +
                " or more are required") % (len(X.shape), ndims))

def _filter(transition_matrices, observation_matrices, transition_covariance,
            observation_covariance, transition_offsets, observation_offsets,
            initial_state_mean, initial_state_covariance, observations):
    n_timesteps = observations.shape[0]
    n_dim_state = len(initial_state_mean)
    n_dim_obs = observations.shape[1]

    predicted_state_means = np.zeros((n_timesteps, n_dim_state))
    predicted_state_covariances = np.zeros(
        (n_timesteps, n_dim_state, n_dim_state)
    )
    kalman_gains = np.zeros((n_timesteps, n_dim_state, n_dim_obs))
    filtered_state_means = np.zeros((n_timesteps, n_dim_state))
    filtered_state_covariances = np.zeros(
        (n_timesteps, n_dim_state, n_dim_state)
    )

    for t in range(n_timesteps):
        if t == 0:
            predicted_state_means[t] = initial_state_mean
            predicted_state_covariances[t] = initial_state_covariance
        else:
            transition_matrix = _last_dims(transition_matrices, t - 1)
            transition_covariance = _last_dims(transition_covariance, t - 1)
            transition_offset = _last_dims(transition_offsets, t - 1, ndims=1)
            predicted_state_means[t], predicted_state_covariances[t] = (
                _filter_predict(
                    transition_matrix,
                    transition_covariance,
                    transition_offset,
                    filtered_state_means[t - 1],
                    filtered_state_covariances[t - 1]
                )
            )

        observation_matrix = _last_dims(observation_matrices, t)
        observation_covariance = _last_dims(observation_covariance, t)
        observation_offset = _last_dims(observation_offsets, t, ndims=1)
        (kalman_gains[t], filtered_state_means[t],
         filtered_state_covariances[t]) = (
            _filter_correct(observation_matrix,
                observation_covariance,
                observation_offset,
                predicted_state_means[t],
                predicted_state_covariances[t],
                observations[t]
            )
        )

    return (predicted_state_means, predicted_state_covariances,
            kalman_gains, filtered_state_means,
            filtered_state_covariances)

def _filter_correct(observation_matrix, observation_covariance,
                    observation_offset, predicted_state_mean,
                    predicted_state_covariance, observation):
    if not np.any(np.ma.getmask(observation)):
        predicted_observation_mean = (
            np.dot(observation_matrix,
                   predicted_state_mean)
            + observation_offset
        )
        predicted_observation_covariance = (
            np.dot(observation_matrix,
                   np.dot(predicted_state_covariance,
                          observation_matrix.T))
            + observation_covariance
        )

        kalman_gain = (
            np.dot(predicted_state_covariance,
                   np.dot(observation_matrix.T,
                          linalg.pinv(predicted_observation_covariance)))
        )

        corrected_state_mean = (
            predicted_state_mean
            + np.dot(kalman_gain, observation - predicted_observation_mean)
        )
        corrected_state_covariance = (
            predicted_state_covariance
            - np.dot(kalman_gain,
                     np.dot(observation_matrix,
                            predicted_state_covariance))
        )
    else:
        n_dim_state = predicted_state_covariance.shape[0]
        n_dim_obs = observation_matrix.shape[0]
        kalman_gain = np.zeros((n_dim_state, n_dim_obs))

        corrected_state_mean = predicted_state_mean
        corrected_state_covariance = predicted_state_covariance

    return (kalman_gain, corrected_state_mean,
            corrected_state_covariance)

def _filter_predict(transition_matrix, transition_covariance,
                    transition_offset, current_state_mean,
                    current_state_covariance):
    predicted_state_mean = (
        np.dot(transition_matrix, current_state_mean)
        + transition_offset
    )
    predicted_state_covariance = (
        np.dot(transition_matrix,
               np.dot(current_state_covariance,
                      transition_matrix.T))
        + transition_covariance
    )

    return (predicted_state_mean, predicted_state_covariance)


class KalmanFilter(object):
    def __init__(self, transition_matrices=None, observation_matrices=None,
            transition_covariance=None, observation_covariance=None,
            transition_offsets=None, observation_offsets=None,
            initial_state_mean=None, initial_state_covariance=None,
            random_state=None,
            em_vars=['transition_covariance', 'observation_covariance',
                     'initial_state_mean', 'initial_state_covariance'],
            n_dim_state=None, n_dim_obs=None):
        """Initialize Kalman Filter"""

        # determine size of state space
        n_dim_state = _determine_dimensionality(
            [(transition_matrices, array2d, -2),
             (transition_offsets, array1d, -1),
             (transition_covariance, array2d, -2),
             (initial_state_mean, array1d, -1),
             (initial_state_covariance, array2d, -2),
             (observation_matrices, array2d, -1)],
            n_dim_state
        )
        n_dim_obs = _determine_dimensionality(
            [(observation_matrices, array2d, -2),
             (observation_offsets, array1d, -1),
             (observation_covariance, array2d, -2)],
            n_dim_obs
        )

        self.transition_matrices = transition_matrices
        self.observation_matrices = observation_matrices
        self.transition_covariance = transition_covariance
        self.observation_covariance = observation_covariance
        self.transition_offsets = transition_offsets
        self.observation_offsets = observation_offsets
        self.initial_state_mean = initial_state_mean
        self.initial_state_covariance = initial_state_covariance
        self.random_state = random_state
        self.em_vars = em_vars
        self.n_dim_state = n_dim_state
        self.n_dim_obs = n_dim_obs

    def filter(self, X):
        Z = self._parse_observations(X)

        (transition_matrices, transition_offsets, transition_covariance,
         observation_matrices, observation_offsets, observation_covariance,
         initial_state_mean, initial_state_covariance) = (
            self._initialize_parameters()
        )

        (_, _, _, filtered_state_means,
         filtered_state_covariances) = (
            _filter(
                transition_matrices, observation_matrices,
                transition_covariance, observation_covariance,
                transition_offsets, observation_offsets,
                initial_state_mean, initial_state_covariance,
                Z
            )
        )
        return (filtered_state_means, filtered_state_covariances)

    def filter_update(self, filtered_state_mean, filtered_state_covariance,
                      observation=None, transition_matrix=None,
                      transition_offset=None, transition_covariance=None,
                      observation_matrix=None, observation_offset=None,
                      observation_covariance=None):
        # initialize matrices
        (transition_matrices, transition_offsets, transition_cov,
         observation_matrices, observation_offsets, observation_cov,
         initial_state_mean, initial_state_covariance) = (
            self._initialize_parameters()
        )
        transition_offset = _arg_or_default(
            transition_offset, transition_offsets,
            1, "transition_offset"
        )
        observation_offset = _arg_or_default(
            observation_offset, observation_offsets,
            1, "observation_offset"
        )
        transition_matrix = _arg_or_default(
            transition_matrix, transition_matrices,
            2, "transition_matrix"
        )
        observation_matrix = _arg_or_default(
            observation_matrix, observation_matrices,
            2, "observation_matrix"
        )
        transition_covariance = _arg_or_default(
            transition_covariance, transition_cov,
            2, "transition_covariance"
        )
        observation_covariance = _arg_or_default(
            observation_covariance, observation_cov,
            2, "observation_covariance"
        )

        # Make a masked observation if necessary
        if observation is None:
            n_dim_obs = observation_covariance.shape[0]
            observation = np.ma.array(np.zeros(n_dim_obs))
            observation.mask = True
        else:
            observation = np.ma.asarray(observation)

        predicted_state_mean, predicted_state_covariance = (
            _filter_predict(
                transition_matrix, transition_covariance,
                transition_offset, filtered_state_mean,
                filtered_state_covariance
            )
        )
        (_, next_filtered_state_mean,
         next_filtered_state_covariance) = (
            _filter_correct(
                observation_matrix, observation_covariance,
                observation_offset, predicted_state_mean,
                predicted_state_covariance, observation
            )
        )

        return (next_filtered_state_mean, next_filtered_state_covariance)

    def _initialize_parameters(self):
        """Retrieve parameters if they exist, else replace with defaults"""
        n_dim_state, n_dim_obs = self.n_dim_state, self.n_dim_obs

        arguments = get_params(self)
        defaults = {
            'transition_matrices': np.eye(n_dim_state),
            'transition_offsets': np.zeros(n_dim_state),
            'transition_covariance': np.eye(n_dim_state),
            'observation_matrices': np.eye(n_dim_obs, n_dim_state),
            'observation_offsets': np.zeros(n_dim_obs),
            'observation_covariance': np.eye(n_dim_obs),
            'initial_state_mean': np.zeros(n_dim_state),
            'initial_state_covariance': np.eye(n_dim_state),
            'random_state': 0,
            'em_vars': [
                'transition_covariance',
                'observation_covariance',
                'initial_state_mean',
                'initial_state_covariance'
            ],
        }
        converters = {
            'transition_matrices': array2d,
            'transition_offsets': array1d,
            'transition_covariance': array2d,
            'observation_matrices': array2d,
            'observation_offsets': array1d,
            'observation_covariance': array2d,
            'initial_state_mean': array1d,
            'initial_state_covariance': array2d,
            'random_state': check_random_state,
            'n_dim_state': int,
            'n_dim_obs': int,
            'em_vars': lambda x: x,
        }

        parameters = preprocess_arguments([arguments, defaults], converters)

        return (
            parameters['transition_matrices'],
            parameters['transition_offsets'],
            parameters['transition_covariance'],
            parameters['observation_matrices'],
            parameters['observation_offsets'],
            parameters['observation_covariance'],
            parameters['initial_state_mean'],
            parameters['initial_state_covariance']
        )

    def _parse_observations(self, obs):
        """Safely convert observations to their expected format"""
        obs = np.ma.atleast_2d(obs)
        if obs.shape[0] == 1 and obs.shape[1] > 1:
            obs = obs.T
        return obs

In [2]:
import pandas as pd
risk_free_rates = pd.read_csv('risk_free_train.csv', index_col=0)
stock_prices = pd.read_csv('stock_prices_train.csv', index_col=0)

In [None]:
strategy = Strategy()
BT = BackTester(stock_prices, risk_free_rates, strategy)
BT.run()
BT.evaluate()

In [None]:
class BackTester:
    def __init__(self, stock_market_prices, risk_free_rates, strategy):
        def transform_rates(rates):
            rates = np.array(rates)
            ret = []
            for i in rates:
                ret.extend([i[0] for _ in range(21)])	
            return ret
        self.stock_prices = stock_market_prices.iloc[:, :-1]
        self.market_prices = stock_market_prices.iloc[:, -1]
        self.risk_free_rates = transform_rates(risk_free_rates)	
        self.strategy = strategy

    def run(self, verbose=True):
        def calc_excR(s_p, r, pre_pos, pre_data):
            pre_stock_price, pre_r = pre_data[0], pre_data[1]
            stock_returns = (np.array(s_p) - np.array(pre_stock_price)) / np.array(s_p)
            return np.sum(stock_returns * np.array(pre_pos))
        cur_position = np.array([None] * np.shape(self.stock_prices)[1])
        pre_data = (None, None, None) 	# For Return Calculation
        self.dailiy_excR = [] 	# Daily Excessive Return	
        count = 0
        for ((idx, stock_price), market_price, risk_free_rate) in zip(self.stock_prices.iterrows(), self.market_prices, self.risk_free_rates):
            if cur_position.any() != None:
                self.dailiy_excR.append(calc_excR(stock_price, risk_free_rate, cur_position, pre_data))
                if verbose:
                    print(f"Daily Excessive Return {idx}:" ,self.dailiy_excR[-1])
            cur_position = self.strategy.allocate_portfolio(stock_price, market_price, risk_free_rate)
            pre_data = (stock_price, risk_free_rate)
            count += 1

    def evaluate(self):
        def calc_sharp(daily_excR, r):
            return (np.mean(daily_excR - r) / np.sqrt(np.var(daily_excR))) * np.sqrt(252)
        print("-------Evaluation-------")
        print("  Year  |   Annulized Sharp Ratio")
        a_sharps = []
        for i in range(10):
            a_sharp = calc_sharp(self.dailiy_excR[i:i+120], self.risk_free_rates[i + 120])
            a_sharps.append(a_sharp)
            print(f"    {i}   |   {a_sharp}  ")
        print(f"Mean: {np.mean(a_sharps)}")