In [2]:
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
from tqdm import tqdm
import os

In [3]:
torch.manual_seed(42)
np.random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
with open('dataset/ucf101_2d.pkl', 'rb') as f:
    data = pickle.load(f)

splits = data['split']
annotations = data['annotations']

print(f"Total annotations: {len(annotations)}")
print(f"Splits available: {list(splits.keys())}")

Total annotations: 13320
Splits available: ['train1', 'train2', 'train3', 'test1', 'test2', 'test3']


In [5]:
frame_counts =[]
num_persons = []
keypoint_shapes = []

for ann in annotations:
    keypoints = ann['keypoint']
    frame_counts.append(ann['total_frames'])
    num_persons.append(keypoints.shape[0])
    keypoint_shapes.append(keypoints.shape)

frame_counts = np.array(frame_counts) 
num_persons = np.array(num_persons)

MAX_FRAMES = int(np.percentile(frame_counts, 95))
print(f"95th percentile of frame counts: {MAX_FRAMES}")
#Hecho así para evitar videos extremadamente largos

95th percentile of frame counts: 359


In [9]:
class UCF101Dataset(Dataset):
    def __init__(self, annotations, indices, max_frames=119, person_strategy='first', augment=False):
        self.annotations = [annotations[i] for i in indices]
        self.max_frames = max_frames
        self.person_strategy = person_strategy
        self.augment = augment
    
    def __len__(self):
        return len(self.annotations)
    
    def _select_person(self, keypoints, keypoint_scores):
        
        num_persons = keypoints.shape[0]
        if num_persons == 1:
            return keypoints[0], keypoint_scores[0]
        
        if self.person_strategy == 'first':
            return keypoints[0], keypoint_scores[0]
        
        elif self.person_strategy == 'max_conf':
            avg_conf_per_person = keypoint_scores.mean(axis=(1, 2))
            best_person_idx = avg_conf_per_person.argmax()
            return keypoints[best_person_idx], keypoint_scores[best_person_idx]
        
        elif self.person_strategy == 'average':
            
            weights = keypoint_scores[..., np.newaxis]  # (num_persons, T, 17, 1)
            weighted_keypoints = (keypoints * weights).sum(axis=0)
            total_weights = weights.sum(axis=0) + 1e-8
            avg_keypoints = weighted_keypoints / total_weights
            avg_scores = keypoint_scores.mean(axis=0)
            return avg_keypoints, avg_scores
        else:

            raise ValueError(f"Unknown person selection strategy: {self.person_strategy}")
        
    def __getitem__(self, idx):
        sample = self.annotations[idx]

        keypoints = sample['keypoint']  
        keypoint_scores = sample['keypoint_score']
        label = sample['label']

        keypoints, keypoint_scores = self._select_person(keypoints, keypoint_scores)
        
        T, num_joints, coords = keypoints.shape

        if T < self.max_frames:
            pad_frames = self.max_frames - T
            keypoints = np.pad(keypoints, ((0, pad_frames), (0, 0), (0, 0)), mode='constant')
            keypoint_scores = np.pad(keypoint_scores, ((0, pad_frames), (0, 0)), mode='constant')
        else:
            keypoints = keypoints[:self.max_frames]
            keypoint_scores = keypoint_scores[:self.max_frames]

        features = np.concatenate([keypoints, keypoint_scores[..., np.newaxis]], axis=-1)

        if self.augment:
            noise = np.random.randn(*features.shape) * 0.01
            features = features + noise

        features = features.transpose(2, 0, 1)

        return torch.FloatTensor(features), torch.LongTensor([label])[0]

In [7]:
class PoseKeypointCNN(nn.Module):
    def __init__(self, num_classes=101, max_frames=119, num_joints=17, in_channels=3):
        super(PoseKeypointCNN, self).__init__()
        
        self.temporal_conv = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=(5, 1), padding=(2, 0)),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(0.2),
            
            nn.Conv2d(64, 128, kernel_size=(5, 1), padding=(2, 0)),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 1)),
            nn.Dropout(0.3),
            
            nn.Conv2d(128, 256, kernel_size=(5, 1), padding=(2, 0)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 1)),
            nn.Dropout(0.3)
        )
        
        
        self.spatial_conv = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=(1, 3), padding=(0, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            
            nn.Conv2d(256, 512, kernel_size=(1, 3), padding=(0, 1)),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Dropout(0.4)
        )
        
        
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )
        
    def forward(self, x):
        
        x = self.temporal_conv(x)
        x = self.spatial_conv(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [10]:
BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 30

SPLIT_NUM = 1

PERSON_STRATEGY = 'max_conf'  # Options: 'first', 'max_conf', 'average'

train_indices = splits[f'train{SPLIT_NUM}']
test_indices = splits[f'test{SPLIT_NUM}']

train_dataset = UCF101Dataset(annotations, train_indices, MAX_FRAMES, 
                                      person_strategy=PERSON_STRATEGY, augment=True)
test_dataset = UCF101Dataset(annotations, test_indices, MAX_FRAMES, 
                                     person_strategy=PERSON_STRATEGY, augment=False)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

num_classes = 101

TypeError: list indices must be integers or slices, not str

In [None]:
model = PoseKeypointCNN(num_classes=num_classes, max_frames=MAX_FRAMES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)

In [None]:
def train_epoch(model, loader, criterion, optimizer):
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []
    
    for features, labels in tqdm(loader, desc="Training"):
        features, labels = features.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
    
    avg_loss = total_loss / len(loader)
    accuracy = accuracy_score(all_labels, all_preds)
    return avg_loss, accuracy

def evaluate(model, loader, criterion):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for features, labels in tqdm(loader, desc="Evaluating"):
            features, labels = features.to(device), labels.to(device)
            
            outputs = model(features)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    avg_loss = total_loss / len(loader)
    accuracy = accuracy_score(all_labels, all_preds)
    return avg_loss, accuracy, all_preds, all_labels

In [None]:
train_losses, train_accs = [], []
val_losses, val_accs = [], []
best_val_acc = 0

print("\nStarting training...")
for epoch in range(NUM_EPOCHS):
    print(f"\n{'='*50}")
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
    print(f"{'='*50}")
    
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
    val_loss, val_acc, _, _ = evaluate(model, test_loader, criterion)
    
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
    
    # Learning rate scheduling
    scheduler.step(val_acc)
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), f'best_model_split{SPLIT_NUM}.pth')
        print(f"✓ Saved best model (Val Acc: {val_acc:.4f})")

In [None]:
model.load_state_dict(torch.load(f'best_model_split{SPLIT_NUM}.pth'))
_, test_acc, test_preds, test_labels = evaluate(model, test_loader, criterion)

print(f"\nBest Validation Accuracy: {best_val_acc:.4f}")
print(f"Final Test Accuracy: {test_acc:.4f}")

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Loss plot
axes[0].plot(train_losses, label='Train Loss')
axes[0].plot(val_losses, label='Val Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training and Validation Loss')
axes[0].legend()
axes[0].grid(True)

# Accuracy plot
axes[1].plot(train_accs, label='Train Acc')
axes[1].plot(val_accs, label='Val Acc')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Training and Validation Accuracy')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.savefig(f'training_curves_split{SPLIT_NUM}.png', dpi=300, bbox_inches='tight')
plt.show()

print(f"\n✓ Training complete! Best model saved as 'best_model_split{SPLIT_NUM}.pth'")