<a href="https://colab.research.google.com/github/FourthYearUni/ML-DeeplearningModel/blob/main/assignment_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
pip install scikit-learn numpy tensorflow tqdm cleanlab openpyxl pandas



In [7]:
"""
This module handles any file operations
"""

import os
import hashlib
import subprocess

from openpyxl import Workbook
import pandas as pd

class File:
    """
    Regular file operations used by the data-preparation steps:
    - Calculating a file's hash
    - Finding duplicate files in a folder
    - Deleting files
    """

    def __init__(self) -> None:
        # Template hasher: only its algorithm name is used, a fresh hasher is
        # created for every file so digests never leak between calls.
        self.hash_function = hashlib.sha256()
        # Number of bytes read per iteration while hashing.
        self.chunk_size = 4096

    def calculate_hash(self, file_path: str) -> str:
        """
        Calculate the hash of a file.

        The file is read in ``self.chunk_size`` pieces and fed to a new hasher
        using the algorithm of ``self.hash_function``. This is done in-process
        so it does not depend on an external ``sha256sum`` binary.

        Returns the hexadecimal digest.
        Raises OSError (e.g. FileNotFoundError) when the file cannot be read.
        """
        hasher = hashlib.new(self.hash_function.name)
        with open(file_path, "rb") as handle:
            for chunk in iter(lambda: handle.read(self.chunk_size), b""):
                hasher.update(chunk)
        return hasher.hexdigest()

    def find_duplicate_files(self, folder_path: str) -> list:
        """
        Find duplicate files in a folder.

        Every regular file directly inside ``folder_path`` is hashed. Each file
        whose hash was already seen is also recorded as a row in
        ``hashes.xlsx`` (written to the current working directory).

        Returns a list of groups, each group being the list of paths that
        share one hash; only groups with more than one file are returned.
        """
        hashes = {}
        workbook = Workbook()
        sheet = workbook.active
        sheet["A1"] = "Hash"
        sheet["B1"] = "File Path"

        # Sorted so that "which copy counts as the duplicate" is reproducible.
        for file in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, file)
            if not os.path.isfile(file_path):
                # Sub-folders cannot be hashed and are not duplicates of files.
                continue
            file_hash = self.calculate_hash(file_path)
            same_content = hashes.setdefault(file_hash, [])
            if same_content:
                # Hash already seen: this path is a duplicate, log it.
                sheet.append([file_hash, file_path])
            same_content.append(file_path)
        workbook.save("hashes.xlsx")

        return [files for files in hashes.values() if len(files) > 1]

    def delete_file(self, file_path: str) -> None:
        """
        Delete a file and report the deletion on stdout.

        Raises OSError when the file cannot be removed.
        """
        os.remove(file_path)
        print(f"Deleted file {file_path}")


In [8]:
"""
Utility for resizing images
"""

import cv2
# from .files import File


def resize_picture(im_path: str) -> int:
    """
    Resize the picture at ``im_path`` in place to 150x150 pixels.

    Returns 1 when the image was resized and rewritten, 0 when it already had
    the target size.

    Raises ValueError when the image cannot be decoded; the unreadable file is
    deleted before the error is raised.
    """
    points = (150, 150)  # (width, height), the order cv2.resize expects
    image = cv2.imread(im_path)
    if image is None:
        # cv2.imread signals an unreadable file by returning None, not by raising.
        print(f"Could not read image. Image is {im_path}")
        File().delete_file(im_path)
        raise ValueError(f"Unreadable image removed: {im_path}")

    # image.shape is (height, width, ...); resize when either dimension differs.
    height, width = image.shape[:2]
    if (width, height) == points:
        print("This image was not resized")
        return 0

    resized = cv2.resize(image, points, interpolation=cv2.INTER_LINEAR)
    cv2.imwrite(im_path, resized)
    print("This image was resized")
    return 1


In [9]:
"""
This module handles the labelling process of the images
"""
from pathlib import Path
from openpyxl import Workbook


class Labeller:
    """
    Collects (label, image) pairs in a spreadsheet and writes them to disk.
    """

    def __init__(self) -> None:
        # NOTE(review): __file__ is normally undefined inside a notebook cell;
        # confirm how this code is executed before relying on this path.
        self.label_file = Path(__file__).parent.joinpath("../../labels.xlsx")
        self.workbook = Workbook()
        self.sheet = self.workbook.active
        # Header row: column A holds the label, column B the image name.
        for column, title in enumerate(("Label", "Image"), start=1):
            self.sheet.cell(row=1, column=column, value=title)

    def label_images(self, label: str, image: str, index: int) -> None:
        """
        Record one image with its label as a new row.

        ``index`` is only reported on stdout; it is not stored.
        """
        print(f"Index is {index}")
        row = [label, image]
        self.sheet.append(row)

    def save_labels(self) -> None:
        """
        Write the collected rows to ``self.label_file``.
        """
        destination = self.label_file
        self.workbook.save(destination)
        print("Labels saved successfully")








In [10]:
"""
This module handles data cleaning and labelling.
@author: Alain Mugisha (U2083264)
"""

import os
import shutil
from pathlib import Path

import cv2
from pandas import read_excel
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from numpy import array
from cleanlab import Datalab
from google.colab import drive

# Mount Google Drive at /content/drive (runs as soon as this cell executes).
drive.mount('/content/drive')

# from ..Utils.files import File
# from src.Utils.labeller import Labeller
# from ..Utils.resize import resize_picture


class Cleaner:
    """
    This class handles data cleaning and labelling.

    It copies raw images into a flat "clean" folder under normalised names,
    records their labels, loads labelled images as normalised arrays and
    resizes the cleaned images.
    """

    def __init__(self) -> None:
        # NOTE(review): __file__ is normally undefined inside a notebook cell;
        # confirm how this code is executed before relying on these paths.
        project_root = Path(__file__).parent
        self.data_folder = project_root / "../../Data/"
        self.clean_data_folder = project_root / "../../CleanData/"
        self.norm_data = project_root / "../../NormalizedData/"
        self.label_file = project_root / "../../labels.xlsx"
        self.labels = []
        self.files_util = File()
        self.labeller = Labeller()

    def process_images(self) -> None:
        """
        Process images prior to normalization and training
        - Copies every image to the clean folder as ``<folder>_<index>.<ext>``
        - Labels every copied image with the name of its source folder
        - Saves the labels once all folders have been processed

        Each sub-folder of ``self.data_folder`` is treated as one class.
        Entries that are not folders (and, inside a class folder, entries that
        are not regular files) are skipped.
        """
        valid_extensions = ("HEIC", "jpg", "png", "jpeg")
        # Used when the file has no extension or an unrecognised one.
        fallback_extension = "jpeg"

        # shutil.copy does not create the destination folder.
        os.makedirs(self.clean_data_folder, exist_ok=True)

        # Sorted so the index given to each file is reproducible between runs.
        for folder in sorted(os.listdir(self.data_folder)):
            # Use absolute path in order to have a valid path to traverse
            abs_folder = os.path.join(self.data_folder, folder)
            if not os.path.isdir(abs_folder):
                continue
            print(f"Processing folder {abs_folder}")

            images = sorted(
                name
                for name in os.listdir(abs_folder)
                if os.path.isfile(os.path.join(abs_folder, name))
            )
            for index, file in enumerate(images):
                abs_file = os.path.join(abs_folder, file)
                # Only the text after the LAST dot is the extension; names
                # without a dot yield an empty string instead of failing.
                ext = os.path.splitext(file)[1].lstrip(".")
                if ext not in valid_extensions:
                    ext = fallback_extension
                file_name = f"{folder}_{index}.{ext}"
                out_path = os.path.join(self.clean_data_folder, file_name)
                shutil.copy(abs_file, out_path)
                print(f"Saving file {file_name}")
                self.labeller.label_images(folder, file_name, index)

        self.labeller.save_labels()

    @staticmethod
    def process_labels(data_folder: str, label_path: str):
        """
        Load the labelled images and normalize the image arrays.

        Reads the spreadsheet at ``label_path`` (columns "Image" and "Label"),
        loads every listed image that exists in ``data_folder`` at 150x150 and
        skips rows whose image file is missing.

        Returns a tuple ``(X, Y)``: the image array scaled by 1/255 and the
        array of label strings, in matching order.
        """
        labels_df = read_excel(label_path)
        image_size = (150, 150)

        x = []  # Array for image
        y = []  # Array of label strings

        for _, row in labels_df.iterrows():
            img_path = os.path.join(data_folder, row["Image"])
            if not os.path.isfile(img_path):
                continue
            img = load_img(img_path, target_size=image_size)
            x.append(img_to_array(img))
            y.append(row["Label"])

        X = array(x) / 255.0  # Normalization
        Y = array(y)
        return X, Y

    def resize_all_pictures(self) -> tuple:
        """
        Ensure that all files in the clean folder are of the same size.

        Returns a tuple ``(invalid_images, total_resized_images)``: the number
        of files for which resizing failed and the number actually resized.
        """
        total_resized_images = 0
        invalid_images = 0
        total_processed_images = 0

        for file in os.listdir(self.clean_data_folder):
            abs_file = os.path.join(self.clean_data_folder, file)
            try:
                total_resized_images += resize_picture(abs_file)
            except Exception:
                # Best effort: a single bad file must not stop the whole run,
                # it is only counted as invalid.
                invalid_images += 1
                continue
            total_processed_images += 1
            print(f"Total images processed: {total_processed_images}")
        return invalid_images, total_resized_images


In [11]:
"""
This module provides model training capabilities
"""

import datetime

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.config.experimental import (
    list_physical_devices,
    set_virtual_device_configuration,
    VirtualDeviceConfiguration,
)
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, TensorBoard


class Trainer:
    """
    This class defines training methods. The models trained here do the
    following
    - Image classification
    - Calculating predicted probabilities
    """

    def __init__(self, labels: list, images: list):
        self.encoder = LabelEncoder()
        self.y_encoded = None
        self.y_onehot = None
        self.labels = labels
        self.images = images
        self.logs_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    def encode_categorical(self):
        """
        Encodes the labels using one hot encoding
        """
        self.y_encoded = self.encoder.fit_transform(self.labels)
        self.y_onehot = to_categorical(self.y_encoded)

    def split(self):
        """
        Splits the dataset into training and test components

        Returns ``(x_train, x_test, y_train, y_test)`` with 20% of the data
        held out. ``encode_categorical`` must have been called first.
        """
        x_train, x_test, y_train, y_test = train_test_split(
            self.images, self.y_onehot, test_size=0.2, random_state=42
        )
        return (x_train, x_test, y_train, y_test)

    @staticmethod
    def _configure_gpu() -> None:
        """
        Caps the memory of the first GPU at 4096 MB when a GPU is present.
        """
        gpus = list_physical_devices("GPU")
        if not gpus:
            return
        try:
            set_virtual_device_configuration(
                gpus[0], [VirtualDeviceConfiguration(memory_limit=4096)]
            )
        except RuntimeError as e:
            # Raised when the GPU was already initialised; training can still
            # proceed with the existing device configuration.
            print(e)

    @staticmethod
    def _build_network(num_classes: int):
        """
        Creates the (uncompiled) convolutional network for 150x150 RGB input.
        """
        return Sequential(
            [
                # Build feature map and activation function and return an activation map.
                Conv2D(32, (3, 3), activation="relu", input_shape=(150, 150, 3)),
                MaxPooling2D((2, 2)),
                Conv2D(32, (3, 3), activation="relu"),
                MaxPooling2D((2, 2)),
                Conv2D(32, (3, 3), activation="relu"),
                Flatten(),
                Dense(64, activation="relu"),
                Dense(64, activation="relu"),
                # One output per class found in the labels.
                Dense(num_classes, activation="softmax"),
            ]
        )

    def build_cnn_model(self):
        """
        Builds and trains the cnn model, then predicts on all images.

        Training runs whether or not a GPU is available; when one is present
        its memory is limited first.

        Returns the predicted class probabilities for ``self.images``.
        """
        self._configure_gpu()

        datagen = ImageDataGenerator(
            width_shift_range=0.2, height_shift_range=0.2, horizontal_flip=True
        )
        early_stop = EarlyStopping(
            monitor="val_loss",  # Monitor validation loss
            patience=10,  # Wait for 10 epochs before stopping if no improvement
            restore_best_weights=True,
        )
        learning_rate_red = ReduceLROnPlateau(
            monitor="val_loss", patience=5, factor=0.5, min_lr=1e-6, verbose=1
        )
        tensor_board = TensorBoard(log_dir=self.logs_dir, histogram_freq=1)

        # Encode the labels before training
        self.encode_categorical()
        model = self._build_network(num_classes=self.y_onehot.shape[1])

        # Call the splitter and obtain the x_train and y_train values
        x_train, x_test, y_train, y_test = self.split()
        datagen.fit(x_train)

        model.compile(
            optimizer="adam",
            loss="categorical_crossentropy",
            metrics=["accuracy"],
        )
        model.fit(
            datagen.flow(x_train, y_train, batch_size=128),
            epochs=180,
            validation_data=(x_test, y_test),
            callbacks=[early_stop, learning_rate_red, tensor_board],
        )
        return model.predict(self.images)


In [None]:
"""
Main entry point for the application
"""

# from src.Preprocessing.cleaner import Cleaner
# from src.Training.trainer import Trainer


class Main:
    """
    Main class for the application: wires the cleaner to the trainer.
    """

    def __init__(self) -> None:
        cleaner = Cleaner()
        self.cleaner = cleaner
        # Load the cleaned, labelled images as arrays for training.
        images, labels = cleaner.process_labels(
            cleaner.clean_data_folder, cleaner.label_file
        )
        self.trainer = Trainer(labels=labels, images=images)

    def run(self) -> None:
        """
        Train the CNN model and print its predictions.
        """
        # Optional preparation steps, currently disabled:
        #   self.cleaner.process_images()       -> rename files and move them to the clean folder
        #   self.cleaner.resize_all_pictures()  -> resize all pictures, returns (invalid, resized)

        # Train CNN model
        print(self.trainer.build_cnn_model())


# Build the application and start the run when this code executes as the
# main module.
if __name__ == "__main__":
    main = Main()
    main.run()


In [None]:
# Mount Google Drive at /content/drive (same call as in the data-cleaning cell).
from google.colab import drive
drive.mount('/content/drive')