In [None]:
!export CUDA_LIB=/usr/local/cuda/lib64
!pip install omegaconf dacite xlstm yfinance ninja
import pandas as pd
import numpy as np
import torch
from google.colab import files
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from omegaconf import OmegaConf
from dacite import from_dict, Config as DaciteConfig
from xlstm import xLSTMLMModel, xLSTMLMModelConfig
import yfinance as yf

In [None]:
# Report whether PyTorch can see a CUDA device.
print(
    "CUDA is available!"
    if torch.cuda.is_available()
    else "CUDA is not available. Check your installation."
)

In [None]:
import shutil
import xlstm.blocks.slstm.src.cuda_init as cuda_init
import os

# Delete the torch extensions cache directory when it is present.
cache_dir = "/root/.cache/torch_extensions"
if os.path.exists(cache_dir):
    # rmtree removes the directory together with everything below it.
    shutil.rmtree(cache_dir)



In [None]:
# Symbol of the stock to model; replace with the ticker you want.
ticker = "AAPL"
# Fetch the longest price history the data source offers for that symbol.
data = yf.download(
    ticker,
    period="max",
)

In [None]:
# Prepare the data for training: keep only the 'Close' column, drop rows with
# a missing close (the inference helper `get_test_data` does the same, and a
# NaN would otherwise flow into the training windows), then discard the date
# index in favour of a plain 0..n-1 index.
data = data[['Close']].dropna().reset_index(drop=True)


In [None]:
# Fit a min-max scaler on the prices, then replace `data` with its scaled
# values in the range [0, 1]. `scaler` is kept for later inverse transforms.
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(data)
data = scaler.transform(data)


In [None]:
class StockDataset(Dataset):
    """Serve (input window, target) pairs as float32 tensors.

    Args:
        X: Indexable collection of input windows; ``X[i]`` is the i-th sample.
        y: Indexable collection of targets; ``y[i]`` belongs to ``X[i]``.
    """

    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        # The number of samples is taken from the inputs.
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``idx``-th (input, target) pair as float32 tensors."""
        # torch.as_tensor accepts NumPy arrays, NumPy scalars and plain Python
        # sequences alike, so 1-D target arrays (whose items are scalars) and
        # list inputs work; torch.from_numpy only accepts ndarrays.
        x = torch.as_tensor(self.X[idx], dtype=torch.float32)
        y = torch.as_tensor(self.y[idx], dtype=torch.float32)
        return x, y

In [None]:
look_back = 60  # Number of past values fed to the model for each sample


def make_windows(series, window):
    """Slice a time-ordered array into (input window, next value) pairs.

    Args:
        series: Array-like whose first axis is time.
        window: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays where ``X[i]`` is
        ``series[i:i + window]`` and ``y[i]`` is ``series[i + window]``.

    Raises:
        ValueError: If ``series`` has no more than ``window`` rows, so that
            not a single pair can be formed.
    """
    series = np.asarray(series)
    n_samples = len(series) - window
    if n_samples <= 0:
        raise ValueError(
            f"need more than {window} rows to build a window, got {len(series)}"
        )
    # Iterating over range(len - window), not range(len - window - 1), lets
    # the final row serve as a target instead of being discarded.
    X = np.stack([series[i:i + window] for i in range(n_samples)])
    y = series[window:].copy()
    return X, y


X, y = make_windows(data, look_back)

In [None]:
# NOTE: this cell repeats the StockDataset definition from an earlier cell;
# being the later one, it is the definition in effect for the cells below.
class StockDataset(Dataset):
    """Serve (input window, target) pairs as float32 tensors.

    Args:
        X: Indexable collection of input windows; ``X[i]`` is the i-th sample.
        y: Indexable collection of targets; ``y[i]`` belongs to ``X[i]``.
    """

    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        # The number of samples is taken from the inputs.
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``idx``-th (input, target) pair as float32 tensors."""
        # torch.as_tensor accepts NumPy arrays, NumPy scalars and plain Python
        # sequences alike, so 1-D target arrays (whose items are scalars) and
        # list inputs work; torch.from_numpy only accepts ndarrays.
        x = torch.as_tensor(self.X[idx], dtype=torch.float32)
        y = torch.as_tensor(self.y[idx], dtype=torch.float32)
        return x, y

In [None]:

# Wrap the windowed arrays in a dataset and iterate over it in shuffled
# mini-batches of 32 samples.
train_dataset = StockDataset(X, y)
train_loader = DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=32,
)


In [None]:
# Model hyper-parameters, written as YAML and parsed below.
xlstm_cfg = """
vocab_size: 128  # Adjust based on number of features
mlstm_block:
  mlstm:
    conv1d_kernel_size: 4
    qkv_proj_blocksize: 4
    num_heads: 4
slstm_block:
  slstm:
    backend: cuda  # Enable if using CUDA
    num_heads: 4
    conv1d_kernel_size: 4
    bias_init: powerlaw_blockdependent
  feedforward:
    proj_factor: 1.3
    act_fn: gelu
context_length: 360  # Update context length to match input size
num_blocks: 7
embedding_dim: 128
slstm_at: [1]
"""
cfg = OmegaConf.create(xlstm_cfg)
# The YAML above requests the CUDA sLSTM backend. On a runtime without a GPU,
# switch to the "vanilla" backend so the model can still be constructed.
if not torch.cuda.is_available():
    cfg.slstm_block.slstm.backend = "vanilla"
# Convert the parsed mapping into the typed config object; strict=True makes
# dacite reject keys that the config dataclass does not declare.
cfg = from_dict(data_class=xLSTMLMModelConfig, data=OmegaConf.to_container(cfg), config=DaciteConfig(strict=True))


In [None]:
# Build the network from the parsed configuration.
model = xLSTMLMModel(cfg)

# Loss criterion (mean squared error) and optimizer (Adam over all model
# parameters).
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,
)


In [None]:
# Pick the training device here so this cell does not rely on a `device`
# variable that no other cell defines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model once, up front, instead of on every batch, and make sure it
# is in training mode.
model = model.to(device)
model.train()

num_tokens = cfg.vocab_size  # number of distinct token ids the model embeds
log_every = 100  # print the mean loss after this many batches

for epoch in range(100):  # Adjust number of epochs
    running_loss = 0.0
    for i, (inputs, targets) in enumerate(train_loader):
        # Move inputs and targets to the same device as the model
        inputs, targets = inputs.to(device), targets.to(device)

        # The model expects integer token ids of shape (batch, seq_len).
        # Prices are scaled to [0, 1], so a bare `.long()` would truncate
        # almost every value to 0. Quantise them into `num_tokens` evenly
        # spaced bins instead; the clamp guards against values outside [0, 1].
        tokens = inputs.reshape(inputs.size(0), -1)
        tokens = (tokens * (num_tokens - 1)).round().clamp(0, num_tokens - 1).long()

        outputs = model(tokens)

        # Keep the last time step and exactly as many output channels as the
        # target has features, so prediction and target shapes match and the
        # loss does not silently broadcast.
        outputs = outputs[:, -1, :targets.size(-1)]

        loss = criterion(outputs, targets)

        # Update gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Track loss; report the mean over each full group of `log_every`
        # batches (so nothing is printed after only the very first batch).
        running_loss += loss.item()
        if (i + 1) % log_every == 0:
            print(f"[{epoch + 1}, {i + 1:4d}] loss: {running_loss / log_every:.4f}")
            running_loss = 0.0

# Save trained model
torch.save(model.state_dict(), "model.pt")


In [None]:
import torch
from torch import nn
from xlstm import xLSTMLMModel, xLSTMLMModelConfig
import yfinance as yf
import numpy as np

# Load the trained model. "model.pt" was written with
# torch.save(model.state_dict(), ...), i.e. it holds only the weights, so the
# network is rebuilt from the training configuration first and the weights are
# then restored into it. map_location="cpu" lets the file load on machines
# without a GPU; test_model moves the model to the right device afterwards.
model = xLSTMLMModel(cfg)
model.load_state_dict(torch.load("model.pt", map_location="cpu"))  # Replace with your model path

# Define test data function (modify as needed)
def get_test_data(ticker, look_back):
    """Download prices for ``ticker`` and return the latest scaled window.

    Args:
        ticker: Symbol passed to ``yf.download``.
        look_back: Number of most recent rows to keep.

    Returns:
        NumPy array with one leading axis of length 1 wrapping the last
        ``look_back`` rows of the scaled 'Close' column.
    """
    # Full available history, reduced to the non-missing closing prices.
    closes = yf.download(ticker, period="max")[['Close']].dropna()
    # Uses the module-level `scaler` fitted in an earlier cell.
    scaled = scaler.transform(closes)
    latest_window = scaled[-look_back:]
    return np.array([latest_window])

# Define test function
def test_model(model, test_data, target):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()  # Set model to evaluation mode

    with torch.no_grad():
        inputs = torch.from_numpy(test_data).float().to(device)
        outputs = model(inputs)

    predicted_price = scaler.inverse_transform(outputs.cpu().detach().numpy())[0][0]
    print(f"Predicted price for {target} closing: {predicted_price:.2f}")

# Example usage (replace with your desired ticker and look_back)
ticker, look_back = "AAPL", 60
target = "tomorrow"  # Adjust target date description

# Build the most recent scaled window for the ticker, then run the forecast.
test_data = get_test_data(ticker, look_back)

test_model(model, test_data, target)