# Instalação de dependências

In [None]:
!pip install python-binance 

In [None]:
import notebook
notebook.__version__

# **utils.py**

In [None]:
import numpy as np
import pandas as pd
from binance.client import Client
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import precision_score, recall_score, f1_score
from keras.optimizers import RMSprop, Adam, SGD
import csv
from datetime import datetime
import time
import pickle



# Recupera os dados históricos da binance e retorna um dataframe
def retrieve_historical_data_binance(client, symbol, interval, limit):
    klines = client.get_klines(symbol=symbol, interval=interval, limit=limit)
    df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
                                       'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume',
                                       'taker_buy_quote_asset_volume', 'ignore'])
    df['close'] = df['close'].astype(float)
    return df


def get_klines_df(client, symbol, interval, total_periods=2000):
    klines_df = pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'])

    periods_received = 0
    last_timestamp = None

    while periods_received < total_periods:
        klines = client.get_klines(
            symbol=symbol,
            interval=interval,
            limit=total_periods - periods_received,
            endTime=last_timestamp
        )

        if not klines:
            break

        for kline in klines:
            klines_df.loc[len(klines_df)] = kline

        periods_received += len(klines)

        last_timestamp = klines[0][0]

    # Convert timestamp column to datetime
    klines_df['timestamp_to_date'] = pd.to_datetime(klines_df['timestamp'], unit='ms')

    return klines_df.sort_values(by='timestamp')


# Calcula manualmente a métrica de mvrv_ratio
def retrieve_mvrv_ratio(df_binance):
    df_binance['mvrv_ratio'] = df_binance['close'].astype(float) / ((df_binance['close'].astype(float) + df_binance['high'].astype(float)+ df_binance['low'].astype(float) + df_binance['open'].astype(float)) / 4.0)
    return df_binance

# Retorna o datafrime da Binance com o mvrv calculado
def get_concatenate_klines_mvrv(client, symbol, interval, limit):
    df_binance_klines = get_klines_df(client, symbol, interval, limit)

    #from_date,to_date = timestamp_to_datestring(df_binance_klines['timestamp'].iloc[-0]), timestamp_to_datestring(df_binance_klines['timestamp'].iloc[-1])
    df_binance_klines['timestamp'] = df_binance_klines['timestamp'] // 1000
    df_binance_klines = retrieve_mvrv_ratio(df_binance_klines)
    return df_binance_klines

# Função para calcular os quartis em uma coluna de DataFrame
def calculate_quartiles(df, column_name):
    """
    Calcula os quartis (25%, 50%, 75%) em uma coluna de um DataFrame.

    :param df: DataFrame
    :param column_name: Nome da coluna
    :return: Um dicionário com os valores dos quartis
    """
    if column_name not in df.columns:
        raise ValueError(f"A coluna '{column_name}' não existe no DataFrame.")
    df[column_name] = df[column_name].astype('float64')
    values = df[column_name].values
    first_quartile = np.percentile(values, 25)
    second_quartile = np.percentile(values, 50)
    third_quartile = np.percentile(values, 75)

    return {
        'first_quartile': first_quartile,
        'second_quartile': second_quartile,
        'third_quartile': third_quartile
    }


# Função para adicionar colunas indicando quartis a um DataFrame
def add_quartile_columns(df, column_name):
    """
    Adiciona colunas ao DataFrame indicando se cada valor está no primeiro, segundo ou terceiro quartil.

    :param df: DataFrame
    :param column_name: Nome da coluna
    :return: O DataFrame com as novas colunas
    """
    quartiles = calculate_quartiles(df, column_name)
    first_quartile = quartiles['first_quartile']
    second_quartile = quartiles['second_quartile']
    third_quartile = quartiles['third_quartile']
    df[f'{column_name}_in_first_quartile'] = (df[column_name] <= first_quartile).astype(int)
    df[f'{column_name}_in_second_quartile'] = ((first_quartile < df[column_name]) & (df[column_name] <= second_quartile)).astype(int)
    df[f'{column_name}_in_third_quartile'] = ((second_quartile < df[column_name]) & (df[column_name] <= third_quartile)).astype(int)
    df.drop(column_name, axis=1)
    return df

# Função que transforma timestamp em data
def timestamp_to_datestring(timestamp):
    ts = timestamp/ 1000
    data = datetime.fromtimestamp(ts)
    return data.strftime('%Y-%m-%d')

# Função utilizada para normalizar uma coluna
def normalize_data(df, column_name):
  # Inicialize o MinMaxScaler
  scaler = MinMaxScaler()
  # Ajuste o scaler aos dados
  scaler.fit(df[[column_name]])

  # Aplique a transformação aos dados
  return scaler.transform(df[[column_name]])

def preprocess_manual(historical_data):
    # Adiciona a coluna que no dataset será o alvo
    historical_data = calcular_target_high(historical_data.copy())

    # Lista das colunas a serem normalizadas
    list_columns_normalize = [
        'volume', 'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume',
        'open', 'high', 'low', 'close', 'mvrv_ratio'
    ]

    # Crie um dicionário para armazenar os scalers
    scalers = {}
    # Loop para normalizar cada coluna e armazenar o scaler correspondente
    for col in list_columns_normalize:
        scaler = MinMaxScaler()
        scaler.fit(historical_data[[col]])  # Ajusta o scaler apenas a uma coluna
        scalers[col] = scaler  # Armazena o scaler no dicionário

        # Aplica a transformação à coluna no DataFrame
        historical_data[[col]] = scaler.transform(historical_data[[col]])

    # Cria um DataFrame temporário com as colunas normalizadas e nomes modificados
    colunas_normalizadas = pd.DataFrame(
        {f'normalized_{col}': historical_data[col] for col in list_columns_normalize}
    )

    # Adiciona a coluna alvo ao DataFrame temporário
    colunas_normalizadas['target-high'] = historical_data['target-high']

    # Substitui as colunas originais em 'historical_data' pelas colunas normalizadas com nomes modificados
    historical_data = colunas_normalizadas

    # Retorna o DataFrame preprocessado e o dicionário de scalers
    return historical_data, scalers

def calcular_target_high(historical_data, num_periodos=4):
    max_highs = []

    for i in range(len(historical_data)):
        # calcula o valor máximo da coluna 'high' nos próximos períodos.
        max_high = historical_data['high'].iloc[i:i + num_periodos].max()
        max_highs.append(max_high)

    historical_data['target-high'] = max_highs
    historical_data_alvo = historical_data.copy()
    historical_data_alvo['target-high'] = historical_data_alvo['target-high'].astype('float64')
    return historical_data_alvo

# Lendo dados históricos e pré processamento

In [None]:
from google.colab import drive
from sklearn.model_selection import train_test_split

drive.mount('/content/drive')
df = pd.read_csv('/content/drive/MyDrive/1_experimento/historical_data_ETCUSDT_10k.csv')

###### PREPROCESSA OS DADOS
dataset,scaler = preprocess_manual(df)
COLUNAS_DATASET_COM_ALV0 = dataset.columns
COLUNAS_DATASET_SEM_ALVO = dataset.copy().drop('target-high', axis=1).columns

dataset = dataset.copy()

# Separando as variáveis independentes (X) e a variável alvo (y)
X = dataset.drop(['target-high'], axis=1)
y = dataset['target-high']

# Dividindo os dados em conjuntos de treinamento e teste
random_state = 15
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=random_state)

dataset

# Regressão Linear Multipla

# RNA Multilayer Perceptron Regressor


In [None]:
#RNA MLP REDE NEURAL MUTILAYER PERCEPTRON REGRESSOR (ALVO TARGET-HIGH)
from sklearn.neural_network import MLPRegressor
from sklearn import preprocessing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=random_state)

RNA = MLPRegressor(
    hidden_layer_sizes=(3),
    tol=0.0000001,
    learning_rate_init=0.1,
    verbose=0,
    random_state=random_state,
    validation_fraction = 0.2,
    max_iter = 5000

)

RNA.fit(X_train, y_train)

# Fazendo previsões com os dados de teste
y_pred_RNA = RNA.predict(X_test)
r2_RNA = r2_score(y_test, y_pred_RNA)

# Regressão de Árvore de Decisão

In [None]:
#REGRESSÃO DE ARVORE DE DECISAO (ALVO TARGET-HIGH)
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Dividindo os dados em conjuntos de treinamento e teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state)

# Criando e treinando o modelo de árvore de decisão
tree_reg = DecisionTreeRegressor(random_state=random_state)
tree_reg.fit(X_train, y_train)

# Fazendo previsões no conjunto de teste
y_pred_tree = tree_reg.predict(X_test)

# Calculando o erro médio quadrático (RMSE)
rmse_tree = np.sqrt(mean_squared_error(y_test, y_pred_tree))

# Regressão de Floresta Aleatória

In [None]:
#REGRESSÃO DE FLORESTA ALEATÓRIA (ALVO TARGET-HIGH)
from sklearn.ensemble import RandomForestRegressor

# Criando e treinando o modelo de floresta aleatória
rf_reg = RandomForestRegressor(random_state=random_state)
rf_reg.fit(X_train, y_train)

# Fazendo previsões no conjunto de teste
y_pred_rf = rf_reg.predict(X_test)

# Calculando o erro médio quadrático (RMSE)
rmse_rf = np.sqrt(mean_squared_error(y_test, y_pred_rf))


# Regressão de Support Vector Machine

In [None]:
#REGRESSÃO DE SUPPORT VECTOR MACHINE (ALVO TARGET-HIGH)
from sklearn.svm import SVR

# Criando e treinando o modelo SVM
svm_reg = SVR(kernel='linear')
svm_reg.fit(X_train, y_train)

# Fazendo previsões no conjunto de teste
y_pred_svm = svm_reg.predict(X_test)

# Calculando o erro médio quadrático (RMSE)
rmse_svm = np.sqrt(mean_squared_error(y_test, y_pred_svm))

print("Máquinas de Vetores de Suporte (SVM) - RMSE:", rmse_svm)


# KNN

In [None]:
# KNN
from sklearn.neighbors import KNeighborsRegressor

# Criando e treinando o modelo SVM
knn = KNeighborsRegressor(n_neighbors=4)  # Você pode ajustar o número de vizinhos (k) conforme necessário
knn.fit(X_train, y_train)

# Fazendo previsões no conjunto de teste
y_pred_knn = knn.predict(X_test)

# Calculando o erro médio quadrático (RMSE)
rmse_knn= np.sqrt(mean_squared_error(y_test, y_pred_svm))

print("K-Nearest Neighbors (KNN) - RMSE:", rmse_knn)

# Avaliação

In [None]:
#### AVALIAÇÃO
import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import make_scorer



DICIONARIO_MODELOS_REGRESSÃO = {'Regressão Linear':lr,'Árvore de decisão':tree_reg,'Floresta Aleatória':rf_reg,'K-Nearest Neighbors (KNN)':knn, 'Máquinas de Vetores de Suporte (SVM)':svm_reg, "RNA MLP": RNA}



# Função para calcular o ERRO MÉDIO ABSOLUTO
def mae(y_true, predictions):
    y_true, predictions = np.array(y_true).astype(float), np.array(predictions).astype(float)
    return np.mean(np.abs(y_true - predictions))

# Função para calcular as métricas e imprimir
def calculate_metrics(model, X, y):
    y_pred = model.predict(X)
    r2 = r2_score(y, y_pred)
    scores = cross_val_score(model, X, y, scoring='neg_mean_squared_error', cv=5)
    rmse_scores = np.sqrt(-scores)
    mae_score = mae(y, y_pred)
    return r2, mae_score, rmse_scores.mean()

# DICIONARIO_MODELOS_REGRESSÃO contém os modelos

best_model = None
best_mean_r2 = -float('inf')
best_mean_mae = float('inf')
best_mean_rmse = float('inf')

for model_name, model in DICIONARIO_MODELOS_REGRESSÃO.items():
    r2, mae_score, mean_rmse = calculate_metrics(model, X_test, y_test)
    print(f"[MODELO: {model_name}]")
    print(f'R² Score: {r2:.4f}')
    print(f'Erro Médio Absoluto (MAE): {mae_score:.4f}')
    print(f'Validação Cruzada (RMSE):')
    print(f'Média RMSE: {mean_rmse:.4f}')
    print("-" * 30)

    # Verifique se este modelo é o melhor até agora com base nas métricas
    if r2 > best_mean_r2 and mae_score < best_mean_mae and mean_rmse < best_mean_rmse:
        best_model = model_name
        best_mean_r2 = r2
        best_mean_mae = mae_score
        best_mean_rmse = mean_rmse

print(f"Melhor modelo: {best_model}")




In [None]:
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error
import numpy as np

# Defina seus modelos de regressão
DICIONARIO_MODELOS_REGRESSÃO = {
    'Regressão Linear': lr,
    'Árvore de decisão': tree_reg,
    'Floresta Aleatória': rf_reg,
    'K-Nearest Neighbors (KNN)': knn,
    'Máquinas de Vetores de Suporte (SVM)': svm_reg,
    'RNA MLP REGRESSOR': RNA
}

# Crie listas vazias para armazenar os resultados
modelos = []
r2_scores = []
media_rmse_scores = []
mae_scores = []

# Função para formatar um número com vírgula como separador decimal
def formatar_com_virgula(numero):
    return f'{numero:.4f}'.replace('.', ',')

# Função para calcular a média RMSE a partir de scores
def media_rmse(scores):
    return formatar_com_virgula(np.sqrt(-scores).mean())

# Loop através dos modelos e colete os resultados
for model_name, model in DICIONARIO_MODELOS_REGRESSÃO.items():
    y_pred = model.predict(X_test)
    r2 = r2_score(y_test, y_pred)
    scores = cross_val_score(model, X_test, y_test, scoring='neg_mean_squared_error', cv=5)
    rmse_mean = media_rmse(scores)
    mae_score = formatar_com_virgula(mean_absolute_error(y_test, y_pred))

    modelos.append(model_name)
    r2_scores.append(formatar_com_virgula(r2 * 100))  # Convertendo para percentual
    media_rmse_scores.append(rmse_mean)
    mae_scores.append(mae_score)

# Crie um DataFrame com os resultados
resultados_df = pd.DataFrame({
    'Modelo': modelos,
    'R² Score (%)': r2_scores,
    'Média RMSE (Validação Cruzada)': media_rmse_scores,
    'Erro Médio Absoluto (MAE)': mae_scores

})

# Exiba a tabela de resultados
resultados_df
