In [None]:
"""
Program Title: train_proposed_model.ipynb
Programmer/s: Idan Josh Bosi

Where the program fits in the general system designs
This program fits within a broader facial skin disease classification system, specifically handling the tasks of data preprocessing, 
feature extraction, and classification. It serves as the model training component, which takes raw image data, 
applies transformations and augmentations, and produces a trained model capable of accurately categorizing skin diseases.

Date Written: October 4, 2024
Date Revised: November 4, 2024

Purpose:
The primary purpose of this code is to preprocess images, extract features, and classify 
the images using a machine learning model optimized for multi-class facial skin disease identification.

Data Structures, Algorithms, and Control:
- Data structures: PyTorch DataLoader, NumPy arrays for data handling
- Algorithms: AlexNet for feature extraction, K-means for segmentation and CLAHE transforms for preprocessing, and XGBoost for classification
- Control: Structured as functions and classes to support modularity and reusability, 
with specific handling for image augmentation and feature extraction.
"""

import torch
import os
import matplotlib.pyplot as plt
import numpy as np
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import numpy as np
from torch.utils.data import DataLoader
import random
import cv2
from PIL import Image
import torchvision.transforms as transforms

SEED = 123
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(SEED) 

In [None]:

BATCH_SIZE = 32
INPUT_SIZE = (227, 227)

def count_files_in_directory(directory):
    total_files = 0
    for root, dirs, files in os.walk(directory):
        total_files += len(files)
    return total_files

main_data_dir = r""
train_dir = os.path.join(main_data_dir, "train")
val_dir = os.path.join(main_data_dir, "val")
test_dir = os.path.join(main_data_dir, "test")

train_files = count_files_in_directory(train_dir)
val_files = count_files_in_directory(val_dir)
test_files = count_files_in_directory(test_dir)

print(f"Training Dataset: {train_files}")
print(f"Validation Dataset: {val_files}")
print(f"Test Dataset: {test_files}")

In [None]:

# Load the training dataset to calculate mean and std, and get class labels
train_dataset = datasets.ImageFolder(root=train_dir)
class_n = list(train_dataset.class_to_idx.keys())  # Automatically retrieves class names from folders
print("Class to label mapping:", train_dataset.class_to_idx)

In [None]:
from sklearn.cluster import KMeans
import numpy as np
from PIL import Image

class KMeansSegmentation:
    def __init__(self, n_clusters=3, overlay_alpha=0.5):
        self.n_clusters = n_clusters
        self.overlay_alpha = overlay_alpha

    def __call__(self, img):
        # Convert PIL image to NumPy array
        img_np = np.array(img)

        # Reshape the image to a 2D array of pixels
        h, w, c = img_np.shape
        pixel_values = img_np.reshape((-1, 3))

        # Apply K-Means clustering
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        kmeans.fit(pixel_values)
        centers = np.uint8(kmeans.cluster_centers_)
        labels = kmeans.labels_

        # Create the segmented image
        segmented_image = centers[labels.flatten()]
        segmented_image = segmented_image.reshape((h, w, c))

        # Blend the segmented regions back onto the original image
        img_segmented = cv2.addWeighted(img_np, 1 - self.overlay_alpha, segmented_image, self.overlay_alpha, 0)

        # Convert back to PIL Image
        return Image.fromarray(img_segmented)


In [None]:
class CLAHETransform:
    def __init__(self, clip_limit=2.0, tile_grid_size=(8, 8)):
        self.clip_limit = clip_limit
        self.tile_grid_size = tile_grid_size

    def __call__(self, img):
        img_np = np.array(img)

        img_lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
        l, a, b = cv2.split(img_lab)

        clahe = cv2.createCLAHE(clipLimit=self.clip_limit, tileGridSize=self.tile_grid_size)
        l_clahe = clahe.apply(l)

        img_clahe = cv2.merge((l_clahe, a, b))
        img_clahe = cv2.cvtColor(img_clahe, cv2.COLOR_LAB2RGB)

        return Image.fromarray(img_clahe)

In [None]:
MEAN = (0.5960, 0.4489, 0.4046)
STD = (0.2102, 0.1782, 0.1719)

class CustomRotation:
    def __init__(self, degrees, border_mode=cv2.BORDER_REPLICATE):
        self.degrees = degrees
        self.border_mode = border_mode

    def __call__(self, img):
        img_array = np.array(img)
        h, w = img_array.shape[:2]
        center = (w // 2, h // 2)

        angle = np.random.uniform(-self.degrees, self.degrees)
        rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)

        rotated_img = cv2.warpAffine(
            img_array,
            rotation_matrix,
            (w, h),
            flags=cv2.INTER_LINEAR,
            borderMode=self.border_mode
        )
        return transforms.functional.to_pil_image(rotated_img)


class CustomAffine:
    def __init__(self, degrees, translate, shear, border_mode=cv2.BORDER_REPLICATE):
        self.degrees = degrees
        self.translate = translate
        self.shear = shear
        self.border_mode = border_mode

    def __call__(self, img):

        img_array = np.array(img)
        

        h, w = img_array.shape[:2]
        
        tx = np.random.uniform(-self.translate[0] * w, self.translate[0] * w)
        ty = np.random.uniform(-self.translate[1] * h, self.translate[1] * h)
        shear_x = np.random.uniform(-self.shear, self.shear)
        shear_y = np.random.uniform(-self.shear, self.shear)

        src_pts = np.float32([[0, 0], [w, 0], [0, h]])
        dst_pts = np.float32([
            [tx, ty],
            [w + shear_x, ty],
            [shear_x, h + shear_y]
        ])
        affine_matrix = cv2.getAffineTransform(src_pts, dst_pts)


        affine_img = cv2.warpAffine(
            img_array,
            affine_matrix,
            (w, h),
            flags=cv2.INTER_LINEAR,
            borderMode=self.border_mode
        )
        return transforms.functional.to_pil_image(affine_img)


transform_train = transforms.Compose([
    transforms.Resize(INPUT_SIZE),  
    transforms.RandomHorizontalFlip(p=0.5), 
    transforms.RandomApply([CustomRotation(degrees=20)], p=0.5), 
    transforms.RandomChoice([ 
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        transforms.RandomGrayscale(p=0.1)
    ]),
    CLAHETransform(clip_limit=2.0, tile_grid_size=(8, 8)), 
    transforms.RandomApply([KMeansSegmentation(n_clusters=12, overlay_alpha=0.5)], p=0.1),  
    transforms.ToTensor(), 
    transforms.Normalize(mean=MEAN, std=STD)  
])


transform_val_test = transforms.Compose([
    transforms.Resize(INPUT_SIZE),
    CLAHETransform(clip_limit=2.0, tile_grid_size=(8, 8)),
    transforms.ToTensor(),
    transforms.Normalize(mean=MEAN, std=STD)
])

# Load datasets
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform_train)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform_val_test)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform_val_test)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Get a batch of images and labels from the DataLoader
images, labels = next(iter(train_loader))
print(images.shape)  

In [None]:

# Function to unnormalize the image for visualization
def unnormalize(image, mean, std):
    # Convert the tensor to a NumPy array and transpose dimensions to (H, W, C)
    image = image.numpy().transpose((1, 2, 0))  
    
    # Unnormalize by reversing the mean and std normalization
    image = (image * std) + mean  
    
    # Clip values to be between 0 and 1 for valid image display
    image = np.clip(image, 0, 1)  
    return image


# Visualize a batch of images from the train_loader
def visualize_loader(loader, mean, std, class_names, num_images=6):
    # Get a batch of images
    data_iter = iter(loader)
    images, labels = next(data_iter)  

    # Plot the images
    plt.figure(figsize=(12, 8))
    for i in range(num_images):
        plt.subplot(2, 3, i+1)
        image = unnormalize(images[i], mean, std)  
        plt.imshow(image)
        plt.title(f"Class: {class_names[labels[i]]}")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Use this function to visualize a batch of images
visualize_loader(train_loader, mean=MEAN, std=STD, class_names=class_n)

In [None]:
import torch.nn as nn
from torchvision import models
from torchsummary import summary

model = models.alexnet(weights=None)  

model.classifier = nn.Sequential(
    *list(model.classifier.children())[:3]  
)

weight_path = r""  
state_dict = torch.load(weight_path, weights_only=True)

new_state_dict = {}
for k, v in state_dict.items():
    new_key = k.replace("model.", "") if k.startswith("model.") else k
    if new_key in model.state_dict().keys():
        new_state_dict[new_key] = v


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

model.load_state_dict(new_state_dict, strict=False)  

for param in model.parameters():
    param.requires_grad = False

model.eval()
print(model)

input_size = (3, 227, 227)
summary(model, input_size=input_size, device=str(device))

In [None]:
def extract_features(data_loader, model, device):
    features = []
    labels = []
    
    model.to(device)  # Move the model to the GPU
    model.eval()  # Set the model to evaluation mode
    
    with torch.no_grad():  # Disable gradient computation
        for images, target_labels in data_loader:
            images = images.to(device) 
            target_labels = target_labels.to(device)  

            # Extract features using the model
            output_features = model(images)
            
            # Move features back to the CPU and convert to NumPy arrays
            features.append(output_features.cpu().numpy()) 
            labels.append(target_labels.cpu().numpy())
    
    # Concatenate features and labels from all batches
    features = np.vstack(features)
    labels = np.concatenate(labels)
    return features, labels

train_features, y_train = extract_features(train_loader, model, device)
val_features, y_val = extract_features(val_loader, model, device)
test_features, y_test = extract_features(test_loader, model, device)


# Make sure the extracted features are in a format compatible with XGBoost
print("Train features shape:", train_features.shape)
print("Validation features shape:", val_features.shape)
print("Test features shape:", test_features.shape)

In [None]:
import optuna
import plotly
from xgboost import XGBClassifier
import json
import tqdm as notebook_tqdm

def objective(trial):
    params = {
        'learning_rate': trial.suggest_float('learning_rate', 0.005, 0.1, log=True),
        'max_depth': trial.suggest_int('max_depth', 3, 5),
        'n_estimators': trial.suggest_int('n_estimators', 50, 300),
        'min_child_weight': trial.suggest_int('min_child_weight', 10, 20),
        'reg_alpha': trial.suggest_float('reg_alpha', 0.01, 1.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 0.1, 10.0, log=True),
    }

    xgb_model = XGBClassifier(
        objective='multi:softmax',
        num_class=len(class_n),
        random_state=SEED,
        eval_metric='merror',
        tree_method='auto',
        device='cuda',  
        early_stopping_rounds=20,
        **params,
    )

    xgb_model.fit(
        train_features, y_train,
        eval_set=[(val_features, y_val)],
        verbose=False,
    )

    accuracy = xgb_model.score(val_features, y_val)
    return accuracy

sampler = optuna.samplers.TPESampler(seed=SEED)


study = optuna.create_study(direction='maximize', sampler=sampler)
study.optimize(objective, n_trials=50)

best_params = study.best_params
print("\nBest hyperparameters found:", best_params)

best_value = study.best_value
print("\nBest value found:", best_value)


optimized_xgb_model = XGBClassifier(
    objective='multi:softmax',
    num_class=len(class_n),
    random_state=SEED,
    eval_metric='merror',
    tree_method='auto',
    device='cuda',  
    early_stopping_rounds=20,  
    **best_params,
)

optimized_xgb_model.fit(
    train_features, y_train,
    eval_set=[(val_features, y_val)],
    verbose=True,
)

# Save model and parameters
optimized_xgb_model.save_model('final_xgboost_model.json')
with open('best_hyperparameters.json', 'w') as f:
    json.dump(best_params, f)
print("Model and parameters saved successfully.")

In [None]:
optuna.visualization.plot_param_importances(study)

In [None]:
from sklearn.metrics import accuracy_score

train_predictions = optimized_xgb_model.predict(train_features)
train_accuracy = accuracy_score(y_train, train_predictions)
print(f'Training Accuracy: {train_accuracy * 100:.2f}%')

val_predictions = optimized_xgb_model.predict(val_features)
val_accuracy = accuracy_score(y_val, val_predictions)
print(f'Validation Accuracy: {val_accuracy * 100:.2f}%')

test_predictions = optimized_xgb_model.predict(test_features)
test_accuracy = accuracy_score(y_test, test_predictions)
print(f'Test Accuracy: {test_accuracy * 100:.2f}%')