<a href="https://colab.research.google.com/github/AchyuthNamburi/webDev/blob/main/colorectal_cancer_detection_basic_UnetModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Step 1: Install required packages
!pip install opencv-python albumentations scikit-image kaggle --quiet

# Step 2: Upload kaggle.json API token file using the Colab file upload UI or from your local machine
from google.colab import files
files.upload()  # upload your kaggle.json here

# Step 3: Set up Kaggle API credentials
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Step 4: Download and unzip Kvasir-SEG dataset
!kaggle datasets download debeshjha1/kvasirseg -p ./kvasir --unzip

# Step 5: Import necessary libraries
import cv2
import numpy as np
import os
from glob import glob
import albumentations as A
from skimage import color
from tqdm import tqdm

# Step 6: Define CLAHE on L channel in LAB color space
def clahe_lab(img):
    lab = color.rgb2lab(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    l, a, b = lab[:,:,0], lab[:,:,1], lab[:,:,2]
    l = (l * 255.0 / l.max()).astype(np.uint8)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    l_clahe = clahe.apply(l)
    lab[:,:,0] = l_clahe * (l.max() / 255.0)
    rgb = color.lab2rgb(lab)
    output = (rgb * 255).astype(np.uint8)
    return cv2.cvtColor(output, cv2.COLOR_RGB2BGR)

# Step 7: Set directory paths (adjust if needed)
IMAGE_DIR = "./kvasir/Kvasir-SEG/images/"
MASK_DIR = "./kvasir/Kvasir-SEG/masks/"
SAVE_IMAGE_DIR = "./kvasir/processed_images/"
SAVE_MASK_DIR = "./kvasir/preprocessed_masks/"

os.makedirs(SAVE_IMAGE_DIR, exist_ok=True)
os.makedirs(SAVE_MASK_DIR, exist_ok=True)

# Step 8: Define augmentations
augment = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Rotate(limit=30, p=0.5),
    A.RandomCrop(width=224, height=224, p=0.5),
    A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.5),
    A.Resize(256, 256),
])

# Step 9: Preprocessing loop
image_files = glob(os.path.join(IMAGE_DIR, "*.jpg")) + glob(os.path.join(IMAGE_DIR, "*.png"))

for img_path in tqdm(image_files):
    img = cv2.imread(img_path)
    mask_path = os.path.join(MASK_DIR, os.path.basename(img_path))
    mask = cv2.imread(mask_path, 0)  # grayscale mask

    img_enhanced = clahe_lab(img)
    augmented = augment(image=img_enhanced, mask=mask)

    img_aug = augmented['image']
    mask_aug = augmented['mask']

    mask_bin = np.where(mask_aug > 127, 1, 0).astype(np.uint8)

    cv2.imwrite(os.path.join(SAVE_IMAGE_DIR, os.path.basename(img_path)), img_aug)
    cv2.imwrite(os.path.join(SAVE_MASK_DIR, os.path.basename(img_path)), mask_bin * 255)

print("Preprocessing Done. Images and masks saved in processed folders.")


Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/debeshjha1/kvasirseg
License(s): copyright-authors
Downloading kvasirseg.zip to ./kvasir
 93% 134M/144M [00:00<00:00, 1.39GB/s]
100% 144M/144M [00:00<00:00, 1.31GB/s]


  A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.5),
0it [00:00, ?it/s]

Preprocessing Done. Images and masks saved in processed folders.





In [None]:
import cv2
import matplotlib.pyplot as plt
import random
import os

# Set directories to your processed folders
SAVE_IMAGE_DIR = "./kvasir/processed_images/"
SAVE_MASK_DIR = "./kvasir/preprocessed_masks/"

# List image and mask files
image_files = os.listdir(SAVE_IMAGE_DIR)
mask_files = os.listdir(SAVE_MASK_DIR)

# Pick a random file to display
idx = random.randint(0, len(image_files) - 1)
img_path = os.path.join(SAVE_IMAGE_DIR, image_files[idx])
mask_path = os.path.join(SAVE_MASK_DIR, mask_files[idx])

# Read images (OpenCV loads in BGR, convert to RGB for display)
image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

# Plot side by side
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.title("Processed Image")
plt.axis('off')
plt.imshow(image)

plt.subplot(1, 2, 2)
plt.title("Processed Mask")
plt.axis('off')
plt.imshow(mask, cmap='gray')

plt.show()


ValueError: empty range in randrange(0, 0)

In [None]:
import os
print("Files in processed_images:", os.listdir("./kvasir/processed_images/"))
print("Files in preprocessed_masks:", os.listdir("./kvasir/preprocessed_masks/"))


In [None]:
cv2.imwrite(os.path.join(SAVE_IMAGE_DIR, os.path.basename(img_path)), img_aug)
cv2.imwrite(os.path.join(SAVE_MASK_DIR, os.path.basename(img_path)), mask_bin * 255)


In [None]:
import cv2
import os
from glob import glob
import albumentations as A
from skimage import color
from tqdm import tqdm

IMAGE_DIR = "./kvasir/Kvasir-SEG//Kvasir-SEG/images/"
MASK_DIR = "./kvasir/Kvasir-SEG//Kvasir-SEG/masks/"
SAVE_IMAGE_DIR = "./kvasir/processed_images/"
SAVE_MASK_DIR = "./kvasir/preprocessed_masks/"
os.makedirs(SAVE_IMAGE_DIR, exist_ok=True)
os.makedirs(SAVE_MASK_DIR, exist_ok=True)

def clahe_lab(img):
    lab = color.rgb2lab(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    l = (lab[:,:,0] * 255.0 / lab[:,:,0].max()).astype(np.uint8)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    lab[:,:,0] = clahe.apply(l) * (lab[:,:,0].max() / 255.0)
    rgb = color.lab2rgb(lab)
    output = (rgb * 255).astype(np.uint8)
    return cv2.cvtColor(output, cv2.COLOR_RGB2BGR)

augment = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Rotate(limit=30, p=0.5),
    A.RandomCrop(width=224, height=224, p=0.5),
    A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.5),
    A.Resize(256, 256),
])

image_files = glob(IMAGE_DIR + "*.jpg") + glob(IMAGE_DIR + "*.png")

for img_path in tqdm(image_files):
    img = cv2.imread(img_path)
    mask_path = os.path.join(MASK_DIR, os.path.basename(img_path))
    mask = cv2.imread(mask_path, 0) if os.path.exists(mask_path) else None
    if img is not None and mask is not None:
        img_enhanced = clahe_lab(img)
        augmented = augment(image=img_enhanced, mask=mask)
        img_aug = augmented['image']
        mask_aug = augmented['mask']
        mask_bin = (mask_aug > 127).astype('uint8')
        cv2.imwrite(os.path.join(SAVE_IMAGE_DIR, os.path.basename(img_path)), img_aug)
        cv2.imwrite(os.path.join(SAVE_MASK_DIR, os.path.basename(img_path)), mask_bin * 255)

print("Processing complete.")


In [None]:
import matplotlib.pyplot as plt
import random

img_list = os.listdir(SAVE_IMAGE_DIR)
mask_list = os.listdir(SAVE_MASK_DIR)
if img_list:
    i = random.randint(0, len(img_list) - 1)
    img_path = os.path.join(SAVE_IMAGE_DIR, img_list[i])
    mask_path = os.path.join(SAVE_MASK_DIR, mask_list[i])
    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    mask = cv2.imread(mask_path, 0)
    plt.figure(figsize=(10,5))
    plt.subplot(1,2,1); plt.imshow(img); plt.axis('off'); plt.title('Processed Image')
    plt.subplot(1,2,2); plt.imshow(mask, cmap='gray'); plt.axis('off'); plt.title('Processed Mask')
    plt.show()
else:
    print("No images found in processed_images folder.")


In [None]:
import matplotlib.pyplot as plt
import os

img_list = os.listdir(SAVE_IMAGE_DIR)
mask_list = os.listdir(SAVE_MASK_DIR)

n = len(img_list)  # number of images to display

for i in range(min(10, n)):  # Show 10 sample

    img_path = os.path.join(SAVE_IMAGE_DIR, img_list[i])
    mask_path = os.path.join(SAVE_MASK_DIR, mask_list[i])

    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    mask = cv2.imread(mask_path, 0)

    plt.figure(figsize=(10,5))
    plt.subplot(1,2,1); plt.imshow(img); plt.axis('off'); plt.title('Processed Image')
    plt.subplot(1,2,2); plt.imshow(mask, cmap='gray'); plt.axis('off'); plt.title('Processed Mask')
    plt.show()


In [None]:
import os
import cv2
import matplotlib.pyplot as plt

SAVE_IMAGE_DIR = './kvasir/processed_images/'
SAVE_MASK_DIR = './kvasir/preprocessed_masks/'

img_files = sorted([f for f in os.listdir(SAVE_IMAGE_DIR) if f.endswith('.png') or f.endswith('.jpg')])

for fname in img_files:
    img_path = os.path.join(SAVE_IMAGE_DIR, fname)
    mask_path = os.path.join(SAVE_MASK_DIR, fname)  # exactly match by filename

    if not os.path.exists(mask_path):
        print(f'Mask not found for {fname}')
        continue

    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    mask = cv2.imread(mask_path, 0)

    plt.figure(figsize=(10,5))
    plt.subplot(1,2,1); plt.imshow(img); plt.axis('off'); plt.title('Processed Image')
    plt.subplot(1,2,2); plt.imshow(mask, cmap='gray'); plt.axis('off'); plt.title('Processed Mask')
    plt.show()


In [None]:
import os
import random
import shutil

# Directories of your processed images and masks:
KVASIR_IMG = "./kvasir/processed_images/"
KVASIR_MASK = "./kvasir/preprocessed_masks/"

# Output split folders:
FINAL = "./kvasir_split/"
splits = ["train", "val", "test", "holdout"]
for split in splits:
    os.makedirs(os.path.join(FINAL, split, "images"), exist_ok=True)
    os.makedirs(os.path.join(FINAL, split, "masks"), exist_ok=True)

# Gather and shuffle file names:
def file_list(folder):
    return sorted([f for f in os.listdir(folder) if f.endswith(".png") or f.endswith(".jpg")])
files = file_list(KVASIR_IMG)
random.shuffle(files)

# Hold out 20 for domain testing
holdout = files[:20]
main_files = files[20:]

# Split main set into train/val/test (75/15/10 split)
n_total = len(main_files)
n_train = int(0.75 * n_total)
n_val = int(0.15 * n_total)
n_test = n_total - n_train - n_val

train_files = main_files[:n_train]
val_files = main_files[n_train:n_train + n_val]
test_files = main_files[n_train + n_val:]

def copyset(file_list, split):
    for fname in file_list:
        shutil.copy(os.path.join(KVASIR_IMG, fname), os.path.join(FINAL, split, "images", fname))
        shutil.copy(os.path.join(KVASIR_MASK, fname), os.path.join(FINAL, split, "masks", fname))

copyset(train_files, "train")
copyset(val_files, "val")
copyset(test_files, "test")
copyset(holdout, "holdout")

print("Kvasir split complete! Train:", len(train_files), "Val:", len(val_files), "Test:", len(test_files), "Holdout:", len(holdout))


In [None]:
import torch

# Use Apple MPS if available, otherwise CPU
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print('Using device:', device)


In [None]:
# Install required packages
!pip install torch torchvision albumentations tqdm --quiet

# --- Data Loader ---
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np

class KvasirSegDataset(Dataset):
    def __init__(self, img_dir, mask_dir, augment=None):
        self.img_files = sorted([f for f in os.listdir(img_dir) if f.endswith('.png') or f.endswith('.jpg')])
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.augment = augment

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_files[idx])
        mask_path = os.path.join(self.mask_dir, self.img_files[idx])
        image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
        mask = cv2.imread(mask_path, 0)

        if self.augment:
            augmented = self.augment(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        mask = (mask > 127).astype(np.float32)
        mask = torch.from_numpy(mask).unsqueeze(0)  # shape [1, H, W]
        image = ToTensorV2()(image=image)['image'].float() # Convert image to float32
        return image, mask

# --- Albumentations Augmentation ---
train_aug = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.ShiftScaleRotate(p=0.5),
    A.Resize(256, 256)
])
valid_aug = A.Compose([A.Resize(256, 256)])

# --- Dataloaders ---
train_ds = KvasirSegDataset('./kvasir_split/train/images', './kvasir_split/train/masks', augment=train_aug)
val_ds = KvasirSegDataset('./kvasir_split/val/images', './kvasir_split/val/masks', augment=valid_aug)
train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=8)

# --- Simple U-Net Model ---
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.middle = nn.Sequential(
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding=1), nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 2, stride=2), nn.ReLU(),
            nn.Conv2d(32, 16, 3, padding=1), nn.ReLU(),
            nn.Conv2d(16, 1, 1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.middle(x)
        x = self.decoder(x)
        return torch.sigmoid(x)

model = UNet().to(device)

# --- Loss and Optimizer ---
def dice_loss(pred, target, smooth=1e-6):
    pred = pred.view(-1)
    target = target.view(-1)
    intersection = (pred * target).sum()
    return 1 - ((2. * intersection + smooth) / (pred.sum() + target.sum() + smooth))

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# --- Training Loop (simple, one epoch for demo) ---
for epoch in range(30):
    model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        preds = model(images)
        loss = dice_loss(preds, masks)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Train loss: {loss.item():.4f}')

    # Validation
    model.eval()
    with torch.no_grad():
        val_loss, val_dice = 0, 0
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device)
            preds = model(images)
            loss = dice_loss(preds, masks)
            val_loss += loss.item()
            dice = 1 - loss.item()
            val_dice += dice
        print(f'Validation Dice: {val_dice / len(val_loader):.4f}')

print("Baseline U-Net training complete!")

# --- Visualize Predictions ---
import matplotlib.pyplot as plt

model.eval()
for images, masks in val_loader:
    images, masks = images.to(device), masks.to(device)
    preds = model(images)
    img = images[0].cpu().permute(1,2,0).numpy()
    pred_mask = (preds[0].cpu().detach().numpy().squeeze() > 0.5).astype(np.uint8)
    true_mask = masks[0].cpu().numpy().squeeze()

    plt.figure(figsize=(15,5))
    plt.subplot(1,3,1); plt.imshow(img); plt.title("Image"); plt.axis('off')
    plt.subplot(1,3,2); plt.imshow(true_mask, cmap='gray'); plt.title("True Mask"); plt.axis('off')
    plt.subplot(1,3,3); plt.imshow(pred_mask, cmap='gray'); plt.title("Pred Mask"); plt.axis('off')
    plt.show()
    break  # Show one example