1. Image Resizing

In [4]:
import cv2
import os
from glob import glob

def resize_images(input_dir, output_dir, size=(224, 224),
                  extensions=('.jpg', '.jpeg', '.png')):
    """
    Resize every image under 'input_dir' and mirror it into 'output_dir'.

    The tree below 'input_dir' is walked recursively and each matching image
    is written to the same relative location below 'output_dir', so the
    sub-folder layout is preserved.

    Args:
        input_dir (str): Root folder containing the source images.
        output_dir (str): Root folder receiving the resized copies
            (created if missing).
        size (tuple): Target size handed to cv2.resize.
        extensions (tuple): File extensions to process, compared
            case-insensitively (so '.JPG' is handled like '.jpg').
    """
    os.makedirs(output_dir, exist_ok=True)
    wanted = tuple(ext.lower() for ext in extensions)

    for dirpath, _, filenames in os.walk(input_dir):
        for filename in filenames:
            if not filename.lower().endswith(wanted):
                continue

            img_path = os.path.join(dirpath, filename)
            print(f"Processing {img_path}")

            # cv2.imread signals an unreadable file by returning None
            img = cv2.imread(img_path)
            if img is None:
                print(f"Error reading image: {img_path}")
                continue

            # Create the corresponding output path
            relative_path = os.path.relpath(dirpath, input_dir)
            output_subdir = os.path.join(output_dir, relative_path)
            os.makedirs(output_subdir, exist_ok=True)

            output_path = os.path.join(output_subdir, filename)
            resized_img = cv2.resize(img, size)

            # cv2.imwrite reports failure through its return value, not an exception
            if cv2.imwrite(output_path, resized_img):
                print(f"Saved resized image to {output_path}")
            else:
                print(f"Error writing image: {output_path}")

# Mirror the whole 'dataset_initial' tree into 'dataset_resized' at the default target size
resize_images(input_dir=r'dataset_initial', output_dir=r'dataset_resized')



2. Outliers Detection

In [None]:
import matplotlib.pyplot as plt
from PIL import Image, UnidentifiedImageError
import os
import numpy as np

def display_images(image_paths, n=5):
    """
    Display up to 'n' images from the given list of image paths.

    Args:
        image_paths (list): Paths of the images to show, in order.
        n (int): Maximum number of images to display; values <= 0 show nothing.
    """
    # max(n, 0) keeps a negative 'n' from turning into a "all but the last" slice
    for image_path in image_paths[:max(n, 0)]:
        # The context manager releases the file handle once the figure has been shown
        with Image.open(image_path) as img:
            plt.imshow(img)
            plt.axis('off')
            plt.show()

def detect_outliers(image_paths, size_threshold=(200, 200), aspect_ratio_range=(0.5, 2.0)):
    """
    Partition image paths into valid images and outliers.

    An image is an outlier when it is narrower or shorter than the size
    threshold, when its width/height ratio lies outside the allowed range,
    or when it cannot be opened at all.

    Args:
        image_paths (list): List of image paths.
        size_threshold (tuple): Minimum (width, height) allowed.
        aspect_ratio_range (tuple): Min and max aspect ratio (width/height).

    Returns:
        valid_paths (list): List of valid image paths.
        outlier_paths (list): List of outlier image paths.
    """
    kept = []
    rejected = []

    for candidate in image_paths:
        try:
            with Image.open(candidate) as picture:
                w, h = picture.size
                ratio = w / h

                # Reject as soon as one of the three criteria fails
                too_small = w < size_threshold[0] or h < size_threshold[1]
                if too_small or not (aspect_ratio_range[0] <= ratio <= aspect_ratio_range[1]):
                    rejected.append(candidate)
                else:
                    kept.append(candidate)
        except (UnidentifiedImageError, IOError):
            # Files that cannot be opened as images count as outliers too
            rejected.append(candidate)

    return kept, rejected

def remove_outliers(outlier_paths):
    """
    Delete every file named in 'outlier_paths' and print the outcome of each.

    A failure on one path is printed and does not stop the remaining deletions.
    """
    for target in outlier_paths:
        try:
            # os.unlink is the same operation as os.remove under another name
            os.unlink(target)
            print(f"Removed outlier: {target}")
        except Exception as problem:
            print(f"Failed to remove {target}: {problem}")

# Gather the path of every file found below the resized dataset root
image_paths = [
    os.path.join(folder, name)
    for folder, _subfolders, names in os.walk(r'dataset_resized')
    for name in names
]

# Split the paths into keepers and outliers, then report how many were flagged
# NOTE(review): the resizing step writes images at its default 224x224 size, so with the
# default thresholds mostly unreadable files are expected here - confirm whether this
# check was meant to run on the original, un-resized images.
valid_paths, outlier_paths = detect_outliers(image_paths)
print(f"Found {len(outlier_paths)} outliers.")

# Delete the flagged files from disk
remove_outliers(outlier_paths)

# Preview the first few images that passed the checks
display_images(valid_paths)


3. Corrupted Files Detection

In [None]:
from PIL import Image
import os

def is_image_corrupted(file_path):
    """
    Report whether 'file_path' fails Pillow's integrity check.

    Args:
        file_path (str): Path of the file to check.

    Returns:
        bool: True if opening or verifying the file raises IOError or
        SyntaxError, False otherwise.
    """
    try:
        # The context manager releases the file handle even when verify() raises,
        # so the caller can delete the file right afterwards.
        with Image.open(file_path) as img:
            img.verify()  # Raises if Pillow finds a problem with the file contents
        return False
    except (IOError, SyntaxError):
        return True

# Walk the resized dataset and keep every file that fails the integrity check
corrupted_images = []
for folder, _subfolders, names in os.walk(r'dataset_resized'):
    candidates = [os.path.join(folder, name) for name in names]
    corrupted_images.extend(path for path in candidates if is_image_corrupted(path))

print(f"Found {len(corrupted_images)} corrupted images.")

# Delete each corrupted file from disk
for bad_file in corrupted_images:
    os.remove(bad_file)

4. Duplicates Removal

In [None]:
import hashlib
import os

def hash_image(image_path):
    """Return the hexadecimal MD5 digest of the raw bytes stored at 'image_path'."""
    digest = hashlib.md5()
    with open(image_path, 'rb') as stream:
        # Feeding the hash piece by piece yields the same digest as hashing all bytes at once
        for piece in iter(lambda: stream.read(65536), b''):
            digest.update(piece)
    return digest.hexdigest()

# Maps each content hash to the first file seen with that content
image_hashes = {}
# Files whose content hash was already recorded for an earlier file
duplicate_images = []

for folder, _subfolders, names in os.walk(r'dataset_resized'):
    for name in names:
        candidate = os.path.join(folder, name)
        digest = hash_image(candidate)
        # setdefault records the first file per hash and hands back that first file
        first_seen = image_hashes.setdefault(digest, candidate)
        if first_seen != candidate:
            duplicate_images.append(candidate)

print(f"Found {len(duplicate_images)} duplicate images.")

# Keep the first file of every hash and delete the later copies
for extra_copy in duplicate_images:
    os.remove(extra_copy)

5. Data Segmentation

In [None]:
import os
import shutil
import random

# Define your source and destination directories
source_dir = r'dataset_resized'  # Update this path
destination_dir = r'dataset_final'  # Update this path

# Create new train, validate, and test folders
os.makedirs(os.path.join(destination_dir, 'train'), exist_ok=True)
os.makedirs(os.path.join(destination_dir, 'val'), exist_ok=True)
os.makedirs(os.path.join(destination_dir, 'test'), exist_ok=True)

# Define the split ratios
train_ratio = 0.7
validate_ratio = 0.15
test_ratio = 0.15  # Informational: the test split receives every image left after train and val

# A seeded private generator plus sorted listings make the split reproducible:
# rerunning this cell copies each image into the same split again instead of
# scattering it across train/val/test.
# NOTE: files already present in destination_dir from an earlier run are not removed.
split_seed = 42
split_rng = random.Random(split_seed)

# Iterate through each disease subfolder
for disease_folder in sorted(os.listdir(source_dir)):
    disease_path = os.path.join(source_dir, disease_folder)

    if os.path.isdir(disease_path):  # Check if it's a folder
        # Only regular files are split; shutil.copy cannot copy a nested directory
        images = sorted(
            name for name in os.listdir(disease_path)
            if os.path.isfile(os.path.join(disease_path, name))
        )
        split_rng.shuffle(images)  # Shuffle the images

        # Calculate split indices (int() truncates, so small classes may get no val images)
        total_images = len(images)
        train_count = int(total_images * train_ratio)
        validate_count = int(total_images * validate_ratio)

        # Split images
        train_images = images[:train_count]
        validate_images = images[train_count:train_count + validate_count]
        test_images = images[train_count + validate_count:]

        # Create subfolders for each category and copy images
        for category, image_list in zip(['train', 'val', 'test'], [train_images, validate_images, test_images]):
            category_path = os.path.join(destination_dir, category, disease_folder)
            os.makedirs(category_path, exist_ok=True)
            for image in image_list:
                shutil.copy(os.path.join(disease_path, image), category_path)

print("Images have been organized into train, validate, and test folders.")

6. Data Augmentation

In [None]:
import os
import random
import torch
from torchvision import datasets, transforms
from PIL import Image

# Define augmentations to apply
augmentations = transforms.Compose([
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
])

# Path to the training dataset, built with os.path.join so it resolves on every OS
# (a hard-coded backslash is only a path separator on Windows)
train_data_path = os.path.join('dataset_final', 'train')

# Target number of images per class
target_count = 200

# Extensions treated as images, matched case-insensitively
image_extensions = ('.png', '.jpg', '.jpeg')

# Loop over each class folder in the training directory
for class_name in os.listdir(train_data_path):
    class_path = os.path.join(train_data_path, class_name)

    if os.path.isdir(class_path):
        # Get list of image files in this class folder
        image_files = [f for f in os.listdir(class_path) if f.lower().endswith(image_extensions)]
        current_count = len(image_files)

        if current_count == 0:
            # random.choice raises IndexError on an empty list, so there is nothing to augment from
            print(f"Skipping class {class_name}: no source images to augment")
            continue

        if current_count < target_count:
            print(f"Augmenting class {class_name}: {current_count} -> {target_count}")
            next_index = 0
            for _ in range(target_count - current_count):
                # Randomly select an image to augment
                img_file = random.choice(image_files)
                img_path = os.path.join(class_path, img_file)

                # Open the image and apply augmentations; the handle is closed right after.
                # Converting to RGB lets palette/alpha images be saved as JPEG below.
                with Image.open(img_path) as img:
                    augmented_img = augmentations(img.convert('RGB'))

                # Pick the next unused name so files left by an earlier
                # (possibly interrupted) run are never overwritten
                new_img_path = os.path.join(class_path, f"{class_name}_aug_{next_index}.jpg")
                while os.path.exists(new_img_path):
                    next_index += 1
                    new_img_path = os.path.join(class_path, f"{class_name}_aug_{next_index}.jpg")

                # Save the augmented image
                augmented_img.save(new_img_path)
                next_index += 1

print("Augmentation complete.")



7. Model Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
import copy
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
import os

# Define the base directory relative to the script location.
# '__file__' is not defined when this code runs as a notebook cell, so fall
# back to the current working directory in that case.
try:
    base_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    base_dir = os.getcwd()

# Construct paths using os.path.join
train_dir = os.path.join(base_dir, 'dataset_final', 'train')
val_dir = os.path.join(base_dir, 'dataset_final', 'val')

# Set device to CPU
device = torch.device("cpu")

# Number of classes in your dataset
num_classes = 43

# Data transformations for training and validation
data_transforms = {
    # Training: random rotation, colour jitter, crop and flip, then tensor + normalization
    'train': transforms.Compose([
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    # Validation: deterministic resize only, with the same normalization
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Load datasets
image_datasets = {
    'train': datasets.ImageFolder(root=train_dir, transform=data_transforms['train']),
    'val': datasets.ImageFolder(root=val_dir, transform=data_transforms['val']),
}

# Warn early when the hard-coded class count disagrees with the folders on disk;
# a mismatch otherwise only surfaces later, inside the loss computation.
found_classes = len(image_datasets['train'].classes)
if found_classes != num_classes:
    print(f"Warning: num_classes is {num_classes} but {found_classes} class folders were found in {train_dir}")

# Create DataLoader
dataloaders = {
    'train': torch.utils.data.DataLoader(image_datasets['train'], batch_size=32, shuffle=True),
    'val': torch.utils.data.DataLoader(image_datasets['val'], batch_size=32, shuffle=False),
}

# Evaluation function
def evaluate_model(model, dataloader, criterion):
    """
    Evaluate 'model' on every batch of 'dataloader' without tracking gradients.

    The model is switched to eval mode, and each batch is moved to the device
    the model's parameters live on (CPU if the model has no parameters), so
    the function does not depend on a module-level device setting.

    Args:
        model (nn.Module): Model to evaluate.
        dataloader (DataLoader): Yields (inputs, labels) batches; its
            'dataset' attribute supplies the sample count.
        criterion: Loss callable taking (outputs, labels).

    Returns:
        test_loss (float): Sample-weighted mean loss.
        test_acc (torch.Tensor): 0-dim float64 tensor with the accuracy.
        all_preds (list): Predicted class index per sample.
        all_labels (list): True class index per sample.

    Raises:
        ValueError: If the dataloader's dataset is empty.
    """
    total_samples = len(dataloader.dataset)
    if total_samples == 0:
        raise ValueError("Cannot evaluate on an empty dataset")

    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss = 0.0
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(target_device)
            labels = labels.to(target_device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            # loss is a batch mean, so weight it by the batch size before summing
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels).item()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_loss = running_loss / total_samples
    test_acc = torch.tensor(running_corrects / total_samples, dtype=torch.float64)

    return test_loss, test_acc, all_preds, all_labels

# Train and evaluate the DenseNet model with early stopping and regularization
def train_and_evaluate(model, num_epochs=10, patience=3):
    """
    Train 'model' with SGD and early stopping, then save the best weights.

    Uses the module-level 'device', 'dataloaders' (keys 'train' and 'val')
    and 'base_dir'. After each epoch the validation loss is compared with the
    best one so far; the weights of the best epoch are kept, and training
    stops once 'patience' consecutive epochs bring no improvement. The best
    weights are loaded back into the model and written to
    'DenseNet_model.pth' inside 'base_dir'.

    Args:
        model (nn.Module): Model to train (moved to 'device' in place).
        num_epochs (int): Maximum number of epochs.
        patience (int): Epochs without validation-loss improvement tolerated
            before stopping.

    Returns:
        model: The model with the best weights loaded.
        best_acc (float): Validation accuracy of the epoch with the lowest
            validation loss.
        train_losses, val_losses (list of float): Loss per epoch.
        train_accuracies, val_accuracies (list of float): Accuracy per epoch.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()

    # Add L2 regularization (weight_decay)
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = float('inf')
    early_stop_counter = 0

    # Store loss and accuracy per epoch
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print('-' * 10)

        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Gradients only exist (and only need clearing) while training
                if is_training:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean, so weight it by the batch size before summing
                running_loss += loss.item() * inputs.size(0)
                # .item() keeps the counter a plain int instead of a device-bound tensor
                running_corrects += torch.sum(preds == labels).item()

            dataset_size = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / dataset_size
            epoch_acc = running_corrects / dataset_size

            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if is_training:
                train_losses.append(epoch_loss)
                train_accuracies.append(epoch_acc)
            else:
                val_losses.append(epoch_loss)
                val_accuracies.append(epoch_acc)

                # Early stopping: check for validation loss improvement
                if epoch_loss < best_loss:
                    best_loss = epoch_loss
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    early_stop_counter = 0  # Reset counter if improvement
                else:
                    early_stop_counter += 1  # Increment if no improvement

        if early_stop_counter >= patience:
            print("Early stopping triggered!")
            break

    print(f'Best val Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)

    # Define the path
    model_save_path = os.path.join(base_dir, 'DenseNet_model.pth')

    # Create the directory if it doesn't exist
    os.makedirs(os.path.dirname(model_save_path), exist_ok=True)

    # Save the model
    torch.save(model.state_dict(), model_save_path)
    print(f'Model saved to {model_save_path}')

    return model, best_acc, train_losses, val_losses, train_accuracies, val_accuracies

# Initialize the DenseNet model with Dropout
class DenseNetWithDropout(nn.Module):
    """DenseNet-121 whose classifier is replaced by dropout followed by a linear layer."""

    def __init__(self, num_classes):
        """Build the backbone with weights='DEFAULT' and attach a 'num_classes'-way head."""
        super().__init__()
        backbone = models.densenet121(weights='DEFAULT')
        # Read the feature width before the stock classifier is swapped out
        feature_width = backbone.classifier.in_features
        backbone.classifier = nn.Sequential(
            nn.Dropout(p=0.5),  # Dropout before final layer
            nn.Linear(feature_width, num_classes),
        )
        self.densenet = backbone

    def forward(self, x):
        """Return the wrapped network's output for the batch 'x'."""
        return self.densenet(x)

model = DenseNetWithDropout(num_classes)

print("\nTraining DenseNet with early stopping, dropout, and L2 regularization...")
trained_model, accuracy, train_losses, val_losses, train_accuracies, val_accuracies = train_and_evaluate(model)

# Print final evaluation metrics.
# These are computed on the validation loader (this cell loads no test data),
# so they are reported as validation metrics.
test_loss, test_acc, preds, labels = evaluate_model(trained_model, dataloaders['val'], nn.CrossEntropyLoss())
print(f'DenseNet Validation Loss: {test_loss:.4f}, Validation Accuracy: {test_acc:.4f}')

# Confusion Matrix
# Passing every class index keeps the matrix square over all classes even when
# some class never occurs in the labels or predictions, so it always matches
# the class-name list used for the DataFrame and the plot.
class_names = image_datasets['val'].classes
cm = confusion_matrix(labels, preds, labels=list(range(len(class_names))))

cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)

# Save the confusion matrix to an Excel file
excel_file_path = os.path.join(base_dir, 'confusion_matrix.xlsx')
cm_df.to_excel(excel_file_path)

print(f"Confusion matrix saved to {excel_file_path}")

# Plotting Confusion Matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for DenseNet')
plt.show()

# Plotting Loss and Accuracy
epochs = range(1, len(train_losses) + 1)

# Plain floats plot reliably even if the history holds 0-dim tensors
train_accuracies = [float(value) for value in train_accuracies]
val_accuracies = [float(value) for value in val_accuracies]

plt.figure(figsize=(12, 5))

# Plot Loss
plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, 'bo-', label='Train Loss')
plt.plot(epochs, val_losses, 'ro-', label='Val Loss')
plt.title('DenseNet Loss per Epoch')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Plot Accuracy
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accuracies, 'bo-', label='Train Accuracy')
plt.plot(epochs, val_accuracies, 'ro-', label='Val Accuracy')
plt.title('DenseNet Accuracy per Epoch')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()


8. Model Testing

In [None]:
import os
import torch
import torch.nn as nn
from torchvision import datasets, models, transforms
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import pandas as pd

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the base directory relative to the script location.
# '__file__' is not defined when this code runs as a notebook cell, so fall
# back to the current working directory in that case.
try:
    base_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    base_dir = os.getcwd()

# Define the number of classes in your dataset
num_classes = 43

# Define the data transformations for the test set
# (deterministic resize, tensor conversion and per-channel normalization)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load the test dataset; fail early with a clear message if the folder is missing
test_dataset_path = os.path.join(base_dir, "dataset_final", "test")
if not os.path.exists(test_dataset_path):
    raise FileNotFoundError(f"Test dataset path does not exist: {test_dataset_path}")

test_dataset = datasets.ImageFolder(root=test_dataset_path, transform=test_transform)

# Create DataLoader for the test set (no shuffling, so sample order is stable)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the DenseNet model with dropout
class DenseNetWithDropout(nn.Module):
    """DenseNet-121 whose classifier is replaced by dropout followed by a linear layer."""

    def __init__(self, num_classes, weights='DEFAULT'):
        """
        Args:
            num_classes (int): Number of output classes of the new head.
            weights: Passed through to torchvision's densenet121. The default
                'DEFAULT' matches the previous behaviour; pass None to build
                the network without requesting pretrained weights.
        """
        super(DenseNetWithDropout, self).__init__()
        self.densenet = models.densenet121(weights=weights)  # For PyTorch >= 1.12.0
        self.densenet.classifier = nn.Sequential(
            nn.Dropout(p=0.5),  # Dropout before final layer
            nn.Linear(self.densenet.classifier.in_features, num_classes)
        )

    def forward(self, x):
        """Return the wrapped network's output for the batch 'x'."""
        return self.densenet(x)

# Initialize the model and load the trained weights.
# weights=None: every parameter is replaced by the checkpoint below, so
# requesting pretrained weights first would be wasted work.
model = DenseNetWithDropout(num_classes, weights=None).to(device)
# NOTE(review): the training cell saves its weights as 'DenseNet_model.pth';
# confirm that 'DenseNet_model02.pth' is the checkpoint intended here.
model_path = os.path.join(base_dir, "DenseNet_model02.pth")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model file does not exist: {model_path}")

# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

# Evaluation function for the test set
def evaluate_model(model, dataloader):
    """
    Evaluate 'model' on every batch of 'dataloader' with cross-entropy loss.

    The model is switched to eval mode, and each batch is moved to the device
    the model's parameters live on (CPU if the model has no parameters), so
    the function does not depend on a module-level device setting.

    Args:
        model (nn.Module): Model to evaluate.
        dataloader (DataLoader): Yields (inputs, labels) batches; its
            'dataset' attribute supplies the sample count.

    Returns:
        test_loss (float): Sample-weighted mean cross-entropy loss.
        test_acc (torch.Tensor): 0-dim float64 tensor with the accuracy.
        all_preds (list): Predicted class index per sample.
        all_labels (list): True class index per sample.

    Raises:
        ValueError: If the dataloader's dataset is empty.
    """
    total_samples = len(dataloader.dataset)
    if total_samples == 0:
        raise ValueError("Cannot evaluate on an empty dataset")

    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Dropout must be inactive during evaluation
    model.eval()
    criterion = nn.CrossEntropyLoss()
    running_loss = 0.0
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(target_device)
            labels = labels.to(target_device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            # loss is a batch mean, so weight it by the batch size before summing
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels).item()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_loss = running_loss / total_samples
    test_acc = torch.tensor(running_corrects / total_samples, dtype=torch.float64)

    return test_loss, test_acc, all_preds, all_labels

# Run evaluation on the test set
test_loss, test_acc, preds, labels = evaluate_model(model, test_loader)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')

# Confusion matrix
cm = confusion_matrix(labels, preds)
cm_df = pd.DataFrame(cm, index=test_dataset.classes, columns=test_dataset.classes)

# Save confusion matrix to Excel
excel_file_path = os.path.join(base_dir, "confusion_matrix_test.xlsx")
cm_df.to_excel(excel_file_path)
print(f"Confusion matrix saved to {excel_file_path}")

# Plot confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=test_dataset.classes)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for Test Set')
plt.show()
