# Imports and Configuration
This cell imports necessary libraries including PyTorch, NumPy, and TikToken. It also sets up the configuration dictionaries for the SMPL-X input and the GPT model architecture, defines hyperparameters like batch size and learning rate, and initializes the tokenizer and computation device.

In [None]:
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import tiktoken
import h5py
from tqdm import tqdm
import matplotlib.pyplot as plt

# Motion-encoder hyperparameters. Read by AdvancedEncoder; "padded_dim" is also
# used by GPTModel as the encoder width seen by the decoder's cross-attention.
cfg_smplx = {
    "input_dim": 66,  # features per motion frame (the HDF5 loader reshapes to (-1, 66))
    "padded_dim": 72,  # encoder model width; inputs are zero-padded up to this size
    "n_heads": 6,  # 72 / 6 = 12 dims per attention head
    "n_layers": 10,
    "drop_rate": 0.1,
    "qkv_bias": False  # NOTE(review): not read by the attention layers in this file
}

# Text-decoder hyperparameters.
# NOTE(review): the "124M" in the name is presumably inherited from a GPT-2
# template; confirm it still describes this model's size.
GPT_CONFIG_124M = {
    "vocab_size": 100258,  # EOS_TOKEN (100257) + 1, so both special ids are embeddable
    "context_length": 1024,  # sizes the causal mask and the decoder's RoPE table
    "emb_dim": 768,
    "n_heads": 12,  # 768 / 12 = 64 dims per attention head
    "n_layers": 8,
    "drop_rate": 0.1,
    "qkv_bias": False  # NOTE(review): not read by the attention layers in this file
}

# Training hyperparameters.
BATCH_SIZE = 16
LEARNING_RATE = 3e-4
NUM_EPOCHS = 10
MAX_SEQ_LEN = 300  # maximum motion frames per sample; longer clips are cropped by the dataset

# Ids wrapped around every tokenized text by InMemoryMotionDataset.
# NOTE(review): presumably chosen to sit just past the ordinary cl100k_base
# vocabulary -- confirm against the tokenizer's special-token table.
BOS_TOKEN = 100256
EOS_TOKEN = 100257

# Module-level state shared by the cells below.
cfg = GPT_CONFIG_124M
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on: {device}")

tokenizer = tiktoken.get_encoding('cl100k_base')

# Robust Neural Network Modules 
Here we define helper classes and functions to improve model stability and performance. This includes RobustRMSNorm for normalization, SwiGLU for the feed-forward activation, and functions to apply Rotary Positional Embeddings (RoPE) and Stochastic Depth (DropPath).

In [None]:
class RobustRMSNorm(nn.Module):
    """RMS normalisation over the last dimension with a learnable gain.

    The statistics are computed in float32 regardless of the input dtype;
    the normalised values are cast back to the input dtype before the gain
    (initialised to ones) is applied.
    """

    def __init__(self, dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Return ``weight * x / sqrt(mean(x**2) + eps)`` along the last axis."""
        upcast = x.float()
        mean_square = torch.mean(upcast.pow(2), dim=-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        normalised = (upcast * inv_rms).type_as(x)
        return self.weight * normalised


class SwiGLU(nn.Module):
    """Gated feed-forward layer: ``w3(silu(w1(x)) * w2(x))`` followed by dropout.

    All three projections are bias-free. ``w1`` produces the gate, ``w2`` the
    value branch, and ``w3`` maps the gated hidden state back to ``dim``.
    """

    def __init__(self, dim, hidden_dim, drop_rate=0.1):
        super().__init__()
        self.w1 = nn.Linear(dim, hidden_dim, bias=False)
        self.w2 = nn.Linear(dim, hidden_dim, bias=False)
        self.w3 = nn.Linear(hidden_dim, dim, bias=False)
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, x):
        """Apply the gated feed-forward transform to ``x``."""
        gate = F.silu(self.w1(x))
        value = self.w2(x)
        projected = self.w3(gate * value)
        return self.dropout(projected)

def apply_rotary_emb(x, freqs_cis):
    """Rotate consecutive feature pairs of ``x`` by position-dependent angles.

    ``x`` is a 4-D tensor whose last axis is split into (even, odd) pairs and
    whose axis 1 is aligned with the first axis of ``freqs_cis``. ``freqs_cis``
    stores ``cos`` in ``[..., 0]`` and ``sin`` in ``[..., 1]``. The rotation is
    done in float32 and the result is cast back to the dtype of ``x``.
    """
    pairs = x.float().reshape(*x.shape[:-1], -1, 2)
    even, odd = pairs.unbind(-1)

    # Insert broadcast axes for batch (axis 0) and heads (axis 2).
    cos, sin = (table.unsqueeze(0).unsqueeze(2) for table in freqs_cis.unbind(-1))

    rotated_even = even * cos - odd * sin
    rotated_odd = even * sin + odd * cos

    # Re-interleave the pairs back into the original feature layout.
    interleaved = torch.stack([rotated_even, rotated_odd], dim=-1).flatten(3)
    return interleaved.type_as(x)


def precompute_freqs_cis(dim, end, theta=10000.0):
    """Build the rotary-embedding lookup table.

    Returns a float tensor of shape ``(end, dim // 2, 2)`` where
    ``[..., 0]`` holds ``cos(pos * freq)`` and ``[..., 1]`` holds
    ``sin(pos * freq)`` with ``freq_k = theta ** (-2k / dim)``.
    """
    exponents = torch.arange(0, dim, 2)[: (dim // 2)].float() / dim
    inv_freq = 1.0 / (theta ** exponents)
    positions = torch.arange(end, device=inv_freq.device)
    angles = torch.outer(positions, inv_freq).float()
    return torch.stack([angles.cos(), angles.sin()], dim=-1)


class DropPath(nn.Module):
    """Stochastic depth: randomly zero whole samples of a residual branch.

    During training each sample (index along axis 0) is kept with probability
    ``1 - drop_prob`` and rescaled by ``1 / (1 - drop_prob)``; in eval mode, or
    when ``drop_prob`` is 0, the input is returned untouched.
    """

    def __init__(self, drop_prob=0.0):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        """Apply per-sample stochastic depth to ``x``."""
        active = self.training and self.drop_prob != 0.0
        if not active:
            return x

        keep_prob = 1 - self.drop_prob
        # One Bernoulli draw per sample, broadcast over all remaining axes.
        mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        keep_mask = keep_prob + torch.rand(mask_shape, dtype=x.dtype, device=x.device)
        keep_mask = keep_mask.floor_()
        return x.div(keep_prob) * keep_mask

# Attention Mechanisms
This section defines the attention layers. SafeAdvancedSelfAttention handles standard attention with support for RoPE and causal masking. SafeAdvancedCrossAttention allows the decoder to attend to the encoder's output, integrating motion information into the text generation process.

In [None]:
class SafeAdvancedSelfAttention(nn.Module):
    """Multi-head self-attention with RMS-normalised queries and keys.

    Queries and keys are normalised per head, optionally rotated with rotary
    position embeddings, and scored in float32. Scores are clamped to
    [-1000, 1000] before the softmax, so masked positions carry a large finite
    penalty rather than ``-inf``.
    """

    def __init__(self, dim, n_heads, drop_rate=0.1, use_rope=True):
        super().__init__()
        self.n_heads = n_heads
        self.head_dim = dim // n_heads
        self.scale = 1.0 / math.sqrt(self.head_dim)

        # Bias-free projections for queries, keys, values and the output.
        self.W_query = nn.Linear(dim, dim, bias=False)
        self.W_key = nn.Linear(dim, dim, bias=False)
        self.W_value = nn.Linear(dim, dim, bias=False)
        self.W_out = nn.Linear(dim, dim, bias=False)

        # Per-head normalisation applied to queries and keys only.
        self.q_norm = RobustRMSNorm(self.head_dim)
        self.k_norm = RobustRMSNorm(self.head_dim)
        self.dropout = nn.Dropout(drop_rate)
        self.use_rope = use_rope

    def forward(self, x, freqs_cis=None, causal_mask=None, padding_mask=None):
        """Attend ``x`` (batch, seq, dim) to itself.

        ``freqs_cis`` is the rotary table (sliced to the current length),
        ``causal_mask`` is a square matrix whose zeros mark blocked pairs, and
        ``padding_mask`` is per-sample with zeros marking keys to ignore.
        """
        batch, n_tokens, _ = x.shape
        head_shape = (batch, n_tokens, self.n_heads, self.head_dim)

        queries = self.q_norm(self.W_query(x).view(head_shape))
        keys = self.k_norm(self.W_key(x).view(head_shape))
        values = self.W_value(x).view(head_shape)

        if self.use_rope and freqs_cis is not None:
            rope_table = freqs_cis[:n_tokens]
            queries = apply_rotary_emb(queries, rope_table)
            keys = apply_rotary_emb(keys, rope_table)

        # (batch, seq, heads, head_dim) -> (batch, heads, seq, head_dim)
        queries = queries.transpose(1, 2)
        keys = keys.transpose(1, 2)
        values = values.transpose(1, 2)

        scores = torch.matmul(queries.float(), keys.float().transpose(-2, -1)) * self.scale

        neg_inf = float('-inf')
        if causal_mask is not None:
            blocked = causal_mask[:n_tokens, :n_tokens] == 0
            scores = scores.masked_fill(blocked, neg_inf)

        if padding_mask is not None:
            # Broadcast the per-key mask over heads and query positions.
            key_is_pad = padding_mask.unsqueeze(1).unsqueeze(2) == 0
            scores = scores.masked_fill(key_is_pad, neg_inf)

        # Clamping replaces -inf with -1000, keeping the softmax finite even
        # for rows in which every key is masked.
        scores = torch.clamp(scores, min=-1000, max=1000)
        weights = torch.softmax(scores, dim=-1).type_as(values)
        weights = self.dropout(weights)

        context = torch.matmul(weights, values).transpose(1, 2).contiguous()
        return self.W_out(context.view(batch, n_tokens, -1))


class SafeAdvancedCrossAttention(nn.Module):
    """Multi-head cross-attention from decoder states to encoder outputs.

    Queries come from ``x`` (width ``dim``); keys and values are projected
    from ``x_enc`` (width ``enc_dim``) into ``dim``. Queries and keys are
    RMS-normalised per head, and scores are clamped to [-1000, 1000] before
    the softmax. No positional rotation is applied here.
    """

    def __init__(self, dim, enc_dim, n_heads, drop_rate=0.1):
        super().__init__()
        self.n_heads = n_heads
        self.head_dim = dim // n_heads
        self.scale = 1.0 / math.sqrt(self.head_dim)

        # Keys/values map the encoder width onto the decoder width.
        self.W_q = nn.Linear(dim, dim, bias=False)
        self.W_k = nn.Linear(enc_dim, dim, bias=False)
        self.W_v = nn.Linear(enc_dim, dim, bias=False)
        self.W_out = nn.Linear(dim, dim, bias=False)

        self.q_norm = RobustRMSNorm(self.head_dim)
        self.k_norm = RobustRMSNorm(self.head_dim)
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, x, x_enc, enc_mask=None):
        """Attend ``x`` (batch, seq, dim) to ``x_enc`` (batch, enc_seq, enc_dim).

        ``enc_mask`` is per-sample over encoder positions; zeros mark positions
        to ignore.
        """
        batch, n_queries, _ = x.shape
        n_keys = x_enc.shape[1]
        query_shape = (batch, n_queries, self.n_heads, self.head_dim)
        memory_shape = (batch, n_keys, self.n_heads, self.head_dim)

        queries = self.q_norm(self.W_q(x).view(query_shape))
        keys = self.k_norm(self.W_k(x_enc).view(memory_shape))
        values = self.W_v(x_enc).view(memory_shape)

        # (batch, seq, heads, head_dim) -> (batch, heads, seq, head_dim)
        queries = queries.transpose(1, 2)
        keys = keys.transpose(1, 2)
        values = values.transpose(1, 2)

        scores = torch.matmul(queries.float(), keys.float().transpose(-2, -1)) * self.scale

        if enc_mask is not None:
            # Broadcast the per-key mask over heads and query positions.
            key_is_pad = enc_mask.unsqueeze(1).unsqueeze(2) == 0
            scores = scores.masked_fill(key_is_pad, float('-inf'))

        # Clamping replaces -inf with -1000 so the softmax stays finite.
        scores = torch.clamp(scores, min=-1000, max=1000)
        weights = torch.softmax(scores, dim=-1).type_as(values)
        weights = self.dropout(weights)

        context = torch.matmul(weights, values).transpose(1, 2).contiguous()
        return self.W_out(context.view(batch, n_queries, -1))

# Transformer Blocks
This cell defines the EncoderBlock and DecoderBlock. The encoder block consists of self-attention and a feed-forward network. The decoder block adds a cross-attention layer between the self-attention and feed-forward layers to process the encoded motion features.

In [None]:
class EncoderBlock(nn.Module):
    """Pre-norm transformer encoder layer: self-attention then SwiGLU FFN.

    Both sub-layers sit on residual branches that pass through the same
    ``DropPath`` module; the feed-forward hidden width is ``4 * dim``.
    """

    def __init__(self, dim, n_heads, drop_rate=0.1, drop_path=0.0):
        super().__init__()
        self.norm1 = RobustRMSNorm(dim)
        self.attn = SafeAdvancedSelfAttention(dim, n_heads, drop_rate, use_rope=True)
        self.drop_path = DropPath(drop_path)
        self.norm2 = RobustRMSNorm(dim)
        self.ffn = SwiGLU(dim, 4*dim, drop_rate)

    def forward(self, x, freqs_cis=None, padding_mask=None):
        """Run non-causal self-attention and the FFN, each with a residual add."""
        attn_out = self.attn(
            self.norm1(x),
            freqs_cis,
            causal_mask=None,
            padding_mask=padding_mask,
        )
        x = x + self.drop_path(attn_out)

        ffn_out = self.ffn(self.norm2(x))
        return x + self.drop_path(ffn_out)


class DecoderBlock(nn.Module):
    """Pre-norm transformer decoder layer.

    Applies, in order, causal self-attention, cross-attention over the encoder
    output, and a SwiGLU feed-forward (hidden width ``4 * dim``). Each
    sub-layer is a residual branch routed through the same ``DropPath``
    module. The self-attention call receives no padding mask.
    """

    def __init__(self, dim, enc_dim, n_heads, drop_rate=0.1, drop_path=0.0):
        super().__init__()
        self.norm1 = RobustRMSNorm(dim)
        self.self_attn = SafeAdvancedSelfAttention(dim, n_heads, drop_rate, use_rope=True)
        self.drop_path = DropPath(drop_path)
        self.norm2 = RobustRMSNorm(dim)
        self.cross_attn = SafeAdvancedCrossAttention(dim, enc_dim, n_heads, drop_rate)
        self.norm3 = RobustRMSNorm(dim)
        self.ffn = SwiGLU(dim, 4*dim, drop_rate)

    def forward(self, x, x_enc, freqs_cis, causal_mask, enc_mask=None):
        """Update decoder states ``x`` using encoder output ``x_enc``."""
        self_out = self.self_attn(self.norm1(x), freqs_cis, causal_mask=causal_mask)
        x = x + self.drop_path(self_out)

        cross_out = self.cross_attn(self.norm2(x), x_enc, enc_mask=enc_mask)
        x = x + self.drop_path(cross_out)

        ffn_out = self.ffn(self.norm3(x))
        return x + self.drop_path(ffn_out)

# Motion Encoder 
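This class brings the raw SMPL-X motion data up to the model dimension and processes it through a stack of encoder blocks. In the current code the forward pass does this by zero-padding the 66 input features to 72; the `input_proj` projection layer is defined but not applied. The class also precomputes the rotary embeddings used by the encoder.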

In [None]:
class AdvancedEncoder(nn.Module):
    """Motion encoder: zero-pad features to the model width, then run a stack
    of ``EncoderBlock`` layers followed by a final RMS norm.

    The rotary table is precomputed for 4096 positions and stored as the
    ``freqs_cis`` buffer. Stochastic-depth rates grow linearly from 0 to 0.1
    across the layers.
    """

    def __init__(self, cfg_smplx):
        super().__init__()
        self.input_dim = cfg_smplx['input_dim']
        self.model_dim = cfg_smplx['padded_dim']
        drop_rate = cfg_smplx['drop_rate']
        n_heads = cfg_smplx['n_heads']
        n_layers = cfg_smplx['n_layers']

        # NOTE(review): input_proj is constructed (and its parameters are
        # trained/saved) but forward() never calls it -- the input is
        # zero-padded instead. Confirm whether that is intentional.
        self.input_proj = nn.Sequential(
            nn.Linear(self.input_dim, self.model_dim),
            nn.LayerNorm(self.model_dim),
            nn.GELU(),
            nn.Dropout(drop_rate)
        )

        rope_table = precompute_freqs_cis(self.model_dim // n_heads, 4096)
        self.register_buffer('freqs_cis', rope_table)

        drop_path_rates = torch.linspace(0, 0.1, n_layers).tolist()
        self.blocks = nn.ModuleList([
            EncoderBlock(self.model_dim, n_heads, drop_rate, rate)
            for rate in drop_path_rates
        ])
        self.norm = RobustRMSNorm(self.model_dim)

    def forward(self, x, padding_mask=None):
        """Encode ``x``; ``padding_mask`` is forwarded to every block."""
        # Right-pad the feature axis with zeros up to the model width.
        missing = self.model_dim - x.shape[-1]
        hidden = F.pad(x, (0, missing))

        for layer in self.blocks:
            hidden = layer(hidden, self.freqs_cis, padding_mask)
        return self.norm(hidden)

# GPT Model 
The main model class. It initializes the Motion Encoder and the Text Decoder. It handles the full forward pass, projecting text tokens, applying position embeddings, and running the decoder blocks which attend to the encoded motion features.

In [None]:
class GPTModel(nn.Module):
    """Motion-conditioned text decoder.

    Holds an ``AdvancedEncoder`` for the motion sequence and a stack of
    ``DecoderBlock`` layers that cross-attend to its output. The encoder (and
    the cross-attention key/value width) are configured from the module-level
    ``cfg_smplx``, not from the ``cfg`` argument.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg['emb_dim']
        context_length = cfg['context_length']

        self.encoder = AdvancedEncoder(cfg_smplx)
        self.tok_emb = nn.Embedding(cfg['vocab_size'], emb_dim)
        self.drop = nn.Dropout(cfg['drop_rate'])

        # Rotary table and lower-triangular causal mask, both sized to the
        # maximum context length and moved with the module.
        rope_table = precompute_freqs_cis(emb_dim // cfg['n_heads'], context_length)
        self.register_buffer('freqs_cis', rope_table)
        lower_triangle = torch.tril(torch.ones(context_length, context_length))
        self.register_buffer('causal_mask', lower_triangle)

        # Stochastic-depth rate rises linearly from 0 to 0.1 over the layers.
        drop_path_rates = torch.linspace(0, 0.1, cfg['n_layers']).tolist()
        self.blocks = nn.ModuleList([
            DecoderBlock(emb_dim, cfg_smplx['padded_dim'], cfg['n_heads'], cfg['drop_rate'], rate)
            for rate in drop_path_rates
        ])

        self.norm = RobustRMSNorm(emb_dim)
        self.head = nn.Linear(emb_dim, cfg['vocab_size'], bias=False)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Draw Linear and Embedding weights from N(0, 0.02**2)."""
        if isinstance(m, (nn.Linear, nn.Embedding)):
            torch.nn.init.normal_(m.weight, std=0.02)

    def configure_optimizers(self, weight_decay, learning_rate):
        """Return AdamW with weight decay applied only to tensors of rank >= 2."""
        decay, no_decay = [], []
        for _, param in self.named_parameters():
            if not param.requires_grad:
                continue
            bucket = decay if param.dim() >= 2 else no_decay
            bucket.append(param)

        optim_groups = [
            {'params': decay, 'weight_decay': weight_decay},
            {'params': no_decay, 'weight_decay': 0.0}
        ]
        return torch.optim.AdamW(optim_groups, lr=learning_rate)

    def forward(self, tok_id, smplx_seq, enc_mask=None):
        """Return next-token logits for ``tok_id`` conditioned on ``smplx_seq``."""
        memory = self.encoder(smplx_seq, enc_mask)
        hidden = self.drop(self.tok_emb(tok_id))

        for layer in self.blocks:
            hidden = layer(hidden, memory, self.freqs_cis, self.causal_mask, enc_mask)

        return self.head(self.norm(hidden))

# Data Loading
A helper function that reads HDF5 files containing motion sequences and text descriptions. It cleans the raw text to remove artifacts so that it is ready for tokenization.

In [None]:
def load_hdf5_data_into_memory(h5_file_path):
    """Read every motion sequence and feedback text from one HDF5 file.

    Returns ``(sequences, feedbacks)``: a list of float32 arrays reshaped to
    ``(-1, 66)`` and a list of cleaned strings. If the file lacks the
    "sequences" or "feedbacks" dataset, or any error occurs while reading,
    a message is printed and ``([], [])`` is returned.
    """
    print(f"Loading {h5_file_path} into memory...")
    sequences, feedbacks = [], []

    try:
        with h5py.File(h5_file_path, "r") as h5f:
            if not ("sequences" in h5f and "feedbacks" in h5f):
                print(f"Skipping {h5_file_path} (keys missing)")
                return [], []

            seq_ds, feedback_ds = h5f["sequences"], h5f["feedbacks"]

            for i in range(len(seq_ds)):
                # Each stored row is flat; restore the (frames, 66) layout.
                frames = np.array(seq_ds[i], dtype=np.float32).reshape(-1, 66)
                sequences.append(frames)

                text = feedback_ds[i]
                text = text.decode('utf-8') if hasattr(text, 'decode') else str(text)

                # Strip artifacts: ".," separators and single quotes.
                text = text.replace('.,', '. ').replace("'", '').strip()
                feedbacks.append(text)

    except Exception as e:
        print(f"Error loading {h5_file_path}: {e}")
        return [], []

    return sequences, feedbacks

# Dataset Class
A custom PyTorch Dataset that prepares the data for training. It handles random cropping of motion sequences and tokenizes the text, adding the required BOS and EOS tokens.

In [None]:
class InMemoryMotionDataset(Dataset):
    """Dataset of (motion, text) pairs held entirely in memory.

    Args:
        sequences: list of numpy arrays, one per sample, indexed by frame
            along axis 0.
        feedbacks: per-sample text; either a single string or a sequence of
            alternative strings.
        tokenizer: object exposing ``encode(str) -> list[int]``.
        split: ``'train'`` enables random text choice and random cropping;
            any other value makes both deterministic.
        max_len: maximum number of motion frames returned per sample.

    Each item is ``(smplx_tensor, text_tensor)``: a float tensor of at most
    ``max_len`` frames and a long tensor ``[BOS] + tokens + [EOS]``.
    """

    def __init__(self, sequences, feedbacks, tokenizer, split='train', max_len=300):
        self.sequences = sequences
        self.feedbacks = feedbacks
        self.tokenizer = tokenizer
        self.split = split
        self.max_len = max_len
        self.BOS = BOS_TOKEN
        self.EOS = EOS_TOKEN

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq_data = self.sequences[idx]
        is_train = self.split == 'train'

        fb_options = self.feedbacks[idx]
        if isinstance(fb_options, str):
            fb_options = [fb_options]

        # Training samples one of the alternative texts; evaluation always
        # uses the first so that metrics are reproducible.
        text_str = np.random.choice(fb_options) if is_train else fb_options[0]
        text_str = str(text_str).strip()

        num_frames = seq_data.shape[0]
        if num_frames > self.max_len:
            if is_train:
                # randint's upper bound is exclusive: the +1 makes the final
                # window (start == num_frames - max_len) reachable.
                start_f = np.random.randint(0, num_frames - self.max_len + 1)
            else:
                # Deterministic crop for val/test, matching the deterministic
                # text choice above.
                start_f = 0
            seq_data = seq_data[start_f:start_f + self.max_len]

        # Copy so the returned tensor never aliases the stored array.
        smplx_tensor = torch.from_numpy(seq_data.copy()).float()

        token_ids = [self.BOS] + self.tokenizer.encode(text_str) + [self.EOS]
        text_tensor = torch.tensor(token_ids, dtype=torch.long)

        return smplx_tensor, text_tensor

# Collate Function 
This function prepares batches of data. It pads the motion sequences and text inputs to the same length within a batch and generates padding masks for the motion data.

In [None]:
def collate_fn_sequence_batch(batch):
    """Collate ``(motion, text)`` samples into padded batch tensors.

    Returns ``(inputs, targets, motion, motion_mask)``:
      * ``inputs``  - text without its last token, right-padded with 0;
      * ``targets`` - text without its first token, right-padded with -100;
      * ``motion``  - motion sequences right-padded with 0.0 along time;
      * ``motion_mask`` - 1.0 for real frames, 0.0 for padding.
    """
    smplx_list = [smplx for smplx, _ in batch]
    # Shift by one: the model sees text[:-1] and must predict text[1:].
    text_input_list = [text[:-1] for _, text in batch]
    text_target_list = [text[1:] for _, text in batch]

    smplx_batch = pad_sequence(smplx_list, batch_first=True, padding_value=0.0)

    # A frame is valid when its index is below that sample's true length.
    lengths = torch.tensor([len(smplx) for smplx in smplx_list])
    frame_index = torch.arange(smplx_batch.shape[1])
    is_valid = frame_index.unsqueeze(0) < lengths.unsqueeze(1)
    motion_mask = is_valid.to(torch.get_default_dtype())

    inputs_batch = pad_sequence(text_input_list, batch_first=True, padding_value=0)
    targets_batch = pad_sequence(text_target_list, batch_first=True, padding_value=-100)

    return inputs_batch, targets_batch, smplx_batch, motion_mask

# Training Loop
The train_model function manages the training process. It loads data, splits it into train/val/test sets, runs the training epochs using mixed precision, calculates validation/test metrics, and keeps track of the loss history.

In [None]:
def _evaluate_loss(model, loader, use_amp):
    """Return the mean per-batch cross-entropy of ``model`` over ``loader``.

    The caller must already have put the model in eval mode. Returns 0 when
    the loader yields no batches.
    """
    total_loss = 0.0
    steps = 0
    with torch.no_grad():
        for inputs, target, smplx, motion_mask in loader:
            inputs = inputs.to(device)
            target = target.to(device)
            smplx = smplx.to(device)
            motion_mask = motion_mask.to(device)

            with torch.cuda.amp.autocast(enabled=use_amp):
                logits = model(inputs, smplx, enc_mask=motion_mask)
                loss = F.cross_entropy(logits.flatten(0, 1), target.flatten())
            total_loss += loss.item()
            steps += 1
    return total_loss / steps if steps > 0 else 0


def train_model(h5_files):
    """Train a ``GPTModel`` on the motion/text pairs stored in ``h5_files``.

    The loaded samples are split 80/10/10 (in load order) into train, val and
    test sets. Each epoch runs one training pass followed by a val and a test
    evaluation. After the last epoch a checkpoint containing the weights, both
    config dicts and the loss history is written to "advanced_model_fixed.pt".

    Args:
        h5_files: iterable of HDF5 file paths.

    Returns:
        ``(model, test_ds, history)`` where ``history`` maps 'train_loss',
        'val_loss' and 'test_loss' to per-epoch lists.

    Raises:
        ValueError: if the training split cannot fill a single batch.
    """
    history = {
        'train_loss': [],
        'val_loss': [],
        'test_loss': []
    }

    all_seqs = []
    all_fbs = []
    for f in h5_files:
        s, fb = load_hdf5_data_into_memory(f)
        all_seqs.extend(s)
        all_fbs.extend(fb)

    n = len(all_seqs)
    idx1 = int(0.8 * n)
    idx2 = int(0.9 * n)

    train_seqs, train_fbs = all_seqs[:idx1], all_fbs[:idx1]
    val_seqs, val_fbs = all_seqs[idx1:idx2], all_fbs[idx1:idx2]
    test_seqs, test_fbs = all_seqs[idx2:], all_fbs[idx2:]

    print(f"Train: {len(train_seqs)}, Val: {len(val_seqs)}, Test: {len(test_seqs)}")

    train_ds = InMemoryMotionDataset(train_seqs, train_fbs, tokenizer, 'train', MAX_SEQ_LEN)
    val_ds = InMemoryMotionDataset(val_seqs, val_fbs, tokenizer, 'val', MAX_SEQ_LEN)
    test_ds = InMemoryMotionDataset(test_seqs, test_fbs, tokenizer, 'test', MAX_SEQ_LEN)

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                              collate_fn=collate_fn_sequence_batch, drop_last=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False,
                            collate_fn=collate_fn_sequence_batch)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False,
                             collate_fn=collate_fn_sequence_batch)

    # With drop_last=True a training split smaller than one batch yields no
    # steps at all. Fail here with a clear message, before the model is built.
    if len(train_loader) == 0:
        raise ValueError(
            f"Training split has {len(train_seqs)} samples, which is not enough "
            f"to form a single batch of {BATCH_SIZE}; check that the HDF5 files loaded."
        )

    model = GPTModel(cfg).to(device)
    optimizer = model.configure_optimizers(weight_decay=0.1, learning_rate=LEARNING_RATE)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=LEARNING_RATE,
        steps_per_epoch=len(train_loader),
        epochs=NUM_EPOCHS,
        pct_start=0.05
    )

    # Mixed precision only applies on CUDA; disable it explicitly elsewhere so
    # the scaler/autocast become clean no-ops.
    use_amp = device.type == 'cuda'
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    print("\nStarting Training...")

    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        batch_count = 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}")

        for inputs, target, smplx, motion_mask in pbar:
            inputs = inputs.to(device)
            target = target.to(device)
            smplx = smplx.to(device)
            motion_mask = motion_mask.to(device)

            optimizer.zero_grad()

            with torch.cuda.amp.autocast(enabled=use_amp):
                logits = model(inputs, smplx, enc_mask=motion_mask)
                # Padded target positions are -100, cross_entropy's default ignore_index.
                loss = F.cross_entropy(logits.flatten(0, 1), target.flatten())

            scaler.scale(loss).backward()
            # Unscale before clipping so the norm threshold applies to true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()

            running_loss += loss.item()
            batch_count += 1
            pbar.set_postfix({"Loss": f"{loss.item():.4f}", "LR": f"{scheduler.get_last_lr()[0]:.6f}"})
        avg_train_loss = running_loss / batch_count

        model.eval()
        avg_val = _evaluate_loss(model, val_loader, use_amp)
        avg_test = _evaluate_loss(model, test_loader, use_amp)

        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val)
        history['test_loss'].append(avg_test)

        print(f"Ep {epoch+1} | Train: {avg_train_loss:.4f} | Val: {avg_val:.4f} | Test: {avg_test:.4f}")

    torch.save({
        'model': model.state_dict(),
        'config': cfg,
        'cfg_smplx': cfg_smplx,
        'history': history
    }, "advanced_model_fixed.pt")
    print("Model Saved!")

    return model, test_ds, history

# Overfitting Test
This block performs a sanity check by attempting to overfit the model on a single small batch of data. If the model is implemented correctly, the loss should drop towards zero.

In [None]:
# Sanity check: repeatedly fit one fixed batch and verify the loss collapses.
print("="*50)
print("OVERFIT TEST")
print("="*50)

# Fresh model with a plain AdamW (no scheduler, no AMP). The model is never
# switched to eval mode here, so dropout and DropPath stay active.
model = GPTModel(cfg).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

# Only the first 100 samples of the first file are used.
h5_files = [r"C:\Users\Zohaib\Documents\rehman_dataset_h5py\motion_dataset.h5"]
seqs, fbs = load_hdf5_data_into_memory(h5_files[0])
small_ds = InMemoryMotionDataset(seqs[:100], fbs[:100], tokenizer, 'train', MAX_SEQ_LEN)
small_loader = DataLoader(small_ds, batch_size=8, shuffle=True, collate_fn=collate_fn_sequence_batch)

# Draw a single batch once; every optimisation step below reuses it.
inputs, target, smplx, motion_mask = next(iter(small_loader))
inputs, target = inputs.to(device), target.to(device)
smplx, motion_mask = smplx.to(device), motion_mask.to(device)

print(f"Input shape: {inputs.shape}")
print(f"Target shape: {target.shape}")
print(f"Motion shape: {smplx.shape}")
print(f"First input tokens: {inputs[0, :5].tolist()}")
print()

# 100 full-precision gradient steps on the same batch.
losses = []
for i in range(100):
    optimizer.zero_grad()
    logits = model(inputs, smplx, enc_mask=motion_mask)
    loss = F.cross_entropy(logits.flatten(0, 1), target.flatten())
    loss.backward()
    optimizer.step()
    losses.append(loss.item())
    if i % 10 == 0:
        print(f"Step {i:3d}: Loss = {loss.item():.4f}")

print()
# Pass criterion: the final-step loss must be below 0.5.
if losses[-1] < 0.5:
    print("✅ SUCCESS: Model can learn! Proceed with full training.")
else:
    print("❌ FAILED: Model cannot overfit. There's a bug in the architecture.")

# Text Generation Function
This function uses the trained model to generate text descriptions for new motion inputs. It implements autoregressive decoding, starting with the BOS token and stopping when the EOS token is generated.

In [None]:
def generate_text(model, smplx_input, tokenizer, max_new_tokens=50, temperature=1.0, top_k=None):
    """Autoregressively decode a text description for one motion sequence.

    Args:
        model: module called as ``model(tokens, motion, enc_mask=...)`` that
            returns logits of shape (batch, seq, vocab). If it exposes a
            square ``causal_mask`` buffer, its size is used as the context
            window; otherwise 1024 is assumed.
        smplx_input: motion tensor, either (frames, features) or
            (1, frames, features). Only a single sequence is supported.
        tokenizer: object exposing ``decode(list[int]) -> str``.
        max_new_tokens: maximum number of tokens to generate.
        temperature: softmax temperature for sampling; a value <= 0 selects
            greedy (argmax) decoding.
        top_k: if given, sample only from the ``top_k`` highest logits.
            Ignored for greedy decoding.

    Returns:
        The decoded text, excluding the BOS token and anything from the first
        EOS token onward.
    """
    model.eval()
    device = next(model.parameters()).device

    if smplx_input.dim() == 2:
        smplx_input = smplx_input.unsqueeze(0)
    smplx_input = smplx_input.to(device)

    # Every frame of the single input sequence is real, so nothing is masked.
    motion_mask = torch.ones(1, smplx_input.shape[1], device=device)

    idx = torch.tensor([[BOS_TOKEN]], dtype=torch.long, device=device)

    # Crop the running context to what the model's causal mask can cover.
    causal_mask = getattr(model, 'causal_mask', None)
    context_length = causal_mask.shape[0] if causal_mask is not None else 1024

    # Dividing logits by a zero temperature would produce inf/NaN, so the
    # greedy case is decided up front and never performs the division.
    greedy = temperature <= 0

    with torch.no_grad():
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -context_length:]
            logits = model(idx_cond, smplx_input, enc_mask=motion_mask)[:, -1, :]

            if greedy:
                idx_next = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                logits = logits / temperature

                if top_k is not None:
                    v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                    # Everything below the k-th largest logit gets zero probability.
                    logits[logits < v[:, [-1]]] = -float('Inf')

                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)

            idx = torch.cat((idx, idx_next), dim=1)

            if idx_next.item() == EOS_TOKEN:
                break

    # Drop the leading BOS and truncate at the first EOS, if any.
    generated_ids = idx[0].tolist()[1:]
    if EOS_TOKEN in generated_ids:
        generated_ids = generated_ids[:generated_ids.index(EOS_TOKEN)]

    generated_text = tokenizer.decode(generated_ids)
    return generated_text

# Visualization Utility
This function takes the training history dictionary and plots the training, validation, and test loss curves to visualize the model's learning progress.

In [None]:
def plot_training_history(history, save_path=None):
    """Plot train/val/test loss per epoch and mark the best validation loss.

    ``history`` must contain the lists 'train_loss', 'val_loss' and
    'test_loss'. When ``save_path`` is truthy the figure is also written
    there (150 dpi) before being shown.
    """
    epochs = range(1, len(history['train_loss']) + 1)

    plt.figure(figsize=(10, 6))

    # (history key, matplotlib format string, legend label)
    curves = (
        ('train_loss', 'b-o', 'Train Loss'),
        ('val_loss', 'g-s', 'Validation Loss'),
        ('test_loss', 'r-^', 'Test Loss'),
    )
    for key, fmt, label in curves:
        plt.plot(epochs, history[key], fmt, label=label, linewidth=2, markersize=6)

    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel('Cross-Entropy Loss', fontsize=12)
    plt.title('Training Progress: Motion-to-Text Model', fontsize=14, fontweight='bold')
    plt.legend(loc='upper right', fontsize=11)
    plt.grid(True, alpha=0.3)

    plt.xticks(epochs)

    # Annotate the first epoch that reached the lowest validation loss.
    val_losses = history['val_loss']
    min_val_loss = min(val_losses)
    min_val_epoch = val_losses.index(min_val_loss) + 1
    plt.annotate(
        f'Best Val: {min_val_loss:.4f}',
        xy=(min_val_epoch, min_val_loss),
        xytext=(min_val_epoch + 0.5, min_val_loss + 0.05),
        fontsize=10,
        color='green',
        arrowprops=dict(arrowstyle='->', color='green', lw=1),
    )

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Plot saved to: {save_path}")

    plt.show()

# Main Execution
This is the main entry point for the script. It defines the dataset paths, runs the training process, and then plots the results.

In [None]:
# HDF5 shards to train on; they are loaded and concatenated in this order.
h5_files = [
        r"C:\Users\Zohaib\Documents\rehman_dataset_h5py\motion_dataset.h5",
        r"C:\Users\Zohaib\Documents\rehman_dataset_h5py\motion_dataset_02.h5",
        r"C:\Users\Zohaib\Documents\rehman_dataset_h5py\motion_dataset_03.h5"
    ]
    
# Full training run (also writes the checkpoint), then plot and save the loss curves.
model, test_ds, history = train_model(h5_files)
plot_training_history(history, save_path="training_loss_plot.png")

# Loading a Saved Model
This cell provides code to reload the model architecture and state dictionary from a saved checkpoint file for future use.

In [None]:
# Rebuild the architecture, then restore the trained weights into it.
model = GPTModel(cfg).to(device)

# The checkpoint is a dict with keys 'model', 'config', 'cfg_smplx' and
# 'history'; the weights are under 'model', so the whole dict must not be
# passed to load_state_dict. map_location lets a checkpoint saved on GPU be
# loaded on a CPU-only machine.
checkpoint = torch.load('advanced_model_fixed.pt', map_location=device, weights_only=True)
model.load_state_dict(checkpoint['model'])

model.eval()