In [None]:
import yfinance as yf
import datetime as dt
import numpy as np
import matplotlib.pyplot as plt
import math
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Input

In [None]:
def collectCryptoData(ticker):
    startDate = dt.datetime(2020, 10, 20)
    endDate = dt.datetime(2024, 10, 20)

    data = yf.download(ticker, startDate, endDate)

    return data

In [None]:
def calculateMacdInfo(data):
    emaShort = data['Close'].ewm(span=12, adjust=False).mean()
    emaLong = data['Close'].ewm(span=26, adjust=False).mean()
    
    data['MacdLine'] = emaShort - emaLong
    
    data['SignalLine'] = data['MacdLine'].ewm(span=9, adjust=False).mean()

    data['Histogram'] = data['MacdLine'] - data['SignalLine']
    
    return data

def preprocessData(data):
    data = calculateMacdInfo(data)

    data = data[['Close', 'MacdLine', 'SignalLine', 'Histogram']].drop_duplicates().dropna()

    scaler = MinMaxScaler(feature_range=(0, 1))
    scaledData = scaler.fit_transform(data)

    windowSize = 120
    x, y = [], []
    
    for i in range(windowSize, len(data)):
        x.append(scaledData[i-windowSize:i])
        y.append(scaledData[i, 0])
    
    x, y = np.array(x), np.array(y)

    trainSize = int(len(x) * 0.7)
    valSize = int(len(x) * 0.15)
    testSize = len(x) - trainSize - valSize 
    
    xTrain, xVal, xTest = x[:trainSize], x[trainSize:trainSize + valSize], x[trainSize + valSize:]
    yTrain, yVal, yTest = y[:trainSize], y[trainSize:trainSize + valSize], y[trainSize + valSize:]

    return xTrain, xVal, xTest, yTrain, yVal, yTest, scaler


In [None]:
def buildLstmModel(xTrain):
    model = Sequential()
    model.add(Input(shape=(xTrain.shape[1], 4)))
    model.add(LSTM(units=150, return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(units=100, return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(units=50))
    model.add(Dropout(0.2))
    model.add(Dense(units=1))
    
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

def trainAndTest(ticker):
    data = collectCryptoData(ticker)
    
    xTrain, xVal, xTest, yTrain, yVal, yTest, scaler = preprocessData(data)

    model = buildLstmModel(xTrain)
    
    history = model.fit(xTrain, yTrain, epochs=100, batch_size=16, validation_data=(xVal, yVal))
    plotLoss(history)

    predictions = model.predict(xTest)
    plotResult(predictions, yTest, scaler, ticker)

def plotLoss(history):
    plt.figure(figsize=(12, 6))

    lossTraining = history.history['loss']
    lossValidation = history.history['val_loss']

    plt.subplot(1, 2, 1)
    plt.plot(lossTraining, label='Perda de Treinamento')
    plt.plot(lossValidation, label='Perda de Validação')
    plt.title('Perda durante o treinamento x validação')
    plt.xlabel('Épocas')
    plt.ylabel('Perda')
    plt.legend()

    plt.tight_layout()
    plt.show()

def calculateEvaluationMetrics(predictions, yTest):
    mae = mean_absolute_error(yTest, predictions)
    print(f"MAE: {mae}")
    
    mse = mean_squared_error(yTest, predictions)
    print(f"MSE: {mse}")

    rmse = math.sqrt(mse)
    print(f"RMSE: {rmse}")
    
def plotResult(predictions, yTest, scaler, ticker):
    predictions = scaler.inverse_transform(np.column_stack((predictions, np.zeros((predictions.shape[0], 3)))))
    yTestActual = scaler.inverse_transform(np.column_stack((yTest, np.zeros((yTest.shape[0], 3)))))

    calculateEvaluationMetrics(predictions, yTestActual)
    
    plt.figure(figsize=(14, 7))
    plt.plot(yTestActual[:, 0], color='blue', label='Preço Real')
    plt.plot(predictions[:, 0], color='red', label='Previsão')
    plt.title(f'Previsão de Preço de {ticker}')
    plt.xlabel('Tempo')
    plt.ylabel('Preço (USD)')
    plt.legend()
    plt.show()
    
trainAndTest('BTC-USD')
trainAndTest('ETH-USD')
trainAndTest('SOL-USD')
