# Transformer Training on Pose Data

In [1]:
!nvidia-smi -L

GPU 0: NVIDIA GeForce RTX 4090 (UUID: GPU-59ba7a4d-461d-6c44-7eea-a4200c322183)


In [17]:
import os
import math
import numpy as np
import torch
import torch.nn as nn
import torch.distributions as D
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler


## Loading the Data

In [3]:
def find_line(lines, prefix):
  for line in lines:
    if line.startswith('Frames:'):
      return line

In [4]:
def read_bvh_file(file_path):
    # Read file contents
    with open(file_path, 'r') as f:
        file_contents = f.read()
        
    # Split file contents by newline characters
    lines = file_contents.split('\n')

    # Find the channel names
    channel_names = []
    joint_name = None
    for line in lines:
      line = line.strip()
      if line.startswith('JOINT') or line.startswith('ROOT'):
        # Joint line looks like this:
        # JOINT Spine2
        joint_data = line.split(' ')
        joint_name = joint_data[1]
      if line.startswith('CHANNELS'):
        # Channels line looks like this:
        # CHANNELS 3 Yrotation Xrotation Zrotation
        channel_data = line.split(' ')
        for channel in channel_data[2:]:
          channel_names.append(f'{joint_name}_{channel}')
    
    # Find the number of frames and the start of the motion data
    num_frames_line = find_line(lines, 'Frames:')
    num_frames = int(num_frames_line.split(' ')[1])
    motion_data_index = lines.index('MOTION') + 3
    header = '\n'.join(lines[:motion_data_index])
    
    # Find the number of channels in the file
    first_frame_data = lines[motion_data_index].strip().split(' ')
    num_channels = len(first_frame_data)
    print('Channels:', num_channels)

    # Extract the motion data as a string
    motion_data_str = ''.join(lines[motion_data_index:motion_data_index+num_frames])
    
    # Convert the motion data to a numpy array
    motion_data = np.fromstring(motion_data_str, sep=' ')
    motion_data = motion_data.reshape((num_frames, -1))
    
    # Convert the numpy array to a PyTorch tensor
    motion_tensor = torch.tensor(motion_data, dtype=torch.float32)
    
    return motion_tensor, header, channel_names

In [5]:
raw_data, header, channel_names = read_bvh_file('train_data/flute2.bvh')
print(raw_data.shape)
#print(channel_names)

Channels: 183
torch.Size([19298, 183])


## Building a PyTorch Dataset

In [6]:
class BVHDataset(Dataset):
    def __init__(self, file_path, input_size, output_size, seq_length, future_delta):
        self.file_path = file_path
        self.input_size = input_size
        self.output_size = output_size
        self.seq_length = seq_length
        self.future_delta = future_delta

        # Read BVH file
        self.motion_tensor, self.header, self.channel_names = read_bvh_file(file_path)

        # Compute the total number of sequences in the file
        self.total_sequences = len(self.motion_tensor) - self.seq_length - self.future_delta

        # Compute input_mean and input_std
        self.input_mean = torch.mean(self.motion_tensor, dim=(0,))
        self.input_std = torch.std(self.motion_tensor, dim=(0,))
        self.input_std = torch.where(self.input_std == 0, torch.tensor(1e-7), self.input_std)

    def __len__(self):
        return self.total_sequences

    def __getitem__(self, idx):
        # Compute the sequence index for the given index
        seq_idx = idx + self.seq_length

        # Get the sequence of length seq_length as input x
        input_tensor = self.motion_tensor[seq_idx - self.seq_length:seq_idx, :self.input_size]

        # Get the frame future_delta frames into the future as output y
        output_tensor = self.motion_tensor[seq_idx + self.future_delta, :self.output_size]

        # Normalize input and output tensors
        input_tensor = (input_tensor - self.input_mean) / self.input_std
        output_tensor = (output_tensor - self.input_mean[:self.output_size]) / self.input_std[:self.output_size]

        return input_tensor, output_tensor


In [15]:
# Create data loaders

num_channels = raw_data.shape[-1]
input_size = num_channels
output_size = num_channels
seq_length = 100
future_delta = 200
batch_size = 32


dataset = BVHDataset('train_data/flute2.bvh', input_size, output_size, seq_length, future_delta)

# Split the dataset into training and validation sets
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

Channels: 183


## Defining the Model

In [12]:
import torch
import torch.nn as nn

class PoseTransformer(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim, sequence_length):
        super(PoseTransformer, self).__init__()

        self.model_dim = model_dim
        self.sequence_length = sequence_length

        self.embedding = nn.Linear(input_dim, model_dim)
        self.position_encoding = nn.Parameter(torch.randn(sequence_length, 1, model_dim))

        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim,
            nhead=num_heads,
            dim_feedforward=model_dim * 4,
            dropout=0.1
        )
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(model_dim, output_dim)

    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_dim)
        x = self.embedding(x) + self.position_encoding
        # x shape: (sequence_length, batch_size, model_dim)
        x = x.permute(1, 0, 2)

        x = self.transformer_encoder(x)
        # x shape: (batch_size, sequence_length, model_dim)
        x = x.permute(1, 0, 2)

        # Use only the last frame for prediction
        x = x[:, -1, :]
        x = self.fc_out(x)

        return x

# Hyperparameters
input_dim = 183
output_dim = 183
model_dim = 512
num_heads = 8
num_layers = 6
sequence_length = 100

# Create the model
model = PoseTransformer(input_dim, model_dim, num_heads, num_layers, output_dim, sequence_length)


In [None]:
# Loss function
criterion = nn.MSELoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler (optional)
scheduler = StepLR(optimizer, step_size=10, gamma=0.9)

# Training parameters
num_epochs = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model to the device
model.to(device)

# Main training loop
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    for batch_idx, (input_seq, target_seq) in enumerate(tqdm(train_loader)):
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)

        optimizer.zero_grad()

        output_seq = model(input_seq)
        loss = criterion(output_seq, target_seq)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.6f}")

    # Validation
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch_idx, (input_seq, target_seq) in enumerate(val_loader):
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)

            output_seq = model(input_seq)
            loss = criterion(output_seq, target_seq)

            val_loss += loss.item()

        val_loss /= len(val_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.6f}")

    # Update learning rate
    scheduler.step()
