In [1]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import torch.optim as optim

In [2]:
res = np.load("data\mariel_beyond.npy")
motion_data=np.array(res)
# Scale the data using MinMaxScaler
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(motion_data.reshape(-1, 3)).reshape(motion_data.shape)

In [3]:
scaled_data.shape

(55, 6803, 3)

In [4]:
scaled_data=scaled_data.transpose(1, 0, 2).reshape(6803, -1)

In [5]:
scaled_data.shape

(6803, 165)

In [6]:
time_steps = 125
num_features = scaled_data.shape[1]

# Reshape data into smaller sequences
num_sequences = scaled_data.shape[0] // time_steps
scaled_data = scaled_data[:num_sequences*time_steps]  # truncate data to a size divisible by time_steps
data = scaled_data.reshape((num_sequences, time_steps, num_features))

# Split data into train and test sets
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [8]:
data.shape

(54, 125, 165)

In [7]:
ninth_seq_data=data[11]
ninth_seq=ninth_seq_data.reshape(125, 55, 3).transpose(1, 0, 2)

In [8]:
ninth_seq.shape

(55, 125, 3)

In [9]:
np.save("ninth", ninth_seq)

In [35]:
class LSTMVAE(nn.Module):
    def __init__(self, input_size, hidden_size, latent_size):
        super(LSTMVAE, self).__init__()
        self.encoder_lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.encoder_mean = nn.Linear(hidden_size, latent_size)
        self.encoder_logvar = nn.Linear(hidden_size, latent_size)
        self.decoder_lstm = nn.LSTM(latent_size, hidden_size, batch_first=True)
        self.decoder_output = nn.Linear(hidden_size, input_size)

    def encode(self, x):
        _, (h_n, _) = self.encoder_lstm(x)
        mean = self.encoder_mean(h_n.squeeze(0))
        logvar = self.encoder_logvar(h_n.squeeze(0))
        return mean, logvar

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mean + eps * std

    def decode(self, z):
        _, (h_n, _) = self.decoder_lstm(z.unsqueeze(1))
        output = self.decoder_output(h_n.squeeze(0))
        return output

    def forward(self, x):
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        reconstructed = self.decode(z)
        return reconstructed, mean, logvar

In [41]:
def loss_function(recon_x, x, mu, logvar):
    # Calculate the mean squared error loss only for non-padded elements
    mask = (x != 0).float()  # Assuming 0 is used for padding
    recon_x_trimmed = recon_x[:, :x.size(1), :]  # Trim recon_x to match the length of x
    recon_loss = nn.functional.mse_loss(recon_x_trimmed, x, reduction='none')  # Calculate loss element-wise
    recon_loss = torch.sum(recon_loss * mask) / torch.sum(mask)  # Take the average over non-padded elements
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_divergence

In [37]:
def train(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in train_loader:
            optimizer.zero_grad()
            input_seq = batch[0].float()
            recon_batch, mu, logvar = model(input_seq)
            loss = criterion(recon_batch, input_seq, mu, logvar)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss}")

In [43]:
train_data.shape[-1]

165

In [42]:
batch_size = 8
train_loader = DataLoader(TensorDataset(torch.tensor(train_data).float()), batch_size=batch_size, shuffle=True)

# Initialize model, optimizer, and train
input_size = train_data.shape[-1]
hidden_size = 64
latent_size = 16
model = LSTMVAE(input_size, hidden_size, latent_size)
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 10
train(model, train_loader, loss_function, optimizer, num_epochs)

IndexError: too many indices for tensor of dimension 2

In [None]:
test_loader = DataLoader(TensorDataset(torch.tensor(test_data).float()), batch_size=1, shuffle=False)
def generate_new_data(model, test_loader):
    model.eval()
    generated_data = []
    with torch.no_grad():
        for batch in test_loader:
            input_seq = batch.float()
            recon_batch, _, _ = model(input_seq)
            generated_data.append(recon_batch.squeeze().numpy())
    return np.array(generated_data)

generated_data = generate_new_data(model, test_loader)