# 🎬 SGN with ResNet101 + ResNeXt101
## UCF-Crime Video Captioning

**Pipeline**: Data → Vocab → Model → Train → Test → Evaluate → Visualize

In [None]:
!pip install -q huggingface_hub h5py nltk rouge
import warnings; warnings.filterwarnings('ignore')

In [None]:
from huggingface_hub import hf_hub_download
import h5py, torch, torch.nn as nn, torch.optim as optim
import numpy as np, matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.meteor_score import meteor_score
from rouge import Rouge
import nltk
# Fetch the NLTK resources requested by this notebook, without progress output.
for _resource in ('wordnet', 'punkt'):
    nltk.download(_resource, quiet=True)

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Device: {device}')

In [None]:
class UCFCrimeDataset(Dataset):
    """Map-style dataset over the per-video groups of an HDF5 feature file.

    The file is read as ``<category>/<video>`` groups. Each video group is
    expected to hold a ``features`` dataset and a ``sentences`` dataset, and
    may carry a ``split`` attribute used for filtering.
    """

    def __init__(self, hdf5_path, split=None):
        """Open *hdf5_path* read-only and index the videos it contains.

        Args:
            hdf5_path: Path of the HDF5 file to open.
            split: When truthy, keep only videos whose ``split`` attribute
                equals this value; otherwise every video is kept.
        """
        self.hdf5_file = h5py.File(hdf5_path, 'r')
        self.video_paths = [
            f"{cat}/{vid}"
            for cat in self.hdf5_file.keys()
            for vid in self.hdf5_file[cat].keys()
        ]
        if split:
            # The attribute may come back as ``bytes`` or ``str`` depending on
            # how it was written and on the h5py version, so normalise it the
            # same way the sentences are normalised in ``__getitem__``.
            self.video_paths = [
                vp for vp in self.video_paths
                if self._as_text(self.hdf5_file[vp].attrs.get('split', '')) == split
            ]

    @staticmethod
    def _as_text(value):
        """Return *value* as ``str``, decoding it first when it is ``bytes``."""
        return value.decode() if isinstance(value, bytes) else str(value)

    def close(self):
        """Close the underlying HDF5 file; the dataset is unusable afterwards."""
        self.hdf5_file.close()

    def __len__(self):
        """Return the number of indexed videos."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Return the sample at position *idx*.

        Returns:
            A dict with ``video_id`` (the ``<category>/<video>`` path),
            ``features`` (a float32 tensor built from the ``features``
            dataset) and ``sentences`` (a list of ``str``).
        """
        vg = self.hdf5_file[self.video_paths[idx]]
        return {
            'video_id': self.video_paths[idx],
            'features': torch.from_numpy(np.array(vg['features'])).float(),
            'sentences': [self._as_text(s) for s in vg['sentences'][:]],
        }

# Resolve the local path of the labelled feature file hosted on the Hub.
hdf5_path = hf_hub_download(
    repo_id="Rahima411/ucf-anomaly-detection-mapped",
    filename="ucf_crime_features_labeled.h5",
    repo_type="dataset",
)

# One dataset view per split name stored in the file.
train_ds, val_ds, test_ds = (
    UCFCrimeDataset(hdf5_path, split_name)
    for split_name in ('Train', 'Val', 'Test')
)
print(f'Train: {len(train_ds)}, Val: {len(val_ds)}, Test: {len(test_ds)}')

In [None]:
class Vocabulary:
    """Word-level vocabulary with four reserved ids.

    Ids 0-3 are ``<PAD>``, ``<START>``, ``<END>`` and ``<UNK>``; regular words
    are numbered from 4 upwards in the order ``build`` first sees them.
    """

    def __init__(self):
        self.word2idx = {'<PAD>': 0, '<START>': 1, '<END>': 2, '<UNK>': 3}
        self.idx2word = {0: '<PAD>', 1: '<START>', 2: '<END>', 3: '<UNK>'}
        self.idx = 4  # next free id

    @staticmethod
    def _tokenize(s):
        """Lower-case *s*, drop everything but ``a-z0-9'`` and whitespace, split."""
        import re
        return re.sub(r"[^a-z0-9'\s]", '', s.lower()).split()

    def build(self, sentences, min_freq=5):
        """Add every word occurring at least *min_freq* times in *sentences*.

        Words that are already in the vocabulary keep their id, so calling
        ``build`` more than once never leaves stale ``idx2word`` entries.
        """
        from collections import Counter
        freq = Counter(w for s in sentences for w in self._tokenize(s))
        for w, f in freq.items():
            if f >= min_freq and w not in self.word2idx:
                self.word2idx[w] = self.idx
                self.idx2word[self.idx] = w
                self.idx += 1

    def encode(self, s, max_len=30):
        """Return ``[<START>] + word ids + [<END>]`` for sentence *s*.

        Out-of-vocabulary words map to ``<UNK>``. A result longer than
        *max_len* is cut to exactly *max_len* ids and still ends with ``<END>``.
        """
        tokens = [1] + [self.word2idx.get(w, 3) for w in self._tokenize(s)] + [2]
        return tokens[:max_len-1] + [2] if len(tokens) > max_len else tokens

    def decode(self, ids):
        """Turn *ids* back into a space-joined string.

        Decoding stops at the first ``<END>``; ``<PAD>`` and ``<START>`` are
        skipped. Stopping at ``<END>`` matters for batched generation, where a
        finished sequence keeps receiving tokens until the whole batch is done.
        """
        words = []
        for i in ids:
            if i == 2:
                break
            if i in (0, 1):
                continue
            words.append(self.idx2word[i])
        return ' '.join(words)

# Collect every caption of the training split and build the vocabulary from it.
vocab = Vocabulary()
all_sents = []
for sample_idx in range(len(train_ds)):
    all_sents.extend(train_ds[sample_idx]['sentences'])
vocab.build(all_sents)
print(f'Vocab size: {len(vocab.word2idx)}')

In [None]:
class SGN(nn.Module):
    """Two-stream LSTM video encoder followed by an LSTM caption decoder.

    Both encoder streams are fed the same input features (perturbed with
    independent Gaussian noise during training), so ``app_dim`` and
    ``mot_dim`` must both match the feature width.
    """

    def __init__(self, app_dim=2048, mot_dim=2048, emb_dim=512, hid_dim=512, vocab_size=1000, n_groups=5):
        """Build the encoder, embedding, decoder and output projection.

        Args:
            app_dim: Input width of the appearance encoder.
            mot_dim: Input width of the motion encoder.
            emb_dim: Word-embedding width.
            hid_dim: Hidden width of each encoder LSTM.
            vocab_size: Number of output classes / embedding rows.
            n_groups: Output width of the ``group_attn`` projection.
        """
        super().__init__()
        self.n_groups = n_groups
        self.app_enc = nn.LSTM(app_dim, hid_dim, batch_first=True)
        self.mot_enc = nn.LSTM(mot_dim, hid_dim, batch_first=True)
        # NOTE(review): kept so existing attribute references still resolve,
        # but its output was never consumed by the decoder; it is not evaluated
        # in ``forward`` until the grouping is actually wired in.
        self.group_attn = nn.Linear(hid_dim * 2, n_groups)
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        # The decoder is initialised with the concatenation of both encoders'
        # final states, so its hidden size has to be 2 * hid_dim; an LSTM
        # rejects an initial state whose width differs from its hidden size.
        self.decoder = nn.LSTM(emb_dim + hid_dim * 2, hid_dim * 2, batch_first=True)
        self.fc = nn.Linear(hid_dim * 2, vocab_size)

    def forward(self, feats, caps):
        """Score the next token at every caption position.

        Args:
            feats: Float tensor ``(batch, frames, feat_dim)``.
            caps: Integer token ids ``(batch, steps)``.

        Returns:
            Logits of shape ``(batch, steps, vocab_size)``.
        """
        if self.training:
            # Independent noise makes the two streams differ (stand-in for two
            # distinct backbones) and is only applied while training so that
            # validation and generation stay deterministic.
            app_feats = feats + torch.randn_like(feats) * 0.1
            mot_feats = feats + torch.randn_like(feats) * 0.1
        else:
            app_feats = mot_feats = feats

        app_out, (h_a, c_a) = self.app_enc(app_feats)
        mot_out, (h_m, c_m) = self.mot_enc(mot_feats)
        combined = torch.cat([app_out, mot_out], -1)

        emb = self.embedding(caps)
        h = torch.cat([h_a, h_m], -1)
        c = torch.cat([c_a, c_m], -1)

        # Pair caption step t with encoder step t. When the caption is longer
        # than the clip, repeat the last encoder step so the concat below
        # always sees matching sequence lengths.
        n_steps = caps.size(1)
        visual = combined[:, :n_steps, :]
        missing = n_steps - visual.size(1)
        if missing > 0:
            tail = visual[:, -1:, :].expand(-1, missing, -1)
            visual = torch.cat([visual, tail], 1)

        dec_in = torch.cat([emb, visual], -1)
        dec_out, _ = self.decoder(dec_in, (h, c))
        return self.fc(dec_out)

# Size the output layer to the vocabulary, then move the network to `device`.
model = SGN(vocab_size=len(vocab.word2idx))
model = model.to(device)
print(f'Parameters: {sum(map(torch.numel, model.parameters())):,}')

In [None]:
def collate(batch):
    """Merge dataset samples into a ``(features, captions)`` batch.

    Features are stacked along a new leading dimension. Only the first
    sentence of each sample is encoded; the id lists are right-padded with 0
    up to the longest caption in the batch.
    """
    feats = torch.stack([sample['features'] for sample in batch])
    encoded = [vocab.encode(sample['sentences'][0]) for sample in batch]
    width = max(map(len, encoded))
    caps_padded = torch.zeros(len(encoded), width, dtype=torch.long)
    for row, ids in enumerate(encoded):
        caps_padded[row, :len(ids)] = torch.tensor(ids)
    return feats, caps_padded

# Bookkeeping for the training run.
history = dict(train_loss=[], val_loss=[])
best_loss = float('inf')

# Token-level cross-entropy; target id 0 is excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Only the training loader shuffles its samples.
val_loader = DataLoader(val_ds, batch_size=32, collate_fn=collate)
train_loader = DataLoader(
    train_ds,
    batch_size=32,
    shuffle=True,
    collate_fn=collate,
)

for epoch in range(30):
    # ---- training pass: teacher forcing, inputs are caps[:-1], targets caps[1:] ----
    model.train()
    step_losses = []
    for feats, caps in tqdm(train_loader, desc=f'Epoch {epoch+1}'):
        feats = feats.to(device)
        caps = caps.to(device)
        inputs, targets = caps[:, :-1], caps[:, 1:]

        optimizer.zero_grad()
        out = model(feats, inputs)
        loss = criterion(out.reshape(-1, len(vocab.word2idx)), targets.reshape(-1))
        loss.backward()
        # Clip the global gradient norm at 5.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()
        step_losses.append(loss.item())
    train_loss = sum(step_losses) / len(train_loader)

    # ---- validation pass: same loss, no gradients ----
    model.eval()
    step_losses = []
    with torch.no_grad():
        for feats, caps in val_loader:
            feats = feats.to(device)
            caps = caps.to(device)
            inputs, targets = caps[:, :-1], caps[:, 1:]
            out = model(feats, inputs)
            loss = criterion(out.reshape(-1, len(vocab.word2idx)), targets.reshape(-1))
            step_losses.append(loss.item())
    val_loss = sum(step_losses) / len(val_loader)

    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    print(f'Epoch {epoch+1}: Train={train_loss:.4f}, Val={val_loss:.4f}')

    # Checkpoint whenever the validation loss reaches a new minimum.
    if val_loss < best_loss:
        best_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')

print('Training complete!')

In [None]:
# Restore the best checkpoint. `map_location` remaps the stored tensors onto
# the current device, so a file saved from a GPU run still loads on a
# CPU-only runtime.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.eval()

def generate(feats, beam_width=5, max_len=30):
    """Greedily decode one caption per row of *feats*.

    Args:
        feats: Batch of video features; moved to ``device`` before decoding.
        beam_width: Accepted for backward compatibility but currently unused;
            decoding is greedy (arg-max at every step).
        max_len: Maximum number of tokens generated after ``<START>``.

    Returns:
        One list of token ids per batch row. Each starts with ``<START>`` (1)
        and ends with ``<END>`` (2) when the model emitted it within
        *max_len* steps; nothing is kept after ``<END>``.

    The caller is expected to have put ``model`` into eval mode.
    """
    feats = feats.to(device)
    batch_size = feats.size(0)
    sequences = [[1] for _ in range(batch_size)]
    if batch_size == 0:
        return sequences

    finished = [False] * batch_size
    with torch.no_grad():
        for _ in range(max_len):
            caps = torch.tensor(sequences, device=device)
            out = model(feats, caps)
            next_tokens = out[:, -1, :].argmax(-1).tolist()
            for row, token in enumerate(next_tokens):
                if finished[row]:
                    # Keep the batch rectangular with <PAD>; never extend a
                    # finished caption with further model output.
                    sequences[row].append(0)
                else:
                    sequences[row].append(token)
                    finished[row] = token == 2
            if all(finished):
                break

    # Drop the padding that was appended after <END>.
    return [s[:s.index(2) + 1] if 2 in s else s for s in sequences]

# Generate captions for test set
test_loader = DataLoader(test_ds, batch_size=32, collate_fn=collate)
predictions, references = [], []

for feats, _ in tqdm(test_loader, desc='Generating'):
    preds = generate(feats)
    for pred in preds:
        # The loader is not shuffled, so the k-th prediction overall belongs
        # to test_ds[k]. Indexing with the position inside the batch would pair
        # every batch after the first with the references of the first batch.
        sample_idx = len(predictions)
        pred_text = vocab.decode(pred)
        # Round-trip the references through the vocabulary so they receive the
        # same normalisation, truncation and <UNK> mapping as the predictions.
        ref_texts = [vocab.encode(s) for s in test_ds[sample_idx]['sentences']]
        predictions.append(pred_text.split())
        references.append([vocab.decode(r).split() for r in ref_texts])

# Calculate metrics
# Corpus-level BLEU-n with uniform weights over the first n n-gram orders.
# The weights must sum to 1; (0.33, 0.33, 0.33) sums to 0.99 and skews BLEU-3,
# so exact thirds are used.
from nltk.translate.bleu_score import corpus_bleu
bleu1 = corpus_bleu(references, predictions, weights=(1, 0, 0, 0))
bleu2 = corpus_bleu(references, predictions, weights=(0.5, 0.5, 0, 0))
bleu3 = corpus_bleu(references, predictions, weights=(1/3, 1/3, 1/3, 0))
bleu4 = corpus_bleu(references, predictions, weights=(0.25, 0.25, 0.25, 0.25))

print(f'BLEU-1: {bleu1:.4f}')
print(f'BLEU-2: {bleu2:.4f}')
print(f'BLEU-3: {bleu3:.4f}')
print(f'BLEU-4: {bleu4:.4f}')

In [None]:
# Side-by-side figure: loss curves on the left, BLEU scores on the right.
fig, (loss_ax, bleu_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(history['train_loss'], label='Train')
loss_ax.plot(history['val_loss'], label='Val')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.set_title('Training History')

metrics = [bleu1, bleu2, bleu3, bleu4]
bleu_ax.bar(['BLEU-1', 'BLEU-2', 'BLEU-3', 'BLEU-4'], metrics)
bleu_ax.set_ylabel('Score')
bleu_ax.set_title('Evaluation Metrics')

fig.tight_layout()
plt.show()

# Sample predictions
# Show up to five samples; slicing keeps this safe when fewer are available,
# and an empty reference list prints as an empty string instead of raising.
for i, (pred, refs) in enumerate(zip(predictions[:5], references[:5])):
    print(f'\nSample {i+1}:')
    print(f'Prediction: {" ".join(pred)}')
    print(f'Reference: {" ".join(refs[0]) if refs else ""}')