In [None]:
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

In [None]:
from torch import nn
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from torchvision import transforms, datasets
from PIL import Image
from torch.utils.data import WeightedRandomSampler

class Model:
    """
    Small convolutional classifier for 3-channel images with three classes.

    The class bundles the network, its training hyperparameters and the
    array-level preprocessing used by both ``fit`` and ``predict``.
    """

    def __init__(self):
        """
        Constructor for Model class.

        Builds the network and sets the default training hyperparameters.

        Parameters
        ----------
        self : object
            The instance of the object passed by Python.
        """
        # Each conv keeps height/width (kernel 3, padding 1) and each max-pool
        # halves them. The first Linear layer expects 256 features, i.e.
        # 16 channels x (H/4) x (W/4) = 256 -- e.g. 16x16 input images.
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Dropout(0.2),

            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Dropout(0.2),

            nn.Flatten(),
            nn.Linear(256, 32),
            nn.ReLU(),
            nn.Linear(32, 3)
        )

        # initialize hyperparameters
        self.learning_rate = 0.002
        self.batch_size = 32
        self.epochs = 25

    def fit(self, X, y):
        """
        Train the model using the input data.

        Samples with a NaN label are dropped, the images are cleaned and
        scaled, rare classes are oversampled, and the network is trained with
        Adam on cross-entropy loss. Class counts, label percentages, the
        sampler's class weights and a per-epoch loss are printed.

        Parameters
        ----------
        X : ndarray of shape (n_samples, channel, height, width)
            Training data.
        y : ndarray of shape (n_samples,)
            Target values.

        Returns
        -------
        self : object
            Returns an instance of the trained model.

        Raises
        ------
        ValueError
            If no labelled sample is left after preprocessing.
        """
        images, labels = Model.preprocess(X, y)
        if len(labels) == 0:
            raise ValueError('No labelled samples left to train on after preprocessing.')
        images, labels = Model.balance_dataset(images, labels)

        X_tensor = torch.tensor(images, dtype=torch.float32)
        y_tensor = torch.tensor(labels, dtype=torch.long)

        # Inverse-frequency weight per class, indexed by the label value itself.
        # bincount keeps a slot for every label up to the maximum, so a class
        # that is absent from the data cannot shift the indices.
        class_counts = torch.bincount(y_tensor)
        class_weights = torch.zeros(len(class_counts), dtype=torch.float32)
        present = class_counts > 0
        class_weights[present] = 1.0 / class_counts[present].float()
        sample_weights = class_weights[y_tensor]

        print('Class weights:', class_weights)
        Model.print_class_counts(labels)

        # print percentage of each label
        Model.print_label_percentage(labels)

        # Draw samples with probability inversely proportional to class size
        sampler = WeightedRandomSampler(sample_weights, len(sample_weights))

        # Create a dataset and data loader
        dataset = TensorDataset(X_tensor, y_tensor)
        batch_size = self.batch_size if self.batch_size else len(dataset)
        dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

        # Define loss function and optimizer for classification
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.cnn.parameters(), lr=self.learning_rate)

        # Dropout must be active while training (predict() switches it off)
        self.cnn.train()

        for epoch in range(self.epochs):
            for inputs, targets in dataloader:
                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                outputs = self.cnn(inputs)
                loss = criterion(outputs, targets)

                # Backward and optimize
                loss.backward()
                optimizer.step()

            # Reported value is the loss of the last mini-batch of the epoch
            print(f'Epoch {epoch+1}/{self.epochs}, Loss: {loss.item()}')

        return self

    def predict(self, X):
        """
        Use the trained model to make predictions.

        The network is put in evaluation mode and run without gradient
        tracking, so dropout is disabled and repeated calls on the same input
        return the same labels.

        Parameters
        ----------
        X : ndarray of shape (n_samples, channel, height, width)
            Input data.

        Returns
        -------
        ndarray of shape (n_samples,)
            Predicted target values per element in X.
        """
        images = Model.preprocess_predict(X)
        X_tensor = torch.tensor(images, dtype=torch.float32)

        # DataLoader rejects batch_size=0, so answer empty input directly
        if len(X_tensor) == 0:
            return np.array([], dtype=int)

        dataset = TensorDataset(X_tensor)
        batch_size = self.batch_size if self.batch_size else len(dataset)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

        self.cnn.eval()
        predictions = []
        with torch.no_grad():
            for (inputs,) in dataloader:
                outputs = self.cnn(inputs)
                predictions += outputs.argmax(dim=1).tolist()

        return np.array(predictions)

    @staticmethod
    def _clean_images(images):
        """
        Shared image cleaning for training and prediction.

        Works on a copy (``np.nan_to_num`` and ``np.clip`` both return new
        arrays), so the caller's array is left untouched.
        """
        # replace nan with 0 (infinities become large finite values, clipped below)
        images = np.nan_to_num(images)

        # cap min to 0 and max to 255
        images = np.clip(images, 0, 255)

        # zero channels 1 and 2 (green and blue, assuming RGB channel order)
        images[:, 1, :, :] = 0
        images[:, 2, :, :] = 0

        # normalize the images to [0, 1]
        return images / 255.0

    @staticmethod
    def preprocess(images, labels):
        """
        Clean training data: drop unlabelled samples, then clean the images.

        Parameters
        ----------
        images : ndarray of shape (n_samples, channel, height, width)
        labels : ndarray of shape (n_samples,)
            NaN marks a missing label.

        Returns
        -------
        tuple of (ndarray, ndarray)
            Cleaned images scaled to [0, 1] and their non-NaN labels.
        """
        # remove images where label is nan
        labelled = ~np.isnan(labels)
        images = images[labelled]
        labels = labels[labelled]

        images = Model._clean_images(images)

        print('Shape:', images.shape)

        return images, labels

    @staticmethod
    def preprocess_predict(images):
        """
        Clean prediction inputs the same way as the training images.

        Parameters
        ----------
        images : ndarray of shape (n_samples, channel, height, width)

        Returns
        -------
        ndarray
            Cleaned images scaled to [0, 1].
        """
        return Model._clean_images(images)

    @staticmethod
    def feature_engineer(images, labels):
        """
        Augment every sample whose label is not 0 with ten random transforms.

        Each selected image is cast to ``uint8`` before being handed to PIL,
        so pixel values are expected in the 0-255 range; images already scaled
        to [0, 1] would be truncated to all zeros by that cast.

        Parameters
        ----------
        images : ndarray of shape (n_samples, channel, height, width)
        labels : ndarray of shape (n_samples,)

        Returns
        -------
        tuple of (ndarray, ndarray)
            The original samples followed by the augmented ones.
        """
        # create 10 different transformations
        T_list = [
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(20),
            transforms.RandomVerticalFlip(),
            transforms.RandomRotation(40),
            transforms.RandomInvert(),
            transforms.ColorJitter(brightness=0.5),
            transforms.ColorJitter(contrast=0.5),
            transforms.ColorJitter(saturation=0.5),
            transforms.ColorJitter(hue=0.5),
            transforms.RandomGrayscale(p=0.1),
        ]

        print("Shape before:", images.shape)

        augmented_images = []
        augmented_labels = []

        # get images and labels where label is 1 or 2
        images_to_engineer = images[labels != 0]
        labels_to_engineer = labels[labels != 0]

        print("Number of images to engineer:", len(images_to_engineer))

        for (image, label) in zip(images_to_engineer, labels_to_engineer):
            # Convert to HWC format for PIL; built once per image because the
            # transforms return new images rather than editing this one
            img_pil = Image.fromarray(image.transpose(1, 2, 0).astype('uint8'), 'RGB')
            for transform in T_list:
                # Apply transformation
                augmented_img = transform(img_pil)

                # Convert back to CHW format and append
                augmented_np = np.asarray(augmented_img).transpose(2, 0, 1)
                augmented_images.append(augmented_np)
                augmented_labels.append(label)

        # Concatenating with an empty list would fail on the dimension mismatch
        if augmented_images:
            images = np.concatenate((images, augmented_images), axis=0)
            labels = np.concatenate((labels, augmented_labels), axis=0)

        print('Shape:', images.shape)

        return images, labels

    @staticmethod
    def balance_dataset(images, labels, min_proportions=(0.1, 0.9)):
        """
        Oversample the rarest classes by duplicating randomly chosen samples.

        Classes are ordered from rarest to most common and paired, in that
        order, with ``min_proportions``. A class that has fewer than
        ``int(n_samples * proportion)`` samples is topped up to that count by
        sampling its own members with replacement. Classes beyond the length
        of ``min_proportions`` are left unchanged.

        Parameters
        ----------
        images : ndarray of shape (n_samples, ...)
        labels : ndarray of shape (n_samples,)
        min_proportions : sequence of float, default (0.1, 0.9)
            Minimum share of the original sample count, rarest class first.

        Returns
        -------
        tuple of (ndarray, ndarray)
            Images and labels with the duplicated samples appended.
        """
        unique_labels, counts = np.unique(labels, return_counts=True)
        total_samples = len(labels)

        # Determine minimum count for each label based on proportions
        min_counts = [int(total_samples * p) for p in min_proportions]

        # Sort labels by their count (ascending)
        sorted_indices = np.argsort(counts)

        for idx, min_count in zip(sorted_indices, min_counts):
            label = unique_labels[idx]
            current_count = counts[idx]

            if current_count < min_count:
                # Calculate the number of samples to add
                add_count = min_count - current_count

                # Get indices of the current label
                label_indices = np.where(labels == label)[0]

                # Randomly select indices to duplicate (with replacement)
                add_indices = np.random.choice(label_indices, add_count)

                # Add the images and labels
                images = np.concatenate((images, images[add_indices]))
                labels = np.concatenate((labels, labels[add_indices]))

        return images, labels

    @staticmethod
    def print_label_percentage(y):
        """Print each distinct label in ``y`` with its share of all samples."""
        total_count = len(y)
        unique_labels, counts = np.unique(y, return_counts=True)
        for label, count in zip(unique_labels, counts):
            print(f'Label {label}: {count / total_count * 100:.2f}%')

    @staticmethod
    def print_class_counts(y):
        """Print a mapping from each distinct label in ``y`` to its count."""
        unique_labels, counts = np.unique(y, return_counts=True)
        print('Class counts:', dict(zip(unique_labels, counts)))

In [None]:
# Load the pickled dictionary holding the image array and its labels.
# NOTE: allow_pickle=True executes pickle on load -- only use with a trusted file.
DATA_PATH = 'data.npy'
with open(DATA_PATH, 'rb') as data_file:
    data = np.load(data_file, allow_pickle=True).item()
X, y = data['image'], data['label']

In [None]:
# Seed every random source used below (oversampling draws from NumPy; weight
# initialisation, dropout and the weighted sampler draw from torch) so that a
# fresh "Restart & Run All" reproduces the same score.
SEED = 1
np.random.seed(SEED)
torch.manual_seed(SEED)

# Split train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=SEED)

# Filter test data that contains no labels
# In Coursemology, the test data is guaranteed to have labels
mask = ~np.isnan(y_test)
X_test = X_test[mask]
y_test = y_test[mask]

# Train and predict
model = Model()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

# Evaluate model prediction
# Learn more: https://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics
print("F1 Score (macro): {0:.2f}".format(f1_score(y_test, y_pred, average='macro'))) # You may encounter errors, you are expected to figure out what's the issue.

In [None]:
# Per-label diagnostics for the predictions of the previous cell.
print(f'Total predictions: {len(y_pred)}')
print(f'Total ground truth: {len(y_test)}')

LABELS = range(3)

# print number of true and predicted for each label
for label in LABELS:
    true_total = int(np.sum(y_test == label))
    pred_total = int(np.sum(y_pred == label))
    print(f'Label {label}: {true_total} true, {pred_total} predicted')

print("=====================================")

# out of the images with each label, how many did we predict correctly
# (element-wise comparison instead of a Python loop with array membership tests)
for label in LABELS:
    is_label = y_test == label
    correct_count = int(np.sum(is_label & (y_pred == label)))
    print(f'Label {label}: {correct_count} correct out of {int(np.sum(is_label))}')

# Convert y_test and y_pred to binary format (1 for label '1' and 0 for all other labels)
y_test_binary = (y_test == 1).astype(int)
y_pred_binary = (y_pred == 1).astype(int)

# Calculate Precision, Recall, and F1 Score for label '1' (binary classification)
precision = precision_score(y_test_binary, y_pred_binary, pos_label=1)
recall = recall_score(y_test_binary, y_pred_binary, pos_label=1)
f1 = f1_score(y_test_binary, y_pred_binary, pos_label=1)

print(f"Precision for label 1: {precision:.2f}")
print(f"Recall for label 1: {recall:.2f}")
print(f"F1 Score for label 1: {f1:.2f}")

In [None]:
def macro_f1_for_split(X, y, random_state):
    """
    Train a fresh Model on one train/test split and return its macro F1.

    The same value seeds the split, NumPy and torch, so each run is
    repeatable. Test samples with a NaN label are excluded from scoring.
    Everything stays local, so the ``model``, ``y_test`` and ``y_pred`` of the
    earlier cells are not overwritten.
    """
    np.random.seed(random_state)
    torch.manual_seed(random_state)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.1, random_state=random_state
    )
    labelled = ~np.isnan(y_test)
    X_test = X_test[labelled]
    y_test = y_test[labelled]

    split_model = Model()
    split_model.fit(X_train, y_train)
    split_pred = split_model.predict(X_test)
    return f1_score(y_test, split_pred, average='macro')


random_states = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
f1_scores = [macro_f1_for_split(X, y, r) for r in random_states]

print(f'Average F1 Score: {np.mean(f1_scores):.2f}')
print(f'Max F1 Score: {np.max(f1_scores):.2f}')
print(f'Min F1 Score: {np.min(f1_scores):.2f}')

# Recorded from an earlier, unseeded run: 0.68 ave, 0.86 max, 0.56 min