In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import zipfile
import os

# Setup envoirement

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device in uso:", device)

Device in uso: cuda


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Data loading

In [4]:
zip_path = "/content/drive/MyDrive/AML_MistakeDetection_DATA/features/gopro/segments/1s/video/omnivore.zip"
extract_dir = "/content/omnivore_extracted"

os.makedirs(extract_dir, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as z:
    z.extractall(extract_dir)

extract_dir = "/content/omnivore_extracted/omnivore"

print("Extracted files:", len(os.listdir(extract_dir)))

Extracted files: 384


In [6]:
import json

with open("/content/drive/MyDrive/AML_MistakeDetection_DATA/annotation_json/complete_step_annotations.json") as f:
    annotations = json.load(f)

In [12]:
class VideoFeatureDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).long()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [8]:
def get_labels_for_npz(npz_file, annotations):
    # es: "10_3_360.mp4_1s_1s.npz"
    base = os.path.basename(npz_file)
    activity, attempt = base.split("_")[:2]  # "10", "3"
    recording_id = f"{activity}_{attempt}"

    # carica feature
    data = np.load(npz_file)
    arr = data[list(data.keys())[0]]  # shape (N, 400)
    N = arr.shape[0]

    labels = np.zeros(N, dtype=np.int64)  # default: no-error = 0

    # trova annotation di questo recording
    info = annotations[recording_id]
    steps = info["steps"]

    # assegnazione label per ogni secondo
    for step in steps:
        has_error = int(step["has_errors"])  # True→1, False→0
        start = step["start_time"]
        end   = step["end_time"]

        if start == -1 or end == -1 or has_error == 0:
            continue

        for sec in range(int(start), int(end) + 1, 1):
            sec_start = sec
            sec_end   = sec + 1

            # check overlap
            if sec_start >= start and sec_end <= end:
                labels[sec] = has_error

    return arr, labels

In [9]:
all_X = []
all_y = []

extract_dir = "/content/omnivore_extracted/omnivore"

for f in sorted(os.listdir(extract_dir)):
    if f.endswith(".npz"):
        X, y = get_labels_for_npz(os.path.join(extract_dir, f), annotations)
        all_X.append(X)
        all_y.append(y)

X = np.concatenate(all_X, axis=0)
y = np.concatenate(all_y, axis=0)

print(X.shape, y.shape)


(340320, 1024) (340320,)


In [14]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    shuffle=True,
)

In [13]:
train_dataset = VideoFeatureDataset(X_train, y_train)
test_dataset  = VideoFeatureDataset(X_test,  y_test)

TypeError: expected np.ndarray (got list)

In [40]:
batch_size = 64

train_dataset = Dataset(train_data)
test_dataset = Dataset(test_data)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# MLP

In [41]:
class MLPCapitainCook(nn.Module):
  def __init__(self, in_features: int) -> None:
    super(MLPCapitainCook, self).__init__()
    self.fc1 = nn.Linear(in_features, 256)
    self.relu = nn.ReLU()
    self.fc2 = nn.Linear(256, 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x = self.relu(self.fc1(x))
    return self.sigmoid(self.fc2(x))

In [42]:
model = MLPCapitainCook(1024).to(device)

In [25]:
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr)
criterion = nn.BCELoss(torch.tensor([1, 1.5]))
epochs = 1

In [None]:
model.train()

for t in range(epochs):
    total_loss = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward
        outputs = model(inputs)

        loss = criterion(outputs, labels)
        total_loss += loss.item()

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {t+1}/{epochs} - Train Loss: {avg_loss:.4f}")
