# 1. Load the Data

* First connect the drive to the Lab before any processes.
* Load all the available data that we have available to our project.
* It may impede the process and speed but I think reducing the bactch number will help.
* Splitting data into training, validation, and test sets.



In [1]:
import os
import random
import shutil
import numpy as np
import torch
import tensorflow as tf
import torch.nn as nn
import torch.optim as optim
import torchvision
import cv2

In [2]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics import accuracy_score
from sklearn.ensemble import VotingClassifier
from PIL import Image
from torchvision import datasets, transforms
from torch.utils.data import random_split, DataLoader, Dataset, WeightedRandomSampler
from collections import Counter
from tensorflow import keras
from tensorflow.keras.applications import MobileNetV2
from sklearn.model_selection import train_test_split

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
root_folders = [
    "/content/drive/MyDrive/CoWorkStuff/Data/01_test_and_val_dataset",
    "/content/drive/MyDrive/CoWorkStuff/Data/02_test_and_val_dataset_improved",
    "/content/drive/MyDrive/CoWorkStuff/Data/03_test_and_val_dataset_resampled",
    "/content/drive/MyDrive/CoWorkStuff/Data/04_test_and_val_dataset_resampled_resized",
    "/content/drive/MyDrive/CoWorkStuff/Data/05_train_dataset",
    "/content/drive/MyDrive/CoWorkStuff/Data/06_train_deduplicated",
    "/content/drive/MyDrive/CoWorkStuff/Data/07_train_deduplicated_dataset",
    "/content/drive/MyDrive/CoWorkStuff/Data/08_train_data",
    "/content/drive/MyDrive/CoWorkStuff/Data/09_train_data_resized",
    "/content/drive/MyDrive/CoWorkStuff/Data/10_train_pal_data",
    "/content/drive/MyDrive/CoWorkStuff/Data/11_test_pal_data",
    "/content/drive/MyDrive/CoWorkStuff/Data/12_val_pal_data"
    ]


# Define a combined folder to store the merged data
combined_folder = "/content/drive/MyDrive/CoWorkStuff/CombinedData"

# Create the combined dataset folder if it doesn't exist
os.makedirs(combined_folder, exist_ok=True)

In [None]:
# Define a dictionary to map subfolder names to class labels
class_mapping = {
    'subfolder1': 'Cercospora',
    'subfolder2': 'Healthy',
    'subfolder3': 'Miner',
    'subfolder4': 'Phoma',
    'subfolder5': 'Rust'
    }

# Move subfolder content into the combined folder
def move_subfolder_content(src, dst):
  for item in os.listdir(src):
    s = os.path.join(src, item)
    d = os.path.join(dst, item)
    if os.path.isdir(s):
      os.makedirs(d, exist_ok=True)
      move_subfolder_content(s, d)
    else:
      shutil.move(s, d)

# Combine data from different root folders into the combined folder
for root_folder in root_folders:
  for subfolder in os.listdir(root_folder):
    subfolder_path = os.path.join(root_folder, subfolder)
    if os.path.isdir(subfolder_path):
      # Get the class label from the dictionary
      class_label = class_mapping.get(subfolder, 'Other')
      # Create a target folder based on the class label
      target_folder = os.path.join(combined_folder, class_label)
      # Create the target folder if it doesn't exist
      os.makedirs(target_folder, exist_ok=True)
      # Move the subfolder content to the target folder
      move_subfolder_content(subfolder_path, target_folder)

In [None]:
# Recursively collect image files from the folder(s)
def collect_image_files(folder):
  image_files = []
  for root, _, files in os.walk(folder):
    for file in files:
      if file.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif')):
        image_files.append(os.path.join(root, file))
  return image_files

# Shuffle and split data for each class
def shuffle_and_split_data(root_folder, target_root, train_ratio, val_ratio):
  for class_label in os.listdir(root_folder):
    class_folder = os.path.join(root_folder, class_label)

    if os.path.isdir(class_folder):
      # List all files in the class folder
      class_data = collect_image_files(class_folder)
      random.shuffle(class_data)

      # Split the data into train, validation, and test sets
      num_samples = len(class_data)
      num_train = int(train_ratio * num_samples)
      num_val = int(val_ratio * num_samples)

      train_data = class_data[:num_train]
      val_data = class_data[num_train:num_train + num_val]
      test_data = class_data[num_train + num_val:]

      # Create target folders
      train_folder = os.path.join(target_root, 'train', class_label)
      val_folder = os.path.join(target_root, 'val', class_label)
      test_folder = os.path.join(target_root, 'test', class_label)

      os.makedirs(train_folder, exist_ok=True)
      os.makedirs(val_folder, exist_ok=True)
      os.makedirs(test_folder, exist_ok=True)

      # Move data to respective folders
      move_data(train_data, class_folder, train_folder)
      move_data(val_data, class_folder, val_folder)
      move_data(test_data, class_folder, test_folder)

# Define move_data
def move_data(data_list, source_folder, dest_folder):
  for data in data_list:
    src_path = os.path.join(source_folder, data)
    dst_path = os.path.join(dest_folder, data)
    if os.path.exists(src_path):
      shutil.move(src_path, dst_path)

# Define the split ratios
train_ratio = 0.65
val_ratio = 0.15
test_ratio = 0.20

# Define the target root folder for the train, val, and test sets
target_root = "/content/drive/MyDrive/CoWorkStuff/SplitData"

# Create the train, validation, and test folders
os.makedirs(os.path.join(target_root, 'train'), exist_ok=True)
os.makedirs(os.path.join(target_root, 'val'), exist_ok=True)
os.makedirs(os.path.join(target_root, 'test'), exist_ok=True)

# Shuffle and split data for each class
shuffle_and_split_data(combined_folder, target_root, train_ratio, val_ratio)

#######################################################################################################

# 2. Load the **Models Pre-trained** by our talented model building team

* Select as many models that were created.
* If possible, check the best performing one or possibly fine tunning them.
* Preferrably ensamble the models to make it more solid and stronger.
* Without any particular order or favouritism, for the pre-trained models; credit, thanks and props goes out to the following collaborators:
  - Darshan
  - Lucas
  - Dimitra
  - Juan

In [4]:
# Load .h5 model (TensorFlow/Keras)
model_01 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/model_CNN1_BRACOL.h5")

TypeError: weight_decay is not a valid argument, kwargs should be empty  for `optimizer_experimental.Optimizer`.

In [5]:
model_02 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/baseline_resnet50.h5")

TypeError: weight_decay is not a valid argument, kwargs should be empty  for `optimizer_experimental.Optimizer`.

In [None]:
model_03 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/without_cersc_and_healthy_resnet50_deduplicated_mix_val_train_67acc.h5")

In [None]:
model_04 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/without_cersc_and_healthy_resnet50_deduplicated.h5")

In [None]:
model_05 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/without_cersc_and_healthy_resnet50.h5")

In [None]:
model_06 = keras.models.load_model(
    "/content/drive/MyDrive/CoWorkStuff/Models/without_cersc_resnet50_deduplicated_mix_val_train_75acc.h5")

In [6]:
# Load .pth model (PyTorch)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
torch.cuda.empty_cache()

class CoffeeLeafClassifier(nn.Module):
  def __init__(self):
    super(CoffeeLeafClassifier, self).__init__()

    # Convolutional layers
    self.conv_layers = nn.Sequential(
        nn.Conv2d(3, 32, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),

        nn.Conv2d(32, 64, kernel_size=3),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),

        nn.Conv2d(64, 128, kernel_size=3),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        )
    # Fully connected layers
    self.fc_layers = nn.Sequential(
        nn.Linear(128 * 30 * 30, 512),
        nn.ReLU(),
        nn.Dropout(0.5),

        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Dropout(0.5),

        nn.Linear(256, 128),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(128, 5) # 5 classes
        )

  def forward(self, x):
    x = self.conv_layers(x)
    x = x.view(x.size(0), -1) # Flatten the output
    x = self.fc_layers(x)
    return x

model_path = os.path.join(
    "/content/drive/MyDrive/CoWorkStuff/Models/cnn_strategy1_weighted_loss",
    "/content/drive/MyDrive/CoWorkStuff/Models/cnn_strategy1_weighted_loss/coffee_leaf_classifier.pth")
model_07 = torch.load(model_path, map_location=torch.device('cpu'))
model_07.to(device)
model_07.eval()

cpu


CoffeeLeafClassifier(
  (conv_layers): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=115200, out_features=512, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=512, out_features=256, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.5, inplace=False)
    (6): Linear(in_features=256, out_features=128, bias=True)
    (7): ReLU()
    (8): Dropout(p=0.5, inplace=False)
    (9): Linear(in_features=128, out_features=5, bias=True)
  )
)

In [7]:
# Load the MobileNetV2 model with random weights
model_08 = MobileNetV2(weights="imagenet", include_top=False)



In [19]:
import tensorflow as tf
from tensorflow import keras

model_01 = keras.applications.ResNet50(weights=None, include_top=True)  # Replace with the appropriate architecture

# Load the pre-trained weights
model_01.load_weights("C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/baseline_resnet50.h5") 

ValueError: Layer count mismatch when loading weights from file. Model expected 107 layers, found 5 saved layers.

In [16]:
# Load .h5 model (TensorFlow/Keras)
model_01 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/baseline_resnet50.h5")

TypeError: weight_decay is not a valid argument, kwargs should be empty  for `optimizer_experimental.Optimizer`.

In [None]:
# Load .h5 model (TensorFlow/Keras)
model_02 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/model_CNN1_BRACOL.h5")

In [None]:
# Load .h5 model (TensorFlow/Keras)
model_03 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/without_cersc_and_healthy_resnet50.h5")

In [None]:
# Load .h5 model (TensorFlow/Keras)
model_04 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/without_cersc_and_healthy_resnet50_deduplicated.h5")

In [None]:
# Load .h5 model (TensorFlow/Keras)
model_05 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/without_cersc_and_healthy_resnet50_deduplicated_mix_val_train_67acc.h5")

In [11]:
# Load .h5 model (TensorFlow/Keras)
model_06 = keras.models.load_model(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/without_cersc_resnet50_deduplicated_mix_val_train_75acc.h5")

TypeError: weight_decay is not a valid argument, kwargs should be empty  for `optimizer_experimental.Optimizer`.

In [8]:
# Load .pth model (PyTorch)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
torch.cuda.empty_cache()

class CoffeeLeafClassifier(nn.Module):
    def __init__(self):
        super(CoffeeLeafClassifier, self).__init__()
        
        # Convolutional layers
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            nn.Conv2d(32, 64, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            nn.Conv2d(64, 128, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        # Fully connected layers
        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 30 * 30, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 5) # 5 classes
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1) # Flatten the output
        x = self.fc_layers(x)
        return x

model_path = os.path.join(
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/cnn_strategy1_weighted_loss",
    "C:/Users/Sabunity Ltd/Desktop/task5_models_improvement/Models/cnn_strategy1_weighted_loss/coffee_leaf_classifier.pth")
model_07 = torch.load(model_path, map_location=torch.device('cpu'))
model_07.to(device)
model_07.eval()

cpu


CoffeeLeafClassifier(
  (conv_layers): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=115200, out_features=512, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=512, out_features=256, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.5, inplace=False)
    (6): Linear(in_features=256, out_features=128, bias=True)
    (7): ReLU()
    (8): Dropout(p=0.5, inplace=False)
    (9): Linear(in_features=128, out_features=5, bias=True)
  )
)

In [9]:
# Load the MobileNetV2 model with random weights
model_08 = MobileNetV2(weights="imagenet", include_top=False)



#######################################################################################################

# 3. Voting classifier

In [None]:
# path to test data folder
val_data_01 = "/content/drive/MyDrive/CoWorkStuff/SplitData/val"

# lists to store images and their labels
test_images = []
test_labels = []

# list to store bad image paths
bad_file_list = []
bad_count = 0

# list of models
models = [
    ('model_01', model_01),
    ('model_02', model_02),
    ('model_03', model_03),
    ('model_04', model_04),
    ('model_05', model_05),
    ('model_06', model_06),
    ('model_07', model_07),
    ('model_08', model_08)
    ]

# batch size for loading and preprocessing images
batch_size = 32

In [None]:
# Iterate through the subfolders (class labels) the test data folder
for class_label in os.listdir(val_data_01):
  class_folder = os.path.join(val_data_01, class_label)

  # NumPy to efficiently load and preprocess images in batches
  image_paths = [os.path.join(class_folder, image_file) for image_file in os.listdir(class_folder)]
  num_images = len(image_paths)
  num_batches = (num_images + batch_size - 1) // batch_size

  for batch_idx in range(num_batches):
    start_idx = batch_idx * batch_size
    end_idx = min((batch_idx + 1) * batch_size, num_images)
    batch_paths = image_paths[start_idx:end_idx]

    try:
      images = np.array([img_to_array(load_img(image_path, target_size=(224, 224))) for image_path in batch_paths])
      images = images / 255.0
      test_images.extend(images)
      test_labels.extend([class_label] * len(images))

    except Exception as e:
      # Handle bad images by appending their paths to the bad_file_list
      bad_file_list.extend(batch_paths)
      bad_count += len(batch_paths)

In [None]:
# Convert the lists to NumPy arrays
X_test = np.array(test_images)
y_test = np.array(test_labels)

In [None]:
# Create a majority voting classifier
voting_classifier = VotingClassifier(estimators=models, voting='hard')

In [None]:
voting_classifier.fit(X_train, y_train)

In [None]:
# Make predictions using the ensemble model
ensemble_predictions = voting_classifier.predict(X_test)

NotFittedError: ignored

In [None]:
# Now you can evaluate the ensemble model's performance
accuracy = accuracy_score(y_test, ensemble_predictions)

print(f'Ensemble Model Accuracy: {accuracy * 100:.2f}%')
print(f'Number of Bad Images: {bad_count}')
print('Bad Image Paths:')
for bad_path in bad_file_list:
    print(bad_path)