In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import json
from typing import List

In [2]:
# Read the THUMOS14 annotation JSON (path is relative to the notebook's
# working directory) and display how many entries its "database" section has.
with open('annotations/thumos_14_anno.json') as f:
    thumos_14_anno = json.load(f)

len(thumos_14_anno['database'].keys())

FileNotFoundError: [Errno 2] No such file or directory: 'annotations/thumos_14_anno.json'

In [None]:
class THUMOSFeatureDataset(Dataset):
    """Per-video feature sequences paired with start/end boundary label vectors.

    Each item is read from ``<feature_dir>/<video_name>.npy``, padded with zero
    rows or truncated to exactly ``max_len`` rows, and returned together with
    two length-``max_len`` 0/1 vectors marking where annotated actions start
    and end.

    Args:
        feature_dir: Directory that holds one ``.npy`` feature file per video.
        anno_path: JSON file whose top-level ``"database"`` key maps video
            names to their annotations.
        max_len: Number of temporal positions every sample is padded/trimmed to.
    """

    def __init__(self, feature_dir, anno_path, max_len=100):
        self.feature_dir = feature_dir
        self.max_len = max_len
        with open(anno_path, "r") as f:
            # json.load returns a plain dict, so the "database" section must be
            # read by key; attribute access (`.database`) raises AttributeError.
            self.annotations = json.load(f)["database"]
        self.video_list = list(self.annotations.keys())

    def __len__(self):
        """Return the number of annotated videos."""
        return len(self.video_list)

    def __getitem__(self, idx):
        """Return ``(feature, start_label, end_label, video_name)`` for one video.

        ``feature`` is a float32 tensor with ``max_len`` rows; both label
        tensors are float32 vectors of length ``max_len``.
        """
        video_name = self.video_list[idx]
        feature_path = os.path.join(self.feature_dir, video_name + ".npy")
        feature = np.load(feature_path)

        # Pad with zero rows (short videos) or trim (long videos) to max_len.
        if feature.shape[0] < self.max_len:
            pad = np.zeros((self.max_len - feature.shape[0], feature.shape[1]))
            feature = np.vstack([feature, pad])
        else:
            feature = feature[: self.max_len]

        # Generate target maps
        gt = self.annotations[video_name]
        if isinstance(gt, dict):
            # Some annotation files appear to nest the action list under an
            # "annotations" key next to per-video metadata; accept that layout
            # as well as a bare list of actions. TODO confirm against the file.
            gt = gt.get("annotations", [])

        start_label = np.zeros(self.max_len)
        end_label = np.zeros(self.max_len)
        for a in gt:
            # Both boundaries live in a["segment"] == [start, end].
            # NOTE(review): the `/ 100` scaling treats segment values as lying
            # on a 0-100 scale; confirm the unit of the annotation timestamps.
            start = int(a["segment"][0] / 100 * self.max_len)
            end = int(a["segment"][1] / 100 * self.max_len)
            if 0 <= start < self.max_len:
                start_label[start] = 1
            if 0 <= end < self.max_len:
                end_label[end] = 1

        return (
            torch.tensor(feature, dtype=torch.float32),
            torch.tensor(start_label, dtype=torch.float32),
            torch.tensor(end_label, dtype=torch.float32),
            video_name,
        )

In [None]:
# Full BMN-like Temporal Action Detection Model for THUMOS14
# Includes: dataset loading, model, loss function, proposal generation, and inference (no training loop is defined in this cell).

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import json
from typing import List

# -------------------- Dataset --------------------
class THUMOSFeatureDataset(Dataset):
    """Fixed-length feature sequences with per-position start/end labels.

    The annotation JSON is used as loaded: a mapping from video name to a list
    of actions, each action carrying ``'start'`` and ``'end'`` values.
    Features come from ``<feature_dir>/<video_name>.npy``.
    """

    def __init__(self, feature_dir, anno_path, max_len=100):
        self.feature_dir = feature_dir
        self.max_len = max_len
        with open(anno_path, 'r') as anno_file:
            loaded = json.load(anno_file)
        self.annotations = loaded
        self.video_list = list(self.annotations.keys())

    def __len__(self):
        """Number of videos listed in the annotation file."""
        return len(self.video_list)

    def __getitem__(self, idx):
        """Return ``(feature, start_label, end_label, video_name)`` as float32 tensors plus the name."""
        name = self.video_list[idx]
        feats = np.load(os.path.join(self.feature_dir, name + ".npy"))

        # Bring the sequence to exactly max_len rows: cut long ones, zero-fill short ones.
        n_rows = feats.shape[0]
        if n_rows >= self.max_len:
            feats = feats[:self.max_len]
        else:
            filler = np.zeros((self.max_len - n_rows, feats.shape[1]))
            feats = np.concatenate((feats, filler), axis=0)

        # One-hot style boundary vectors; positions outside [0, max_len) are ignored.
        actions = self.annotations[name]
        start_label = np.zeros(self.max_len)
        end_label = np.zeros(self.max_len)
        for action in actions:
            s_pos = int(action['start'] / 100 * self.max_len)
            e_pos = int(action['end'] / 100 * self.max_len)
            if 0 <= s_pos < self.max_len:
                start_label[s_pos] = 1
            if 0 <= e_pos < self.max_len:
                end_label[e_pos] = 1

        feature_t = torch.tensor(feats, dtype=torch.float32)
        start_t = torch.tensor(start_label, dtype=torch.float32)
        end_t = torch.tensor(end_label, dtype=torch.float32)
        return feature_t, start_t, end_t, name

# -------------------- BMN Minimal Model --------------------
class BMNMinimal(nn.Module):
    """Two-layer 1-D convolutional trunk with three per-position sigmoid heads.

    Args:
        input_dim: Size of the feature vector at each temporal position.
        hidden_dim: Channel width of the shared convolutional trunk.
    """

    def __init__(self, input_dim=400, hidden_dim=256):
        super().__init__()
        # Shared trunk: two length-preserving convolutions (kernel 3, padding 1).
        conv_in = nn.Conv1d(input_dim, hidden_dim, kernel_size=3, padding=1)
        conv_mid = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.base_conv = nn.Sequential(conv_in, nn.ReLU(), conv_mid, nn.ReLU())

        # Pointwise heads, each producing a single channel.
        self.start_layer = nn.Conv1d(hidden_dim, 1, kernel_size=1)
        self.end_layer = nn.Conv1d(hidden_dim, 1, kernel_size=1)
        self.conf_layer = nn.Conv1d(hidden_dim, 1, kernel_size=1)

    def forward(self, x):
        """Map ``x`` of shape (B, T, D) to ``(start, end, conf)``, each of shape (B, T)."""
        # Conv1d wants channels before time: (B, T, D) -> (B, D, T).
        hidden = self.base_conv(x.permute(0, 2, 1))

        def _score(head):
            # (B, 1, T) logits -> probabilities -> drop the channel axis.
            return torch.sigmoid(head(hidden)).squeeze(1)

        return _score(self.start_layer), _score(self.end_layer), _score(self.conf_layer)

# -------------------- Loss Function --------------------
def bmn_loss(start_pred, end_pred, conf_pred, start_gt, end_gt):
    """Sum of start BCE, end BCE and a confidence MSE term.

    The confidence target is the element-wise maximum of the two boundary
    label tensors, i.e. 1 wherever either a start or an end is marked.
    Predictions are expected to already be probabilities, since they are
    passed straight to ``binary_cross_entropy``.
    """
    boundary_target = torch.maximum(start_gt, end_gt)
    loss_start = F.binary_cross_entropy(start_pred, start_gt)
    loss_end = F.binary_cross_entropy(end_pred, end_gt)
    loss_conf = F.mse_loss(conf_pred, boundary_target)
    return loss_start + loss_end + loss_conf

# -------------------- Proposal Generation --------------------
def generate_proposals(start_scores, end_scores, conf_scores, threshold=0.5):
    """Pair every confident start position with every later confident end position.

    A position counts as confident when its score is strictly above
    ``threshold``. Each pair ``(i, j)`` with ``i < j`` is scored as
    ``start_scores[i] * end_scores[j] * conf_scores[i]`` (the confidence is
    taken at the start position), and the product must expose ``.item()``.

    Returns:
        Up to 100 ``(start_index, end_index, score)`` tuples, best score first.
    """
    n_steps = len(start_scores)
    candidates = [
        (
            s_idx,
            e_idx,
            (start_scores[s_idx] * end_scores[e_idx] * conf_scores[s_idx]).item(),
        )
        for s_idx in range(n_steps)
        if start_scores[s_idx] > threshold
        for e_idx in range(s_idx + 1, n_steps)
        if end_scores[e_idx] > threshold
    ]
    # Stable descending sort keeps equal-score pairs in discovery order.
    candidates.sort(key=lambda cand: cand[2], reverse=True)
    return candidates[:100]  # top 100

# -------------------- Inference --------------------
def run_inference(model, dataloader, device):
    """Run the model over a dataloader and collect proposals per video.

    Args:
        model: Module returning ``(start, end, conf)`` score tensors whose
            first dimension is the batch.
        dataloader: Iterable yielding ``(features, _, _, video_names)`` batches;
            the two middle elements are ignored.
        device: Device the features are moved to before the forward pass.

    Returns:
        Dict mapping each video name to the list returned by
        ``generate_proposals`` for that video.
    """
    model.eval()
    results = {}
    with torch.no_grad():
        for features, _, _, vids in dataloader:
            features = features.to(device)
            start, end, conf = model(features)
            # Handle every item of the batch: reading only index 0 silently
            # dropped all other videos whenever batch_size > 1.
            for b, vid in enumerate(vids):
                results[vid] = generate_proposals(start[b], end[b], conf[b])
    return results

# -------------------- Main Script --------------------
if __name__ == '__main__':
    # Input locations, relative to the current working directory.
    feature_dir = './data/thumos_features/'
    anno_path = './data/thumos_val_annotations.json'

    # Prefer the GPU when one is visible to PyTorch.
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    # One video per batch, visited in annotation-file order.
    dataset = THUMOSFeatureDataset(feature_dir, anno_path)
    loader = DataLoader(dataset, shuffle=False, batch_size=1)

    # No checkpoint is loaded unless the line below is enabled, so the model
    # runs with its freshly initialised weights.
    model = BMNMinimal()
    model.to(device)

    # Optional: load checkpoint
    # model.load_state_dict(torch.load("bmn_model.pth"))

    # Print the five best-scoring proposals of every video.
    results = run_inference(model, loader, device)
    for vid, props in results.items():
        print(f"Video {vid}:")
        for s, e, score in props[:5]:
            print(f"  Start {s}, End {e}, Score {score:.3f}")