### LSTM

In [1]:
import pandas as pd
import numpy as np
data = pd.read_csv('BTC-I.csv', parse_dates=['Date'], index_col=['Date'])
data

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,ema20,ema50,rsi,macd,signal,hist,Price-Up
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2014-12-24,334.385010,334.740997,321.356995,322.533997,322.533997,15092300,339.512870,360.579539,40.513754,-11.648774,-11.161816,-0.486958,0
2014-12-25,322.286011,322.670013,316.958008,319.007996,319.007996,9883640,337.560025,358.949282,39.167231,-11.833191,-11.296091,-0.537100,1
2014-12-26,319.152008,331.424011,316.627014,327.924011,327.924011,16410500,336.642310,357.732605,44.216090,-11.131576,-11.263188,0.131612,0
2014-12-27,327.583008,328.911011,312.630005,315.863007,315.863007,15185200,334.663328,356.090660,39.446692,-11.417154,-11.293981,-0.123173,1
2014-12-28,316.160004,320.028015,311.078003,317.239014,317.239014,11676600,333.003870,354.567066,40.238692,-11.401021,-11.315389,-0.085632,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-12-21,97756.195312,99507.101562,96426.523438,97224.726562,97224.726562,51765334294,99069.601344,92289.965450,48.952697,1965.864891,3012.484494,-1046.619603,0
2024-12-22,97218.320312,97360.265625,94202.187500,95104.937500,95104.937500,43147981314,98692.014312,92400.356511,44.934433,1437.643624,2697.516320,-1259.872696,0
2024-12-23,95099.390625,96416.210938,92403.132812,94686.242188,94686.242188,65239002919,98310.512205,92489.999087,44.163328,974.011455,2352.815347,-1378.803892,1
2024-12-24,94684.343750,99404.062500,93448.015625,98676.093750,98676.093750,47114953674,98345.329495,92732.591034,52.524176,917.945789,2065.841435,-1147.895647,1


In [4]:
features = ['Close', 'ema20', 'ema50', 'rsi', 'macd', 'signal', 'hist']
target = ['Price-Up']
# Separar X e y
X = data[features]
y = data[target].to_numpy()
y.shape, y

((3655, 1),
 array([[0],
        [1],
        [0],
        ...,
        [1],
        [1],
        [0]]))

In [7]:
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range=(-1, 1))
X_scaled = scaler.fit_transform(X) # Ya lo convierte en un array de numpy 
X_scaled.shape, X_scaled

((3655, 7),
 array([[-0.99727392, -0.9977697 , -0.99726331, ..., -0.16684507,
         -0.18506937, -0.05291906],
        [-0.99734047, -0.99780899, -0.99729847, ..., -0.16687555,
         -0.18509408, -0.05294709],
        [-0.99717219, -0.99782746, -0.9973247 , ..., -0.1667596 ,
         -0.18508803, -0.05257324],
        ...,
        [ 0.78380352,  0.9735338 ,  0.98926416, ..., -0.00395957,
          0.25001379, -0.82349381],
        [ 0.85911039,  0.97423437,  0.99449502, ..., -0.01322471,
          0.19719707, -0.69440008],
        [ 0.86980816,  0.97595435,  1.        , ..., -0.01474071,
          0.15460602, -0.57015242]]))

In [9]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, shuffle=False)

### Implementacion con Pytorch

In [12]:
# Convertimos los datos de Numpy a Tensores de Pytorch 
import torch
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

X_train_tensor.shape,  y_train_tensor.shape, X_test_tensor.shape, y_test_tensor.shape 

(torch.Size([2924, 7]),
 torch.Size([2924, 1]),
 torch.Size([731, 7]),
 torch.Size([731, 1]))

In [15]:
# Creamos Datasets y Dataloaders de Pytorch
from torch.utils.data import TensorDataset, DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor) # Instanciamos la clase TendorDataSet 
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

batch_size = 32

train_loader = DataLoader(train_dataset, batch_size, shuffle=False) # Instanciamos la clase DataLoader
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

print(f'Cantidad: {len(train_loader)} ')
for batch_X, batch_y in train_loader:
    print("Batch X shape:", batch_X.shape)
    print("Batch y shape:", batch_y.shape)
      # Solo el primer batch
    break


Cantidad: 92 
Batch X shape: torch.Size([32, 7])
Batch y shape: torch.Size([32, 1])


In [18]:
import torch
import torch.nn as nn 

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print (device)

class MiLSTM (nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size ):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        # Definimos 1 capa LSTM y un Full Connected 
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) #Ej [8, 64, 2, (batch_size, seq_length, hidden_size)]
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # Estados ocultos y celdas en 0 (num_layers, batch_size, hidden_size)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) # Ej [2, 32, 64]
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) # Ej [2, 32, 64]
        x = x.unsqueeze(1)
        # Pasa la entrada por la capa LSTM 
        out, _ = self.lstm(x, (h0,c0)) #LLamo a la instancia de lstm (llamo a su metodo __call__ (no necesita nombre)) (batch_size, seq_length, hidden_size) [32, 30, 64]

        #Seleccionamos la ultima salida de la secuencia
        out = out[:,-1,:] # (batch_size , hidden_size) [32,64]

        #Pasa por la fc
        out = self.fc(out) # (batch_size, output_size) Ej [32,1]

        return out

# Parametros del modelo 
input_size = len(features) 
hidden_size = 32
num_layers  = 2
output_size = 1 # Clasificacion 

model = MiLSTM(input_size, hidden_size, num_layers, output_size).to(device)
print(model) 

cpu
MiLSTM(
  (lstm): LSTM(7, 32, num_layers=2, batch_first=True)
  (fc): Linear(in_features=32, out_features=1, bias=True)
)


In [19]:
# Definir la función de pérdida y el optimizador
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Entrenamiento del modelo
num_epochs = 100 
model.train()
print(f'Entrenado en {device}')

for epoch in range(num_epochs):
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device) #[32, 30, 8]
        batch_y = batch_y.to(device) #[32, 1]

        outputs = model(batch_X).squeeze() # [batch_size, 1] => [batch_size] 

        loss = criterion(outputs, batch_y.squeeze())

        loss.backward()

        optimizer.step()

        optimizer.zero_grad()
    if (epoch + 1 ) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')





Entrenado en cpu
Epoch [10/100], Loss: 0.7054
Epoch [20/100], Loss: 0.7028
Epoch [30/100], Loss: 0.7007
Epoch [40/100], Loss: 0.6976
Epoch [50/100], Loss: 0.6934
Epoch [60/100], Loss: 0.6924
Epoch [70/100], Loss: 0.6919
Epoch [80/100], Loss: 0.6915
Epoch [90/100], Loss: 0.6910
Epoch [100/100], Loss: 0.6902


In [20]:
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

model.eval()  # Modo de evaluación
y_true = []  # Etiquetas reales
y_pred = []  # Predicciones binarias
y_probs = []  # Probabilidades predichas

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        outputs = model(batch_X).squeeze()   # Logits
        probs = torch.sigmoid(outputs)       # Probabilidades
        predicted = torch.round(probs)       # Predicciones binarias (0 o 1)

        # Guardar los valores reales y predichos
        y_true.extend(batch_y.squeeze().cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())
        y_probs.extend(probs.cpu().numpy())

# Convertir listas a numpy arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_probs = np.array(y_probs)

# Calcular métricas
accuracy = np.mean(y_pred == y_true) * 100
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
roc_auc = roc_auc_score(y_true, y_probs)

# Mostrar resultados
print(f'Accuracy: {accuracy:.2f}%')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')
print(f'F1-Score: {f1:.2f}')
print(f'ROC-AUC: {roc_auc:.2f}')


Accuracy: 51.30%
Precision: 0.53
Recall: 0.34
F1-Score: 0.42
ROC-AUC: 0.52


In [13]:
import joblib

# Guardar el modelo y el scaler
torch.save(model, 'model.pth')
joblib.dump(scaler, 'scaler.pkl')

['scaler.pkl']

In [36]:
def predict(input_data):
    # Normalizar los datos de entrada
    input_array = np.array(input_data).reshape(1, -1)
    input_data = scaler.transform(input_array)
    input_data = torch.tensor(input_data, dtype=torch.float32).to(device)
    # Realizar la predicción
    model.eval()
    with torch.no_grad():
        output = model(input_data)
        probability = torch.sigmoid(output).item() # Convertir logits a probabilidad
        prediction = 1 if probability >= 0.5 else 0  # Umbral de 0.5 para clasificación

    return prediction

In [47]:
# Ejemplo de datos
close = 15000
ema20 = 34800
ema50 = 34500
rsi = 70
macd = 10
signal = 11
hist = 2

# Predicción
result = predict([close, ema20, ema50, rsi, macd, signal, hist])
print(f"Predicción: {result}")  # Salida: 1 o 0


Predicción: 1


