In [37]:
from dataclasses import dataclass
import torch

@dataclass
class ModelArgs:
    """Hyperparameters and runtime settings shared by the notebook.

    Every attribute carries a type annotation so that ``@dataclass`` registers
    it as a real field: without annotations the decorator sees no fields and
    the generated ``__init__`` accepts no overrides. The defaults remain
    available as class attributes, so ``ModelArgs.no_of_neurons`` style access
    keeps working.
    """
    # Device used for tensors and the model; prefers CUDA when it is available.
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Hidden-state width of the LSTM gates.
    no_of_neurons: int = 128
    # Length of each input window (number of time steps).
    block_size: int = 32
    # Samples per mini-batch in the data loaders.
    batch_size: int = 32
    # Dropout probability applied before the output projection.
    dropout: float = 0.1
    # Number of training epochs.
    epoch: int = 10
    # Learning rate handed to the optimizer.
    max_lr: float = 1e-4
    # Features per time step; 1 since we're using scalar sequences like sin(t).
    embedding_dims: int = 1
    # Number of (window, target) pairs to generate.
    total_samples: int = 50000

In [38]:
import torch
from torch.utils.data import Dataset

class TimeSeriesDataset(Dataset):
    """Sliding-window view over a series for next-step prediction.

    Window ``i`` holds ``data[i:i + seq_len]`` and its target is the element
    that immediately follows it, ``data[i + seq_len]``.
    """

    def __init__(self, data, seq_len):
        n_windows = len(data) - seq_len
        windows = []
        for start in range(n_windows):
            windows.append(data[start:start + seq_len])
        self.X = torch.stack(windows)
        # Targets are the series shifted forward by one full window.
        self.y = data[seq_len:]

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        # Both items gain a trailing feature axis: for a 1-D series the window
        # becomes (seq_len, 1) and the target becomes (1,).
        window = self.X[idx].unsqueeze(-1)
        target = self.y[idx].unsqueeze(-1)
        return window, target


In [39]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ForgetGate(nn.Module):
    """Custom forget gate for the LSTM.

    Applies one linear layer to the concatenation ``[x, h_prev]`` and squashes
    the result with a sigmoid, so every output lies in (0, 1).

    Args:
        input_dim: Number of features in ``x``. Defaults to
            ``ModelArgs.embedding_dims`` when omitted.
        hidden_dim: Width of ``h_prev`` and of the gate output. Defaults to
            ``ModelArgs.no_of_neurons`` when omitted.
    """
    def __init__(self, input_dim=None, hidden_dim=None):
        super().__init__()
        # Fall back to the notebook-wide configuration only when the caller
        # did not supply explicit sizes.
        if input_dim is None:
            input_dim = ModelArgs.embedding_dims
        if hidden_dim is None:
            hidden_dim = ModelArgs.no_of_neurons
        self.linear = nn.Linear(hidden_dim + input_dim, hidden_dim)

    def forward(self, x, h_prev):
        """Return the forget activations for input ``x`` and previous hidden state ``h_prev``.

        Both arguments are concatenated along dim 1, so they must be 2-D with
        matching first dimension.
        """
        xh = torch.cat([x, h_prev], dim=1)
        return torch.sigmoid(self.linear(xh))
    
class InputGate(nn.Module):
    """Custom input gate for the LSTM.

    Produces two tensors from the concatenation ``[x, h_prev]``: the input
    gate activation (sigmoid) and the candidate cell state (tanh).

    Args:
        input_dim: Number of features in ``x``. Defaults to
            ``ModelArgs.embedding_dims`` when omitted.
        hidden_dim: Width of ``h_prev`` and of both outputs. Defaults to
            ``ModelArgs.no_of_neurons`` when omitted.
    """
    def __init__(self, input_dim=None, hidden_dim=None):
        super().__init__()
        # Fall back to the notebook-wide configuration only when the caller
        # did not supply explicit sizes.
        if input_dim is None:
            input_dim = ModelArgs.embedding_dims
        if hidden_dim is None:
            hidden_dim = ModelArgs.no_of_neurons
        self.i_linear = nn.Linear(hidden_dim + input_dim, hidden_dim)
        self.c_linear = nn.Linear(hidden_dim + input_dim, hidden_dim)

    def forward(self, x, h_prev):
        """Return ``(i_t, c_tilde)`` for input ``x`` and previous hidden state ``h_prev``."""
        xh = torch.cat([x, h_prev], dim=1)
        i_t = torch.sigmoid(self.i_linear(xh))   # input gate, values in (0, 1)
        c_tilde = torch.tanh(self.c_linear(xh))  # candidate cell state, values in (-1, 1)
        return i_t, c_tilde

class OutputGate(nn.Module):
    """Custom output gate for the LSTM.

    Applies one linear layer to the concatenation ``[x, h_prev]`` followed by
    a sigmoid, so every output lies in (0, 1).

    Args:
        input_dim: Number of features in ``x``. Defaults to
            ``ModelArgs.embedding_dims`` when omitted.
        hidden_dim: Width of ``h_prev`` and of the gate output. Defaults to
            ``ModelArgs.no_of_neurons`` when omitted.
    """
    def __init__(self, input_dim=None, hidden_dim=None):
        super().__init__()
        # Fall back to the notebook-wide configuration only when the caller
        # did not supply explicit sizes.
        if input_dim is None:
            input_dim = ModelArgs.embedding_dims
        if hidden_dim is None:
            hidden_dim = ModelArgs.no_of_neurons
        self.o_linear = nn.Linear(hidden_dim + input_dim, hidden_dim)

    def forward(self, x, h_prev):
        """Return the output-gate activations for input ``x`` and previous hidden state ``h_prev``."""
        xh = torch.cat([x, h_prev], dim=1)
        o_t = torch.sigmoid(self.o_linear(xh))  # output gate
        return o_t

In [40]:
class LSTMBlock(nn.Module):
    """Single-layer LSTM unrolled over time using the custom gate modules."""

    def __init__(self):
        super().__init__()
        self.input_gate = InputGate()
        self.forget_gate = ForgetGate()
        self.output_gate = OutputGate()

    def forward(self, x):
        """Run the recurrence over a 3-D input and return every hidden state.

        ``x`` is unpacked as (batch, time, features); the result stacks the
        per-step hidden states along dim 1, giving
        (batch, time, ModelArgs.no_of_neurons).
        """
        batch, steps, _ = x.shape
        width = ModelArgs.no_of_neurons
        # Hidden and cell state both start at zero on the input's device.
        hidden = torch.zeros(batch, width, device=x.device)
        cell = torch.zeros(batch, width, device=x.device)

        history = []
        for step in range(steps):
            frame = x[:, step, :]
            keep = self.forget_gate(frame, hidden)
            write, candidate = self.input_gate(frame, hidden)
            expose = self.output_gate(frame, hidden)

            # Blend the retained old cell state with the gated candidate.
            cell = keep * cell + write * candidate
            # The new hidden state is the gated, squashed cell state.
            hidden = expose * torch.tanh(cell)
            history.append(hidden)
        # Stacking on dim 1 inserts the time axis between batch and features.
        return torch.stack(history, dim=1)
    


In [41]:
class LSTM(nn.Module):
    """Custom LSTM model: LSTM block, dropout, and a linear head predicting one value."""

    def __init__(self):
        """Initialize the LSTM model with a custom LSTM block and dropout."""
        super().__init__()
        self.lstm = LSTMBlock()
        self.dropout = nn.Dropout(ModelArgs.dropout)
        self.output = nn.Linear(ModelArgs.no_of_neurons, 1)  # predict next value in sequence

    def forward(self, x):
        """Forward pass through the LSTM model.

        Args:
            x: Input batch; it is passed to the LSTM block, which unpacks it
                as (B, T, features).

        Returns:
            Tensor of shape (B, 1): the prediction made from the last time
            step's hidden state.
        """
        # Follow the device the weights actually live on rather than the
        # global config, so the module keeps working after `.to(...)` / `.cpu()`.
        x = x.to(self.output.weight.device)
        out = self.lstm(x)                 # shape: (B, T, no_of_neurons)
        out = self.dropout(out[:, -1, :])  # keep the last time step: (B, no_of_neurons)
        out = self.output(out)             # shape: (B, 1)
        return out

In [42]:
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

# Synthetic Dataset (can be replaced)
class TimeSeriesDataset(Dataset):
    """Pairs pre-built inputs with their targets, index by index."""

    def __init__(self, X, y):
        # Both containers are stored as given; nothing is copied or validated.
        self.X, self.y = X, y

    def __len__(self):
        # The sample count is taken from the inputs alone.
        return len(self.X)

    def __getitem__(self, idx):
        features = self.X[idx]
        target = self.y[idx]
        return features, target

# Generate simple data (sinusoidal or linear)
def generate_data(total_samples=None, block_size=None, device=None):
    """Build sliding windows and next-step targets from a sine wave.

    The series is ``sin(t)`` sampled at ``total_samples + block_size`` evenly
    spaced points on [0, 100]. Window ``i`` is ``data[i:i + block_size]`` and
    its target is ``data[i + block_size]``.

    Args:
        total_samples: Number of (window, target) pairs. Defaults to
            ``ModelArgs.total_samples``.
        block_size: Window length. Defaults to ``ModelArgs.block_size``.
        device: Device the tensors are created on. Defaults to
            ``ModelArgs.device``.

    Returns:
        ``(X, y)`` with shapes ``(total_samples, block_size, 1)`` and
        ``(total_samples, 1)``.
    """
    # Consult the notebook-wide configuration only for omitted arguments.
    if total_samples is None:
        total_samples = ModelArgs.total_samples
    if block_size is None:
        block_size = ModelArgs.block_size
    if device is None:
        device = ModelArgs.device

    t = torch.linspace(0, 100, total_samples + block_size, device=device)
    data = torch.sin(t)  # or t for linear
    # unfold yields every stride-1 window without a Python-level loop; it
    # produces one window more than there are targets, hence the slice.
    # contiguous() materialises the overlapping view into its own storage.
    X = data.unfold(0, block_size, 1)[:total_samples].contiguous()
    y = data[block_size:]
    return X.unsqueeze(-1), y.unsqueeze(-1)

# Setup: build all windows once, then split them in order
# (first 80% for training, the remainder for validation).
# NOTE(review): windows are stride-1 and overlap, so the windows on either
# side of the split boundary share samples.
X, y = generate_data()
train_size = int(0.8 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:], y[train_size:]

# drop_last=True discards a trailing partial batch, so every batch has exactly batch_size samples.
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=ModelArgs.batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(TimeSeriesDataset(X_val, y_val), batch_size=ModelArgs.batch_size, shuffle=False, drop_last=True)

# Model
model = LSTM().to(ModelArgs.device)
optimizer = torch.optim.AdamW(model.parameters(), lr=ModelArgs.max_lr)
criterion = nn.MSELoss()

# Track the run in a local file-based MLflow store.
mlflow.set_tracking_uri("file:./mlruns")
mlflow.set_experiment("Custom_LSTM_Experiment")
with mlflow.start_run(run_name="custom_lstm_run"):
    # Log hyperparameters
    mlflow.log_params({
        "no_of_neurons": ModelArgs.no_of_neurons,
        "block_size": ModelArgs.block_size,
        "batch_size": ModelArgs.batch_size,
        "dropout": ModelArgs.dropout,
        "epochs": ModelArgs.epoch,
        "learning_rate": ModelArgs.max_lr,
    })

    for epoch in range(ModelArgs.epoch):
        # Training pass: dropout active, weights updated after every batch.
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            y_pred = model(xb)
            # Put the targets on the prediction's device so the loss does not
            # depend on where the dataset tensors were allocated.
            loss = criterion(y_pred, yb.to(y_pred.device))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validation pass: dropout disabled, no gradients recorded.
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for xb, yb in val_loader:
                y_pred = model(xb)
                loss = criterion(y_pred, yb.to(y_pred.device))
                val_loss += loss.item()

        # Convert the summed batch losses into per-batch means.
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)

        # Log metrics to MLflow
        mlflow.log_metrics({
            "Train Loss": train_loss,
            "Val Loss": val_loss
        }, step=epoch)

        print(f"Epoch {epoch:03d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    # Save the model
    mlflow.pytorch.log_model(model, "model")


Epoch 000 | Train Loss: 0.0367 | Val Loss: 0.0002
Epoch 001 | Train Loss: 0.0010 | Val Loss: 0.0003
Epoch 002 | Train Loss: 0.0009 | Val Loss: 0.0002
Epoch 003 | Train Loss: 0.0008 | Val Loss: 0.0002
Epoch 004 | Train Loss: 0.0007 | Val Loss: 0.0001
Epoch 005 | Train Loss: 0.0007 | Val Loss: 0.0001
Epoch 006 | Train Loss: 0.0006 | Val Loss: 0.0001
Epoch 007 | Train Loss: 0.0006 | Val Loss: 0.0001
Epoch 008 | Train Loss: 0.0006 | Val Loss: 0.0001




Epoch 009 | Train Loss: 0.0006 | Val Loss: 0.0000


