This notebook will setup all of the code that will load the models with the finetuned weights and test them on a variety of test sets. First, the optimal similarity threshold needs to be determined to find the right AUC by running the model on the validation set and returning a tensor of all of the similarities and another tensor of the ground truth labels. This is basically already in the other one

Then, iterate through different cutoffs and select the one that maximizes the geometric mean, np.sqrt(TPR * (1-FPR))

Finally, given this threshold, evaluate the test sets. Save vector of loss for each pair to the correct folder, return the accuracy and average loss

In [None]:
# !pip install timm torchmetrics

from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import time
import os
import copy
import pandas as pd
from skimage import io, transform
from sklearn import metrics
import timm
from skimage.color import gray2rgb
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)
from torchmetrics.classification import BinaryAUROC

PyTorch Version:  1.13.0+cu116
Torchvision Version:  0.14.0+cu116


In [None]:
#TODO: Add support for the new dataset from imagenetO
class siameseDataset(Dataset):
    
    def __init__(self, csv_file, transform=None, device = "cuda"):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.images_frame = pd.read_csv(csv_file, names = ["Image1 Path", "Label1", "Image2 Path", "Label2"])
        train_csv = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/train.csv'
        if(csv_file == train_csv):
            print('is train')
            self.sim_df = self.images_frame[0:80000:2]
            print('sim dim:', len(self.sim_df))
            self.dif_df = self.images_frame[80000:len(self.images_frame):3]
            print('dif dim:', len(self.dif_df))
            self.images_frame = pd.concat([self.sim_df, self.dif_df])
        
        self.transform = transform

    def __len__(self):
        return len(self.images_frame)

    def __getitem__(self, idx):
        
        # print('getting item')
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        train_path = '/content/tiny-imagenet-200/train'
        val_or_test_path = '/content/tiny-imagenet-200/val/images'   
        
        
        img_path1 = self.images_frame.iloc[idx, 0].split('/tiny-imagenet-200/')[1] #Unpacks path and fixes to be compatible with google drive 
        label1 = self.images_frame.iloc[idx, 1]
        
        img_path2 = self.images_frame.iloc[idx, 2].split('/tiny-imagenet-200/')[1] #Unpacks path and fixes to be compatible with google drive
        label2 = self.images_frame.iloc[idx, 3]

        spec_img1 = img_path1.split('/')[-1]
        spec_img2 = img_path2.split('/')[-1]
        
        if 'train' in img_path1:
            class_img1 = spec_img1.split("_")[0]
            class_img2 = spec_img2.split("_")[0]
            img_path1 = os.path.join(train_path, class_img1, "images", spec_img1)
            img_path2 = os.path.join(train_path, class_img2, "images", spec_img2)
        elif 'val' in img_path1 or 'test' in img_path1:
            
            img_path1 = os.path.join(val_or_test_path, spec_img1)
            img_path2 = os.path.join(val_or_test_path, spec_img2)
        else:
            raise ValueError('Bad Image Path. "Test", "Train" "Val" not found.')

        flag = ((label1 == label2) * 2) -1

        # if flag == -1:
        #     print("negative flag loaded")
        # print(img_path1, img_path2)

        image1 = io.imread(img_path1)
        image2 = io.imread(img_path2)

        #Check shapes and enforce RGB

        if (len(image1.shape) != 3):
            # print("bad image1")
            image1 = gray2rgb(image1)
        if (len(image2.shape) != 3):
            # print("bad image2")
            image2 = gray2rgb(image2)
            
        # print('imread successful')
        if self.transform:
            try:
                image1 = self.transform(image1)
                image2 = self.transform(image2)
                # print('transforming')
            except:
                print('ERROR in transforming')
        
        sample = {'image1': image1, 'image2': image2, 'flag': flag}

        return sample

Helper Functions to initialize models

In [None]:
def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

In [None]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    # Initialize these variables which will be set in this if statement. Each of these
    #   variables is model specific.
    model_ft = None
    input_size = 0
    if model_name == "resnet18":
        """ Resnet18
        """
        model_ft = models.resnet18(pretrained=use_pretrained)
        resnet18_state_dict_path = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/resnet18/best_auc_weights'
        resnet18_state_dict = torch.load(resnet18_state_dict_path)

        model_ft.load_state_dict(copy.deepcopy(resnet18_state_dict))
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "resnet50":
        """ Resnet50
        """
        model_ft = models.resnet50(pretrained=use_pretrained)
        resnet50_state_dict_path = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/resnet50/best_auc_weights'
        resnet50_state_dict = torch.load(resnet50_state_dict_path)

        model_ft.load_state_dict(copy.deepcopy(resnet50_state_dict))
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224
    
    elif model_name == "resnet101":
        """ Resnet101
        """
        model_ft = models.resnet101(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "alexnet":
        """ Alexnet
        """
        model_ft = models.alexnet(pretrained=use_pretrained)
        alexnet_state_dict_path = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/alexnet/best_auc_weights'
        alexnet_state_dict = torch.load(alexnet_state_dict_path)

        model_ft.load_state_dict(copy.deepcopy(alexnet_state_dict))
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "vgg":
        """ VGG11_bn
        """
        model_ft = models.vgg16_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        """ Squeezenet
        """
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        squeezenet_state_dict_path = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/squeezenet/best_auc_weights'
        squeezenet_state_dict = torch.load(squeezenet_state_dict_path)

        model_ft.load_state_dict(copy.deepcopy(squeezenet_state_dict))
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == "densenet":
        """ Densenet
        """
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes) 
        input_size = 224

    elif model_name == "inception":
        """ Inception v3 
        Be careful, expects (299,299) sized images and has auxiliary output
        """
        model_ft = models.inception_v3(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        # Handle the auxilary net
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # Handle the primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs,num_classes)
        input_size = 299
    
    elif model_name == "vit":
        """Vision Transformer
        """
        model_ft = timm.create_model('vit_base_patch16_224', pretrained=True, num_classes=num_classes)
        set_parameter_requires_grad(model_ft, feature_extract)
    elif model_name == "efficientnet":
        """efficientnet_b4
        """
        model_ft = timm.create_model('efficientnet_b0', pretrained=True, num_classes=num_classes)
        efficient_state_dict_path = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/efficientnet/best_auc_weights'
        efficient_state_dict = torch.load(efficient_state_dict_path)

        model_ft.load_state_dict(copy.deepcopy(efficient_state_dict))
        set_parameter_requires_grad(model_ft, feature_extract)
        input_size = 224

    else:
        print(model_name)
        print("Invalid model name, exiting...")
        exit()
    
    return model_ft, input_size

# Initialize the model for this run
# model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)

# Print the model we just instantiated
# print(model_ft)

Helper Functions to find optimal threshold 

In [None]:
from traitlets.config.application import T
def geomMean(tpr, fpr):
    return np.sqrt(tpr * (1-fpr))

def findOptimalThreshold(groundtruth, predictions):
    #Takes in two arrays and finds optimal threshold
    assert len(groundtruth) == len(predictions)
    fpr, tpr, thresholds = metrics.roc_curve(groundtruth, predictions)

    best_score = 0
    best_threshold = 0
    for i in range(len(thresholds)):
        threshold = thresholds[i]
        score = geomMean(tpr, fpr)
        if score > best_score:
            best_threshold = threshold
            best_score = score
    return best_threshold

In [None]:
def eval_model(model_name, model, dataloader, criterion):
    #Takes in a model and outputs the groundtruth labels alongside the models prediction 
    since = time.time()

    validation_labels = torch.empty(0).to(device)
    validation_pred_distance = torch.empty(0).to(device)

    model.eval()   # Set model to evaluate mode

    running_loss = 0.0
    running_corrects = 0


    # Iterate over data.
    # print("just before dataloader")
    for batch, obj in enumerate(dataloader):
        # print(batch, "/", len(dataloaders[phase]))
        try:
            # print("trying unpack obj")
            image1 = obj["image1"]
            image2 = obj["image2"]
            flag = obj["flag"]
        except:
            training_grayscale_errors += 1
            continue

        image1 = image1.to(device)
        image2 = image2.to(device)
        flag = flag.to(device)


        # track history off since model in eval mode
        with torch.set_grad_enabled(False):

            # print('running model1')
            output1 = model(image1)
            # print('running model2')
            output2 = model(image2)

            #Add predictions if in a validation mode
            distance_func = nn.CosineSimilarity(dim = 1, eps = 1e-6)
            predicted_similarity = distance_func(output1, output2)
            # print(predicted_similarity.shape)
            # print(flag.shape)
            # assert predicted_similarity.shape == (batch_size,1) #ensure that it is the same size
            labels = (flag + 1) / 2 #converts to 0 and 1
            validation_labels = torch.cat((validation_labels,labels))
            validation_pred_distance = torch.cat((validation_pred_distance, predicted_similarity))


    return validation_labels, validation_pred_distance

Ok now that helper functions are setup, it's time to setup dataloaders for validation

In [None]:
transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

val_different_csv = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/val_different.csv'
val_similar_csv = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/val_similar.csv'
val_different_dataset = siameseDataset(csv_file= val_different_csv, transform=transform)
val_similar_dataset = siameseDataset(csv_file= val_similar_csv, transform=transform)

val_different_dataloader = torch.utils.data.DataLoader(val_different_dataset, batch_size=64, shuffle=False, num_workers = 2, pin_memory = True)
val_similar_dataloader = torch.utils.data.DataLoader(val_different_dataset, batch_size=64, shuffle=False, num_workers = 2, pin_memory = True)

SyntaxError: ignored

Now to initialize model, get the optimal threshold on the validation set, and save those predictions.

In [None]:
model_name = "resnet18"
criterion = nn.CosineEmbeddingLoss()
model_ft, input_size = initialize_model(model_name, num_classes = 128, feature_extract = False, use_pretrained = True)
different_groundtruth, different_predictions = eval_model(model_name, model_ft, val_different_dataloader, criterion)
similar_groundtruth, similar_predictions = eval_model(model_name, model_ft, val_similar_dataloader, criterion)

combined_groundtruth = different_groundtruth + similar_groundtruth
combined_predictions = different_predictions + similar_predictions

opt_thresh = findOptimalThreshold(combined_groundtruth, combined_predictions)

validation_arr = np.array([combined_groundtruth, combined_predictions])
model_dir = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/{}'.format(model_name)
np.savetxt(os.path.join(model_dir, 'validation_predictions.txt'), validation_arr)
print(model_name, "has an optimal threshold of", opt_thresh)
#TODO: Add saving optimum threshold

Now, with the optimal threshold found, the model can be tested on each one, after which it the predictions and labels will be saved

In [None]:
def generateDataloaderDict():
    test_sets = ['/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_seen_similar.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_unseen_similar.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_imagenetO_similar.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_seen_seen_different.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_unseen_seen_different.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_unseen_unseen_different.csv',
                 '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/test_imagenetO_different.csv',
                 ]
    dataloaders_dict = {}
    for csv_path in test_sets:

        dataset = siameseDataset(csv_path, transform = transform)
        test_name = csv_path.split('/')[-1].split('.')[0]
        dataloaders_dict[test_name] = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False, num_workers = 2, pin_memory = True)



In [None]:
def testModel(dataloader, model_ft, model_name, test_name, opt_thresh):

    groundtruth, predictions = eval_model(model_name, model_ft, dataloader, criterion)
    test_arr = np.array([groundtruth, predictions])
    model_dir = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/{}'.format(model_name)
    np.savetxt(os.path.join(model_dir, '{}_predictions.txt'.format(test_name)), test_arr)
    predictions_mask = (predictions > opt_thresh).astype(int)
    accuracy = metrics.accuracy_score(groundtruth, predictions)
    return accuracy



def testModelWrapper(dataloader_dict, model_name, opt_thresh):
    #Returns the accuracies in the order of the dataloader_dict
    accuracies = []
    model = initialize_model(model_name, num_classes = 128, feature_extract = False, use_pretrained = True)
    for test_name in dataloader_dict:
        dataloader = dataloader_dict[test_name]
        accuracy = testModel(dataloader, model, model_name, test_name, opt_thresh)
        accuracies.append(accuracy)
    return accuracies


def testAllModels():

    dataloader_dict = generateDataloaderDict()
    all_accuracies = []
    model_names = ["resnet18", "resnet50", "alexnet", "squeezenet", "efficientnet"]
    for model_name in model_names:
        model_dir = '/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/{}'.format(model_name)
        opt_thresh = np.loadtxt(os.path.join(model_dir, 'opt_thresh.txt'))

        accuracies = testModelWrapper(dataloader_dict, model_name, opt_thresh)
        all_accuracies.append(accuracies)
    accuracy_df = pd.DataFrame(all_accuracies, columns = dataloader_dict.keys())
    accuracy_df.to_csv('/content/drive/MyDrive/MLMI/CompVision/tiny-imagenet-200/models/model_accuracies.csv', index = False)
    
    