### Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math

### GRAPH DEFINITION


In [None]:

class Graph:
    def __init__(self, layout='mediapipe_65', strategy='uniform'):
        self.num_node = 65
        self.self_link = [(i, i) for i in range(self.num_node)]
        
        self.inward_bone_link = [
            (12, 14), (14, 16), (16, 18), (16, 20), (16, 22), (18, 20),
            (11, 13), (13, 15), (15, 17), (15, 19), (15, 21), (17, 19),
            (12, 11),
            (10, 8), (8, 6), (6, 5), (5, 4), (4, 0),
            (9, 7), (7, 3), (3, 2), (2, 1), (1, 0),
            (15, 23), (16, 44)
        ]
        
        for hand_start_idx in [23, 44]:
            self.inward_bone_link.extend([
                (hand_start_idx, hand_start_idx + 1), 
                (hand_start_idx + 1, hand_start_idx + 2),
                (hand_start_idx + 2, hand_start_idx + 3), 
                (hand_start_idx + 3, hand_start_idx + 4)
            ])
            for finger_start in range(5, 21, 4):
                self.inward_bone_link.extend([
                    (hand_start_idx, hand_start_idx + finger_start),
                    (hand_start_idx + finger_start, hand_start_idx + finger_start + 1),
                    (hand_start_idx + finger_start + 1, hand_start_idx + finger_start + 2),
                    (hand_start_idx + finger_start + 2, hand_start_idx + finger_start + 3)
                ])

        self.edge = self.self_link + self.inward_bone_link
        self.A = torch.zeros(self.num_node, self.num_node)
        for i, j in self.edge:
            self.A[j, i] = 1
            self.A[i, j] = 1

### Model Archectecture

In [None]:

# ============================================================================
#2  EFFICIENT DEPTHWISE-SEPARABLE CONVOLUTIONS
# ============================================================================
class DepthwiseSeparableConv(nn.Module):
    """Efficient convolution: 8-10x fewer parameters than standard conv"""
    def __init__(self, in_ch, out_ch, kernel_size=1, stride=1, padding=0):
        super().__init__()
        self.depthwise = nn.Conv2d(in_ch, in_ch, kernel_size, stride, padding, groups=in_ch)
        self.pointwise = nn.Conv2d(in_ch, out_ch, 1)
        self.bn = nn.BatchNorm2d(out_ch)
        self.act = nn.SiLU()
    
    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        x = self.bn(x)
        return self.act(x)


# ============================================================================
# 3. LIGHTWEIGHT MULTI-HEAD SELF-ATTENTION
# ============================================================================
class EfficientMultiHeadAttention(nn.Module):
    """Reduced-dimension attention for temporal modeling"""
    def __init__(self, d_model, num_heads=4, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        
        # Use 1x1 convs instead of linear for efficiency
        self.qkv = nn.Conv1d(d_model, d_model * 3, 1)
        self.proj = nn.Conv1d(d_model, d_model, 1)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # x: (B, C, T, V) -> reshape to (B*V, C, T)
        B, C, T, V = x.shape
        x = x.permute(0, 3, 1, 2).reshape(B * V, C, T)
        
        qkv = self.qkv(x).reshape(B * V, 3, self.num_heads, self.head_dim, T)
        q, k, v = qkv[:, 0], qkv[:, 1], qkv[:, 2]
        
        # Scaled dot-product attention
        attn = torch.einsum('bhdt,bhds->bhts', q, k) / math.sqrt(self.head_dim)
        attn = F.softmax(attn, dim=-1)
        attn = self.dropout(attn)
        
        out = torch.einsum('bhts,bhds->bhdt', attn, v)
        out = out.reshape(B * V, C, T)
        out = self.proj(out)
        
        # Reshape back
        out = out.reshape(B, V, C, T).permute(0, 2, 3, 1)
        return out


# ============================================================================
# 4. TEMPORAL TRANSFORMER BLOCK
# ============================================================================
class TemporalTransformerBlock(nn.Module):
    """Lightweight transformer for capturing long-range temporal dependencies"""
    def __init__(self, channels, num_heads=4, dropout=0.1):
        super().__init__()
        self.attn = EfficientMultiHeadAttention(channels, num_heads, dropout)
        self.ffn = nn.Sequential(
            DepthwiseSeparableConv(channels, channels * 2, kernel_size=1),
            nn.Dropout(dropout),
            nn.Conv2d(channels * 2, channels, 1),
            nn.BatchNorm2d(channels)
        )
        self.norm1 = nn.LayerNorm(channels)
        self.norm2 = nn.LayerNorm(channels)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # x: (B, C, T, V)
        B, C, T, V = x.shape
        
        # Attention with residual
        x_norm = x.permute(0, 2, 3, 1)  # (B, T, V, C)
        x_norm = self.norm1(x_norm).permute(0, 3, 1, 2)  # Back to (B, C, T, V)
        attn_out = self.attn(x_norm)
        x = x + self.dropout(attn_out)
        
        # FFN with residual
        x_norm = x.permute(0, 2, 3, 1)
        x_norm = self.norm2(x_norm).permute(0, 3, 1, 2)
        ffn_out = self.ffn(x_norm)
        x = x + self.dropout(ffn_out)
        
        return x


# ============================================================================
# 5. ADAPTIVE GRAPH CONVOLUTION (learnable adjacency)
# ============================================================================
class AdaptiveGCN(nn.Module):
    """Graph convolution with learnable adjacency matrix"""
    def __init__(self, in_ch, out_ch, A, num_subsets=3):
        super().__init__()
        self.num_subsets = num_subsets
        
        # Learnable adjacency matrices
        self.A = nn.Parameter(torch.stack([A] * num_subsets), requires_grad=True)
        
        # Efficient depthwise-separable convs
        self.convs = nn.ModuleList([
            DepthwiseSeparableConv(in_ch, out_ch, kernel_size=1) 
            for _ in range(num_subsets)
        ])
        
    def forward(self, x):
        # x: (B, C, T, V)
        N, C, T, V = x.shape
        out = None
        
        for i in range(self.num_subsets):
            # Graph convolution: (B, C, T, V) @ (V, V) -> (B, C, T, V)
            x_reshaped = x.view(N, C * T, V)
            z = torch.matmul(x_reshaped, self.A[i]).view(N, C, T, V)
            z = self.convs[i](z)
            out = z if out is None else out + z
            
        return out


# ============================================================================
# 6. ENHANCED ST-GCN BLOCK
# ============================================================================
class EnhancedSTGCNBlock(nn.Module):
    """ST-GCN + Temporal Transformer + Cross-Stream Attention"""
    def __init__(self, in_ch, out_ch, A, stride=1, use_transformer=True):
        super().__init__()
        self.gcn = AdaptiveGCN(in_ch, out_ch, A)
        self.tcn = DepthwiseSeparableConv(
            out_ch, out_ch, kernel_size=(3, 1), stride=(stride, 1), padding=(1, 0)
        )
        
        self.use_transformer = use_transformer
        if use_transformer:
            self.transformer = TemporalTransformerBlock(out_ch, num_heads=4)
        
        # Residual connection
        self.residual = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 1, stride=(stride, 1)),
            nn.BatchNorm2d(out_ch)
        ) if in_ch != out_ch or stride != 1 else nn.Identity()
        
    def forward(self, x):
        res = self.residual(x)
        x = self.gcn(x)
        x = self.tcn(x)
        
        if self.use_transformer:
            x = self.transformer(x)
        
        return F.silu(x + res)


# ============================================================================
# 7. CROSS-STREAM FUSION MODULE
# ============================================================================
class CrossStreamFusion(nn.Module):
    """Cross-attention between joint and bone streams"""
    def __init__(self, channels):
        super().__init__()
        self.query = nn.Conv2d(channels, channels // 4, 1)
        self.key = nn.Conv2d(channels, channels // 4, 1)
        self.value = nn.Conv2d(channels, channels, 1)
        self.gamma = nn.Parameter(torch.zeros(1))
        
    def forward(self, x1, x2):
        # x1: joint stream, x2: bone stream
        B, C, T, V = x1.shape
        
        # Compute attention: x1 queries x2
        q = self.query(x1).view(B, -1, T * V)  # (B, C/4, T*V)
        k = self.key(x2).view(B, -1, T * V)    # (B, C/4, T*V)
        v = self.value(x2).view(B, C, T * V)   # (B, C, T*V)
        
        attn = torch.bmm(q.transpose(1, 2), k)  # (B, T*V, T*V)
        attn = F.softmax(attn, dim=-1)
        
        out = torch.bmm(v, attn.transpose(1, 2))  # (B, C, T*V)
        out = out.view(B, C, T, V)
        
        return x1 + self.gamma * out


# ============================================================================
# 8. MAIN MODEL: TT-STGCN
# ============================================================================
class TT_STGCN(nn.Module):
    """Temporal Transformer ST-GCN with Cross-Stream Fusion"""
    def __init__(self, num_classes, num_joints=65, 
                 joint_in_ch=4, bone_in_ch=2, base_ch=32):
        super().__init__()
        
        graph = Graph()
        A = graph.A
        
        # Input normalization
        self.bn_joint = nn.BatchNorm2d(joint_in_ch)
        self.bn_bone = nn.BatchNorm2d(bone_in_ch)
        
        # ---- STREAM 1: Joint Stream ----
        self.joint_stream = nn.Sequential(
            EnhancedSTGCNBlock(joint_in_ch, base_ch, A, use_transformer=False),
            EnhancedSTGCNBlock(base_ch, base_ch * 2, A, use_transformer=True),
            EnhancedSTGCNBlock(base_ch * 2, base_ch * 3, A, stride=2, use_transformer=True)
        )
        
        # ---- STREAM 2: Bone Stream ----
        self.bone_stream = nn.Sequential(
            EnhancedSTGCNBlock(bone_in_ch, base_ch, A, use_transformer=False),
            EnhancedSTGCNBlock(base_ch, base_ch * 2, A, use_transformer=True),
            EnhancedSTGCNBlock(base_ch * 2, base_ch * 3, A, stride=2, use_transformer=True)
        )
        
        # Cross-stream fusion
        self.fusion = CrossStreamFusion(base_ch * 3)
        
        # Post-fusion layers
        self.post_fusion = nn.Sequential(
            EnhancedSTGCNBlock(base_ch * 3, base_ch * 4, A, stride=2, use_transformer=True),
            EnhancedSTGCNBlock(base_ch * 4, base_ch * 6, A, use_transformer=True)
        )
        
        # Classification head
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(base_ch * 6, num_classes)
        
    def forward(self, x_joint, x_bone):
        # Normalize
        x_joint = self.bn_joint(x_joint)
        x_bone = self.bn_bone(x_bone)
        
        # Process streams
        f_joint = self.joint_stream(x_joint)
        f_bone = self.bone_stream(x_bone)
        
        # Cross-stream fusion
        f_fused = self.fusion(f_joint, f_bone)
        
        # Post-fusion
        out = self.post_fusion(f_fused)
        
        # Classification
        out = self.pool(out).view(out.size(0), -1)
        out = self.dropout(out)
        out = self.fc(out)
        
        return out


# ============================================================================
# 9. MODEL SUMMARY
# ============================================================================
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

if __name__ == "__main__":
    model = TT_STGCN(num_classes=300, base_ch=32)
    print(f"Total Parameters: {count_parameters(model):,}")
    
    # Test forward pass
    x_joint = torch.randn(2, 4, 64, 65)
    x_bone = torch.randn(2, 2, 64, 65)
    out = model(x_joint, x_bone)
    print(f"Output shape: {out.shape}")

Total Parameters: 1,136,005
Output shape: torch.Size([2, 300])


### Dataset and Preprocessing

In [None]:

# ============================================================================
# 1. ADVANCED DATA AUGMENTATION
# ============================================================================
class SignLanguageAugmentation:
    """
    Augmentation techniques for skeleton-based sign language recognition
    """
    def __init__(self, 
                 shear_range=0.1,
                 rotate_range=20,
                 scale_range=(0.9, 1.1),
                 temporal_crop_ratio=0.9,
                 spatial_drop_prob=0.1,
                 temporal_mask_prob=0.15):
        self.shear_range = shear_range
        self.rotate_range = rotate_range
        self.scale_range = scale_range
        self.temporal_crop_ratio = temporal_crop_ratio
        self.spatial_drop_prob = spatial_drop_prob
        self.temporal_mask_prob = temporal_mask_prob
    
    def random_rotate(self, joints):
        """Random rotation around z-axis"""
        angle = np.random.uniform(-self.rotate_range, self.rotate_range)
        angle_rad = np.deg2rad(angle)
        cos_a, sin_a = np.cos(angle_rad), np.sin(angle_rad)
        
        rot_matrix = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
        
        # Apply to x, y coordinates
        T, V, C = joints.shape
        joints_rot = joints.copy()
        joints_rot[:, :, :2] = np.dot(joints[:, :, :2].reshape(-1, 2), rot_matrix.T).reshape(T, V, 2)
        
        return joints_rot
    
    def random_scale(self, joints):
        """Random scaling"""
        scale = np.random.uniform(*self.scale_range)
        return joints * scale
    
    def random_shear(self, joints):
        """Random shearing transformation"""
        shear_x = np.random.uniform(-self.shear_range, self.shear_range)
        shear_y = np.random.uniform(-self.shear_range, self.shear_range)
        
        shear_matrix = np.array([[1, shear_x], [shear_y, 1]])
        
        T, V, C = joints.shape
        joints_shear = joints.copy()
        joints_shear[:, :, :2] = np.dot(joints[:, :, :2].reshape(-1, 2), shear_matrix.T).reshape(T, V, 2)
        
        return joints_shear
    
    def temporal_crop(self, joints, bones):
        """Randomly crop temporal sequence"""
        T = joints.shape[0]
        new_T = int(T * self.temporal_crop_ratio)
        
        if new_T >= T:
            return joints, bones
        
        start = np.random.randint(0, T - new_T + 1)
        return joints[start:start+new_T], bones[start:start+new_T]
    
    def spatial_dropout(self, joints):
        """Randomly drop spatial joints"""
        T, V, C = joints.shape
        mask = np.random.rand(V) > self.spatial_drop_prob
        joints_dropped = joints.copy()
        joints_dropped[:, ~mask, :] = 0
        return joints_dropped
    
    def temporal_masking(self, joints):
        """Mask random temporal segments"""
        T = joints.shape[0]
        num_masks = int(T * self.temporal_mask_prob)
        
        if num_masks == 0:
            return joints
        
        joints_masked = joints.copy()
        for _ in range(num_masks):
            start = np.random.randint(0, T)
            joints_masked[start] = 0
        
        return joints_masked
    
    def __call__(self, joints, bones, training=True):
        """Apply augmentation pipeline"""
        if not training:
            return joints, bones
        
        # Geometric augmentations (apply to joints, will affect bones implicitly)
        if np.random.rand() > 0.5:
            joints = self.random_rotate(joints)
        if np.random.rand() > 0.5:
            joints = self.random_scale(joints)
        if np.random.rand() > 0.5:
            joints = self.random_shear(joints)
        
        # Temporal augmentation
        if np.random.rand() > 0.5:
            joints, bones = self.temporal_crop(joints, bones)
        
        # Dropout augmentations
        if np.random.rand() > 0.5:
            joints = self.spatial_dropout(joints)
        if np.random.rand() > 0.5:
            joints = self.temporal_masking(joints)
        
        return joints, bones


# ============================================================================
# 2. ENHANCED DATASET WITH AUGMENTATION
# ============================================================================
class EnhancedSignLanguageDataset(torch.utils.data.Dataset):
    def __init__(self, video_ids, labels_dict, features_dir, seq_len, 
                 augmentation=None, training=True):
        self.video_ids = video_ids
        self.labels = labels_dict
        self.features_dir = features_dir
        self.seq_len = seq_len
        self.augmentation = augmentation
        self.training = training
        
        self.bone_to_joint_map = {
            0: 13, 1: 15, 2: 14, 3: 16, 4: 11
        }
    
    def __len__(self):
        return len(self.video_ids)
    
    def __getitem__(self, idx):
        video_id = self.video_ids[idx]
        
        # Load data
        import os
        file_path = os.path.join(self.features_dir, f"{video_id}.npz")
        data = np.load(file_path)
        
        joint_seq = data['joint_sequence'].astype(np.float32)
        bone_seq_flat = data['bone_sequence'].astype(np.float32)
        
        # Create V-dimensional bone tensor
        num_frames = bone_seq_flat.shape[0]
        bone_seq = np.zeros((num_frames, 65, 2), dtype=np.float32)
        for bone_idx, joint_idx in self.bone_to_joint_map.items():
            start_col = bone_idx * 2
            bone_seq[:, joint_idx, :] = bone_seq_flat[:, start_col:start_col+2]
        
        # Apply augmentation
        if self.augmentation:
            joint_seq, bone_seq = self.augmentation(joint_seq, bone_seq, self.training)
        
        # Handle padding/truncating
        original_len = joint_seq.shape[0]
        if original_len < self.seq_len:
            pad_len = self.seq_len - original_len
            joint_seq = np.pad(joint_seq, ((0, pad_len), (0, 0), (0, 0)), 'constant')
            bone_seq = np.pad(bone_seq, ((0, pad_len), (0, 0), (0, 0)), 'constant')
        elif original_len > self.seq_len:
            # Random crop during training, center crop during validation
            if self.training:
                start = np.random.randint(0, original_len - self.seq_len + 1)
            else:
                start = (original_len - self.seq_len) // 2
            joint_seq = joint_seq[start:start+self.seq_len]
            bone_seq = bone_seq[start:start+self.seq_len]
        
        label = self.labels[video_id]
        
        # Convert to tensor and permute to (C, T, V)
        joint_seq_tensor = torch.tensor(joint_seq).permute(2, 0, 1)
        bone_seq_tensor = torch.tensor(bone_seq).permute(2, 0, 1)
        
        return joint_seq_tensor, bone_seq_tensor, torch.tensor(label, dtype=torch.long)




'\n# Setup augmentation\naugmentation = SignLanguageAugmentation(\n    shear_range=0.1,\n    rotate_range=20,\n    scale_range=(0.9, 1.1),\n    temporal_crop_ratio=0.9,\n    spatial_drop_prob=0.1,\n    temporal_mask_prob=0.15\n)\n\n# Create enhanced dataset\ntrain_dataset = EnhancedSignLanguageDataset(\n    train_ids, labels_dict, features_dir, \n    seq_len=64, augmentation=augmentation, training=True\n)\n\n# Use label smoothing\ncriterion = LabelSmoothingCrossEntropy(smoothing=0.1)\n\n# Use AdamW optimizer with weight decay\noptimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.05)\n\n# Use cosine annealing with warmup\nscheduler = CosineAnnealingWarmupRestarts(optimizer, warmup_epochs=10, max_epochs=350)\n\n# Training loop\nfor epoch in range(350):\n    train_loss, train_acc = train_one_epoch_advanced(\n        model, train_loader, optimizer, criterion, device,\n        use_mixup=True, mixup_alpha=0.2\n    )\n    \n    val_loss, val_top1, val_top5 = validate_wit

In [None]:



# ============================================================================
# 6. TRAINING LOOP WITH ALL IMPROVEMENTS
# ============================================================================
def train_one_epoch_advanced(model, loader, optimizer, criterion, device, 
                            use_mixup=True, mixup_alpha=0.2):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for joint_batch, bone_batch, labels in loader:
        joint_batch = joint_batch.to(device)
        bone_batch = bone_batch.to(device)
        labels = labels.to(device)
        
        # Apply mixup
        if use_mixup and np.random.rand() > 0.5:
            joint_batch, bone_batch, labels_a, labels_b, lam = mixup_data(
                joint_batch, bone_batch, labels, mixup_alpha
            )
            
            optimizer.zero_grad()
            outputs = model(joint_batch, bone_batch)
            loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
        else:
            optimizer.zero_grad()
            outputs = model(joint_batch, bone_batch)
            loss = criterion(outputs, labels)
        
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    
    return running_loss / len(loader), 100. * correct / total


# ============================================================================
# 7. VALIDATION WITH TTA (TEST TIME AUGMENTATION)
# ============================================================================
def validate_with_tta(model, loader, criterion, device, num_tta=5):
    """Test-time augmentation for better accuracy"""
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for joint_batch, bone_batch, labels in loader:
            joint_batch = joint_batch.to(device)
            bone_batch = bone_batch.to(device)
            labels = labels.to(device)
            
            # Accumulate predictions from multiple augmentations
            tta_preds = []
            for _ in range(num_tta):
                outputs = model(joint_batch, bone_batch)
                tta_preds.append(F.softmax(outputs, dim=1))
            
            # Average predictions
            outputs_avg = torch.stack(tta_preds).mean(0)
            loss = criterion(outputs_avg.log(), labels)
            
            running_loss += loss.item()
            all_preds.append(outputs_avg.cpu())
            all_labels.append(labels.cpu())
    
    all_preds = torch.cat(all_preds, 0)
    all_labels = torch.cat(all_labels, 0)
    
    # Top-1 accuracy
    _, predicted = all_preds.max(1)
    correct = predicted.eq(all_labels).sum().item()
    top1_acc = 100. * correct / len(all_labels)
    
    # Top-5 accuracy
    _, pred_top5 = all_preds.topk(5, 1, True, True)
    correct_5 = pred_top5.eq(all_labels.view(-1, 1).expand_as(pred_top5)).sum().item()
    top5_acc = 100. * correct_5 / len(all_labels)
    
    return running_loss / len(loader), top1_acc, top5_acc


### Training

In [None]:
import numpy as np
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os
from tqdm import tqdm
import time

# ============================================================================
# CONFIGURATION
# ============================================================================
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# Paths
FEATURES_DIR = '/kaggle/input/msegcn-individual/MSE_GCN_features_individual'
JSON_PATH = '/kaggle/input/json-files/nslt_300.json'

# Hyperparameters
SEQ_LEN = 64
BATCH_SIZE = 24  # Increased for better gradient estimates
EPOCHS = 150  # More epochs with better regularization
BASE_CHANNELS = 32  # Lightweight
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1
MIXUP_ALPHA = 0.2

print(f"Configuration loaded. Batch size: {BATCH_SIZE}, Epochs: {EPOCHS}")


# ============================================================================
# LOAD DATA AND CREATE SPLITS
# ============================================================================
print("\nLoading JSON data...")
with open(JSON_PATH, 'r') as f:
    json_data = json.load(f)

train_ids, val_ids, test_ids = [], [], []
labels_dict = {}

unique_labels = sorted({info['action'][0] for info in json_data.values()})
class_to_idx = {label: i for i, label in enumerate(unique_labels)}
num_classes = len(unique_labels)

print(f"Number of classes: {num_classes}")

for video_id, info in json_data.items():
    file_path = os.path.join(FEATURES_DIR, f"{video_id}.npz")
    if os.path.exists(file_path):
        original_label = info['action'][0]
        labels_dict[video_id] = class_to_idx[original_label]
        
        if info['subset'] == 'train':
            train_ids.append(video_id)
        elif info['subset'] == 'val':
            val_ids.append(video_id)
        elif info['subset'] == 'test':
            test_ids.append(video_id)

print(f"Train: {len(train_ids)}, Val: {len(val_ids)}, Test: {len(test_ids)}")

# ============================================================================
# CREATE DATASETS AND DATALOADERS
# ============================================================================
print("\nCreating datasets with augmentation...")

# Augmentation for training
augmentation = SignLanguageAugmentation(
    shear_range=0.1,
    rotate_range=20,
    scale_range=(0.9, 1.1),
    temporal_crop_ratio=0.9,
    spatial_drop_prob=0.1,
    temporal_mask_prob=0.15
)

train_dataset = EnhancedSignLanguageDataset(
    train_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=augmentation, training=True
)

val_dataset = EnhancedSignLanguageDataset(
    val_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

test_dataset = EnhancedSignLanguageDataset(
    test_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, 
                          num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                        num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                         num_workers=2, pin_memory=True)

print(f"Dataloaders created. Train batches: {len(train_loader)}")


# ============================================================================
# INSTANTIATE MODEL
# ============================================================================
print("\nInitializing TT-STGCN model...")
model = TT_STGCN(num_classes=num_classes, base_ch=BASE_CHANNELS).to(DEVICE)

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params:,}")


# ============================================================================
# SETUP TRAINING COMPONENTS
# ============================================================================
print("\nSetting up training components...")

# Loss function with label smoothing
criterion = LabelSmoothingCrossEntropy(smoothing=LABEL_SMOOTHING)

# Optimizer: AdamW with weight decay
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Scheduler: Cosine annealing with warmup
scheduler = CosineAnnealingWarmupRestarts(optimizer, warmup_epochs=10, max_epochs=EPOCHS)


# ============================================================================
# TRAINING LOOP
# ============================================================================
print("\n" + "="*70)
print("STARTING TRAINING")
print("="*70)

best_val_acc = 0.0
patience_counter = 0
PATIENCE = 50  # Early stopping patience

for epoch in range(EPOCHS):
    start_time = time.time()
    
    # Train
    train_loss, train_acc = train_one_epoch_advanced(
        model, train_loader, optimizer, criterion, DEVICE,
        use_mixup=True, mixup_alpha=MIXUP_ALPHA
    )
    
    # Validate (with TTA every 10 epochs for speed)
    if (epoch + 1) % 10 == 0:
        val_loss, val_top1, val_top5 = validate_with_tta(
            model, val_loader, criterion, DEVICE, num_tta=5
        )
    else:
        val_loss, val_top1, val_top5 = validate_with_tta(
            model, val_loader, criterion, DEVICE, num_tta=1
        )
    
    # Step scheduler
    scheduler.step()
    
    epoch_time = time.time() - start_time
    current_lr = optimizer.param_groups[0]['lr']
    
    # Print progress
    print(f"Epoch {epoch+1:03d}/{EPOCHS} | Time: {epoch_time:.1f}s | "
          f"LR: {current_lr:.6f} | "
          f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | "
          f"Val Loss: {val_loss:.4f} | Val Top-1: {val_top1:.2f}% | Val Top-5: {val_top5:.2f}%")
    
    # Save best model
    if val_top1 > best_val_acc:
        best_val_acc = val_top1
        patience_counter = 0
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': val_top1,
        }, 'best_tt_stgcn.pth')
        print(f"âœ“ New best model saved! Val Top-1: {best_val_acc:.2f}%")
    else:
        patience_counter += 1
    
    # Early stopping
    if patience_counter >= PATIENCE:
        print(f"\nEarly stopping triggered after {epoch+1} epochs")
        break


# ============================================================================
# FINAL EVALUATION ON TEST SET
# ============================================================================
print("\n" + "="*70)
print("EVALUATING ON TEST SET")
print("="*70)

# Load best model
checkpoint = torch.load('best_tt_stgcn.pth')
model.load_state_dict(checkpoint['model_state_dict'])

test_loss, test_top1, test_top5 = validate_with_tta(
    model, test_loader, criterion, DEVICE, num_tta=10
)

print(f"\nFINAL TEST RESULTS:")
print(f"  Test Top-1 Accuracy: {test_top1:.2f}%")
print(f"  Test Top-5 Accuracy: {test_top5:.2f}%")
print(f"  Best Val Top-1 Accuracy: {best_val_acc:.2f}%")
print(f"\nâœ“ Training complete! Model saved as 'best_tt_stgcn.pth'")


# ============================================================================
# SAVE PREDICTIONS FOR SUBMISSION
# ============================================================================
print("\nGenerating predictions for submission...")

model.eval()
all_preds = []
all_video_ids = []

with torch.no_grad():
    for idx in tqdm(range(len(test_dataset)), desc="Predicting"):
        joint, bone, _ = test_dataset[idx]
        joint = joint.unsqueeze(0).to(DEVICE)
        bone = bone.unsqueeze(0).to(DEVICE)
        
        # TTA for final predictions
        tta_preds = []
        for _ in range(10):
            output = model(joint, bone)
            tta_preds.append(F.softmax(output, dim=1))
        
        pred = torch.stack(tta_preds).mean(0)
        pred_class = pred.argmax(1).item()
        
        all_preds.append(pred_class)
        all_video_ids.append(test_ids[idx])

# Save predictions
predictions_dict = {vid: pred for vid, pred in zip(all_video_ids, all_preds)}
with open('test_predictions.json', 'w') as f:
    json.dump(predictions_dict, f, indent=2)

print("âœ“ Predictions saved to 'test_predictions.json'")
print("\n" + "="*70)
print("ALL DONE! ðŸŽ‰")
print("="*70)

Using device: cuda
Configuration loaded. Batch size: 24, Epochs: 150

Loading JSON data...
Number of classes: 300
Train: 3548, Val: 900, Test: 668

Creating datasets with augmentation...
Dataloaders created. Train batches: 148

Initializing TT-STGCN model...
Total trainable parameters: 1,136,005

Setting up training components...

STARTING TRAINING
Epoch 001/150 | Time: 73.2s | LR: 0.000200 | Train Loss: 5.7460 | Train Acc: 0.39% | Val Loss: 5.6403 | Val Top-1: 1.00% | Val Top-5: 4.67%
âœ“ New best model saved! Val Top-1: 1.00%
Epoch 002/150 | Time: 80.8s | LR: 0.000300 | Train Loss: 5.5993 | Train Acc: 0.87% | Val Loss: 5.3668 | Val Top-1: 2.22% | Val Top-5: 8.33%
âœ“ New best model saved! Val Top-1: 2.22%
Epoch 003/150 | Time: 83.5s | LR: 0.000400 | Train Loss: 5.3640 | Train Acc: 1.35% | Val Loss: 5.0567 | Val Top-1: 3.78% | Val Top-5: 13.22%
âœ“ New best model saved! Val Top-1: 3.78%
Epoch 004/150 | Time: 83.7s | LR: 0.000500 | Train Loss: 5.1698 | Train Acc: 2.23% | Val Loss: 4.90

UnpicklingError: Weights only load failed. This file can still be loaded, to do so you have two options, [1mdo those steps only if you trust the source of the checkpoint[0m. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL numpy.core.multiarray.scalar was not an allowed global by default. Please use `torch.serialization.add_safe_globals([scalar])` or the `torch.serialization.safe_globals([scalar])` context manager to allowlist this global if you trust this class/function.

Check the documentation of torch.load to learn more about types accepted by default with weights_only https://pytorch.org/docs/stable/generated/torch.load.html.

## Check Test Accuracy on 300

In [None]:

import numpy as np
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os
from tqdm import tqdm
import time

# ============================================================================
# CONFIGURATION
# ============================================================================
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# Paths
FEATURES_DIR = '/kaggle/input/msegcn-individual/MSE_GCN_features_individual'
JSON_PATH = '/kaggle/input/json-files/nslt_300.json'

# Hyperparameters
SEQ_LEN = 64
BATCH_SIZE = 24  # Increased for better gradient estimates
EPOCHS = 150  # More epochs with better regularization
BASE_CHANNELS = 32  # Lightweight
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1
MIXUP_ALPHA = 0.2

print(f"Configuration loaded. Batch size: {BATCH_SIZE}, Epochs: {EPOCHS}")


# ============================================================================
# LOAD DATA AND CREATE SPLITS
# ============================================================================
print("\nLoading JSON data...")
with open(JSON_PATH, 'r') as f:
    json_data = json.load(f)

train_ids, val_ids, test_ids = [], [], []
labels_dict = {}

unique_labels = sorted({info['action'][0] for info in json_data.values()})
class_to_idx = {label: i for i, label in enumerate(unique_labels)}
num_classes = len(unique_labels)

print(f"Number of classes: {num_classes}")

for video_id, info in json_data.items():
    file_path = os.path.join(FEATURES_DIR, f"{video_id}.npz")
    if os.path.exists(file_path):
        original_label = info['action'][0]
        labels_dict[video_id] = class_to_idx[original_label]
        
        if info['subset'] == 'train':
            train_ids.append(video_id)
        elif info['subset'] == 'val':
            val_ids.append(video_id)
        elif info['subset'] == 'test':
            test_ids.append(video_id)

print(f"Train: {len(train_ids)}, Val: {len(val_ids)}, Test: {len(test_ids)}")

print("\nCreating datasets with augmentation...")

# Augmentation for training
augmentation = SignLanguageAugmentation(
    shear_range=0.1,
    rotate_range=20,
    scale_range=(0.9, 1.1),
    temporal_crop_ratio=0.9,
    spatial_drop_prob=0.1,
    temporal_mask_prob=0.15
)

train_dataset = EnhancedSignLanguageDataset(
    train_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=augmentation, training=True
)

val_dataset = EnhancedSignLanguageDataset(
    val_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

test_dataset = EnhancedSignLanguageDataset(
    test_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, 
                          num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                        num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                         num_workers=2, pin_memory=True)

print(f"Dataloaders created. Train batches: {len(train_loader)}")


# ============================================================================
# INSTANTIATE MODEL
# ============================================================================
print("\nInitializing TT-STGCN model...")
model = TT_STGCN(num_classes=num_classes, base_ch=BASE_CHANNELS).to(DEVICE)

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params:,}")


# ============================================================================
# SETUP TRAINING COMPONENTS
# ============================================================================
print("\nSetting up training components...")

# Loss function with label smoothing
criterion = LabelSmoothingCrossEntropy(smoothing=LABEL_SMOOTHING)

# Optimizer: AdamW with weight decay
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Scheduler: Cosine annealing with warmup
scheduler = CosineAnnealingWarmupRestarts(optimizer, warmup_epochs=10, max_epochs=EPOCHS)



Using device: cuda
Configuration loaded. Batch size: 24, Epochs: 150

Loading JSON data...
Number of classes: 300
Train: 3548, Val: 900, Test: 668

Creating datasets with augmentation...
Dataloaders created. Train batches: 148

Initializing TT-STGCN model...
Total trainable parameters: 1,136,005

Setting up training components...


In [11]:
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm



BEST_MODEL_PATH = "/kaggle/input/ttstgcn/other/default/1/best_tt_stgcn.pth" # Change path if needed



model = TT_STGCN(num_classes=300, base_ch=BASE_CHANNELS).to(DEVICE)
checkpoint = torch.load(BEST_MODEL_PATH, map_location=DEVICE, weights_only=False)

# If checkpoint contains model_state_dict, extract it
if "model_state_dict" in checkpoint:
    state_dict = checkpoint["model_state_dict"]
else:
    state_dict = checkpoint

model.load_state_dict(state_dict, strict=False)
model.eval()

print(f"âœ… Loaded best model from: {BEST_MODEL_PATH}")


test_augmentation = SignLanguageAugmentation() # no effect since training=False
test_dataset = EnhancedSignLanguageDataset(
test_ids,
labels_dict,
FEATURES_DIR,
seq_len=SEQ_LEN,
augmentation=test_augmentation,
training=False
)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

print(f"âœ… Loaded {len(test_dataset)} test samples.")

âœ… Loaded best model from: /kaggle/input/ttstgcn/other/default/1/best_tt_stgcn.pth
âœ… Loaded 668 test samples.


In [15]:
@torch.no_grad()
def evaluate_model(model, loader, device):
    model.eval()
    correct_top1, correct_top5, total = 0, 0, 0
    all_preds, all_labels = [], []
    
    for joint_batch, bone_batch, labels in tqdm(loader, desc="Evaluating"):
        joint_batch, bone_batch, labels = (
            joint_batch.to(device),
            bone_batch.to(device),
            labels.to(device),
        )
    
        outputs = model(joint_batch, bone_batch)
        probs = F.softmax(outputs, dim=1)
    
        # --- Top-1 accuracy ---
        _, pred_top1 = probs.max(1)
        correct_top1 += pred_top1.eq(labels).sum().item()
    
        # --- Top-5 accuracy ---
        _, pred_top5 = probs.topk(5, 1, True, True)
        correct_top5 += pred_top5.eq(labels.view(-1, 1).expand_as(pred_top5)).sum().item()
    
        total += labels.size(0)
    
    # --- Compute final accuracies ---
    top1_acc = 100.0 * correct_top1 / total
    top5_acc = 100.0 * correct_top5 / total
    
    return top1_acc, top5_acc


# ============================================================
# RUN EVALUATION
# ============================================================
top1, top5 = evaluate_model(model, test_loader, DEVICE)
print(f"\nðŸŽ¯ Test Accuracy (Top-1): {top1:.2f}%")
print(f"ðŸŽ¯ Test Accuracy (Top-5): {top5:.2f}%")


Evaluating: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 21/21 [00:05<00:00,  3.87it/s]


ðŸŽ¯ Test Accuracy (Top-1): 72.01%
ðŸŽ¯ Test Accuracy (Top-5): 88.77%





## Test on 100 Classes


In [None]:

import numpy as np
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os
from tqdm import tqdm
import time

# ============================================================================
# CONFIGURATION
# ============================================================================
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# Paths
FEATURES_DIR = '/kaggle/input/msegcn-individual/MSE_GCN_features_individual'
JSON_PATH = '/kaggle/input/json-files/nslt_100.json'

# Hyperparameters
SEQ_LEN = 64
BATCH_SIZE = 24  # Increased for better gradient estimates
EPOCHS = 150  # More epochs with better regularization
BASE_CHANNELS = 32  # Lightweight
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1
MIXUP_ALPHA = 0.2

print(f"Configuration loaded. Batch size: {BATCH_SIZE}, Epochs: {EPOCHS}")


# ============================================================================
# LOAD DATA AND CREATE SPLITS
# ============================================================================
print("\nLoading JSON data...")
with open(JSON_PATH, 'r') as f:
    json_data = json.load(f)

train_ids, val_ids, test_ids = [], [], []
labels_dict = {}

unique_labels = sorted({info['action'][0] for info in json_data.values()})
class_to_idx = {label: i for i, label in enumerate(unique_labels)}
num_classes = len(unique_labels)

print(f"Number of classes: {num_classes}")

for video_id, info in json_data.items():
    file_path = os.path.join(FEATURES_DIR, f"{video_id}.npz")
    if os.path.exists(file_path):
        original_label = info['action'][0]
        labels_dict[video_id] = class_to_idx[original_label]
        
        if info['subset'] == 'train':
            train_ids.append(video_id)
        elif info['subset'] == 'val':
            val_ids.append(video_id)
        elif info['subset'] == 'test':
            test_ids.append(video_id)

print(f"Train: {len(train_ids)}, Val: {len(val_ids)}, Test: {len(test_ids)}")


print("\nCreating datasets with augmentation...")

# Augmentation for training
augmentation = SignLanguageAugmentation(
    shear_range=0.1,
    rotate_range=20,
    scale_range=(0.9, 1.1),
    temporal_crop_ratio=0.9,
    spatial_drop_prob=0.1,
    temporal_mask_prob=0.15
)

train_dataset = EnhancedSignLanguageDataset(
    train_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=augmentation, training=True
)

val_dataset = EnhancedSignLanguageDataset(
    val_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

test_dataset = EnhancedSignLanguageDataset(
    test_ids, labels_dict, FEATURES_DIR, 
    seq_len=SEQ_LEN, augmentation=None, training=False
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, 
                          num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                        num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, 
                         num_workers=2, pin_memory=True)

print(f"Dataloaders created. Train batches: {len(train_loader)}")


# ============================================================================
# INSTANTIATE MODEL
# ============================================================================
print("\nInitializing TT-STGCN model...")
model = TT_STGCN(num_classes=num_classes, base_ch=BASE_CHANNELS).to(DEVICE)

total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params:,}")


# ============================================================================
# SETUP TRAINING COMPONENTS
# ============================================================================
print("\nSetting up training components...")

# Loss function with label smoothing
criterion = LabelSmoothingCrossEntropy(smoothing=LABEL_SMOOTHING)

# Optimizer: AdamW with weight decay
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Scheduler: Cosine annealing with warmup
scheduler = CosineAnnealingWarmupRestarts(optimizer, warmup_epochs=10, max_epochs=EPOCHS)



Using device: cuda
Configuration loaded. Batch size: 24, Epochs: 150

Loading JSON data...
Number of classes: 100
Train: 1441, Val: 338, Test: 258

Creating datasets with augmentation...
Dataloaders created. Train batches: 61

Initializing TT-STGCN model...
Total trainable parameters: 1,097,405

Setting up training components...


In [33]:
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm



BEST_MODEL_PATH = "/kaggle/input/ttstgcn/other/default/1/best_tt_stgcn.pth" # Change path if needed



model = TT_STGCN(num_classes=300, base_ch=BASE_CHANNELS).to(DEVICE)
checkpoint = torch.load(BEST_MODEL_PATH, map_location=DEVICE, weights_only=False)

# If checkpoint contains model_state_dict, extract it
if "model_state_dict" in checkpoint:
    state_dict = checkpoint["model_state_dict"]
else:
    state_dict = checkpoint

model.load_state_dict(state_dict, strict=False)
model.eval()

print(f"âœ… Loaded best model from: {BEST_MODEL_PATH}")


test_augmentation = SignLanguageAugmentation() # no effect since training=False
test_dataset = EnhancedSignLanguageDataset(
test_ids,
labels_dict,
FEATURES_DIR,
seq_len=SEQ_LEN,
augmentation=test_augmentation,
training=False
)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

print(f"âœ… Loaded {len(test_dataset)} test samples.")

âœ… Loaded best model from: /kaggle/input/ttstgcn/other/default/1/best_tt_stgcn.pth
âœ… Loaded 258 test samples.


In [34]:
@torch.no_grad()
def evaluate_model(model, loader, device):
    model.eval()
    correct_top1, correct_top5, total = 0, 0, 0
    all_preds, all_labels = [], []
    
    for joint_batch, bone_batch, labels in tqdm(loader, desc="Evaluating"):
        joint_batch, bone_batch, labels = (
            joint_batch.to(device),
            bone_batch.to(device),
            labels.to(device),
        )
    
        outputs = model(joint_batch, bone_batch)
        probs = F.softmax(outputs, dim=1)
    
        # --- Top-1 accuracy ---
        _, pred_top1 = probs.max(1)
        correct_top1 += pred_top1.eq(labels).sum().item()
    
        # --- Top-5 accuracy ---
        _, pred_top5 = probs.topk(5, 1, True, True)
        correct_top5 += pred_top5.eq(labels.view(-1, 1).expand_as(pred_top5)).sum().item()
    
        total += labels.size(0)
    
    # --- Compute final accuracies ---
    top1_acc = 100.0 * correct_top1 / total
    top5_acc = 100.0 * correct_top5 / total
    
    return top1_acc, top5_acc


# ============================================================
# RUN EVALUATION
# ============================================================
top1, top5 = evaluate_model(model, test_loader, DEVICE)
print(f"\nðŸŽ¯ Test Accuracy (Top-1): {top1:.2f}%")
print(f"ðŸŽ¯ Test Accuracy (Top-5): {top5:.2f}%")


Evaluating: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 9/9 [00:01<00:00,  4.61it/s]


ðŸŽ¯ Test Accuracy (Top-1): 74.03%
ðŸŽ¯ Test Accuracy (Top-5): 90.31%



