In [1]:
import torch
import torch.nn as nn   
import os 
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np

In [None]:
downloads_dir = "./data/downloads"
dataset_dir = "./data/dataset"
train_dir = dataset_dir + "/Train"
valid_dir = dataset_dir + "/Validation"
test_dir = dataset_dir + "/Test"
    

os.makedirs(downloads_dir, exist_ok=True)
os.makedirs(dataset_dir, exist_ok=True)

!wget https://dl.dropboxusercontent.com/scl/fi/sm1mybj5a2v4ycrxbez7f/miniset.zip?rlkey=jwddeulszbudhbnjz3n70fj9f&st=ronsh841&dl=0 -O {downloads_dir}/minidataset.zip
#this is just a mini dataset to see if our code works or nah

!unzip {downloads_dir}/minidataset.zip -d {dataset_dir}

In [None]:
class ResNetClassifier(nn.Module):
    """
    Image classifier built on torchvision's pretrained ResNet-18.

    The backbone's final fully connected layer is swapped for a new linear
    layer that outputs ``num_classes`` values.

    Input:
    num_classes: number of output classes for the replaced classification layer
    """
    def __init__(self, num_classes):
        super().__init__()
        # NOTE(review): newer torchvision releases prefer a `weights=` argument
        # over `pretrained=` -- confirm against the installed version.
        backbone = models.resnet18(pretrained=True)
        # Keep the backbone's feature width, only change the number of outputs
        head_inputs = backbone.fc.in_features
        backbone.fc = nn.Linear(head_inputs, num_classes)
        self.resnet = backbone

    def forward(self, x):
        """Run the batch ``x`` through the ResNet and return its output."""
        logits = self.resnet(x)
        return logits

In [None]:
class VideoFrameDataset(Dataset):
    """
    Custom dataset pairing video frames with their labels.

    Input:
    frames: list of frames as np arrays
    labels: corresponding labels, one per frame
    transform: optional callable applied to a frame when it is fetched
    """
    def __init__(self, frames, labels, transform=None):
        self.frames, self.labels = frames, labels
        self.transform = transform

    def __len__(self):
        """Number of frames held by the dataset."""
        return len(self.frames)

    def __getitem__(self, idx):
        """Return the (frame, label) pair at ``idx``, transforming the frame if a transform is set."""
        sample, target = self.frames[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

In [None]:
def load_data(directory):
    """
    Loads every PNG frame found under ``directory`` into memory.

    ``directory`` is scanned for sub-folders; each sub-folder name is used as
    the label of every ``.png`` file directly inside it. Anything that is not
    a folder at the top level, or not a ``.png`` file inside a folder, is
    ignored. Folders and files are visited in sorted name order so the result
    is reproducible across runs.

    Input:
    directory: path of the folder that contains one sub-folder per label

    Output:
    frames: list of images in RGB channel order, one array per frame
    labels: list of folder-name strings, aligned with ``frames``. These are
            not numeric; map them to integer class ids before using them
            with a loss such as CrossEntropyLoss.

    Raises:
    ValueError: if a PNG file cannot be read by cv2.imread
    """
    frames = []  # list of frames
    labels = []  # corresponding labels

    # os.scandir yields entries in arbitrary order, so sort them by name
    with os.scandir(directory) as folder_iterator:
        folders = sorted(
            (entry for entry in folder_iterator if entry.is_dir()),
            key=lambda entry: entry.name,
        )

    for folder in folders:
        with os.scandir(folder.path) as frame_iterator:
            # require a real ".png" extension, not just a name ending in "png"
            png_files = sorted(
                (
                    entry for entry in frame_iterator
                    if entry.is_file() and entry.name.lower().endswith('.png')
                ),
                key=lambda entry: entry.name,
            )

        for frame in png_files:
            image = cv2.imread(frame.path)
            if image is None:
                # cv2.imread signals failure by returning None instead of raising
                raise ValueError(f"Could not read image file: {frame.path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # by default cv2 reads in bgr
            frames.append(image)
            labels.append(folder.name)

    return frames, labels

In [None]:

transform = transforms.Compose([transforms.ToTensor()])
# Augmentation pipeline kept for reference; it is not applied yet.
# train_transforms = transforms.Compose([
#     transforms.RandomResizedCrop(224),  # Randomly crop and resize the image
#     transforms.RandomHorizontalFlip(),  # Randomly flip the image horizontally
#     transforms.RandomRotation(10),       # Randomly rotate the image by 10 degrees
#     transforms.ToTensor(),               # Convert the image to a tensor
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
# ])

BATCH_SIZE = 32
NUM_WORKERS = 4

train_frames, train_labels = load_data(train_dir)
valid_frames, valid_labels = load_data(valid_dir)
test_frames, test_labels = load_data(test_dir)

# load_data returns folder names (strings) as labels. The DataLoader's default
# collate leaves strings untouched, so `labels.to(device)` and CrossEntropyLoss
# would fail. Map every class name to an integer index, using one mapping
# (built from the training split) for all three splits.
class_names = sorted(set(train_labels))
class_to_idx = {name: idx for idx, name in enumerate(class_names)}

unseen_classes = (set(valid_labels) | set(test_labels)) - set(class_names)
if unseen_classes:
    raise ValueError(
        f"Validation/Test contain classes that are missing from Train: {sorted(unseen_classes)}"
    )

train_targets = [class_to_idx[name] for name in train_labels]
valid_targets = [class_to_idx[name] for name in valid_labels]
test_targets = [class_to_idx[name] for name in test_labels]

# Only the training split is shuffled; evaluation metrics do not depend on
# batch order, so validation/test are iterated in a fixed order.
train_dataset = VideoFrameDataset(train_frames, train_targets, transform=transform)
train_dataload = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

valid_dataset = VideoFrameDataset(valid_frames, valid_targets, transform=transform)
valid_dataload = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

test_dataset = VideoFrameDataset(test_frames, test_targets, transform=transform)
test_dataload = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

num_classes = len(class_names)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNetClassifier(num_classes=num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
def evaluate_model(model, dataloader, criterion):
    """
    Evaluates a trained model given a validation/test dataloader

    The model is switched to eval mode (and left in eval mode) and no
    gradients are computed. Batches are moved to the device the model's
    parameters live on, so the function does not rely on a notebook-level
    ``device`` variable.

    Input:
    model: the model we would like to evaluate
    dataloader: iterable yielding (inputs, labels) batches
    criterion: the loss function used to determine loss; its result is
               weighted by the batch size, so it is assumed to return the
               mean loss over the batch


    Output:
    the loss and accuracy, both averaged over all evaluated samples

    Raises:
    ValueError: if the dataloader yields no samples
    """
    model.eval()

    # Follow the model's own device; fall back to CPU for parameter-free models
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    running_loss = 0.0
    accurate_pred = 0
    total_pred = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # weight by batch size so a smaller final batch is averaged correctly
            running_loss += loss.item() * inputs.size(0)
            predicted = outputs.argmax(dim=1)
            accurate_pred += (predicted == labels).sum().item()
            total_pred += labels.size(0)

    if total_pred == 0:
        raise ValueError("evaluate_model received a dataloader that yielded no samples")

    avg_loss = running_loss / total_pred
    accuracy = accurate_pred / total_pred

    return avg_loss, accuracy

    

In [None]:
def train_model(model, train_dataload, valid_dataload, criterion, optimizer, num_epochs=25,
                checkpoint_path='best_model.pth'):
    """
    Trains a given model over a given number of epochs. Default set to 25.

    After every epoch the model is evaluated on the validation data, and its
    weights are saved to ``checkpoint_path`` whenever the validation loss
    improves. Batches are moved to the device the model's parameters live on.

    Input:
    model: the model we would like to train
    train_dataload: dataloader yielding (inputs, labels) training batches
    valid_dataload: dataloader passed to evaluate_model after each epoch
    criterion: the loss function used to determine loss; its result is
               weighted by the batch size, so it is assumed to return the
               mean loss over the batch
    optimizer: the optimizer used in backpropagation
    num_epochs: the number of epochs we train for
    checkpoint_path: file the best (lowest validation loss) weights are saved to


    Output:
    the model as it is after the final epoch. This is not necessarily the
    best epoch; load ``checkpoint_path`` to get the best weights.

    Raises:
    ValueError: if train_dataload yields no samples during an epoch
    """
    # Follow the model's own device; fall back to CPU for parameter-free models
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()  # re-enable training mode; evaluation below switches it off
        running_loss = 0.0
        samples_seen = 0
        for inputs, labels in train_dataload:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            samples_seen += batch_size

        if samples_seen == 0:
            raise ValueError("train_model received a train_dataload that yielded no samples")

        # Average over the samples actually processed, which stays correct
        # even when the loader drops incomplete batches
        epoch_loss = running_loss / samples_seen

        valid_loss, valid_acc = evaluate_model(model, valid_dataload, criterion)

        if valid_loss < best_val_loss:
            best_val_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)

        # epoch is 0-based; report it 1-based so the last line reads N/N
        print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Valid Loss: {valid_loss:.4f}, Valid Accuracy {valid_acc:.4f}')

    return model


In [None]:
# Train for 25 epochs; train_model returns the same model object it was given.
trained_model = train_model(
    model=model,
    train_dataload=train_dataload,
    valid_dataload=valid_dataload,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=25,
)

In [None]:
# Evaluate with the test set, using the checkpoint that train_model saved
# for the epoch with the lowest validation loss.
# map_location remaps the saved tensors onto the current device, so a
# checkpoint written on a GPU can still be restored in a CPU-only session.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
test_loss, test_accuracy = evaluate_model(model, test_dataload, criterion)

print(f'Test Loss: {test_loss:.4f}, Test Accuracy {test_accuracy:.4f}')