In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
plt.style.use('ggplot')

In [3]:
np.random.seed(1600134)

# Time Series Generation

In [4]:
T = 2000
y_t = np.zeros(2000)
y_t[0] = 100 
y_t[1] = 100
e_t = np.random.standard_normal(size = T)

In [5]:
%matplotlib inline
phi = 0.6
d = 0.025
theta = -0.4 
for i in range(2,2000):
    dy_ = y_t[i-1] - y_t[i-2]
    dy = 0.6*(dy_ - d) + e_t[i] + theta*e_t[i-1]
    y_t[i] = y_t[i-1] + dy

In [6]:
%matplotlib qt
plt.figure(figsize=(12.6,4.6))
plt.plot(y_t)
plt.xlabel("Days")
plt.ylabel("Closing Price ($)")
plt.title("Price movement over 2000 days")

Text(0.5, 1.0, 'Price movement over 2000 days')

In [7]:
X_train, X_test = y_t[:1400], y_t[len(y_t) - 600:]

# Trading Strategies

I am going to look at the following strategies: buy and hold, trend-following and mean-reversion. 

In [8]:
def log_returns(TV):
    returns = [0]
    for i in range(1,len(TV)):
        r = np.log(TV[i]/TV[i-1])
        returns.append(r)
    return returns

In [9]:
def sharpe_ratio(returns):
    returns=np.array(returns)
    avg_return = returns.mean()
    std = returns.std()
    return avg_return*np.sqrt(252)/std

In [10]:
cash = np.zeros(np.shape(y_t))
cash[0] = 10000

### Buy and hold

I will be using this as a benchmark against our 3 trading strategies.

In [11]:
TV_bh = cash[0]*y_t/y_t[0]

# Finding the optimal window length

Here we are creating functions for our strategies.  

In [12]:
def mean_rev(window, data):
    t = np.linspace(0, len(data), len(data))
    time_window = window
    cumsum = [0]

    ma = np.zeros(np.shape(data))

    w = np.zeros(np.shape(data))
    cash = np.zeros(np.shape(data))

    cash[0] = 10000

    for i, x in enumerate(data[:-1], 0):
        cumsum.append(cumsum[i] + x)
        ma[i] = x
        if i>=time_window:
            moving_ave = (cumsum[i] - cumsum[i-time_window])/(time_window)
            ma[i] = moving_ave

        if ma[i] == x:
            w[i+1] = w[i]
            cash[i+1] = cash[i]

        if ma[i] > x: 
            w[i+1] = cash[i]/x  + w[i]
            cash[i+1] = 0

        if ma[i] < x:
            cash[i+1] = w[i]*x + cash[i]
            w[i+1] = 0

    ma[i+1] = data[len(data)-1]

    mr_strategy = [a*b for a,b in zip(w,data)]+ cash
    returns = log_returns(mr_strategy)
    sharpe = sharpe_ratio(returns)
    return mr_strategy, sharpe


In [13]:
def trend_f(window, data):
    t = np.linspace(0, len(data), len(data))
    time_window = window
    cumsum = [0]

    ma = np.zeros(np.shape(data))

    w = np.zeros(np.shape(data))
    cash = np.zeros(np.shape(data))

    cash[0] = 10000

    for i, x in enumerate(data[:-1], 0):
        cumsum.append(cumsum[i] + x)
        ma[i] = x
        if i>=time_window:
            moving_ave = (cumsum[i] - cumsum[i-time_window])/(time_window)
            ma[i] = moving_ave

        if ma[i] == x:
            w[i+1] = w[i]
            cash[i+1] = cash[i]

        if ma[i] < x: 
            w[i+1] = cash[i]/x  + w[i]
            cash[i+1] = 0

        if ma[i] > x:
            cash[i+1] = w[i]*x + cash[i]
            w[i+1] = 0

    ma[i+1] = data[len(data)-1]

    tf_strategy = [a*b for a,b in zip(w,data)]+ cash
    returns = log_returns(tf_strategy)
    sharpe = sharpe_ratio(returns)
    return tf_strategy, sharpe
    



In [159]:
def dual_cv(long_w, short_w, data):
    X_train=data
    cumsum = [0]
    flag=-1

    long_ma = np.zeros(np.shape(X_train))
    short_ma = np.zeros(np.shape(X_train)
                       )
    w = np.zeros(np.shape(X_train))
    cash = np.zeros(np.shape(X_train))

    cash[0] = 10000

    for i, x in enumerate(X_train[:-1], 0):
        cumsum.append(cumsum[i] + x)
        long_ma[i] = x
        short_ma[i] = x
        if i>=long_w:
            long_moving_ave = (cumsum[i] - cumsum[i-long_w])/(long_w)
            short_moving_ave = (cumsum[i] - cumsum[i-short_w])/(short_w)
            long_ma[i] = long_moving_ave
            short_ma[i] = short_moving_ave

        if flag == -1:
            if short_ma[i] > long_ma[i]:
                flag = 1
                w[i+1] = cash[i]/x + w[i]
                cash[i+1] = 0
            elif short_ma[i] < long_ma[i]:
                flag = 0
                cash[i+1] = w[i]*x + cash[i]
                w[i+1] = 0
            else:
                w[i+1] = w[i]
                cash[i+1] = cash[i]
        elif flag == 0:
            if short_ma[i] > long_ma[i]:
                flag = 1
                w[i+1] = cash[i]/x + w[i]
                cash[i+1] = 0
            else:
                w[i+1] = w[i]
                cash[i+1] = cash[i]
        else:
            if short_ma[i] < long_ma[i]:
                flag = 0
                cash[i+1] = w[i]*x + cash[i]
                w[i+1] = 0
            else:
                w[i+1] = w[i]
                cash[i+1] = cash[i]

    long_ma[i+1] = X_train[len(X_train)-1]
    short_ma[i+1] = X_train[len(X_train)-1]

    dma_cv_strategy = [a*b for a,b in zip(w,X_train)]+ cash
    returns = log_returns(dma_cv_strategy)
    sharpe = sharpe_ratio(returns)
    return sharpe

### Grid Search

We then complete a grid search over various window lenghths for each strategy, and use the annualized sharp ratio for each one to compute the best window size.  

In [160]:
mr_sharpe = []
mr_max = 0
mr_optim = 0
for i in np.arange(15,100,20):
    strat,mr = mean_rev(i, X_train)
    mr_sharpe.append(mr)
    if mr > mr_max:
        mr_max = mr
        mr_optim = i
        
plt.plot(mr_sharpe)
plt.show()

tf_max = 0
tf_optim = 0
tf_sharpe = []
for i in np.arange(15,100,20):
    strat, tf = trend_f(i, X_train)
    tf_sharpe.append(tf)
    if tf > tf_max:
        tf_max = tf
        tf_optim = i
    
plt.plot(tf_sharpe)
plt.show()

dma_sharpe_max = 0
dma_long = 0
dma_short = 0
for i in np.arange(50,100, 20):
    for k in np.arange(5,25,5):
        dma = dual_cv(i, k, X_train)
        if dma > dma_sharpe_max:
            dma_long = i
            dma_short = k

In [161]:
print(dma_long)
print(dma_short)
print(tf_optim)
print(mr_optim)

90
20
15
95


## Strategy 3 : Dual moving average crossover

In this model we look to compare a longer-term moving average to a shorter-term moving average.  When there is a cross over between the moving averages, we either buy or sell.  Specifically, when the shorter term moving average goes above the longer term moving average, we buy.  When the shorter term moving average goes below the longer term moving average, we sell. 

In [162]:
def dual_mcv_sig(long, short, data):
    long_w = long
    short_w = short
    X_test=data
    flag = - 1
    sigPriceBuy = []
    sigPriceSell = []
    cumsum = [0]

    long_ma = np.zeros(np.shape(X_test))
    short_ma = np.zeros(np.shape(X_test))
    w = np.zeros(np.shape(X_test))
    cash = np.zeros(np.shape(X_test))

    cash[0] = 10000

    for i, x in enumerate(X_test[:-1], 0):
        cumsum.append(cumsum[i] + x)
        long_ma[i] = x
        short_ma[i] = x
        if i>=long_w:
            long_moving_ave = (cumsum[i] - cumsum[i-long_w])/(long_w)
            short_moving_ave = (cumsum[i] - cumsum[i-short_w])/(short_w)
            long_ma[i] = long_moving_ave
            short_ma[i] = short_moving_ave

        if flag == -1:
            if short_ma[i] > long_ma[i]:
                flag = 1
                w[i+1] = cash[i]/x + w[i]
                cash[i+1] = 0
                sigPriceSell.append(np.nan)
                sigPriceBuy.append(np.nan)
            elif short_ma[i] < long_ma[i]:
                flag = 0
                cash[i+1] = w[i]*x + cash[i]
                w[i+1] = 0
                sigPriceBuy.append(np.nan)
                sigPriceSell.append(np.nan)
            else:
                w[i+1] = w[i]
                cash[i+1] = cash[i]
                sigPriceBuy.append(np.nan)
                sigPriceSell.append(np.nan)
        elif flag == 0:
            if short_ma[i] > long_ma[i]:
                flag = 1
                w[i+1] = cash[i]/x + w[i]
                cash[i+1] = 0
                sigPriceSell.append(np.nan)
                sigPriceBuy.append(x)
            else:
                sigPriceBuy.append(np.nan)
                sigPriceSell.append(np.nan)
                w[i+1] = w[i]
                cash[i+1] = cash[i]
        else:
            if short_ma[i] < long_ma[i]:
                flag = 0
                cash[i+1] = w[i]*x + cash[i]
                w[i+1] = 0
                sigPriceBuy.append(np.nan)
                sigPriceSell.append(x)
            else:
                w[i+1] = w[i]
                cash[i+1] = cash[i]
                sigPriceBuy.append(np.nan)
                sigPriceSell.append(np.nan)


    long_ma[i+1] = X_test[len(X_test)-1]
    short_ma[i+1] = X_test[len(X_test)-1]
    sigPriceBuy.append(np.nan)
    sigPriceSell.append(np.nan)

    dma_cv_strategy = [a*b for a,b in zip(w,X_test)]+ cash

    return long_ma, short_ma, sigPriceBuy, sigPriceSell, dma_cv_strategy

## Returns for train and test

### Train

In [163]:
t = np.linspace(0, len(X_train), len(X_train))
mr_train, mr_train_sharpe = mean_rev(mr_optim, X_train)
tf_train, tf_train_sharpe = trend_f(tf_optim, X_train)
long_ma_train, short_ma_train, sigPriceBuy_train, sigPriceSell_train, dma_train = dual_mcv_sig(dma_long,dma_short,X_train)

In [164]:
returns_bh_train = log_returns(TV_bh[:1400])
returns_tf_train = log_returns(tf_train)
returns_mr_train = log_returns(mr_train)
returns_dma_train = log_returns(dma_train)

### Test

In [165]:
t = np.linspace(0, len(X_test), len(X_test))
mr_test, mr_test_sharpe = mean_rev(mr_optim, X_test)
tf_test, tf_test_sharpe = trend_f(tf_optim, X_test)
long_ma_test, short_ma_test, sigPriceBuy_test, sigPriceSell_test, dma_test = dual_mcv_sig(dma_long,dma_short,X_test)

In [166]:
returns_bh_test = log_returns(TV_bh[1400:])
returns_tf_test = log_returns(tf_test)
returns_mr_test = log_returns(mr_test)
returns_dma_test = log_returns(dma_test)

## Some plots

In [167]:

fig, axes = plt.subplots(4,2, figsize=(25.2,9.2), constrained_layout=True)
plt.rcParams['lines.linewidth'] = 1.5
axes[0][0].plot(returns_bh_train)
axes[0][0].set_ylabel("Buy and Hold")
axes[1][0].set_ylabel("Trend Following")
axes[2][0].set_ylabel("Mean Reversion")
axes[3][0].set_ylabel("Dual moving average")
axes[0][0].title.set_text("Training phase")
fig.suptitle("Log returns", fontsize=15)
axes[1][0].plot(returns_tf_train)
axes[2][0].plot(returns_mr_train)
axes[3][0].plot(returns_dma_train)
axes[3][0].set_xlabel("Days")
axes[3][1].set_xlabel("Days")
axes[0][1].title.set_text("Testing phase")
axes[0][1].plot(returns_bh_test)
axes[1][1].plot(returns_tf_test)
axes[2][1].plot(returns_mr_test)
axes[3][1].plot(returns_dma_test)

[<matplotlib.lines.Line2D at 0x1a31149a670>]

In [168]:
%matplotlib qt
# Train
plt.figure(figsize=(12.6,4.6))
t=np.linspace(0,len(X_train),len(X_train))
plt.plot(t, TV_bh[:1400], label="Buy and hold")
plt.plot(t, dma_train, label="Dual moving average crossover")
plt.plot(t, tf_train, label="Trend Following")
plt.plot(t, mr_train, label="Mean Reversion")
plt.xlabel("Days")
plt.ylabel("Total portfolio value ($)")
plt.title("Total portolio value movements over 1400 days (training phase)")
plt.legend()
plt.show()

# Test
plt.figure(figsize=(12.6,4.6))
t=np.linspace(0,len(X_test),len(X_test))
plt.plot(t, TV_bh[1400:], label="Buy and hold")
plt.plot(t, dma_test, label="Dual moving average crossover")
plt.plot(t, tf_test, label="Trend Following")
plt.plot(t, mr_test, label="Mean Reversion")
plt.xlabel("Days")
plt.ylabel("Total portfolio value ($)")
plt.title("Total portolio value movements over 600 days (testing phase)")
plt.legend()
plt.show()


In [196]:
%matplotlib qt
plt.figure(figsize=(12.6,4.6))
t=np.linspace(0,len(X_train),len(X_train))
plt.plot(t, X_train, alpha=0.5, label="Price", linewidth=1)
plt.plot(t, long_ma_train, alpha=0.5, label="MA_90", linewidth=1)
plt.plot(t, short_ma_train, alpha=0.5, label="MA_20", linewidth=1, color="blue")
plt.scatter(t,sigPriceBuy_train, marker = '^', label = 'Buy', color='green')
plt.scatter(t, sigPriceSell_train, marker = 'v', label = 'Sell',  color='red')
plt.xlabel("Days")
plt.ylabel("Price ($)")
plt.title("Dual moving average buy and sell points (training phase)")
plt.legend()
plt.show()

In [197]:
%matplotlib qt
t=np.linspace(0,len(X_test),len(X_test))
plt.figure(figsize=(12.6,4.6))
plt.plot(t, X_test, alpha=0.5, label="Price", linewidth=1)
plt.plot(t, long_ma_test, alpha=0.5, label="MA_90", linewidth=1)
plt.plot(t, short_ma_test, alpha=0.5, label="MA_20", linewidth=1, color="blue")
plt.scatter(t,sigPriceBuy_test, marker = '^', label = 'Buy', color='green')
plt.scatter(t, sigPriceSell_test, marker = 'v', label = 'Sell',  color='red')
plt.xlabel("Days")
plt.ylabel("Price ($)")
plt.title("Dual moving average buy and sell points (testing phase)")
plt.legend()
plt.show()

# Performance Indicators

I will compute the Sharpe Ratio, Value at Risk (VaR) and the conditional sharpe ratio for each of our strategies.  

### Sharpe Ratio

In [171]:
#Train

sharpe_bh_train = sharpe_ratio(returns_bh_train)
sharpe_tf_train = sharpe_ratio(returns_tf_train)
sharpe_mr_train = sharpe_ratio(returns_mr_train)
sharpe_dma_train = sharpe_ratio(returns_dma_train)

#Test

sharpe_bh_test = sharpe_ratio(returns_bh_test)
sharpe_tf_test = sharpe_ratio(returns_tf_test)
sharpe_mr_test = sharpe_ratio(returns_mr_test)
sharpe_dma_test = sharpe_ratio(returns_dma_test)


In [172]:
sharpe_train = [sharpe_bh_train, sharpe_tf_train, sharpe_mr_train, sharpe_dma_train]
sharpe_test = [sharpe_bh_test, sharpe_tf_test, sharpe_mr_test, sharpe_dma_test]
df = pd.DataFrame()
df['sharpe_train'] = sharpe_train
df['sharpe_test'] = sharpe_test
df['strategy'] = ["buy and hold", "trend following", "mean reversion", "dual moving avg"]

### VaR

In [173]:
def vaR(data):
    df = pd.DataFrame()
    df['returns'] = data
    df.sort_values('returns',inplace = True, ascending = True)
    VaR_95 = df['returns'].quantile(0.05)

    #plt.hist(df.returns, bins = 40)
    #plt.axvline(x=VaR_95,color='b')

    #plt.xlabel('Returns')
    #plt.ylabel('Frequency')
    #plt.s(how)


    return VaR_95

In [174]:
# Train
vaR_bh_train = vaR(returns_bh_train)
vaR_mr_train = vaR(returns_mr_train)
vaR_tf_train = vaR(returns_tf_train)
vaR_dma_train = vaR(returns_dma_train)

# Test 
vaR_bh_test = vaR(returns_bh_test)
vaR_mr_test = vaR(returns_mr_test)
vaR_tf_test = vaR(returns_tf_test)
vaR_dma_test = vaR(returns_dma_test)

vaR_train = [vaR_bh_train, vaR_tf_train, vaR_mr_train, vaR_dma_train]
vaR_test = [vaR_bh_test, vaR_tf_test, vaR_mr_test, vaR_dma_test]
df['vaR_train'] = vaR_train
df['vaR_test'] = vaR_test

### Excess Shortfall

In [175]:
def csr(data):
    data = np.array(data)
    avg_returns = data.mean()
    df = pd.DataFrame()
    df['returns'] = data
    df.sort_values('returns',inplace = True, ascending = True)
    VaR_95 = df['returns'].quantile(0.05)
    excess = df[df['returns'] <= VaR_95]
    excess_shortfall = excess.mean()
    conditional_sharp = avg_returns*np.sqrt(252)/excess_shortfall
    return conditional_sharp[0]

In [195]:
# Train

csr_bh_train = csr(returns_bh_train)
csr_tf_train = csr(returns_tf_train)
csr_mr_train = csr(returns_mr_train)
csr_dma_train = csr(returns_dma_train)

# Test

csr_bh_test = csr(returns_bh_test)
csr_tf_test = csr(returns_tf_test)
csr_mr_test = csr(returns_mr_test)
csr_dma_test = csr(returns_dma_test)

csr_train = [csr_bh_train, csr_tf_train, csr_mr_train, csr_dma_train]
csr_test = [csr_bh_test, csr_tf_test, csr_mr_test, csr_dma_test]
df['csr_train'] = csr_train
df['csr_test'] = csr_test
df['p_train'] = p_values_train
df['p_test'] = p_values_test
#df.set_index('strategy', inplace=True)
df.round(3)

Unnamed: 0_level_0,sharpe_train,sharpe_test,vaR_train,vaR_test,csr_train,csr_test,p_train,p_test
strategy,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
buy and hold,0.776,-2.392,-0.012,-0.016,-0.384,1.006,0.219,0.991
trend following,1.519,-0.585,-0.009,-0.01,-0.656,0.206,0.065,0.72
mean reversion,0.416,-1.778,-0.008,-0.015,-0.163,0.682,0.339,0.962
dual moving avg,0.287,-0.49,-0.01,-0.006,-0.12,0.163,0.387,0.688


# Statistical Tests

In [177]:
from scipy.stats import t
degrees_f = 251
p_values_train = []
p_values_test = []
for sh in sharpe_train:
    p = 1-t.cdf(sh,degrees_f)
    p_values_train.append(p)

for sh in sharpe_test:
    p = 1-t.cdf(sh,degrees_f)
    p_values_test.append(p)

In [178]:
print(p_values_train)
print(p_values_test)

[0.21917363078525898, 0.06499017992243872, 0.3388644944337851, 0.387062808106277]
[0.9912436382441551, 0.7204035198171459, 0.961676781837709, 0.6877605111177137]


In [191]:
df_p = pd.DataFrame()
#df_p['sharpe_train'] = sharpe_train
#df_p['sharpe_test'] = sharpe_test
df_p['strategy'] = ["buy and hold", "trend following", "mean reversion", "dual moving avg"]
df_p.set_index('strategy', inplace=True)
df_p['p_train'] = p_values_train
df_p['p_test'] = p_values_test
df_p.round(3)

Unnamed: 0_level_0,p_train,p_test
strategy,Unnamed: 1_level_1,Unnamed: 2_level_1
buy and hold,0.219,0.991
trend following,0.065,0.72
mean reversion,0.339,0.962
dual moving avg,0.387,0.688


### Family wise error rate

In [180]:
fw_p_train = [(1-(1-p)**3) for p in p_values_train]
fw_p_test = [(1-(1-p)**3) for p in p_values_test]

In [181]:
df_w = pd.DataFrame()
df_w['fwer_train'] = fw_p_train
df_w['fwer test'] = fw_p_test
df_w

Unnamed: 0,fwer_train,fwer test
0,0.523938,0.999999
1,0.182574,0.978143
2,0.711018,0.999944
3,0.769724,0.969559


To control for the family wise error rate, we use both the Bonferonni and Holm methods.  Give a brief explanation of this in the report

In [182]:
bonferonni_train = multipletests(p_values_train[1:], alpha=0.05, method='bonferroni', is_sorted=False, returnsorted=False)
bonferonni_test = multipletests(p_values_test[1:], alpha=0.05, method='bonferroni', is_sorted=False, returnsorted=False)

In [183]:
from statsmodels.stats.multitest import multipletests
holm_train = multipletests(p_values_train[1:], alpha=0.05, method='holm', is_sorted=False, returnsorted=False)
holm_test = multipletests(p_values_test[1:], alpha=0.05, method='holm', is_sorted=False, returnsorted=False)

In [194]:
adjusted_p = pd.DataFrame()
adjusted_p['holm_train'] = holm_train[1]
adjusted_p['holm_test'] = holm_test[1]
adjusted_p['bonferonni_train'] = bonferonni_train[1]
adjusted_p['bonferonni_test'] = bonferonni_test[1]
adjusted_p['strategy'] = ["trend following", "mean reversion", "dual moving avg"]
adjusted_p.set_index('strategy', inplace=True)

adjusted_p.round(2)

Unnamed: 0_level_0,holm_train,holm_test,bonferonni_train,bonferonni_test
strategy,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
trend following,0.19,1.0,0.19,1.0
mean reversion,0.68,1.0,1.0,1.0
dual moving avg,0.68,1.0,1.0,1.0
