In [6]:
import math

import cvxpy as cp
import numpy as np

import copy

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns


In [3]:
import math

import cvxpy as cp
import numpy as np


# %%
def PriceReverse(df, cycle, time):
    """
    Compute the price-reversal factor: the simple return over the last ``cycle`` rows.
    Order: Ascending (the return is handed back un-negated)
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: how many rows to look back
    :param time: current position in df
    :return: (Close_{i,t} - Close_{i,t-cycle}) / Close_{i,t-cycle}, or None when
             either position lies outside the series
    """
    start = time - cycle
    # A negative position makes .iloc count from the END of the series, which
    # would silently read future prices; treat it as "not enough history".
    if start < 0 or time < 0:
        return None
    try:
        previous_price = df.iloc[start]
        return (df.iloc[time] - previous_price) / previous_price
    except (KeyError, IndexError):
        # .iloc reports an out-of-range position with IndexError, not KeyError
        return None


def PriceMomentum(df, cycle, time):
    """
    Compute the price-momentum factor: the negated simple return over the last
    ``2 * cycle`` rows.
    Order: Descending (the return is negated, so an ascending sort puts the
    largest gain first)
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: half of the look-back length; the function looks back 2 * cycle rows
    :param time: current position in df
    :return: -(Close_{i,t} - Close_{i,t-2*cycle}) / Close_{i,t-2*cycle}, or None
             when either position lies outside the series
    """
    start = time - 2 * cycle
    # A negative position makes .iloc count from the END of the series, which
    # would silently read future prices; treat it as "not enough history".
    if start < 0 or time < 0:
        return None
    try:
        previous_price = df.iloc[start]
        return -(df.iloc[time] - previous_price) / previous_price
    except (KeyError, IndexError):
        # .iloc reports an out-of-range position with IndexError, not KeyError
        return None


def Price_High_Low(df, cycle, time):
    """
    Compute High-minus-low:
    Order: Descending (the ratio is negated before it is returned)
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: length of the window used for the high and the low
    :param time: current position in df
    :return: -(High - Close_{i,t}) / (Close_{i,t} - Low), where High and Low are
             the max and min of the ``cycle`` rows before ``time`` (the current
             row is not part of the window); None when the window or the current
             row is not available
    """
    start = time - cycle
    # A negative start would be interpreted relative to the end of the series.
    if start < 0:
        return None
    try:
        window = df.iloc[start:time].values
        # max()/min() raise ValueError on an empty window, so bail out first
        if len(window) == 0:
            return None
        price = df.iloc[time]
        high = max(window)
        low = min(window)
        return -(high - price) / (price - low)
    except (KeyError, IndexError):
        # df.iloc[time] raises IndexError when time is past the last row
        return None


def Vol_Coefficient(df, cycle, time):
    """
    Coefficient of variation of the step-to-step returns in the look-back window.
    Order: Descending (the ratio is negated before it is returned)
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: number of rows in the look-back window
    :param time: current position in df (the window ends just before it)
    :return: -Std(r) / Ave(r), where r are the simple returns between
             consecutive rows of df[time-cycle:time]
    """
    try:
        window = df.iloc[time - cycle:time].values
        # simple return of each row relative to the one before it
        returns = np.diff(window) / window[:-1]
        dispersion = np.std(returns)
        centre = np.mean(returns)
        return -dispersion / centre
    except KeyError:
        return None


def AnnVol(df, cycle, time):
    """
    Compute Annual Volatility:
    Order: Descending (the volatility is negated before it is returned)
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: number of rows in the look-back window
    :param time: current position in df (the window ends just before it)
    :return: -sqrt(252 / cycle * sum(r^2)), where r are the cycle-1 log returns
             between consecutive rows of df[time-cycle:time]; None when the
             window does not fit inside the series
    """
    start = time - cycle
    # Reject windows that would start before the first row (negative positions
    # wrap around to the end of the series) or run past the last row.
    if cycle <= 0 or start < 0 or time > len(df):
        return None
    closes = df.iloc[start:time].to_numpy(dtype=float)
    log_returns = np.log(closes[1:] / closes[:-1])
    return -np.sqrt(252 / cycle * np.sum(log_returns ** 2))


def MovingAverage(df, cycle, time):
    """
    Compute Moving Average:
    Order: Descending
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: only used to pick the row that is checked for NaN while the
                  series is still too short for the 50-row average
    :param time: current position in df
    :return: (MA_10 - Price) + (MA_20 - Price) + (MA_50 - Price) as a scalar,
             where the averages cover the 10/20/50 rows before ``time``;
             0 while there is not enough history but the stock already has a
             price; None when the needed rows are missing or NaN
    """
    try:
        if time - 50 <= 0:
            # Too early for a 50-row average: neutral score for a stock that
            # already trades, no score at all for one without a price yet.
            reference = df.iloc[time - cycle]
            return None if math.isnan(reference) else 0
        price = df.iloc[time]
        window = df.iloc[time - 50:time].values
    except (KeyError, IndexError):
        # .iloc reports an out-of-range position with IndexError
        return None
    if len(window) < 50:
        return None
    ma10 = window[-10:].mean()
    ma20 = window[-20:].mean()
    ma50 = window.mean()
    # every term is a scalar, so the result can be compared and NaN-checked directly
    return ma10 + ma20 + ma50 - price * 3


def MACD(df, cycle, time):
    """
    Compute Moving Average Convergence Divergence:
    Order: Descending
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: window length and span of the short EMA; the long EMA uses span 2*cycle
    :param time: current position in df (the window ends just before it)
    :return: cycle-Period EMA - cycle*2-Period EMA, both evaluated on the last
             row of df[time-cycle:time]; None when that window is empty or would
             start before the first row
    where EMA = Price_t * k + EMA_t-1 * (1-k)
    k = 2 / (N+1)
    """
    start = time - cycle
    # A negative start would be interpreted relative to the end of the series.
    if start < 0:
        return None
    window = df.iloc[start:time]
    if len(window) == 0:
        return None
    ema_short = window.ewm(span=cycle).mean()
    ema_long = window.ewm(span=cycle * 2).mean()
    return float(ema_short.iloc[-1] - ema_long.iloc[-1])


def BoolingerBands(df, cycle, time):
    """
    Bollinger-band style signal.
    Order: Descending
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: length of the short window and of the rolling mean
    :param time: current position in df (both windows end just before it)
    :return: (#rows in the first half of the short window above the upper band
             - #rows in the second half below the lower band) * Std(returns of
             the short window). The band is the rolling ``cycle``-row mean +- one
             Std of the short window. 0 while there is not enough history and
             the row at time-cycle is not NaN; None if the arrays cannot be combined.
    """
    lacks_history = time - 2 * cycle <= 0
    if lacks_history and not math.isnan(df.iloc[time - cycle]):
        return 0
    try:
        long_window = df.iloc[time - 2 * cycle + 1:time].values
        short_window = df.iloc[time - cycle:time].values
        # rolling mean over the long window via a running total
        running_total = np.cumsum(np.insert(long_window, 0, 0))
        rolling_mean = (running_total[cycle:] - running_total[:-cycle]) / cycle
        band_width = np.std(short_window)
        upper = rolling_mean + band_width
        lower = rolling_mean - band_width
        half = len(short_window) // 2
        above = sum(short_window[:half] > upper[:half])
        below = sum(short_window[half:] < lower[half:])
        # simple returns between consecutive rows of the short window
        returns = np.diff(short_window) / short_window[:-1]
        return (above - below) * np.std(returns)
    except ValueError:
        return None



# Factor functions available to the agent; each takes (df, cycle, time).
trading_strategies = [
    MovingAverage,
    PriceReverse,
    PriceMomentum,
    Price_High_Low,
    Vol_Coefficient,
    AnnVol,
    MACD,
    BoolingerBands,
]


In [4]:
def MinVariance(data, ranking, time, cycle):
    """
    MinVariance minimizes variance (needs short positions)
    Argument data: table with one price column per stock
             ranking: list of stocks from PitchStock
             time: current position; the window is the ``cycle`` rows before it
             cycle: number of rows in the window
            return weighting for each stock (in percentage), summing to 1;
            an empty array for an empty ranking
    """
    n_assets = len(ranking)
    if n_assets == 0:
        return np.array([])
    prices = np.zeros(shape=(n_assets, cycle))
    for row, stock in enumerate(ranking):
        # back-fill gaps; Series.bfill() replaces the deprecated fillna(method=...)
        prices[row] = data[stock].iloc[time - cycle:time].bfill()
    # np.cov returns a 0-d array for a single stock; pinv needs at least 2-D
    cov_matrix = np.atleast_2d(np.cov(prices))
    inv_cov_matrix = np.linalg.pinv(cov_matrix)
    ones = np.ones(n_assets)
    # closed-form minimum-variance weights: C^-1 1 / (1' C^-1 1)
    return (inv_cov_matrix @ ones) / (ones @ inv_cov_matrix @ ones)


def EqualWeight(data, ranking, time, cycle):
    """
    EqualWeight gives every selected stock the same share, 1/N.
    Only ``ranking`` is used; the other arguments exist so that all
    rebalancing functions can be called the same way.
    return weighting for each stock (in percentage)
    """
    n_assets = len(ranking)
    return np.full(shape=n_assets, fill_value=1.0) / n_assets


def MeanVariance_Constraint(data, ranking, time, cycle):
    """
    Minimum-variance weights solved by convex optimization, subject to the
    weights summing to 1 and each weight being at least 1 / (2 * N).
    Argument data: table with one price column per stock
             ranking: list of stocks from PitchStock
             time: current position; the window is the ``cycle`` rows before it
             cycle: number of rows in the window
    return weighting for each stock (in percentage); an empty array for an
           empty ranking
    """
    n_assets = len(ranking)
    if n_assets == 0:
        return np.array([])
    prices = np.zeros(shape=(n_assets, cycle))
    for row, stock in enumerate(ranking):
        # back-fill gaps; Series.bfill() replaces the deprecated fillna(method=...)
        prices[row] = data[stock].iloc[time - cycle:time].bfill()
    # np.cov returns a 0-d array for a single stock; quad_form needs a matrix
    cov_matrix = np.atleast_2d(np.cov(prices))
    weight = cp.Variable(shape=n_assets)
    objective = cp.Minimize(cp.quad_form(weight, cov_matrix))
    constraints = [cp.sum(weight) == 1, weight >= 1 / (2 * n_assets)]
    cp.Problem(objective, constraints).solve()
    # NOTE(review): weight.value is presumably None if the solver does not
    # reach a solution - confirm whether callers should handle that case.
    return weight.value


def RiskParity(data, ranking, time, cycle):
    """
    RiskParity weights each stock by the inverse of the standard deviation of
    its prices in the window; it disregards covariance, which is the major drawback.
    Argument data: table with one price column per stock
             ranking: list of stocks from PitchStock
             time: current position; the window is the ``cycle`` rows before it
             cycle: number of rows in the window
    return weighting for each stock (in percentage), summing to 1
    """
    prices = np.zeros(shape=(len(ranking), cycle))
    for row, stock in enumerate(ranking):
        # Back-fill gaps like the other rebalancing functions do; a single NaN
        # would otherwise turn every weight into NaN through the final sum.
        prices[row] = data[stock].iloc[time - cycle:time].bfill()
    inverse_vol = np.reciprocal(prices.std(axis=1))
    return inverse_vol / inverse_vol.sum()


# Weighting functions available to the agent; each takes (data, ranking, time, cycle).
rebalancing_strategies = [
    MinVariance,
    EqualWeight,
    MeanVariance_Constraint,
    RiskParity,
]


In [5]:
class Agent:
    """
    Back-testing agent: at every rebalancing date it ranks the stocks with a
    trading strategy, sizes the positions with a rebalancing strategy and
    books the resulting trades.

    The class reads the module-level names INITIAL_BALANCE, RISKFREE,
    TRANS_COST and ticker, which must be defined before an Agent is created
    or used.
    """

    def __init__(self, portfolio, data, trading_strategies, rebalancing_strategies, cycle, max_holding):
        """
        portfolio: dictionary (accounting book), {'cash': amount, stock: # of shares}
        Max_holding is the maximum number of stocks this agent can hold
        Cycle is the rebalancing period
        Data is the dataset
        trading_strategies / rebalancing_strategies are the factor and
        weighting functions this Agent has at its disposal
        """
        self.portfolio = portfolio
        self.data = data
        self.trading_strategies = trading_strategies
        self.rebalancing_strategies = rebalancing_strategies
        self.cycle = cycle
        self.equity = INITIAL_BALANCE
        self.re = float()
        self.tran_cost = float()
        # risk-free growth factor over one rebalancing period
        self.rf = np.power(RISKFREE, self.cycle / 252)
        self.max_holding = max_holding

    def PitchStock(self, trading_strategy, time):
        """
        Argument trading_strategy: a function that takes (df, cycle, time) as argument
        return ranking: list of at most max_holding stocks with the smallest
                        metric; stocks whose metric is None or NaN are left out
        """
        cycle = self.cycle
        data = self.data
        ranking = {}
        for symbol in ticker:
            # evaluate the factor once and reuse the value
            metric = trading_strategy(data[symbol], cycle, time)
            if metric is not None and not math.isnan(metric):
                ranking[symbol] = metric
        return sorted(ranking, key=ranking.get)[:self.max_holding]

    def Rebalancing(self, ranking, rebalancing_strategy, time):
        """
        Argument ranking: result from Agent.PitchStock
                rebalancing_strategy: a function that takes (df, ranking, time, cycle) as argument
                return target_portfolio: dictionary {Stock: # of shares} plus
                       the left-over 'cash'
        """
        cycle = self.cycle
        data = self.data
        cash = self.portfolio['cash']
        # assume that cash earns risk-free rate interest
        equity = self.get_Equity(time) + cash * (self.rf - 1)
        target_portfolio = {}
        if len(ranking) == 0:
            # nothing was selected: stay in cash instead of asking the
            # rebalancing strategy to weight an empty universe
            target_portfolio['cash'] = equity
            return target_portfolio
        weight = np.array(rebalancing_strategy(data, ranking, time, cycle))
        # money allotted to each stock, truncated to whole currency units
        budget = (weight * equity).astype(int)
        for w, stock in zip(budget, ranking):
            price = data[stock].iloc[time]
            shares = w // price
            target_portfolio[stock] = shares
            equity -= shares * price
        target_portfolio['cash'] = equity
        return target_portfolio

    def get_Equity(self, time):
        """
        return the equity value for a given time
            cash + sum(shares * price)
        """
        # split the book into cash and stock holdings without mutating it
        holdings = {name: amount for name, amount in self.portfolio.items() if name != 'cash'}
        total_equity = self.portfolio['cash']
        symbols = list(holdings)
        shares = np.array(list(holdings.values()))
        price = np.matrix(self.data[symbols].iloc[time])
        total_equity += price @ shares
        return total_equity.item()

    def Trading(self, target_portfolio, time):
        """
        Argument target_portfolio: a dictionary get from rebalance
                    (what Agent.portfolio should be after trading)
        returns nothing but update:
                equity, portfolio, re, tran_cost
        """
        cost = 0
        portfolio = self.portfolio
        # selling and adjust share
        for i in list(portfolio):
            if i not in target_portfolio and i != 'cash':
                cost += portfolio[i] * TRANS_COST
            elif i in target_portfolio and i != 'cash':
                diff = abs(portfolio[i] - target_portfolio[i])
                cost += diff * TRANS_COST
        # buying
        for i in target_portfolio:
            if i not in portfolio:
                cost += target_portfolio[i] * TRANS_COST
        # update all the attribute of the agent
        self.tran_cost += cost
        self.portfolio = target_portfolio
        self.equity = self.get_Equity(time) - cost
        self.re = self.equity / INITIAL_BALANCE

    def get_Vol(self, time):
        """
        use the current share counts to calculate the value of the stock
        holdings on each of the last ``cycle`` rows up to and including ``time``
        """
        cycle = self.cycle
        holdings = {name: amount for name, amount in self.portfolio.items() if name != 'cash'}
        # this is a vector with one element per stock held
        shares = np.array(list(holdings.values()))
        # ticker in the portfolio except cash
        symbols = list(holdings)
        price_matrix = np.matrix(self.data[symbols].iloc[time + 1 - cycle:time + 1])
        equity_path = price_matrix @ shares
        return equity_path

    def BackTesting_Single(self, trading_strategy, rebalancing_strategy):
        """
        This is backtesting for one single combination of trading and rebalancing strategy
        Return the annualized return (in percent), volatility and Sharpe ratio
        """
        cycle = self.cycle
        data = self.data
        print("Trading strategy: %s \n" % trading_strategy.__name__)
        print("Rebalancing strategy: %s \n" % rebalancing_strategy.__name__)
        T = len(data) // cycle
        print("We are rebalancing for %s number of times." % T)
        portfolio_path = []
        for i in range(1, T):
            time = i * cycle
            ranking = self.PitchStock(trading_strategy, time)
            target_portfolio = self.Rebalancing(ranking, rebalancing_strategy, time)
            # get volatility before portfolio updates
            portfolio_path.append(self.get_Vol(time))
            self.Trading(target_portfolio, time)
            print("Rebalancing for %s time!" % i)
        vol = np.std(portfolio_path) / np.sqrt(T * cycle) / 100
        # annualized return: 252 / (cycle * T) is the reciprocal of the horizon
        # used for total_rf below (true division, no flooring of 252 / cycle)
        annual_return = (np.power(self.re, 252 / (cycle * T)) - 1) * 100
        # risk-free growth over the whole back-test horizon
        total_rf = np.power(RISKFREE, cycle * T / 252)
        sharpe = (self.re - total_rf) / vol
        return annual_return, vol, sharpe

    def BackTesting(self):
        """
        This is backtesting for all strategies
        Return three dataframes (rows: rebalancing strategies, columns:
        trading strategies)
            1. annualized return for each combination
            2. volatility for each combination
            3. Sharpe ratio for each combination
        """
        trading_strategies = self.trading_strategies
        rebalancing_strategies = self.rebalancing_strategies
        print("There are %s trading strategies and %s rebalancing strategies we are testing." % (
            len(trading_strategies), len(rebalancing_strategies)))
        print("They are: ")
        for i in trading_strategies:
            print("     %s \n" % i.__name__)
        for i in rebalancing_strategies:
            print("     %s \n" % i.__name__)
        row_names = [x.__name__ for x in rebalancing_strategies]
        col_names = [x.__name__ for x in trading_strategies]
        portfolio_re = pd.DataFrame(index=row_names, columns=col_names)
        portfolio_vol = pd.DataFrame(index=row_names, columns=col_names)
        portfolio_sharpe = pd.DataFrame(index=row_names, columns=col_names)
        for col, trading_strategy in enumerate(trading_strategies):
            for row, rebalancing_strategy in enumerate(rebalancing_strategies):
                # use BackTesting_Single to get the three value of metrics needed
                total_return, vol, sharpe = self.BackTesting_Single(trading_strategy, rebalancing_strategy)
                # one-step positional assignment; chained .iloc[row][col] may
                # write to a temporary copy and leave the frame unchanged
                portfolio_re.iloc[row, col] = total_return
                portfolio_vol.iloc[row, col] = vol
                portfolio_sharpe.iloc[row, col] = sharpe
                # reset balance, equity, re, and transaction cost for the agent
                self.reset()
                print("\n")
        return portfolio_re, portfolio_vol, portfolio_sharpe

    def reset(self):
        """
        This reset the Agent to its initial holding.
        Apply this method between testing different strategies.
        """
        self.portfolio = {'cash': INITIAL_BALANCE}
        self.equity = INITIAL_BALANCE
        self.re = float()
        self.tran_cost = float()

In [7]:
# Load the price table; 'Unnamed: 0' is presumably a leftover index column - confirm.
df = pd.read_csv("SP500.csv").drop(columns=['Unnamed: 0'])
print(df.shape)
# every column after the first one is treated as a ticker
ticker = df.columns.tolist()[1:]

# Back-test settings, read as globals by Agent
INITIAL_BALANCE = 51500
TRANS_COST = 0.00
# define the risk-free rate
RISKFREE = 1.00

wsw = Agent(
    portfolio={'cash': INITIAL_BALANCE},
    data=df[4000:],
    trading_strategies=trading_strategies,
    rebalancing_strategies=rebalancing_strategies[1:],
    cycle=10,
    max_holding=20,
)

(4783, 456)


In [40]:
# Pull one price column and look at its first 50 rows as a plain array.
test = df["MMM"]
temp = test.iloc[0:50].to_numpy()
temp

array([35.31, 34.9 , 35.25, 33.94, 34.14, 33.61, 33.27, 33.22, 32.5 ,
       33.62, 32.59, 32.29, 31.62, 32.25, 32.88, 31.74, 32.44, 31.79,
       31.78, 33.  , 32.78, 33.05, 32.21, 32.52, 33.08, 33.14, 33.2 ,
       32.78, 33.32, 33.81, 32.88, 33.99, 33.54, 33.73, 33.31, 33.01,
       33.05, 33.34, 33.83, 33.76, 32.82, 33.33, 33.59, 33.61, 33.81,
       35.09, 34.85, 33.47, 32.25, 32.19])

In [47]:
def BoolingerBands(df, cycle, time):
    """
    Bollinger-band style signal (this cell redefines the function of the same
    name that appears earlier in the notebook, with the same logic).
    Order: Descending
    :param df: price series of one stock (n*1 vector), addressed by position
    :param cycle: length of the short window and of the rolling mean
    :param time: current position in df (both windows end just before it)
    :return: (#rows in the first half of the short window above the upper band
             - #rows in the second half below the lower band) * Std(returns of
             the short window). The band is the rolling ``cycle``-row mean +- one
             Std of the short window. 0 while there is not enough history and
             the row at time-cycle is not NaN; None if the arrays cannot be combined.
    """
    too_early = time - 2 * cycle <= 0
    if too_early and not math.isnan(df.iloc[time - cycle]):
        return 0
    try:
        history = df.iloc[time - 2 * cycle + 1:time].values
        recent = df.iloc[time - cycle:time].values
        # rolling mean over the longer history via a running total
        totals = np.cumsum(np.insert(history, 0, 0))
        centre = (totals[cycle:] - totals[:-cycle]) / cycle
        spread = np.std(recent)
        upper = centre + spread
        lower = centre - spread
        split = len(recent) // 2
        breakouts_up = sum(recent[:split] > upper[:split])
        breakouts_down = sum(recent[split:] < lower[split:])
        # simple returns between consecutive rows of the recent window
        pct_change = np.diff(recent) / recent[:-1]
        return (breakouts_up - breakouts_down) * np.std(pct_change)
    except ValueError:
        return None

In [58]:
# Spot-check of the factor on the MMM column: cycle of 20 rows, evaluated at position 1860.
BoolingerBands(test, 20, 1860)

-0.030718310375564807

In [18]:
# Hand-computed simple return between the first two MMM values shown earlier (35.31 -> 34.9).
(34.9 - 35.31)/35.31

-0.011611441517983679