# Cars Dataset

In [None]:
import numpy as np
from matplotlib import pyplot as plt
from pathlib import Path
from PIL import Image
import random
import scipy
from sklearn.linear_model import SGDClassifier
from sklearn.utils import shuffle
import time
from torchvision.models import resnet18, ResNet18_Weights
from torch.nn import Identity
from torchvision.io import read_image
from torchvision import transforms
import torch
from tqdm import tqdm

# Default size applied to every matplotlib figure created in this notebook.
plt.rcParams['figure.figsize'] = [12, 8]

# IPython magics: enable the autoreload extension in mode 2.
%load_ext autoreload
%autoreload 2

## Data

The dataset should be [downloaded](https://ai.stanford.edu/~jkrause/cars/car_dataset.html) and extracted into a `data/` folder at the project root.  

In [None]:
# Root folder of the extracted dataset; the image folders and the devkit
# annotations are looked up relative to it.
DATA_PATH = Path("data")

### TASK 1 - Build a function that converts a labelled dataset into labelled and unlabelled subsets.

In [None]:
def get_labelled_and_unlabelled_indexes(
    dataset_labels,
    proportion
):
    """Randomly split dataset positions into labelled and unlabelled subsets.

    Args:
        dataset_labels: Sequence holding the class of every instance.
        proportion: Fraction of the dataset, strictly between 0 and 1, that
            ends up in the *unlabelled* subset.

    Returns:
        A ``(labelled_indexes, unlabelled_indexes)`` pair of NumPy integer
        arrays which together contain every position exactly once. Every
        class present in ``dataset_labels`` appears at least once among the
        labelled positions.

    Raises:
        ValueError: If ``proportion`` is not strictly between 0 and 1, or if
            it leaves fewer labelled instances than there are classes.
    """
    # A chained `0 >= proportion >= 1` can never be true, so test the valid
    # open interval and negate it instead.
    if not 0 < proportion < 1:
        raise ValueError("`proportion` should be a float between 0 and 1.")

    # Convert once; the acceptance test below indexes this array repeatedly.
    labels = np.asarray(dataset_labels)
    split_index = int(len(labels) * (1 - proportion))
    unique_classes = np.unique(labels).tolist()

    if split_index < len(unique_classes):
        # `proportion` is the unlabelled share, so it has an upper bound.
        max_proportion = 1 - len(unique_classes) / len(labels)
        raise ValueError(
            f"The proportion should be lower than {max_proportion} to ensure "
            "all unique classes have at least one instance labelled."
        )

    dataset_indexes = np.arange(0, len(labels))

    # Rejection sampling: reshuffle until the labelled part covers all classes.
    while True:
        random.shuffle(dataset_indexes)

        labelled_indexes = dataset_indexes[:split_index]
        unlabelled_indexes = dataset_indexes[split_index:]

        # Make sure at least one instance of each class is labelled
        if np.unique(labels[labelled_indexes]).tolist() == unique_classes:
            break

    return labelled_indexes, unlabelled_indexes

### TASK 2 - Data cleaning

# TODO (PRINT deletion)

In [None]:
def delete_non_RGB_images(image_path):
    """Delete every file in ``image_path`` whose PIL mode is not ``"RGB"``.

    Each deletion is printed, followed by a final summary line.

    Args:
        image_path: ``pathlib.Path`` of the directory to clean. Every entry
            in it is opened with ``Image.open``.
    """
    delete_count = 0
    for filename in image_path.iterdir():
        # Release the file handle before a possible unlink: an open handle
        # leaks and can prevent deletion on some platforms.
        with Image.open(filename) as image:
            is_rgb = image.mode == "RGB"

        if is_rgb:
            continue

        print(f"Deleting {filename} (is not an RGB image).")
        filename.unlink()
        delete_count += 1

    print(f">>> Deleted {delete_count} files in {image_path}.")

    
# Clean both image folders so only RGB images remain on disk.
for split_folder in ("cars_train", "cars_test"):
    image_path = DATA_PATH / split_folder
    delete_non_RGB_images(image_path)

### TASK 3 - Dataset representation

In [None]:
def load_images(image_path):
    """Read every file of a directory with ``read_image``.

    Args:
        image_path: ``pathlib.Path`` of the directory to read.

    Returns:
        Dict mapping each file name (without its directory) to the tensor
        returned by ``read_image``.
    """
    # `read_image` decodes straight to a tensor, so no PIL step is needed.
    images = {
        entry.name: read_image(str(entry)) for entry in image_path.iterdir()
    }

    print(f"{len(images)} images in {image_path}.")

    return images

# Load both image folders into memory, keyed by file name.
train_images = load_images(DATA_PATH / "cars_train")
test_images = load_images(DATA_PATH / "cars_test")

In [None]:
def load_annotation_files(filename):
    """Read a devkit ``.mat`` annotation file into a filename -> class dict.

    Args:
        filename: Path of the ``.mat`` file; it must contain an
            ``"annotations"`` variable.

    Returns:
        Dict mapping each image file name to its class value.
    """
    records = scipy.io.loadmat(filename)["annotations"][0]

    # In every record the last field holds the file name and the
    # second-to-last one holds the class, both wrapped in nested arrays.
    return {record[-1][0]: record[-2][0][0] for record in records}
        
# Map every training image file name to its class, read from the devkit.
train_annotations = load_annotation_files(DATA_PATH / "devkit" / "cars_train_annos.mat")

In [None]:
def prepare_model():
    """Build the feature extractor used to embed images.

    Returns:
        A ResNet-18 loaded with ``ResNet18_Weights.DEFAULT``, with its ``fc``
        layer replaced by ``Identity`` and switched to evaluation mode.
    """
    backbone = resnet18(weights=ResNet18_Weights.DEFAULT)

    # With `fc` turned into a pass-through, the forward pass returns whatever
    # was fed into the original last layer.
    backbone.fc = Identity()
    backbone.eval()

    return backbone

# Feature extractor instance handed to `prepare_dataset` below.
model = prepare_model()

In [None]:
def prepare_dataset(model, transform, images, annotations):
    """Embed every annotated image with ``model``.

    Args:
        model: Callable applied to a batch holding one preprocessed image;
            its first output row is stored as the embedding.
        transform: Preprocessing callable applied to the rescaled image.
        images: Dict mapping file names to image tensors.
            Assumes uint8 pixel values (0..255) -- TODO confirm with caller.
        annotations: Dict mapping file names to their class.

    Returns:
        Dict keyed by the position of the image in ``images``. Each value is
        a dict with the keys ``"embedding"`` (NumPy array), ``"class_idx"``
        and ``"labelled"`` (always ``True``). Images without an annotation
        are skipped after printing their file name, so keys can have gaps.
    """
    dataset = {}

    # Pure inference: disabling autograd avoids building a graph per image.
    with torch.no_grad():
        for i, (image_filename, image) in enumerate(tqdm(images.items())):
            if image_filename not in annotations:
                print(image_filename)
                continue

            # The largest 8-bit value is 255, so divide by 255 (not 256) to
            # map pixels onto the full [0, 1] range.
            x = image / 255
            x = transform(x)
            x = x.float()
            x = x.unsqueeze(0)

            embedding = model(x).detach()
            embedding = embedding[0].numpy()

            dataset[i] = {
                "embedding": embedding,
                "class_idx": annotations[image_filename],
                "labelled": True
            }

    return dataset
    
# Embed every annotated training image, using the preprocessing that ships
# with the default ResNet-18 weights.
dataset = prepare_dataset(
    model=model, 
    transform=ResNet18_Weights.DEFAULT.transforms(), 
    images=train_images, 
    annotations=train_annotations
)
# Save the embeddings to disk so they can be reloaded later.
torch.save(dataset, DATA_PATH / "dataset.pt")

# Uncomment to reload the saved embeddings instead of recomputing them.
# dataset = torch.load(DATA_PATH / "dataset.pt")

### TASK 4 - Build a partially labelled dataset

In [None]:
# Unzip the per-image records into two parallel tuples: embeddings and classes.
embedding_class_pairs = [
    (record["embedding"], record["class_idx"]) for record in dataset.values()
]
dataset_inputs, dataset_labels = zip(*embedding_class_pairs)

# `proportion` is the unlabelled share: 40% of the positions go to the
# unlabelled subset and the remaining 60% are treated as labelled.
labelled_indexes, unlabelled_indexes = get_labelled_and_unlabelled_indexes(
    dataset_labels=dataset_labels,
    proportion=0.4,
)

### TASK 5 - Create train/validation split

In [None]:
def split_dataset(
    dataset_inputs,
    dataset_labels,
    training_proportion
):
    """Split inputs and labels into a leading training part and a trailing validation part.

    The split is positional (no shuffling): the first
    ``int(len(dataset_inputs) * training_proportion)`` items form the
    training set and the rest form the validation set. Both sizes are printed.

    Args:
        dataset_inputs: Sliceable sequence of inputs.
        dataset_labels: Sliceable sequence of labels, parallel to the inputs.
        training_proportion: Fraction, strictly between 0 and 1, of the items
            assigned to the training set.

    Returns:
        ``(training_inputs, training_labels, valid_inputs, valid_labels)``.

    Raises:
        ValueError: If ``training_proportion`` is not strictly between 0 and 1.
    """
    # A chained `0 >= p >= 1` can never be true, so test the valid open
    # interval and negate it instead.
    if not 0 < training_proportion < 1:
        raise ValueError("`training_proportion` should be a float between 0 and 1.")

    index_split = int(len(dataset_inputs) * training_proportion)

    training_inputs = dataset_inputs[:index_split]
    training_labels = dataset_labels[:index_split]
    print(f"Train: {len(training_inputs)} samples")

    valid_inputs = dataset_inputs[index_split:]
    valid_labels = dataset_labels[index_split:]
    print(f"Valid: {len(valid_inputs)} samples")

    return training_inputs, training_labels, valid_inputs, valid_labels

### TASK 6 - Create experiment(s) to convince clients that more labelled data will improve model performance 

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

def select_items(my_list, selected_indexes):
    """Return the items of ``my_list`` whose position is in ``selected_indexes``.

    Items come back in their original order in ``my_list`` (not in the order
    of ``selected_indexes``), each at most once. Indexes that are out of
    range are ignored.

    Args:
        my_list: Iterable of items.
        selected_indexes: Iterable of hashable integer positions (a list or a
            1-D NumPy array, for instance).

    Returns:
        List of the selected items.
    """
    # Build the lookup set once: testing membership against a list or array
    # for every item would make the selection quadratic.
    wanted_positions = set(selected_indexes)

    return [
        item for i, item in enumerate(my_list) if i in wanted_positions
    ]

def train_model(X_train, y_train, X_test, y_test, max_iteration=1000, verbose=0):
    """Fit a scaled linear SGD classifier and score it on held-out data.

    Args:
        X_train: Training inputs.
        y_train: Training labels.
        X_test: Inputs used for scoring.
        y_test: Labels used for scoring.
        max_iteration: Forwarded to ``SGDClassifier`` as ``max_iter``.
        verbose: Forwarded to ``SGDClassifier`` as ``verbose``.

    Returns:
        ``(classifier, score)``: the fitted pipeline and the value returned
        by its ``score`` method on the test data.
    """
    sgd = SGDClassifier(
        loss="perceptron",
        penalty="l2",
        max_iter=max_iteration,
        tol=1e-3,
        verbose=verbose,
    )
    classifier = make_pipeline(StandardScaler(), sgd)

    started_at = time.time()
    classifier.fit(X_train, y_train)
    finished_at = time.time()

    print(f"Classifier trained in {round(finished_at - started_at, 2)} seconds.")

    return classifier, classifier.score(X_test, y_test)


# Keep only the instances treated as labelled.
dataset_inputs_labelled = select_items(dataset_inputs, labelled_indexes)
dataset_labels_labelled = select_items(dataset_labels, labelled_indexes)

# Positional split: the first 80% train the model, the last 20% validate it.
X_train, y_train, X_valid, y_valid = split_dataset(
    dataset_inputs_labelled,
    dataset_labels_labelled,
    training_proportion=0.8
)

# Fractions of the training set to train on. The commented-out range sweeps
# several fractions; the active value trains once on the whole training set.
# ratios = np.arange(0.1, 1, 0.1)
ratios = [1]
scores = []

for ratio in ratios:
    # Train on the leading `ratio` share of the training set only.
    split_index = int(len(X_train) * ratio)
    X_subset_train = X_train[:split_index]
    y_subset_train = y_train[:split_index]
    
    # Every run is scored on the same validation set.
    classifier, score = train_model(
        X_subset_train, 
        y_subset_train, 
        X_valid, 
        y_valid, 
        max_iteration=1000, 
        verbose=0
    )
    scores.append(score)

# Prints the score of the last ratio only; `scores` holds one entry per ratio.
print(score)
# plt.plot(ratios, scores)

### TASK 7 - Active learning to select new instances to be labelled

In [None]:
from scipy.special import softmax
from scipy.stats import entropy

# Candidate pool: every instance that was left unlabelled.
X_candidates = select_items(dataset_inputs, unlabelled_indexes)
y_candidates = select_items(dataset_labels, unlabelled_indexes)

# Number of extra instances to label: 25% of the full dataset size.
K_candidates = int(len(dataset_inputs) * 0.25)

# The classifier is trained with loss="perceptron", for which scikit-learn
# provides no `predict_proba`. Turn the per-class decision scores into a
# probability-like distribution with a softmax instead.
decision_scores = classifier.decision_function(X_candidates)
probabilities = softmax(decision_scores, axis=1)

# One entropy per candidate (row). The default axis=0 would instead return
# one value per class, computed across candidates.
entropies = entropy(probabilities, axis=1)

# Keep the K candidates the classifier is least certain about. The indexes
# are positions within X_candidates / y_candidates, not within the dataset.
selected_candidate_indexes = np.argsort(entropies).tolist()[-K_candidates:]

### TASK 8 - Final model training and evaluation

In [None]:
# `selected_candidate_indexes` comes from scores computed on X_candidates, so
# it holds positions within the candidate pool. Select from that pool:
# indexing the full dataset would pick unrelated instances and could add
# validation samples to the training data.
X_extra = select_items(X_candidates, selected_candidate_indexes)
y_extra = select_items(y_candidates, selected_candidate_indexes)

# Extend the original training set with the newly labelled instances.
X_final = X_train + X_extra
y_final = y_train + y_extra

classifier.fit(X_final, y_final)

# We want our validation set to remain the same
score = classifier.score(X_valid, y_valid)

print(f"FINAL SCORE {score}")