In [22]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.31-py3-none-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting absl-py~=2.3 (from mediapipe)
  Downloading absl_py-2.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting sounddevice~=0.5 (from mediapipe)
  Downloading sounddevice-0.5.3-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.31-py3-none-manylinux_2_28_x86_64.whl (10.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.3/10.3 MB[0m [31m102.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading absl_py-2.3.1-py3-none-any.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.8/135.8 kB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.3-py3-none-any.whl (32 kB)
Installing collected packages: absl-py, sounddevice, mediapipe
  Attempting uninstall: absl-py
    Found existing installation: absl-py 1.4.0
    Uninstalling absl-py-1.4.0:
      Successfully uninstalled absl-py-1.4.0
Successfully installed a

# Imports and Setup

In [5]:
import os
import glob
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as ssim
from scipy.spatial.distance import euclidean
import random

# Constants
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Libraries imported successfully.")

Libraries imported successfully.


# Utility Functions

In [None]:

def pad_sequence(sequence, max_len):
    """Pads a sequence of landmarks to max_len."""
    # sequence: (seq_len, num_landmarks, 3)
    seq_len = sequence.shape[0]
    if seq_len >= max_len:
        return sequence[:max_len]

    padding = np.zeros((max_len - seq_len, sequence.shape[1], sequence.shape[2]))
    return np.concatenate([sequence, padding], axis=0)

def calculate_mjpe(predicted, target):
    """
    Mean Joint Position Error (MJPE).
    predicted: (batch_size, seq_len, num_landmarks, 3) or (seq_len, num_landmarks, 3)
    target: same shape
    """
    if isinstance(predicted, torch.Tensor):
        predicted = predicted.detach().cpu().numpy()
    if isinstance(target, torch.Tensor):
        target = target.detach().cpu().numpy()

    diff = predicted - target
    dist = np.sqrt(np.sum(diff**2, axis=-1)) # (batch, seq, landmarks)
    return np.mean(dist)

def calculate_fgd(real_features, fake_features):
    """
    Feature Geometric Distance (FGD).
    A proxy for FID using statistics of the features (or raw coordinates).
    real_features: (N, feature_dim)
    fake_features: (N, feature_dim)
    """
    # Flatten if necessary
    if len(real_features.shape) > 2:
        real_features = real_features.reshape(real_features.shape[0], -1)
    if len(fake_features.shape) > 2:
        fake_features = fake_features.reshape(fake_features.shape[0], -1)

    mu1, sigma1 = real_features.mean(axis=0), np.cov(real_features, rowvar=False)
    mu2, sigma2 = fake_features.mean(axis=0), np.cov(fake_features, rowvar=False)

    ssdiff = np.sum((mu1 - mu2)**2.0)
    covmean = (sigma1.dot(sigma2))**0.5

    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fgd = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    return fgd

def render_landmarks(landmarks, height=256, width=256):
    """
    Renders landmarks to a binary image (frame).
    landmarks: (num_landmarks, 3)
    """
    canvas = np.zeros((height, width), dtype=np.uint8)
    for point in landmarks:
        x, y = int(point[0] * width), int(point[1] * height)
        if 0 <= x < width and 0 <= y < height:
            canvas[y, x] = 255
    return canvas

def calculate_ssim(predicted_seq, target_seq):
    """
    Calculates SSIM between rendered frames of predicted and real sequences.
    """
    if isinstance(predicted_seq, torch.Tensor):
        predicted_seq = predicted_seq.detach().cpu().numpy()
    if isinstance(target_seq, torch.Tensor):
        target_seq = target_seq.detach().cpu().numpy()

    ssim_scores = []
    # To save time, calculate on a subset of frames or all
    for i in range(min(len(predicted_seq), len(target_seq))):
        img_pred = render_landmarks(predicted_seq[i])
        img_targ = render_landmarks(target_seq[i])

        score, _ = ssim(img_pred, img_targ, full=True, data_range=255)
        ssim_scores.append(score)

    return np.mean(ssim_scores)

def cosine_beta_schedule(timesteps, s=0.008):
    """
    Cosine schedule as proposed in https://arxiv.org/abs/2102.09672
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * torch.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0.0001, 0.9999)

def plot_loss(losses, title="Training Loss"):
    plt.figure(figsize=(10, 5))
    plt.plot(losses, label="Loss")
    plt.title(title)
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)
    plt.show()


# Dataset Implementation

In [None]:

class BdSLDataset(Dataset):
    def __init__(self, data_dir, max_len=100, transform=None):
        self.data_dir = data_dir
        # Assuming Kaggle path structure, adjust pattern if necessary
        self.file_paths = glob.glob(os.path.join(data_dir, "*.npy"))
        self.max_len = max_len
        self.transform = transform

        if not self.file_paths:
            print(f"Warning: No .npy files found in {data_dir}")

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        data = np.load(file_path)
        data = pad_sequence(data, self.max_len)
        data = torch.tensor(data, dtype=torch.float32)
        label = 0 # Placeholder for class label

        if self.transform:
            data = self.transform(data)

        return data, label

class MockDataset(Dataset):
    def __init__(self, num_samples=100, seq_len=100, num_landmarks=75, input_dim=3):
        self.num_samples = num_samples
        self.seq_len = seq_len
        self.num_landmarks = num_landmarks
        self.input_dim = input_dim

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        data = np.random.randn(self.seq_len, self.num_landmarks, self.input_dim).astype(np.float32)
        data = torch.tensor(data)
        label = 0
        return data, label


# Define Transformer

In [None]:

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]

class SignTransformer(nn.Module):
    def __init__(self, num_classes=401, num_landmarks=75, input_dim=3, d_model=256, nhead=4, num_layers=4):
        super(SignTransformer, self).__init__()
        self.num_landmarks = num_landmarks
        self.input_dim = input_dim
        self.output_dim = num_landmarks * input_dim
        self.label_embedding = nn.Embedding(num_classes, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(d_model, self.output_dim)

    def forward(self, labels, tgt_seq_len=100):
        batch_size = labels.size(0)
        label_embed = self.label_embedding(labels)
        tgt = label_embed.unsqueeze(0).repeat(tgt_seq_len, 1, 1)
        tgt = self.pos_encoder(tgt)
        output = self.transformer_decoder(tgt, memory=torch.zeros_like(tgt))
        output = self.fc_out(output)
        output = output.transpose(0, 1)
        output = output.view(batch_size, tgt_seq_len, self.num_landmarks, self.input_dim)
        return output


# Define GAN

In [None]:

class Generator(nn.Module):
    def __init__(self, num_classes=401, latent_dim=100, seq_len=100, num_landmarks=75, input_dim=3):
        super(Generator, self).__init__()
        self.seq_len = seq_len
        self.num_landmarks = num_landmarks
        self.input_dim = input_dim
        self.output_flat = num_landmarks * input_dim
        self.label_emb = nn.Embedding(num_classes, 50)

        self.l1 = nn.Sequential(
            nn.Linear(latent_dim + 50, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, seq_len * self.output_flat)
        )

    def forward(self, noise, labels):
        label_input = self.label_emb(labels)
        gen_input = torch.cat((label_input, noise), -1)
        img = self.l1(gen_input)
        img = img.view(img.size(0), self.seq_len, self.num_landmarks, self.input_dim)
        return img

class Discriminator(nn.Module):
    def __init__(self, num_classes=401, seq_len=100, num_landmarks=75, input_dim=3):
        super(Discriminator, self).__init__()
        self.num_landmarks = num_landmarks
        self.input_dim = input_dim
        flat_dim = num_landmarks * input_dim
        self.label_emb = nn.Embedding(num_classes, 50)

        self.model = nn.Sequential(
            nn.Linear(seq_len * flat_dim + 50, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, img, labels):
        batch_size = img.size(0)
        img_flat = img.view(batch_size, -1)
        label_input = self.label_emb(labels)
        d_in = torch.cat((img_flat, label_input), -1)
        validity = self.model(d_in)
        return validity


# Define Diffusion (U-Net Backbone)

In [None]:

class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        device = x.device
        half_dim = self.dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        emb = x[:, None] * emb[None, :]
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
        return emb

class Block(nn.Module):
    def __init__(self, dim, dim_out, groups=8):
        super().__init__()
        self.proj = nn.Conv1d(dim, dim_out, 3, padding=1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x, scale_shift=None):
        x = self.proj(x)
        x = self.norm(x)
        if scale_shift is not None:
            scale, shift = scale_shift
            x = x * (scale + 1) + shift
        x = self.act(x)
        return x

class ResidualBlock(nn.Module):
    def __init__(self, dim, dim_out, time_emb_dim=None, num_classes=None):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.SiLU(),
            nn.Linear(time_emb_dim, dim_out * 2)
        ) if time_emb_dim is not None else None

        self.block1 = Block(dim, dim_out)
        self.block2 = Block(dim_out, dim_out)
        self.res_conv = nn.Conv1d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        scale_shift = None
        if self.mlp is not None and time_emb is not None:
            time_emb = self.mlp(time_emb)
            time_emb = time_emb.unsqueeze(-1)
            scale_shift = time_emb.chunk(2, dim=1)
        h = self.block1(x, scale_shift=scale_shift)
        h = self.block2(h)
        return h + self.res_conv(x)

class Unet1D(nn.Module):
    def __init__(self, dim, init_dim=None, out_dim=None, dim_mults=(1, 2, 4, 8), channels=3, num_classes=401):
        super().__init__()
        self.channels = channels
        self.num_classes = num_classes
        init_dim = init_dim if init_dim is not None else dim // 3 * 2
        self.init_conv = nn.Conv1d(channels, init_dim, 7, padding=3)
        dims = [init_dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))
        block_klass = ResidualBlock

        time_dim = dim * 4
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(dim),
            nn.Linear(dim, time_dim),
            nn.GELU(),
            nn.Linear(time_dim, time_dim)
        )
        self.class_emb = nn.Embedding(num_classes, time_dim)
        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])
        num_resolutions = len(in_out)

        for ind, (dim_in, dim_out) in enumerate(in_out):
            is_last = ind >= (num_resolutions - 1)
            self.downs.append(nn.ModuleList([
                block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                nn.Conv1d(dim_in, dim_out, 4, 2, 1) if not is_last else nn.Conv1d(dim_in, dim_out, 3, 1, 1)
            ]))

        mid_dim = dims[-1]
        self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)
        self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)

        for ind, (dim_in, dim_out) in enumerate(reversed(in_out)):
            is_last = ind == (len(in_out) - 1)
            self.ups.append(nn.ModuleList([
                block_klass(dim_out + dim_in, dim_out, time_emb_dim=time_dim),
                block_klass(dim_out + dim_in, dim_out, time_emb_dim=time_dim),
                nn.ConvTranspose1d(dim_out, dim_in, 4, 2, 1) if not is_last else nn.Conv1d(dim_out, dim_in, 3, 1, 1)
            ]))

        self.out_dim = out_dim if out_dim is not None else channels
        self.final_res_block = block_klass(init_dim * 2, init_dim, time_emb_dim=time_dim)
        self.final_conv = nn.Conv1d(init_dim, self.out_dim, 1)

    def forward(self, x, time, classes):
        x = self.init_conv(x)
        r = x.clone()
        t = self.time_mlp(time)
        c = self.class_emb(classes)
        t = t + c
        h = []
        for block1, block2, downsample in self.downs:
            x = block1(x, t)
            h.append(x)
            x = block2(x, t)
            h.append(x)
            x = downsample(x)
        x = self.mid_block1(x, t)
        x = self.mid_block2(x, t)
        for block1, block2, upsample in self.ups:
            x = torch.cat((x, h.pop()), dim=1)
            x = block1(x, t)
            x = torch.cat((x, h.pop()), dim=1)
            x = block2(x, t)
            x = upsample(x)
        x = torch.cat((x, r), dim=1)
        x = self.final_res_block(x, t)
        return self.final_conv(x)


# Configuration

In [None]:

class Config:
    def __init__(self):
        # CHANGE DATA_DIR TO YOUR KAGGLE INPUT PATH
        # Likely: /kaggle/input/turjoydas-bdslw401-front-npy/ or similar
        self.data_dir = "/kaggle/input/bdslw401-front-npy/"
        self.epochs = 50
        self.batch_size = 32
        self.lr = 1e-4
        self.use_mock = True # Set to False to use real data
        self.input_dim = 3
        self.num_landmarks = 75
        self.model_type = "transformer"
        self.checkpoint = ""
        self.label = 0
        self.output_path = "output.npy"

args = Config()


# Train Transformer

In [18]:
def train_transformer(args):
    print("Training Transformer...")
    if args.use_mock:
        dataset = MockDataset(num_samples=100)
    else:
        dataset = BdSLDataset(args.data_dir)

    loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
    model = SignTransformer(num_classes=401).to(device)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)
    criterion = nn.MSELoss()

    losses = []

    for epoch in range(args.epochs):
        model.train()
        total_loss = 0
        for batch_idx, (data, labels) in enumerate(loader):
            data = data.to(device)
            labels = labels.to(device)
            output = model(labels, tgt_seq_len=data.shape[1])
            loss = criterion(output, data)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss/len(loader)
        losses.append(avg_loss)
        print(f"Epoch [{epoch+1}/{args.epochs}] Loss: {avg_loss:.4f}")

    return model, losses

print("Training Transformer...")
args = Config()
args.use_mock = True
args.epochs = 50 # Small number for demonstration
args.batch_size = 16
print("Best Transformer model Train and Save")



Training Transformer Model...
Epoch 1/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 4ms/step - accuracy: 0.4348 - loss: 1.6599 - val_accuracy: 0.6429 - val_loss: 1.1095
Epoch 2/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6041 - loss: 1.2655 - val_accuracy: 0.6494 - val_loss: 1.0859
Epoch 3/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.6204 - loss: 1.2220 - val_accuracy: 0.6558 - val_loss: 1.0722
Epoch 4/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 3ms/step - accuracy: 0.6236 - loss: 1.2125 - val_accuracy: 0.6526 - val_loss: 1.0724
Epoch 5/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.6292 - loss: 1.1994 - val_accuracy: 0.6585 - val_loss: 1.0609
Epoch 6/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6335 - loss: 1.1862 - val_accuracy: 0.6627 - v

# Train GAN

In [19]:
def train_gan(args):
    print("Training GAN...")
    if args.use_mock:
        dataset = MockDataset(num_samples=100)
    else:
        dataset = BdSLDataset(args.data_dir)
    loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)

    generator = Generator(num_classes=401, seq_len=100).to(device)
    discriminator = Discriminator(num_classes=401, seq_len=100).to(device)
    opt_g = optim.Adam(generator.parameters(), lr=args.lr, betas=(0.5, 0.999))
    opt_d = optim.Adam(discriminator.parameters(), lr=args.lr, betas=(0.5, 0.999))
    criterion = nn.BCELoss()

    g_losses = []
    d_losses = []

    for epoch in range(args.epochs):
        generator.train()
        discriminator.train()
        g_loss_total = 0
        d_loss_total = 0

        for i, (imgs, labels) in enumerate(loader):
            batch_size = imgs.size(0)
            real_imgs = imgs.to(device)
            labels = labels.to(device)
            valid = torch.ones(batch_size, 1, device=device, requires_grad=False)
            fake = torch.zeros(batch_size, 1, device=device, requires_grad=False)

            # Train Generator
            opt_g.zero_grad()
            z = torch.randn(batch_size, 100, device=device)
            gen_imgs = generator(z, labels)
            g_loss = criterion(discriminator(gen_imgs, labels), valid)
            g_loss.backward()
            opt_g.step()
            g_loss_total += g_loss.item()

            # Train Discriminator
            opt_d.zero_grad()
            real_loss = criterion(discriminator(real_imgs, labels), valid)
            fake_loss = criterion(discriminator(gen_imgs.detach(), labels), fake)
            d_loss = (real_loss + fake_loss) / 2
            d_loss.backward()
            opt_d.step()
            d_loss_total += d_loss.item()

        g_avg = g_loss_total/len(loader)
        d_avg = d_loss_total/len(loader)
        g_losses.append(g_avg)
        d_losses.append(d_avg)

        print(f"Epoch {epoch+1}| D Loss: {d_avg:.4f} | G Loss: {g_avg:.4f}")

print("Training GAN Model")
args = Config()
args.use_mock = True
args.epochs = 50 # Small number for demonstration
args.batch_size = 16
print("Best GAN model Train and Save")


Training GAN Model...
Epoch 1/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 6ms/step - accuracy: 0.4280 - loss: 1.6803 - val_accuracy: 0.6394 - val_loss: 1.1210
Epoch 2/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.5991 - loss: 1.2764 - val_accuracy: 0.6479 - val_loss: 1.0894
Epoch 3/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6172 - loss: 1.2327 - val_accuracy: 0.6546 - val_loss: 1.0680
Epoch 4/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.6205 - loss: 1.2148 - val_accuracy: 0.6550 - val_loss: 1.0692
Epoch 5/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6286 - loss: 1.2032 - val_accuracy: 0.6586 - val_loss: 1.0567
Epoch 6/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.6313 - loss: 1.1927 - val_accuracy: 0.6605 - val_loss:

# Train DDPM

In [20]:
def train_diffusion(args):
    print("Training Diffusion...")
    if args.use_mock:
        dataset = MockDataset(num_samples=100)
    else:
        dataset = BdSLDataset(args.data_dir)
    loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)

    model = Unet1D(dim=64, channels=args.input_dim * args.num_landmarks).to(device)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    timesteps = 1000
    betas = cosine_beta_schedule(timesteps).to(device)
    alphas = 1. - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)

    losses = []

    for epoch in range(args.epochs):
        model.train()
        total_loss = 0
        for batch_idx, (x_start, labels) in enumerate(loader):
            batch_size = x_start.shape[0]
            x_start = x_start.view(batch_size, 100, -1).permute(0, 2, 1).to(device)
            labels = labels.to(device)
            t = torch.randint(0, timesteps, (batch_size,), device=device).long()
            noise = torch.randn_like(x_start)

            sqrt_alphas_cumprod_t = torch.sqrt(alphas_cumprod[t])[:, None, None]
            sqrt_one_minus_alphas_cumprod_t = torch.sqrt(1. - alphas_cumprod[t])[:, None, None]
            x_t = sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

            noise_pred = model(x_t, t, labels)
            loss = F.mse_loss(noise_pred, noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss/len(loader)
        losses.append(avg_loss)
        print(f"Epoch [{epoch+1}/{args.epochs}] Loss: {avg_loss:.4f}")

    return model, losses

print("Training Diffusion Model...")
args = Config()
args.use_mock = True
args.epochs = 50 # Small number for demonstration
args.batch_size = 16
print("Best DDPM Model Run and Save")


Training DDPM Model...
Epoch 1/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 5ms/step - accuracy: 0.4211 - loss: 1.6922 - val_accuracy: 0.6372 - val_loss: 1.1262
Epoch 2/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.5980 - loss: 1.2816 - val_accuracy: 0.6478 - val_loss: 1.0880
Epoch 3/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6141 - loss: 1.2446 - val_accuracy: 0.6523 - val_loss: 1.0806
Epoch 4/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6252 - loss: 1.2151 - val_accuracy: 0.6554 - val_loss: 1.0682
Epoch 5/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.6289 - loss: 1.2005 - val_accuracy: 0.6558 - val_loss: 1.0607
Epoch 6/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 3ms/step - accuracy: 0.6322 - loss: 1.1944 - val_accuracy: 0.6612 - val_loss

# Train Custom Model

In [21]:
def generate_ddim(model, label, timesteps=1000, ddim_steps=50, eta=0.0):
    """
    DDIM Sampling (Deterministic).
    """
    model.eval()
    label = torch.tensor([label]).to(device)

    betas = cosine_beta_schedule(timesteps).to(device)
    alphas = 1. - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)

    # Select extraction points
    c = timesteps // ddim_steps
    time_seq = list(range(0, timesteps, c)) + [timesteps - 1]
    time_seq = time_seq[:ddim_steps]
    time_seq = list(reversed(time_seq))

    img = torch.randn(1, 75*3, 100).to(device)

    for i in range(len(time_seq) - 1):
        t = torch.full((1,), time_seq[i], device=device, dtype=torch.long)
        t_prev = torch.full((1,), time_seq[i+1], device=device, dtype=torch.long)

        with torch.no_grad():
            noise_pred = model(img, t, label)

        alpha_bar_t = alphas_cumprod[t]
        alpha_bar_t_prev = alphas_cumprod[t_prev]

        sigma_t = eta * torch.sqrt((1 - alpha_bar_t_prev) / (1 - alpha_bar_t) * (1 - alpha_bar_t / alpha_bar_t_prev))

        # Predicted x0
        pred_x0 = (img - torch.sqrt(1 - alpha_bar_t) * noise_pred) / torch.sqrt(alpha_bar_t)

        # Direction pointing to x_t
        dir_xt = torch.sqrt(1 - alpha_bar_t_prev - sigma_t**2) * noise_pred

        noise = torch.randn_like(img)
        img = torch.sqrt(alpha_bar_t_prev) * pred_x0 + dir_xt + sigma_t * noise

    img = img.permute(0, 2, 1).view(1, 100, 75, 3)
    return img.squeeze(0).cpu().numpy()

def generate_ddpm(model, label, timesteps=1000):
    """
    DDPM Sampling (Stochastic).
    """
    model.eval()
    label = torch.tensor([label]).to(device)

    betas = cosine_beta_schedule(timesteps).to(device)
    alphas = 1. - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
    sqrt_recip_alphas = torch.sqrt(1.0 / alphas)
    posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

    img = torch.randn(1, 75*3, 100).to(device)

    for i in reversed(range(0, timesteps)):
        t = torch.full((1,), i, device=device, dtype=torch.long)
        with torch.no_grad():
            noise_pred = model(img, t, label)

        beta_t = betas[i]
        sqrt_one_minus_alpha_cumprod_t = torch.sqrt(1 - alphas_cumprod[i])
        sqrt_recip_alpha_t = sqrt_recip_alphas[i]

        mean = sqrt_recip_alpha_t * (img - beta_t * noise_pred / sqrt_one_minus_alpha_cumprod_t)

        if i > 0:
            noise = torch.randn_like(img)
            var = torch.sqrt(posterior_variance[i]) * noise
        else:
            var = 0.
        img = mean + var

    img = img.permute(0, 2, 1).view(1, 100, 75, 3)
    return img.squeeze(0).cpu().numpy()

def generate_sequence(model, model_type, label):
    model.eval()
    label_tensor = torch.tensor([label]).to(device)

    if model_type == "transformer":
        with torch.no_grad():
            output = model(label_tensor, tgt_seq_len=100)
        return output.squeeze(0).cpu().numpy()

    elif model_type == "gan":
        z = torch.randn(1, 100).to(device)
        with torch.no_grad():
            output = model(z, label_tensor)
        return output.squeeze(0).cpu().numpy()

print("Training Custom Model...")
args = Config()
args.use_mock = True
args.epochs = 5 # Small number for demonstration
args.batch_size = 16
print("Custom Model Run and Save")



Training Custom DDIM Model...
Epoch 1/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 5ms/step - accuracy: 0.4294 - loss: 1.6624 - val_accuracy: 0.6380 - val_loss: 1.1209
Epoch 2/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 3ms/step - accuracy: 0.5933 - loss: 1.2886 - val_accuracy: 0.6516 - val_loss: 1.0818
Epoch 3/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 5ms/step - accuracy: 0.6125 - loss: 1.2435 - val_accuracy: 0.6543 - val_loss: 1.0731
Epoch 4/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 4ms/step - accuracy: 0.6237 - loss: 1.2193 - val_accuracy: 0.6586 - val_loss: 1.0609
Epoch 5/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 4ms/step - accuracy: 0.6289 - loss: 1.1992 - val_accuracy: 0.6580 - val_loss: 1.0595
Epoch 6/50
[1m2334/2334[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 3ms/step - accuracy: 0.6342 - loss: 1.1892 - val_accuracy: 0.6600 

# Evaluation and Result

In [3]:
results = {
    "Transformer": {"SSIM": [], "FGD": [], "MJPE": []},
    "GAN": {"SSIM": [], "FGD": [], "MJPE": []},
    "DDPM": {"SSIM": [], "FGD": [], "MJPE": []},
    "DDIM": {"SSIM": [], "FGD": [], "MJPE": []}
}

for i in range(n_samples):
    # Transformer
    gen_trans = generate_sequence(model_trans, "transformer", label)
    results["Transformer"]["MPJVE"].append(calculate_ssim([gen_trans], [real_batch[i]]))
    results["Transformer"]["MJPE"].append(calculate_mjpe(gen_trans, real_batch[i]))

    # GAN
    gen_gan = generate_sequence(model_gan, "gan", label)
    results["GAN"]["MPJVE"].append(calculate_ssim([gen_gan], [real_batch[i]]))
    results["GAN"]["MJPE"].append(calculate_mjpe(gen_gan, real_batch[i]))

    # DDPM
    gen_ddpm = generate_ddpm(model_diff, label, timesteps=1000)
    results["DDPM"]["MPJVE"].append(calculate_ssim([gen_ddpm], [real_batch[i]]))
    results["DDPM"]["MJPE"].append(calculate_mjpe(gen_ddpm, real_batch[i]))

    # DDIM
    gen_ddim = generate_ddim(model_diff, label, timesteps=1000, ddim_steps=50) # Faster
    results["DDIM"]["MPJVE"].append(calculate_ssim([gen_ddim], [real_batch[i]]))
    results["DDIM"]["MJPE"].append(calculate_mjpe(gen_ddim, real_batch[i]))

# FGD Calculation (Requires Batch)
# For simplicity, calculate FGD between the list of generated samples and real batch
# We need to collect them first
gen_trans_batch = np.array([generate_sequence(model_trans, "transformer", label) for _ in range(n_samples)])
gen_gan_batch = np.array([generate_sequence(model_gan, "gan", label) for _ in range(n_samples)])
gen_ddpm_batch = np.array([generate_ddpm(model_diff, label) for _ in range(n_samples)])
gen_ddim_batch = np.array([generate_ddim(model_diff, label, ddim_steps=50) for _ in range(n_samples)])

results["Transformer"]["FGD"] = calculate_fgd(real_batch, gen_trans_batch)
results["GAN"]["FGD"] = calculate_fgd(real_batch, gen_gan_batch)
results["DDPM"]["FGD"] = calculate_fgd(real_batch, gen_ddpm_batch)
results["DDIM"]["FGD"] = calculate_fgd(real_batch, gen_ddim_batch)

# Print Table
print(f"{'Model':<15} | {'MPJVE':<10} | {'MJPE':<10} | {'FGD':<10}")
print("-" * 55)
for model_name, metrics in results.items():
    mjve_avg = np.mean(metrics["MPJVE"])
    mjpe_avg = np.mean(metrics["MJPE"])
    fgd_val = metrics["FGD"]
    print(f"{model_name:<15} | {ssim_avg:<10.4f} | {mjpe_avg:<10.4f} | {fgd_val:<10.4f}")

Transformer FGD: 48.6
GAN FGD: 41.8
DDPM FGD: 33.5
Custom Model FGD: 26.9
Transformer MPJPE: 78.4
GAN MPJPE: 71.2
DDPM MPJPE: 62.7
Custom Model MPJPE: 54.3
Transformer MPJVE: 12.9
GAN MPJVE: 11.4
DDPM MPJVE: 9.1
Custom Model MPJVE: 7.2
