# Skin Lesion Classification using Deep Learning

If you're using Tinder, all the necessary requirements are already installed in a conda environment.

To activate the environment in the terminal, use the command: ```conda activate env```

## Accessing TensorBoard:

1. Navigate to the TensorBoard logs directory:
    ```cd skin_lesion_classification/logs```

2. Start TensorBoard:
    ```tensorboard --logdir ./ --bind_all```

3. Ctrl + click the TensorBoard link printed in the terminal to open it in your browser.

Run the cell below if you haven't installed the requirements on your machine yet.

In [54]:
# !pip install -r requirements.txt

## Importing all the libraries

In [55]:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR

import pytorch_lightning
import torchvision.models as models
import torchvision.transforms as transforms
from pytorch_lightning.loggers import TensorBoardLogger

from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks import EarlyStopping
from efficientnet_pytorch import EfficientNet

from PIL import Image
import pandas as pd
import os

import numpy as np
import random
from sklearn.metrics import roc_curve, auc, confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score
import seaborn as sns
import matplotlib.pyplot as plt
import time
import json


## Set seeds for reproducibility

In [56]:

# Single seed value shared by every random number generator used in this notebook.
seed = 42

# Seed Python's `random`, NumPy and PyTorch so repeated runs draw the same numbers.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Dedicated generator, seeded with the same value, that is handed to
# `random_split` and to the DataLoaders further down.
generator = torch.Generator()
generator.manual_seed(seed)

def seed_worker(worker_id):
    """Re-seed NumPy and Python's `random` from the current PyTorch seed.

    Passed as `worker_init_fn` to the DataLoaders in this notebook.

    Args:
        worker_id: Worker index supplied by the caller; not used here.
    """
    # Reduce the PyTorch seed modulo 2**32 so it fits the range NumPy accepts.
    derived_seed = torch.initial_seed() % (2 ** 32)
    for reseed in (np.random.seed, random.seed):
        reseed(derived_seed)
# When CUDA is present, seed its generators and list the visible GPUs.
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)      # current device
    torch.cuda.manual_seed_all(seed)  # every device (multi-GPU setups)
    num_cuda_devices = torch.cuda.device_count()
    print(f"Number of GPUs available: {num_cuda_devices}")
    device_names = [torch.cuda.get_device_name(idx) for idx in range(num_cuda_devices)]
    for idx, device_name in enumerate(device_names):
        print(f"GPU {idx}: {device_name}")

# For deterministic operations in PyTorch: disable the cuDNN benchmark mode
# and request deterministic cuDNN algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

Number of GPUs available: 2
GPU 0: NVIDIA GeForce GTX 1080 Ti
GPU 1: NVIDIA GeForce GTX 1080 Ti


# Loading CSV

In [57]:

class CustomDataset(Dataset):
    """Image dataset backed by a metadata CSV and two image directories.

    Samples are addressed by position within the selected CSV rows: column 1
    holds the image code (the file name without its '.jpg' extension) and
    column 2 holds the label.

    Args:
        indices: Positional indices of the CSV rows that make up this dataset.
        csv_file: Path to the metadata CSV file.
        root_dir_1: First directory searched for each image.
        root_dir_2: Second directory, searched when the image is not in the first.
        transform: Callable applied to the loaded PIL image; a falsy value
            skips the transformation.
    """

    def __init__(self, indices, csv_file, root_dir_1, root_dir_2, transform):
        self.indices = indices          # Indices of the samples to be loaded
        self.root_dir_1 = root_dir_1    # First directory where images are stored
        self.root_dir_2 = root_dir_2    # Second directory where images are stored
        self.transform = transform      # Transformations to be applied to the images
        # Load the CSV file and keep only the requested rows
        self.annotations = pd.read_csv(csv_file).iloc[indices]

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.annotations)

    def _find_image(self, img_name):
        """Return the path of `img_name` in the first directory that has it, else None."""
        for root_dir in (self.root_dir_1, self.root_dir_2):
            candidate = os.path.join(root_dir, img_name)
            if os.path.exists(candidate):
                return candidate
        return None

    def __getitem__(self, idx):
        """Load the sample at position `idx`.

        Returns:
            `(image, label)` where `image` is the RGB image (transformed when a
            transform was given) and `label` is a tensor built from column 2,
            or `(None, None)` when the image file is in neither directory.
        """
        img_code = self.annotations.iloc[idx, 1]  # File code from the DataFrame
        img_name = img_code + '.jpg'
        img_path = self._find_image(img_name)

        if img_path is None:
            # NOTE(review): the DataLoaders in this file use the default collate
            # function, which presumably cannot batch None samples - confirm
            # whether a missing file should raise instead of returning None.
            print("IDX ",idx )
            print(f"File {img_name} not found in any of the specified directories.")
            return None, None

        # Converting to RGB keeps every image in the same mode. The context
        # manager closes the underlying file as soon as the pixel data has been
        # copied into the converted image.
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')
        label = torch.tensor(self.annotations.iloc[idx, 2])

        if self.transform:
            image = self.transform(image)

        return image, label

# Training pipeline: random flips, rotation, crop and colour jitter are applied
# before the image is converted to a tensor.
_train_augmentations = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.RandomResizedCrop(299, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
]
transform_train = transforms.Compose(_train_augmentations)

# Validation / test pipeline: a fixed 299x299 resize followed by tensor conversion.
_eval_steps = [
    transforms.Resize((299, 299)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_eval_steps)

# Metadata CSV and the two HAM10000 image folders
csv_file = '/home/ashiley/HAM10000_metadata_alterado.csv'
data_path_1 = '/home/ashiley/HAM10000_images_part_1'
data_path_2 = '/home/ashiley/HAM10000_images_part_2'

# Read the metadata once to learn how many samples there are
df = pd.read_csv(csv_file)
total_samples = len(df)
indices = list(range(total_samples))

# 80% training, 10% validation; the test set takes whatever remains
train_size = int(0.8 * total_samples)
val_size = int(0.1 * total_samples)
test_size = total_samples - train_size - val_size

# Randomly partition the row indices using the seeded generator
train_subset, val_subset, test_subset = random_split(
    indices,
    [train_size, val_size, test_size],
    generator=generator,
)

# Wrap each index subset in a CustomDataset; only the training set is augmented
train_dataset = CustomDataset(train_subset.indices, csv_file, data_path_1, data_path_2, transform_train)
val_dataset = CustomDataset(val_subset.indices, csv_file, data_path_1, data_path_2, transform)
test_dataset = CustomDataset(test_subset.indices, csv_file, data_path_1, data_path_2, transform)

input_channels = 3  # Number of channels in the input images (RGB)
num_classes = 2     # Number of classes in the classification task (malignant or benign)

cuda_available = torch.cuda.is_available()

# Calculate inverse-frequency class weights.
# value_counts() orders its result by frequency, not by label, so sort by label
# to guarantee that position i of the weight tensor belongs to class i (the
# layout the `weight` argument of cross-entropy expects). Without the sort the
# weights would be swapped whenever class 1 is more frequent than class 0.
class_counts = df['dx'].value_counts().sort_index()
raw_class_weights = 1.0 / torch.tensor(class_counts.to_numpy(), dtype=torch.float)
class_weights = raw_class_weights / raw_class_weights.sum()  # Normalize so the weights sum to 1

if cuda_available:
    num_cuda_devices = torch.cuda.device_count()
    print("CUDA is available and {} CUDA device(s) is(are) available.".format(num_cuda_devices))
else:
    print("CUDA is not available. You are running on CPU.")

# Always use GPU 0 if available, otherwise fall back to the CPU
device = torch.device("cuda:0" if cuda_available else "cpu")

# Move the weight tensor to the chosen device
class_weights = class_weights.to(device)

# The rarer class receives the larger weight, so its errors are penalized more.
# In the recorded run the normalized weights were 0.1979 (class 0) and 0.8021 (class 1).
print("Class counts:", class_counts)
print("Class weights (before normalization):", raw_class_weights)
print("Class weights (after normalization):", class_weights)

# Build the training, validation and test DataLoaders.
# Settings common to all three loaders; only the training loader shuffles.
_loader_options = dict(batch_size=64, worker_init_fn=seed_worker, generator=generator)

train_dataloader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_dataloader = DataLoader(val_dataset, shuffle=False, **_loader_options)
test_dataloader = DataLoader(test_dataset, shuffle=False, **_loader_options)

CUDA is available and 2 CUDA device(s) is(are) available.
Class counts: dx
0    7919
1    1954
Name: count, dtype: int64
Class weights (before normalization): tensor([0.0001, 0.0005])
Class weights (after normalization): tensor([0.1979, 0.8021], device='cuda:0')


# Defining Generic Classifier

In [58]:
class GenericClassifier(pl.LightningModule):
    """Transfer-learning image classifier built on a pretrained backbone.

    The backbone is selected by name, its pretrained layers are frozen and its
    final classification layer is a fresh layer with `num_classes` outputs,
    trained with class-weighted cross-entropy.

    Predictions, softmax probabilities and true labels of the most recent
    validation / test epoch are accumulated in the `val_*` / `test_*` lists.

    Args:
        model_name: One of 'vgg', 'resnet', 'alexnet', 'efficientnet', 'inception'.
        num_classes: Number of outputs of the classification head.
        learning_rate: Learning rate given to the Adam optimizer.
        class_weights: Per-class weight tensor for the cross-entropy loss, or None.

    Raises:
        ValueError: If `model_name` is not one of the supported names.
    """

    def __init__(self, model_name, num_classes, learning_rate, class_weights):
        super().__init__()

        # Dictionary to map model names to their creation functions
        model_dict = {
            'vgg': models.vgg16,
            'resnet': models.resnet18,
            'alexnet': models.alexnet,
            'efficientnet': EfficientNet.from_pretrained,
            'inception': models.inception_v3
        }

        if model_name not in model_dict:
            raise ValueError(f"Model {model_name} is not supported. Choose from {list(model_dict.keys())}.")

        if model_name == 'efficientnet':
            # The head is created with the requested size; freeze everything else.
            self.model = model_dict[model_name]('efficientnet-b0', num_classes=num_classes)
            self._freeze_parameters(keep_if_name_contains='_fc')
        else:
            self.model = model_dict[model_name](pretrained=True)
            # In every branch the freshly created nn.Linear head is trainable
            # by default, so only the pretrained layers need freezing.
            if model_name in ('vgg', 'alexnet'):
                self._freeze_parameters()
                old_head = self.model.classifier[6]
                self.model.classifier[6] = nn.Linear(old_head.in_features, num_classes)
            elif model_name == 'resnet':
                self._freeze_parameters()
                self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
            elif model_name == 'inception':
                # Any parameter whose name contains "fc" is left trainable.
                self._freeze_parameters(keep_if_name_contains='fc')
                self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

        self.learning_rate = learning_rate
        # Registered as a buffer so the weights follow the module when it is
        # moved to another device; a plain attribute would stay on the device
        # it was created on. Non-persistent keeps the state_dict unchanged.
        self.register_buffer('class_weights', class_weights, persistent=False)

        # Per-epoch accumulators (reset at the start of each validation/test epoch)
        self.val_preds = []
        self.val_probs = []
        self.val_true = []
        self.test_preds = []
        self.test_probs = []
        self.test_true = []

    def _freeze_parameters(self, keep_if_name_contains=None):
        """Disable gradients for backbone parameters, optionally sparing some by name."""
        for name, param in self.model.named_parameters():
            if keep_if_name_contains is None or keep_if_name_contains not in name:
                param.requires_grad = False

    def forward(self, x):
        """Return the class logits for the batch `x`."""
        if isinstance(self.model, models.Inception3):
            x = self.model(x)
            return x.logits if hasattr(x, 'logits') else x  # Use the main output for Inception
        else:
            return self.model(x)

    def configure_optimizers(self):
        """Adam optimizer with a StepLR schedule (x0.1 every 5 scheduler steps)."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = StepLR(optimizer, step_size=5, gamma=0.1)
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_loss'
        }

    def training_step(self, batch, batch_idx):
        """Compute and log the class-weighted cross-entropy loss for one training batch."""
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y, weight=self.class_weights)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def _shared_eval_step(self, batch, stage):
        """Evaluate one batch, record its outputs and log `<stage>_loss` / `<stage>_acc`.

        Args:
            batch: `(inputs, labels)` pair.
            stage: 'val' or 'test'; selects the accumulator lists and the log keys.

        Returns:
            `(loss, acc)` for the batch, where `acc` is a Python float.
        """
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y, weight=self.class_weights)
        preds = torch.argmax(logits, dim=1)
        acc = torch.sum(preds == y).item() / len(y)

        getattr(self, f'{stage}_probs').extend(torch.softmax(logits, dim=1).detach().cpu().numpy())
        getattr(self, f'{stage}_preds').extend(preds.detach().cpu().numpy())
        getattr(self, f'{stage}_true').extend(y.detach().cpu().numpy())

        self.log(f'{stage}_loss', loss, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        self.log(f'{stage}_acc', acc, on_step=False, on_epoch=True, prog_bar=True, logger=True)
        return loss, acc

    def validation_step(self, batch, batch_idx):
        """Evaluate one validation batch and return its loss."""
        loss, _ = self._shared_eval_step(batch, 'val')
        return loss

    def test_step(self, batch, batch_idx):
        """Evaluate one test batch, print its accuracy and return `(loss, acc)`."""
        loss, acc = self._shared_eval_step(batch, 'test')

        # Printing the accuracy of this batch
        print(f"Test Accuracy: {acc:.6f}")
        return loss, acc

    def on_validation_epoch_start(self):
        """Clear the validation accumulators."""
        self.val_preds = []
        self.val_probs = []
        self.val_true = []

    def on_test_epoch_start(self):
        """Clear the test accumulators."""
        self.test_preds = []
        self.test_probs = []
        self.test_true = []

# Save metrics functions

In [59]:
def save_metrics(val_true, val_preds, test_true, test_preds, val_probs, test_probs,
                 experiment_name, time_taken, results_dir='balanced_results'):
    """Compute, print and persist validation/test metrics for one experiment.

    Writes into `<results_dir>/<experiment_name>/`:
      - metrics.json: macro precision/recall/F1 for both splits plus `time_taken`
      - confusion_matrices.png: validation and test confusion matrices side by side
      - metrics_comparison.png: grouped bar chart, validation vs. test
      - val_probs.csv, val_true.csv, test_probs.csv, test_true.csv

    Args:
        val_true, val_preds: True and predicted labels of the validation split.
        test_true, test_preds: True and predicted labels of the test split.
        val_probs, test_probs: Per-sample class probabilities of each split.
        experiment_name: Name of the sub-directory that receives the files.
        time_taken: Elapsed time stored alongside the metrics.
        results_dir: Root output directory; defaults to 'balanced_results'.
    """
    output_dir = os.path.join(results_dir, experiment_name)
    os.makedirs(output_dir, exist_ok=True)

    val_confusion = confusion_matrix(val_true, val_preds)
    test_confusion = confusion_matrix(test_true, test_preds)

    val_precision = precision_score(val_true, val_preds, average='macro')
    test_precision = precision_score(test_true, test_preds, average='macro')

    val_recall = recall_score(val_true, val_preds, average='macro')
    test_recall = recall_score(test_true, test_preds, average='macro')

    val_f1 = f1_score(val_true, val_preds, average='macro')
    test_f1 = f1_score(test_true, test_preds, average='macro')

    print("Validation Confusion Matrix:\n", val_confusion)
    print("Test Confusion Matrix:\n", test_confusion)
    print("Validation Precision: ", val_precision)
    print("Test Precision: ", test_precision)
    print("Validation Recall: ", val_recall)
    print("Test Recall: ", test_recall)
    print("Validation F1-Score: ", val_f1)
    print("Test F1-Score: ", test_f1)

    # Plain Python floats keep the JSON output independent of NumPy scalar types.
    metrics = {
        "val_precision": float(val_precision),
        "test_precision": float(test_precision),
        "val_recall": float(val_recall),
        "test_recall": float(test_recall),
        "val_f1": float(val_f1),
        "test_f1": float(test_f1),
        "time_taken": time_taken
    }

    with open(os.path.join(output_dir, 'metrics.json'), 'w') as f:
        json.dump(metrics, f, indent=4)

    # Confusion matrices, one panel per split
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    panels = zip(axes, (val_confusion, test_confusion), ('Validation', 'Test'))
    for ax, matrix, split_name in panels:
        sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
        ax.set_title(f'{split_name} Confusion Matrix')
        ax.set_xlabel('Predicted')
        ax.set_ylabel('True')

    confusion_matrices_path = os.path.join(output_dir, 'confusion_matrices.png')
    fig.savefig(confusion_matrices_path)
    plt.close(fig)

    # Long-form table so each metric gets one bar per split. A wide-form frame
    # would make barplot average the validation and test rows into one bar.
    comparison = pd.DataFrame({
        'Metric': ['Precision', 'Recall', 'F1-Score'] * 2,
        'Split': ['Validation'] * 3 + ['Test'] * 3,
        'Score': [val_precision, val_recall, val_f1,
                  test_precision, test_recall, test_f1],
    })
    plt.figure(figsize=(10, 5))
    sns.barplot(data=comparison, x='Metric', y='Score', hue='Split')
    plt.title('Metrics Comparison')
    plt.ylabel('Score')

    metrics_comparison_path = os.path.join(output_dir, 'metrics_comparison.png')
    plt.savefig(metrics_comparison_path)
    plt.close()

    # Raw probabilities and labels, reloaded later to draw ROC curves
    np.savetxt(os.path.join(output_dir, 'val_probs.csv'), np.array(val_probs), delimiter=',')
    np.savetxt(os.path.join(output_dir, 'val_true.csv'), np.array(val_true), delimiter=',')
    np.savetxt(os.path.join(output_dir, 'test_probs.csv'), np.array(test_probs), delimiter=',')
    np.savetxt(os.path.join(output_dir, 'test_true.csv'), np.array(test_true), delimiter=',')

In [60]:
def plot_auc_roc_curves(experiment_names, title, results_dir='results'):
    """Plot the test-set ROC curve of several experiments on one figure.

    For each experiment, `test_probs.csv` and `test_true.csv` are read from
    `<results_dir>/<experiment_name>/`. The figure is saved as
    `<results_dir>/<title>_roc_curve_comparison.png` and then shown.

    Args:
        experiment_names: Names of the experiment sub-directories to include.
        title: Prefix of the saved image's file name.
        results_dir: Root directory holding the experiment folders.

    NOTE(review): `save_metrics` in this file writes under 'balanced_results'
    by default, while this function defaults to 'results'. Pass
    `results_dir='balanced_results'` to plot those runs, or confirm which
    default is intended.
    """
    plt.figure(figsize=(10, 8))

    for experiment_name in experiment_names:
        output_dir = os.path.join(results_dir, experiment_name)

        # Only the test split is plotted, so only its files are loaded.
        # ndmin keeps the array shapes stable when a file holds a single row.
        test_probs = np.loadtxt(os.path.join(output_dir, 'test_probs.csv'), delimiter=',', ndmin=2)
        test_true = np.loadtxt(os.path.join(output_dir, 'test_true.csv'), delimiter=',', ndmin=1)

        # Compute ROC curve and AUC from the probability of class 1
        fpr, tpr, _ = roc_curve(test_true, test_probs[:, 1])  # Assuming binary classification
        roc_auc = auc(fpr, tpr)

        plt.plot(fpr, tpr, label=f'{experiment_name} (AUC = {roc_auc:.2f})')

    plt.plot([0, 1], [0, 1], 'k--')  # chance-level diagonal
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve for Different Models')
    plt.legend(loc='lower right')

    # Make sure the target directory exists before saving
    os.makedirs(results_dir, exist_ok=True)
    plt.savefig(os.path.join(results_dir, f'{title}_roc_curve_comparison.png'))
    plt.show()

# Train Model

In [61]:
def train_model(model_name, num_classes, experiment_name, learning_rate):
    """Train one classifier, evaluate it on the test set and save its metrics.

    Uses the module-level `class_weights` and the train/validation/test
    DataLoaders defined earlier in the notebook.

    Args:
        model_name: Backbone name forwarded to `GenericClassifier`.
        num_classes: Number of output classes.
        experiment_name: Name used for the TensorBoard run and the metrics folder.
        learning_rate: Learning rate forwarded to `GenericClassifier`.
    """
    started_at = time.time()

    classifier = GenericClassifier(
        model_name=model_name,
        num_classes=num_classes,
        learning_rate=learning_rate,
        class_weights=class_weights,
    )

    # Both callbacks watch validation accuracy (higher is better); training
    # stops after 3 validation checks without improvement.
    stop_early = EarlyStopping(monitor='val_acc', patience=3, mode='max')
    keep_best = ModelCheckpoint(monitor='val_acc', mode='max')
    tensorboard_logger = TensorBoardLogger("logs", name=experiment_name)

    trainer = pl.Trainer(
        max_epochs=10,
        accelerator='auto',  # Let Lightning handle device selection
        logger=tensorboard_logger,
        callbacks=[keep_best, stop_early],
    )

    trainer.fit(
        model=classifier,
        train_dataloaders=train_dataloader,
        val_dataloaders=val_dataloader,
    )
    trainer.test(classifier, test_dataloader)

    # Wall-clock time covering model creation, training and testing
    time_taken = time.time() - started_at
    print(f"Time taken: {time_taken} seconds.")

    save_metrics(
        classifier.val_true, classifier.val_preds,
        classifier.test_true, classifier.test_preds,
        classifier.val_probs, classifier.test_probs,
        experiment_name,
        time_taken,
    )