<a href="https://colab.research.google.com/github/RodolfoFerro/human-motion-prediction-pytorch/blob/master/notebooks/human_motion_prediction_Custom_Model_Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# human-motion-prediction 🕺🏻

> **Note:** This repo is a fork of https://github.com/cimat-ris/human-motion-prediction-pytorch
>
> The code has been refactored to run in Google Colab while preserving the original logic and structure.

> PyTorch implementation of:
>
> &nbsp;&nbsp; Julieta Martinez, Michael J. Black, Javier Romero. _**On human motion prediction using recurrent neural networks**_. In CVPR 17.
> 
> The paper can be found on arXiv: [https://arxiv.org/pdf/1705.02445.pdf](https://arxiv.org/pdf/1705.02445.pdf)

Find the repo of this code here: https://github.com/RodolfoFerro/human-motion-prediction-pytorch.git

Clone the repository:

In [None]:
!git clone https://github.com/RodolfoFerro/human-motion-prediction-pytorch.git
%cd human-motion-prediction-pytorch
!ls

Download the data:

> We use `gdown` to download the data from Google Drive into our local folder. It is typically preinstalled on Colab; elsewhere, install it first with `pip install gdown`.

In [None]:
!mkdir -p data
%cd data

!gdown https://drive.google.com/uc?id=1hqE6GrWZTBjVzmbehUBO7NTrbEgDNqbH
!unzip -q h3.6m.zip
!rm h3.6m.zip
%cd ..
!ls
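
Before moving on, it can help to confirm that the archive was actually extracted. The snippet below is a minimal sketch: `count_files` is a hypothetical helper (not part of the repository), and the path is the `data_dir` value used later in this notebook.

```python
from pathlib import Path


def count_files(root):
    """Return the number of files under root, or None if root is missing.

    Hypothetical helper for a quick sanity check; not part of the repo.
    """
    root = Path(root)
    if not root.is_dir():
        return None
    return sum(1 for p in root.rglob('*') if p.is_file())


n_files = count_files('./data/h3.6m/dataset')
if n_files is None:
    print('Dataset folder not found; re-run the download cell.')
else:
    print(f'{n_files} files found in the dataset folder.')
```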

### Custom model

You can create a custom model by defining a class that inherits from `nn.Module`:

In [None]:
"""Sequence-to-sequence model for human motion prediction."""

import logging

import numpy as np
import torch
import torch.nn.functional as F
from torch import nn


class MotionPredictor(nn.Module):
    """Sequence-to-sequence model for human motion prediction"""

    def __init__(
            self,
            source_seq_len,
            target_seq_len,
            rnn_size,  # recurrent layer hidden size
            batch_size,
            learning_rate,
            learning_rate_decay_factor,
            number_of_actions,
            dropout=0.3):
        """Constructor of the class.
        
        Parameters
        ----------
        source_seq_len: int
            Length of the input sequence.
        target_seq_len: int
            Length of the target sequence.
        rnn_size: int
            Number of units in the rnn.
        batch_size: int
            The size of the batches used during training; the model
            construction is independent of batch_size, so it can be
            changed after initialization if this is convenient, e.g.,
            for decoding.
        learning_rate: float
            Learning rate to start with.
        learning_rate_decay_factor: float
            Decay learning rate by this much when needed.
        number_of_actions: int
            Number of classes we have.
        dropout: float
            Dropout probability applied to the recurrent state.
        """

        super(MotionPredictor, self).__init__()

        self.human_dofs = 54
        self.input_size = self.human_dofs + number_of_actions

        logging.info(f'Input size is {self.input_size}')
        self.source_seq_len = source_seq_len
        self.target_seq_len = target_seq_len
        self.rnn_size = rnn_size
        self.batch_size = batch_size
        self.dropout = dropout

        # Create the RNN that will summarize the state
        self.cell = torch.nn.GRUCell(self.input_size, self.rnn_size)
        self.fc1 = nn.Linear(self.rnn_size, self.input_size)

    def forward(self, encoder_inputs, decoder_inputs, device):
        """Forward pass of the model.

        Parameters
        ----------
        encoder_inputs : torch.Tensor
            The input to the encoder.
        decoder_inputs : torch.Tensor
            The input to the decoder.
        device : torch.device
            The device on which to do the computation.
        
        Returns
        -------
        outputs : torch.Tensor
            The transposed output of the model.
        """

        def loop_function(prev, i):
            return prev

        batch_size = encoder_inputs.shape[0]
        # To pass these data through a RNN we need to switch the first
        # two dimensions
        encoder_inputs = torch.transpose(encoder_inputs, 0, 1)
        decoder_inputs = torch.transpose(decoder_inputs, 0, 1)
        state = torch.zeros(batch_size, self.rnn_size).to(device)

        # Encoding
        for i in range(self.source_seq_len - 1):
            # Apply the RNN cell
            state = self.cell(encoder_inputs[i], state)

            # Apply dropout in training
            state = F.dropout(state, self.dropout, training=self.training)

        outputs = []
        prev = None

        # Decoding, sequentially
        for i, inp in enumerate(decoder_inputs):
            # No teacher forcing: after the first step, feed back the
            # previous prediction instead of the ground-truth frame
            if prev is not None:
                inp = loop_function(prev, i)
            #inp = inp.detach()

            state = self.cell(inp, state)

            # Output is seen as a residual to the previous value
            output = inp + self.fc1(
                F.dropout(state, self.dropout, training=self.training))
            outputs.append(output.view([1, batch_size, self.input_size]))
            prev = output

        outputs = torch.cat(outputs, 0)

        # Size should be batch_size x target_seq_len x input_size
        outputs = torch.transpose(outputs, 0, 1)

        return outputs

    def get_batch(self, data, actions, device):
        """Get a random batch of data from the specified bucket, prepare
        for step.
        
        Parameters
        ----------
        data: dict
            Dictionary mapping keys to n-by-d arrays, one per sequence.
        actions: list
            A list of the actions we are using.
        device: torch.device
            The device on which to do the computation (cpu/gpu).

        Returns
        -------
        encoder_inputs : torch.Tensor
            Conditioning frames, with shape
            (batch_size, source_seq_len - 1, input_size).
        decoder_inputs : torch.Tensor
            Decoder inputs, with shape
            (batch_size, target_seq_len, input_size).
        decoder_outputs : torch.Tensor
            Ground-truth target frames (decoder inputs shifted by one
            frame), with shape (batch_size, target_seq_len, input_size).
        """

        # Select entries at random
        all_keys = list(data.keys())
        chosen_keys = np.random.choice(len(all_keys), self.batch_size)

        # How many frames in total do we need?
        total_frames = self.source_seq_len + self.target_seq_len
        encoder_inputs = np.zeros(
            (self.batch_size, self.source_seq_len - 1, self.input_size),
            dtype=float)
        decoder_inputs = np.zeros(
            (self.batch_size, self.target_seq_len, self.input_size),
            dtype=float)
        decoder_outputs = np.zeros(
            (self.batch_size, self.target_seq_len, self.input_size),
            dtype=float)

        # Generate the sequences
        for i in range(self.batch_size):
            the_key = all_keys[chosen_keys[i]]

            # Get the number of frames
            n, _ = data[the_key].shape

            # Sample somewhere in the middle
            idx = np.random.randint(16, n - total_frames)

            # Select the data around the sampled points
            data_sel = data[the_key][idx:idx + total_frames, :]

            # Add the data
            encoder_inputs[i, :, 0:self.input_size] = data_sel[
                0:self.source_seq_len - 1, :]
            decoder_inputs[i, :, 0:self.input_size] = data_sel[
                self.source_seq_len - 1:self.source_seq_len +
                self.target_seq_len - 1, :]
            decoder_outputs[i, :,
                            0:self.input_size] = data_sel[self.source_seq_len:,
                                                          0:self.input_size]

        encoder_inputs = torch.tensor(encoder_inputs).float().to(device)
        decoder_inputs = torch.tensor(decoder_inputs).float().to(device)
        decoder_outputs = torch.tensor(decoder_outputs).float().to(device)

        return encoder_inputs, decoder_inputs, decoder_outputs

    def find_indices_srnn(self, data, action):
        """Find the same action indices as in SRNN.
        
        See https://github.com/asheshjain399/RNNexp/blob/master/structural_rnn/CRFProblems/H3.6m/processdata.py#L325

        Parameters
        ----------
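        data: dict
            Dictionary keyed by (subject, action, subaction, 'even')
            whose values are n-by-d arrays of poses.
        action: str
            The action.
        
        Returns
        -------
        idx : list
            Eight start indices, alternating between subactions 1 and 2.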
        """

        # Use a fixed dummy seed, following
        # https://github.com/asheshjain399/RNNexp/blob/srnn/structural_rnn/forecastTrajectories.py#L29
        SEED = 1234567890
        rng = np.random.RandomState(SEED)

        subject = 5
        subaction1 = 1
        subaction2 = 2

        T1 = data[(subject, action, subaction1, 'even')].shape[0]
        T2 = data[(subject, action, subaction2, 'even')].shape[0]
        prefix, suffix = 50, 100

        # Test is performed always on subject 5
        # Select 8 random sub-sequences (by specifying their indices)
        idx = []
        # Alternate between the two subactions, in the same order as SRNN
        for length in (T1, T2) * 4:
            idx.append(rng.randint(16, length - prefix - suffix))

        return idx

    def get_batch_srnn(self, data, action, device):
        """Get a random batch of data from the specified bucket,
        prepare for step.

        Parameters
        ----------
        data: dict
            Dictionary with k:v, k=((subject, action, subsequence, 'even')),
            v=nxd matrix with a sequence of poses.
        action: str
            The action to load data from, e.g. 'walking'.
        device: torch.device
            The device on which to do the computation (cpu/gpu).

        Returns
        -------
        encoder_inputs : torch.Tensor
            Conditioning frames, with shape
            (8, source_seq_len - 1, input_size).
        decoder_inputs : torch.Tensor
            Decoder inputs, with shape (8, target_seq_len, input_size).
        decoder_outputs : torch.Tensor
            Ground-truth target frames, with shape
            (8, target_seq_len, input_size).
        """

        actions = [
            'directions', 'discussion', 'eating', 'greeting', 'phoning',
            'posing', 'purchases', 'sitting', 'sittingdown', 'smoking',
            'takingphoto', 'waiting', 'walking', 'walkingdog',
            'walkingtogether'
        ]

        if action not in actions:
            raise ValueError(f'Unrecognized action {action}')

        frames = {}
        frames[action] = self.find_indices_srnn(data, action)

        batch_size = 8  # we always evaluate 8 sequences
        subject = 5  # we always evaluate on subject 5
        source_seq_len = self.source_seq_len
        target_seq_len = self.target_seq_len

        seeds = [(action, (i % 2) + 1, frames[action][i])
                 for i in range(batch_size)]

        encoder_inputs = np.zeros(
            (batch_size, source_seq_len - 1, self.input_size), dtype=float)
        decoder_inputs = np.zeros(
            (batch_size, target_seq_len, self.input_size), dtype=float)
        decoder_outputs = np.zeros(
            (batch_size, target_seq_len, self.input_size), dtype=float)

        # Compute the number of frames needed
        total_frames = source_seq_len + target_seq_len

        # Reproducing SRNN's subsequence selection as done in
        # https://github.com/asheshjain399/RNNexp/blob/master/structural_rnn/CRFProblems/H3.6m/processdata.py#L343
        for i in range(batch_size):
            _, subsequence, idx = seeds[i]
            idx = idx + 50

            data_sel = data[(subject, action, subsequence, 'even')]
            data_sel = data_sel[(idx - source_seq_len):(idx +
                                                        target_seq_len), :]

            encoder_inputs[i, :, :] = data_sel[0:source_seq_len - 1, :]
            decoder_inputs[i, :, :] = data_sel[source_seq_len -
                                               1:(source_seq_len +
                                                  target_seq_len - 1), :]
            decoder_outputs[i, :, :] = data_sel[source_seq_len:, :]

        encoder_inputs = torch.tensor(encoder_inputs).float().to(device)
        decoder_inputs = torch.tensor(decoder_inputs).float().to(device)
        decoder_outputs = torch.tensor(decoder_outputs).float().to(device)

        return encoder_inputs, decoder_inputs, decoder_outputs
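
To make the slicing in `get_batch` easier to follow, here is a minimal sketch on a toy window. The sequence lengths and the fake data are assumptions chosen for readability; only the index arithmetic mirrors the method above.

```python
import numpy as np

# Toy lengths (assumed for illustration; the notebook uses 50 and 10)
source_seq_len, target_seq_len = 5, 3
total_frames = source_seq_len + target_seq_len

# Fake window: frame t is the row [t, t], so values reveal frame indices
window = np.repeat(np.arange(total_frames)[:, None], 2, axis=1)

# Same index arithmetic as in get_batch
encoder_inputs = window[0:source_seq_len - 1]
decoder_inputs = window[source_seq_len - 1:total_frames - 1]
decoder_outputs = window[source_seq_len:]

print(encoder_inputs[:, 0])   # frames 0 to 3
print(decoder_inputs[:, 0])   # frames 4 to 6
print(decoder_outputs[:, 0])  # frames 5 to 7
```

The decoder targets are the decoder inputs shifted forward by one frame, which is why the encoder only consumes `source_seq_len - 1` frames: the last conditioning frame becomes the first decoder input.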


You can define parameters by creating a dictionary:

In [None]:
import os

from src.parsers import training_parser_from_dict


training_params = {
    'learning_rate': 0.00001,
    'learning_rate_decay_factor': 0.95,
    'learning_rate_step': 10000,
    'batch_size': 128,
    'iterations': int(1e4), # Must be an integer
    'test_every': 100,
    'size': 512,
    'seq_length_in': 50,
    'seq_length_out': 10,
    'data_dir': os.path.normpath('./data/h3.6m/dataset'),
    'train_dir': os.path.normpath('./experiments/'),
    'action': 'all',
    'log_file': '',
    'log_level': 20
}

args = training_parser_from_dict(training_params)
args
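
The `train` function below reads its settings as attributes (`args.batch_size`, `args.size`, and so on). The internals of `training_parser_from_dict` are not shown here, so the following is only a sketch of the general idea, assuming a plain dictionary is merged with defaults and wrapped in an `argparse.Namespace`; `namespace_from_dict` is a hypothetical name.

```python
from argparse import Namespace


def namespace_from_dict(params, defaults=None):
    """Merge params over defaults and expose them as attributes.

    Hypothetical illustration; the repository's parser may differ.
    """
    merged = dict(defaults or {})
    merged.update(params)
    return Namespace(**merged)


example_args = namespace_from_dict(
    {'batch_size': 128, 'size': 512},
    defaults={'action': 'all', 'size': 1024},
)
print(example_args.batch_size, example_args.size, example_args.action)
```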

### Model training

If you created a custom model, you may need to create a custom training function:

In [None]:
"""Code for training an RNN for motion prediction."""

import logging
import sys
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim

IN_COLAB = 'google.colab' in sys.modules
if not IN_COLAB:
    from parsers import training_parser
    from utils.data_utils import read_all_data
    from utils.data_utils import define_actions
    from models.motionpredictor import MotionPredictor
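else:
    from src.utils.data_utils import read_all_data
    from src.utils.data_utils import define_actions
    # NOTE: this import rebinds MotionPredictor to the repo's version;
    # comment it out to train the custom class defined above instead.
    from src.models.motionpredictor import MotionPredictor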


def train(args):
    """Train a seq2seq model on human motion.

    Parameters
    ----------
    args : argparse.Namespace
        Arguments from the parser.
    """

    # Set logger
    if args.log_file == '':
        logging.basicConfig(format='%(levelname)s: %(message)s',
                            level=args.log_level)
    else:
        logging.basicConfig(filename=args.log_file,
                            format='%(levelname)s: %(message)s',
                            level=args.log_level)

    # Set directory
    train_dir = os.path.normpath(
        os.path.join(args.train_dir, args.action, f'out_{args.seq_length_out}',
                     f'iterations_{args.iterations}', f'size_{args.size}',
                     f'lr_{args.learning_rate}'))

    # Detect device
    if torch.cuda.is_available():
        logging.info(torch.cuda.get_device_name(torch.cuda.current_device()))
    else:
        logging.info('cpu')
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    logging.info('Train dir: ' + train_dir)
    os.makedirs(train_dir, exist_ok=True)

    # Set of actions
    actions = define_actions(args.action)
    number_of_actions = len(actions)

    train_set, test_set, _, _, _, _ = read_all_data(actions,
                                                    args.seq_length_in,
                                                    args.seq_length_out,
                                                    args.data_dir)

    # Create model for training only
    model = MotionPredictor(
        args.seq_length_in,
        args.seq_length_out,
        args.size,  # hidden layer size
        args.batch_size,
        args.learning_rate,
        args.learning_rate_decay_factor,
        len(actions))
    model = model.to(device)

    # This is the training loop
    loss, val_loss = 0.0, 0.0
    current_step = 0
    all_losses = []
    all_val_losses = []

    # The optimizer
    #optimiser = optim.SGD(model.parameters(), lr=args.learning_rate)
    optimiser = optim.Adam(model.parameters(),
                           lr=args.learning_rate,
                           betas=(0.9, 0.999))

    for _ in range(args.iterations):
        optimiser.zero_grad()
        # Switch to training mode (enables dropout)
        model.train()

        # === Training step ===
        # Get batch from the training set
        encoder_inputs, decoder_inputs, decoder_outputs = model.get_batch(
            train_set, actions, device)

        # Forward pass
        preds = model(encoder_inputs, decoder_inputs, device)

        # Loss: mean squared error
        step_loss = (preds - decoder_outputs)**2
        step_loss = step_loss.mean()

        # Backpropagation
        step_loss.backward()

        # Gradient descent step
        optimiser.step()

        step_loss = step_loss.item()

        if current_step % 10 == 0:
            logging.info(f'step {current_step:04}; step_loss: {step_loss:.4f}')
        loss += step_loss / args.test_every
        current_step += 1

        # === step decay ===
        if current_step % args.learning_rate_step == 0:
            args.learning_rate = args.learning_rate * args.learning_rate_decay_factor
            optimiser = optim.Adam(model.parameters(),
                                   lr=args.learning_rate,
                                   betas=(0.9, 0.999))
            print(f'Decay learning rate. New value at {args.learning_rate}')

        # Once in a while, save checkpoint, print statistics.
        if current_step % args.test_every == 0:
            model.eval()
            # === Validation ===
            # No gradients are needed to compute the validation loss
            with torch.no_grad():
                encoder_inputs, decoder_inputs, decoder_outputs = model.get_batch(
                    test_set, actions, device)
                preds = model(encoder_inputs, decoder_inputs, device)

                step_loss = (preds - decoder_outputs)**2
                val_loss = step_loss.mean()

            print('\n=================================\n'
                  f'Global step:         {current_step}\n'
                  f'Learning rate:       {args.learning_rate:.4}\n'
                  f'Train loss avg:      {loss:.4}\n'
                  '-------------------------------\n'
                  f'Val loss:            {val_loss:.4}\n'
                  '=================================\n')
            all_val_losses.append(
                [current_step, val_loss.cpu().detach().numpy()])
            all_losses.append([current_step, loss])
            torch.save(model, train_dir + '/model_' + str(current_step))

            # Reset loss
            loss = 0

    vlosses = np.array(all_val_losses)
    tlosses = np.array(all_losses)

    # Plot losses
    plt.plot(vlosses[:, 0], vlosses[:, 1], 'b')
    plt.plot(tlosses[:, 0], tlosses[:, 1], 'r')
    plt.legend(['Validation loss', 'Training loss'])
    plt.show()
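
The `=== step decay ===` branch of `train` multiplies the learning rate by `learning_rate_decay_factor` every `learning_rate_step` steps. The sketch below reproduces that arithmetic in isolation; `learning_rate_after` is a hypothetical helper, and its defaults are the values from `training_params`.

```python
def learning_rate_after(step, base_lr=1e-5, decay_factor=0.95,
                        decay_every=10000):
    """Learning rate in effect once `step` training steps have completed.

    Hypothetical helper mirroring the step decay used in train().
    """
    return base_lr * decay_factor ** (step // decay_every)


for step in (0, 9999, 10000, 20000):
    print(step, learning_rate_after(step))
```

With `iterations` set to `int(1e4)` and `learning_rate_step` set to `10000`, the decay fires only once, on the very last iteration, so the whole run effectively trains at the initial learning rate.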


In [None]:
train(args)

Once the model is trained, you can test it.

In [None]:
from src.parsers import testing_parser_from_dict


testing_params = {
    'learning_rate': 0.00001,
    'batch_size': 128,
    'iterations': int(1e4),
    'size': 512,
    'seq_length_out': 10,
    'horizon_test_step': 25,
    'data_dir': os.path.normpath('./data/h3.6m/dataset'),
    'train_dir': os.path.normpath('./experiments/'),
    'action': 'all',
    'load_model': 10000,
    'log_level': 20,
    'log_file': '',
}

args = testing_parser_from_dict(testing_params)
args

In [None]:
from src.test import test


test(args)

> If you need a custom function to test your model, you can create a new cell with a structure similar to the custom training section.
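
As a rough starting point for such a cell, the sketch below averages the mean squared error over a few random batches without tracking gradients. `ToyPredictor` and `evaluate` are hypothetical names; the toy model only imitates the `forward` and `get_batch` signatures of `MotionPredictor` and returns random data, so swap in your trained model and test set.

```python
import torch
from torch import nn


class ToyPredictor(nn.Module):
    """Hypothetical stand-in with the same interface as MotionPredictor."""

    def __init__(self, input_size=4, target_seq_len=3, batch_size=2):
        super().__init__()
        self.input_size = input_size
        self.target_seq_len = target_seq_len
        self.batch_size = batch_size
        self.fc = nn.Linear(input_size, input_size)

    def forward(self, encoder_inputs, decoder_inputs, device):
        # Residual prediction, one output frame per decoder input frame
        return decoder_inputs + self.fc(decoder_inputs)

    def get_batch(self, data, actions, device):
        # Random tensors instead of real motion data
        shape = (self.batch_size, self.target_seq_len, self.input_size)
        encoder_inputs = torch.randn(shape, device=device)
        decoder_inputs = torch.randn(shape, device=device)
        decoder_outputs = torch.randn(shape, device=device)
        return encoder_inputs, decoder_inputs, decoder_outputs


def evaluate(model, data, actions, device, n_batches=5):
    """Average MSE over a few random batches, without gradients."""
    was_training = model.training
    model.eval()
    total = 0.0
    with torch.no_grad():
        for _ in range(n_batches):
            enc, dec, target = model.get_batch(data, actions, device)
            preds = model(enc, dec, device)
            total += ((preds - target) ** 2).mean().item()
    # Restore whichever mode the model was in before evaluation
    model.train(was_training)
    return total / n_batches


device = torch.device('cpu')
model = ToyPredictor().to(device)
avg_mse = evaluate(model, data=None, actions=None, device=device)
print(f'Average MSE over 5 batches: {avg_mse:.4f}')
```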

After testing the model, you can create an animation of the results. The next cells save all the output frames so we can later assemble them into a GIF animation.

In [None]:
from src.parsers import animation_parser_from_dict


animation_params = {
    'sample_id': 0,
    'imgs_dir': os.path.normpath('./images/')
}

args = animation_parser_from_dict(animation_params)
args

In [None]:
import matplotlib.pyplot as plt
plt.style.use('ggplot')

from src.animate import animate


animate(args)

Let's create the GIF animation.

In [None]:
from src.animate import create_gif


create_gif('./images/', '.', filename='animation.gif')

Congrats, you're done! 🎉

---