Importando bibliotecas

In [None]:
import pandas as pd
import numpy as np
import talib
from funcoes_preco import organize_data, get_efficiency_ratio, get_stats
import dwx_query
from datetime import datetime, timedelta
from bisect import bisect_right
from scipy import stats

Dados de entrada

In [None]:
# Run configuration: instrument, the two timeframes and the strategy label.
# tf_1 is the timeframe the triggers are computed on, tf_2 the one used by
# the filters; both are currently the same.

# Former MetaTrader files directory, kept for reference only (unused):
# files_dir = 'C:/Users/guicr/AppData/Roaming/MetaQuotes/Terminal/'\
#             'D0E8209F77C8CF37AD8BF550E51FF075/MQL5/Files/'
symbol = 'PETR4'
tf_1, tf_2 = 'M5', 'M5'
timeframes = [tf_1, tf_2]
strategy_name = f'cci_er_skew_strategy_{symbol}_{tf_1}_{tf_2}'

# Former epoch-timestamp window, kept for reference only (unused):
# start = datetime(2017, 1, 1).timestamp()
# end = datetime(2020, 12, 31).timestamp()

Gravando dados recebidos em um dicionário

In [None]:
# Load one price table per timeframe from data/<symbol>_<timeframe>.csv,
# keyed by the timeframe label.
data = {}
for timeframe in timeframes:
    data[timeframe] = organize_data(f'data/{symbol}_{timeframe}.csv')

Copiando o dicionário de dataframes (cópia rasa) para `symbol_data`

In [None]:
# Shallow copy: a new dict whose values are the same table objects as in `data`.
symbol_data = dict(data)

Filtrando os dataframes pelo intervalo de datas e copiando-os para uma variável de manipulação

In [None]:
# Working tables: for each timeframe keep only the rows whose 'Date' lies in
# [start, end] (both ends inclusive) and renumber them from 0.
# NOTE(review): `end` is midnight of 2021-12-31, so intraday bars later on
# that day fall outside the window - confirm this is intended.
start = datetime(2015, 1, 1)
end = datetime(2021, 12, 31)
eqty_table = {}
for timeframe in timeframes:
    source_table = symbol_data[timeframe]
    in_window = (source_table['Date'] >= start) & (source_table['Date'] <= end)
    eqty_table[timeframe] = source_table.loc[in_window].reset_index(drop=True)

Calculando o retorno barra a barra

In [None]:
# Bar-to-bar fractional change of 'Close', kept both in the `pct_change`
# dict and as a 'Percent Change' column on each working table.
pct_change = {}
for timeframe in timeframes:
    bar_returns = eqty_table[timeframe]['Close'].pct_change()
    pct_change[timeframe] = bar_returns
    eqty_table[timeframe]['Percent Change'] = bar_returns

Triggers

In [None]:
# Trigger 1: CCI over `cci_period` bars of the tf_1 table, stored as 'CCI'.
cci_period = 20
trigger_table = eqty_table[tf_1]
cci = talib.CCI(
    trigger_table['High'],
    trigger_table['Low'],
    trigger_table['Close'],
    timeperiod=cci_period,
)
trigger_table['CCI'] = cci

In [None]:
# Trigger 2: the 'std' output of get_stats on the tf_1 closes, using a
# window of `std_dev_period`; stored as 'Standard Deviation'.
std_dev_period = 15
trigger_close_stats = get_stats(eqty_table[tf_1]['Close'], std_dev_period)
std_dev = trigger_close_stats['std']
eqty_table[tf_1]['Standard Deviation'] = std_dev

Filters

In [None]:
# Filter 1: efficiency ratio of the tf_2 closes over `er_period`; only the
# 'efficiency_ratio' field of the helper's result is kept on the table.
er_period = 8
filter_close = eqty_table[tf_2]['Close']
efficiency_ratio = get_efficiency_ratio(filter_close, timeperiod=er_period)
eqty_table[tf_2]['Efficiency Ratio'] = efficiency_ratio['efficiency_ratio']


In [None]:
# Filter 2: the 'skewness' output of get_stats on the tf_2 closes, using a
# window of `skew_period`; stored as 'Skewness'.
skew_period = 15
filter_close_stats = get_stats(eqty_table[tf_2]['Close'], skew_period)
skewness = filter_close_stats['skewness']
eqty_table[tf_2]['Skewness'] = skewness

Preparação do Dataframe para receber as informações de trade.
Integração entre o Timeframe maior com o menor.

Mudar o tipo da coluna 'Date' para datetime.

In [None]:
# Make sure every working table carries 'Date' as a pandas datetime column.
for timeframe in timeframes:
    working_table = eqty_table[timeframe]
    working_table['Date'] = pd.to_datetime(working_table['Date'])

Colocando um marcador para identificar a correspondência entre o Timeframe 
maior e o menor.

In [None]:
# Independent copies used only to build the tf_1 -> tf_2 row mapping:
# "lower" is the tf_1 table, "higher" is the tf_2 table.
lwr_timeframe_table, hgr_timeframe_table = (
    eqty_table[tf_1].copy(),
    eqty_table[tf_2].copy(),
)

In [None]:
# For every lower-timeframe bar, find the row position of the last
# higher-timeframe bar whose 'Date' is <= that bar's 'Date' (-1 when the
# lower bar precedes every higher-timeframe bar).
#
# This is the vectorised form of calling bisect_right once per row: a single
# searchsorted(side='right') call replaces the Python loop and its scalar
# Series look-ups. Like bisect, it requires the higher-timeframe 'Date'
# column to be sorted in ascending order.
index_higher_timeframe = (
    hgr_timeframe_table['Date'].searchsorted(
        lwr_timeframe_table['Date'].to_numpy(), side='right') - 1
).tolist()

In [None]:
# Store, for every tf_1 bar, the row position of its matching tf_2 bar.
eqty_table[tf_1]['Index Higher Timeframe'] = index_higher_timeframe

Configuração dos sinais de compra e venda.

In [None]:
# Signal generation: walk the tf_1 bars once and record every entry and exit
# in `position` as [bar index, position type, price, std dev, stop loss].
#
# Entry rules (only while flat; filled at the NEXT bar's open):
#   Buy  - CCI crosses above 0, skewness strictly between -1 and -0.5,
#          efficiency ratio < er_1 on the mapped tf_2 bar and < er_2 on the
#          tf_2 bar before it.
#   Sell - CCI crosses below 0, skewness strictly between 0.5 and 1, same
#          efficiency-ratio conditions.
# Exit rules (only while in a position; at most one action per bar):
#   - CCI crosses above +100 or below -100 -> exit at the bar's close;
#   - otherwise, stop loss touched (Low for longs, High for shorts)
#     -> exit at the stop price.
open_position = False
position = []
pos_type = None  # 'Buy' / 'Sell' while in a trade, 'Exit Buy' / 'Exit Sell' after

# Number of tf_2 bars to look back from the mapped bar when reading filters.
# NOTE(review): with shift = 0 the filters read the tf_2 bar mapped to the
# current tf_1 bar; if tf_2 is coarser than tf_1 and 'Date' marks the bar
# open, that bar may not be complete yet - confirm there is no look-ahead.
shift = 0
er_1 = 0.1
er_2 = 1

# Double top / double bottom counters. The detection code below is disabled,
# so both stay at 0 and never block an entry.
d_top = 0
d_bottom = 0

trigger_table = eqty_table[tf_1]
filter_table = eqty_table[tf_2]

for row in trigger_table.index:

    # Skip the warm-up bars at the start of the table.
    if row < cci_period:
        continue

    # # Double top / double bottom check (disabled)
    # if d_bottom == 0:
    #     if eqty_table[tf_1].loc[row-1, 'CCI'] < -100 and \
    #         eqty_table[tf_1].loc[row, 'CCI'] > -100:
    #             d_bottom += 1

    # if d_top == 0:
    #     if eqty_table[tf_1].loc[row-1, 'CCI'] > 100 and \
    #         eqty_table[tf_1].loc[row, 'CCI'] < 100:
    #             d_top += 1

    # if d_bottom > 0 and eqty_table[tf_1].loc[row, 'CCI'] > 0:
    #     d_bottom = 0

    # if d_top > 0 and eqty_table[tf_1].loc[row, 'CCI'] < 0:
    #     d_top = 0

    cci_prev = trigger_table.loc[row-1, 'CCI']
    cci_now = trigger_table.loc[row, 'CCI']

    if not open_position:
        # --- Entry signals -------------------------------------------------
        if d_bottom == 0 and cci_prev < 0 and cci_now > 0:
            side, skew_low, skew_high = 'Buy', -1, -0.5
        elif d_top == 0 and cci_prev > 0 and cci_now < 0:
            side, skew_low, skew_high = 'Sell', 0.5, 1
        else:
            continue

        # Filters are read lazily (short-circuit), in the same order as
        # before, so no tf_2 row is touched unless the earlier tests pass.
        htf_row = trigger_table.loc[row, 'Index Higher Timeframe'] - shift
        filters_ok = (
            skew_low < filter_table.loc[htf_row, 'Skewness'] < skew_high
            and filter_table.loc[htf_row, 'Efficiency Ratio'] < er_1
            and filter_table.loc[htf_row-1, 'Efficiency Ratio'] < er_2
        )
        if not filters_ok:
            continue

        # The fill price is the next bar's open; a signal on the final bar
        # has no next bar, so it cannot be traded (previously a KeyError).
        if (row + 1) not in trigger_table.index:
            continue

        close = trigger_table.loc[row, 'Close']
        std = trigger_table.loc[row, 'Standard Deviation']
        # Stop distance: two standard deviations rounded up to a multiple of 5.
        # TODO: review how the rounding to multiples of 5 should be done.
        stop_distance = 5 * np.ceil((2 * std) / 5)
        if side == 'Buy':
            stop_loss = close - stop_distance
        else:
            stop_loss = close + stop_distance

        open_position = True
        pos_type = side
        position.append([row, pos_type, trigger_table.loc[row+1, 'Open'],
                         std, stop_loss])

    else:
        # --- Exit signals --------------------------------------------------
        cci_exit = (cci_prev < 100 and cci_now > 100) or \
            (cci_prev > -100 and cci_now < -100)

        # The stop test depends on the side of the trade: a long stop sits
        # below the entry and a short stop above it. Testing the wrong side
        # would close the trade on (almost) every following bar.
        if pos_type == 'Buy':
            stop_hit = trigger_table.loc[row, 'Low'] <= stop_loss
        else:
            stop_hit = trigger_table.loc[row, 'High'] >= stop_loss

        if cci_exit:
            exit_price = trigger_table.loc[row, 'Close']
        elif stop_hit:
            exit_price = stop_loss
        else:
            continue

        open_position = False
        pos_type = f'Exit {pos_type}'  # 'Exit Buy' or 'Exit Sell'
        position.append([row, pos_type, exit_price, np.nan, np.nan])

    # TODO: compute Maximum Favourable Excursion and Maximum Adverse Excursion.
    

In [None]:
# Display the tf_1 working table with the columns added so far.
eqty_table[tf_1]

Cálculo do retorno.

In [None]:
# One row per recorded entry or exit, in the order they happened.
position_columns = [
    'Index',
    'Position Type',
    'Position',
    'Standard Deviation',
    'Stop Loss',
]
df = pd.DataFrame(position, columns=position_columns)

In [None]:
# Display the table of recorded entries and exits.
df

In [None]:
# Entries: every even-positioned row (entries and exits alternate, starting
# with an entry). reset_index is chained so that a new frame is produced
# instead of mutating an iloc slice in place, which can raise pandas'
# chained-assignment warning.
df2 = df.iloc[::2].reset_index(drop=True)

In [None]:
# Exits: every odd-positioned row, with the value columns renamed so they
# can sit next to the entry columns. Built as one chain so that no iloc
# slice is mutated in place (avoids pandas' chained-assignment warning).
# NOTE(review): 'Index' is not renamed, so the concatenated table ends up
# with two columns called 'Index' - confirm whether that is intended.
df3 = (
    df.iloc[1::2]
    .reset_index(drop=True)
    .rename(columns={'Position': 'Position Exit',
                     'Position Type': 'Position Type Exit',
                     'Standard Deviation': 'Standard Deviation Exit',
                     'Stop Loss': 'Stop Loss Exit'})
)

In [None]:
# One row per trade: entry columns on the left, exit columns on the right.
# Note that this rebinds `df`, which previously held the raw event list.
df = pd.concat((df2, df3), axis='columns')
df

In [None]:
# Result of each trade in price units: exit minus entry for rows whose
# 'Position Type' is 'Buy', entry minus exit for every other row.
# Computed in one vectorised step instead of growing an array with
# np.append inside a loop (which copies the whole array on every trade).
# A trade with no recorded exit yields NaN.
is_buy = (df['Position Type'] == 'Buy').to_numpy()
entry_price = df['Position'].to_numpy(dtype=float)
exit_price = df['Position Exit'].to_numpy(dtype=float)
points = np.where(is_buy, exit_price - entry_price, entry_price - exit_price)

In [None]:
# Standard deviation recorded with each entry; used below to normalise results.
stds = df['Standard Deviation']

In [None]:
# Trade result expressed in multiples of the entry standard deviation.
# NOTE(review): a zero standard deviation would produce inf here - confirm
# that cannot happen for the chosen window.
r = points / stds

In [None]:
# Discard trades without a usable result (NaN), e.g. a trade with no exit row.
r = r.dropna()

In [None]:
# Display the per-trade results in standard-deviation multiples.
r

Estatísticas do retorno

In [None]:
# Trade-sequence statistics over `r`:
#   pos_stk - longest run of consecutive non-negative results
#   neg_stk - longest run of consecutive negative results
#   dd      - per-trade drawdown: cumulative sum minus its running peak
# A result of exactly 0 counts as a win, as before.
#
# Each counter is reset to 0 as soon as the opposite outcome occurs and the
# record is updated on every trade. This way a streak of length 1 is counted
# and a zero-valued trade cannot extend a stale losing streak.
win_stk = 0      # length of the winning streak currently running
loss_stk = 0     # length of the losing streak currently running
pos_stk = 0
neg_stk = 0
max_sum_r = 0    # running peak of the cumulative sum (0 before any trade)
sum_r = 0
dd = []
for value in r:
    if value >= 0:
        win_stk += 1
        loss_stk = 0
        pos_stk = max(pos_stk, win_stk)
    else:
        loss_stk += 1
        win_stk = 0
        neg_stk = max(neg_stk, loss_stk)

    sum_r = sum_r + value
    if max_sum_r < sum_r:
        max_sum_r = sum_r
    dd.append(sum_r - max_sum_r)

print(pos_stk)
print(neg_stk)
# Maximum drawdown; 0 when there are no trades (min() of an empty list raises).
print(abs(min(dd)) if dd else 0)

In [None]:
# Total result over all trades, in standard-deviation multiples.
sum(r)

In [None]:
# Descriptive statistics of the per-trade results (scipy.stats.describe).
stats.describe(r)

In [None]:
# Outlier filter: keep only the trades whose result lies strictly within
# `z_threshold` standard scores of the mean, then describe what remains.
z_threshold = 3
z_scores = stats.zscore(r)
abs_z_scores = np.abs(z_scores)
within_threshold = abs_z_scores < z_threshold
filtered_r = r[within_threshold]
stats.describe(filtered_r)

In [None]:
# Total result after removing the outliers.
sum(filtered_r)

In [None]:
# Persist the parameters of this run next to the notebook, as the text
# representation of a dict, in '<strategy_name>_specs.txt'.
strategy_data = {
    'tf_1': tf_1,
    'tf_2': tf_2,
    'cci_period': cci_period,
    'std_dev_period': std_dev_period,
    'er_period': er_period,
    'skew_period': skew_period,
    'shift': shift,
    'er_1': er_1,
    'er_2': er_2,
}
filename = f'{strategy_name}_specs.txt'
specs_text = str(strategy_data)
with open(filename, 'w') as f:
    f.write(specs_text)