In [None]:
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from mamba_ssm import Mamba
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Constants

DATA_LOAD_PATH = './data/processed_ETHUSDT_5m.csv'
MODEL_SAVE_PATH = 'models/mamba.pt'
EPOCHS = 100
BATCH_SIZE = 32
# NOTE(review): presumably the number of epochs between progress prints;
# it is not referenced by any cell visible in this notebook.
PRINT_INTERVAL = 10
# Original (misspelled) name, kept as an alias so existing references keep working.
PRINT_INERTVAL = PRINT_INTERVAL
WINDOW_SIZE = 10
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed torch's RNG so weight initialisation and DataLoader shuffling are
# repeatable under Restart Kernel -> Run All.
SEED = 42
torch.manual_seed(SEED)

In [None]:
# Load data
# Read the CSV, use the 'time' column as a sorted index, and cast every
# remaining column to float.

data = (
    pd.read_csv(DATA_LOAD_PATH)
    .set_index('time')
    .sort_index()
    .astype(float)
)

In [None]:
# Split data

def split_data(x_data:pd.DataFrame, y_data:pd.DataFrame, window_size:int):
    """Cut a feature table into overlapping windows with one-hot labels.

    Window ``i`` covers rows ``i .. i + window_size - 1`` of ``x_data`` and is
    labelled from row ``i + window_size - 1`` of ``y_data``: ``[1, 0]`` when
    that label equals 0, ``[0, 1]`` otherwise.

    Args:
        x_data: Feature rows. A ``'time'`` column, if present, is left out of
            the windows; the caller's frame is not modified.
        y_data: Labels aligned row-for-row with ``x_data``. Either a Series or
            a DataFrame whose first column holds the label.
        window_size: Number of consecutive rows per window (must be >= 1).

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n, window_size, n_features)`` and
        ``y`` has shape ``(n, 2)``, with ``n = max(len(x_data) - window_size, 0)``.

    Raises:
        ValueError: If ``window_size`` is < 1 or ``y_data`` has fewer rows
            than ``x_data``.
    """
    if window_size < 1:
        raise ValueError(f'window_size must be >= 1, got {window_size}')

    # Check the frame that was passed in (not a global) and drop without
    # mutating the caller's object.
    if 'time' in x_data.columns:
        x_data = x_data.drop(columns='time')

    features = x_data.to_numpy()

    # Accept both a 1-D Series and a 2-D DataFrame; a 2-D `.iloc[row, 0]`
    # lookup raises "Too many indexers" on a Series.
    labels = np.asarray(y_data)
    if labels.ndim > 1:
        labels = labels[:, 0]
    if len(labels) < len(features):
        raise ValueError(
            f'y_data has {len(labels)} rows but x_data has {len(features)}'
        )

    # Same window count as before: the loop ran over range(len(x_data) - window_size).
    n_windows = max(len(features) - window_size, 0)
    if n_windows == 0:
        # Return correctly shaped empties so downstream shape lookups still work.
        empty_X = np.empty((0, window_size, features.shape[1]), dtype=features.dtype)
        return empty_X, np.empty((0, 2), dtype=int)

    X = np.stack([features[i:i + window_size] for i in range(n_windows)])

    # Label of the last row in each window, mapped to a class index and then
    # one-hot encoded (class 0 -> [1, 0], class 1 -> [0, 1]).
    window_labels = labels[window_size - 1:window_size - 1 + n_windows]
    class_index = np.where(window_labels == 0, 0, 1)
    y = np.eye(2, dtype=int)[class_index]
    return X, y

# Features are every column except 'label'. The label is passed as a
# one-column DataFrame because split_data reads it with a 2-D `.iloc[row, 0]`
# lookup, which raises on a plain Series.
X, y = split_data(
    x_data=data.drop(columns=['label']),
    y_data=data[['label']],
    window_size=WINDOW_SIZE,
)

# shuffle=False keeps the row order: the first 80% of windows become the
# training set and the last 20% the test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)


In [None]:
# Convert data to tensors
# torch.as_tensor accepts NumPy arrays and tensors alike, so re-running this
# cell is harmless (torch.from_numpy raises TypeError once the names already
# hold tensors).

X_train = torch.as_tensor(X_train, dtype=torch.float32)
y_train = torch.as_tensor(y_train, dtype=torch.float32)
X_test = torch.as_tensor(X_test, dtype=torch.float32)
y_test = torch.as_tensor(y_test, dtype=torch.float32)

In [None]:
# Create dataloaders
# Pair each feature tensor with its labels, then batch: training batches are
# reshuffled on every pass, test batches keep their order.

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Create a model
class Mamba_cls(nn.Module):
    """Two-output classifier: a Mamba layer followed by a small linear head.

    The input goes through ``Mamba``, is flattened, passed through two stacked
    linear layers (``length * d_model -> 32 -> 2``) and finally a sigmoid.
    """

    def __init__(self, d_model, d_state, d_conv, expand, length):
        """Build the layers.

        Args:
            d_model, d_state, d_conv, expand: Forwarded positionally to ``Mamba``.
            length: Together with ``d_model`` fixes the input width of the
                first linear layer (``length * d_model``).
        """
        super().__init__()
        hidden_features = 32
        n_outputs = 2

        # Layers are created in the same order as they are applied in forward().
        self.mamba = Mamba(d_model, d_state, d_conv, expand)
        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(length * d_model, hidden_features)
        self.linear2 = nn.Linear(hidden_features, n_outputs)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply mamba -> flatten -> linear1 -> linear2 -> sigmoid and return the result."""
        stages = (self.mamba, self.flatten, self.linear1, self.linear2, self.sigmoid)
        for stage in stages:
            x = stage(x)
        return x


In [None]:
# Build the classifier and move it to the selected device
batch = BATCH_SIZE
length = WINDOW_SIZE
dim = X_train.shape[2]  # size of the last axis of the training tensor

model = Mamba_cls(d_model=dim, d_state=dim, d_conv=4, expand=2, length=length)
model = model.to(DEVICE)

In [None]:
# Loss function: binary cross-entropy, averaged over all elements
# (reduction='mean' is the default, spelled out here for clarity).
criterion = nn.BCELoss(reduction='mean')

In [None]:
# Optimizer: Adam over all model parameters with a learning rate of 1e-3
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# TEST output shape
# Pull a single batch and run it through the model. Dedicated names are used
# so the module-level `y` (the label array from the split cell) is not
# rebound to a batch tensor, and no_grad avoids building a graph for a probe.
sample_inputs, _ = next(iter(train_loader))
with torch.no_grad():
    output = model(sample_inputs.to(DEVICE))
print(output.shape)

In [None]:
# Training loop
history = []  # sample-weighted mean training loss of each epoch
for epoch in range(EPOCHS):
    # Set the model to training mode
    model.train()

    running_loss = 0.0
    samples_seen = 0

    # The batch variable is named `inputs` so it does not overwrite the
    # DataFrame `data` that earlier cells depend on when re-run.
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        # Move the batch to the training device
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)

        # Forward pass
        outputs = model(inputs) # [batch_size, 2]
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a short final batch does not skew the mean
        n_in_batch = inputs.size(0)
        running_loss += loss.item() * n_in_batch
        samples_seen += n_in_batch

    # Track the loss over the whole epoch rather than only the last batch
    epoch_loss = running_loss / max(samples_seen, 1)
    history.append(epoch_loss)
    print(f'Epoch [{epoch+1}/{EPOCHS}], Loss: {epoch_loss:.4f}')


In [None]:
# Plot history
# One point per epoch, taken from `history` as filled by the training loop.
fig, ax = plt.subplots()
ax.plot(range(1, len(history) + 1), history)
ax.set(title='Training loss', xlabel='Epoch', ylabel='Loss')
plt.show()

In [None]:
# Predictions
# Run the test set through the model without gradient tracking, keeping the
# per-batch outputs (on the CPU) and the per-batch losses.
model.eval()
predictions = []
loss_list = []
batch_sizes = []
with torch.no_grad():
    # `inputs` instead of `data`, so the DataFrame `data` is not overwritten.
    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)
        outputs = model(inputs)
        # Store on the CPU: frees device memory and lets NumPy/sklearn consume it later.
        predictions.append(outputs.cpu())
        loss_list.append(criterion(outputs, targets).item())
        batch_sizes.append(inputs.size(0))

# Sample-weighted mean, so a smaller final batch is not over-represented.
loss = float(np.average(loss_list, weights=batch_sizes)) if loss_list else float('nan')
print(f'Loss: {loss:.4f}')

In [None]:
# Plot confusion matrix
# `predictions` is a list of per-batch output tensors, so the batches are
# concatenated along the sample axis before taking the per-sample argmax.
all_outputs = torch.cat([batch_out.detach().cpu() for batch_out in predictions])
y_pred = all_outputs.argmax(dim=1).numpy()
y_true = torch.as_tensor(y_test).argmax(dim=1).cpu().numpy()
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
confusion_matrix(y_true, y_pred, labels=[0, 1])

In [None]:
# Save the trained model
# Write to the path configured in the constants cell, creating its parent
# directory first so torch.save does not fail on a fresh checkout.
Path(MODEL_SAVE_PATH).parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), MODEL_SAVE_PATH)
