In [1]:
import sys
import os

# Add the project root directory to sys.path so project-local modules can be
# imported from this notebook (presumably where `utils` lives - confirm).
PROJECT_ROOT = os.path.abspath('C:/Users/Acer nitro/Desktop/retinal-vessel-segmentation')
# Re-running this cell in the same kernel must not stack duplicate entries.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [2]:
import torch
import pandas
from torch.utils.data import Dataset,DataLoader
from torchvision import transforms
import os
from PIL import Image



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.4 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "c:\Users\Acer nitro\anaconda3\envs\my_new_env\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "c:\Users\Acer nitro\anaconda3\envs\my_new_env\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\Acer nitro\anaconda3\envs\my_new_env\lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "c:\Users\Acer nitro\anaconda3\envs\my_new_env\lib\site-packages\traitlets\config\application.py", line 10

### Custom Dataset

In [3]:
import numpy as np

In [4]:
class DRIVE_dataset(Dataset):
    """Paired fundus-image / vessel-mask dataset for a DRIVE-style folder layout.

    Every ``.tif`` file in ``img_dir`` is one sample. The mask for an image
    named ``<id>_<anything>.tif`` is read from ``<id>_manual1.gif`` inside
    ``mask_dir``.

    Args:
        img_dir: Directory containing the ``.tif`` images.
        mask_dir: Directory containing the ``<id>_manual1.gif`` masks.
        transforms: Optional callable invoked with keyword arguments as
            ``transforms(image=<HxWx3 uint8 ndarray>, mask=<HxW uint8 ndarray>)``.
            It must return a mapping with ``'image'`` and ``'mask'`` entries.
            A single-argument transform (one that only accepts the picture
            positionally) does not satisfy this calling convention.
    """

    def __init__(self, img_dir, mask_dir, transforms=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transforms = transforms
        # Only .tif files are samples. os.listdir() yields entries in arbitrary
        # order, so sort to make "index -> file" stable across runs and machines.
        self.images = sorted(
            name for name in os.listdir(img_dir) if name.endswith('.tif')
        )

    def __len__(self):
        """Return the number of ``.tif`` images found in ``img_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Load the image/mask pair at ``index``.

        Returns:
            ``(image, mask)``. With ``transforms`` set, these are the
            ``'image'`` and ``'mask'`` entries of the transform's result.
            Without it, a PIL RGB image and a PIL ``L``-mode mask whose pixel
            values are 0 (background) or 1 (foreground).

        Raises:
            Exception: whatever the image loading raised, re-raised unchanged
                after the failing file name has been printed.
        """
        image_name = self.images[index]
        image_path = os.path.join(self.img_dir, image_name)

        image_base = image_name.split('_')[0]  # "21" from "21_training.tif"
        mask_name = f"{image_base}_manual1.gif"
        mask_path = os.path.join(self.mask_dir, mask_name)

        try:
            # convert() returns a fully loaded copy, so the underlying files
            # can be closed as soon as each `with` block exits.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
            with Image.open(mask_path) as raw_mask:
                mask = raw_mask.convert("L")  # Grayscale
        except Exception as e:
            print(f"Error loading {image_name}: {e}")
            raise  # bare raise keeps the original traceback intact

        # Binarize the mask: any non-zero pixel becomes 1.
        mask_array = (np.array(mask) > 0).astype(np.uint8)

        if self.transforms:
            augmented = self.transforms(image=np.array(image), mask=mask_array)
            return augmented['image'], augmented['mask']

        return image, Image.fromarray(mask_array)


In [5]:
from torchvision.transforms import ToTensor

# Root folder of the DRIVE dataset on this machine; the single place to edit
# when the data lives somewhere else.
DRIVE_ROOT = "C:/Users/Acer nitro/Desktop/retinal-vessel-segmentation/data/DRIVE"

_image_to_tensor = ToTensor()


def to_tensor_pair(image, mask):
    """Turn one (image, mask) pair of NumPy arrays into tensors.

    DRIVE_dataset calls its transform as ``transforms(image=..., mask=...)``
    and reads the ``'image'`` / ``'mask'`` entries of the result. A bare
    ``ToTensor()`` only accepts one positional picture, so passing it directly
    raises ``TypeError`` on the first sample; this adapter provides the
    keyword-based interface the dataset expects.

    Args:
        image: H x W x 3 uint8 array.
        mask: H x W uint8 array holding 0/1 labels.

    Returns:
        dict with ``'image'`` as a float tensor of shape (3, H, W) scaled to
        [0, 1] and ``'mask'`` as a float tensor of shape (1, H, W).
    """
    return {
        "image": _image_to_tensor(image),
        # The mask must NOT go through ToTensor (it would divide the 0/1
        # labels by 255). Float dtype plus a channel axis matches the
        # (N, 1, H, W) logits that BCEWithLogitsLoss compares against.
        "mask": torch.as_tensor(mask).float().unsqueeze(0),
    }


train_data = DRIVE_dataset(
    img_dir=f"{DRIVE_ROOT}/training/images",
    mask_dir=f"{DRIVE_ROOT}/training/1st_manual",
    transforms=to_tensor_pair,
)

test_data = DRIVE_dataset(
    img_dir=f"{DRIVE_ROOT}/test/images",
    mask_dir=f"{DRIVE_ROOT}/test/1st_manual",
    transforms=to_tensor_pair,
)

In [6]:
import os
# Number of samples per mini-batch; passed as batch_size to both DataLoaders.
BATCH_SIZE = 4
# Passed as num_workers to both DataLoaders; 0 means the data is loaded in the
# main process instead of in worker subprocesses.
NO_OF_WORKERS = 0

In [7]:
# Training loader: sample order is reshuffled on every pass over the data.
# pin_memory is left at its default (disabled).
train_dataloader = DataLoader(
    dataset=train_data,
    shuffle=True,
    batch_size=BATCH_SIZE,
    num_workers=NO_OF_WORKERS,
)

# Evaluation loader: fixed sample order. pin_memory is left at its default.
test_dataloader = DataLoader(
    dataset=test_data,
    shuffle=False,
    batch_size=BATCH_SIZE,
    num_workers=NO_OF_WORKERS,
)

In [8]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [9]:
import torch
from torch import nn

class DoubleConv(nn.Module):
    """Two back-to-back (3x3 conv -> batch norm -> ReLU) stages.

    Both convolutions use ``padding=1`` with a 3x3 kernel, so the spatial size
    of the input is preserved; only the channel count changes
    (``in_channels -> out_channels -> out_channels``).
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # First stage maps in_channels -> out_channels, second keeps out_channels.
        for stage_in in (in_channels, out_channels):
            stages.extend([
                nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ])
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both conv stages to ``x`` and return the result."""
        return self.double_conv(x)


class Unet(nn.Module):
    """U-Net with four encoder stages, a bottleneck and four decoder stages.

    Each encoder stage is a DoubleConv; stages are separated by 2x2 max
    pooling. Each decoder stage upsamples with a stride-2 transposed
    convolution, concatenates the matching encoder output (skip connection)
    along the channel axis and applies a DoubleConv.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of channels of the output map.

    The output of ``forward`` has the same height/width as the input and is
    NOT passed through any activation (the last layer is a plain 1x1 conv).
    The input is max-pooled four times, so height and width must each be at
    least 16.
    """

    def __init__(self, in_channels = 3, out_channels = 1):
        super(Unet, self).__init__()
        self.enc1 = DoubleConv(in_channels = in_channels, out_channels = 64)
        self.enc2 = DoubleConv(in_channels = 64, out_channels = 128)
        self.enc3 = DoubleConv(in_channels = 128, out_channels = 256)
        self.enc4 = DoubleConv(in_channels = 256, out_channels = 512)

        self.bottle_neck = DoubleConv(in_channels = 512, out_channels = 1024)

        self.pool = nn.MaxPool2d(kernel_size=2)

        self.up1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.dec1 = DoubleConv(1024, 512)  # concat with skip connection

        self.up2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec2 = DoubleConv(512, 256)  # concat with skip connection

        self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec3 = DoubleConv(256, 128)  # concat with skip connection

        self.up4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec4 = DoubleConv(128, 64)  # concat with skip connection

        # Final output layer
        self.out_conv = nn.Conv2d(64, out_channels, kernel_size=1)  # For binary segmentation (out_channels = 1)

    @staticmethod
    def _match_size(x, skip):
        """Zero-pad ``x`` so its height/width equal those of ``skip``.

        Max pooling floors odd sizes (e.g. 73 -> 36) while the transposed
        convolution exactly doubles (36 -> 72), so the upsampled map can be one
        pixel short of the encoder map it is concatenated with.
        """
        diff_h = skip.shape[-2] - x.shape[-2]
        diff_w = skip.shape[-1] - x.shape[-1]
        if diff_h == 0 and diff_w == 0:
            return x
        # F.pad order for the last two dims: (left, right, top, bottom).
        return nn.functional.pad(
            x,
            [diff_w // 2, diff_w - diff_w // 2, diff_h // 2, diff_h - diff_h // 2],
        )

    def forward(self, x):
        """Run the encoder/decoder and return the un-activated output map."""
        # Encoder
        x1 = self.enc1(x)
        x2 = self.enc2(self.pool(x1))
        x3 = self.enc3(self.pool(x2))
        x4 = self.enc4(self.pool(x3))

        # Bottleneck (attribute is named `bottle_neck` in __init__)
        x = self.bottle_neck(self.pool(x4))

        # Decoder
        x = self._match_size(self.up1(x), x4)
        x = torch.cat([x, x4], dim=1)
        x = self.dec1(x)

        x = self._match_size(self.up2(x), x3)
        x = torch.cat([x, x3], dim=1)
        x = self.dec2(x)

        x = self._match_size(self.up3(x), x2)
        x = torch.cat([x, x2], dim=1)
        x = self.dec3(x)

        x = self._match_size(self.up4(x), x1)
        x = torch.cat([x, x1], dim=1)
        x = self.dec4(x)

        # Output
        return self.out_conv(x)

In [10]:
# Place the model on the notebook-wide `device` rather than re-deriving it, so
# the model and the batches moved with `.to(device)` always agree.
model = Unet(in_channels=3, out_channels=1).to(device)
# The loss works on raw model outputs (logits); it applies the sigmoid itself.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001, weight_decay=1e-5)

In [11]:
from utils import dice_score

In [12]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               criterion: torch.nn.Module,
               optimizer: torch.optim.Optimizer
        ):
    """Train ``model`` for one pass over ``dataloader``.

    Batches are moved to the notebook-level ``device`` before the forward pass.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        criterion: Loss called as ``criterion(model(X), y)``.
        optimizer: Optimizer stepped once per batch.

    Returns:
        ``(train_loss, train_dice)``: per-batch loss and dice score averaged
        over the number of batches, both as Python floats.
    """
    model.train()

    train_loss, train_dice = 0.0, 0.0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        #1 forward pass
        y_pred = model(X)
        #2 calculate loss
        loss = criterion(y_pred, y)
        train_loss += loss.item()
        #3 gradient zero
        optimizer.zero_grad()
        #4 backprop
        loss.backward()
        #5 step
        optimizer.step()

        # Metric only: detach so the running total does not keep every batch's
        # autograd graph alive. float() assumes dice_score returns a scalar
        # (train() formats the value with ':.4f') - TODO confirm against utils.
        train_dice += float(dice_score(y_pred.detach(), y))

    num_batches = len(dataloader)
    train_loss /= num_batches
    train_dice /= num_batches

    # train() unpacks this pair; without the return it would receive None.
    return train_loss, train_dice

In [13]:
def test_step(model : torch.nn.Module,
              dataloader :  torch.utils.data,
              criterion : torch.nn):
    model.eval()

    test_loss,test_dice = 0, 0

    for batch,(X,y) in enumerate(dataloader):
        X,y = X.to(device),y.to(device)

        y_pred = model(X)

        loss = criterion(y_pred,y)
        test_loss += loss.item()
        test_dice += dice_score(y_pred,y)

    test_loss /= len(dataloader)
    test_dice /= len(dataloader)

    return test_loss,test_dice




In [1]:
from tqdm import tqdm

In [None]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          criterion: torch.nn.Module = nn.BCEWithLogitsLoss(),
          epochs: int = 5):
    """Alternate one training pass and one evaluation pass for ``epochs`` epochs.

    Args:
        model: Network handed to ``train_step`` and ``test_step``.
        train_dataloader: Batches used by ``train_step``.
        test_dataloader: Batches used by ``test_step``.
        optimizer: Optimizer handed to ``train_step``.
        criterion: Loss handed to both steps. The default instance is created
            once, when this function is defined.
        epochs: Number of train/evaluate rounds.

    Returns:
        dict with keys ``"train_loss"``, ``"train_dice"``, ``"test_loss"`` and
        ``"test_dice"``, each mapping to a list holding one value per epoch.
    """
    metric_names = ("train_loss", "train_dice", "test_loss", "test_dice")
    history = {name: [] for name in metric_names}

    for epoch_idx in tqdm(range(epochs)):
        # One optimisation pass followed by one evaluation pass.
        train_loss, train_dice = train_step(
            model=model,
            dataloader=train_dataloader,
            criterion=criterion,
            optimizer=optimizer,
        )
        test_loss, test_dice = test_step(
            model=model,
            dataloader=test_dataloader,
            criterion=criterion,
        )

        # Progress line for this epoch.
        print(
            f"Epoch: {epoch_idx+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_dice: {train_dice:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_dice: {test_dice:.4f}"
        )

        # Record this epoch's numbers under their metric names.
        epoch_values = (train_loss, train_dice, test_loss, test_dice)
        for name, value in zip(metric_names, epoch_values):
            history[name].append(value)

    return history


: 

In [None]:
import torch
import numpy as np
import random
from timeit import default_timer as timer


# Seed every random number generator used here (PyTorch CPU and CUDA, Python's
# `random`, NumPy) with the same value so runs are repeatable.
SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

# Train on the GPU when one is visible, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# How many train/evaluate rounds to run.
NUM_EPOCHS = 5

# Fresh Unet with its default channel settings, moved to the chosen device.
model_0 = Unet().to(device)

# Loss and optimizer for this run.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model_0.parameters(), lr=0.001)

# Time the whole training run.
start_time = timer()

# Positional order follows train(model, train_dataloader, test_dataloader,
# optimizer, criterion, epochs).
model_0_results = train(
    model_0,
    train_dataloader,
    test_dataloader,
    optimizer,
    criterion,
    NUM_EPOCHS,
)

end_time = timer()
print(f"Total training time: {end_time - start_time:.3f} seconds")
