<a href="https://colab.research.google.com/github/MRameezU/ISIC2017-Unet/blob/main/notebooks/isic_cancer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!nvidia-smi

Thu Nov 28 10:48:38 2024       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.35.03              Driver Version: 560.35.03      CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   42C    P8              9W /   70W |       1MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
|   1  Tesla T4                       Off |   00

## 0- Get Setup

In [7]:
# !pip install --upgrade torch
# !pip install --upgrade torchvision

In [8]:
import torch
import torchvision

print(f"torch version: {torch.__version__}")
print(f"torchvision version: {torchvision.__version__}")

torch version: 2.4.0
torchvision version: 0.19.0


device agnostic code

In [9]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
# Note: `device` is a plain string here ("cuda" / "cpu"), not a torch.device.
device= "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

## 1 - Getting Data

### 1.0 - Downloading and Reorganizing training data

In [10]:
import requests

import zipfile

from pathlib import Path

from tqdm.notebook import tqdm

# training data

# path to data folder

data_path = Path("data/")
train_data_path = data_path / "train"
binary_mask_data_path = data_path / "binary"
train_zip_url = "https://isic-challenge-data.s3.amazonaws.com/2017/ISIC-2017_Training_Data.zip"
train_zip_file = data_path / "ISIC-2017_Training_Data.zip"

# binary mask

train_binary_zip_url = "https://isic-challenge-data.s3.amazonaws.com/2017/ISIC-2017_Training_Part1_GroundTruth.zip"
train_binary_zip_file = data_path / "ISIC-2017_Training_Part1_GroundTruth.zip"


def download_file(url, dest_path, chunk_size=1024 * 1024, timeout=30):
    """Download a file from a URL to a destination path with a progress bar.

    Args:
        url (str): Address of the file to fetch.
        dest_path (Path): Local file to write. Its parent directory must
            already exist.
        chunk_size (int, optional): Number of bytes requested per iteration.
            Defaults to 1 MiB, which keeps the Python-level loop short for
            multi-gigabyte archives.
        timeout (float, optional): Seconds to wait for the server before
            `requests` gives up instead of hanging forever. Defaults to 30.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # Using the response as a context manager releases the connection even
    # when the transfer is interrupted half-way.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()  # Raise an error for bad responses
        # The header may be missing; pass None so tqdm shows an open-ended bar
        # rather than a bar with a total of zero.
        total_size = int(response.headers.get('content-length', 0)) or None
        with open(dest_path, "wb") as file, tqdm(
            desc=f"Downloading {dest_path.name}", total=total_size,
            unit="B", unit_scale=True, unit_divisor=1024) as bar:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if not chunk:
                    # Keep-alive chunks carry no payload.
                    continue
                file.write(chunk)
                bar.update(len(chunk))
    print(f"Download Complete: {dest_path}")



def extract_zip(file_path, extract_to):
    """Unpack every member of a zip archive into a target directory.

    Args:
        file_path (Path): Archive to read.
        extract_to (str or Path): Directory that receives the extracted files.
    """
    archive = zipfile.ZipFile(file_path, "r")
    try:
        print(f"Extracting {file_path.name} to {extract_to}")
        archive.extractall(path=extract_to)
    finally:
        # Always release the file handle, even if extraction fails.
        archive.close()
    print(f"Extraction Complete: {extract_to}")

In [None]:
# Main script: fetch and unpack the training images and their binary masks.
# The download is skipped only when BOTH target directories already exist.

if train_data_path.is_dir() and binary_mask_data_path.is_dir():
    print(f"{train_data_path} and {binary_mask_data_path} directories already exist.")
else:
    print(f"Preparing data directories at {data_path}")
    train_data_path.mkdir(parents=True, exist_ok=True)

    # Download the training images archive

    print(f"Downloading Training Data from: {train_zip_url}")
    download_file(train_zip_url, train_zip_file)

    # Extract the training images archive
    extract_zip(train_zip_file, train_data_path)
    binary_mask_data_path.mkdir(parents=True,exist_ok=True)
    # Download the binary mask (ground truth) archive
    print(f"Downloading Binary Mask Data from: {train_binary_zip_url}")
    download_file(train_binary_zip_url, train_binary_zip_file)
    # Extract the binary mask archive
    extract_zip(train_binary_zip_file, binary_mask_data_path)

Preparing data directories at data
Downloading Training Data from: https://isic-challenge-data.s3.amazonaws.com/2017/ISIC-2017_Training_Data.zip


Downloading ISIC-2017_Training_Data.zip:   0%|          | 0.00/5.80G [00:00<?, ?B/s]

The training archive contains the images together with their superpixel masks; the binary masks have to be downloaded separately.
#### Separating the Inputs and Outputs
Our train data folder contains both the training images and the superpixel masks, so we separate them into different folders.

In [None]:
import os
from pathlib import Path
import shutil

def organize_files(dataset_folder, image_output_folder, superpixel_output_folder):
    """
    Split a mixed dataset folder into separate image and superpixel-mask folders.

    Regular files ending in ".jpg" are moved to the image folder and files
    ending in "_superpixels.png" are moved to the superpixel folder. Anything
    else (other files, sub-directories) is left where it is.

    Args:
        dataset_folder (str or Path): Folder that holds both images and masks.
        image_output_folder (str or Path): Destination for the ".jpg" images.
        superpixel_output_folder (str or Path): Destination for the superpixel masks.
    """
    source_dir = Path(dataset_folder)
    image_output_folder = Path(image_output_folder)
    superpixel_output_folder = Path(superpixel_output_folder)

    # Make sure both destinations exist before anything is moved.
    for target_dir in (image_output_folder, superpixel_output_folder):
        target_dir.mkdir(parents=True, exist_ok=True)

    for entry in source_dir.iterdir():
        if not entry.is_file():
            continue
        if entry.name.endswith(".jpg"):
            destination = image_output_folder
        elif entry.name.endswith("_superpixels.png"):
            destination = superpixel_output_folder
        else:
            # Neither an image nor a superpixel mask: leave it untouched.
            continue
        shutil.move(str(entry), str(destination / entry.name))
    print(f"Files have been organized. Images moved to {image_output_folder}, masks to {superpixel_output_folder}.")

# Split the extracted training archive into Images/ and Superpixel/ folders.
if __name__ == "__main__":

    dataset_folder=train_data_path / "ISIC-2017_Training_Data"   # folder created by extracting the training archive
    image_output_folder="ISIC-2017_Data/train/Images"
    superpixel_output_folder="ISIC-2017_Data/train/Superpixel"

    organize_files(dataset_folder=dataset_folder,
                   image_output_folder=image_output_folder,
                   superpixel_output_folder=superpixel_output_folder)

Moving Binary masks

In [None]:
import shutil

# Move the extracted training binary masks to the consolidated dataset folder
source_dir = binary_mask_data_path / "ISIC-2017_Training_Part1_GroundTruth" # folder created by extracting the ground-truth archive
destination_dir = Path("ISIC-2017_Data/train/Binary")
# Create the destination directory if it doesn't exist
destination_dir.mkdir(parents=True, exist_ok=True)
# Only regular files are moved; sub-directories are ignored
for file in source_dir.iterdir():
  if file.is_file():
    shutil.move(str(file),str(destination_dir/file.name))

#### Deleting the Extras

Deleting the `data_path` folder to save storage

In [None]:
# Delete the scratch download folder (zip files and extracted leftovers) now
# that the desired output has been moved out of it, to free storage
if data_path.exists() and data_path.is_dir():
    shutil.rmtree(data_path)
    print(f"Folder '{data_path}' and all its subdirectories have been deleted.")
else:
    print(f"Folder '{data_path}' does not exist.")

#### Result

 ISIC-2017_Data This folder contains the dataset for the ISIC 2017 skin cancer segmentation project. The dataset is organized into the following subfolders:

 ## Structure



### Subfolders



1. **Images/**

   - Contains the original images used for training and validation.



2. **Superpixel/**

   - Contains the superpixel masks generated for each image.



3. **Binary/**

   - Contains the binary masks indicating the regions of interest in each image.


### 1.1 - Downloading and reorganizing validation data

In [None]:
# Scratch folder for the validation download (same layout as for training)
data_path = Path("data/")
# NOTE: the misspelled name `validaiton_data_path` is kept on purpose because
# the following cells refer to it.
validaiton_data_path = data_path / "valid"
binary_mask_data_path = data_path / "binary"
valid_zip_url = "https://isic-challenge-data.s3.amazonaws.com/2017/ISIC-2017_Validation_Data.zip"
valid_zip_file = data_path / "ISIC-2017_Validation_Data.zip"
# Binary mask (ground truth) archive for the validation split.
# The local file name now matches the archive that is actually downloaded
# (it previously reused the *Training* archive name).
valid_binary_zip_url = "https://isic-challenge-data.s3.amazonaws.com/2017/ISIC-2017_Validation_Part1_GroundTruth.zip"
valid_binary_zip_file = data_path / "ISIC-2017_Validation_Part1_GroundTruth.zip"


In [None]:
# Fetch and unpack the validation images and their binary masks.
# The download is skipped only when BOTH target directories already exist.
if validaiton_data_path.is_dir() and binary_mask_data_path.is_dir():
    print(f"{validaiton_data_path} and {binary_mask_data_path} directories already exist.")

else:
    print(f"Preparing data directories at {data_path}")
    validaiton_data_path.mkdir(parents=True, exist_ok=True)

    # Download the validation images archive
    print(f"Downloading validation Data from: {valid_zip_url}")
    download_file(valid_zip_url, valid_zip_file)

    # Extract the validation images archive
    extract_zip(valid_zip_file, validaiton_data_path)
    binary_mask_data_path.mkdir(parents=True,exist_ok=True)

    # Download the validation binary mask archive.
    # The log line reports the URL that is really fetched (it used to print
    # the training mask URL).
    print(f"Downloading Binary Mask Data from: {valid_binary_zip_url}")
    download_file(valid_binary_zip_url, valid_binary_zip_file)

    # Extract the validation binary mask archive
    extract_zip(valid_binary_zip_file, binary_mask_data_path)

In [None]:
# Split the extracted validation archive into Images/ and Superpixel/ folders.
dataset_folder=validaiton_data_path / "ISIC-2017_Validation_Data"   # folder created by extracting the validation archive
image_output_folder="ISIC-2017_Data/valid/Images"
superpixel_output_folder="ISIC-2017_Data/valid/Superpixel"
organize_files(dataset_folder=dataset_folder,

                   image_output_folder=image_output_folder,

                   superpixel_output_folder=superpixel_output_folder)

In [None]:
import shutil

# Move the extracted validation binary masks to the consolidated dataset folder
source_dir = binary_mask_data_path / "ISIC-2017_Validation_Part1_GroundTruth" # folder created by extracting the validation ground-truth archive
destination_dir = Path("ISIC-2017_Data/valid/Binary")

# Create the destination directory if it doesn't exist
destination_dir.mkdir(parents=True, exist_ok=True)
# Only regular files are moved; sub-directories are ignored
for file in source_dir.iterdir():
  if file.is_file():
    shutil.move(str(file),str(destination_dir/file.name))

In [None]:
# Delete the scratch download folder again now that the validation files
# have been moved out of it, to free storage
if data_path.exists() and data_path.is_dir():
    shutil.rmtree(data_path)
    print(f"Folder '{data_path}' and all its subdirectories have been deleted.")
else:
    print(f"Folder '{data_path}' does not exist.")

## 2 - Become one with the data (data preparation and exploration)

In [None]:
import os

def walk_through(dir_path:str):
  """Print, for every folder under `dir_path`, how many sub-folders and files it holds.

  Args:
      dir_path: Root directory handed to `os.walk`.
  """
  for current_dir, subdirs, files in os.walk(dir_path):
    n_dirs = len(subdirs)
    n_files = len(files)
    print(f"There are {n_dirs} directories and {n_files} images in '{current_dir}'.")

In [None]:
# NOTE: `data_path` is rebound here. From this cell on it points at the
# organized dataset folder, no longer at the scratch "data/" download folder.
data_path=Path("ISIC-2017_Data")
walk_through(data_path)

### 2.1 Visualize Images

Let's write some code to:



1. Get all of the image paths using `pathlib.Path.glob()` to find all of the files ending in .jpg.

2. Pick a random image path using Python's `random.choice()`.

3. And since we're working with images, we'll open the random image path using `PIL.Image.open()` (PIL stands for Python Image Library).


In [None]:
import random
from PIL import Image

# creating a list of all the images
image_path_list=list(data_path.glob("**/*.jpg"))
# A bare `len(...)` in the middle of a cell is not displayed, so print it.
print(f"Number of images found: {len(image_path_list)}")

# select a random image path
random_image_path = random.choice(image_path_list)

# open Image

img = Image.open(fp=random_image_path)
print(f"Random Image Path: {random_image_path}")
print(f"Image height: {img.height}")
print(f"Image width: {img.width}")

# Display the image (last expression of the cell is rendered by Jupyter)
img

In [None]:
random_images_path_idx=random.sample(population=range(len(image_path_list)),k=3)
random_images_path_idx

In [None]:
# list for input images (sorted so the three lists line up index-by-index)
image_path_list=sorted(list(data_path.glob("**/*.jpg")))

# list for binary masks
binary_mask_list=sorted(list(data_path.glob("**/Binary/*.png")))

# list for superpixel masks
superpixel_mask_list=sorted(list(data_path.glob("**/Superpixel/*.png")))
len(binary_mask_list),len(superpixel_mask_list)

In [None]:
import random
from PIL import Image
import matplotlib.pyplot as plt
from typing import List

def plot_random_images(
    image_list: List[str],
    binary_mask_list: List[str],
    superpixel_mask_list: List[str],
    num_samples: int = 3):

    """
    Plots a specified number of random images alongside their binary and superpixel masks.

    The three lists are expected to be aligned: entry `i` of each list must
    belong to the same sample. Each sampled row shows, left to right, the
    image, its binary mask and its superpixel mask.

    Args:
        image_list (List[str]): List of file paths to input images.
        binary_mask_list (List[str]): List of file paths to binary mask images.
        superpixel_mask_list (List[str]): List of file paths to superpixel mask images.
        num_samples (int, optional): Number of random samples to display. Defaults to 3.
    Raises:
        ValueError: If the lengths of `image_list`, `binary_mask_list`, and `superpixel_mask_list` are not equal.
        ValueError: If `num_samples` is smaller than 1.
        ValueError: If `num_samples` is greater than the number of available images.

    Example:
        ```python
        plot_random_images(
            image_list=["image1.jpg", "image2.jpg"],
            binary_mask_list=["binary1.png", "binary2.png"],
            superpixel_mask_list=["superpixel1.png", "superpixel2.png"],
            num_samples=2)
        ```
    """
    # Check for consistency in list lengths
    if not (len(image_list) == len(binary_mask_list) == len(superpixel_mask_list)):
        raise ValueError("All input lists must have the same length.")
    # Ensure the number of samples is valid (an empty figure grid makes no sense)
    if num_samples < 1:
        raise ValueError("num_samples must be at least 1.")
    if num_samples > len(image_list):
        raise ValueError("num_samples cannot be greater than the number of images available.")
    # Randomly sample indices (without replacement)
    idx = random.sample(range(len(image_list)), num_samples)
    # squeeze=False always yields a 2-D grid of axes, so a single-sample
    # figure needs no special handling.
    fig, axes = plt.subplots(nrows=num_samples, ncols=3,
                             figsize=(15, 5 * num_samples), squeeze=False)
    # Column order: image, binary mask, superpixel mask
    columns = (
        ("Image", image_list),
        ("Binary Mask", binary_mask_list),
        ("Superpixel Mask", superpixel_mask_list),
    )
    for row_axes, sample_idx in zip(axes, idx):
        for ax, (label, paths) in zip(row_axes, columns):
            # The context manager closes the underlying file once it is drawn.
            with Image.open(paths[sample_idx]) as picture:
                ax.imshow(picture)
                ax.set_title(f"{label}: {sample_idx},Size:{picture.size}")
            ax.axis("off")
    # Adjust layout and display
    plt.tight_layout()
    plt.show()


In [None]:
plot_random_images(image_list=image_path_list,
                   binary_mask_list=binary_mask_list,
                   superpixel_mask_list=superpixel_mask_list,
                   num_samples=3)

### 2.2 - Combining Masks

1. Weighted Average or Overlaying

2. Union

3. Intersection

4. Boundary Enhancement

In [None]:
import cv2

def load_masks(binary_mask_path_list, superpixel_mask_path_list,):
  """Load one randomly chosen, index-aligned pair of files as grayscale arrays.

  Args:
      binary_mask_path_list: Sequence of paths (str or Path); the first
          returned array is read from here.
      superpixel_mask_path_list: Sequence of paths of the same length; the
          second returned array is read from the same index.

  Returns:
      tuple: (first_array, second_array), both read with cv2.IMREAD_GRAYSCALE.

  Raises:
      ValueError: If the two lists differ in length.
      IndexError: If the lists are empty (raised by `random.choice`).
      FileNotFoundError: If OpenCV cannot read one of the selected files.
  """
  # Check for consistency in list lengths
  if len(binary_mask_path_list) != len(superpixel_mask_path_list):
      raise ValueError("The binary and superpixel mask lists must have the same length.")
  # Randomly select a single index shared by both lists
  idx = random.choice(range(len(binary_mask_path_list)))
  # cv2.imread is documented for string file names; the notebook's lists hold
  # pathlib.Path objects, so convert explicitly.
  binary_path = str(binary_mask_path_list[idx])
  superpixel_path = str(superpixel_mask_path_list[idx])
  # Load masks as grayscale images
  binary_mask = cv2.imread(binary_path, cv2.IMREAD_GRAYSCALE)
  superpixel_mask = cv2.imread(superpixel_path, cv2.IMREAD_GRAYSCALE)
  # cv2.imread reports failure by returning None instead of raising.
  for path, array in ((binary_path, binary_mask), (superpixel_path, superpixel_mask)):
      if array is None:
          raise FileNotFoundError(f"Could not read image file: {path}")
  return binary_mask, superpixel_mask

# bm: binary mask, sm: superpixel mask — both taken from the same random index
bm, sm = load_masks(
    binary_mask_path_list=binary_mask_list,
    superpixel_mask_path_list=superpixel_mask_list
)

In [None]:
import cv2 
import matplotlib.pyplot as plt
# `bm` was loaded with cv2.IMREAD_GRAYSCALE, so it has a single channel.
# cv2.COLOR_BGR2RGB needs a 3- or 4-channel input and fails on it, so the
# array is drawn directly with a gray colormap over the 0..255 range instead.
plt.imshow(bm, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

In [None]:
# `sm` was loaded with cv2.IMREAD_GRAYSCALE, so it has a single channel.
# cv2.COLOR_BGR2RGB needs a 3- or 4-channel input and fails on it, so the
# array is drawn directly with a gray colormap over the 0..255 range instead.
plt.imshow(sm, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

### 2.3 - Weighted average or Overlaying

In [None]:
import cv2
import numpy as np
from pathlib import Path

def weighted_average_combination(binary_mask,superpixel_mask,alpha=0.5):
  """Blend two arrays as `alpha * first + (1 - alpha) * second`.

  Both inputs are divided by 255 before blending, the blend is clipped to
  [0, 1], and the result is scaled back by 255 and cast to uint8.

  Args:
      binary_mask: NumPy array weighted by `alpha`.
      superpixel_mask: NumPy array weighted by `1 - alpha`.
      alpha: Weight of the first array. Defaults to 0.5.

  Returns:
      np.ndarray: The blended array with dtype uint8.
  """
  first = binary_mask.astype(np.float32) / 255.0
  second = superpixel_mask.astype(np.float32) / 255.0
  blended = alpha * first + (1 - alpha) * second
  # Keep the blend inside the unit interval before rescaling
  blended = np.clip(blended, 0, 1)
  scaled = blended * 255
  return scaled.astype(np.uint8)

In [None]:
wac=weighted_average_combination(bm,sm)
# `wac` is a single-channel uint8 array; cv2.COLOR_BGR2RGB needs 3 or 4
# channels and fails on it, so draw it directly with a gray colormap.
plt.imshow(wac, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

Let's overlay the original image with each mask to better understand our data.

Overlaying the binary mask on the original image:

In [None]:
# Reuse load_masks to fetch a binary mask and the matching image (grayscale)
bm,img=load_masks(
    binary_mask_path_list=binary_mask_list,
    superpixel_mask_path_list=image_path_list,# instead of the superpixel mask paths we pass the image paths
)

In [None]:
# `img` came from load_masks, which reads with cv2.IMREAD_GRAYSCALE, so it is
# single-channel. cv2.COLOR_BGR2RGB needs 3 or 4 channels and fails on it.
plt.imshow(img, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

In [None]:
wac=weighted_average_combination(bm,img)

# The blend of two single-channel arrays is single-channel as well;
# cv2.COLOR_BGR2RGB needs 3 or 4 channels, so draw it with a gray colormap.
plt.imshow(wac, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

Overlaying the superpixel mask on the original image:

In [None]:
# Reuse load_masks to fetch an image (grayscale) and the matching superpixel mask
img,sm=load_masks(
    binary_mask_path_list=image_path_list,# instead of the binary mask paths we pass the image paths
    superpixel_mask_path_list=superpixel_mask_list)

In [None]:
wac=weighted_average_combination(img,sm)

# The blend of two single-channel arrays is single-channel as well;
# cv2.COLOR_BGR2RGB needs 3 or 4 channels, so draw it with a gray colormap.
plt.imshow(wac, cmap="gray", vmin=0, vmax=255)
plt.title('Image')
plt.axis('off')
plt.show()

## 3 - Creating our (UNET) model class

In [None]:
from collections import OrderedDict
import torch
import torch.nn as nn

class UNET (nn.Module):
  """U-Net with four encoder stages, a bottleneck and four decoder stages.

  Every encoder stage is followed by a 2x2 max-pool (stride 2) and every
  decoder stage is preceded by a 2x2 transposed convolution (stride 2). The
  decoder input is the up-sampled tensor concatenated channel-wise with the
  matching encoder output (skip connection). Because of the four pooling
  steps and the concatenations, input height and width must be divisible
  by 16.

  Args:
      in_channels (int): Channels of the input tensor. Defaults to 3.
      out_channels (int): Channels of the output map. Defaults to 1.
      init_features (int): Channels produced by the first encoder stage; the
          count doubles at each deeper stage. Defaults to 32.
  """

  def __init__(self,in_channels=3,out_channels=1,init_features=32):
    super().__init__()
    features=init_features

    # Encoder: channel count doubles while the resolution halves
    self.encoder1=UNET._block(in_channels,features,name="encoder_1")
    self.pool1=nn.MaxPool2d(kernel_size=2,stride=2)
    self.encoder2=UNET._block(features,features*2,name="encoder_2")
    self.pool2=nn.MaxPool2d(kernel_size=2,stride=2)
    self.encoder3=UNET._block(features*2,features*4,name="encoder_3")
    self.pool3=nn.MaxPool2d(kernel_size=2,stride=2)
    self.encoder4=UNET._block(features*4,features*8,name="encoder_4")
    self.pool4=nn.MaxPool2d(kernel_size=2,stride=2)

    # Bottleneck
    self.bottleneck=UNET._block(features*8,features*16,name="bottleneck")

    # Decoder: each block receives twice the up-sampled channel count because
    # the skip connection is concatenated before it
    self.upconv4=nn.ConvTranspose2d(in_channels=features*16,out_channels=features*8,kernel_size=2,stride=2)
    self.decoder4=UNET._block((features*8)*2,features*8,name="decoder_4")
    self.upconv3=nn.ConvTranspose2d(in_channels=features*8,out_channels=features*4,kernel_size=2,stride=2)
    self.decoder3=UNET._block((features*4)*2,features*4,name="decoder_3")
    self.upconv2=nn.ConvTranspose2d(in_channels=features*4,out_channels=features*2,kernel_size=2,stride=2)
    self.decoder2=UNET._block((features*2)*2,features*2,name="decoder_2")
    self.upconv1=nn.ConvTranspose2d(in_channels=features*2,out_channels=features,kernel_size=2,stride=2)
    self.decoder1=UNET._block(features*2,features,name="decoder_1")

    # Output: 1x1 convolution down to the requested number of channels
    self.conv=nn.Conv2d(in_channels=features,out_channels=out_channels,kernel_size=1)

  def forward(self,x):
    """Run the network and return sigmoid-activated output.

    Args:
        x (torch.Tensor): Batch of shape (N, in_channels, H, W) with H and W
            divisible by 16.

    Returns:
        torch.Tensor: Tensor of shape (N, out_channels, H, W) with values in
        [0, 1]. The sigmoid is already applied, so the output is not logits.
    """
    # encoder
    encoder_1=self.encoder1(x)
    encoder_2=self.encoder2(self.pool1(encoder_1))
    encoder_3=self.encoder3(self.pool2(encoder_2))
    encoder_4=self.encoder4(self.pool3(encoder_3))

    # bottleneck
    bottleneck=self.bottleneck(self.pool4(encoder_4))

    # decoder: up-sample, concatenate the skip connection, refine
    decoder_4=self.upconv4(bottleneck)
    decoder_4=torch.cat((decoder_4,encoder_4),dim=1)
    decoder_4=self.decoder4(decoder_4)

    decoder_3=self.upconv3(decoder_4)
    decoder_3=torch.cat(tensors=(decoder_3,encoder_3),dim=1)
    decoder_3=self.decoder3(decoder_3)

    decoder_2=self.upconv2(decoder_3)
    decoder_2=torch.cat(tensors=(decoder_2,encoder_2),dim=1)
    decoder_2=self.decoder2(decoder_2)

    decoder_1=self.upconv1(decoder_2)
    decoder_1=torch.cat(tensors=(decoder_1,encoder_1),dim=1)
    decoder_1=self.decoder1(decoder_1)

    # final segmentation map
    return torch.sigmoid(self.conv(decoder_1))

  @staticmethod
  def _block(in_channels,features,name):
    """Build a (3x3 conv -> batch norm -> ReLU) x 2 block.

    Declared as a static method so it works both as `UNET._block(...)` and
    through an instance. Layer names are `name` directly followed by the
    layer tag (e.g. "encoder_1conv1"); they are part of the state-dict keys
    and therefore left unchanged.

    Args:
        in_channels (int): Channels entering the block.
        features (int): Channels produced by both convolutions.
        name (str): Prefix for the layer names.

    Returns:
        nn.Sequential: The assembled block.
    """
    seq=nn.Sequential(
        OrderedDict([
            # bias=False: each convolution is followed by a BatchNorm2d layer
            (name + "conv1",nn.Conv2d(in_channels=in_channels,out_channels=features,kernel_size=3,padding=1,bias=False)),
            (name + "norm1",nn.BatchNorm2d(num_features=features)),
            (name + "relu1",nn.ReLU(inplace=True)),
            (name + "conv2",nn.Conv2d(in_channels=features,out_channels=features,kernel_size=3,padding=1,bias=False)),
            (name + "norm2",nn.BatchNorm2d(num_features=features)),
            (name + "relu2",nn.ReLU(inplace=True)),
        ])
    )
    return seq

💀 **Error Made**



 *What Happened with the Input Size of (244, 244)?*





>`# creating transform
 transform=transforms.Compose([transforms.Resize(size=(244,244)),
                                      transforms.ToTensor()])`"

                                      

                                      

When the input size was (244, 244), you encountered the error because the spatial dimension was not divisible by 16 (after 4 downsampling operations). Let's break it down:



1. Initial Input: 244x244

2. After 1st MaxPool: 244 / 2 = 122

3. After 2nd MaxPool: 122 / 2 = 61

4. After 3rd MaxPool: 61 / 2 = 30.5 (not an integer, so `MaxPool2d` rounds it down to 30)



Since 244 / 2^4 = 15.25 is not an integer, one of the pooling steps has to round down (61 → 30). On the way back up, the `ConvTranspose2d` (up-conv) layer doubles 30 to 60, which no longer matches the 61-pixel encoder feature map it must be concatenated with, so the skip connection fails with a size-mismatch error.






In [None]:
# # wil result in error

# model_0=UNET()
# dummy_in=torch.rand(size=(1,3,244,244))
# output=model_0(dummy_in)
# print(output)

🟢 **Error Corrected:**



*Why (224, 224) Works:*



> `transform=transforms.Compose([transforms.Resize(size=(224,224)),
  transforms.ToTensor()])`



When you change the input size to (224, 224):



1. Initial Input: 224x224

2. After 1st MaxPool: 224 / 2 = 112

3. After 2nd MaxPool: 112 / 2 = 56

4. After 3rd MaxPool: 56 / 2 = 28

5. After 4th MaxPool: 28 / 2 = 14



Here, 224 is divisible by 16, meaning that after 4 downsampling operations, the spatial dimension (14x14) is compatible for the decoder's upsampling. The size of the feature maps at each stage remains consistent, and the skip connections can concatenate without issues. The same holds for the 256x256 input used in the cells below (256 / 16 = 16).

In [None]:
# will not result in error
# Smoke test with a random 256x256 batch (256 is divisible by 16)
model_0=UNET()
dummy_in=torch.rand(size=(1,3,256,256))

output=model_0(dummy_in)

print(output.shape)

### 3.1 Use `torchinfo` to get an idea of the shapes going through our model

In [None]:
try:
  from torchinfo import summary
except:
  !pip install torchinfo
  from torchinfo import summary

In [None]:
summary(model=model_0,input_size=(1,3,256,256))

## 4 - Custom Dataset

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset
from PIL import Image
from torchvision.transforms import ToTensor
from torchvision import transforms
from typing import Tuple
import numpy as np
import cv2
from pathlib import Path



class ISICDataset(Dataset):
  """Dataset pairing ".jpg" images with ".png" binary masks.

  Images and masks are matched purely by position after sorting the file
  names of each folder, so both folders must contain the same samples in the
  same sort order (only the counts are verified).

  Args:
      image_dir_path: Folder with the ".jpg" images.
      binarymasks_dir_path: Folder with the ".png" masks.
      transform: Optional callable invoked as `transform(image=..., mask=...)`
          that returns a dict with "image" and "mask" entries, where the mask
          supports `.float()` (the albumentations pipeline used in this
          notebook ends in `ToTensorV2`). When None, the image goes through
          torchvision's `ToTensor` and the mask becomes a float32 tensor.
          The previous default, `transforms.ToTensor()`, does not accept the
          `image=`/`mask=` keywords and failed on the first item.
  """

  def __init__(self,image_dir_path,binarymasks_dir_path,transform=None):
    self.image_paths=sorted(Path(image_dir_path).glob("*.jpg"))
    self.binarymask_paths=sorted(Path(binarymasks_dir_path).glob("*.png"))
    self.transform=transform
    assert len(self.image_paths) == len(self.binarymask_paths), (
            "Number of images and binary masks do not match."
        )

  def load_image(self,index:int)->np.ndarray:
    """Return the image at `index` as an RGB NumPy array."""
    image_path=self.image_paths[index]
    # Close the file after decoding; force three channels for the model input.
    with Image.open(image_path) as image:
      return np.array(image.convert("RGB"))

  def load_binarymask(self,index:int)->np.ndarray:
    """Return the mask at `index` as a uint8 NumPy array."""
    binarymask_path=self.binarymask_paths[index]
    with Image.open(binarymask_path) as mask:
      # Mode "1" is PIL's 1-bit (black/white) mode
      return np.array(mask.convert("1"), dtype=np.uint8)

  def __len__(self)->int:
    """Number of image/mask pairs."""
    return len(self.image_paths)

  def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
    """Return `(image, mask)`; the mask gets a leading channel dimension."""
    # Load image and mask
    image = self.load_image(index)
    mask = self.load_binarymask(index)

    if self.transform is not None:
        # Apply the joint image/mask transform (albumentations calling style)
        augmented = self.transform(image=image, mask=mask)
        image = augmented['image']
        mask = augmented['mask'].float()
    else:
        # Convert to tensors if no transform is provided
        image = ToTensor()(image)
        mask = torch.tensor(mask, dtype=torch.float32)

    # Ensure the mask has a channel dimension for loss compatibility
    mask = mask.unsqueeze(0)

    return image, mask

## 5 - Dataset and Dataloaders

In [None]:
# error transform
# transform=transforms.Compose([transforms.Resize(size=(244,244)),
#                                       transforms.ToTensor()])

# creating right transform
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Define the transformation pipeline. It is called as
# transform(image=..., mask=...), so image and mask are transformed together.
transform = A.Compose([
    A.Resize(256, 256),                        # Resize both image and mask
    A.Rotate(limit=35, p=1.0),                 # Rotate image and mask consistently
    A.HorizontalFlip(p=0.5),                   # Flip image and mask horizontally
    A.VerticalFlip(p=0.1),                     # Flip image and mask vertically
    A.Normalize(mean=(0.5,), std=(0.5,)),      # Normalize the image with mean 0.5 / std 0.5
    ToTensorV2()                               # Convert to PyTorch tensors
])

In [None]:
# creating our training dataset (uses the augmenting transform) and
# inspecting the shapes of its first sample
train_dataset=ISICDataset(image_dir_path="ISIC-2017_Data/train/Images",binarymasks_dir_path="ISIC-2017_Data/train/Binary",transform=transform)
img,mask=train_dataset[0]
print(img.shape,mask.shape)

In [None]:
# creating our validation datasets

# Validation must be deterministic, so it gets its own pipeline with the same
# resize / normalize / tensor conversion as training but WITHOUT the random
# rotation and flips (otherwise every evaluation sees different inputs).
valid_transform = A.Compose([
    A.Resize(256, 256),
    A.Normalize(mean=(0.5,), std=(0.5,)),
    ToTensorV2()
])

valid_dataset=ISICDataset(image_dir_path="ISIC-2017_Data/valid/Images",binarymasks_dir_path="ISIC-2017_Data/valid/Binary",transform=valid_transform)
img,mask=valid_dataset[0]
print("Image shape:", img.shape)
print("Mask shape:", mask.shape)
print("Mask unique values:", torch.unique(mask))

### Visualizing train dataset

In [None]:
import matplotlib.pyplot as plt

# Take the first (image, mask) pair from the training dataset
sample_img, sample_mask = train_dataset[0]

# Convert image and mask to numpy arrays for visualization
image_np = sample_img.permute(1, 2, 0).numpy()  # Change from [C, H, W] to [H, W, C]
# Undo Normalize(mean=0.5, std=0.5) so the values are back in [0, 1];
# otherwise imshow clips the negative half of the range.
image_np = (image_np * 0.5 + 0.5).clip(0, 1)
mask_np = sample_mask.squeeze(0).numpy()        # Remove channel dimension

# Plot using subplots
fig, axes = plt.subplots(1, 2, figsize=(12, 6))

# Plot the image (RGB data, so no colormap is needed)
axes[0].imshow(image_np)
axes[0].set_title("Image")
axes[0].axis('off')

# Plot the mask
axes[1].imshow(mask_np, cmap='gray')
axes[1].set_title("Mask")
axes[1].axis('off')

# Show the plots
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Take the first (image, mask) pair from the validation dataset
sample_img, sample_mask = valid_dataset[0]

# Convert image and mask to numpy arrays for visualization
image_np = sample_img.permute(1, 2, 0).numpy()  # Change from [C, H, W] to [H, W, C]
# Undo Normalize(mean=0.5, std=0.5) so the values are back in [0, 1];
# otherwise imshow clips the negative half of the range.
image_np = (image_np * 0.5 + 0.5).clip(0, 1)
mask_np = sample_mask.squeeze(0).numpy()        # Remove channel dimension

# Plot using subplots
fig, axes = plt.subplots(1, 2, figsize=(12, 6))

# Plot the image (RGB data, so no colormap is needed)
axes[0].imshow(image_np)
axes[0].set_title("Image")
axes[0].axis('off')

# Plot the mask
axes[1].imshow(mask_np, cmap='gray')
axes[1].set_title("Mask")
axes[1].axis('off')

# Show the plots
plt.tight_layout()
plt.show()


In [None]:
len(train_dataset),len(valid_dataset)

In [None]:
# Creating dataloaders

import os
from torch.utils.data import DataLoader
BATCH_SIZE=8
VALID_BATCH_SIZE=32  # no gradients are kept during validation, so larger batches fit
# os.cpu_count() may return None when the count cannot be determined;
# fall back to 0 (load data in the main process) in that case.
NUM_WORKERS=os.cpu_count() or 0

train_dataloader=DataLoader(dataset=train_dataset,batch_size=BATCH_SIZE,
                            shuffle=True,num_workers=NUM_WORKERS,
                            pin_memory=True)

# Validation order does not matter, so no shuffling
valid_dataloader=DataLoader(dataset=valid_dataset,batch_size=VALID_BATCH_SIZE,
                            shuffle=False,num_workers=NUM_WORKERS,
                            pin_memory=True)

train_dataloader,valid_dataloader

In [None]:
# checking our dataloaders
img,mask=next(iter(train_dataloader))
img.shape,mask.shape

In [None]:
# lets see if mask contain only bonary values?
mask

## 6 - Create train and test


### 6.1 - Creating training and validation step

* `train_step()` - takes in a model and dataloader and trains the model on the dataloader.

* `validation_step()` - takes in a model and dataloader and evaluates the model on the dataloader.

In [None]:
try:
  import torchmetrics
except:
  !pip install torchmetrics
  import torchmetrics

In [None]:
from torch.amp import autocast, GradScaler

# Initialize a GradScaler for scaling the gradients during the backward pass
scaler = GradScaler()

def train_step(model: nn.Module, dataloader: torch.utils.data.DataLoader,
               loss_fn: nn.Module, optimizer: torch.optim.Optimizer,
               metric_fn=None, device=None):
    """
    Perform a training step to calculate the training loss and a chosen metric.

    Args:
        model (nn.Module): The model to train.
        dataloader (torch.utils.data.DataLoader): The training dataloader.
        loss_fn (nn.Module): Loss function to use.
        optimizer (torch.optim.Optimizer): Optimizer for model updates.
        metric_fn (callable, optional): Metric function to evaluate performance. Default is None.
        device (torch.device, optional): Device to perform computation on. Default is None.

    Returns:
        tuple: Training loss and metric value.
    """
    model.train()
    device_type = device.type

    # Initialize gradient scaler for mixed precision
    scaler = torch.cuda.amp.GradScaler()

    # Initialize metrics
    train_loss = 0
    train_metric = 0

    # Iterate through batches
    for img, mask in dataloader:
        img, mask = img.to(device), mask.to(device)

        # Mixed Precision Training - wrap forward pass with autocast
        with autocast(device_type=device_type):
            y_pred = model(img)
            loss = loss_fn(y_pred, mask)

        train_loss += loss.item()

        # Calculate metric if provided
        train_metric += metric_fn(y_pred, mask).item()

        # Zero gradients before backward pass
        optimizer.zero_grad()

        # Backward pass with scaled loss
        scaler.scale(loss).backward()

        # Optimizer step
        scaler.step(optimizer)
        scaler.update()

    # Calculate average loss and metric
    train_loss /= len(dataloader)
    train_metric /= len(dataloader)

    return train_loss, train_metric



def validation_step(model: nn.Module, dataloader: torch.utils.data.DataLoader,
                    loss_fn: nn.Module, metric_fn, device=None):
    """
    Perform a validation step to calculate the validation loss and a chosen metric.

    Args:
        model (nn.Module): The model to evaluate.
        dataloader (torch.utils.data.DataLoader): The validation dataloader.
        loss_fn (nn.Module): Loss function to use.
        metric_fn (callable): Metric function to evaluate performance.
        device (torch.device): Device to perform computation on.

    Returns:
        tuple: Validation loss and metric value.
    """
    # Set model to evaluation mode
    model.eval()

    # Initialize metrics
    val_loss = 0
    val_metric = 0

    # Turn off gradient tracking for inference
    with torch.no_grad():
        for img, mask in dataloader:
            img, mask = img.to(device), mask.to(device)

            # Mixed Precision - apply autocast during inference (if using mixed precision)
            with autocast(device.type):
                y_pred = model(img)
                
                # Calculate loss
                loss = loss_fn(y_pred, mask)
                val_loss += loss.item()

                # # Convert logits to binary predictions if needed (applying sigmoid)
                # preds = torch.sigmoid(y_pred)  # Apply sigmoid only if needed
                # preds = (preds > 0.5).float()  # Convert to binary

                # Calculate metric (e.g., accuracy, dice score, etc.)
                val_metric += metric_fn(y_pred, mask).item()

    # Average over the batches
    val_loss /= len(dataloader)
    val_metric /= len(dataloader)

    return val_loss, val_metric


### 6.2 - Creating `train()` to combine training and validation step

In [None]:
from tqdm.auto import tqdm
import time


def train(model:nn.Module,train_dataloader:torch.utils.data.DataLoader,
          test_dataloader:torch.utils.data.DataLoader,
          optimizer:torch.optim.Optimizer,loss_fn:nn.Module,
          metric_fn,epochs,device):
    """
    Train `model` for `epochs` epochs, validating after every epoch.

    Each epoch calls `train_step` on `train_dataloader` and then
    `validation_step` on `test_dataloader`, prints a one-line summary, and
    appends the four returned values to the history.

    Args:
        model (nn.Module): Model to train.
        train_dataloader (torch.utils.data.DataLoader): Passed to `train_step`.
        test_dataloader (torch.utils.data.DataLoader): Passed to `validation_step`.
        optimizer (torch.optim.Optimizer): Passed to `train_step`.
        loss_fn (nn.Module): Loss used by both steps.
        metric_fn (callable): Metric used by both steps.
        epochs (int): Number of epochs to run.
        device: Device forwarded unchanged to both steps.

    Returns:
        dict: Per-epoch history with keys "train_loss", "train_metrics",
        "val_loss" and "val_metrics", each a list with one entry per epoch.
    """
    results = {"train_loss": [],
               "train_metrics": [],
               "val_loss": [],
               "val_metrics": []}

    for epoch in tqdm(range(epochs)):
        start_time = time.time()
        train_loss, train_metrics = train_step(model, dataloader=train_dataloader,
                                               loss_fn=loss_fn, optimizer=optimizer,
                                               metric_fn=metric_fn, device=device)

        val_loss, val_metrics = validation_step(model, dataloader=test_dataloader,
                                                loss_fn=loss_fn, metric_fn=metric_fn,
                                                device=device)

        end_time = time.time()
        time_taken = end_time - start_time

        # Print out what's happening. The values printed are the ones returned by
        # the two steps above (the previous `train_acc` / `val_acc` names did not exist).
        print(f"Epoch: {epoch} | Epoch time:{time_taken} s | Train loss: {train_loss:.4f} | Train Metrics: {train_metrics:.4f} | val loss: {val_loss:.4f} | val Metrics: {val_metrics:.4f}")

        # Update results dictionary; keys must match the ones initialised above
        results["train_loss"].append(train_loss)
        results["train_metrics"].append(train_metrics)
        results["val_loss"].append(val_loss)
        results["val_metrics"].append(val_metrics)

    # Return the filled results at the end of the epochs
    return results


## 7 - Creating Loss and Metric Classes

### Dice Loss Class

In [None]:
class DiceLoss(torch.nn.Module):
    """
    Soft Dice loss computed over every element of the batch at once.

    `preds` is used exactly as passed in: no sigmoid is applied here (the call is
    commented out below), so this presumably expects values already in [0, 1]
    -- TODO confirm against the model's final layer.
    """

    def __init__(self):
        super().__init__()

    def forward(self, preds, targets, smooth=1e-6):
        """
        Return `1 - dice`, where dice = (2 * sum(preds * targets) + smooth) /
        (sum(preds) + sum(targets) + smooth).

        Args:
            preds (Tensor): Predictions, same shape as `targets`.
            targets (Tensor): Ground-truth masks.
            smooth (float): Added to numerator and denominator so that two empty
                inputs do not divide by zero.

        Returns:
            Tensor: Scalar loss.
        """
        # preds = torch.sigmoid(preds)  # Apply sigmoid to logits
        overlap = torch.sum(preds * targets)
        total = torch.sum(preds) + torch.sum(targets)
        return 1 - (2. * overlap + smooth) / (total + smooth)

### Dice Metrics

In [None]:
class CustomDiceMetric(nn.Module):
    """
    Hard Dice coefficient for binary segmentation, averaged over the batch.

    Predictions are binarized with `preds > threshold` and compared against the
    targets per sample. No sigmoid is applied here (the call is commented out
    below), so `preds` is presumably already on the same scale as `threshold`
    -- TODO confirm against the model's final layer.
    """

    def __init__(self, threshold=0.5, eps=1e-6):
        """
        Args:
            threshold (float): Threshold to binarize predictions.
            eps (float): Small epsilon to prevent division by zero.
        """
        super().__init__()
        self.eps = eps
        self.threshold = threshold

    def forward(self, preds, targets):
        """
        Compute the mean per-sample Dice coefficient.

        Args:
            preds (Tensor): Model output; reduced over dims 1, 2 and 3, so it
                must be 4-dimensional.
            targets (Tensor): Ground-truth masks (float), same shape as `preds`.

        Returns:
            Tensor: Scalar Dice coefficient averaged over dim 0.
        """
        # preds = torch.sigmoid(preds)
        reduce_dims = (1, 2, 3)

        # Hard 0/1 mask from the raw predictions
        hard_mask = preds.gt(self.threshold).float()

        # Per-sample overlap and total mass
        overlap = (hard_mask * targets).sum(dim=reduce_dims)
        total = hard_mask.sum(dim=reduce_dims) + targets.sum(dim=reduce_dims)

        per_sample = (2.0 * overlap + self.eps) / (total + self.eps)
        return per_sample.mean()

## 8 - Training our model

In [None]:
import time
import mlflow
import mlflow.pytorch
from torchmetrics import Accuracy
import torch
from tqdm import tqdm

def setup_device_train_and_save(
    model: nn.Module,train_dataloader: torch.utils.data.DataLoader,
    test_dataloader: torch.utils.data.DataLoader,optimizer: torch.optim.Optimizer,
    loss_fn: nn.Module,metric_fn,
    epochs: int,experiment_name: str = "Skin Lesion Segmentation",):
    """
    Pick a device, train the model with `train()`, and save its weights to disk.

    If more than one CUDA device is visible the model is wrapped in
    `nn.DataParallel`; otherwise a single GPU or the CPU is used. After training,
    the weights are written to "<experiment_name>.pth" in the working directory.
    No experiment tracking is done here.

    Args:
        model (nn.Module): Model to train.
        train_dataloader (torch.utils.data.DataLoader): Training data.
        test_dataloader (torch.utils.data.DataLoader): Validation data.
        optimizer (torch.optim.Optimizer): Optimizer built over `model`'s parameters.
        loss_fn (nn.Module): Loss function.
        metric_fn: Metric callable; moved to the device when it has a `.to` method.
        epochs (int): Number of epochs.
        experiment_name (str): Used as the stem of the saved weights file.

    Returns:
        dict: The history returned by `train()`.
    """
    # Device setup
    if torch.cuda.device_count() > 1:
        print(f"Multiple GPUs detected: {torch.cuda.device_count()} GPUs")
        model = nn.DataParallel(model)
        device = torch.device("cuda:0")
    elif torch.cuda.is_available():
        print("Using a single GPU")
        device = torch.device("cuda:0")
    else:
        print("No GPU detected, using CPU")
        device = torch.device("cpu")

    model = model.to(device)
    # Plain functions are valid metrics too; only objects with `.to` need moving.
    if hasattr(metric_fn, "to"):
        metric_fn = metric_fn.to(device)

    # Training loop
    results = train(
        model=model,
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        optimizer=optimizer,
        loss_fn=loss_fn,
        metric_fn=metric_fn,
        epochs=epochs,
        device=device)

    # Save the underlying model's weights. A DataParallel wrapper prefixes every
    # key with "module.", which would not load into a plain (unwrapped) model.
    model_to_save = model.module if isinstance(model, nn.DataParallel) else model
    torch.save(model_to_save.state_dict(), f"{experiment_name}.pth")
    return results


In [None]:
# Hyperparameters and training objects for the first run (model_1).
# Seed first so that any random initialisation that follows is repeatable.
torch.manual_seed(42)
torch.cuda.manual_seed(42)

EPOCHS = 1
LEARNING_RATE = 0.001

# Loss and metric. Alternative loss kept for reference:
# loss_fn = torch.nn.BCEWithLogitsLoss()
loss_fn = DiceLoss()
metric_fn = CustomDiceMetric(threshold=0.5)

# Model and its optimizer
model_1 = UNET(in_channels=3, out_channels=1).to(device)
optimizer = torch.optim.Adam(model_1.parameters(), lr=LEARNING_RATE)

In [None]:
# Train model_1 and keep the per-epoch history returned by train().
# `valid_dataloader` is passed as the function's `test_dataloader` argument.
results_1 = setup_device_train_and_save(
    model=model_1,
    train_dataloader=train_dataloader,
    test_dataloader=valid_dataloader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    metric_fn=metric_fn,
    epochs=EPOCHS,
    experiment_name="Skin Lesion Segmentation",
)

## 9 - Plotting Loss Curves

In [None]:
from typing import List,Dict
def plot_loss_curves(results:Dict[str,List[float]], metric_name: str = "Metric"):
    """
    Plot training/validation loss and training/validation metric side by side.

    Args:
        results (Dict[str, List[float]]): History with the following keys, each a
            list with one value per epoch:
            - 'train_loss': training loss values.
            - 'val_loss': validation loss values.
            - 'train_metrics': training metric values.
            - 'val_metrics': validation metric values.
        metric_name (str): Label for the metric panel (title, y-axis and legend),
            e.g. "Dice". The plotted metric is whatever was passed to training,
            so the label is a parameter rather than a fixed "Accuracy".
    """
    # Loss values (training and validation)
    train_loss = results["train_loss"]
    val_loss = results["val_loss"]

    # Metric values (training and validation)
    train_metrics = results["train_metrics"]
    val_metrics = results["val_metrics"]

    # One x position per recorded epoch
    epochs = range(len(val_metrics))

    # Create a figure with two subplots
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 7))

    # Left panel: loss
    ax[0].plot(epochs, train_loss, label="Train Loss")
    ax[0].plot(epochs, val_loss, label="Validation Loss")
    ax[0].set_title("Loss")
    ax[0].set_xlabel("Epochs")
    ax[0].set_ylabel("Loss")
    ax[0].legend()

    # Right panel: metric
    ax[1].plot(epochs, train_metrics, label=f"Train {metric_name}")
    ax[1].plot(epochs, val_metrics, label=f"Validation {metric_name}")
    ax[1].set_title(metric_name)
    ax[1].set_xlabel("Epochs")
    ax[1].set_ylabel(metric_name)
    ax[1].legend()

    # Display the plots
    plt.show()

In [None]:
# Plot the loss and metric curves for our model (history from the run above)
plot_loss_curves(results=results_1)

## 10 - Model Training and Experiment Tracking with MLflow

In [None]:
try:
    import mlflow
    import mlflow.pytorch
except:
    !pip install mlflow
    import mlflow
    import mlflow.pytorch

In [None]:
import time
import mlflow
import mlflow.pytorch
from torchmetrics import Accuracy
import torch
from tqdm import tqdm

def setup_device_and_train_with_mlflow(
    model: nn.Module,train_dataloader: torch.utils.data.DataLoader,
    test_dataloader: torch.utils.data.DataLoader,optimizer: torch.optim.Optimizer,
    loss_fn: nn.Module,metric_fn,
    epochs: int,experiment_name: str = "Skin Lesion Segmentation",):
    """
    Train the model with `train()` inside an MLflow run and log the outcome.

    Logged to MLflow: the model/optimizer/loss class names and epoch count as
    params, the per-epoch and final loss/metric values, and the trained model.
    Logging is best-effort: if it fails, the error is printed and the training
    history is still returned.

    Args:
        model (nn.Module): Model to train.
        train_dataloader (torch.utils.data.DataLoader): Training data.
        test_dataloader (torch.utils.data.DataLoader): Validation data.
        optimizer (torch.optim.Optimizer): Optimizer built over `model`'s parameters.
        loss_fn (nn.Module): Loss function.
        metric_fn: Metric callable; moved to the device when it has a `.to` method.
        epochs (int): Number of epochs.
        experiment_name (str): MLflow experiment to log under.

    Returns:
        dict: The history returned by `train()` (keys "train_loss",
        "train_metrics", "val_loss", "val_metrics").
    """
    # Initialize MLflow experiment
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():  # Start an MLflow run
        # Log model hyperparameters (done before any DataParallel wrapping so
        # that model_name is the real model class)
        mlflow.log_param("model_name", model.__class__.__name__)
        mlflow.log_param("epochs", epochs)
        mlflow.log_param("optimizer", optimizer.__class__.__name__)
        mlflow.log_param("loss_fn", loss_fn.__class__.__name__)

        # Device setup
        if torch.cuda.device_count() > 1:
            print(f"Multiple GPUs detected: {torch.cuda.device_count()} GPUs")
            model = nn.DataParallel(model)
            device = torch.device("cuda:0")
        elif torch.cuda.is_available():
            print("Using a single GPU")
            device = torch.device("cuda:0")
        else:
            print("No GPU detected, using CPU")
            device = torch.device("cpu")

        model = model.to(device)
        # Plain functions are valid metrics too; only objects with `.to` need moving.
        if hasattr(metric_fn, "to"):
            metric_fn = metric_fn.to(device)

        # Training loop
        results = train(
            model=model,
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
            optimizer=optimizer,
            loss_fn=loss_fn,
            metric_fn=metric_fn,
            epochs=epochs,
            device=device
        )
        try:
            # Log metrics for each recorded epoch. The history keys are the ones
            # produced by train(): "train_metrics" / "val_metrics" (there is no
            # "train_acc" / "val_acc").
            history = zip(results["train_loss"], results["train_metrics"],
                          results["val_loss"], results["val_metrics"])
            for epoch, (train_loss, train_metric, val_loss, val_metric) in enumerate(history):
                mlflow.log_metric("train_loss", train_loss, step=epoch)
                mlflow.log_metric("train_metric", train_metric, step=epoch)
                mlflow.log_metric("val_loss", val_loss, step=epoch)
                mlflow.log_metric("val_metric", val_metric, step=epoch)

            # Log final metrics
            mlflow.log_metric("final_train_loss", results["train_loss"][-1])
            mlflow.log_metric("final_train_metric", results["train_metrics"][-1])
            mlflow.log_metric("final_val_loss", results["val_loss"][-1])
            mlflow.log_metric("final_val_metric", results["val_metrics"][-1])

            # Log the model
            # NOTE(review): the example assumes 3-channel 256x256 inputs -- confirm
            # this matches the transforms used for the dataloaders.
            example_input = torch.randn(1, 3, 256, 256)
            example_input_np = example_input.cpu().numpy()  # Convert to numpy array

            # Log the underlying model rather than the DataParallel wrapper
            model_to_log = model.module if isinstance(model, nn.DataParallel) else model
            mlflow.pytorch.log_model(model_to_log, "model", input_example=example_input_np)

            print("MLflow run completed. Results are logged.")
        except Exception as e:
            # Best-effort logging: do not lose the training history over a logging error
            print(f"An error occurred: {e}")

    return results


In [None]:
# Hyperparameters and training objects for the MLflow-tracked run (model_2).
# Seed first so that any random initialisation that follows is repeatable.
torch.manual_seed(42)
torch.cuda.manual_seed(42)

EPOCHS = 1
LEARNING_RATE = 0.001

# Loss and metric. Alternative loss kept for reference:
# loss_fn = torch.nn.BCEWithLogitsLoss()
loss_fn = DiceLoss()
metric_fn = CustomDiceMetric(threshold=0.5)

# Model and its optimizer
model_2 = UNET(in_channels=3, out_channels=1).to(device)
optimizer = torch.optim.Adam(model_2.parameters(), lr=LEARNING_RATE)

In [None]:
# Train model_2 inside an MLflow run and keep the per-epoch history.
# `valid_dataloader` is passed as the function's `test_dataloader` argument.
results_2 = setup_device_and_train_with_mlflow(
    model=model_2,
    train_dataloader=train_dataloader,
    test_dataloader=valid_dataloader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    metric_fn=metric_fn,
    epochs=EPOCHS,
    experiment_name="Skin Lesion Segmentation",
)


## Saving Model

In [None]:
# # Save the model to a file
# torch.save(model_0.state_dict(), f"unet_model_{EPOCHS}.pth")


In [None]:
# NOTE(review): this plots results_1 again (already plotted in the loss-curves
# section); results_2 from the MLflow run may have been intended -- confirm.
plot_loss_curves(results_1)

## Compressing our MLflow data for downloading

In [None]:
def compress_folder_to_zip(folder_path: str, destination_folder: str=None, zip_filename: str=None):
    """
    Compresses a folder into a zip file and saves it in the destination folder.

    Args:
        folder_path (str): The path to the folder to compress.
        destination_folder (str): The folder where the zip file will be saved.
            Created if it does not exist. Defaults to the parent directory of
            `folder_path`.
        zip_filename (str): The name of the zip file (without extension).
            Defaults to the name of the folder being compressed.

    Returns:
        str: Path of the zip file, i.e. `<destination_folder>/<zip_filename>.zip`.
    """
    # Imported locally so the function does not depend on another cell's imports.
    import os
    import shutil

    # Strip any trailing separator so basename/dirname see the folder itself.
    normalized = os.path.normpath(folder_path)

    # Each default is resolved on its own, so passing only one of the two works.
    if zip_filename is None:
        zip_filename = os.path.basename(os.path.abspath(normalized))
    if destination_folder is None:
        # dirname is "" for a bare relative name such as "mlruns"
        destination_folder = os.path.dirname(normalized) or os.curdir

    # Ensure the destination folder exists
    os.makedirs(destination_folder, exist_ok=True)

    # make_archive takes the archive path without its extension and appends ".zip"
    archive_base = os.path.join(destination_folder, zip_filename)
    zip_file_path = archive_base + '.zip'

    # Compress the folder into a zip file
    shutil.make_archive(archive_base, 'zip', folder_path)

    print(f"Folder '{folder_path}' has been compressed and saved to '{zip_file_path}'.")
    return zip_file_path

In [None]:
# import wandb
# import torch
# import torch.nn as nn
# from torchmetrics import Accuracy

# def setup_device_and_train_with_wandb(
#     model: nn.Module,
#     train_dataloader: torch.utils.data.DataLoader,
#     valid_dataloader: torch.utils.data.DataLoader,
#     optimizer: torch.optim.Optimizer,
#     loss_fn: nn.Module,
#     acc_fn: Accuracy,
#     epochs: int,
#     experiment_name: str = "Skin Lesion Segmentation",
# ):
#     """
#     Integrates Weights & Biases (W&B) to track the experiment details.
#     """
#     # Initialize W&B experiment
#     wandb.init(project=experiment_name)  # Initialize W&B run

#     # Log hyperparameters
#     wandb.config.model_name = model.__class__.__name__
#     wandb.config.epochs = epochs
#     wandb.config.optimizer = optimizer.__class__.__name__
#     wandb.config.loss_fn = loss_fn.__class__.__name__

#     # Device setup
#     if torch.cuda.device_count() > 1:
#         print(f"Multiple GPUs detected: {torch.cuda.device_count()} GPUs")
#         model = nn.DataParallel(model)
#         device = torch.device("cuda:0")
#     elif torch.cuda.is_available():
#         print("Using a single GPU")
#         device = torch.device("cuda:0")
#     else:
#         print("No GPU detected, using CPU")
#         device = torch.device("cpu")

#     model = model.to(device)

#     # Watch the model and log gradients/parameters
#     wandb.watch(model, log="all")

#     # Training loop
#     results = train(
#         model=model,
#         train_dataloader=train_dataloader,
#         test_dataloader=test_dataloader,
#         optimizer=optimizer,
#         loss_fn=loss_fn,
#         acc_fn=acc_fn,
#         epochs=epochs,
#         device=device,
#     )

#     # Log metrics after training
#     for epoch in range(epochs):
#         wandb.log({
#             "train_loss": results["train_loss"][epoch],
#             "train_acc": results["train_acc"][epoch],
#             "val_loss": results["val_loss"][epoch],
#             "val_acc": results["val_acc"][epoch],
#             "epoch": epoch,
#         })

#     # Log final metrics
#     wandb.log({
#         "final_train_loss": results["train_loss"][-1],
#         "final_train_acc": results["train_acc"][-1],
#         "final_val_loss": results["val_loss"][-1],
#         "final_val_acc": results["val_acc"][-1]
#     })

#     # Save model to W&B
#     wandb.save("model.pt")

#     print("W&B run completed. Results are logged.")

#     return results


In [None]:
# !wandb login

In [None]:
# setup_device_and_train_with_wandb(model=model_0,train_dataloader=train_dataloader,
#                                  valid_dataloader=valid_dataloader,optimizer=optimizer,
#                                  loss_fn=loss_fn,acc_fn=acc_fn,epochs=EPOCHS)