In [1]:
import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import scipy
from tqdm import tqdm
from tabulate import tabulate
pd.options.display.expand_frame_repr = False

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
data_close = pd.read_csv('../data_correct/Close_correct.csv', index_col='date')
data_close.index = data_close.index.astype('datetime64').rename('date')

data_open = pd.read_csv('../data_correct/Open_correct.csv', index_col='date')
data_open.index = data_open.index.astype('datetime64').rename('date')

data_low = pd.read_csv('../data_correct/Low_correct.csv', index_col='date')
data_low.index = data_low.index.astype('datetime64').rename('date')

data_high = pd.read_csv('../data_correct/High_correct.csv', index_col='date')
data_high.index = data_high.index.astype('datetime64').rename('date')

data_volume = pd.read_csv('../data/Volume.csv').T[1:]
data_volume.index = data_volume.index.astype('datetime64').rename('date')
data_volume = data_volume.astype('float64')


In [4]:
data_returns = pd.read_csv('../data/data_returns.csv', index_col='date')
data_returns.index = data_returns.index.astype('datetime64').rename('date')
data_returns.head()

Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9,...,2426,2427,2428,2429,2430,2431,2432,2433,2434,2435
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2010-01-04,,,,,,,,,,,...,,,,,,,,,,
2010-01-05,0.031298,0.010467,-0.016575,0.004902,-0.010927,0.00673,-0.003376,0.004707,0.002931,0.010305,...,0.02599,-0.045205,0.010171,0.031328,0.006757,0.005876,0.025986,0.004815,-0.008101,0.00605
2010-01-06,-0.011794,0.002125,-0.016854,-0.019512,-0.035804,0.019457,0.013173,-0.030115,0.000421,0.000835,...,-0.009226,-0.005261,0.005343,0.015717,-0.067114,0.000982,0.009383,0.005857,0.002689,0.007314
2010-01-07,0.003445,-0.016257,0.002189,0.034826,0.06005,-0.009983,-0.015602,0.008711,0.02701,-0.019369,...,0.072129,0.040865,0.002197,-0.023955,0.05036,0.002931,-0.01343,0.015087,-0.030761,-0.004034
2010-01-08,0.019436,-0.003458,-0.006884,0.091346,-0.03237,0.003361,-0.011698,-0.017956,0.003479,0.019752,...,-0.017372,-0.018476,-0.002192,-0.001067,0.0,0.000991,0.006185,0.015593,0.013084,0.00729


In [5]:
# numpy
def neutralize(alpha):
    return alpha - np.mean(alpha)

In [6]:
def normalize(alpha):
    return alpha / np.sum(np.abs(alpha))

In [7]:
def truncate(alpha, max_weight, coef):
    signs = (alpha / np.abs(alpha))
    alpha[np.abs(alpha) > max_weight * coef] = max_weight * coef
    alpha = alpha * signs
    return alpha

In [8]:
# pandas
def get_returns(data):
    return (data / data.shift(1)) - 1

In [9]:
def get_rank(alpha):
    return scipy.stats.rankdata(alpha, method='ordinal') / (len(alpha) - 1)

In [10]:
def cut_outliers(alpha, coef=0.01):
    alpha[alpha < np.quantile(alpha, coef)] = 0
    alpha[alpha > np.quantile(alpha, 1 - coef)] = 0
    return alpha

In [11]:
def cut_middle(alpha, coef=0.01):
    alpha[(alpha < np.quantile(alpha, 0.5 + coef)) & (alpha > np.quantile(alpha, 0.5 - coef))] = 0
    return alpha

In [12]:
def turnover(alphas, option=0):
    turnover_days = abs(alphas.diff(periods=1)).sum(axis=1)
    if option == 1:
        return turnover_days
    return turnover_days.groupby(alphas.index.year).mean()

In [13]:
def get_sharpe_coef(days_pnl):
    ans = pd.DataFrame()
    ans['coef_sharpe'] = days_pnl.groupby(days_pnl.index.year).apply(lambda x : np.sqrt(len(x) - 1) * np.mean(x) / np.std(x))
    return ans

In [14]:
def get_drawdown_t_2(pnl_cum):
    pnl_cum_index =  pnl_cum.index
    pnl_cum = np.array(pnl_cum)
    max_drawdown = -1
    for i in range(0, len(pnl_cum)):
        for j in range(i + 1, len(pnl_cum)):
            if ((pnl_cum[i] - pnl_cum[j]) / pnl_cum[i] > max_drawdown) and (pnl_cum[i] - pnl_cum[j]) > 0:
                if (pnl_cum[i] - pnl_cum[j]) / pnl_cum[i] != np.inf:
                    max_drawdown = (pnl_cum[i] - pnl_cum[j]) / pnl_cum[i]
                    days_i = i
                    days_j = j
    return max_drawdown, pnl_cum_index[days_i].strftime("%d.%m.%Y"), pnl_cum_index[days_j].strftime("%d.%m.%Y")

In [15]:
def get_drawdown_years(pnl_cum):
    return pnl_cum.groupby(pnl_cum.index.year).apply(lambda x: get_drawdown_t_2(x))

In [16]:
def get_pnl(data_returns, alpha, option=0):
    data_returns = data_returns.iloc[2:]
    pnl = alpha.reset_index(drop=True).mul(data_returns.reset_index(drop=True), axis=0).sum(axis=1)
    ans = pd.DataFrame()
    ans['pnl'] = pnl
    ans = ans.set_index(alpha.index).shift(2)
    if option == 1:
        return ans.set_index(alpha.index).groupby(alpha.index.year).sum()
    return ans

##### Generate alpha and statistics

In [17]:
def alpha_stats(data_returns, alpha):
    days_pnl = get_pnl(data_returns, alpha)['pnl']
    returns_table = pd.DataFrame()
    returns_table['years_pnl_cum'] = get_pnl(data_returns, alpha, 1)
    returns_table['turnover'] = turnover(alpha)
    returns_table['sharpe_coef'] = get_sharpe_coef(days_pnl)
    returns_table['drawdown'] = get_drawdown_years(days_pnl.cumsum()).apply(lambda x: x[0])
    returns_table['drawdown_day_start'] = get_drawdown_years(days_pnl.cumsum()).apply(lambda x: x[1])
    returns_table['drawdown_day_end'] = get_drawdown_years(days_pnl.cumsum()).apply(lambda x: x[2])
    return returns_table

In [18]:
def generate_alphas(data_returns, algh_alpha, max_n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False, filename=0):
    with open(filename, "w") as external_file:
        for n in range(1, max_n + 1):
            text = 'n =', n, 'cut_middle_coef = 0,   cut_outliers_coef = 0,   ranking = False'
            print(text, file=external_file)
            print(alpha_stats(data_returns, algh_alpha(n, data_close, data_open, data_high, data_low, volume, cut_middle_coef=0, cut_outliers_coef=0, ranking=False)), file=external_file)
            print('\n', file=external_file)

        
        for n in range(1, max_n + 1):
            text = 'n =', n, 'cut_middle_coef = 0.01,   cut_outliers_coef = 0.01,   ranking = False'
            print(text, file=external_file)
            print(alpha_stats(data_returns, algh_alpha(n, data_close, data_open, data_high, data_low, volume, cut_middle_coef=0.01, cut_outliers_coef=0.01, ranking=False)), file=external_file)
            print('\n', file=external_file)


        for n in range(1, max_n + 1):
            text = 'n =', n, 'cut_middle_coef = 0.01,   cut_outliers_coef = 0.01,   ranking = True'
            print(text, file=external_file)
            print(alpha_stats(data_returns, algh_alpha(n, data_close, data_open, data_high, data_low, volume, cut_middle_coef=0.01, cut_outliers_coef=0.01, ranking=True)), file=external_file)
            print('\n', file=external_file)

#### Task 1    
Alpha: reversion
$$ kernel(d) = -\left({close(d) \over close(d - n))} - 1\right) $$
$$ return(d) = {close(d) \over close(d - 1)} - 1 $$
$$ pnl(d) = \alpha(d - 2) * return(d)$$


In [19]:
def algh_alpha_reversion(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = -(data_close / data_close.shift(n) - 1)
    if ranking == True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [20]:
generate_alphas(data_returns, algh_alpha_reversion, data_close=data_close, max_n=6, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_alpha_reversion.txt')

UnboundLocalError: local variable 'alpha' referenced before assignment

#### Task 2
$$ kernel (d) = {high(d) - low(d)} $$
$$ \alpha (d) = {high(d) - low(d) \over open(d)} $$


In [None]:
def algh_alpha_high_minus_low(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = (data_high - data_low) / data_open 
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_alpha_high_minus_low, data_open=data_open, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_alpha_high_minus_low.txt')

####
$$ kernel (d) = {high(d) \over low(d)} $$
$$ \alpha (d) = kernel - 1 $$


In [None]:
def algh_alpha_high_over_low(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = data_high / data_low - 1
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_alpha_high_over_low, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_alpha_high_over_low.txt')

#### Task 3

$$ kernel(d) = high(d) + low(d) - 2*close(d)$$ 
$$ alpha(d) = kernel(d) 

In [None]:
def algh_alpha_high_plus_low_minus_two_close(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = data_high + data_low - 2 * data_close
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_alpha_high_plus_low_minus_two_close, data_close=data_close, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_high_plus_low_minus_two_close.txt')

$$ kernel(d) = {high(d) * low(d) \over close^2(d)} $$
$$ \alpha(d) = \ln(kernel)

In [None]:
def algh_alpha_high_mul_low_over_close2(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = np.log((data_high * data_low) / data_close ** 2)
    if ranking == True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_alpha_high_mul_low_over_close2, data_close=data_close, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_alpha_high_mul_low_over_close2.txt')

$$ kernel(d) = {high(d) + low(d) \over 2} < close(d)?    1:-1 $$
$$ \alpha(d) = kernel(d) * \left({close(d) \over close(d - n)}\right)

In [None]:
def algh_high_plus_low_less_two_close(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = (data_high + data_low < 2 * data_close).replace({True: 1, False: -1})
    alpha = kernel * (data_close / data_close.shift(n) - 1)
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_high_plus_low_less_two_close, data_close=data_close, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_high_plus_low_less_two_close.txt')

#### Task 4
$$ kernel(d) = {open(d) \over close(d)} $$
$$ \alpha(d) = kernel(d) - 1

In [None]:
def algh_open_over_close(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = data_open / data_close
    alpha = kernel - 1
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_open_over_close, data_close=data_close, data_open=data_open, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_open_over_close.txt')

$$ kernel(d) = {high(d) - low(d) \over close(d)} $$
$$ \alpha(d) = {close(d) - open(d) \over kernel(d) * close(d)} + 0.001$$

In [None]:
def algh_high_minus_law_over_close(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = (data_low - data_high)
    alpha = (data_close - data_open) / (kernel + 0.001)
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_high_minus_law_over_close, data_close=data_close, data_open=data_open, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_high_minus_law_over_close.txt')

#### Task 5

$$ kernel(d) = {close(d) - low(d) \over high(d) - low(d)}

In [None]:
def algh_close_minus_low_over_high_minus_low(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = (data_close - data_low) / (data_high - data_low)
    alpha = - kernel
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [None]:
generate_alphas(data_returns, algh_close_minus_low_over_high_minus_low, data_close=data_close, data_open=data_open, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_close_minus_low_over_high_minus_low.txt')

$$ kernel(d) = {close(d) - low(d) \over high(d) - low(d)} < 0.5 ? 1 :-1$$

In [22]:
def algh_close_minus_low_over_high_minus_low_less_half(n=1, data_close=0, data_open=0, data_high=0, data_low=0, volume=0, cut_middle_coef=0, cut_outliers_coef=0, ranking=False):
    kernel = ((data_close - data_low) / (data_high - data_low) < 0.5).replace({True: 1, False: -1})
    alpha = kernel * (data_close / data_close.shift(n) - 1)
    if ranking ==True:
        kernel = kernel.T.apply(lambda x: get_rank(x)).T
    alpha = alpha.T.apply(lambda x: cut_outliers(x, cut_outliers_coef)).apply(lambda x: cut_middle(x, cut_middle_coef)).apply(neutralize).apply(normalize).T
    return alpha

In [23]:
generate_alphas(data_returns, algh_close_minus_low_over_high_minus_low_less_half, data_close=data_close, data_open=data_open, data_high=data_high, data_low=data_low, cut_middle_coef=0.01, cut_outliers_coef=0.01, filename='stats_close_minus_low_over_high_minus_low_less_half.txt')