In [1]:
import torch 
from torch import nn 
import torch; torch.manual_seed(0)
import torch.nn as nn
import torch.nn.functional as F
import torch.utils
import torch.distributions
import torchvision
import numpy as np
import matplotlib.pyplot as plt; plt.rcParams['figure.dpi'] = 200
import pandas as pd
from os import listdir 
import tgm
import glob
from sklearn.model_selection import train_test_split
import torch.optim as optim
import torch.distributions as dist
from torch.utils.data import DataLoader, Dataset

In [2]:


# Define the path to the directory containing the mocap data files
data_dir = 'output'

# List all the mocap data files
data_files = glob.glob(data_dir + '/*.csv')

# Initialize empty lists to store the data
mocap_data = []
emotion_labels = []

tmps = listdir("output") # current directory

ems = []
for i in range(len(tmps)):
    tmp = tmps[i].split("_")
    if tmp[-3]== '4' or tmp[-3]== '5' or tmp[-3]== '80' or tmp[-3]== '9':
        tmp[-2]=0
    elif tmp[-3]== '10' or tmp[-3]== '13' or tmp[-3]== '18' or tmp[-3]== '58':
        tmp[-2]=1
    elif tmp[-3]== '30' or tmp[-3]== '31' or tmp[-3]== '34' or tmp[-3]== '36':
        tmp[-2]= 2
    else:
        tmp[-2] = 3
    ems.append(tmp[-2])

# Load the data from each file
for file in data_files:
    # Load the file and extract the coordinates
    file_data = np.genfromtxt(file, delimiter=' ')
    min_val = np.min(file_data)
    normalized_data = file_data - min_val
    range_val = np.max(file_data) - min_val
    normalized_data /= range_val
    file_data = normalized_data
    
    # Append the coordinates to the mocap_data list
    mocap_data.append(file_data)

# Pad the sequences with the last row
max_length = max([sequence.shape[0] for sequence in mocap_data])
padded_mocap_data = []
for sequence in mocap_data:
    num_frames_to_pad = max_length - sequence.shape[0]
    last_row = sequence[-1]  # Get the last row
    padded_sequence = np.concatenate((sequence, np.tile(last_row, (num_frames_to_pad, 1))), axis=0)
    padded_mocap_data.append(padded_sequence)

# Convert the emotion labels to a NumPy array
emotion_label = np.array(ems)


# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(padded_mocap_data, emotion_label, test_size=0.2, random_state=42)


In [56]:


X_train.shape   # [500,1538,99] [nb_files, nb_rows per each file, nb_col per each file] every element is a coordinate  x, y, z, of a marker 
y_train.shape    # [500]  500 numbers which represent anottations (labels)
X_test.shape      #[126,1538,99]   [nb_files, nb_rows per each file, nb_col per each file]
y_test.shape     # [126] 126 numbers which represent anottations (labels)


torch.Size([126])

In [3]:
def kl_divergence_loss(mu, logvar):
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return kl_loss

def reconstruction_loss(input,target):
    recon_loss = nn.CrossEntropyLoss()(input,target)
    return recon_loss

def total_loss( mu, logvar,input,target):
    kl_loss = kl_divergence_loss(mu, logvar)
    recon_loss = reconstruction_loss(input,target)

    total_loss = 0.005 * kl_loss + 0.95* recon_loss
    return total_loss




In [4]:
class BatchFlatten(nn.Module):
    def __init__(self):
        super(BatchFlatten,self).__init__()
        self._name = 'batch_flatten'
    def forward(self, x):
        return x.view(x.shape[0],-1)

In [5]:
class ContinousRotReprDecoder(nn.Module):
    def __init__(self):
        super(ContinousRotReprDecoder, self).__init__()

    def forward(self, module_input):
        reshaped_input = module_input.view(-1, 3, 2)

        b1 = F.normalize(reshaped_input[:, :, 0], dim=1)

        dot_prod = torch.sum(b1 * reshaped_input[:, :, 1], dim=1, keepdim=True)
        b2 = F.normalize(reshaped_input[:, :, 1] - dot_prod * b1, dim=-1)
        b3 = torch.cross(b1, b2, dim=1)

        return torch.stack([b1, b2, b3], dim=-1)


In [6]:
class NormalDistDecoder(nn.Module):
    def __init__(self, num_feat_in, latentD):
        super(NormalDistDecoder, self).__init__()

        self.mu = nn.Linear(num_feat_in, latentD)
        self.logvar = nn.Linear(num_feat_in, latentD)

    def forward(self, Xout):
        mu_logvar =torch.distributions.normal.Normal(self.mu(Xout), F.softplus(self.logvar(Xout)))
        return mu_logvar
        


In [7]:
class VPoser(nn.Module):
    def __init__(self):
        super(VPoser, self).__init__()

        num_neurons, self.latentD = 512,512
        nb_frames = 1538

        self.num_joints = 33
        n_features = self.num_joints * 3 * nb_frames

        self.encoder_net = nn.Sequential(
            BatchFlatten(),
            nn.BatchNorm1d(n_features),
            nn.Linear(n_features, num_neurons),
            nn.LeakyReLU(),
            nn.BatchNorm1d(num_neurons),
            nn.Dropout(0.1),
            nn.Linear(num_neurons, num_neurons),
            nn.LeakyReLU(),
            nn.Linear(num_neurons, self.latentD),
            # NormalDistDecoder(num_neurons, self.latentD)
            
            
            
        )

        self.decoder_net = nn.Sequential(
            nn.Linear(self.latentD, num_neurons),
            nn.LeakyReLU(),
            nn.Dropout(0.1),
            nn.Linear(num_neurons, num_neurons),
            nn.LeakyReLU(),
            nn.Linear(num_neurons, self.num_joints * 6),
            ContinousRotReprDecoder(),
        )
        self.mu_net = nn.Linear(512, 512)
        self.logvar_net = nn.Linear(512,512)
        # self.decoder_net = NormalDistDecoder(self.latentD, 512)




    def encode(self, pose_body):
       
        # self.mu_net = nn.Linear(32, 512)
        # self.logvar_net = nn.Linear(32,512)
        q_z = self.encoder_net(pose_body)
        mu = self.mu_net(q_z)
        logvar = self.logvar_net(q_z)
        return q_z, mu, logvar

    def decode(self, Zin):
       
        bs = Zin.shape[0]
        # print(bs)

        prec = self.decoder_net(Zin)
        # print(prec.shape)

        return {
            # 'pose_body': matrot2aa(prec.view(-1, 3, 3)).view(bs, -1, 3),
            'pose_body_matrot': prec.view(bs,-1,3)
        }


    def forward(self, pose_body):    

        q_z, mu, logvar = self.encode(pose_body)
        # print(q_z.shape)
        # q_z_sample = q_z.rsample()
        q_z_sample = self.reparameterize(mu, logvar)

        decode_results = self.decode(q_z_sample)
        pose_body_decoded = decode_results['pose_body_matrot']  # Extract the decoded pose tensor

        # decode_results.update({'poZ_body_mean': q_z.mean, 'poZ_body_std': q_z.scale, 'q_z': q_z})
        return pose_body_decoded, mu , logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    
    
    def sample_poses(self, num_poses, seed=None):
        np.random.seed(seed)

        some_weight = [a for a in self.parameters()][0]
        dtype = some_weight.dtype
        device = some_weight.device
        self.eval()
        with torch.no_grad():
            Zgen = torch.tensor(np.random.normal(0., 1., size=(num_poses, self.latentD)), dtype=dtype, device=device)

        return self.decode(Zgen)
    

In [8]:
# Convert the mocap data and emotion labels to NumPy arrays
X_train_array = np.array(X_train, dtype=np.float32)
y_train_array = np.array(y_train, dtype=np.compat.long)
X_test_array = np.array(X_test, dtype=np.float32)
y_test_array = np.array(y_test, dtype=np.compat.long)

# Convert NumPy arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train_array)
y_train_tensor = torch.tensor(y_train_array)
X_test_tensor = torch.tensor(X_test_array)
y_test_tensor = torch.tensor(y_test_array)

# Create a custom dataset class to load the data
class MocapDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Create DataLoader for training and testing sets
train_dataset = MocapDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=False)

test_dataset = MocapDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)


In [9]:


# Define the VPoser model
model = VPoser()
optimizer = optim.Adam(model.parameters(), lr=0.000001)
criterion = nn.CrossEntropyLoss()
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for batch_data, batch_labels in train_loader:
        # print(batch_labels)
        optimizer.zero_grad()
        outputs, mu, logvar = model(batch_data)
        # print(outputs.shape)
       
        
        labels = batch_labels.repeat_interleave(3)
        # print(labels.shape)


        labels = labels.reshape(-1,3)

        # print(labels.shape)
        labels = labels.long()    
        # loss = criterion(outputs, labels)
        loss = total_loss(mu,logvar,outputs,labels)
        optimizer.step()

    # Print the loss at the end of each epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss}")


Epoch 1/10, Loss: 9.54720687866211
Epoch 2/10, Loss: 9.412491798400879
Epoch 3/10, Loss: 9.466836929321289
Epoch 4/10, Loss: 9.492734909057617
Epoch 5/10, Loss: 9.519180297851562
Epoch 6/10, Loss: 9.517202377319336
Epoch 7/10, Loss: 9.519922256469727
Epoch 8/10, Loss: 9.526994705200195
Epoch 9/10, Loss: 9.54348373413086
Epoch 10/10, Loss: 9.454803466796875


In [12]:
model.eval()
test_loss = 0.0
num_samples = 0
for batch_data, batch_labels in test_loader:
    outputs, mu, logvar = model(batch_data)
    labels = batch_labels.repeat_interleave(3)
    labels = labels.reshape(-1,3)
    labels = labels.long()    
    # loss = criterion(outputs,labels)
    loss = total_loss(mu,logvar,outputs,labels)

print(f" Loss: {loss} ")




    


 Loss: 5.972689151763916 
