In [2]:
import pandas as pd
import numpy as np
import math
from funcoes import *
from tqdm import tqdm
pd.options.mode.chained_assignment = None
#dados foram baixados de https://www.cryptodatadownload.com/data/binance/
#indicadores tecnicos de https://www.akademiabaru.com/doc/ARBMSV14_N1_P35_41.pdf
#indicadores são:  MACD, RSI, SO, MA, EMA, ROC e VT.

O objetivo do projeto é criar um modelo que seja capaz de realizar previsões sobre a variação do preço de um cryptoativo (bitcoin) em incrementos de tempo definidos, no caso, de minuto em minuto. Após extensa pesquisa, foi encontrado um artigo (link presente nos comentarios acima), que alegava ter conseguido uma boa acurácia utilizando uma regressão logística binomial. Portanto, partindo desse documento partimos para tentar implementar o modelo estátistico proposto pelo mesmo.

No documento acadêmico a técnica utilizada foi a de regressão logística binomial, de forma que a saída é 0 ou 1. 0 Indica que o valor do ativo irá cair no próximo incremento de tempo e 1 indica a subida no próximo incremento de tempo.

Começamos baixando um arquivo csv contendo colunas com valores de "open,high,low,close,Volume BTC,Volume USDT,tradecount" para cada minuto da criptomoeda bitcoin desde 2019 e calculamos os features e o target do modelo com esse dataframe.

Os features são indicadores técnicos, calculados para cada minuto (cada row do dataframe) e são funções de valores presentes do dataframe, para exemplificar, alguns são calculados com médias de preço dos últimos 12 incrementos de tempo, outros são calculados com o volume de trades no intervalo citado. 

Já o target, que é a variação do preço do ativo (positiva, ou negativa), é calculado pela subtração do preço de fechamento (close price) do ativo em dois tempos subsequentes, ou seja, sendo P(t) o close price do ativo, o target é igual ao "sinal" de P(t+1) - P(t), caso essa variação seja positiva o target é 1, caso seja negativa o target é 0.

Vale notar que alguns features são calculados com base em tempos anteriores, como por exemplo, o 12_SMA (Simple moving average de 12 incrementos de tempo) é obtido em função do close price de 12 incrementos de tempo anteriores e, portanto, para as primeiras 12 rows do dataframe esse feature será Nan. Assim sendo, é utilizado o df.dropna para eliminar as linhas nas quais alguns features ainda não estão definidos. Fora isso, não existem rows com valores nulos no df, deste modo, o uso do df.dropna não diminuirá o tamanho do dataframe de forma significativa e não afetará a acurácia do modelo.

Partindo para a lógica de treinamento e teste do modelo: foi definido um intervalo de 1000 incrementos de tempo de "lookback" para o modelo, isto é, toda regressão realizada teve seus coeficientes de regressão determinados à partir de 1000 incrementos de tempo anteriores (que equivalem à 1000 rows do dataframe). A título de exemplo, em um dataframe com 100000 dados, o modelo começa a operar na linha 1001. Ele determinará os coeficientes de regressão à partir de um fitting que utiliza os 1000 rows de features e targets anteriores e então, munido desses coeficientes ele insere os valores dos features da row em que se encontra e realiza uma estimativa. Ou seja, na primeira iteração (onde i = 1001) o modelo determina os coeficientes de regressão à partir das 1000 primeiras rows, e insere os valores da row 1001 na fórmula com os coeficientes de regressão encontrados e retorna 1 ou 0 para variação positiva ou negativa até o minuto seguinte ( que seria a row 1002).
  

Organizando dataframe de BTC para intervalos de 1 minuto

In [24]:

#número de rows escolhido foi de 100000, de forma a salvar tempo de processamento. Representa por volta de 69 dias.
nrows = 2000
df = pd.read_csv("Binance_BTCUSDT_minute.csv",skiprows=1,nrows=nrows) 

calculate_indicators(df) #chamando função que foi definida no arquivo funcoes e que tem a finalidade de calcular os features em cada row e inseri-los no proprio dataframe original.
lista_coeficientes = [] #lista que guarda os coeficientes de regressao, não é utilizada no modelo mas serve para analisar a importância de cada um.
df = df.dropna(axis=0,how="any") #retirando as linhas com valores null ou nan do dataframe, lembrando que essas serão as linhas onde alguns features ainda não possuem dados antecedentes suficientes para serem calculados.
x,y = get_x_y(df) #chamando função que foi definida na file funcoes e que tem a finalidade de retornar duas listas, x - lista que guarda uma lista dos features para cada row e y - lista que guarda o change (target) 0 ou 1 para cada row.


In [26]:
interval = 1000 #definicao do intervalo (também chamado de "lookback"), isto é, o número de rows que o modelo usará para determinar os coeficientes de regressão em cada iteração.

lista_resultados = [] #iniciando variavel lista para guardar as previsões realizadas pelo modelo.

#loop que parte do intervalo, no caso row 1000 do df, e, com passo = 1, vai iterar por todo o dataframe realizando previsões do target seguindo a lógica já citada.
for i in tqdm(range(interval,len(x))): 

    x_train,y_train = x[i-interval:i],y[i-interval:i] #selecionando o x e o y que vão ser usados no fit do modelo, para determinar os coeficientes de regressão da iteração. lembrando que estes serão sempre x[i-interval:i] e y[i-interval:i], ou seja, os ultimos 1000 valores de x e y sem contar a row atual - a row atual será usada como input para prever o target da iteração.
    x_test = x[i] # definindo os inputs que serão usados para calcular a probabilidade do preco da btc subir ou descer proximo minuto.
    
    coeficientes = calc_coefs(x_train,y_train) #chamando função que foi definida no arquivo funcoes; essa função recebe uma lista de listas (x_treino) e uma lista de targets e retorna os coeficientes de regressão.
    lista_coeficientes.append(coeficientes) #apendando os coeficientes de regressão à lista de coeficientes para poder avaliar a importância de cada feature posteriormente.

    lista_resultados.append(calcula_p(x_test,coeficientes)) #chamando função que foi definida no arquivo funcoes; essa função efetua a previsão da probabilidade do preço subir ou descer no minuto seguinte (target), para tal ela coloca os features da row em que se encontra (row=i) como inputs na função logística e os coeficientes da função logistica são os que foram calculados por meio do calc_coefs()
    
y = y[interval:] #retirando os primeiros 1000 elementos da lista de targets para poder comparar a lista das previsões que foram realizadas pelo modelo com a lista dos valores de fato.


acertos_btc_min = 0 #número que guardará os acertos

linha_decisao = 0.5 #todas as probabilidades que o modelo definiu como acima de 0.5 são classificadas como 1 e abaixo de 0.5 como 0, lembrar da imagem da função logística.

for i in range(0,len(lista_resultados)): #loop que compara as previsões realizadas pelo modelo com os valores verdadeiros para determinar a acurácia do sistema.
    if lista_resultados[i] >= linha_decisao and y[i] == 1:
        acertos_btc_min += 1
    if lista_resultados[i] < linha_decisao and y[i] == 0:
        acertos_btc_min += 1

print(acertos_btc_min/len(y)*100) #dividindo número de acertos pelo tamanho da array de targets e multiplicando o resultado por 100 para determinar acurácia em %


100%|██████████| 975/975 [00:17<00:00, 55.92it/s]

70.25641025641025





Percebe-se, finalmente, que a acurácia de 70.26% do sistema construído é muito próxima com a acurácia de 71.4% citada pelos acadêmicos e, portanto, o modelo pode ser considerado validado.

Avaliando as importâncias médias de cada feature

In [None]:


#transpondo a matriz de coeficientes
trans = list(map(list, zip(*lista_coeficientes)))

coefs_import = [math.exp(np.mean(trans[0])*np.mean(df.MACD)),math.exp(np.mean(trans[1])*np.mean(df.RSI)),math.exp(np.mean(trans[2])*np.mean(df.SO)),math.exp(np.mean(trans[3])*np.mean(df["12_SMA"])),math.exp(np.mean(trans[4])*np.mean(df["12_EMA"])),math.exp(np.mean(trans[5])*np.mean(df.ROC)),math.exp(np.mean(trans[6])*np.mean(df["tradecount"]))]

Teste Para intervalos de 1 hora

In [14]:
acuracias = []

df = pd.read_csv("Binance_BTCUSDT_1h.csv",skiprows=1)
calculate_indicators(df)
df = df.dropna(axis=0,how="any")
x,y = get_x_y(df)

In [23]:
interval = 1000 

acertos_btc_h = 0
lista_resultados = []
for i in tqdm(range(interval,len(x))):


    x_train,y_train = x[i-interval:i],y[i-interval:i]
    x_test = x[i]
    

    coeficientes = calc_coefs(x_train,y_train)

    lista_resultados.append(calcula_p(x_test,coeficientes))
    
y = y[interval:]
    
for i in range(0,len(lista_resultados)):
    if lista_resultados[i] >= 0.5 and y[i] == 1:
        acertos_btc_h += 1
    if lista_resultados[i] < 0.5 and y[i] == 0:
        acertos_btc_h += 1

print(acertos_btc_h/len(y)*100)

100%|██████████| 10455/10455 [02:42<00:00, 64.40it/s]

67.02056432329029





Testando para ETH (criptomoeda da blockchain ethereum)

In [14]:
acuracias = []
nrows = 1400
df = pd.read_csv("Binance_ETHUSDT_minute.csv",skiprows=1,nrows=nrows)
calculate_indicators(df)
df = df.dropna(axis=0,how="any")


In [17]:
interval = 1000 

lista_coeficientes = []

acertos_eth_min = 0
lista_resultados = []
for i in tqdm(range(interval,len(x))):


    x_train,y_train = x[i-interval:i],y[i-interval:i]
    x_test = x[i]
    

    coeficientes = calc_coefs(x_train,y_train)
    lista_coeficientes.append(coeficientes)

    lista_resultados.append(calcula_p(x_test,coeficientes))
    
y = y[interval:]
    
for i in range(0,len(lista_resultados)):
    if lista_resultados[i] >= 0.5 and y[i] == 1:
        acertos_eth_min += 1
    if lista_resultados[i] < 0.5 and y[i] == 0:
        acertos_eth_min += 1

print(acertos_eth_min/len(y)*100)




#


print(coefs_import)

100%|██████████| 375/375 [00:05<00:00, 64.15it/s]

73.86666666666667
2388.1173768931026





Teste de ETH para intervalos de 1 hora.

In [18]:
acuracias = []
nrows = 10000
df = pd.read_csv("Binance_ETHUSDT_1h.csv",skiprows=1,nrows=nrows)
calculate_indicators(df)
df = df.dropna(axis=0,how="any")
x,y = get_x_y(df)
print(len(df))

9415


In [16]:
interval = 1000 

acertos_eth_h = 0
lista_resultados = []
for i in tqdm(range(interval,len(x))):


    x_train,y_train = x[i-interval:i],y[i-interval:i]
    x_test = x[i]
    

    coeficientes = calc_coefs(x_train,y_train)

    lista_resultados.append(calcula_p(x_test,coeficientes))
    
y = y[interval:]
    
for i in range(0,len(lista_resultados)):
    if lista_resultados[i] >= 0.5 and y[i] == 1:
        acertos_eth_h += 1
    if lista_resultados[i] < 0.5 and y[i] == 0:
        acertos_eth_h += 1

print(acertos_eth_h/len(y)*100)

100%|██████████| 8415/8415 [02:35<00:00, 54.16it/s]

65.56149732620321



