In [19]:
import os
from pathlib import Path

import numpy as np
from PIL import Image
from scipy.stats import ttest_rel
from sklearn.metrics import accuracy_score
from sklearn.model_selection import (
    GridSearchCV,
    KFold,
    StratifiedKFold,
    cross_val_score,
    train_test_split,
)
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler


# Please write the optimal hyperparameter values you obtain in the global variable 'optimal_hyperparam' below. This
# variable should contain the values when I look at your submission. I should not have to run your code to populate this
# variable.
# NOTE(review): still an empty dict here — nothing in the visible cells assigns the tuned values to it.
optimal_hyperparam = {}

class COC131:
    """
    Coursework pipeline: load an image dataset (q1), standardise it (q2), train/tune an MLP classifier (q3),
    study the effect of alpha (q4) and compare plain vs. stratified cross-validation (q5).

    Instance state:
      dataset_dir       -- root folder whose sub-folders are the class names
      x, y              -- raw samples / class-name labels loaded by q1 (None until q1 has run)
      scaler            -- StandardScaler fitted on x the first time q3/q4/q5 need standardised data
      best_hyperparams  -- hyperparameters of the last model selected by q3 (empty dict until q3 has run)
    """

    # Resolution every image is downsampled to before it is flattened.
    IMAGE_SIZE = (32, 32)
    # Standard deviation the standardised features are rescaled to in q2.
    TARGET_STD = 2.5

    def __init__(self, dataset_dir="../dataset"):
        """
        Create an empty pipeline.

        :param dataset_dir: root directory of the dataset; defaults to the path the notebook originally hard-coded.
        """
        self.dataset_dir = dataset_dir
        self.x = None
        self.y = None
        self.scaler = None
        self.best_hyperparams = {}
        # Cache of q2(self.x); kept separate so self.x always holds the raw data loaded by q1.
        self._x_scaled = None

    def _load_image(self, image_path):
        """Return the image at 'image_path', resized to IMAGE_SIZE, as a flat row-major float array."""
        # The context manager closes the file handle promptly instead of leaving one open per image.
        with Image.open(image_path) as image:
            return np.array(image.resize(self.IMAGE_SIZE), dtype=float).flatten()

    def _require_dataset(self):
        """Raise ValueError unless q1 has populated self.x and self.y."""
        if self.x is None or self.y is None:
            raise ValueError("Dataset not loaded. Please call q1() first.")

    def _scaled_x(self):
        """Return self.x standardised through q2, fitting and caching the scaler on first use."""
        if self._x_scaled is None:
            self._x_scaled, self.scaler = self.q2(self.x)
        return self._x_scaled

    @staticmethod
    def _make_model(hyperparams):
        """Build an MLPClassifier from 'hyperparams', seeding it with random_state=1 unless overridden."""
        # Merging into one dict avoids the "multiple values for keyword argument" TypeError that
        # MLPClassifier(**hyperparams, random_state=1) raises when hyperparams already has random_state.
        return MLPClassifier(**{"random_state": 1, **hyperparams})

    def q1(self, filename=None):
        """
        Load the dataset. Every '.jpg' image found in a sub-folder of self.dataset_dir is resized to 32*32 and
        flattened in row-major order. The samples are stored in self.x (float array, one row per image) and the
        labels in self.y (array of folder names, i.e. class-name strings).

        NOTE(review): the assignment brief asks for float labels; class names are kept as strings because the
        value returned for 'filename' and the later steps use them directly.

        Calling q1 discards any previously fitted scaler so later steps re-standardise the fresh data.

        :param filename: optional file name of one image in the dataset (matched on the base name). Used for
        testing the implementation.
        :return: if 'filename' is given: (res1, res2) where res1 is the flattened low-resolution image as a
        one-dimensional float array and res2 is its class name (str). Otherwise: (self.x, self.y).
        :raises FileNotFoundError: if 'filename' is given but no image with that name exists in the dataset.
        """
        # Sorted so that the row order (and therefore every seeded split) does not depend on the
        # arbitrary order in which the operating system lists directory entries.
        class_dirs = sorted(entry.path for entry in os.scandir(self.dataset_dir) if entry.is_dir())

        image_paths = [
            os.path.join(class_dir, file_name)
            for class_dir in class_dirs
            for file_name in sorted(os.listdir(class_dir))
            if file_name.endswith('.jpg')
        ]

        self.x = np.array([self._load_image(image_path) for image_path in image_paths])
        # The containing folder's name is the class label.
        self.y = np.array([os.path.basename(os.path.dirname(image_path)) for image_path in image_paths])

        # The data changed, so anything derived from the previous load is stale.
        self.scaler = None
        self._x_scaled = None

        if filename:
            # Compare base names exactly: a substring test would let e.g. "Crop_1.jpg" match both
            # "AnnualCrop_1.jpg" and "PermanentCrop_1.jpg".
            wanted = os.path.basename(filename)
            for index, image_path in enumerate(image_paths):
                if os.path.basename(image_path) == wanted:
                    # The row was already computed above; no need to read the file a second time.
                    return self.x[index].copy(), str(self.y[index])
            raise FileNotFoundError(f"No image named '{wanted}' found under '{self.dataset_dir}'.")

        return self.x, self.y

    def q2(self, inp):
        """
        Compute the standardized data from a given 'inp' data. Works for a dataset with any number of features.

        :param inp: a 2-D array (samples x features) from which the standardized data is to be computed.
        :return res2: a numpy array containing the standardized data with standard deviation of 2.5. The array
        has the same dimensions as the original data.
        :return res1: the fitted sklearn StandardScaler. Its own transform() yields unit standard deviation;
        multiply its output by 2.5 to reproduce res2.
        """
        standard_scaler = StandardScaler()
        # fit_transform gives zero mean / unit standard deviation per feature ...
        standardised_data = standard_scaler.fit_transform(inp)

        res1 = standard_scaler
        # ... so a plain multiplication stretches the standard deviation to the requested value.
        res2 = standardised_data * self.TARGET_STD

        return res2, res1

    def q3(self, test_size=None, pre_split_data=None, hyperparam=None):
        """
        Build an MLP classifier on the dataset loaded by 'q1' and evaluate it. If 'hyperparam' is given, a single
        model is trained with those values; otherwise a grid search over hidden-layer sizes, learning-rate
        schedule and solver selects the model. The chosen hyperparameters are stored in self.best_hyperparams.

        :param test_size: proportion of the dataset reserved for testing, a fraction between 0 and 1.
        Default is 0.3 (30% for testing). Ignored when 'pre_split_data' is given.
        :param pre_split_data: optional (x_train, x_test, y_train, y_test) tuple used as-is instead of splitting
        the standardised dataset.
        :param hyperparam: optional dictionary of MLPClassifier keyword arguments to train with directly.
        :return: (model, loss_curve, train_accuracy, test_accuracy). loss_curve holds the training loss after
        each iteration; train_accuracy and test_accuracy are one-element arrays with the final accuracies.
        :raises ValueError: if no pre-split data is given and 'q1' has not been called.
        """
        if test_size is None:
            test_size = 0.3  # Default 70% training, 30% testing split

        if pre_split_data is not None:
            x_train, x_test, y_train, y_test = pre_split_data
        else:
            self._require_dataset()
            x_train, x_test, y_train, y_test = train_test_split(
                self._scaled_x(), self.y, test_size=test_size, random_state=1
            )

        if hyperparam:
            # Hyperparameters supplied by the caller: train exactly one model with them.
            model = self._make_model(hyperparam)
            model.fit(x_train, y_train)
            # Copy so later edits to the caller's dict do not silently change the stored values.
            self.best_hyperparams = dict(hyperparam)
        else:
            # Parameter grid for the search (alpha is excluded; it is studied separately in q4).
            param_grid = {
                "hidden_layer_sizes": [(50,), (100,), (100, 50), (200,)],  # Different network sizes
                "learning_rate": ["constant", "adaptive"],  # Learning rate type
                "solver": ["adam", "sgd"]  # Optimisation solvers
            }

            # Cross-validated grid search; only runs when no hyperparameters were supplied.
            grid_search = GridSearchCV(self._make_model({}), param_grid)
            grid_search.fit(x_train, y_train)

            model = grid_search.best_estimator_
            self.best_hyperparams = grid_search.best_params_

        # loss_curve_ is only populated by the iterative solvers; fall back to an empty curve otherwise.
        loss_curve = np.array(getattr(model, "loss_curve_", []))
        train_accuracy = np.array([accuracy_score(y_train, model.predict(x_train))])
        test_accuracy = np.array([accuracy_score(y_test, model.predict(x_test))])

        return model, loss_curve, train_accuracy, test_accuracy

    def q4(self):
        """
        Study the impact of alpha (L2 regularization) on model performance. One MLP classifier is trained per
        alpha value, with all other hyperparameters taken from self.best_hyperparams (set by q3), on a fixed
        70/30 split of the standardised dataset.

        :return res: dictionary with keys "alpha_values", "train_accuracies" and "test_accuracies", each a numpy
        array with one entry per alpha tested.
        :raises ValueError: if 'q1' or 'q3' has not been called first.
        """
        self._require_dataset()

        if not self.best_hyperparams:
            raise ValueError("Best hyperparameters not found. Please run q3() first.")

        # Same split parameters as q3's defaults.
        x_train, x_test, y_train, y_test = train_test_split(
            self._scaled_x(), self.y, test_size=0.3, random_state=1
        )

        alpha_values = [0, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 50, 100]

        train_accuracies = []
        test_accuracies = []

        for alpha in alpha_values:
            # Best hyperparameters from q3 with only alpha overridden.
            model = self._make_model({**self.best_hyperparams, "alpha": alpha})
            model.fit(x_train, y_train)

            train_accuracies.append(accuracy_score(y_train, model.predict(x_train)))
            test_accuracies.append(accuracy_score(y_test, model.predict(x_test)))

        res = {
            "alpha_values": np.array(alpha_values),
            "train_accuracies": np.array(train_accuracies),
            "test_accuracies": np.array(test_accuracies)
        }

        return res

    def q5(self):
        """
        Hypothesis test on the impact of using 5-fold CV with and without stratification on the performance of
        MLPClassifier. The model uses self.best_hyperparams (set by q3) and the standardised dataset.

        :return: 4 items - the mean testing accuracy without stratification, the mean testing accuracy with
        stratification, the p-value of a paired t-test on the per-fold accuracies (significance level 0.05),
        and a string with the outcome, which is either 'Splitting method impacted performance' or
        'Splitting method had no effect'.
        :raises ValueError: if 'q1' or 'q3' has not been called first.
        """
        self._require_dataset()

        if not self.best_hyperparams:
            raise ValueError("Best hyperparameters not found. Please run q3() first.")

        model = self._make_model(self.best_hyperparams)
        x_scaled = self._scaled_x()

        # 5-fold CV without stratification
        kf = KFold(n_splits=5, shuffle=True, random_state=1)
        cv_scores_kf = cross_val_score(model, x_scaled, self.y, cv=kf, scoring="accuracy", n_jobs=-1)

        # 5-fold CV with stratification
        skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
        cv_scores_skf = cross_val_score(model, x_scaled, self.y, cv=skf, scoring="accuracy", n_jobs=-1)

        mean_acc_kf = np.mean(cv_scores_kf)
        mean_acc_skf = np.mean(cv_scores_skf)

        # Paired t-test over the per-fold accuracies of the two splitters.
        t_stat, p_value = ttest_rel(cv_scores_kf, cv_scores_skf)

        alpha = 0.05  # Significance level
        # A NaN p-value (e.g. identical fold scores) compares False and is reported as "no effect".
        if p_value < alpha:
            hypothesis_result = "Splitting method impacted performance"
        else:
            hypothesis_result = "Splitting method had no effect"

        return mean_acc_kf, mean_acc_skf, p_value, hypothesis_result

    def q6(self):
        """
        This function should perform unsupervised learning using LocallyLinearEmbedding in Sklearn. You can assume
        that the function 'q1' has been called prior to calling this function.

        :return: The function should return the data you visualize. Currently a placeholder array of one zero.
        """
        # TODO: not implemented yet — the embedding is never computed; only a placeholder is returned.
        res = np.zeros(1)

        return res

In [2]:
# Smoke test for q1: look up a single image by file name, then load the full dataset.
dataset = COC131()
image_array, class_label = dataset.q1("Forest_1.jpg")  # Change to an actual filename in your dataset
print(class_label)
# NOTE(review): q1 reads every image on each call, so the dataset is loaded twice in this cell.
images, labels = dataset.q1()
print(images)
print(labels)

Forest
[[148. 121. 120. ... 106.  94. 104.]
 [ 56.  98.  83. ... 173. 127. 112.]
 [ 69.  93.  96. ... 204. 160. 150.]
 ...
 [ 32.  55.  80. ...  32.  53.  81.]
 [ 54.  71.  80. ...  53.  73.  82.]
 [102.  99. 108. ...  96.  96. 106.]]
['AnnualCrop' 'AnnualCrop' 'AnnualCrop' ... 'SeaLake' 'SeaLake' 'SeaLake']
