In [1]:
import csv
import itertools
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from src.data.exercise_data import ExerciseDataset

# Read the pose-landmark sequences and their exercise labels from disk.
# NOTE(review): allow_pickle=True will unpickle arbitrary objects; only load trusted files.
# NOTE(review): these are absolute, machine-specific paths; consider a configurable data dir.
X = np.load(r'C:\Users\barrt\PycharmProjects\Gymalyze\src\data\landmarks_data.npy', allow_pickle=True)
y = np.load(r'C:\Users\barrt\PycharmProjects\Gymalyze\src\data\labels_data.npy', allow_pickle=True)

# A label array stored as a single column (n, 1) is flattened to (n,).
# shape[1:] == (1,) holds exactly when the array is 2-D with one column.
if y.shape[1:] == (1,):
    y = y[:, 0]

# Keep only the samples whose label belongs to the selected classes.
labels_to_keep = [0, 1, 4, 7, 9]
mask = np.isin(y, labels_to_keep)
X_filtered, y_filtered = X[mask], y[mask]

# Renumber the surviving labels to consecutive ids 0..len(labels_to_keep)-1,
# following their order in labels_to_keep.
label_mapping = dict(zip(labels_to_keep, range(len(labels_to_keep))))
y_mapped = np.array(list(map(label_mapping.__getitem__, y_filtered)), dtype=np.int64)

# Stratified 80/20 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X_filtered,
    y_mapped,
    test_size=0.2,
    random_state=42,
    stratify=y_mapped,
)

# Wrap each split in the project dataset class; DataLoaders are built later per batch size.
batch_size = 32
train_dataset = ExerciseDataset(X_train, y_train)
test_dataset = ExerciseDataset(X_test, y_test)

# Define LSTM Pose Autoencoder
class LSTMPoseAutoencoder(nn.Module):
    """LSTM autoencoder that reconstructs a sequence from a single latent vector.

    The encoder LSTM reads the input sequence; the final hidden state of its
    last layer is linearly compressed to ``latent_dim``. The decoder expands
    that latent vector back to ``hidden_dim``, uses it both as the initial
    hidden state of every decoder layer and as the (constant) input at every
    time step, and maps the decoder outputs back to ``input_size`` features.

    Args:
        input_size: Number of features per time step.
        hidden_dim: Hidden size of both the encoder and decoder LSTMs.
        latent_dim: Size of the bottleneck vector.
        num_layers: Number of stacked layers in each LSTM.
    """

    def __init__(self, input_size, hidden_dim, latent_dim, num_layers):
        super().__init__()

        # Encoder: sequence -> final hidden state -> latent vector.
        self.encoder_lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.latent_layer = nn.Linear(hidden_dim, latent_dim)

        # Decoder: latent vector -> hidden-sized vector -> sequence.
        self.latent_to_hidden = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.output_layer = nn.Linear(hidden_dim, input_size)

    def forward(self, x):
        """Encode ``x`` to a latent vector and decode it back to a sequence.

        Args:
            x: Tensor of shape ``(batch, seq_length, input_size)``.

        Returns:
            Reconstruction tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``x`` is not 3-dimensional (from the size unpacking).
        """
        # The three-way unpack doubles as a check that the input is batched 3-D.
        _, seq_length, _ = x.size()

        # Encode: hidden[-1] is the final hidden state of the top encoder layer.
        _, (hidden, _) = self.encoder_lstm(x)
        latent_vector = self.latent_layer(hidden[-1])

        # Project the latent vector once and reuse it for both decoder roles
        # (the projection was previously computed twice with identical results).
        expanded = self.latent_to_hidden(latent_vector)

        # Every decoder layer starts from the same hidden state and a zero cell state.
        hidden_init = expanded.unsqueeze(0).repeat(self.decoder_lstm.num_layers, 1, 1)
        cell_init = torch.zeros_like(hidden_init)

        # The decoder receives the same projected latent at every time step.
        latent_repeated = expanded.unsqueeze(1).repeat(1, seq_length, 1)

        decoded_seq, _ = self.decoder_lstm(latent_repeated, (hidden_init, cell_init))
        reconstructed_seq = self.output_layer(decoded_seq)

        return reconstructed_seq




# Results log for the hyperparameter sweep: one CSV row per finished configuration.
csv_file = "lstm_autoencoder_tuning_results.csv"
header = ["hidden_size", "latent_size", "learning_rate", "batch_size", "test_loss", "epoch_losses"]

# Exclusive-create mode ('x') fails if the file already exists, so the header
# row is written only the first time and earlier results are never overwritten.
try:
    file = open(csv_file, mode='x', newline='', encoding='utf-8')
except FileExistsError:
    pass
else:
    with file:
        writer = csv.DictWriter(file, fieldnames=header)
        writer.writeheader()

def append_result_to_csv(result):
    """Append one result row to the results CSV.

    Args:
        result: Mapping whose keys are the column names in ``header``.
    """
    with open(csv_file, 'a', newline='', encoding='utf-8') as out:
        csv.DictWriter(out, fieldnames=header).writerow(result)

def config_exists_in_csv(hidden_size, latent_size, lr, batch_size):
    """Return True if the results CSV already holds a row for this configuration.

    Rows that cannot be parsed (for example a row truncated by an interrupted
    run, where ``csv.DictReader`` fills the missing fields with ``None``, or a
    row with non-numeric text) are skipped instead of crashing the check, so
    the affected configuration is simply treated as not yet completed.

    Args:
        hidden_size: Hidden size to look for (compared as int).
        latent_size: Latent size to look for (compared as int).
        lr: Learning rate to look for (compared as float, exact equality).
        batch_size: Batch size to look for (compared as int).

    Returns:
        True if a matching, well-formed row exists; otherwise False.

    Raises:
        FileNotFoundError: If the results CSV does not exist.
        KeyError: If the CSV header lacks one of the expected columns.
    """
    wanted = (hidden_size, latent_size, lr, batch_size)
    with open(csv_file, mode='r', newline='', encoding='utf-8') as file:
        for row in csv.DictReader(file):
            try:
                recorded = (
                    int(row["hidden_size"]),
                    int(row["latent_size"]),
                    float(row["learning_rate"]),
                    int(row["batch_size"]),
                )
            except (TypeError, ValueError):
                # TypeError: field missing (None) in a short row.
                # ValueError: field present but not numeric.
                continue
            if recorded == wanted:
                return True
    return False

In [2]:


# Search space for the sweep.
hidden_sizes = [128, 256, 512, 1024]
latent_sizes = [32, 64, 128]
learning_rates = [0.001, 0.0005, 0.0001]
batch_sizes = [32, 64]

# Full Cartesian grid as (hidden_size, latent_size, learning_rate, batch_size)
# tuples; the right-most parameter varies fastest.
param_combinations = [
    (h, z, rate, bs)
    for h in hidden_sizes
    for z in latent_sizes
    for rate in learning_rates
    for bs in batch_sizes
]

# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

def _run_reconstruction_pass(autoencoder, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return the mean of the per-batch losses.

    When ``optimizer`` is given, each batch also performs a gradient step;
    otherwise the pass only measures the reconstruction loss.
    """
    total = 0.0
    for sequences, _ in loader:
        sequences = sequences.to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        reconstruction = autoencoder(sequences)
        loss = criterion(reconstruction, sequences)
        if optimizer is not None:
            loss.backward()
            optimizer.step()
        total += loss.item()
    return total / len(loader)


def train_autoencoder_with_losses(hidden_size, latent_size, lr, batch_size,
                                  epochs=100, input_size=132, num_layers=2):
    """Train an LSTMPoseAutoencoder on ``train_dataset`` and score it on ``test_dataset``.

    Args:
        hidden_size: Hidden size passed to the autoencoder as ``hidden_dim``.
        latent_size: Bottleneck size passed to the autoencoder as ``latent_dim``.
        lr: Learning rate for the Adam optimizer.
        batch_size: Batch size for both the training and the test DataLoader.
        epochs: Number of training epochs (default 100, the previously fixed value).
        input_size: Features per time step (default 132, the previously fixed value).
        num_layers: Stacked LSTM layers (default 2, the previously fixed value).

    Returns:
        ``(test_loss, epoch_losses)``: the test MSE and a list with one training
        MSE per epoch. Both are means of per-batch mean losses, so a smaller
        final batch carries the same weight as a full one.

    Raises:
        ZeroDivisionError: If either DataLoader yields no batches.
    """
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    autoencoder = LSTMPoseAutoencoder(
        input_size=input_size,
        hidden_dim=hidden_size,
        latent_dim=latent_size,
        num_layers=num_layers,
    ).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(autoencoder.parameters(), lr=lr)

    # Training: one optimisation pass over the training set per epoch.
    autoencoder.train()
    epoch_losses = [
        _run_reconstruction_pass(autoencoder, train_loader, criterion, optimizer)
        for _ in range(epochs)
    ]

    # Evaluation: no gradient tracking and no parameter updates.
    autoencoder.eval()
    with torch.no_grad():
        test_loss = _run_reconstruction_pass(autoencoder, test_loader, criterion)

    return test_loss, epoch_losses

# Sweep the grid. Configurations already recorded in the CSV are skipped, and
# each new result is appended as soon as it is available, so an interrupted
# run can be resumed by re-running this cell.
for hidden_size, latent_size, lr, batch_size in param_combinations:
    if not config_exists_in_csv(hidden_size, latent_size, lr, batch_size):
        print(f"Training with Hidden Size: {hidden_size}, Latent Size: {latent_size}, Learning Rate: {lr}, Batch Size: {batch_size}")
        test_loss, epoch_losses = train_autoencoder_with_losses(hidden_size, latent_size, lr, batch_size)
        print(f"Test Loss: {test_loss}")

        # One CSV row: the configuration followed by its metrics.
        result = dict(
            hidden_size=hidden_size,
            latent_size=latent_size,
            learning_rate=lr,
            batch_size=batch_size,
            test_loss=test_loss,
            epoch_losses=epoch_losses,
        )
        append_result_to_csv(result)
    else:
        print(f"Skipping already completed config: Hidden Size={hidden_size}, Latent Size={latent_size}, LR={lr}, Batch Size={batch_size}")

print(f"Results saved incrementally to {csv_file}")


Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.001, Batch Size=32
Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.001, Batch Size=64
Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.0005, Batch Size=32
Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.0005, Batch Size=64
Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.0001, Batch Size=32
Skipping already completed config: Hidden Size=128, Latent Size=32, LR=0.0001, Batch Size=64
Skipping already completed config: Hidden Size=128, Latent Size=64, LR=0.001, Batch Size=32
Skipping already completed config: Hidden Size=128, Latent Size=64, LR=0.001, Batch Size=64
Skipping already completed config: Hidden Size=128, Latent Size=64, LR=0.0005, Batch Size=32
Skipping already completed config: Hidden Size=128, Latent Size=64, LR=0.0005, Batch Size=64
Skipping already completed config: Hidden Size=128, Latent Size=64, LR=0.0

KeyboardInterrupt: 