In [1]:
# Scienctic computing 
import numpy as np
import pandas as pd

# Pytorch
import torch
from torch import Tensor

# Python libraries
import _pickle as pkl
import bz2
import warnings
import re

# Laforge animation dataset
from lafan1.extract import Anim

# Plotting
import plotly.express as ex
import plotly.graph_objects as go

# Local scripts
from src.prepare_data import DataPreprocess

In [2]:
DATA_DIR = "data"
MOTION_ZIP = "lafan1/lafan1.zip"
DATASET_DIR = "dataset"
FRAME_TIME = 0.03333
FRAMES = 7840

In [3]:
def validate_config(config:dict=None) -> None:
    '''
    Validate that the model config contains necessary parameters of right type.

    Parameters:
        config (dict): a dictionary containing the configuration of NN model.
    Returns:
        None
    '''
    model_config_template = {
        "input_dim" : int, # Required
        "output_dim" : int,
        "hidden_dim" : int,
        "dropout" : 0.2,
        "device" : "cuda" if torch.cuda.is_available() else "cpu", 
    }
    required_parameters = ["input_dim", "output_dim", "hidden_dim"]
    if config is None or sum([param in config for param in required_parameters]) != len(required_parameters):
        raise ValueError("""
        Model config is not provided or not valid.
        Please provide a dictionary with at least the following three parameters:
        {
            'input_dim' : int,
            'output_dim' : int,
            'hidden_dim' : int,
        }
        """)
    
    for k,v in model_config_template.items():
        if isinstance(v, type):
            if not isinstance(config[k], v):
                warnings.warn("Received {} of type {} should be {}".format(k, type(config[k]), v))
        config.setdefault(k, v)

def read_pbz2(filename:str) -> Anim:
    with bz2.BZ2File(filename, 'rb') as f:
        anim = pkl.load(f)
    return anim 
    

In [4]:
class MultilayerPerceptron(torch.nn.Module):
    def __init__(self, config:dict=None):
        validate_config(config)
        super(MultilayerPerceptron, self).__init__()

        self.input_layer = torch.nn.Linear(config["input_dim"], config["hidden_dim"], bias=True) # dimension: I x H
        self.hidden_layer = torch.nn.Linear(config["hidden_dim"], config["hidden_dim"], bias=True) # dimension: H x H
        self.output_layer = torch.nn.Linear(config["hidden_dim"], config["output_dim"], bias=True) # dimension: H x O
        self.dropout_rate = config["dropout"]
        
        # Register the layer parameters, otherwise you won't be able to get all parameters using model.parameters()
        self.layers = torch.nn.ModuleList([self.input_layer, self.hidden_layer, self.output_layer])

        for layer in self.layers:
            self.init_param(layer, device=config["device"])
            # layer.to(config["device"])
            
    def forward(self, x:Tensor) -> Tensor:
        '''Note: ReLu() and Dropout() are applied as functions'''
        output = torch.nn.functional.dropout(torch.nn.functional.relu(self.input_layer(x)), p=self.dropout_rate) # Dropout( Relu( Wx + B ) )
        output = torch.nn.functional.dropout(torch.nn.functional.relu(self.hidden_layer(output)), p=self.dropout_rate)
        output = self.output_layer(output)
        return output
    
    def init_param(self, layer, device="cpu"):
        '''Randomly intialize the layer weights and intialize biases to 0.01'''
        torch.nn.init.xavier_uniform_(layer.weight)
        layer.bias.data.fill_(.01)


In [5]:
# Read all preprocessed walk clips from disk from 'prepare_data.ipynb'
# clips = a list of clips of dimension 7840 x 12
clips = [read_pbz2(os.path.join(DATASET_DIR, file)) for file in os.listdir(DATASET_DIR)]

In [6]:
# Prepare input samples and ground truths
# Splitting the dataset into three sets for training, validating, testing
# Now we are working with tensors (type = torch.Tensor)

torch.set_default_dtype(torch.float64)  # set the default data type to be double (default: float)

# Remember that clip has shape (Frames x Features) = (7840 x 12)
x_tensors = torch.concat([torch.from_numpy(clip[:-1]) for clip in clips],axis=0)  # inputs for the model
y_tensors = torch.concat([torch.from_numpy(clip[1:]) for clip in clips],axis=0)   # ground truth for the data, which is the next frame of the input frame

# dropping acceleration data from ground_truth data, which are 7th to 9th column 
target_feature_indices = [i for i in range(12) if i not in (6,7,8)] 
y_tensors = y_tensors[:, target_feature_indices]

input_dim = x_tensors.shape[1]  # 12
output_dim = y_tensors.shape[1] # 9

dataset = torch.utils.data.TensorDataset(x_tensors, y_tensors)

num_samples = len(x_tensors) # sample = frame = row
num_train_samples = int(0.8 * num_samples)  # 80% for training, 10% for validation and 20% for testing
num_test_samples = int(num_samples - num_train_samples) 
num_val_samples = num_test_samples // 2

# Randomly split dataset into the 3 sets given the number of samples in each set
train_set, val_set, test_set = torch.utils.data.dataset.random_split(
    dataset, [num_train_samples, num_val_samples, num_val_samples],
    generator=torch.Generator().manual_seed(2048))

# reuse validation set for testing
test_set += val_set


In [7]:
num_cpus = os.cpu_count()
config = {
    "input_dim" : input_dim,    # 12
    "output_dim" : output_dim,  # 9
    "hidden_dim" : 32,
    "dropout" : 0.2,
    "device" : "cuda" if torch.cuda.is_available() else "cpu", 
}

model = MultilayerPerceptron(config)
model.to(config["device"]) # sets the device on which the model will be executed on
print(model.summarize(max_depth=2))

EPOCHS = 10
learning_rate = 1e-3
loss_fn = torch.nn.functional.mse_loss
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.95)
batch_size = 512
log_interval = 10

train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=num_cpus)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=num_cpus)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, pin_memory=True, num_workers=num_cpus)

In [8]:
# Training
torch.manual_seed(2048)
model.train()
for epoch in range(1, EPOCHS+1):
    for batch_idx, (x, y) in enumerate(train_loader):
        x = x.to(config["device"])
        y = y.to(config["device"])
        optimizer.zero_grad()
        output = model(x)
        mse_loss = loss_fn(output,y)
        mse_loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(x), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), mse_loss.item()))
            
            val_loss = 0
            model.eval()
            with torch.no_grad():
                for x,y in val_loader:
                    x = x.to(config["device"])
                    y = y.to(config["device"])
                    output = model(x)
                    val_loss += loss_fn(output,y,reduction='sum').item()
            
            val_loss /= len(val_loader.dataset)
            print('\nValidation set: Average loss: {:.4f}\n'.format(val_loss))
            model.train()
    


Validation set: Average loss: 9.2110


Validation set: Average loss: 7.0194


Validation set: Average loss: 5.5841


Validation set: Average loss: 4.5408


Validation set: Average loss: 3.7944


Validation set: Average loss: 3.1541


Validation set: Average loss: 2.7436


Validation set: Average loss: 2.4533


Validation set: Average loss: 2.2210


Validation set: Average loss: 2.0503


Validation set: Average loss: 1.9304


Validation set: Average loss: 1.8025


Validation set: Average loss: 1.7177


Validation set: Average loss: 1.6779


Validation set: Average loss: 1.6132


Validation set: Average loss: 1.5600


Validation set: Average loss: 1.5097


Validation set: Average loss: 1.4487


Validation set: Average loss: 1.3934


Validation set: Average loss: 1.3620


Validation set: Average loss: 1.3313


Validation set: Average loss: 1.2927


Validation set: Average loss: 1.2531


Validation set: Average loss: 1.2190


Validation set: Average loss: 1.1971


Validation set: Average 

In [9]:
# Testing
model.eval()
test_loss = 0
with torch.no_grad():
    for x,y in test_loader:
        x = x.to(config["device"])
        y = y.to(config["device"])
        output = model(x)
        test_loss += loss_fn(output,y, reduction='sum').item()  # sum up batch loss

test_loss /= len(test_loader.dataset)

print('\nTest set: Average loss: {:.4f}\n'.format(test_loss))


Test set: Average loss: 0.6396



In [10]:
# Using the model
import time
bvh_files = os.listdir(DATA_DIR)
walk_clips = [os.path.join(DATA_DIR,f) for f in bvh_files if re.search("walk", f) is not None]
data = DataPreprocess(write=False)(walk_clips[-1])

input_frames = torch.from_numpy(data["data"][:-1])
target_frames = torch.from_numpy(data["data"][1:])
target_frames = target_frames[:, target_feature_indices]

num_frames = len(target_frames)
predicted_frames = [None]*num_frames

model.eval()
model.to("cpu")
start_time = time.time()
idx = 0
loss = 0
with torch.no_grad():
    for x,y in zip(input_frames, target_frames):
        predicted = model(x)
        loss += torch.nn.functional.mse_loss(predicted, y)
        predicted_frames[idx] = predicted
        idx += 1
end_time = time.time()
predicted_frames = np.asarray([p.numpy() for p in predicted_frames])

print(
    f"""Finished one clip with {num_frames} frames in {end_time - start_time} sec\n
    Avg time per frame: {(end_time-start_time)/num_frames}\n 
    Avg. loss: {loss/num_frames}\n"""
    )


Finished one clip with 4887 frames in 0.3276047706604004 sec

    Avg time per frame: 6.703596698596284e-05
 
    Avg. loss: 0.06914901260954019



In [11]:
F = 20
fig = go.Figure()
fig.add_trace(
    go.Scatter3d(x=target_frames[:F, 0], y=target_frames[:F, 1], z=target_frames[:F, 2], name="Target")
).add_trace(
    go.Scatter3d(x=predicted_frames[:F, 0], y=predicted_frames[:F, 1], z=predicted_frames[:F, 2], name="Predicted")
)
fig.show()