This notebook trains a classification model to predict 'tags' or 'severity' for the given Project Sidewalk label type.

In [27]:
import os
from copy import deepcopy
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn, optim
import warnings
warnings.filterwarnings('ignore', category=UserWarning)
from torchvision import transforms, io
from dinov2.models.vision_transformer import vit_small, vit_base, vit_large, vit_giant2
import pandas as pd
from torch.utils.data import Dataset

from comet_ml import Experiment
from comet_ml.integration.pytorch import log_model

In [28]:
# Notebook-wide constants

# Directory the notebook was launched from; used to locate weights and checkpoints
local_directory = os.getcwd()

# Supported classification categories (used like an enum: key -> string value)
C12N_CATEGORY = dict(
    TAGS='tags',
    SEVERITY='severity',
)

# Number of leading CSV columns that precede the target columns.
# Must stay in sync with the data generation script.
c12n_category_offset = 8

In [29]:
# Run configuration: every knob for a training run is set in this cell,
# and the cells below read these variables instead of hard-coding values.

# Project Sidewalk label type whose crops are used for training
label_type = 'obstacle'

# What to predict; currently one of 'severity' or 'tags'
c12n_category = C12N_CATEGORY['SEVERITY']

# True -> plain dataset/checkpoint names, False -> the '-pannellum' variants
gsv_not_pannellum = True

# DinoV2 backbone variant; its first letter also appears in the weight/checkpoint file names
base_model_size = 'base'

# Side length (in pixels) that images are resized to before padding
image_dimension = 256

In [30]:
# Preprocessing that guarantees the images handed to DinoV2 are properly sized

class ResizeAndPad:
    """Resize an image, then pad it so width and height are multiples of `multiple`.

    Args:
        target_size: size forwarded to `transforms.Resize`.
        multiple: both output dimensions are padded up to a multiple of this value.
    """

    def __init__(self, target_size, multiple):
        self.target_size = target_size
        self.multiple = multiple

    def __call__(self, img):
        # The result must expose `.width` / `.height` (e.g. a PIL image)
        resized = transforms.Resize(self.target_size)(img)

        # Pixels missing on each axis to reach the next multiple (0 when already aligned)
        extra_w = -resized.width % self.multiple
        extra_h = -resized.height % self.multiple

        # Split the padding between both sides; the odd pixel goes to the right/bottom
        left, top = extra_w // 2, extra_h // 2
        right, bottom = extra_w - left, extra_h - top

        return transforms.Pad((left, top, right, bottom))(resized)

# Size every image is resized to before being padded and handed to DinoV2
target_size = (image_dimension, image_dimension)

# Per-stage preprocessing pipelines that every image passes through.
# "train" starts with ToPILImage, "inference" does not.
data_transforms = {
    "train": transforms.Compose([
        transforms.ToPILImage(),
        ResizeAndPad(target_size, 14),
        # Data augmentations, currently disabled:
        # transforms.RandomRotation(360),
        # transforms.RandomHorizontalFlip(),
        # transforms.RandomVerticalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    "inference": transforms.Compose([
        ResizeAndPad(target_size, 14),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}


In [31]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU available")
else:
    device = torch.device("cpu")
    print("GPU not available")

GPU not available


In [32]:
def get_target_classes(dir_path, offset):
    """Return the target column names listed in `<dir_path>/_classes.csv`.

    Args:
        dir_path: directory that contains `_classes.csv`.
        offset: number of leading columns to skip before the target columns.

    Returns:
        list of the header names from position `offset` onwards.
    """
    file_path = os.path.join(dir_path, '_classes.csv')
    # nrows=0 parses only the header row; the data rows are not needed here
    header_row = pd.read_csv(file_path, nrows=0).columns.tolist()
    return header_row[offset:]

In [33]:
# Comet tracking
# experiment = Experiment(
#   api_key=os.environ["COMET_API_KEY"],  # read the key from the environment; never commit credentials
#   project_name="ps-tags",
#   workspace="hoominchu"
# )

In [34]:
# Dataset directory and checkpoint file name for this run.
# The non-GSV variant carries a '-pannellum' marker in both names.
csv_data_dir_path_train = '../datasets/crops-{}-{}{}/train'.format(
    label_type,
    c12n_category,
    '' if gsv_not_pannellum else '-pannellum',
)
model_name_to_save = 'cls-{}-{}-{}{}-best.pth'.format(
    base_model_size[0],
    label_type,
    c12n_category,
    '' if gsv_not_pannellum else '-pannellum',
)

# Derived from the CSV header and passed to the model, so it never has to be changed by hand
n_target_classes = len(get_target_classes(csv_data_dir_path_train, c12n_category_offset))

In [35]:
# DinoV2 backbone followed by a small MLP head (Linear -> ReLU -> Linear).
# The ReLU sits between the two linear layers, so the final outputs are
# unbounded and can be negative.
class DinoVisionTransformerClassifier(nn.Module):
    """DinoV2 vision transformer (with 4 register tokens) plus a classification head.

    Args:
        model_size: one of "small", "base", "large", "giant".
        nc: number of outputs produced by the classification head.

    Raises:
        ValueError: if `model_size` is not one of the supported sizes.
    """

    # model_size -> (embedding size, number of attention heads)
    _ARCHITECTURES = {
        "small": (384, 6),
        "base": (768, 12),
        "large": (1024, 16),
        "giant": (1536, 24),
    }

    def __init__(self, model_size="small", nc=1):
        super(DinoVisionTransformerClassifier, self).__init__()

        # Fail early with a clear message instead of an UnboundLocalError further down
        if model_size not in self._ARCHITECTURES:
            raise ValueError(
                "Unsupported model_size {!r}; expected one of {}".format(
                    model_size, sorted(self._ARCHITECTURES)
                )
            )

        self.model_size = model_size
        self.embedding_size, self.number_of_heads = self._ARCHITECTURES[model_size]

        # loading a model with registers
        n_register_tokens = 4

        builders = {
            "small": vit_small,
            "base": vit_base,
            "large": vit_large,
            "giant": vit_giant2,
        }
        model = builders[model_size](patch_size=14,
                                     img_size=526,
                                     init_values=1.0,
                                     num_register_tokens=n_register_tokens,
                                     block_chunks=0)

        # Download pre-trained weights and place locally as-needed:
        # - small: https://dl.fbaipublicfiles.com/dinov2/dinov2_vits14/dinov2_vits14_reg4_pretrain.pth
        # - base:  https://dl.fbaipublicfiles.com/dinov2/dinov2_vitb14/dinov2_vitb14_reg4_pretrain.pth
        # - large: https://dl.fbaipublicfiles.com/dinov2/dinov2_vitl14/dinov2_vitl14_reg4_pretrain.pth
        # - giant: https://dl.fbaipublicfiles.com/dinov2/dinov2_vitg14/dinov2_vitg14_reg4_pretrain.pth
        # The file is chosen from this instance's `model_size` (not the notebook-level
        # setting) so the weights always match the architecture built above.
        weights_path = Path('{}/../dinov2_vit{}14_reg4_pretrain.pth'.format(local_directory, model_size[0]))
        # map_location="cpu" lets the checkpoint load on machines without a GPU;
        # the caller moves the whole module to the target device afterwards.
        model.load_state_dict(torch.load(weights_path, map_location="cpu"))

        # `model` is a fresh local object, so it can be stored directly without copying
        self.transformer = model

        # 256 is the hidden width of the head. It is independent of the image
        # resolution: the head's input is the backbone embedding, not the image.
        self.classifier = nn.Sequential(nn.Linear(self.embedding_size, 256), nn.ReLU(), nn.Linear(256, nc))

    def forward(self, x):
        """Run the backbone, its final norm layer, and the classification head on `x`."""
        x = self.transformer(x)
        # NOTE(review): upstream DinoV2's forward appears to return an already-normalized
        # CLS embedding, which would make this a second normalization. Left unchanged so
        # that existing checkpoints keep their behavior; confirm before removing.
        x = self.transformer.norm(x)
        x = self.classifier(x)
        return x

# Build the classifier for the configured backbone size and the number of target columns
model = DinoVisionTransformerClassifier(model_size=base_model_size, nc=n_target_classes)

In [36]:
model = model.to(device)
model = model.train()

# Loss function per classification category.
# https://pytorch.org/docs/stable/nn.html#loss-functions
if c12n_category == C12N_CATEGORY['TAGS']:
    # Multi-label: an independent sigmoid/binary cross-entropy per tag
    criterion = nn.BCEWithLogitsLoss()
elif c12n_category == C12N_CATEGORY['SEVERITY']:
    # NOTE(review): the severity accuracy metric takes an argmax over the outputs,
    # which suggests the severity targets are one-hot. If so, nn.CrossEntropyLoss
    # is the conventional choice; L1Loss is kept here so the training objective
    # does not change silently. Confirm against the data generation script.
    criterion = nn.L1Loss()
else:
    # Without this guard `criterion` would be undefined and fail later in the training loop
    raise ValueError("Unsupported classification category: {!r}".format(c12n_category))

# Single source of truth for the learning rate (used by the optimizer and the logged params)
learning_rate = 1e-6
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

num_epochs = 100

hyper_params = {
   "learning_rate": learning_rate,
   "steps": num_epochs,
   # Must match the batch_size given to the training DataLoader (4)
   "batch_size": 4,
    # add other hyper params
}

# experiment.log_parameters(hyper_params)

In [37]:
# Custom dataset backed by a `_classes.csv` annotation file
class PSLabelsDataset(Dataset):
    """Project Sidewalk label crops paired with their target vectors."""

    def __init__(self, csv_file, root_dir, offset=3, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            offset (int, optional): Number of leading columns to skip to reach the
                severity or tags columns (e.g. normalized_x and normalized_y are skipped).
            transform (callable, optional): Optional transform applied to each image.
        """
        self.root_dir = root_dir
        self.offset = offset
        self.transform = transform
        self.landmarks_frame = pd.read_csv(csv_file)

    def __len__(self):
        """Number of annotated rows in the CSV."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Return `(image, labels)` for row `idx`; labels is a float numpy array."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Column 0 holds the image file name, relative to root_dir
        file_name = self.landmarks_frame.iloc[idx, 0]
        image = io.read_image(os.path.join(self.root_dir, file_name), mode=io.ImageReadMode.RGB)

        # Everything from `offset` onwards is the target vector
        label_row = self.landmarks_frame.iloc[idx, self.offset:]
        labels = np.array(label_row).astype('float')

        if self.transform:
            image = self.transform(image)

        return image, labels


In [38]:
# Training dataset: annotations and images live in the same directory
dataset = PSLabelsDataset(
    csv_file='{}/{}'.format(csv_data_dir_path_train, '_classes.csv'),
    root_dir=csv_data_dir_path_train,
    offset=c12n_category_offset,
    transform=data_transforms['train'],
)

# Only a training split exists; batches of 4, reshuffled every epoch
dataloaders = {"train": torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=True)}

In [None]:

# accuracy calculation for multilabel i.e. tags
def calculate_accuracy_multilabel(outputs, labels):
    """Fraction of individual label decisions that are correct in a batch.

    Args:
        outputs: raw model outputs (logits), indexed as (sample, label).
        labels: ground-truth 0/1 values with the same layout as `outputs`.

    Returns:
        float: matching (sample, label) entries divided by labels-per-sample
        and by the number of samples.
    """
    # A label counts as predicted when its sigmoid probability exceeds 0.5
    predicted = torch.sigmoid(outputs) > 0.5

    n_matching = (predicted == labels.byte()).sum().item()
    per_sample = n_matching / labels.size(1)

    return per_sample / labels.size(0)

# accuracy calculation for multiclass i.e. severity
def calculate_accuracy_severity(outputs, labels):
    """Fraction of samples in the batch whose predicted class is marked in `labels`.

    The predicted class of each sample is the argmax over its outputs; the sample
    counts as correct when the label entry at that index is 1.

    Args:
        outputs: model outputs indexed as (sample, class).
        labels: ground-truth values with the same layout
            (assumed one-hot per sample; TODO confirm against the data generation script).

    Returns:
        float: number of correct samples divided by the batch size.
    """
    # Use the `outputs` argument (not a notebook-level variable) and keep one
    # prediction per sample, so any batch size works.
    predicted_class_idx = torch.argmax(outputs, dim=1)

    # Pick, for every sample, the label value at its predicted class index
    picked = labels.gather(1, predicted_class_idx.unsqueeze(1))

    # .long() truncates like int() did in the single-sample version
    return picked.long().sum().item() / labels.size(0)

epoch_losses = []
epoch_accuracies = []

best_accuracy = 0
# Infinity instead of a magic 100, so the first epoch always beats it
best_loss = float('inf')

# Choose the accuracy metric once; fail early on an unsupported category
if c12n_category == C12N_CATEGORY['TAGS']:
    accuracy_fn = calculate_accuracy_multilabel
elif c12n_category == C12N_CATEGORY['SEVERITY']:
    accuracy_fn = calculate_accuracy_severity
else:
    raise ValueError("Unsupported classification category: {!r}".format(c12n_category))

checkpoint_path = os.path.join(local_directory, model_name_to_save)

print("Training...")
for epoch in range(num_epochs):
    batch_losses = []
    batch_accuracies = []

    for batch_of_images, gt_labels in dataloaders["train"]:
        # move the input batch and the ground truth labels to the device once
        batch_of_images = batch_of_images.to(device)
        gt_labels = gt_labels.float().to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # model prediction; kept as (batch, n_classes) for both categories so it
        # always has the same shape as the labels (squeezing broke the 1-class case)
        output = model(batch_of_images)

        # compute loss and do gradient descent
        loss = criterion(output, gt_labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

        # record batch accuracy; detach because the metric needs no gradients
        batch_accuracies.append(accuracy_fn(output.detach(), gt_labels))

    epoch_loss = np.mean(batch_losses)
    epoch_accuracy = np.mean(batch_accuracies)
    epoch_losses.append(epoch_loss)
    epoch_accuracies.append(epoch_accuracy)

    print("  -> Epoch {}: Loss = {:.5f}, Accuracy = {:.3f}%".format(epoch, epoch_loss, 100*epoch_accuracy))

    # Save when accuracy improves, or when accuracy ties and the loss improves.
    # NOTE(review): both metrics are computed on the training data (there is no
    # validation loader), so "best" here means best fit to the training set.
    improved_accuracy = epoch_accuracy > best_accuracy
    improved_loss = epoch_accuracy == best_accuracy and epoch_loss < best_loss
    if improved_accuracy or improved_loss:
        best_accuracy = epoch_accuracy
        best_loss = epoch_loss
        torch.save(model.state_dict(), checkpoint_path)
        print('Saving model based on {}: Loss = {:.5f} | Accuracy: {:.3f}%'.format(
            'accuracy' if improved_accuracy else 'loss', best_loss, 100*best_accuracy))

    # # track on comet ml
    # log_model(experiment, model_name_to_save, model)
    

Training...


In [1]:
# Training curves side by side: accuracy on the left, loss on the right
fig, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Plotting accuracy
accuracy_ax.plot(epoch_accuracies, label='Accuracy', color='blue')
accuracy_ax.set_title("Training Accuracy")
accuracy_ax.set_xlabel("Epoch Number")
accuracy_ax.set_ylabel("Accuracy")

# Plotting loss
loss_ax.plot(epoch_losses, label='Loss', color='red')
loss_ax.set_title("Training Loss")
loss_ax.set_xlabel("Epoch Number")
loss_ax.set_ylabel("Loss")

plt.show()

NameError: name 'plt' is not defined