In [None]:
################### INITIALIZATION #####################
%reset -f
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset

# Set device to GPU if available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

n=93000 #Number of datapoints
sf=2600 #scaling factor
torch.cuda.is_available()
print(torch.version.cuda)

################### DATA IMPORT #####################

df = pd.read_csv('PLL_speed.csv').iloc[:n]
print(df.head)

X_train_speed = df['predictors']  
X_train_for_plot = df['predictors']

plt.plot(X_train_speed)
df = pd.read_csv('ia.csv').iloc[:n]
print(df.head)

X_train_a = df['predictors']
X_train_for_plot = df['predictors']

plt.plot(X_train_a)
df = pd.read_csv('ib.csv').iloc[:n]
print(df.head)

X_train_b = df['predictors']  
X_train_for_plot = df['predictors']

plt.plot(X_train_b)
df = pd.read_csv('ic.csv').iloc[:n]
print(df.head)

X_train_c = df['predictors']  
X_train_for_plot = df['predictors']

plt.plot(X_train_c)

df = pd.read_csv('Vafa.csv').iloc[:n]
print(df.head)
X_train_Vafa = df['predictors']  
X_train_for_plot = df['predictors']

plt.plot(X_train_Vafa)
df = pd.read_csv('Vbet.csv').iloc[:n]
print(df.head)
X_train_Vbet = df['predictors']  
X_train_for_plot = df['predictors']

plt.plot(X_train_Vbet)
df = pd.read_csv('N_measure.csv').iloc[:n]
print(df.head)
y_train = df['ground_truth']/sf
plt.plot(y_train)

################### TENSOR CREATION #####################

# Convert input arrays to float tensors
X_train_speed = torch.tensor(X_train_speed, dtype=torch.float32)
X_train_a = torch.tensor(X_train_a, dtype=torch.float32)
X_train_b = torch.tensor(X_train_b, dtype=torch.float32)
X_train_c = torch.tensor(X_train_c, dtype=torch.float32)
X_train_Vafa = torch.tensor(X_train_Vafa, dtype=torch.float32)
X_train_Vbet = torch.tensor(X_train_Vbet, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_train_for_plot = torch.tensor(X_train_for_plot, dtype=torch.float32)

print(X_train_a.shape)

# Stack all 5 features along a new last dimension to create (time_steps, 5)
X_combined = torch.stack([X_train_speed/sf, X_train_a/sf, X_train_b/sf, X_train_c/sf, X_train_Vafa/sf, X_train_Vbet/sf], dim=1)
# Now shape is: (total_time_steps, 5)

window_size = 200
# Create windows: (num_windows, window_size, 5)
X_windows = torch.stack([X_combined[i:i+window_size] for i in range(len(X_combined) - window_size)])

# Target values aligned with the end of each window
y_windows = y_train[window_size:]
X_train_for_plot = X_train_for_plot[window_size:]

# Print shapes to confirm
print(f"Shape of X_windows: {X_windows.shape}")  
print(f"Shape of y_windows: {y_windows.shape}")


Testlengt=5000 #Data left out, to use for validation

x_train = X_windows[:len(X_windows)-Testlengt]
y_train = y_windows[:len(y_windows)-Testlengt]

x_test = X_windows[-Testlengt:]

y_train = y_train.unsqueeze(1)
y_test = y_windows[-Testlengt:].unsqueeze(1)

print(f"Shape of y_train: {y_train.shape}")
print(f"Shape of y_test: {y_test.shape}")
print(f"Shape of x_train: {x_train.shape}")
print(f"Shape of x_test: {x_test.shape}")

################## NETWORK DEFINITION AND TRAINING #############################

HIDDEN_SIZE = 48  # Number of units in LSTM hidden layers
NUM_LAYERS = 1  # Number of LSTM layers
BATCH_SIZE = 49  # Number of samples per batch during training
EPOCHS = 93  # Number of training iterations over the dataset
LR = 0.001  # Learning rate for optimizer

# Create DataLoader for training (handles batching and shuffling)
train_loader = DataLoader(
    TensorDataset(x_train, y_train),
    batch_size=BATCH_SIZE,
    shuffle=True,
    pin_memory=True, # Faster transfer to CUDA
)



# Define the LSTM model class
class LSTMModel(nn.Module):
    def __init__(self, input_size=6, hidden_size=HIDDEN_SIZE, num_layers=NUM_LAYERS):
        super(LSTMModel, self).__init__()  # Initialize parent class
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)  # Define LSTM layer
        self.fc = nn.Linear(hidden_size, 1)  # Fully connected layer to produce final output

    def forward(self, x):
        out, _ = self.lstm(x)  # Forward pass through LSTM
        out = self.fc(out[:, -1, :])  # Take the last time step's output and pass it through FC layer
        return out  # Return predicted value

# Initialize the model and move it to the selected device
model = LSTMModel().to(device)
print(f"Model is on device: {next(model.parameters()).device}")
# Define the loss function (Mean Squared Error) and optimizer (Adam)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

# Training function
def train(model, train_loader, epochs):
    model.train()  # Set model to training mode
    for epoch in range(epochs):  # Loop through epochs
        total_loss = 0  # Initialize total loss for epoch
        for x_batch, y_batch in train_loader:  # Loop through batches
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)  # Move data to device

            optimizer.zero_grad()  # Clear previous gradients
            y_pred = model(x_batch)  # Forward pass to get predictions
            loss = criterion(y_pred, y_batch)  # Compute loss
            loss.backward()  # Backpropagation to compute gradients
            optimizer.step()  # Update model weights using optimizer
            total_loss += loss.item()  # Accumulate loss

        # Print average loss per epoch
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_loader):.9f}")

# Train the model
train(model, train_loader, EPOCHS)



########### VISUALLY EVALUATE THE MODEL ######################

model.eval()  # Set model to evaluation mode
with torch.no_grad():  # Disable gradient calculations
    x_test = x_test.to(device)#x_test.to(device)
    y_test = y_test.to(device)#y_test.to(device)  # Move test data to device
    y_pred = model(x_test).cpu().numpy()  # Get predictions and move them to CPU



plt.figure(figsize=(12, 6), dpi=500)  
plt.plot(y_test.cpu().numpy().squeeze(1), label="Actual", color="blue", linewidth=2)  
plt.plot(y_pred, label="Predicted", color="red", linestyle="dashed", linewidth=2)  
plt.legend(fontsize=12) 
plt.grid(True, linestyle="--", alpha=0.6)  
plt.title("LSTM Sine Wave Prediction", fontsize=14)  
plt.xlabel("Time Step", fontsize=12) 
plt.ylabel("Amplitude", fontsize=12) 
plt.show()  

################# SAVE PARAMETERS TO .mat FILE ######################

import scipy.io

lstm = model.lstm  

# Extract weights and biases

weight_ih = lstm.weight_ih_l0.cpu().detach().numpy()  # shape (4*hidden_size, input_size)
weight_hh = lstm.weight_hh_l0.cpu().detach().numpy()  # shape (4*hidden_size, hidden_size)
bias_ih = lstm.bias_ih_l0.cpu().detach().numpy()      # shape (4*hidden_size)
bias_hh = lstm.bias_hh_l0.cpu().detach().numpy()       # shape (4*hidden_size)

# Sum the input and hidden biases (PyTorch separates them)
bias = bias_ih + bias_hh

# Now split everything into the 4 gates
hidden_size = lstm.hidden_size

Wf = weight_hh[0:hidden_size, :]  # hidden -> forget gate
Wi = weight_hh[hidden_size:2*hidden_size, :]  # hidden -> input gate
Wc = weight_hh[2*hidden_size:3*hidden_size, :]  # hidden -> cell gate
Wo = weight_hh[3*hidden_size:4*hidden_size, :]  # hidden -> output gate

Uf = weight_ih[0:hidden_size, :]  # input -> forget gate
Ui = weight_ih[hidden_size:2*hidden_size, :]  # input -> input gate
Uc = weight_ih[2*hidden_size:3*hidden_size, :]  # input -> cell gate
Uo = weight_ih[3*hidden_size:4*hidden_size, :]  # input -> output gate

bf = bias[0:hidden_size]  # bias for forget gate
bi = bias[hidden_size:2*hidden_size]  # bias for input gate
bc = bias[2*hidden_size:3*hidden_size]  # bias for cell gate
bo = bias[3*hidden_size:4*hidden_size]  # bias for output gate

# (Optional) Linear layer afterward
Wout = model.fc.weight.cpu().detach().numpy()  # shape (output_size, hidden_size)
bout = model.fc.bias.cpu().detach().numpy()    # shape (output_size)

# Save everything into a .mat file
scipy.io.savemat('lstm_parameters4.mat', {
    'Wf': Wf,
    'Uf': Uf,
    'bf': bf,
    'Wi': Wi,
    'Ui': Ui,
    'bi': bi,
    'Wo': Wo,
    'Uo': Uo,
    'bo': bo,
    'Wc': Wc,
    'Uc': Uc,
    'bc': bc,
    'Wout': Wout,
    'bout': bout
})

print("Saved parameters to lstm_parameters4.mat ✅")


Using device: cuda


True