In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from pose_dataset import PoseData

In [2]:
INPUT_SIZE = 33 * 3
HIDDEN_SIZE = 64
NUM_LAYERS = 2 # number of RNNs to stack
NUM_CLASSES = 9 # number of categories

LEARNING_RATE = 0.1

In [3]:
TIME_DIM = 0
BATCH_DIM = 1
COORD_DIM = 2

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
class PoseScoringModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(PoseScoringModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.sigmoid = nn.Sigmoid()

    def forward(self, X):
        h0 = torch.zeros(self.num_layers, X.size(BATCH_DIM), self.hidden_size).to(device)

        out, _ = self.gru(X, h0)
        # out: batch x time x hidden
        out = out[:, -1, :]
        # out: batch x hidden
        out = self.fc(out)
        out = self.sigmoid(out)
        return out

In [6]:
model = PoseScoringModel(INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES)

In [7]:
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE)

In [8]:
NUM_WORKERS = 2

In [10]:
data = PoseData("../Data")
data.shuffle()

data = data[:20]

train_data = data[:int(len(data) * .9)]
test_data = data[int(len(data) * .9):]

train_dataloader = DataLoader(train_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=32, shuffle=True)
# train_dataloader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
# test_dataloader = DataLoader(test_data, batch_size=32, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)

In [None]:
import os
from sklearn.model_selection import KFold
from torchmetrics.classification import MulticlassF1Score

EPOCHS = 10  # Number of epochs for training
model.train()  # Set the model to training mode

# Create a directory to save snapshots if it doesn't exist
os.makedirs("snapshots", exist_ok=True)
# Number of splits for k-fold cross-validation
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True)

# Convert dataset to a tensor for splitting
data_tensor = torch.tensor(train_data)

for fold, (train_idx, val_idx) in enumerate(kf.split(data_tensor)):
    print(f"Fold {fold + 1}/{k_folds}")
    
    # Split data into training and validation sets
    train_subset = data_tensor[train_idx]
    val_subset = data_tensor[val_idx]
    
    train_dataloader = DataLoader(train_subset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_subset, batch_size=32, shuffle=False)
    
    for epoch in range(EPOCHS):
        model.train()
        for i, (inputs, targets) in enumerate(train_dataloader):
            inputs = inputs[:, :, :, :3].flatten(-1)
            # Forward pass
            outputs = model(inputs)
            loss_value = loss(outputs, targets)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss_value.backward()
            optimizer.step()

            if i % 10 == 0:
                print(f"Fold [{fold + 1}/{k_folds}], Epoch [{epoch + 1}/{EPOCHS}], "
                      f"Step [{i + 1}/{len(train_dataloader)}], Loss: {loss_value.item():.4f}")

        # Evaluate on validation set
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        f1_metric = MulticlassF1Score(num_classes=3, average='micro').to(device)
        with torch.no_grad():
            for inputs, targets in val_dataloader:
                inputs, targets = inputs.to(device), targets.to(device)

                logits = model(inputs)
                preds = torch.argmax(logits, dim=1)

                # Update the metric
                f1_metric.update(preds, targets)
                
        f1_score = f1_metric.compute()
        print(f"F1-score (micro): {f1_score.item():.4f}")
        f1_metric.reset()
        # Save a snapshot of the model at each epoch
        torch.save(model.state_dict(), f"snapshots/model_fold{fold + 1}_epoch{epoch + 1}.pth")

ValueError: only one element tensors can be converted to Python scalars

In [None]:
model.eval()  # Set the model to evaluation mode
correct = 0
total = 0
eval_loss = 0

f1_metric = MulticlassF1Score(num_classes=3, average='micro').to(device)

with torch.no_grad():  # Disable gradient computation for evaluation
    for inputs, targets in test_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        logits = model(inputs)
        preds = torch.argmax(logits, dim=1)

        # Update the metric
        f1_metric.update(preds, targets)

# Compute final F1
f1_score = f1_metric.compute()
print(f"F1-score (micro): {f1_score.item():.4f}")

# Reset for next epoch
f1_metric.reset()