
## **1. Problem Definition**

* **Identify the Task**: Define the vision task—e.g., image classification, object detection, or segmentation.
* **Determine the Goal**: Establish what success looks like—e.g., target accuracy, latency, or generalization.
* **Specify Constraints**: Consider hardware limitations, deployment environment, and dataset availability.
* **Use Case Examples**: Medical imaging, autonomous vehicles, retail analytics, etc.

---

### **2. Data Preparation**

* Dataset loading, preprocessing, augmentation, and train/validation/test splitting.

---

### **3. Choose or Define Model**

* Select pretrained architectures or design a custom model for your task.

---

### **4. Define Loss Function and Optimizer**

* Match loss functions and optimizers to the problem type (classification, detection, etc.).

---

### **5. Train the Model**

* Setup of training loops, optimization steps, and progress monitoring.

---

### **6. Evaluate the Model**

* Test set performance metrics and visualizations.

---

### **7. Save the Model**

* Model serialization, export to deployment-friendly formats, and integration into applications.






## **1. Problem Definition: Retina Blood Vessel Segmentation**

The goal is to develop a deep learning model that segments blood vessels from retinal fundus images using the [Retina Blood Vessel dataset](https://www.kaggle.com/datasets/abdallahwagih/retina-blood-vessel/data). Accurate segmentation of retinal vessels is a critical step in diagnosing and monitoring eye diseases such as diabetic retinopathy, glaucoma, and hypertensive retinopathy.

The dataset provides color fundus images along with corresponding ground truth masks highlighting the vascular structure. This is a pixel-wise binary classification task, where the model must distinguish vessel pixels from the background.

### Key Challenges:

* **Class Imbalance**: Blood vessels cover a small fraction of each image, so a model can reach high pixel accuracy while predicting mostly background.
* **Fine Structural Detail**: Vessels are thin, branching, and vary in intensity, requiring high-resolution feature extraction and spatial precision.
* **Image Variability**: Differences in illumination, contrast, and noise between samples make generalization harder.
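
To make the class-imbalance point concrete, the share of vessel pixels in a mask can be measured directly. The sketch below uses a synthetic 512×512 mask as a stand-in (an assumption; real masks would be read from the dataset's `mask` folders), and `vessel_fraction` is a hypothetical helper name, not part of the notebook.

```python
import numpy as np

def vessel_fraction(mask: np.ndarray, threshold: int = 127) -> float:
    """Return the fraction of pixels labelled as vessel in a grayscale mask."""
    return float((mask > threshold).mean())

# Synthetic stand-in for a real mask: four thin horizontal "vessels", 3 px wide each.
mask = np.zeros((512, 512), dtype=np.uint8)
for row in (100, 200, 300, 400):
    mask[row:row + 3, :] = 255

frac = vessel_fraction(mask)
print(f"vessel pixels: {frac:.2%}")
# A model that predicts background everywhere would still reach this pixel accuracy:
print(f"all-background accuracy: {1 - frac:.2%}")
```

This is why plain accuracy is a weak signal for this task and overlap-based metrics are preferred.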

### Success Criteria:

* High segmentation quality measured by **Dice coefficient**, **IoU**, **Precision**, and **Recall**.
* Robust generalization to unseen data, especially across varying image qualities.
* Efficient inference for potential integration in screening tools or clinical workflows.
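
As a reference for the overlap metrics named above, here is a minimal NumPy sketch of Dice and IoU on two small binary masks. The function name `dice_and_iou` and the `eps` smoothing term are illustrative choices, not taken from the notebook. For binary masks, Dice is the same quantity as the F1 score, and IoU is the Jaccard index.

```python
import numpy as np

def dice_and_iou(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-7):
    """Compute Dice and IoU for two binary masks of the same shape."""
    y_true = y_true.astype(bool)
    y_pred = y_pred.astype(bool)
    intersection = np.logical_and(y_true, y_pred).sum()
    total = y_true.sum() + y_pred.sum()
    union = total - intersection
    dice = (2.0 * intersection + eps) / (total + eps)
    iou = (intersection + eps) / (union + eps)
    return float(dice), float(iou)

truth = np.array([[1, 1, 0, 0],
                  [0, 1, 0, 0]])
pred = np.array([[1, 0, 0, 0],
                 [0, 1, 1, 0]])

dice, iou = dice_and_iou(truth, pred)
print(f"Dice={dice:.3f}, IoU={iou:.3f}")
```

Here two of the three true vessel pixels are recovered and one false positive is added, so the intersection is 2, the union is 4, and the two masks sum to 6 pixels.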



#### Tools


In [None]:
!pip install segmentation-models-pytorch --quiet


In [None]:
# Standard Library
import os
import time
import random
from glob import glob
from operator import add
from pathlib import Path


# Third-Party Libraries
import cv2
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    jaccard_score,
    precision_score,
    recall_score
)

# PyTorch and Related Modules
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp




### **2. Data Preparation**


In [None]:
class ImageMaskDataset(Dataset):
    def __init__(self, image_paths, mask_paths):
        self.image_paths = [Path(p) for p in image_paths]
        self.mask_paths  = [Path(p) for p in mask_paths]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = self._load_image(self.image_paths[idx])
        msk = self._load_mask(self.mask_paths[idx])
        return img, msk

    def _load_image(self, path: Path):
        arr = cv2.imread(str(path), cv2.IMREAD_COLOR)
        arr = arr.astype(np.float32) / 255.0
        arr = np.transpose(arr, (2, 0, 1))
        return torch.from_numpy(arr)

    def _load_mask(self, path: Path):
        arr = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
        arr = arr.astype(np.float32) / 255.0
        arr = np.expand_dims(arr, 0)
        return torch.from_numpy(arr)
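
One detail worth knowing about the loader above: `cv2.imread` returns channels in BGR order, whereas ImageNet-pretrained encoders are typically trained on RGB inputs. The notebook feeds BGR directly. The sketch below shows, in isolation, what an RGB conversion could look like. `to_chw_rgb` is a hypothetical helper and is not part of the original pipeline; adopting it would also mean adjusting the later visualization and `cv2.imwrite` code, which assume BGR.

```python
import numpy as np

def to_chw_rgb(bgr: np.ndarray) -> np.ndarray:
    """Convert an HxWx3 uint8 BGR image to a 3xHxW float32 RGB array in [0, 1]."""
    rgb = bgr[..., ::-1]                    # reverse the channel axis: BGR -> RGB
    chw = np.transpose(rgb, (2, 0, 1))      # HWC -> CHW, the layout PyTorch expects
    # ascontiguousarray removes the negative stride left by the channel reversal
    return np.ascontiguousarray(chw, dtype=np.float32) / 255.0

# Synthetic 2x2 image that is pure blue in BGR order (B=255, G=0, R=0).
bgr = np.zeros((2, 2, 3), dtype=np.uint8)
bgr[..., 0] = 255

out = to_chw_rgb(bgr)
print(out.shape)   # (3, 2, 2)
print(out[2])      # blue now sits in the last channel
```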


In [None]:
# ——— Configuration ———
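config = {
    "seed": 42,                  # random seed (defined here; not applied anywhere in the code shown)
    "data_root": Path("/kaggle/input/retina-blood-vessel/Data"),
    "img_size": (512, 512),      # expected (height, width); used when saving comparison images
    "batch_size": 2,
    "lr": 1e-4,                  # initial learning rate for Adam
    # Despite the key name, this is the full path to the checkpoint file, not a directory.
    "checkpoint_dir": Path("files") / "checkpoint.pth",
    "epochs": 50,
}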
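
The configuration defines a `seed`, but none of the cells shown applies it. A minimal sketch of how it could be used is below. `set_seed` is a hypothetical helper, and it only seeds the random number generators; fully deterministic GPU runs may need further settings that are outside the scope of this notebook.

```python
import random

import numpy as np

def set_seed(seed: int) -> None:
    """Seed Python, NumPy and (if installed) PyTorch random number generators."""
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch
    except ImportError:
        return  # PyTorch not available; the other generators are still seeded
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# Seeding twice with the same value reproduces the same random draws.
set_seed(42)
first = (random.random(), np.random.rand())
set_seed(42)
second = (random.random(), np.random.rand())
print(first == second)
```

In the notebook this would be called once as `set_seed(config["seed"])` before the data loaders and the model are created.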

In [None]:
def create_dir(path):
    """
    Ensure that a directory exists (creates it if necessary).
    Accepts either a string or Path.
    """
    Path(path).mkdir(parents=True, exist_ok=True)

In [None]:
# ——— Setup ———
create_dir(config["checkpoint_dir"].parent)
checkpoint_path = config["checkpoint_dir"]  # reuse the configured path instead of repeating the string

In [None]:
# ——— Helpers ———
def get_paths(root: Path, split: str, kind: str):
    return sorted((root / split / kind).glob("*"))

In [None]:
# ——— Data Paths ———
train_x = get_paths(config["data_root"], "train", "image")
train_y = get_paths(config["data_root"], "train", "mask")
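# This notebook uses the dataset's "test" folder as its validation set. Because the best
# checkpoint is selected on this split, the evaluation in section 6 is not on held-out data.
valid_x = get_paths(config["data_root"], "test",  "image")
valid_y = get_paths(config["data_root"], "test",  "mask")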
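
Because images and masks are paired purely by sorted order, a quick sanity check can catch a missing or misnamed file before training. The sketch below assumes that each image and its mask share the same file stem, which is an assumption about the dataset layout rather than something the notebook states. `check_pairs` and the file names are hypothetical.

```python
from pathlib import Path

def check_pairs(image_paths, mask_paths):
    """Raise ValueError if images and masks do not line up one-to-one by file stem."""
    if len(image_paths) != len(mask_paths):
        raise ValueError(f"{len(image_paths)} images but {len(mask_paths)} masks")
    for img, msk in zip(image_paths, mask_paths):
        if Path(img).stem != Path(msk).stem:
            raise ValueError(f"mismatched pair: {img} vs {msk}")

# Hypothetical file names, used only to exercise the check.
images = [Path("image/0.png"), Path("image/1.png")]
masks = [Path("mask/0.png"), Path("mask/1.png")]
check_pairs(images, masks)  # passes silently

try:
    check_pairs(images, masks[::-1])  # deliberately misordered masks
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
print(mismatch_detected)
```

In the notebook this could be run as `check_pairs(train_x, train_y)` and `check_pairs(valid_x, valid_y)`.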

In [None]:
print(
    f"Dataset Size:\n"
    f"  Train: {len(train_x)} samples\n"
    f"  Valid: {len(valid_x)} samples\n"
)

In [None]:
# ——— Datasets & Loaders ———
train_dataset = ImageMaskDataset(train_x, train_y)
valid_dataset = ImageMaskDataset(valid_x, valid_y)

train_loader = DataLoader(
    train_dataset,
    batch_size=config["batch_size"],
    shuffle=True,
    num_workers=2,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=2,
)
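
The outline lists augmentation under data preparation, but the loaders above use the raw images. A minimal sketch of one common option, random flips applied identically to the image and its mask, is shown below. `random_flip` is a hypothetical helper operating on NumPy arrays in the same CHW layout the dataset class produces; it is not part of the original notebook.

```python
import numpy as np

def random_flip(image: np.ndarray, mask: np.ndarray, rng: np.random.Generator):
    """Apply the same random horizontal/vertical flip to a CHW image and its mask."""
    if rng.random() < 0.5:
        image, mask = image[..., ::-1], mask[..., ::-1]          # flip the width axis
    if rng.random() < 0.5:
        image, mask = image[..., ::-1, :], mask[..., ::-1, :]    # flip the height axis
    # Copy so the result is contiguous (torch.from_numpy rejects negative strides).
    return image.copy(), mask.copy()

rng = np.random.default_rng(0)
image = np.arange(3 * 4 * 4, dtype=np.float32).reshape(3, 4, 4)
mask = (image[:1] > 7).astype(np.float32)   # toy mask derived from channel 0, shape (1, 4, 4)

aug_image, aug_mask = random_flip(image, mask, rng)
# The mask must still agree with the image after augmentation.
print(np.array_equal(aug_mask, (aug_image[:1] > 7).astype(np.float32)))
```

The key design point is that geometric transforms must be applied to both tensors together; otherwise the mask no longer lines up with the vessels.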


In [None]:
train_dataset[0][0].shape, valid_dataset[0][0].shape

### **3. Choose or Define Model**


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
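model = smp.Unet(
    encoder_name="resnet34",      # ResNet-34 backbone as the encoder
    encoder_weights="imagenet",   # start from ImageNet-pretrained weights
    in_channels=3,                # 3-channel color fundus images
    classes=1,                    # single output channel: vessel vs. background
    activation=None               # return raw logits; the losses and inference code apply sigmoid
).to(device)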
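
U-Net style models with five downsampling stages generally need input height and width divisible by 32. The configured 512×512 size satisfies this, but images of other sizes would need padding or resizing first. The sketch below pads on the bottom and right and remembers the original size so predictions can be cropped back. `pad_to_multiple` and the example image size are hypothetical.

```python
import numpy as np

def pad_to_multiple(chw: np.ndarray, multiple: int = 32):
    """Zero-pad a CxHxW array on the bottom/right so H and W are multiples of `multiple`."""
    _, h, w = chw.shape
    pad_h = (-h) % multiple
    pad_w = (-w) % multiple
    padded = np.pad(chw, ((0, 0), (0, pad_h), (0, pad_w)), mode="constant")
    return padded, (h, w)

image = np.ones((3, 584, 565), dtype=np.float32)   # hypothetical non-square image size
padded, (h, w) = pad_to_multiple(image)
print(padded.shape)              # (3, 608, 576)

restored = padded[:, :h, :w]     # crop a prediction back to the original size the same way
print(restored.shape)            # (3, 584, 565)
```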

### **4. Define Loss Function and Optimizer**


In [None]:
bce_loss  = nn.BCEWithLogitsLoss()
dice_loss = smp.losses.DiceLoss(mode="binary")


def loss_fn(preds, targets):
    return bce_loss(preds, targets) + dice_loss(preds, targets)
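
To see why the two terms are combined, the sketch below re-implements both in NumPy and evaluates them on a toy case with 1 vessel pixel in 100. It is an illustration of the formulas only: `smp.losses.DiceLoss` has its own smoothing and reduction details that this sketch does not try to reproduce, and the function names are hypothetical.

```python
import numpy as np

def bce_with_logits(logits: np.ndarray, targets: np.ndarray) -> float:
    """Numerically stable mean binary cross-entropy computed from raw logits."""
    loss = np.maximum(logits, 0) - logits * targets + np.log1p(np.exp(-np.abs(logits)))
    return float(loss.mean())

def soft_dice_loss(logits: np.ndarray, targets: np.ndarray, eps: float = 1e-7) -> float:
    """1 - soft Dice, using sigmoid probabilities instead of hard labels."""
    probs = 1.0 / (1.0 + np.exp(-logits))
    intersection = (probs * targets).sum()
    return float(1.0 - (2.0 * intersection + eps) / (probs.sum() + targets.sum() + eps))

# Toy example: 1 vessel pixel among 100, and a model that predicts background everywhere.
targets = np.zeros(100)
targets[0] = 1.0
all_background = np.full(100, -6.0)

bce = bce_with_logits(all_background, targets)
dice = soft_dice_loss(all_background, targets)
print(f"BCE:  {bce:.4f}")
print(f"Dice: {dice:.4f}")
```

The averaged BCE stays small because 99 of the 100 pixels are classified correctly, while the Dice term is close to its maximum because no vessel pixel was recovered. Summing the two keeps per-pixel gradients from BCE while penalizing missed vessels regardless of how much background there is.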

In [None]:
optimizer = optim.Adam(model.parameters(), lr=config['lr'])
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", patience=3, factor=0.5
)
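
The effect of `patience=3` and `factor=0.5` can be traced by hand. The function below is a simplified pure-Python imitation of the reduce-on-plateau rule in `min` mode, assuming PyTorch's default relative threshold of `1e-4`; it ignores cooldown and minimum learning rate, and the validation losses are made up for illustration.

```python
def simulate_plateau(losses, lr=1e-4, patience=3, factor=0.5, threshold=1e-4):
    """Simplified imitation of 'reduce on plateau' in min mode, for illustration only."""
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in losses:
        if loss < best * (1 - threshold):   # a relative improvement is required
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:           # patience exhausted: shrink the learning rate
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history

val_losses = [0.90, 0.80, 0.81, 0.82, 0.80, 0.83, 0.79]   # made-up values
lrs = simulate_plateau(val_losses)
for epoch, (loss, lr) in enumerate(zip(val_losses, lrs), start=1):
    print(f"epoch {epoch}: val_loss={loss:.2f} lr={lr:.1e}")
```

In this trace the best loss is reached at epoch 2, epochs 3 to 6 bring no improvement, and the learning rate is halved at epoch 6, the fourth consecutive epoch without improvement.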

### **5. Train the Model**


In [None]:
def train_one_epoch(loader):
    """Run one training pass over `loader` and return the mean batch loss."""
    model.train()
    running_loss = 0.0
    for images, masks in tqdm(loader, desc="Train"):
        images, masks = images.to(device), masks.to(device)
        preds = model(images)              # raw logits, shape [B, 1, H, W]
        loss  = loss_fn(preds, masks)
        optimizer.zero_grad()              # clear gradients left from the previous step
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(loader)



def validate(loader):
    """Return the mean batch loss over `loader` without updating any weights."""
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, masks in tqdm(loader, desc="Validate"):
            images, masks = images.to(device), masks.to(device)
            preds = model(images)
            val_loss += loss_fn(preds, masks).item()
    return val_loss / len(loader)

In [None]:
best_val = float("inf")
for epoch in range(1, config['epochs'] + 1):  # inclusive upper bound so all configured epochs run
    train_loss = train_one_epoch(train_loader)
    val_loss   = validate(valid_loader)
    scheduler.step(val_loss)

    print(f"Epoch {epoch:02d} — train: {train_loss:.4f}, val: {val_loss:.4f}")
    if val_loss < best_val:
        best_val = val_loss
        torch.save(model.state_dict(), checkpoint_path)
        print("  → checkpoint saved")

# Inference Example
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
model.eval()
with torch.no_grad():
    img, _ = valid_dataset[0]
    pred = model(img.unsqueeze(0).to(device))
    mask = torch.sigmoid(pred).cpu().squeeze().numpy() > 0.5


In [None]:
valid_dataset[0]

### **6. Evaluate the Model**


In [None]:
# Helpers -------------------------------------------------------------------

def ensure_dir_exists(path: str):
    os.makedirs(path, exist_ok=True)

def tensor_to_numpy_image(tensor: torch.Tensor) -> np.ndarray:
    arr = tensor.cpu().numpy().transpose(1, 2, 0)
    return (arr * 255).astype(np.uint8)

def tensor_to_binary_mask(tensor: torch.Tensor, threshold: float = 0.5) -> np.ndarray:
    arr = tensor.cpu().numpy().squeeze()
    return (arr > threshold).astype(np.uint8)

def expand_mask_to_rgb(mask: np.ndarray) -> np.ndarray:
    return np.stack([mask]*3, axis=-1)


In [None]:
# Metrics -------------------------------------------------------------------

# The sklearn metric functions used below are already imported in the Tools cell at the top.

def compute_metrics_for_sample(y_true: torch.Tensor, y_pred: torch.Tensor):
    """
    Returns [jaccard, f1, recall, precision, accuracy] for a single sample.
    """
    y_t = (y_true.cpu().numpy().ravel() > 0.5).astype(np.uint8)
    y_p = (y_pred.cpu().numpy().ravel() > 0.5).astype(np.uint8)

    return [
        jaccard_score(y_t, y_p),
        f1_score(y_t, y_p),
        recall_score(y_t, y_p),
        precision_score(y_t, y_p),
        accuracy_score(y_t, y_p),
    ]
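
Note that `compute_metrics_for_sample` scores each image separately, and the evaluation later averages those scores, so every image counts equally. Pooling all pixels before computing the metric can give a different number. The toy sketch below illustrates the gap with two made-up flattened masks; `dice` is a hypothetical helper written for this example.

```python
import numpy as np

def dice(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Dice score for two binary arrays (assumes at least one positive pixel overall)."""
    inter = np.logical_and(y_true, y_pred).sum()
    return float(2.0 * inter / (y_true.sum() + y_pred.sum()))

# Two toy "images" flattened to 1-D: one with many vessel pixels, one with few.
truths = [np.array([1, 1, 1, 1, 1, 1, 0, 0]), np.array([1, 0, 0, 0, 0, 0, 0, 0])]
preds = [np.array([1, 1, 1, 1, 1, 1, 0, 0]), np.array([0, 1, 0, 0, 0, 0, 0, 0])]

per_image = float(np.mean([dice(t, p) for t, p in zip(truths, preds)]))
pooled = dice(np.concatenate(truths), np.concatenate(preds))
print(f"mean of per-image Dice:  {per_image:.3f}")
print(f"Dice over pooled pixels: {pooled:.3f}")
```

The first image is segmented perfectly and the second is missed entirely, so the per-image mean is 0.5, while the pooled score is dominated by the image with more vessel pixels. Either convention is defensible; what matters is stating which one a reported number uses.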

In [None]:
# I/O -----------------------------------------------------------------------

def save_comparison_image(
    orig_img: np.ndarray,
    gt_mask: np.ndarray,
    pred_mask: np.ndarray,
    save_dir: str,
    filename: str,
    img_size: tuple
):
    # Take the height from the image itself so the separator always matches it.
    height = orig_img.shape[0]
    separator = np.full((height, 10, 3), 128, dtype=np.uint8)

    gt_rgb   = expand_mask_to_rgb(gt_mask)
    pred_rgb = expand_mask_to_rgb(pred_mask)

    composite = np.concatenate(
        [orig_img, separator, gt_rgb, separator, pred_rgb],
        axis=1
    )
    cv2.imwrite(os.path.join(save_dir, filename), composite)

In [None]:
# Main Evaluation -----------------------------------------------------------

def evaluate_model(
    model: torch.nn.Module,
    data_loader: torch.utils.data.DataLoader,
    device: torch.device,
    results_dir: str,
    img_size: tuple
):
    ensure_dir_exists(results_dir)
    model.to(device).eval()

    total_metrics = np.zeros(5, dtype=float)
    n_samples = len(data_loader.dataset)
    sample_idx = 0

    with torch.no_grad():
        for imgs, masks in tqdm(data_loader, desc="Evaluating", total=len(data_loader)):
            imgs  = imgs.to(device)
            masks = masks.to(device)

            preds = torch.sigmoid(model(imgs))

            for img_t, mask_t, pred_t in zip(imgs, masks, preds):
                metrics = compute_metrics_for_sample(mask_t, pred_t)
                total_metrics += np.array(metrics)

                orig     = tensor_to_numpy_image(img_t)
                gt       = tensor_to_binary_mask(mask_t) * 255
                pr       = tensor_to_binary_mask(pred_t) * 255
                filename = f"sample_{sample_idx:04d}.png"
                save_comparison_image(orig, gt, pr, results_dir, filename, img_size)
                sample_idx += 1

    avg_metrics = total_metrics / n_samples
    jaccard, f1, recall, precision, accuracy = avg_metrics
    
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Jaccard:   {jaccard:.4f}")

# Usage ---------------------------------------------------------------------
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
evaluate_model(model, valid_loader, device, "results", config["img_size"])


In [None]:
model.eval()

# Number of examples to display
num_examples = 10

for idx in range(num_examples):
    #  Get image & mask from your dataset
    img_t, mask_t = valid_dataset[idx]                # img_t: Tensor [3,H,W], mask_t: Tensor [1,H,W]
    
    #  Run the model
    with torch.no_grad():
        pred_t = torch.sigmoid(model(img_t.unsqueeze(0).to(device)))
    pred_t = pred_t.cpu().squeeze(0)                   # [1,H,W]
    
    # Convert to numpy uint8 for plotting
    img_np  = img_t.cpu().numpy().transpose(1,2,0)     # [H,W,3], floats in [0,1]
    img_np  = (img_np * 255).astype(np.uint8)
    img_np = img_np[..., ::-1]
    
    gt_mask = (mask_t.cpu().numpy().squeeze() > 0.5).astype(np.uint8) * 255
    pr_mask = (pred_t.cpu().numpy().squeeze() > 0.5).astype(np.uint8) * 255
    
    # Make 3-channel versions of the masks
    gt_rgb = np.stack([gt_mask]*3, axis=-1)
    pr_rgb = np.stack([pr_mask]*3, axis=-1)
    
    # Build a separator and composite image
    h, w, _ = img_np.shape
    sep = np.ones((h, 10, 3), dtype=np.uint8) * 128
    composite = np.concatenate([img_np, sep, gt_rgb, sep, pr_rgb], axis=1)
    
    # Plot
    plt.figure(figsize=(12, 6))
    plt.axis('off')
    plt.imshow(composite)
    plt.title(f"Sample {idx}: Original | Ground Truth | Prediction")
    plt.show()


### **7. Save the Model**

In [None]:
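# Save only the weights (state_dict), matching the file name and the checkpoint format used during training.
torch.save(model.state_dict(), "/kaggle/working/model_weights.pth")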
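
Saving a `state_dict` and loading it into a freshly built model is the usual PyTorch pattern for portable weights. The round trip is sketched below with a tiny stand-in network, an assumption made to keep the example self-contained; in this notebook the architecture would instead be rebuilt with the same `smp.Unet(...)` arguments. `build_model`, `demo_model` and the temporary file path are hypothetical.

```python
import os
import tempfile

import torch
import torch.nn as nn

def build_model() -> nn.Module:
    """Tiny stand-in network; the notebook's real model is smp.Unet."""
    return nn.Sequential(
        nn.Conv2d(3, 4, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Conv2d(4, 1, kernel_size=1),
    )

demo_model = build_model().eval()
path = os.path.join(tempfile.mkdtemp(), "weights.pth")
torch.save(demo_model.state_dict(), path)     # parameters only, no pickled class definition

restored = build_model()                      # rebuild the architecture first
restored.load_state_dict(torch.load(path, map_location="cpu"))
restored.eval()

x = torch.rand(1, 3, 8, 8)
with torch.no_grad():
    same = torch.allclose(demo_model(x), restored(x))
print(same)
```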
