In [1]:
import torch 
import torch.nn as nn
import numpy as np
import pandas as pd

In [None]:
torch.manual_seed(1)

lstm = nn.LSTM(3,3)
inputs = [torch.randn(1, 3) for _ in range(5)]
# this particular inputs batch is one sequence of length 5, with 3 features
inputs

[tensor([[-0.5525,  0.6355, -0.3968]]),
 tensor([[-0.6571, -1.6428,  0.9803]]),
 tensor([[-0.0421, -0.8206,  0.3133]]),
 tensor([[-1.1352,  0.3773, -0.2824]]),
 tensor([[-2.5667, -1.4303,  0.5009]])]

In [9]:
len(inputs)

5

In [12]:
batch_size = len(inputs)
print([len(x) for x in inputs])
seq_len = max(len(x) for x in inputs)
num_features = len(inputs[0][0])
print(batch_size,seq_len,num_features)
inputs = torch.cat(inputs).view(batch_size, seq_len, num_features)
# one input to train on at a time of training

[1, 1, 1, 1, 1]
5 1 3


TypeError: cat(): argument 'tensors' (position 1) must be tuple of Tensors, not Tensor

In [18]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import argparse
from pathlib import Path

class SequenceDataset(Dataset):
    def __init__(self, input_path, seq_length=40):
        self.inputs,self.targets = torch.load(input_path),
        self.seq_length = seq_length
        
    def __len__(self):
        return len(self.inputs) - self.seq_length + 1
    
    def __getitem__(self, idx):
        return (self.inputs[idx:idx + self.seq_length], 
                self.targets[idx + self.seq_length - 1])

class LSTMModel(nn.Module):
    def __init__(self, num_features, hidden_size, num_layers=1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(
            input_size=num_features,
            hidden_size=hidden_size,
            num_layers=num_layers,
            
            batch_first=True
        )
        self.linear = nn.Linear(hidden_size, 1)
    
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        predictions = self.linear(lstm_out[:, -1, :])
        return predictions

def validate_model(model, val_loader, criterion, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device).float()
            targets = targets.to(device).float()
            
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_loss += loss.item()
    
    return total_loss / len(val_loader)

def train_model(num_hid, optimizer_type, learning_rate, epochs, data_dir):
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Data loading
    data_dir = Path(data_dir)
    train_dataset = SequenceDataset(
        data_dir / "train_input.pt",
        data_dir / "train_output.pt"
    )
    val_dataset = SequenceDataset(
        data_dir / "val_input.pt",
        data_dir / "val_output.pt"
    )
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    # Model initialization
    model = LSTMModel(num_features=4, hidden_size=num_hid).to(device)
    
    # Loss and optimizer
    criterion = nn.MSELoss()
    if optimizer_type.lower() == 'adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    elif optimizer_type.lower() == 'sgd':
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
    else:
        raise ValueError(f"Unsupported optimizer type: {optimizer_type}")
    
    # TensorBoard setup
    writer = SummaryWriter('runs/lstm_training')
    
    # Training loop
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(device).float()
            targets = targets.to(device).float()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        # Calculate average training loss
        avg_train_loss = total_loss / len(train_loader)
        
        # Validation every 100 epochs
        if (epoch + 1) % 100 == 0:
            val_loss = validate_model(model, val_loader, criterion, device)
            print(f'Epoch [{epoch+1}/{epochs}], '
                  f'Training Loss: {avg_train_loss:.4f}, '
                  f'Validation Loss: {val_loss:.4f}')
            
            # Log to TensorBoard
            writer.add_scalar('Training Loss', avg_train_loss, epoch)
            writer.add_scalar('Validation Loss', val_loss, epoch)
    
    # Save the model
    torch.save(model.state_dict(), 'lstm_model.pth')
    writer.close()

def main_test():
    parser = argparse.ArgumentParser(description='Train LSTM model')
    parser.add_argument('--num_hid', type=int, default=64,
                      help='number of hidden units')
    parser.add_argument('--optimizer', type=str, default='adam',
                      help='optimizer type (adam or sgd)')
    parser.add_argument('--lr', type=float, default=0.001,
                      help='learning rate')
    parser.add_argument('--epochs', type=int, default=1000,
                      help='number of epochs')
    parser.add_argument('--data_dir', type=str, default='./data',
                      help='directory containing the data files')
    
    args = parser.parse_args()
    
    train_model(
        num_hid=args.num_hid,
        optimizer_type=args.optimizer,
        learning_rate=args.lr,
        epochs=args.epochs,
        data_dir=args.data_dir
    )



In [19]:
train_model(
    num_hid=10,
    optimizer_type="adam",
    learning_rate=0.01,
    epochs=100,
    data_dir='../data'
)



  self.inputs,self.targets = torch.load(input_path),


ValueError: not enough values to unpack (expected 2, got 1)

In [2]:
import torch
import torch.nn as nn

In [7]:
class LSTMModel(nn.Module):
    def __init__(self, num_features, hidden_size, num_layers=1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(
            input_size=num_features,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.linear = nn.Linear(hidden_size, 2)
    
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        predictions = self.linear(lstm_out[:, -1, :])
        return predictions
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMModel(num_features=32, hidden_size=64)
model.load_state_dict(torch.load('../1257_1802.pth'))
model.eval()

# Create dummy input tensor
dummy_input = torch.randn([64, 40, 32])

# Export to ONNX
onnx_path = "../lstm_1257_1802.onnx"
torch.onnx.export(
    model,               # model being run
    dummy_input,        # model input (or a tuple for multiple inputs)
    onnx_path,          # where to save the model
    export_params=True, # store the trained parameter weights inside the model file
    opset_version=11,   # the ONNX version to export the model to
    do_constant_folding=True,  # whether to execute constant folding for optimization
    input_names=['input'],     # the model's input names
    output_names=['output'],   # the model's output names
    dynamic_axes={
        'input': {0: 'batch_size'},  # variable length axes
        'output': {0: 'batch_size'}
    }
)

  model.load_state_dict(torch.load('../1257_1802.pth'))


In [None]:
]