In [1]:
import kagglehub

# Fetch the newest published version of the dataset via kagglehub.
path = kagglehub.dataset_download(
    "debashishsau/aslamerican-sign-language-aplhabet-dataset"
)

# Report the directory returned by kagglehub.
print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Path to dataset files: C:\Users\eshan\.cache\kagglehub\datasets\debashishsau\aslamerican-sign-language-aplhabet-dataset\versions\1


In [2]:
import os
import sys
import torch
import cv2
import matplotlib.pyplot as plt
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, TensorDataset
from torch.nn.utils.rnn import pad_sequence

In [3]:
def print_line(*args):
    """Rewrite the current console line in place.

    Every argument is converted with ``str`` and the pieces are joined with a
    single space. The text is prefixed with a carriage return and written
    without a trailing newline, so the next call overwrites it.
    """
    text = ' '.join(map(str, args))
    print(f'\r{text}', end='')

In [4]:
# Training folder inside the downloaded dataset; expected to hold one sub-folder per label.
data_dir = path + '/ASL_Alphabet_Dataset/asl_alphabet_train'

# Keep only directory entries (plain files at this level are ignored).
subdirs = list(filter(
    lambda name: os.path.isdir(os.path.join(data_dir, name)),
    os.listdir(data_dir),
))
print("Subdirectories (should be one per label):", subdirs)

Subdirectories (should be one per label): ['A', 'B', 'C', 'D', 'del', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'nothing', 'O', 'P', 'Q', 'R', 'S', 'space', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']


In [5]:
class ASLImageFolder(Dataset):
    """
    Folder-per-class image dataset, much like torchvision.datasets.ImageFolder,
    but decoded with cv2 so preprocessing can mirror the real-time code exactly.

    Args:
        root_dir:  directory containing one sub-directory per class.
        img_size:  (height, width) passed to Resize / CenterCrop.
        normalize: when True, append Normalize with the mean/std given below.

    Attributes:
        classes:      sorted list of class (sub-directory) names.
        class_to_idx: dict mapping class name -> integer label.
        samples:      list of (image_path, label_idx) tuples.
    """
    # File extensions (lower-case) that are treated as images.
    IMG_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, img_size=(224,224), normalize=True):
        self.root_dir = root_dir
        self.img_size = img_size

        # class names = sub-directory names, sorted so label indices are stable
        classes = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )
        self.classes = classes
        self.class_to_idx = {cls:i for i,cls in enumerate(classes)}

        # build list of (path, label_idx)
        self.samples = []
        for cls in classes:
            cls_dir = os.path.join(root_dir, cls)
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and any index-based, seeded split) reproducible.
            for fname in sorted(os.listdir(cls_dir)):
                if fname.lower().endswith(self.IMG_EXTENSIONS):
                    self.samples.append((os.path.join(cls_dir, fname),
                                         self.class_to_idx[cls]))

        # transforms: resize, center-crop, to-tensor, optional normalize
        tfms = [
            transforms.ToPILImage(),
            transforms.Resize(self.img_size),
            transforms.CenterCrop(self.img_size),
            transforms.ToTensor()
        ]
        if normalize:
            tfms.append(transforms.Normalize(mean=[0.485,0.456,0.406],
                                             std =[0.229,0.224,0.225]))
        self.transform = transforms.Compose(tfms)

    def __len__(self):
        """Number of discovered image files."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (image_tensor, label_idx) for sample `idx`.

        Raises:
            OSError: if cv2 cannot read or decode the file.
        """
        path, label = self.samples[idx]
        # load with cv2 to match the real-time code
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None; name the file
            # here instead of failing later inside cvtColor.
            raise OSError(f"cv2 could not read image file: {path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(img)
        return img, label

# ——— Usage ———

batch_size = 32

# instantiate
dataset = ASLImageFolder(data_dir, img_size=(224,224), normalize=True)

# class names ordered by their integer label (computed once)
class_names = sorted(dataset.class_to_idx, key=dataset.class_to_idx.get)
num_classes = len(class_names)

# stratified 80/20 split over sample indices
# (train_test_split / Subset / DataLoader come from the import cell at the top)
idx = list(range(len(dataset)))
train_idx, test_idx = train_test_split(
    idx, test_size=0.2,
    stratify=[label for _, label in dataset.samples],
    random_state=42
)

# Pinned host memory only helps host->GPU copies; requesting it without CUDA
# just triggers a warning, so enable it only when a GPU is present.
pin_memory = torch.cuda.is_available()

# create subsets & loaders
train_loader = DataLoader(
    Subset(dataset, train_idx),
    batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=pin_memory
)
test_loader = DataLoader(
    Subset(dataset, test_idx),
    batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=pin_memory
)

print("Classes:", class_names)
print("Number of classes:", num_classes)
print("Number of training samples:", len(train_loader.dataset))
print("Number of test samples:", len(test_loader.dataset))

Classes: ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'del', 'nothing', 'space']
Number of classes: 29
Number of training samples: 178459
Number of test samples: 44615


In [6]:
def one_hot_encode_labels(y_train, y_test, num_classes):
    """One-hot encode two integer label tensors.

    Returns a ``(train, test)`` pair; each result is the float version of
    ``nn.functional.one_hot`` applied with ``num_classes`` columns.
    """
    train_encoded, test_encoded = (
        nn.functional.one_hot(labels, num_classes=num_classes).float()
        for labels in (y_train, y_test)
    )
    return train_encoded, test_encoded

In [7]:
def conv_layer(in_channels, out_channels, num_convs, use_batchnorm=True, p_dropout=0.0):
    """Build one conv block: ``num_convs`` x (3x3 conv [+ BatchNorm] + ReLU), then 2x2 max-pool.

    The first convolution maps ``in_channels -> out_channels``; the remaining
    ones keep ``out_channels``. A Dropout layer is appended after the pooling
    only when ``p_dropout`` is greater than zero.
    """
    # Input width of each successive convolution.
    conv_inputs = ([in_channels] + [out_channels] * num_convs)[:max(num_convs, 0)]

    modules = []
    for width_in in conv_inputs:
        modules.append(nn.Conv2d(width_in, out_channels, kernel_size=3, padding=1))
        if use_batchnorm:
            modules.append(nn.BatchNorm2d(out_channels))
        modules.append(nn.ReLU(inplace=True))

    modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
    if p_dropout > 0:
        modules.append(nn.Dropout(p_dropout))
    return nn.Sequential(*modules)

class Classifier(nn.Module):
    """VGG-style CNN classifier for RGB images.

    Args:
        num_classes:  size of the final logit layer.
        img_size:     (height, width) of the inputs; used only to size the
                      first fully connected layer.
        dropout_feat: dropout probability after each conv block.
        dropout_fc:   dropout probability in the fully connected head.

    ``forward`` returns raw logits (no softmax).
    """
    def __init__(self, num_classes=29, img_size=(224,224), dropout_feat=0.25, dropout_fc=0.5):
        super().__init__()
        c, h, w = 3, *img_size

        # VGG-style feature extractor: conv-conv-pool, conv-conv-pool, …
        self.features = nn.Sequential(
            conv_layer(c,   64, num_convs=2, p_dropout=dropout_feat),
            conv_layer(64, 128, num_convs=2, p_dropout=dropout_feat),
            conv_layer(128,256, num_convs=3, p_dropout=dropout_feat),
            conv_layer(256,512, num_convs=3, p_dropout=dropout_feat),
            # (Optionally omit the last block or reduce its size if overfitting proves severe)
        )

        # Compute the flattened feature size with a dummy forward pass.
        # The pass runs in eval mode: in train mode every BatchNorm layer would
        # fold the all-zero dummy batch into its running mean/variance before
        # any real data is seen.
        self.features.eval()
        with torch.no_grad():
            dummy = torch.zeros(1, c, h, w)
            n_feats = self.features(dummy).view(1, -1).size(1)
        self.features.train()  # restore the default mode of a new module

        # classifier head
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(n_feats, 512),   nn.ReLU(inplace=True), nn.Dropout(dropout_fc),
            nn.Linear(512, 512),       nn.ReLU(inplace=True), nn.Dropout(dropout_fc),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Map an image batch to class logits."""
        x = self.features(x)
        x = self.classifier(x)
        return x


In [10]:
def _run_cnn_epoch(model, loader, criterion, device, progress_prefix, optimizer=None):
    """Run one pass over `loader` and return (mean_loss, accuracy).

    Trains (backward + optimizer step) when `optimizer` is given, otherwise
    evaluates with gradients disabled. Progress is written in place through
    `print_line`, followed by one newline when the pass ends.
    """
    training = optimizer is not None
    phase = "Train" if training else "Val"
    model.train(training)

    loss_sum, n_correct, n_seen = 0.0, 0, 0
    n_batches = len(loader)

    with torch.set_grad_enabled(training):
        for batch_idx, (imgs, labels) in enumerate(loader, start=1):
            imgs, labels = imgs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # weight by batch size so the epoch mean is a per-sample mean
            loss_sum  += loss.item() * imgs.size(0)
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()
            n_seen    += labels.size(0)

            # print per-batch progress
            print_line(f"{progress_prefix}"
                       f"{phase} batch {batch_idx}/{n_batches} "
                       f"loss={loss.item():.4f}")

    # finish the progress line
    print()

    if n_seen == 0:
        raise ValueError(f"{phase} loader produced no samples")
    return loss_sum / n_seen, n_correct / n_seen


def train_cnn_model(model, train_loader, test_loader, device,
                    epochs=10, lr=1e-3):
    """Train `model` with Adam + cross-entropy and validate after every epoch.

    Args:
        model:        classifier returning logits of shape (batch, classes).
        train_loader: iterable of (images, labels) batches used for training.
        test_loader:  iterable of (images, labels) batches used for validation.
        device:       torch device (or device string) to run on.
        epochs:       number of epochs.
        lr:           Adam learning rate.

    Returns:
        (model, history), where history maps 'train_loss', 'train_acc',
        'val_loss' and 'val_acc' to per-epoch lists.

    Side effects:
        Saves the state dict to ``cnn_asl_model_{epoch}.pth`` after every epoch.

    Raises:
        ValueError: if either loader yields no samples.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(1, epochs+1):
        prefix = f"[Epoch {epoch}/{epochs}] "

        # ——— TRAINING ———
        train_loss, train_acc = _run_cnn_epoch(
            model, train_loader, criterion, device, prefix, optimizer=optimizer)
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        # ——— VALIDATION ———
        val_loss, val_acc = _run_cnn_epoch(
            model, test_loader, criterion, device, prefix)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # final epoch summary
        print(f"[Epoch {epoch}/{epochs}] "
              f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f} | "
              f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

        PATH = f"cnn_asl_model_{epoch}.pth"
        torch.save(model.state_dict(), PATH)
        print(f"Saved model weights to {PATH}")

    return model, history


In [11]:
# Select the device once and reuse the same object everywhere below.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Classifier(num_classes, img_size=(224, 224)).to(device)

# NOTE(review): in the captured run below the loss stays at ~3.36 (= ln 29) and
# accuracy at ~1/29, i.e. the network sits at chance level. Presumably
# lr=1e-3 with Adam is too aggressive for this depth — try a smaller learning
# rate (e.g. 1e-4) and confirm before spending 100 epochs.
model, history = train_cnn_model(
    model,
    train_loader,
    test_loader,
    device=device,
    epochs=100,
    lr=1e-3
)

[Epoch 1/100] Train batch 5577/5577 loss=3.3897
[Epoch 1/100] Val batch 1395/1395 loss=3.3215
[Epoch 1/100] train_loss=3.4023, train_acc=0.0365 | val_loss=3.3584, val_acc=0.0365
Saved model weights to cnn_asl_model_1.pth
[Epoch 2/100] Train batch 5577/5577 loss=3.3370
[Epoch 2/100] Val batch 1395/1395 loss=3.3225
[Epoch 2/100] train_loss=3.3584, train_acc=0.0370 | val_loss=3.3581, val_acc=0.0379
Saved model weights to cnn_asl_model_2.pth
[Epoch 3/100] Train batch 5577/5577 loss=3.3485
[Epoch 3/100] Val batch 1395/1395 loss=3.3267
[Epoch 3/100] train_loss=3.3582, train_acc=0.0367 | val_loss=3.3581, val_acc=0.0379
Saved model weights to cnn_asl_model_3.pth
[Epoch 4/100] Train batch 3545/5577 loss=3.3728

KeyboardInterrupt: 

In [None]:
# x-axis: 1-based epoch numbers, one per recorded history entry
epochs = range(1, len(history['train_loss']) + 1)

# One figure per metric: (history key suffix, legend word, axis/title word)
for curve_key, legend_word, axis_word in (('loss', 'Loss', 'Loss'),
                                          ('acc',  'Acc',  'Accuracy')):
    plt.figure(figsize=(8,4))
    plt.plot(epochs, history[f'train_{curve_key}'], label=f'Train {legend_word}')
    plt.plot(epochs, history[f'val_{curve_key}'],   label=f'Val {legend_word}')
    plt.title(f'{axis_word} over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel(axis_word)
    plt.legend()
    plt.show()

In [None]:
def imshow(img_tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Un-normalise a C×H×W image tensor and draw it on the current axes.

    Args:
        img_tensor: image tensor laid out as (channels, height, width) that was
                    normalised per channel with `mean` / `std`.
        mean, std:  per-channel statistics used to undo the normalisation.

    The input tensor is not modified.
    """
    # detach so tensors that require grad can be converted to numpy
    img = img_tensor.detach().cpu().clone()
    mean_t = torch.tensor(mean, dtype=img.dtype).view(-1, 1, 1)
    std_t  = torch.tensor(std,  dtype=img.dtype).view(-1, 1, 1)
    # Undo the normalisation, then clamp: rounding can push values slightly
    # outside [0, 1], which makes matplotlib emit clipping warnings.
    img = (img * std_t + mean_t).clamp(0.0, 1.0)
    img = img.permute(1, 2, 0).numpy()                          # C×H×W → H×W×C
    plt.imshow(img)
    plt.axis('off')

def visualize_predictions(model, loader, classes, device, num_images=8):
    """
    Show `num_images` from `loader` along with the model's predictions.

    Images are laid out four per row; the number of rows is rounded up, so any
    positive `num_images` works (not only multiples of four).

    Args:
        model      : your trained CNN
        loader     : a DataLoader (usually test_loader)
        classes    : list of class names
        device     : 'cuda' or 'cpu'
        num_images : total images to show (must be positive)

    Raises:
        ValueError: if `num_images` is not positive.
    """
    if num_images <= 0:
        raise ValueError("num_images must be a positive integer")

    n_cols = 4
    n_rows = (num_images + n_cols - 1) // n_cols   # ceil(num_images / n_cols)

    model.eval()
    images_shown = 0
    plt.figure(figsize=(12, n_rows * 3))

    with torch.no_grad():
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)                   # (B, C)
            preds   = outputs.argmax(dim=1)         # (B,)

            for i in range(imgs.size(0)):
                if images_shown >= num_images:
                    break

                plt.subplot(n_rows, n_cols, images_shown+1)
                imshow(imgs[i])
                title = f"P: {classes[preds[i].item()]}"
                # optional: also show true label
                # title += f"\nT: {classes[labels[i].item()]}"
                plt.title(title, fontsize=10)
                images_shown += 1

            if images_shown >= num_images:
                break

    plt.tight_layout()
    plt.show()


# Preview twelve test images with the predicted class shown above each one.
visualize_predictions(
    model=model,
    loader=test_loader,
    classes=class_names,
    device=device,
    num_images=12,
)

In [None]:
class AE(nn.Module):
    """Fully connected autoencoder with a 9-dimensional bottleneck.

    Args:
        input_dim: length of the flattened input vector. Defaults to 28*28,
                   i.e. a flattened 28×28 single-channel image.

    ``forward`` expects inputs already flattened to (batch, input_dim) and
    returns a reconstruction of the same shape, squashed to [0, 1] by the
    final sigmoid — targets should therefore also lie in [0, 1].
    """
    def __init__(self, input_dim=28 * 28):
        super().__init__()
        self.input_dim = input_dim
        # Layer positions inside each Sequential are kept as-is so state-dict
        # keys (encoder.0.weight, ...) stay compatible with saved weights.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 36),
            nn.ReLU(),
            nn.Linear(36, 18),
            nn.ReLU(),
            nn.Linear(18, 9)
        )
        self.decoder = nn.Sequential(
            nn.Linear(9, 18),
            nn.ReLU(),
            nn.Linear(18, 36),
            nn.ReLU(),
            nn.Linear(36, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Encode `x` to the 9-d code and decode it back."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:
# The autoencoder works on flattened 28×28 single-channel images in [0, 1]
# (its output goes through a sigmoid). The main `dataset` yields 3×224×224
# ImageNet-normalised tensors, so build a dedicated 28×28, un-normalised view
# of the same folder and feed it in mini-batches.
ae_batch_size = 128
ae_dataset = ASLImageFolder(data_dir, img_size=(28, 28), normalize=False)
ae_loader = DataLoader(ae_dataset, batch_size=ae_batch_size, shuffle=True, num_workers=0)

autoencoder = AE()
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=1e-3, weight_decay=1e-5)

epochs = 20
outputs = []   # (epoch, last input batch, last reconstruction) per epoch
losses = []    # loss of every optimisation step

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
autoencoder.to(device)

n_steps = len(ae_loader)
for epoch in range(epochs):
    autoencoder.train()
    for step, (images, _) in enumerate(ae_loader, start=1):
        # (B, 3, 28, 28) RGB → channel-mean grayscale → (B, 784)
        images = images.mean(dim=1).reshape(-1, 28 * 28).to(device)

        reconstructed = autoencoder(images)
        loss = loss_function(reconstructed, images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())

        print_line(f"[Epoch {epoch+1}/{epochs}] "
                       f"Step {step}/{n_steps} "
                       f"loss={loss.item():.4f}")

    # finish the in-place progress line before printing the summary
    print()

    # detach + move to CPU so the stored tensors keep neither the autograd
    # graph nor GPU memory alive
    outputs.append((epoch, images.detach().cpu(), reconstructed.detach().cpu()))
    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.6f}")

plt.style.use('fivethirtyeight')
plt.figure(figsize=(8, 5))
plt.plot(losses, label='Loss')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.legend()
plt.show()

PATH = "ae_asl_model.pth"
torch.save(autoencoder.state_dict(), PATH)
print(f"Saved autoencoder weights to {PATH}")

In [None]:
autoencoder.eval()

# Draw ten images as 28×28, un-normalised tensors. The main `dataset` yields a
# single 3×224×224 normalised image per item, which cannot be reshaped into
# ten 28×28 pictures.
n_preview = 10
preview_ds = ASLImageFolder(data_dir, img_size=(28, 28), normalize=False)
dataiter = iter(DataLoader(preview_ds, batch_size=n_preview, shuffle=True))
images, _ = next(dataiter)

# (B, 3, 28, 28) RGB → channel-mean grayscale → (B, 784)
images = images.mean(dim=1).reshape(-1, 28 * 28).to(device)
with torch.no_grad():   # inference only
    reconstructed = autoencoder(images)

# top row: inputs, bottom row: reconstructions
fig, axes = plt.subplots(nrows=2, ncols=n_preview, figsize=(10, 3))
for i in range(n_preview):
    axes[0, i].imshow(images[i].cpu().detach().numpy().reshape(28, 28), cmap='gray')
    axes[0, i].axis('off')
    axes[1, i].imshow(reconstructed[i].cpu().detach().numpy().reshape(28, 28), cmap='gray')
    axes[1, i].axis('off')
plt.show()

In [None]:
class VAE(nn.Module):
    """Small fully connected variational autoencoder.

    Args:
        latent_dim: size of the latent vector z.
        input_dim:  length of the flattened input. Defaults to 28*28, i.e. a
                    flattened 28×28 single-channel image.

    ``forward`` returns ``(reconstruction, mu, logvar)``; the reconstruction
    has shape (batch, input_dim) and lies in [0, 1] (sigmoid output).
    Attribute names are unchanged so saved state dicts remain loadable.
    """
    def __init__(self, latent_dim=9, input_dim=28*28):
        super().__init__()
        self.input_dim = input_dim
        # encoder: input_dim → 128 → (mu, logvar), each of size latent_dim
        self.fc1        = nn.Linear(input_dim, 128)
        self.bn1        = nn.BatchNorm1d(128)
        self.fc2_mu     = nn.Linear(128, latent_dim)
        self.fc2_logvar = nn.Linear(128, latent_dim)
        # decoder: latent_dim → 128 → input_dim
        self.fc3        = nn.Linear(latent_dim, 128)
        self.bn3        = nn.BatchNorm1d(128)
        self.fc4        = nn.Linear(128, input_dim)

    def encode(self, x):
        """Flatten `x` to (batch, input_dim) and return (mu, logvar)."""
        # reshape (not view) so non-contiguous inputs are accepted as well
        x  = x.reshape(-1, self.input_dim)
        h1 = nn.functional.relu(self.bn1(self.fc1(x)))
        return self.fc2_mu(h1), self.fc2_logvar(h1)

    def reparameterize(self, mu, logvar):
        """Sample z = mu + eps * std with eps ~ N(0, I) (reparameterisation trick)."""
        std = (0.5 * logvar).exp()
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map latent vectors of shape (batch, latent_dim) to reconstructions in [0, 1]."""
        h3 = nn.functional.relu(self.bn3(self.fc3(z)))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        """Encode, sample a latent vector, decode."""
        mu, logvar = self.encode(x)
        z          = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The VAE consumes flattened 28×28 single-channel images in [0, 1]; the main
# `dataset` yields 3×224×224 normalised tensors, so use a dedicated 28×28,
# un-normalised view of the same folder, served in mini-batches.
# drop_last=True: BatchNorm1d cannot run in train mode on a batch of one.
vae_batch_size = 128
vae_dataset = ASLImageFolder(data_dir, img_size=(28, 28), normalize=False)
vae_loader = DataLoader(vae_dataset, batch_size=vae_batch_size, shuffle=True,
                        num_workers=0, drop_last=True)

vae = VAE(latent_dim=9).to(device)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3, weight_decay=1e-5)

num_epochs = 10
n_steps = len(vae_loader)
for epoch in range(1, num_epochs+1):
    vae.train()
    running_loss = 0.0   # summed (not averaged) loss over the epoch
    n_seen = 0           # samples processed so far in this epoch

    for step, (images, _) in enumerate(vae_loader, start=1):
        # (B, 3, 28, 28) RGB → channel-mean grayscale → (B, 784)
        images = images.mean(dim=1).reshape(-1, 28*28).to(device)

        recon, mu, logvar = vae(images)
        # ELBO terms, both summed over the batch
        recon_loss = nn.functional.mse_loss(recon, images, reduction='sum')
        kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = recon_loss + kld

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        n_seen += images.size(0)

        # running per-sample loss for the epoch so far
        print_line(f"[Epoch {epoch}/{num_epochs}] "
                       f"Step {step}/{n_steps} "
                       f"loss={running_loss/n_seen:.4f}")

    # finish the in-place progress line before printing the summary
    print()

    avg_loss = running_loss / n_seen   # per-sample average
    print(f"Epoch {epoch}/{num_epochs} — Loss: {avg_loss:.6f}")

# save your VAE
torch.save(vae.state_dict(), "vae_asl_model.pth")
print("Saved VAE weights to vae_asl_model.pth")


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

latent_dim = 9
num_samples = 16

vae = VAE(latent_dim=latent_dim)
# map_location lets a checkpoint saved from a GPU run load on a CPU-only host
vae.load_state_dict(torch.load("vae_asl_model.pth", map_location=device))
vae.to(device)

# decode random latent vectors drawn from the standard-normal prior
vae.eval()
with torch.no_grad():
    z = torch.randn(num_samples, latent_dim).to(device)
    generated = vae.decode(z).cpu()

grid_size = 4
fig, axes = plt.subplots(grid_size, grid_size, figsize=(8, 8))

for idx, ax in enumerate(axes.flatten()):
    img = generated[idx].view(28, 28)  # reshape to image dimensions
    ax.imshow(img, cmap='gray')
    ax.axis('off')

plt.tight_layout()
plt.show()

# 1) find the index for class “B”
b_idx = dataset.class_to_idx['B']

# 2) collect μ(x) for the first N B-images
#    The VAE expects flattened 28×28 single-channel inputs in [0, 1], so read
#    the images through a 28×28, un-normalised view of the same folder. The
#    B samples are selected from the (path, label) list up front, so no other
#    class has to be decoded from disk.
N = 20
gray_ds = ASLImageFolder(data_dir, img_size=(28, 28), normalize=False)
b_positions = [pos for pos, (_, lbl) in enumerate(gray_ds.samples) if lbl == b_idx][:N]
if not b_positions:
    raise RuntimeError("no images found for class 'B'")

mus = []
with torch.no_grad():
    for pos in b_positions:
        img, _ = gray_ds[pos]                              # (3, 28, 28)
        x      = img.mean(dim=0).reshape(1, 28*28).to(device)
        mu, _  = vae.encode(x)          # mu: (1, latent_dim)
        mus.append(mu)

# 3) build the class-mean z
all_mus = torch.cat(mus, dim=0)        # (N, latent_dim)
mu_B    = all_mus.mean(dim=0, keepdim=True)  # (1, latent_dim)

# 4) sample around that mean
num_sibs = 16
scale    = 0.7
eps      = torch.randn(num_sibs, latent_dim, device=device)
z_B      = mu_B + eps * scale         # (16, latent_dim)

with torch.no_grad():
    sibs = vae.decode(z_B).cpu()       # (16, 784)

# 5) plot
grid = int(num_sibs**0.5)
fig, axes = plt.subplots(grid, grid, figsize=(6, 6))
for i, ax in enumerate(axes.flatten()):
    ax.imshow(sibs[i].view(28, 28).numpy(), cmap='gray')
    ax.axis('off')
plt.tight_layout()
plt.show()

In [None]:
# MediaPipe lightweight hand-landmark model

import cv2
import mediapipe as mp

class LandmarkDataset(Dataset):
    """Dataset that turns each image into a 42-dim hand-landmark feature vector.

    Each item is ``(features, label)`` where ``features`` is a float32 tensor
    holding the (x, y) coordinates of the 21 MediaPipe hand landmarks,
    flattened as x0, y0, x1, y1, ... . If the image cannot be read or no hand
    is detected, the feature vector is all zeros.
    """
    def __init__(self, samples, class_to_idx):
        """
        samples: list of (path, label_idx)
        class_to_idx: dict mapping class name → integer
        """
        self.samples = samples
        self.class_to_idx = class_to_idx

        # set up MediaPipe Hands in static_image_mode
        mp_hands = mp.solutions.hands
        self.hands = mp_hands.Hands(
            static_image_mode=True,
            max_num_hands=1,                # single hand per image
            min_detection_confidence=0.7
        )

    def __len__(self):
        """Number of (path, label) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (42-dim float32 landmark tensor, label_idx) for sample `idx`."""
        path, label = self.samples[idx]
        img = cv2.imread(path)              # OpenCV decodes to BGR
        if img is None:
            # unreadable file: fallback to zeros
            feats = np.zeros(42, dtype=np.float32)
        else:
            # MediaPipe Hands expects RGB input; feeding OpenCV's BGR frame
            # directly degrades detection, so convert first.
            rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            res = self.hands.process(rgb)
            if not res.multi_hand_landmarks:
                # no hand detected: fallback to zeros
                feats = np.zeros(42, dtype=np.float32)
            else:
                lm = res.multi_hand_landmarks[0].landmark
                # flatten x,y for each of the 21 landmarks → 42-dim vector
                feats = np.array(
                    [coord for pt in lm for coord in (pt.x, pt.y)],
                    dtype=np.float32
                )

        return torch.from_numpy(feats), label

# Reuse the file list and label mapping discovered by the image dataset.
samples = dataset.samples
class_to_idx = dataset.class_to_idx
num_classes = len(class_to_idx)

# stratified 80/20 split over sample indices
all_idx = list(range(len(samples)))
train_idx, test_idx = train_test_split(
    all_idx,
    test_size=0.2,
    stratify=[lbl for _, lbl in samples],
    random_state=42 #The Meaning of Life
)

landmark_ds = LandmarkDataset(samples, class_to_idx=class_to_idx)
train_ds = Subset(landmark_ds, train_idx)
test_ds = Subset(landmark_ds, test_idx)

batch_size = 64
train_loader_mp = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
# evaluation data is not shuffled, so results are reported in a stable order
test_loader_mp = DataLoader(test_ds, batch_size=batch_size, shuffle=False)


In [None]:
class HandPointClassifier(nn.Module):
    """Three-layer MLP over flattened hand-landmark features.

    The default ``num_classes`` is taken from the notebook-level
    ``num_classes`` at the moment this class is defined.

    ``forward`` returns softmax probabilities by default; pass
    ``return_probs=False`` to get raw logits (what ``nn.CrossEntropyLoss``
    expects).
    """
    def __init__(self, in_dim=42, hidden=128, num_classes=num_classes):
        super().__init__()
        stack = [
            nn.Linear(in_dim, hidden), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(hidden, hidden), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(hidden, num_classes),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x, return_probs=True):
        scores = self.net(x)
        return nn.functional.softmax(scores, dim=1) if return_probs else scores

In [None]:
# NOTE(review): this rebinds the notebook-level name `model`, which until now
# referred to the image CNN; later cells that expect the CNN must not rely on it.
model = HandPointClassifier().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

hand_epochs = 30
n_batches = len(train_loader_mp)

for epoch in range(1, hand_epochs + 1):
    model.train()
    total_loss = 0.0   # sum of per-sample losses this epoch
    n_seen = 0         # samples processed this epoch
    # landmark loaders (42-dim features), not the image loaders
    for batch_idx, (feats, labels) in enumerate(train_loader_mp, start=1):
        feats, labels = feats.to(device), labels.to(device)
        # CrossEntropyLoss applies log-softmax itself, so it needs raw logits;
        # the model's default output is already soft-maxed.
        logits = model(feats, return_probs=False)
        loss   = criterion(logits, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * feats.size(0)
        n_seen += feats.size(0)

        # per-sample running average (also correct for a smaller last batch)
        avg_loss_so_far = total_loss / n_seen
        print_line(
            f"Epoch {epoch}/{hand_epochs}",
            f"Batch {batch_idx}/{n_batches}",
            f"loss: {avg_loss_so_far:.4f}"
        )

    print()

    # validation
    model.eval()
    correct = 0
    with torch.no_grad():
        for feats, labels in test_loader_mp:
            feats, labels = feats.to(device), labels.to(device)
            # argmax is the same for probabilities and logits
            pred = model(feats).argmax(dim=1)
            correct += (pred == labels).sum().item()
    val_acc = correct / len(test_ds)

    print(f" → Validation accuracy: {val_acc:.3f}")



In [None]:
class SequenceModel(nn.Module):
    """Per-frame CNN encoder followed by a bidirectional LSTM and a linear head.

    ``forward`` takes a 5-D tensor (batch, seq, C, H, W), encodes every frame
    with ``cnn_encoder`` and returns one score vector per frame with shape
    (batch, seq, num_classes).
    """
    def __init__(self, cnn_encoder: nn.Module, cnn_output_dim: int,
                 rnn_hidden_size: int, num_layers: int, num_classes: int):
        super().__init__()
        self.cnn_encoder = cnn_encoder
        self.rnn = nn.LSTM(
            input_size=cnn_output_dim,
            hidden_size=rnn_hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # forward and backward hidden states are concatenated → 2 × hidden
        self.fc = nn.Linear(2 * rnn_hidden_size, num_classes)

    def forward(self, x):
        batch, steps, channels, height, width = x.size()
        # fold the time axis into the batch axis so the CNN sees single frames
        frames = x.view(batch * steps, channels, height, width)
        per_frame = self.cnn_encoder(frames)
        # restore (batch, seq, feature) for the recurrent layer
        sequence = per_frame.view(batch, steps, -1)
        hidden, _ = self.rnn(sequence)
        return self.fc(hidden)

def build_sequence_model(cnn_model, img_size, rnn_hidden_size, num_layers, num_classes, device=None):
    """Wrap the feature extractor of `cnn_model` in a `SequenceModel`.

    Args:
        cnn_model:       model exposing a `.features` module (the conv backbone).
        img_size:        (height, width) of the frames; used to infer the
                         flattened feature size.
        rnn_hidden_size: hidden size of the LSTM.
        num_layers:      number of LSTM layers.
        num_classes:     number of output classes per frame.
        device:          target device for the returned model. ``None`` (the
                         default) picks CUDA when available, else CPU — the
                         same choice this function previously always made
                         regardless of the argument.

    Returns:
        The `SequenceModel`, moved to `device`.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    backbone = cnn_model.features

    # Probe the flattened feature size on whichever device the backbone
    # currently lives on, so a GPU-resident CNN does not receive a CPU tensor.
    first_param = next(backbone.parameters(), None)
    probe_device = first_param.device if first_param is not None else torch.device("cpu")

    # eval + no_grad: the probe must neither update BatchNorm running
    # statistics nor build an autograd graph.
    was_training = backbone.training
    backbone.eval()
    with torch.no_grad():
        dummy = torch.zeros(1, 3, *img_size, device=probe_device)
        feat_dim = backbone(dummy).flatten(1).size(1)
    backbone.train(was_training)

    cnn_feat = nn.Sequential(backbone, nn.Flatten())
    model = SequenceModel(cnn_feat, feat_dim, rnn_hidden_size, num_layers, num_classes).to(device)
    return model

In [None]:
def collate_fn(batch):
    """Pad a batch of variable-length (sequence, labels) pairs.

    Args:
        batch: list of (seq, labs) tensor pairs; the first dimension of each
               tensor is the time axis.

    Returns:
        (padded_seqs, padded_labs), both batch-first. Sequences are padded
        with zeros, labels with -100 (for use as the loss's ``ignore_index``).
    """
    seqs, labs = zip(*batch)
    padded_seqs = pad_sequence(seqs, batch_first=True)            # (B, T_max, feat)
    padded_labs = pad_sequence(labs,  batch_first=True,
                               padding_value=-100)                # for ignore_index
    return padded_seqs, padded_labs

In [None]:
def _run_sequence_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over `loader` and return (mean_loss, accuracy) per labelled token.

    Trains when `optimizer` is given, otherwise evaluates with gradients
    disabled. Positions labelled -100 are excluded from both statistics.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum, n_correct, n_tokens = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for seqs, labs in loader:
            seqs, labs = seqs.to(device), labs.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(seqs)                              # (B, T, C)
            n_out = outputs.shape[-1]

            # reshape (not view) so non-contiguous outputs/labels also work
            outputs_flat = outputs.reshape(-1, n_out)          # (B*T, C)
            labs_flat    = labs.reshape(-1)                    # (B*T,)
            loss = criterion(outputs_flat, labs_flat)
            if training:
                loss.backward()
                optimizer.step()

            # accumulate stats over labelled (non-padding) positions only
            mask = labs_flat.ne(-100)
            n_valid = mask.sum().item()
            loss_sum  += loss.item() * n_valid
            n_correct += (outputs_flat.argmax(1)[mask] == labs_flat[mask]).sum().item()
            n_tokens  += n_valid

    if n_tokens == 0:
        raise ValueError("loader produced no labelled tokens")
    return loss_sum / n_tokens, n_correct / n_tokens


def train_rnn_model(model, train_loader, test_loader, device,
                    epochs=10, lr=1e-3):
    """Train a per-timestep sequence classifier with Adam + cross-entropy.

    Args:
        model:        maps (B, T, ...) inputs to (B, T, C) scores.
        train_loader: iterable of (seqs, labs) batches; labs is (B, T) with
                      -100 marking padded positions.
        test_loader:  same format, used for validation after each epoch.
        device:       torch device (or device string) to run on.
        epochs:       number of epochs.
        lr:           initial Adam learning rate (halved every 5 epochs).

    Returns:
        The trained model.

    Raises:
        ValueError: if a loader contains no labelled tokens.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=-100)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    for epoch in range(epochs):
        # ——— TRAINING ———
        train_loss, train_acc = _run_sequence_epoch(
            model, train_loader, criterion, device, optimizer=optimizer)

        # ——— VALIDATION ———
        val_loss, val_acc = _run_sequence_epoch(
            model, test_loader, criterion, device)

        scheduler.step()
        print(f"Epoch {epoch+1}/{epochs} | "
              f"train_loss {train_loss:.4f}, train_acc {train_acc:.4f} | "
              f"val_loss {val_loss:.4f}, val_acc {val_acc:.4f}")

    return model

In [None]:
# Wrap the CNN feature extractor in a bidirectional-LSTM sequence model and train it.
# NOTE(review): build_sequence_model reads `cnn_model.features`, so `model` must be
# the image Classifier here. If an earlier cell has rebound `model` to a
# HandPointClassifier (which defines `.net` only), this call fails — confirm which
# object `model` refers to at this point.
# NOTE(review): SequenceModel.forward unpacks a 5-D input (batch, seq, C, H, W), while
# `train_loader` / `test_loader` defined earlier in this notebook wrap ASLImageFolder
# and appear to yield single-image batches; presumably a sequence dataset built with
# `collate_fn` is intended here — verify before running.
model = build_sequence_model(model, (224,224), rnn_hidden_size=128, num_layers=2, num_classes=num_classes)
model = train_rnn_model(model, train_loader, test_loader, device,
                        epochs=10, lr=1e-3)