In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import dataloader,dataset
import torch.optim as optim
import torchvision
from torchvision.utils import save_image
from torch.distributions.normal import Normal
from torchvision import datasets, models, transforms
from torchvision.transforms import ToTensor 
from torchsummary import summary

import pandas as pd
import math
import os
import os.path as osp
import numpy as np
import random
import matplotlib.pyplot as plt
from skimage.io import imread, imshow

import time
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
import argparse
from IPython import display
import networkx as nx
import glob
import hashlib
import pickle
from tqdm import tqdm
import plyfile
import json
from plyfile import PlyData
import scipy.io as sio
import copy
import sys
import copy
import load_data as loader
from utils import Progbar
from loss import loss as Loss
import utils
from scipy.spatial import procrustes
from scipy.linalg import expm
from models.indiv_crossAttention import crossAttention

In [None]:
# Pin the process to GPU 0. CUDA reads CUDA_VISIBLE_DEVICES when it is first
# initialised, so the variable is exported before any torch.cuda query is made.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

# Fall back to the CPU when no CUDA device is usable.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

torch.cuda.device_count()
# current_device() raises on CPU-only installations, so it is only queried
# when CUDA is actually available.
torch.cuda.current_device() if device.type == "cuda" else None

In [None]:
# Sequence lengths. T_pred is the length of the decoder output buffer built in
# Model.forward and the `seq_length` stored by DecoderTransformer.
T_obs, T_pred = 10, 25
T_total = T_obs + T_pred

# Batch and feature sizes.
batch_size = 16
in_size = 90
out_size = 45
# Two values (read as location and scale in Model.forward) per output channel.
stochastic_out_size = 2 * out_size
hidden_size = 256
embed_size = 64

# Regularisation / sampling settings shared by the model classes below.
dropout_val = 0.2
teacher_forcing_ratio = 0.7
avg_n_path_eval = 20
bst_n_path_eval = 20

# When "on", Model.forward zeroes the first time step of its input tensor.
startpoint_mode = "on"

class TrainConfig(object):
    """Container for the training configuration.

    Class attributes hold the defaults shared by every run; the constructor
    records the per-run choices (dataset, data type, action, GPU ids, mode).
    """

    # Window sizes used during training.
    input_window_size = 50
    output_window_size = 10

    hidden_size = 18
    batch_size = 16
    learning_rate = 0.001
    max_epoch = 500
    # Used by train() as the number of iterations per epoch and as the
    # progress-bar targets.
    training_size = 200
    validation_size = 20
    restore = False      # train() loads the latest checkpoint when True
    longterm = False     # train() sets output_window_size to 100 when True
    context_window = 1
    visualize = False
    model = 'cmatp'      # key dispatched on by choose_net()
    # Dimension of one bone representation, static in all datasets.
    bone_dim = 3

    def __init__(self, dataset, datatype, action, gpu, training, visualize):
        """Store the run-specific settings.

        :param dataset: dataset name (e.g. 'Human')
        :param datatype: data representation key (e.g. 'xyz')
        :param action: action name, stored as ``filename``
        :param gpu: indices of the GPUs used to train the model
        :param training: train or predict
        :param visualize: visualisation flag (overrides the class default)
        """
        self.dataset = dataset
        self.datatype = datatype
        self.filename = action
        self.device_ids = gpu
        self.train_model = training
        self.visualize = visualize

#### CHOOSE DATASET

In [None]:
class DatasetChooser(object):
    """Build the dataset object (and bone table) selected by a config.

    The config must expose ``dataset`` and ``datatype``; ``choose_dataset``
    additionally writes ``config.input_size``.
    """

    # Data representations handled by the Human loaders.
    _SUPPORTED_DATATYPES = ('lie', 'xyz', 'xyzl', 'xyzk')
    _XYZ_DATATYPES = ('xyz', 'xyzl', 'xyzk')

    # First column of the fixed bone table returned with every dataset; the
    # other two columns are zero.
    _HUMAN_BONE_LENGTHS = (
        0., 132.95, 442.89, 454.21, 162.77, 75.,
        132.95, 442.89, 454.21, 162.77, 75.,
        0., 233.38, 257.08, 121.13, 115.,
        257.08, 151.03, 278.88, 251.73, 0., 100., 137.5, 0.,
        257.08, 151.03, 278.88, 251.73, 0., 100., 137.5, 0.,
    )

    def __init__(self, config):
        self.config = config
        self.dataset = config.dataset #Human

    def choose_dataset(self, train=True, prediction=False):
        """Return ``(data, bone)`` for the configured dataset.

        :param train: accepted for interface compatibility; this method does
            not read it. NOTE(review): whether ``loader.HumanDataset`` picks a
            split from ``config`` cannot be seen here -- confirm.
        :param prediction: when True, load ``loader.HumanPredictionDataset``
            and return the result of its ``get_data()``.
        :return: the loaded data and a fresh (32, 3) bone array.
        :raises ValueError: for an unsupported dataset / datatype combination
            (previously this surfaced as an unbound-local error, and the
            datatype test was always true because of ``== 'lie' or 'xyz'``).
        """
        if self.dataset != 'Human' or self.config.datatype not in self._SUPPORTED_DATATYPES:
            raise ValueError(
                "Unsupported dataset/datatype combination: {!r} / {!r}".format(
                    self.dataset, self.config.datatype))

        if not prediction:
            data = loader.HumanDataset(self.config)
            self.config.input_size = data[0]['encoder_inputs'].shape[1]
        else:
            data_loader = loader.HumanPredictionDataset(self.config)
            data = data_loader.get_data()
            # Feature width is read from the first entry of the first element.
            self.config.input_size = data[0][list(data[0].keys())[0]].shape[2]

        bone = np.zeros((len(self._HUMAN_BONE_LENGTHS), 3))
        bone[:, 0] = self._HUMAN_BONE_LENGTHS

        return data, bone

    def __call__(self, train=True, prediction=False):
        return self.choose_dataset(train, prediction)

    def cal_bone_length(self, rawdata):
        """Derive a bone table from the first frame of ``rawdata``.

        :param rawdata: array indexed as ``rawdata[frame, joint, component]``.
        :return: array whose first column holds the rounded bone lengths; for
            'lie' data the first row is dropped.
        :raises ValueError: if ``config.datatype`` is not a supported key.
        """
        njoints = rawdata.shape[1]
        bone = np.zeros([njoints, 3])
        datatype = self.config.datatype

        if datatype == 'lie':
            for i in range(njoints):
                bone[i, 0] = round(rawdata[0, i, 3], 2)
            bone = bone[1:, :]
        elif datatype in self._XYZ_DATATYPES:
            for i in range(njoints):
                # NOTE(review): for i == 0 the index i - 1 wraps around to the
                # last joint; kept as in the original -- confirm intent.
                bone[i, 0] = round(np.linalg.norm(rawdata[0, i, :] - rawdata[0, i - 1, :]), 2)
        else:
            raise ValueError("Unsupported datatype: {!r}".format(datatype))

        return bone

#### Data

In [None]:
class Args:
    """Run options for the raw 'xyz' representation."""
    # Choose one action in the dataset -- one of ['directions', 'discussion',
    # 'eating', 'greeting', 'phoning', 'posing', 'purchases', 'sitting',
    # 'sittingdown', 'smoking', 'takingphoto', 'waiting', 'walking',
    # 'walkingdog', 'walkingtogether'] -- or 'all' for all of the above.
    action = 'all'
    dataset = 'Human'
    datatype = 'xyz'
    gpu = [0]
    training = True
    visualize = 0


args = Args()

config = TrainConfig(
    dataset=args.dataset,
    datatype=args.datatype,
    action=args.action,
    gpu=args.gpu,
    training=args.training,
    visualize=args.visualize,
)

In [None]:
print('Loading Data')

choose = DatasetChooser(config)

# Training data and its shuffled loader.
print("TRAIN DATASET & TRAIN LOADER")
train_dataset, bone_length = choose.choose_dataset(train=True, prediction=False)
train_loader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=True,
)

# NOTE(review): DatasetChooser.choose_dataset, as defined in this notebook,
# does not read `train`, so this request is served by the same loader call as
# the training one -- confirm the split is handled inside `load_data`.
print("TEST DATASET & TEST LOADER")
test_dataset, _ = choose.choose_dataset(train=False, prediction=False)
test_loader = DataLoader(
    test_dataset,
    batch_size=config.batch_size,
    shuffle=True,
)

# Prediction data; `bone_length` is overwritten by this call.
print("PREDICTION DATASET")
prediction_dataset, bone_length = choose.choose_dataset(train=True, prediction=True)
x_test, y_test, dec_in_test = prediction_dataset

print('Final Loading Data')

#### Kendall Process Data

In [None]:
class Args:
    """Run options for the Kendall ('xyzk') representation."""
    action = 'all'
    dataset = 'Human'
    datatype = 'xyzk'  # Kendall
    gpu = [0]
    training = True
    visualize = 0


args = Args()

config = TrainConfig(
    dataset=args.dataset,
    datatype=args.datatype,
    action=args.action,
    gpu=args.gpu,
    training=args.training,
    visualize=args.visualize,
)

In [None]:
def inv_exp(X, Y):
    """Map ``Y`` to a tangent vector at ``X`` after Procrustes alignment.

    :param X: reference matrix (numpy array or torch tensor).
    :param Y: matrix to map; must have the same shape as ``X`` for
        ``scipy.spatial.procrustes`` to accept it.
    :return: ``Y`` itself, untouched, when either input has at most one row;
        otherwise a numpy array ``(theta / sin(theta)) * (Y_aligned - cos(theta) * X)``.
    """
    # Single-row inputs are handed back unchanged.
    if X.shape[0] <= 1 or Y.shape[0] <= 1:
        return Y

    # Accept torch tensors as well, like the later definition of this function
    # in the notebook; the arithmetic below is numpy-only.
    if isinstance(X, torch.Tensor):
        X = X.detach().cpu().numpy()
    if isinstance(Y, torch.Tensor):
        Y = Y.detach().cpu().numpy()

    # Align Y onto X; only the transformed second matrix is used.
    # NOTE(review): procrustes also returns a standardised copy of X, which is
    # discarded -- the raw X is used below. Confirm X is already centred and
    # scaled by the caller.
    _, Y_aligned, _ = procrustes(X, Y)

    # |trace(X Y^T)| is clamped to 1 so acos stays in its domain.
    tr = min(abs(X.dot(Y_aligned.T).trace()), 1)
    teta_invexp = math.acos(tr)
    # Guard against dividing by sin(theta) ~ 0 by substituting a fixed angle.
    if math.sin(teta_invexp) < 0.0001:
        teta_invexp = 0.1

    return (teta_invexp / math.sin(teta_invexp)) * (Y_aligned - (math.cos(teta_invexp)) * X)

In [None]:
print('Loading Data')

choose = DatasetChooser(config)

print("TRAIN DATASET & TRAIN LOADER")
train_dataset, bone_length = choose(train=True)

# Reference skeletons: the first training sample, one reference per field.
ref_skel_train_encoder = copy.deepcopy(train_dataset[0]['encoder_inputs'])
ref_skel_train_decoder_inputs = copy.deepcopy(train_dataset[0]['decoder_inputs'])
ref_skel_train_decoder_outputs = copy.deepcopy(train_dataset[0]['decoder_outputs'])

# Each field is mapped against the reference of the SAME field. The decoder
# inputs were previously mapped against the encoder reference, leaving
# ref_skel_train_decoder_inputs unused.
train_dataset_k = [
    {
        'encoder_inputs': inv_exp(ref_skel_train_encoder, sample['encoder_inputs']),
        'decoder_inputs': inv_exp(ref_skel_train_decoder_inputs, sample['decoder_inputs']),
        'decoder_outputs': inv_exp(ref_skel_train_decoder_outputs, sample['decoder_outputs'])
    }
    for sample in train_dataset
]

# NOTE(review): train() zips this loader with train_loader and train_loader_l;
# every one of them shuffles independently, so the zipped batches are not
# guaranteed to hold the same samples -- confirm this is intended.
train_loader_k = DataLoader(train_dataset_k, batch_size=config.batch_size, shuffle=True)

print("TEST DATASET & TEST LOADER")
test_dataset, _ = choose(train=False)

# Same scheme for the test data: references come from its first sample.
ref_skel_test_encoder = copy.deepcopy(test_dataset[0]['encoder_inputs'])
ref_skel_test_decoder_inputs = copy.deepcopy(test_dataset[0]['decoder_inputs'])
ref_skel_test_decoder_outputs = copy.deepcopy(test_dataset[0]['decoder_outputs'])

test_dataset_k= [
    {
        'encoder_inputs': inv_exp(ref_skel_test_encoder, sample['encoder_inputs']),
        'decoder_inputs': inv_exp(ref_skel_test_decoder_inputs, sample['decoder_inputs']),
        'decoder_outputs': inv_exp(ref_skel_test_decoder_outputs, sample['decoder_outputs'])
    }
    for sample in test_dataset
]

test_loader_k = DataLoader(test_dataset_k, batch_size=config.batch_size, shuffle=True)

print('Final Loading Data')

#### LIE PROCESS DATA

In [None]:
class Args:
    """Run options for the Lie ('xyzl') representation."""
    action = 'all'
    dataset = 'Human'
    datatype = 'xyzl'  # lie process
    gpu = [0]
    training = True
    visualize = 0


args = Args()

config = TrainConfig(
    dataset=args.dataset,
    datatype=args.datatype,
    action=args.action,
    gpu=args.gpu,
    training=args.training,
    visualize=args.visualize,
)

In [None]:
def calculate_global_transformation(skeleton, ref_skeleton):
    """Estimate a rotation-like matrix and a translation between two arrays.

    The product ``skeleton @ ref_skeleton.T`` is decomposed with a reduced SVD
    and the two orthogonal factors are recombined; the translation is the
    difference of the row means after applying that matrix.

    :return: ``(rotation_matrix, translation_vector)``
    """
    # Global rotation from the SVD of the cross product of the two inputs.
    cross = np.dot(skeleton, np.transpose(ref_skeleton))
    left, _, right_h = np.linalg.svd(cross, full_matrices=False)
    rotation_matrix = np.dot(right_h.T, left.T)

    # Global translation between the per-row means.
    ref_mean = np.mean(ref_skeleton, axis=1)
    rotated_mean = np.dot(rotation_matrix, np.mean(skeleton, axis=1))
    translation_vector = ref_mean - rotated_mean

    return rotation_matrix, translation_vector

def to_SE3(rotation_matrix, translation_vector):
    """Pack a rotation block and a translation into a 4x4 homogeneous matrix.

    Only the top-left 3x3 block of ``rotation_matrix`` and the first three
    entries of ``translation_vector`` are used; the last row stays [0, 0, 0, 1].
    """
    se3_matrix = np.identity(4)
    se3_matrix[0:3, 0:3] = rotation_matrix[0:3, 0:3]
    se3_matrix[0:3, 3] = translation_vector[0:3]
    return se3_matrix

def extract_point_in_SE3(se3_matrix):
    """Return the given SE(3) matrix unchanged (the matrix is the point)."""
    point_in_SE3 = se3_matrix
    return point_in_SE3

def derive_tangent_space(rotation_matrix, translation_vector):
    """Assemble the 4x4 matrix used as the tangent-space representation.

    Layout of the result:
      * ``[:3, :3]`` -- top-left 3x3 block of ``rotation_matrix``
      * ``[:3, 3]``  -- first three entries of ``translation_vector``
      * ``[3, :3]``  -- first three flattened entries of ``rotation - I``
      * ``[3, 3]``   -- 0
    """
    rotation_block = rotation_matrix[:3, :3]
    translation_column = translation_vector[:3].reshape(-1, 1)

    # Difference between the rotation block and the identity, flattened.
    deviation_flat = (rotation_block - np.eye(3)).flatten()

    tangent_space = np.zeros((4, 4))
    tangent_space[:3, :3] = rotation_block
    tangent_space[:3, 3] = translation_column.flatten()
    tangent_space[3, :3] = deviation_flat[:3]
    return tangent_space

In [None]:
def lie_group_and_algebra_transform_s(skeleton, ref_skeleton):
    """Compute the 4x4 tangent-space matrix of one frame against a reference.

    Torch tensors are converted to numpy first; the frame is reshaped to rows
    of 90 values before the global transformation is estimated.
    """
    if isinstance(skeleton, torch.Tensor):
        skeleton = skeleton.cpu().numpy()
    if isinstance(ref_skeleton, torch.Tensor):
        ref_skeleton = ref_skeleton.cpu().numpy()
    skeleton = skeleton.reshape(-1, 90)

    # Global transformation (rotation and translation) w.r.t. the reference.
    rotation_matrix, translation_vector = calculate_global_transformation(skeleton, ref_skeleton)

    # The SE(3) point is still built, as before, but only the tangent-space
    # matrix is returned to the caller.
    extract_point_in_SE3(to_SE3(rotation_matrix, translation_vector))

    return derive_tangent_space(rotation_matrix, translation_vector)

def lie_group_and_algebra_transform(frames, ref_skeleton):
    """Apply ``lie_group_and_algebra_transform_s`` to every frame and stack the results."""
    per_frame = []
    for skeleton in frames:
        per_frame.append(lie_group_and_algebra_transform_s(skeleton, ref_skeleton))
    return np.array(per_frame)

In [None]:
print('Loading Data')

choose = DatasetChooser(config)

print("TRAIN DATASET & TRAIN LOADER")
train_dataset, bone_length = choose(train=True)

# Reference skeletons: the first training sample, one reference per field.
# The decoder references are built here so this cell no longer relies on
# variables left behind by the Kendall cell.
ref_skel_train_encoder = copy.deepcopy(train_dataset[0]['encoder_inputs'])
ref_skel_train_decoder_inputs = copy.deepcopy(train_dataset[0]['decoder_inputs'])
ref_skel_train_decoder_outputs = copy.deepcopy(train_dataset[0]['decoder_outputs'])

# Apply Lie group and Lie algebra transformations to the data
train_dataset_l = [
    {
        'encoder_inputs': lie_group_and_algebra_transform(sample['encoder_inputs'],ref_skel_train_encoder),
        'decoder_inputs': lie_group_and_algebra_transform(sample['decoder_inputs'],ref_skel_train_decoder_inputs),
        'decoder_outputs': lie_group_and_algebra_transform(sample['decoder_outputs'],ref_skel_train_decoder_outputs)
    }
    for sample in train_dataset
]

# NOTE(review): train() zips this loader with train_loader and train_loader_k;
# every one of them shuffles independently, so the zipped batches are not
# guaranteed to hold the same samples -- confirm this is intended.
train_loader_l = DataLoader(train_dataset_l, batch_size=config.batch_size, shuffle=True)

print("TEST DATASET & TEST LOADER")
test_dataset, _ = choose(train=False)

# Same scheme for the test data: references come from its first sample.
ref_skel_test_encoder = copy.deepcopy(test_dataset[0]['encoder_inputs'])
ref_skel_test_decoder_inputs = copy.deepcopy(test_dataset[0]['decoder_inputs'])
ref_skel_test_decoder_outputs = copy.deepcopy(test_dataset[0]['decoder_outputs'])

test_dataset_l= [
    {
        'encoder_inputs': lie_group_and_algebra_transform(sample['encoder_inputs'],ref_skel_test_encoder),
        'decoder_inputs': lie_group_and_algebra_transform(sample['decoder_inputs'],ref_skel_test_decoder_inputs),
        'decoder_outputs': lie_group_and_algebra_transform(sample['decoder_outputs'],ref_skel_test_decoder_outputs)
    }
    for sample in test_dataset
]

test_loader_l = DataLoader(test_dataset_l, batch_size=config.batch_size, shuffle=True)

print('Final Loading Data')

### Model

In [None]:
def position_embedding(input, d_model):
    """Sinusoidal embedding of a set of positions.

    :param input: tensor of positions; it is flattened to one position per row.
    :param d_model: width of the embedding. Even columns hold sines and odd
        columns hold cosines of ``pos / 10000 ** (2 * i / d_model)``.
        Odd widths are supported: the sine gets the extra column.
    :return: tensor of shape ``(num_positions, d_model)`` on ``input``'s device.
    """
    input = input.view(-1, 1)
    # Number of sine columns is ceil(d_model / 2); for even d_model this equals
    # d_model // 2, so the result is unchanged from the even-only version.
    n_sin = (d_model + 1) // 2
    dim = torch.arange(n_sin, dtype=torch.float32, device=input.device).view(1, -1)
    angles = input / 10000 ** (2 * dim / d_model)

    out = torch.zeros((input.shape[0], d_model), device=input.device)
    out[:, ::2] = torch.sin(angles)
    # Only floor(d_model / 2) cosine columns exist.
    out[:, 1::2] = torch.cos(angles[:, :d_model // 2])
    return out

def sinusoid_encoding_table(max_len, d_model):
    """Return the positional embeddings of positions ``0 .. max_len - 1``."""
    positions = torch.arange(max_len, dtype=torch.float32)
    return position_embedding(positions, d_model)

class ScaledDotProductAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned projections."""

    def __init__(self, d_model, d_k, d_v, h):
        """
        param:
        d_model: Output dimensionality of the model
        d_k: Dimensionality of queries and keys
        d_v: Dimensionality of values
        h: Number of heads
        """
        super(ScaledDotProductAttention, self).__init__()
        self.fc_q = nn.Linear(d_model, h * d_k)
        self.fc_k = nn.Linear(d_model, h * d_k)
        self.fc_v = nn.Linear(d_model, h * d_v)
        self.fc_o = nn.Linear(h * d_v, d_model)

        self.d_model, self.d_k, self.d_v, self.h = d_model, d_k, d_v, h

        self.init_weights(gain=1.0)

    def init_weights(self, gain=1.0):
        """Xavier-normal weights and zero biases for the four projections."""
        projections = (self.fc_q, self.fc_k, self.fc_v, self.fc_o)
        for layer in projections:
            nn.init.xavier_normal_(layer.weight, gain=gain)
        for layer in projections:
            nn.init.constant_(layer.bias, 0)

    def forward(self, queries, keys, values):
        """
        Computes the attention output.
        :param queries: Queries (b_s, nq, d_model)
        :param keys: Keys (b_s, nk, d_model)
        :param values: Values (b_s, nk, d_model)
        :return: tensor of shape (b_s, nq, d_model)
        """
        b_s, nq = queries.shape[:2]
        nk = keys.shape[1]

        # Project and split into heads.
        q = self.fc_q(queries).view(b_s, nq, self.h, self.d_k).transpose(1, 2)  # (b_s, h, nq, d_k)
        k = self.fc_k(keys).view(b_s, nk, self.h, self.d_k).transpose(1, 2).transpose(-2, -1)  # (b_s, h, d_k, nk)
        v = self.fc_v(values).view(b_s, nk, self.h, self.d_v).transpose(1, 2)  # (b_s, h, nk, d_v)

        # Attention weights over the keys.
        scores = torch.matmul(q, k) / np.sqrt(self.d_k)  # (b_s, h, nq, nk)
        weights = torch.softmax(scores, -1)

        # Merge the heads and project back to d_model.
        merged = torch.matmul(weights, v).transpose(1, 2).contiguous()
        merged = merged.view(b_s, nq, self.h * self.d_v)  # (b_s, nq, h*d_v)
        return self.fc_o(merged)  # (b_s, nq, d_model)
    
class MultiHeadAttention(nn.Module):
    """Attention followed by a position-wise feed-forward net, residual and LayerNorm."""

    def __init__(self, d_model, d_k, d_v, h, dff=2048, dropout=.1):
        super(MultiHeadAttention, self).__init__()

        self.attention = ScaledDotProductAttention(d_model=d_model, d_k=d_k, d_v=d_v, h=h)
        self.dropout = nn.Dropout(p=dropout)
        self.layer_norm = nn.LayerNorm(d_model)
        # Feed-forward: d_model -> dff -> d_model.
        self.fc = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(dff, d_model),
        )

    def forward(self, queries, keys, values):
        """Return ``LayerNorm(queries + dropout(fc(dropout(attention(...)))))``."""
        attended = self.dropout(self.attention(queries, keys, values))
        transformed = self.dropout(self.fc(attended))
        return self.layer_norm(queries + transformed)
    
class EncoderSelfAttention(nn.Module):
    """Stack of ``n_module`` self-attention blocks applied on top of sinusoidal positions."""

    def __init__(self, device, d_model, d_k, d_v, n_head, dff=2048, dropout_transformer=.1, n_module=6):
        super(EncoderSelfAttention, self).__init__()
        blocks = [
            MultiHeadAttention(d_model, d_k, d_v, n_head, dff, dropout_transformer)
            for _ in range(n_module)
        ]
        self.encoder = nn.ModuleList(blocks)
        self.device = device

    def forward(self, x):
        """Add the positional table (built from ``x.shape[1]`` and ``x.shape[2]``) and run every block."""
        seq_len, width = x.shape[1], x.shape[2]
        positions = sinusoid_encoding_table(seq_len, width).expand(x.shape).to(self.device)

        hidden = x + positions
        for block in self.encoder:
            # Pure self-attention: queries, keys and values are the same tensor.
            hidden = block(hidden, hidden, hidden)

        return hidden

In [None]:
class SelfAttention(nn.Module):
    """Single-head self-attention with linear query / key / value projections."""

    def __init__(self, input_dim, hidden_dim):
        super(SelfAttention, self).__init__()
        self.query = nn.Linear(input_dim, hidden_dim)
        self.key = nn.Linear(input_dim, hidden_dim)
        self.value = nn.Linear(input_dim, hidden_dim)

    def forward(self, x):
        """Attend over the second-to-last axis of ``x``; the last axis becomes ``hidden_dim`` wide."""
        q, k, v = self.query(x), self.key(x), self.value(x)

        # NOTE(review): scores are scaled by sqrt of the INPUT width
        # (x.size(-1)), not of the projected width -- confirm intent.
        scale = x.size(-1) ** 0.5
        scores = torch.matmul(q, k.transpose(-2, -1)) / scale
        attention_scores = F.softmax(scores, dim=-1)

        return torch.matmul(attention_scores, v)

class CoordinatesTransformer_k(nn.Module):
    """Encoder for the Kendall stream: self-attention followed by a two-layer MLP."""

    def __init__(self, device, dropout1d,input_dim=90, hidden_size=256, output_dim=256):
        super(CoordinatesTransformer_k, self).__init__()
        self.device = device
        self.dropout1d = dropout1d

        # Attention projects input_dim -> hidden_size.
        self.self_attention = SelfAttention(input_dim, hidden_size)

        # Feed-forward head: hidden_size -> hidden_size -> output_dim.
        self.linear1 = nn.Linear(hidden_size, hidden_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout1d)
        self.linear2 = nn.Linear(hidden_size, output_dim)

    def forward(self, input_tensor_k):
        """Encode ``input_tensor_k``.

        Assumes input_tensor_k shape: [batch_size, sequence_length, input_dim].
        """
        attended = self.self_attention(input_tensor_k)
        hidden = self.dropout(self.relu(self.linear1(attended)))
        return self.linear2(hidden)

In [None]:
class SelfAttention(nn.Module):
    """Single-head self-attention (redefinition of the class from the previous cell)."""

    def __init__(self, input_dim, hidden_dim):
        super(SelfAttention, self).__init__()
        self.query = nn.Linear(input_dim, hidden_dim)
        self.key = nn.Linear(input_dim, hidden_dim)
        self.value = nn.Linear(input_dim, hidden_dim)

    def forward(self, x):
        """Return the attention-weighted values computed from ``x``."""
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)
        # Scores are divided by sqrt(x.size(-1)), i.e. the input width.
        similarity = torch.matmul(queries, keys.transpose(-2, -1))
        attention_scores = F.softmax(similarity / (x.size(-1) ** 0.5), dim=-1)
        return torch.matmul(attention_scores, values)

class CoordinatesTransformer_l(nn.Module):
    """Encoder for the Lie stream: self-attention followed by a two-layer MLP."""

    def __init__(self, device, dropout1d, input_dim=16, hidden_size=256, output_dim=256):
        super(CoordinatesTransformer_l, self).__init__()
        self.device = device
        self.dropout1d = dropout1d
        # Attention projects input_dim -> hidden_size.
        self.self_attention = SelfAttention(input_dim, hidden_size)
        # Feed-forward head: hidden_size -> hidden_size -> output_dim.
        self.linear1 = nn.Linear(hidden_size, hidden_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout1d)
        self.linear2 = nn.Linear(hidden_size, output_dim)

    def forward(self, input_tensor_l):
        """Encode ``input_tensor_l``.

        Assumes input_tensor_l shape: [batch_size, sequence_length, input_dim].
        """
        attended = self.self_attention(input_tensor_l)
        hidden = self.dropout(self.relu(self.linear1(attended)))
        return self.linear2(hidden)

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: multi-head self-attention and a linear feed-forward step.

    Inputs are batch-first, ``(batch, seq, d_model)``, which is how
    DecoderTransformer feeds this layer. ``batch_first=True`` makes the
    attention run over the sequence axis of each sample; with the default
    (sequence-first) layout the first axis -- the batch -- would be treated as
    the sequence and samples would attend to one another.
    """

    def __init__(self, d_model, nhead):
        super(DecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, batch_first=True)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Linear(d_model, d_model)

    def forward(self, x, e_output):
        """Apply the block to ``x``.

        :param x: tensor of shape (batch, seq, d_model).
        :param e_output: encoder output; accepted for interface compatibility
            but not used (this layer has no cross-attention).
        :return: tensor with the same shape as ``x``.
        """
        # Multihead self-attention; the normalised result is added to x.
        attn_output, _ = self.self_attn(x, x, x)
        x = x + self.norm1(attn_output)
        # Feed forward, again added after normalisation.
        ff_output = self.feed_forward(x)
        x = x + self.norm2(ff_output)
        return x

    
class DecoderTransformer(nn.Module):
    """Single-step decoder: embeds one pose, runs the decoder layers and
    produces ``stochastic_out_size`` values per sample."""

    def __init__(self, in_size, embed_size, hidden_size, d_model=512, dropout_val=dropout_val, batch_size=1, nhead=8, num_layers=6):
        super(DecoderTransformer, self).__init__()

        self.in_size                = in_size
        self.stochastic_out_size    = stochastic_out_size
        self.hidden_size            = hidden_size
        self.batch_size             = batch_size
        self.embed_size             = embed_size
        self.seq_length             = T_pred
        self.dropout_val            = dropout_val
        self.d_model                = d_model
        self.nhead                  = nhead
        self.num_layers             = num_layers

        # Width fed into the output head `fC_mu`.
        head_in = self.hidden_size + self.hidden_size + 2

        # NOTE(review): the input width is hard-coded to 90 rather than taken
        # from `in_size` -- confirm.
        self.embedder_rho = nn.Linear(90, 200)
        self.fC_mu = nn.Sequential(
            nn.Linear(head_in, int(self.hidden_size/2), bias=True),
            nn.ReLU(),
            nn.Dropout(p=dropout_val),
            nn.Linear(int(self.hidden_size/2), self.stochastic_out_size, bias=True),
        )
        self.dropout = nn.Dropout(dropout_val)
        self.reducted_size = int((self.hidden_size-1)/3)+1
        self.reducted_size2 = int((self.hidden_size+in_size-1)/3)+1
        self.FC_dim_red = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=3, padding=1),
            nn.Flatten(start_dim=1, end_dim=-1),
            nn.Linear(self.reducted_size*self.reducted_size2, 2*self.hidden_size+in_size, bias=True),
            nn.ReLU(),
        )

        self.embedding = nn.Linear(200, d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, nhead) for _ in range(num_layers)])
        # Must match the input width of `fC_mu`; this was a literal 514, which
        # only agrees with fC_mu when hidden_size == 256.
        self.output_layer = nn.Linear(d_model, head_in)

    def forward(self, x, encoder_outputs):
        """Decode one step.

        :param x: decoder input; flattened to ``(batch, 1, -1)`` before embedding.
        :param encoder_outputs: forwarded to every DecoderLayer.
        :return: tensor of shape ``(batch, 1, stochastic_out_size)``.
        """
        # Coordination Embedding
        embedding = self.embedder_rho(x.view(x.shape[0], 1, -1))
        embedding = F.relu(self.dropout(embedding))
        # Embed the decoder input
        x = self.embedding(embedding)
        for layer in self.layers:
            x = layer(x, encoder_outputs)
            # Diagnostics only: report non-finite activations, do not abort.
            if torch.isnan(x).any():
                print("NaN values found in x layer")
            if torch.isinf(x).any():
                print("infinite values found in x layer")

        output = self.output_layer(x)
        # The batch axis is kept as-is. A former `squeeze(0)` was a no-op for
        # batch sizes > 1 but dropped the batch axis for a single-sample batch,
        # which breaks the 3-D indexing done by the caller.
        prediction = self.fC_mu(output)

        return prediction

    def dim_red(self, input):
        """Run ``input`` through the pooling / flatten / linear reduction stack."""
        output = self.FC_dim_red(input)
        return output

In [None]:
class Model(nn.Module):
    """Two-stream encoder (Kendall + Lie), cross-attention fusion and a
    stochastic single-step decoder unrolled over the prediction window."""

    def __init__(self, in_size, embed_size, hidden_size, batch_size, d_model=512, d_ff=2048, h=8, dropout_val=dropout_val, N=6, input_dim=512):
        super(Model, self).__init__()
        torch.cuda.empty_cache()

        # NOTE(review): `init_weights` is not defined in the visible part of
        # the notebook; it must exist before this class is instantiated.
        self.encoder_k = CoordinatesTransformer_k(device,dropout1d=dropout_val)
        self.encoder_k.apply(init_weights)
        self.encoder_l = CoordinatesTransformer_l(device,dropout1d=dropout_val)
        self.encoder_l.apply(init_weights)
        self.decoder = DecoderTransformer(in_size, embed_size, hidden_size, num_layers=6, nhead=8)
        self.decoder.apply(init_weights)
        self.crossAttention = crossAttention(N=6,d_model=256, d_ff=2048, h=8, dropout=0.1)

        if device.type=='cuda':
            self.encoder_k.cuda()
            self.encoder_l.cuda()
            self.decoder.cuda()

    def forward(self, input_tensor_k, input_tensor_l, input_tensor, output_tensor, batch_size, train_mode):
        """Predict the future sequence.

        :param input_tensor_k: Kendall-stream input, fed to ``encoder_k``.
        :param input_tensor_l: Lie-stream input; its last two axes are merged
            before it is fed to ``encoder_l``.
        :param input_tensor: raw input sequence; its last time step seeds the
            decoder. Side effect: when ``startpoint_mode == "on"`` its first
            time step is zeroed IN PLACE.
        :param output_tensor: not used by this method.
        :param batch_size: ignored; the batch size is read from ``input_tensor_k``.
        :param train_mode: only affects the (currently unused) teacher-forcing draw.
        :return: tensor of shape ``(batch, T_pred, in_size)``; time steps that
            are not decoded stay zero.
        """
        # All buffers follow the device of the inputs, so the model also runs
        # on CPU (forward used to call .cuda() unconditionally).
        dev = input_tensor_k.device
        batch_size = int(input_tensor_k.size(0))

        if startpoint_mode=="on":
            input_tensor[:,0,:]    = 0

        input_tensor_l = input_tensor_l.reshape(input_tensor_l.size(0), input_tensor_l.size(1), input_tensor_l.size(-1) * input_tensor_l.size(-2))
        encoder_outputs_k = self.encoder_k(input_tensor_k)
        encoder_outputs_l = self.encoder_l(input_tensor_l)

        # Fuse the two streams; no masks are applied.
        src_mask = None
        obd_enc_mask = None
        e_outputs = self.crossAttention(encoder_outputs_k, encoder_outputs_l, src_mask, obd_enc_mask)

        decoder_input = input_tensor[:,-1,:]
        outputs = torch.zeros(batch_size, T_pred, in_size, device=dev)

        # Drawn but not consumed below (no teacher forcing is applied); the
        # draw is kept so the global `random` stream is unchanged.
        teacher_force = int(random.random() < teacher_forcing_ratio) if train_mode else 0

        # Standard normal noise source, sampled on the CPU and moved to `dev`.
        epsilonX = Normal(torch.zeros(batch_size,1),torch.ones(batch_size,1))

        # NOTE(review): `decoder_input` is never updated inside the loop, so
        # every step decodes from the same pose; only output_window_size - 1
        # of the T_pred steps are filled -- confirm both are intended.
        for t in range(0, config.output_window_size-1):
            output = self.decoder(decoder_input, e_outputs)

            # Reparameterization trick: channel i is output[2*i] plus noise
            # (mean of avg_n_path_eval draws) scaled by output[2*i+1]. Only
            # the first `out_size` of the 90 channels are written.
            decoder_output = torch.zeros(batch_size, 1, 90, device=dev)
            for i in range(0,out_size):
                noise = epsilonX.sample((avg_n_path_eval,1)).view(-1,avg_n_path_eval,1).mean(-2).to(dev)
                decoder_output[:,:,i] = output[:,:,2*i] + noise * output[:,:,2*i+1]

            outputs[:,t,:] = decoder_output.squeeze(1)

        # Returned directly so the result is defined even if the loop above
        # runs zero times.
        return outputs

In [None]:
#Kendall transformation and mapping to tangent space:

def CenteredScaledd(X):
    """Centre and scale a sequence of flattened 3-D skeletons.

    :param X: array-like of shape (n_frames, 3 * n_joints).
    :return: tensor of the same shape where the per-joint mean over frames has
        been subtracted and every frame has been divided by its Frobenius norm.
    """
    frames = torch.tensor(X)
    n_frames, total_dimensions = frames.shape

    # View as (n_frames, n_joints, 3).
    joints = frames.view((n_frames, total_dimensions // 3, 3))

    # Centering: subtract the mean over the frame axis.
    centered = joints - torch.mean(joints, dim=0)

    # One Frobenius norm per frame (taken over the joint and coordinate axes).
    frame_norms = torch.norm(centered, dim=(1, 2), p='fro')
    scaled = centered / frame_norms[:, None, None]

    # Back to the flat (n_frames, total_dimensions) layout.
    return scaled.view((n_frames, total_dimensions))

def inv_exp(X, Y):
    """Map ``Y`` to a tangent vector at ``X`` after Procrustes alignment.

    Torch tensors are converted to numpy arrays first. When either input has
    at most one row, (the converted) ``Y`` is returned unchanged.
    """
    X = X.cpu().numpy() if isinstance(X, torch.Tensor) else X
    Y = Y.cpu().numpy() if isinstance(Y, torch.Tensor) else Y

    # Nothing to align when either matrix has a single row.
    if min(X.shape[0], Y.shape[0]) <= 1:
        return Y

    # Only the aligned second matrix from procrustes is needed.
    aligned = procrustes(X, Y)[1]

    # |trace(X Y^T)|, clamped to 1 to stay inside the domain of acos.
    tr = abs(X.dot(aligned.T).trace())
    tr = 1 if tr > 1 else tr
    theta = math.acos(tr)
    # A fixed angle replaces theta when sin(theta) is too small to divide by.
    if math.sin(theta) < 0.0001:
        theta = 0.1

    scale = theta / math.sin(theta)
    return scale * (aligned - (math.cos(theta)) * X)

# Train

In [None]:
def choose_net(config):
    """Instantiate the network named by ``config.model``.

    :param config: object exposing a ``model`` attribute
        ('ST_HRN', 'HMR' or 'cmatp').
    :return: the constructed network.
    :raises ValueError: if ``config.model`` is not one of the known keys
        (previously this surfaced as an unbound-local error on ``net``).
    """
    # NOTE(review): ST_HRN and HMR are not defined in the visible part of the
    # notebook; those branches need the classes to be in scope.
    if config.model == 'ST_HRN':
        return ST_HRN(config)
    if config.model == 'HMR':
        return HMR(config)
    if config.model == 'cmatp':
        # Built from the notebook-level hyper-parameters, not from `config`.
        return Model(in_size, embed_size, hidden_size, dropout_val=dropout_val, batch_size=batch_size)
    raise ValueError("Unknown model: {!r}".format(config.model))

In [None]:
def train(config, checkpoint_dir):
    """Train the network chosen by ``config.model`` and checkpoint the best epoch.

    Every epoch runs ``config.training_size`` passes over the zipped training
    loaders, then ``config.validation_size`` validation passes, then a
    per-action prediction on the test set. The mean error at output frames
    1, 3, 7 and 9, averaged over actions, decides whether
    ``Epoch_<n>.pth`` is written to ``checkpoint_dir``.

    Args:
        config: configuration object. Attributes read directly here:
            ``datatype``, ``longterm``, ``output_window_size``, ``batch_size``,
            ``device_ids``, ``restore``, ``learning_rate``, ``max_epoch``,
            ``training_size`` and ``validation_size`` (helpers read more).
        checkpoint_dir: directory prefix for checkpoints. File names are
            appended by plain string concatenation, so it should end with a
            path separator.

    Raises:
        ValueError: if ``config.datatype`` is neither ``'lie'`` nor ``'xyz'``.
        FileNotFoundError: if ``config.restore`` is set and no checkpoint file
            is found in ``checkpoint_dir``.

    Notes:
        ``train_loader_k``, ``train_loader_l`` and ``batch_size`` are not
        parameters; they are read from notebook (module) scope.
    """
    # The evaluation below is only defined for these data types. The previous
    # check (`== 'lie' or 'xyz'`) was always true, so fail fast instead.
    if config.datatype not in ('lie', 'xyz'):
        raise ValueError(
            "Unsupported datatype {!r}; expected 'lie' or 'xyz'".format(config.datatype)
        )

    print('Start Training the Model!')

    # generate data loader
    if config.longterm is True:
        config.output_window_size = 100

    choose = DatasetChooser(config)
    train_dataset, bone_length = choose(train=True)
    train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
    test_dataset, _ = choose(train=False)
    # NOTE(review): test_loader is built but never iterated; the validation
    # pass below reads the training loaders. Confirm whether that is intended.
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=True)
    prediction_dataset, bone_length = choose(prediction=True)
    x_test, y_test, dec_in_test = prediction_dataset

    device = torch.device("cuda:"+str(config.device_ids[0]) if torch.cuda.is_available() else "cpu")
    print('Device {} will be used to save parameters'.format(device)) #cuda

    net = choose_net(config)
    net.to(device)

    if config.restore is True:
        checkpoint_files = utils.get_file_list(checkpoint_dir)
        if not checkpoint_files:
            raise FileNotFoundError('No checkpoint found in ' + checkpoint_dir)
        latest_checkpoint = checkpoint_dir + checkpoint_files[-1]
        print('Load model from:' + latest_checkpoint)
        # Map onto the device selected above instead of a hard-coded 'cuda:0'.
        net.load_state_dict(torch.load(latest_checkpoint, map_location=device))

    optimizer = optim.Adam(net.parameters(), lr=config.learning_rate)

    if not (os.path.exists(checkpoint_dir)):
        os.makedirs(checkpoint_dir)
    print('Checkpoint dir:', checkpoint_dir)

    best_error = float('inf')
    best_error_list = None
    for epoch in range(config.max_epoch):
        print("At epoch:{}".format(str(epoch + 1)))
        prog = Progbar(target=config.training_size)
        prog_valid = Progbar(target=config.validation_size)

        # Train
        for it in range(config.training_size):
            for data, data_k, data_l in zip(train_loader, train_loader_k, train_loader_l):
                # Only the tensors the forward pass and the loss consume are
                # moved to the device.
                encoder_inputs = data['encoder_inputs'].float().to(device)
                decoder_outputs = data['decoder_outputs'].float().to(device)
                encoder_inputs_k = data_k['encoder_inputs'].float().to(device)
                encoder_inputs_l = data_l['encoder_inputs'].float().to(device)

                predicted = net(encoder_inputs_k, encoder_inputs_l, encoder_inputs, decoder_outputs, batch_size, train_mode=True)
                loss = Loss(predicted, decoder_outputs, bone_length, config)

                net.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(net.parameters(), 1)
                optimizer.step()

            # Reports the loss of the last batch of this pass.
            prog.update(it + 1, [("Training Loss", loss.item())])

        # Validation: three sweeps over the loaders are gathered into a single
        # batch and evaluated in one forward pass.
        with torch.no_grad():
            for it in range(config.validation_size):
                enc_parts, dec_out_parts, enc_k_parts, enc_l_parts = [], [], [], []
                for _ in range(3):
                    for data, data_k, data_l in zip(train_loader, train_loader_k, train_loader_l):
                        enc_parts.append(data['encoder_inputs'].float().to(device))
                        dec_out_parts.append(data['decoder_outputs'].float().to(device))
                        enc_k_parts.append(data_k['encoder_inputs'].float().to(device))
                        enc_l_parts.append(data_l['encoder_inputs'].float().to(device))

                # Newest batch first, matching the earlier prepend-style
                # accumulation, but with one concatenation per tensor.
                encoder_inputs = torch.cat(enc_parts[::-1], dim=0)
                decoder_outputs = torch.cat(dec_out_parts[::-1], dim=0)
                encoder_inputs_k = torch.cat(enc_k_parts[::-1], dim=0)
                encoder_inputs_l = torch.cat(enc_l_parts[::-1], dim=0)

                predicted = net(encoder_inputs_k, encoder_inputs_l, encoder_inputs, decoder_outputs, batch_size, train_mode=False)
                loss = Loss(predicted, decoder_outputs, bone_length, config)
                prog_valid.update(it+1, [("Testing Loss", loss.item())])

        #Test prediction
        actions = list(x_test.keys())
        y_predict = {}
        x_testk = {}
        x_testl = {}
        with torch.no_grad():
            for act in actions:
                x_test_ = torch.from_numpy(x_test[act]).float().to(device)
                # First test sequence of the action serves as the reference
                # for both transforms below.
                ref_pred_xtest = copy.deepcopy(prediction_dataset[0][act][0])

                # Lie branch input.
                x_test_batch = torch.from_numpy(x_test[act]).float().to(device)
                x_testl[act] = [lie_group_and_algebra_transform(frame,ref_pred_xtest) for frame in x_test_batch]
                x_test_l = torch.tensor(x_testl[act], dtype=torch.float32, device=device)

                # Kendall branch input: centre/scale, then map to the tangent
                # space at the reference.
                x_test_batch = torch.from_numpy(x_test[act]).float().to(device)
                x_testk[act] = [CenteredScaledd(frame) for frame in x_test_batch]
                x_testk[act] = [inv_exp(ref_pred_xtest, frame) for frame in x_testk[act]]
                x_test_k = torch.from_numpy(np.array(x_testk[act])).float().to(device)

                dec_in_test_ = torch.from_numpy(dec_in_test[act]).float().to(device)

                # NOTE(review): prediction() runs this same step with
                # train_mode=False; confirm train_mode=True is intended here.
                pred = net(x_test_k, x_test_l, x_test_, dec_in_test_, batch_size, train_mode=True)

                # Tensors are converted to NumPy; arrays pass through as-is.
                if isinstance(pred, torch.Tensor):
                    pred = pred.cpu().numpy()
                elif not isinstance(pred, np.ndarray):
                    print("pred is not a valid tensor or array.")

                y_predict[act] = pred

        error_actions = 0.0
        for act in actions:
            mean_error, _ = mean_per_joint_position_error(config, act, y_predict[act], y_test[act][:, :config.output_window_size, :])
            error = mean_error[[1, 3, 7, 9]]
            error_actions += error.mean()
        error_actions /= len(actions)
        if error_actions < best_error:
            print(error_actions)
            print(best_error)
            # Per-frame errors of the last evaluated action (not an average).
            best_error_list = error
            best_error = error_actions
            torch.save(net.state_dict(), checkpoint_dir + 'Epoch_' + str(epoch + 1) + '.pth')
        # best_error_list stays None if no epoch has improved on inf yet
        # (e.g. a NaN error), so guard the summary line.
        if best_error_list is not None:
            print('Current best:' + ' '.join(str(round(best_error_list[k], 2)) for k in range(4)))

In [None]:
def get_file_list(file_path):
    """List the entries of ``file_path`` ordered by modification time.

    Args:
        file_path: directory to list.

    Returns:
        Entry names sorted from least to most recently modified, or ``None``
        when the directory is empty.
    """
    names = os.listdir(file_path)
    if len(names) == 0:
        return None

    def modified_at(name):
        # Modification time of one entry inside file_path.
        return os.path.getmtime(os.path.join(file_path, name))

    names.sort(key=modified_at)
    return names

def mean_per_joint_position_error(config, action, y_predict, y_test):
    """Compute the mean per-joint position error for every predicted frame.

    For each sample the Euclidean distance between predicted and ground-truth
    joint positions is averaged over joints; the per-frame values are then
    averaged over samples. The action name and the errors at selected frames
    are printed.

    Args:
        config: configuration object. ``config.dataset == 'Human'`` triggers
            un-normalisation with ``config.data_mean``, ``config.data_std``
            and ``config.dim_to_ignore``.
        action: label printed before the error line.
        y_predict: array of shape (n_batch, n_frames, n_joints * 3) or
            (n_batch, n_frames, n_joints, 3).
        y_test: ground truth with the same layout as ``y_predict``.

    Returns:
        Tuple ``(mpjpe, mpjpe_mean)``: ``mpjpe`` has one value per frame and
        ``mpjpe_mean`` is the mean over the printed frame indices.

    Raises:
        ValueError: if a flattened frame length is not a multiple of 3.
    """
    n_batch = y_predict.shape[0]
    nframes = y_predict.shape[1]

    mean_errors = np.zeros([n_batch, nframes])

    for i in range(n_batch):
        # Un-normalise (or copy) once per sample; the result does not depend
        # on the frame index.
        if config.dataset == 'Human':
            pred = unNormalizeData(y_predict[i], config.data_mean, config.data_std, config.dim_to_ignore)
            gt = unNormalizeData(y_test[i], config.data_mean, config.data_std, config.dim_to_ignore)
        else:
            pred = np.copy(y_predict[i])
            gt = np.copy(y_test[i])

        diff = gt - pred
        # Flattened (frames, joints * 3) data has no third axis to take the
        # norm over, so expose the xyz axis explicitly.
        if diff.ndim == 2:
            diff = diff.reshape(diff.shape[0], -1, 3)

        # Euclidean error of each joint, then the mean over joints per frame.
        euc_error = np.linalg.norm(diff, axis=2)
        mean_errors[i, :] = np.mean(euc_error, axis=1)

    # Mean of errors across batches
    mpjpe = np.mean(mean_errors, axis=0)

    print("\n" + action)
    # Frame indices to report; indices beyond the predicted horizon are skipped.
    toprint_idx = np.array([1, 3, 7, 9, 13, 15, 17, 24])
    idx = np.where(toprint_idx < len(mpjpe))[0]
    toprint_list = ["& {:.3f} ".format(mpjpe[toprint_idx[i]]) for i in idx]
    print("".join(toprint_list))
    mpjpe_mean = np.mean(mpjpe[toprint_idx[idx]])

    return mpjpe, mpjpe_mean

def unNormalizeData(normalizedData, data_mean, data_std, dimensions_to_ignore):
    """
    Copied from https://github.com/una-dinosauria/human-motion-prediction

    Undo mean/std normalisation and restore the ignored dimensions.

    Args:
        normalizedData: array of shape (T, number of used dimensions).
        data_mean: 1-D array with the mean of every original dimension.
        data_std: 1-D array with the standard deviation of every dimension.
        dimensions_to_ignore: indices that were removed during normalisation.

    Returns:
        Array of shape (T, len(data_mean)); ignored dimensions come out equal
        to their mean because they are filled with zeros before rescaling.
    """
    n_frames = normalizedData.shape[0]
    n_dims = data_mean.shape[0]

    # Every original dimension that survived normalisation, in order.
    kept = np.array([dim for dim in range(n_dims) if dim not in dimensions_to_ignore])

    restored = np.zeros((n_frames, n_dims), dtype=np.float32)
    restored[:, kept] = normalizedData

    # Broadcast the per-dimension statistics across all frames.
    std_row = data_std.reshape((1, n_dims))
    mean_row = data_mean.reshape((1, n_dims))
    return restored * std_row + mean_row

def rotmat2expmap(R):
    """Convert a 3x3 rotation matrix to its exponential-map (axis-angle) vector.

    Args:
        R: 3x3 rotation matrix.

    Returns:
        Array of shape (3, 1) holding the rotation axis scaled by the rotation
        angle; zeros when the angle is below 1e-6.

    Notes:
        The formula divides by ``sin(theta)`` and is therefore ill-conditioned
        for rotations close to pi.
    """
    # Round-off can push the cosine marginally outside [-1, 1], which would
    # make arccos return NaN; clip it back into the valid domain.
    cos_theta = np.clip((np.trace(R) - 1) / 2.0, -1.0, 1.0)
    theta = np.arccos(cos_theta)
    if theta < 1e-6:
        return np.zeros((3, 1))

    # Entries of the skew-symmetric part R - R^T, stacked as a column.
    skew_part = np.array([[R[2, 1] - R[1, 2]], [R[0, 2] - R[2, 0]], [R[1, 0] - R[0, 1]]])
    return theta / (2 * np.sin(theta)) * skew_part

def revert_coordinate_space(channels, R0, T0):
    """
    Copied from https://github.com/una-dinosauria/human-motion-prediction

    Accumulate per-frame root motion into a common coordinate frame.

    Columns 0-2 of every row are treated as a translation increment and
    columns 3-5 as a rotation increment in exponential-map form; both are
    chained starting from ``R0`` and ``T0``.

    Args:
        channels: 2-D array with one row per frame.
        R0: initial 3x3 rotation matrix.
        T0: initial translation (length 3).

    Returns:
        A copy of ``channels`` whose first six columns hold the accumulated
        translation and rotation; ``channels`` itself is left untouched.
    """
    n_frames, _ = channels.shape

    root_rot_cols = np.arange(3, 6)
    recovered = copy.copy(channels)
    rot_acc, trans_acc = R0, T0

    # Loop through the passed poses
    for frame in range(n_frames):
        rot_new = expmap2rotmat(channels[frame, root_rot_cols]).dot(rot_acc)
        # The translation step is expressed through the previous rotation.
        step = (rot_acc.T).dot(np.reshape(channels[frame, :3], [3, 1]))
        trans_new = trans_acc + step.reshape(-1)

        recovered[frame, root_rot_cols] = np.reshape(rotmat2expmap(rot_new), 3)
        recovered[frame, :3] = trans_new

        rot_acc, trans_acc = rot_new, trans_new

    return recovered

def rotmat2euler(R):
    """Convert a 3x3 rotation matrix to three Euler angles.

    Args:
        R: 3x3 rotation matrix.

    Returns:
        Array ``[E1, E2, E3]``. When ``R[0, 2]`` is exactly 1 or -1 (the
        degenerate case) ``E3`` is fixed to 0.
    """
    r02 = R[0, 2]

    if r02 != 1 and r02 != -1:
        # Regular case: recover E2 first, then divide out its cosine.
        E2 = -np.arcsin(r02)
        cos_e2 = np.cos(E2)
        E1 = np.arctan2(R[1, 2] / cos_e2, R[2, 2] / cos_e2)
        E3 = np.arctan2(R[0, 1] / cos_e2, R[0, 0] / cos_e2)
    else:
        # Degenerate case: E2 is +-pi/2 and E3 is pinned to zero.
        E3 = 0
        dlta = np.arctan2(R[0, 1], r02)
        if r02 == -1:
            E2, E1 = np.pi/2, E3 + dlta
        else:
            E2, E1 = -np.pi/2, -E3 + dlta

    return np.array([E1, E2, E3])

def expmap2rotmat(A):
    """Convert an exponential-map (axis-angle) vector to a rotation matrix.

    Uses Rodrigues' formula ``I + sin(theta) K + (1 - cos(theta)) K^2`` where
    ``theta`` is the norm of ``A`` and ``K`` the cross-product matrix of the
    unit axis.

    Args:
        A: 1-D vector whose first three entries are the axis-angle components.

    Returns:
        3x3 rotation matrix; the identity when ``A`` has zero norm.
    """
    theta = np.linalg.norm(A)
    if theta == 0:
        return np.identity(3)

    unit = A / theta
    # Cross-product (skew-symmetric) matrix of the unit rotation axis.
    K = np.array([
        [0, -unit[2], unit[1]],
        [unit[2], 0, -unit[0]],
        [-unit[1], unit[0], 0],
    ])
    return np.identity(3) + np.sin(theta) * K + (1 - np.cos(theta)) * np.matmul(K, K)

In [None]:
def prediction(config, checkpoint_dir, output_dir):
    """Run the most recent checkpoint on the test data and save per-action errors.

    Args:
        config: configuration object. Attributes read directly here:
            ``dataset``, ``longterm``, ``datatype``, ``filename``,
            ``input_window_size`` and ``input_size``;
            ``output_window_size`` is overwritten.
        checkpoint_dir: directory prefix holding the checkpoints; the most
            recently modified file is loaded. File names are appended by plain
            string concatenation.
        output_dir: directory prefix for the ``error_<action>.mat`` files; it
            is created when missing.

    Raises:
        FileNotFoundError: if ``checkpoint_dir`` contains no checkpoint file.

    Notes:
        ``batch_size`` is read from notebook (module) scope. Error files are
        only written when ``config.datatype == 'xyz'``.
    """
    print('Start testing the Model!')

    if not (os.path.exists(output_dir)):
        os.makedirs(output_dir)
    print("Outputs saved to: " + output_dir)

    config.output_window_size = 100

    choose = DatasetChooser(config)
    if config.dataset == 'Human':
        # This step is to get mean value, etc for unnorm
        _, _ = choose(train=True)

    if config.longterm is False:
        prediction_dataset, _ = choose(prediction=True)
        x_test, y_test, dec_in_test = prediction_dataset
        actions = list(x_test.keys())
    else:
        # get raw validation data because the test data isn't usable
        valid_dataset, _ = choose(train=False)
        test_set = valid_dataset.data[0]
        x_test = {}
        y_test = {}
        dec_in_test = {}
        x_test[config.filename] = np.reshape(test_set[:config.input_window_size-1,:], [1, -1, config.input_size])
        y_test[config.filename] = np.reshape(test_set[config.input_window_size:, :], [1, -1, config.input_size])
        dec_in_test[config.filename] = np.reshape(test_set[config.input_window_size-1:-1, :], [1, -1, config.input_size])
        config.output_window_size = y_test[config.filename].shape[1]
        actions = [config.filename]

    # Fall back to the CPU when CUDA is unavailable (both branches of the old
    # expression selected 'cuda:0').
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('Device {} will be used to save parameters'.format(device))

    net = choose_net(config)
    net.to(device)

    checkpoint_files = get_file_list(checkpoint_dir)
    if not checkpoint_files:
        raise FileNotFoundError('No checkpoint found in ' + checkpoint_dir)
    # Load onto the selected device rather than a hard-coded 'cuda:0'.
    net.load_state_dict(torch.load(checkpoint_dir + checkpoint_files[-1], map_location=device))

    y_predict = {}
    x_testk = {}
    x_testl = {}
    with torch.no_grad():
        for act in actions:
            x_test_ = torch.from_numpy(x_test[act]).float().to(device)
            # First test sequence of the action is the reference for both
            # transforms. Reading it from x_test also works on the long-term
            # branch, where no prediction_dataset exists.
            ref_pred_xtest = copy.deepcopy(x_test[act][0])

            # Lie branch input.
            x_test_batch = torch.from_numpy(x_test[act]).float().to(device)
            x_testl[act] = [lie_group_and_algebra_transform(frame,ref_pred_xtest) for frame in x_test_batch]
            x_test_l = torch.tensor(x_testl[act], dtype=torch.float32, device=device)

            # Kendall branch input: centre/scale, then map to the tangent
            # space at the reference.
            x_test_batch = torch.from_numpy(x_test[act]).float().to(device)
            x_testk[act] = [CenteredScaledd(frame) for frame in x_test_batch]
            x_testk[act] = [inv_exp(ref_pred_xtest, frame) for frame in x_testk[act]]
            x_test_k = torch.from_numpy(np.array(x_testk[act])).float().to(device)

            dec_in_test_ = torch.from_numpy(dec_in_test[act]).float().to(device)

            pred = net(x_test_k, x_test_l, x_test_, dec_in_test_, batch_size, train_mode=False)

            # Tensors are converted to NumPy; arrays pass through as-is.
            if isinstance(pred, torch.Tensor):
                pred = pred.cpu().numpy()
            elif not isinstance(pred, np.ndarray):
                print("pred is not a valid tensor or array.")
            y_predict[act] = pred

    for act in actions:
        if config.datatype == 'xyz':
            mean_error, _ = mean_per_joint_position_error(config, act, y_predict[act], y_test[act])
            sio.savemat(output_dir + 'error_' + act + '.mat', dict([('error', mean_error)]))

In [None]:
def init_weights(m):
    """Re-initialise every parameter of ``m`` in place, uniformly in [-0.2, 0.2].

    Args:
        m: module whose parameters (including those of sub-modules) are reset.
    """
    for weight in m.parameters():
        nn.init.uniform_(weight.data, -0.2, 0.2)

#### Creating the model

In [None]:
print('Creating the model ....')
model = Model(in_size, embed_size, hidden_size, dropout_val=dropout_val, batch_size=batch_size)
# Place the wrapped model on the device selected at the top of the notebook
# instead of calling .cuda(), which raises when no GPU is available.
model = nn.DataParallel(model).to(device)

#### Train & Test

In [None]:
def create_directory(config):
    """
    Create the checkpoint and output directory paths for a configuration.
    modified from https://github.com/BII-wushuang/Lie-Group-Motion-Prediction

    Only the path strings are built; no directory is created on disk.

    Args:
        config: configuration object providing ``dataset``, ``datatype``,
            ``loss``, ``model``, ``filename``, ``input_window_size`` and
            ``output_window_size``, plus the model-specific fields used below.

    Returns:
        ``[checkpoint_dir, output_dir]``, both ending with ``'/'``.
    """
    folder_dir = ''.join([config.dataset, '/', config.datatype, '_', config.loss, 'loss_', config.model])

    # Model-specific suffix describing the main hyper-parameters.
    if config.model == 'HMR':
        folder_dir += ''.join([
            '_RecurrentSteps=', str(config.encoder_recurrent_steps),
            '_', 'ContextWindow=', str(config.context_window),
            '_', 'hiddenSize=', str(config.hidden_size),
        ])
    elif config.model == 'ST_HRN':
        folder_dir += ''.join([
            '_RecurrentSteps=', str(config.encoder_recurrent_steps),
            '_hiddenSize=', str(config.hidden_size),
            '_decoder_name=', str(config.decoder),
        ])
    elif config.model == 'cmatp':
        # Uses the notebook-level hidden_size, not config.hidden_size.
        folder_dir += ''.join([
            '_tf_kendall+Lie_Euc=', '_',
            'ContextWindow=', str(config.context_window),
            '_', 'hiddenSize=', str(hidden_size),
        ])

    folder_dir += ''.join(['/', config.filename, '/'])
    folder_dir += ''.join([
        'inputWindow=', str(config.input_window_size),
        '_outputWindow=', str(config.output_window_size), '/',
    ])

    return ['./checkpoint/' + folder_dir, './output/' + folder_dir]

In [None]:
class Args:
    """Run options for this notebook, passed on to TrainConfig below."""
    gpu=[0]
    training=False #test (True if train)
    action='all'
    dataset='Human'
    datatype='xyz'
    visualize=0


# Instantiate the options; the lines below read them through `args`.
args = Args()

config = TrainConfig(args.dataset, args.datatype, args.action, args.gpu, args.training, args.visualize)
checkpoint_dir, output_dir = create_directory(config)

# Train or evaluate, depending on the flag stored in the configuration.
if config.train_model is True:
    train(config, checkpoint_dir)
else:
    prediction(config, checkpoint_dir, output_dir)