In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input directory.
input_paths = (
    os.path.join(dirname, filename)
    for dirname, _, filenames in os.walk('/kaggle/input')
    for filename in filenames
)
for input_path in input_paths:
    print(input_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# ***Loading Libraries***

In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Flatten, Dense, Input, Conv2D, concatenate, BatchNormalization, MaxPooling2D, UpSampling2D, Concatenate, Dropout, Cropping2D, ZeroPadding2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.metrics import jaccard_score
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.metrics import MeanIoU
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

from torchvision.transforms import ToTensor
from transformers import ViTModel, ViTConfig
import torchvision.models as models

## ***Data Loading***

In [None]:
# All dataset paths hang off one root: DB0 backs the TRAIN_* paths, DB1 the TEST_* paths.
_DATASET_ROOT = "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR"

TRAIN_IMAGE_DIR = f"{_DATASET_ROOT}/DB0/Images"
TRAIN_MASK_DIR = f"{_DATASET_ROOT}/DB0/GroundTruth"
TEST_IMAGE_DIR = f"{_DATASET_ROOT}/DB1/Images"
TEST_MASK_DIR = f"{_DATASET_ROOT}/DB1/GroundTruth"

## ***1. Combining Masks***

In [None]:
def create_combined_mask(image_filename, mask_dir=None, subfolders=None):
    """Merge the per-lesion ground-truth masks of one image into a single mask.

    Parameters
    ----------
    image_filename : str
        File name shared by the image and each of its lesion masks.
    mask_dir : str, optional
        Ground-truth root holding one sub-folder per lesion type. Defaults to
        the notebook-level ``TRAIN_MASK_DIR``; pass ``TEST_MASK_DIR`` when
        building masks for the test images.
    subfolders : sequence of str, optional
        Lesion sub-folders to merge. Defaults to the notebook-level
        ``mask_subfolders`` list.

    Returns
    -------
    numpy.ndarray
        Grayscale mask: the bitwise OR of every lesion mask that could be read.

    Raises
    ------
    FileNotFoundError
        If none of the sub-folders contains a readable mask for the image.
    """
    if mask_dir is None:
        mask_dir = TRAIN_MASK_DIR
    if subfolders is None:
        subfolders = mask_subfolders

    combined_mask = None
    for subfolder in subfolders:
        mask_path = os.path.join(mask_dir, subfolder, image_filename)
        if not os.path.exists(mask_path):
            continue
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            # The file exists but OpenCV could not decode it; skip it instead
            # of passing None to bitwise_or.
            continue
        # The first readable mask seeds the result, so no lesion folder is
        # required to exist just to determine the output shape.
        if combined_mask is None:
            combined_mask = mask
        else:
            combined_mask = cv2.bitwise_or(combined_mask, mask)

    if combined_mask is None:
        raise FileNotFoundError(
            f"No readable mask for {image_filename!r} under {mask_dir!r} in {list(subfolders)!r}"
        )
    return combined_mask

### ***Bright Lesion***

In [None]:
# Bright lesions, training set: merge the 'CWS' and 'HE' masks of every training image.
mask_subfolders = ['CWS', 'HE']  # read by create_combined_mask()
OUTPUT_MASK_DIR = "/kaggle/working/CombinedMasksTask1"
os.makedirs(OUTPUT_MASK_DIR, exist_ok=True)

image_filenames = os.listdir(TRAIN_IMAGE_DIR)
for image_filename in tqdm(image_filenames):
    # Each combined mask is saved under the same file name as its image.
    cv2.imwrite(
        os.path.join(OUTPUT_MASK_DIR, image_filename),
        create_combined_mask(image_filename),
    )

In [None]:
# Bright lesions, test set: merge the 'CWS' and 'HE' masks of every test image.
OUTPUT_MASK_DIR = "/kaggle/working/TestCombinedMasksTask1"
os.makedirs(OUTPUT_MASK_DIR, exist_ok=True)
mask_subfolders = ['CWS', 'HE']
image_filenames = os.listdir(TEST_IMAGE_DIR)

# create_combined_mask(image_filename) looks masks up under TRAIN_MASK_DIR, which
# would pair test file names with training ground truth. The test masks are
# therefore merged here directly from TEST_MASK_DIR.
for image_filename in tqdm(image_filenames):
    combined_mask = None
    for subfolder in mask_subfolders:
        lesion_mask_path = os.path.join(TEST_MASK_DIR, subfolder, image_filename)
        if not os.path.exists(lesion_mask_path):
            continue
        lesion_mask = cv2.imread(lesion_mask_path, cv2.IMREAD_GRAYSCALE)
        if lesion_mask is None:
            continue  # file exists but could not be decoded
        if combined_mask is None:
            combined_mask = lesion_mask
        else:
            combined_mask = cv2.bitwise_or(combined_mask, lesion_mask)

    if combined_mask is None:
        raise FileNotFoundError(
            f"No readable mask for {image_filename!r} under {TEST_MASK_DIR!r}"
        )
    output_path = os.path.join(OUTPUT_MASK_DIR, image_filename)
    cv2.imwrite(output_path, combined_mask)

### ***Darker Lesion***

In [None]:
# Darker lesions, training set: merge the 'H' and 'MA' masks of every training image.
mask_subfolders = ['H', 'MA']  # read by create_combined_mask()
OUTPUT_MASK_DIR = "/kaggle/working/CombinedMasksTask2"
os.makedirs(OUTPUT_MASK_DIR, exist_ok=True)

image_filenames = os.listdir(TRAIN_IMAGE_DIR)
image_filenames.sort()
for image_filename in tqdm(image_filenames):
    # Each combined mask is saved under the same file name as its image.
    cv2.imwrite(
        os.path.join(OUTPUT_MASK_DIR, image_filename),
        create_combined_mask(image_filename),
    )

In [None]:
# Darker lesions, test set: merge the 'H' and 'MA' masks of every test image.
OUTPUT_MASK_DIR = "/kaggle/working/TestCombinedMasksTask2"
os.makedirs(OUTPUT_MASK_DIR, exist_ok=True)
mask_subfolders = ['H', 'MA']
image_filenames = os.listdir(TEST_IMAGE_DIR)

# create_combined_mask(image_filename) looks masks up under TRAIN_MASK_DIR, which
# would pair test file names with training ground truth. The test masks are
# therefore merged here directly from TEST_MASK_DIR.
for image_filename in tqdm(image_filenames):
    combined_mask = None
    for subfolder in mask_subfolders:
        lesion_mask_path = os.path.join(TEST_MASK_DIR, subfolder, image_filename)
        if not os.path.exists(lesion_mask_path):
            continue
        lesion_mask = cv2.imread(lesion_mask_path, cv2.IMREAD_GRAYSCALE)
        if lesion_mask is None:
            continue  # file exists but could not be decoded
        if combined_mask is None:
            combined_mask = lesion_mask
        else:
            combined_mask = cv2.bitwise_or(combined_mask, lesion_mask)

    if combined_mask is None:
        raise FileNotFoundError(
            f"No readable mask for {image_filename!r} under {TEST_MASK_DIR!r}"
        )
    output_path = os.path.join(OUTPUT_MASK_DIR, image_filename)
    cv2.imwrite(output_path, combined_mask)

## ***2. Loading Images and Masks***

In [None]:
IMAGE_DIR = "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR/DB0/Images"
MASK_DIR = "/kaggle/working/CombinedMasksTask1"

IMG_HEIGHT = 1152
IMG_WIDTH = 1500
BATCH_SIZE = 1
EPOCHS = 50

def load_data(image_dir, mask_dir):
    """Load every image in ``image_dir`` with its same-named mask from ``mask_dir``.

    Images are read with OpenCV at their native resolution and are neither
    resized nor normalised here. Masks are read as grayscale and binarised so
    that every non-zero pixel becomes 1.

    Parameters
    ----------
    image_dir : str
        Directory containing the input images.
    mask_dir : str
        Directory containing one mask per image, with identical file names.

    Returns
    -------
    tuple of numpy.ndarray
        ``(images, masks)``; masks carry a trailing channel axis of size 1.
        Stacking into regular arrays assumes all files share one resolution.

    Raises
    ------
    FileNotFoundError
        If an image or its mask cannot be read.
    """
    image_filenames = sorted(os.listdir(image_dir))
    images = []
    masks = []

    for filename in tqdm(image_filenames, desc="Loading data"):
        # Image at native resolution (resizing happens later in preprocessing).
        image_path = os.path.join(image_dir, filename)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        images.append(image)

        # Matching mask, binarised to {0, 1}.
        mask_path = os.path.join(mask_dir, filename)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")
        masks.append((mask > 0).astype(np.uint8))

    # Add the channel axis from the data itself instead of reshaping to the
    # IMG_HEIGHT / IMG_WIDTH globals, which would silently scramble masks of
    # any other resolution.
    return np.array(images), np.array(masks)[..., np.newaxis]

images, masks = load_data(IMAGE_DIR, MASK_DIR)

#X_train, X_val, y_train, y_val = train_test_split(images, masks, test_size=0.1, random_state=42)

In [None]:
IMAGE_DIR = "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR/DB0/Images"
MASK_DIR = "/kaggle/working/CombinedMasksTask2"

IMG_HEIGHT = 1152
IMG_WIDTH = 1500
BATCH_SIZE = 1
EPOCHS = 50

def load_data(image_dir, mask_dir):
    """Load every image in ``image_dir`` with its same-named mask from ``mask_dir``.

    Images are read with OpenCV at their native resolution and are neither
    resized nor normalised here. Masks are read as grayscale and binarised so
    that every non-zero pixel becomes 1.

    Parameters
    ----------
    image_dir : str
        Directory containing the input images.
    mask_dir : str
        Directory containing one mask per image, with identical file names.

    Returns
    -------
    tuple of numpy.ndarray
        ``(images, masks)``; masks carry a trailing channel axis of size 1.
        Stacking into regular arrays assumes all files share one resolution.

    Raises
    ------
    FileNotFoundError
        If an image or its mask cannot be read.
    """
    image_filenames = sorted(os.listdir(image_dir))
    images = []
    masks = []

    for filename in tqdm(image_filenames, desc="Loading data"):
        # Image at native resolution (resizing happens later in preprocessing).
        image_path = os.path.join(image_dir, filename)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        images.append(image)

        # Matching mask, binarised to {0, 1}.
        mask_path = os.path.join(mask_dir, filename)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")
        masks.append((mask > 0).astype(np.uint8))

    # Add the channel axis from the data itself instead of reshaping to the
    # IMG_HEIGHT / IMG_WIDTH globals, which would silently scramble masks of
    # any other resolution.
    return np.array(images), np.array(masks)[..., np.newaxis]

# This rebinds `images` / `masks`, so every later cell works on the Task 2
# (CombinedMasksTask2) masks when the notebook is run top to bottom.
images, masks = load_data(IMAGE_DIR, MASK_DIR)

#X_train, X_val, y_train, y_val = train_test_split(images, masks, test_size=0.1, random_state=42)

# ***SetFormer Model***

## ***Data preprocessing***

In [None]:
def preprocess_image(image, mask=None):
    """Resize an image or mask to 1024x1024 and return it as an NCHW float tensor.

    Parameters
    ----------
    image : numpy.ndarray
        Array to convert: a 3-channel image when ``mask`` is None, otherwise a
        single-channel mask.
    mask : object, optional
        Used only as a flag. Any value other than ``None`` makes the input be
        treated as a single-channel mask.

    Returns
    -------
    torch.Tensor
        float32 tensor of shape (1, 3, 1024, 1024) for images or
        (1, 1, 1024, 1024) for masks. Values are not normalised.
    """
    ih, iw = 1024, 1024
    # cv2.resize takes the target size as (width, height).
    image = cv2.resize(image, (iw, ih))
    # `is None` instead of `== None`: the comparison must stay a plain boolean
    # even if an array is ever passed as `mask`.
    channels = 3 if mask is None else 1
    image = image.reshape(-1, ih, iw, channels)
    # NHWC -> NCHW. The previous permute(0, 3, 2, 1) also swapped height and
    # width, so predicted masks came out transposed relative to their images.
    # NOTE(review): pixel values stay in their original range; confirm whether
    # the pretrained encoder expects ImageNet-normalised inputs.
    return torch.tensor(image, dtype=torch.float32).permute(0, 3, 1, 2)
images2 = [preprocess_image(image) for image in images]
masks2 = [preprocess_image(image, mask = 1) for image in masks]

In [None]:
# Sanity check: tensor shapes of the first sample and the number of samples in each list.
first_image, first_mask = images2[0], masks2[0]
(first_image.shape, first_mask.shape, len(images2), len(masks2))

## ***Model***

In [None]:
target_size = (1024, 1024)

class SetFormer(nn.Module):
    """Segmentation network: a ResNet-101 encoder followed by a small convolutional head.

    The encoder is a pretrained torchvision ResNet-101 with its last two
    children removed (the pooling and classification layers in torchvision's
    ResNet). The head reduces the 2048-channel feature map to ``num_classes``
    logit maps and upsamples by a total factor of 2 * 2 * 8 = 32. There are no
    skip connections between encoder and head.

    Parameters
    ----------
    num_classes : int
        Number of output channels (logit maps).
    """

    def __init__(self, num_classes=1):
        super().__init__()
        backbone = models.resnet101(pretrained=True)
        backbone_stages = list(backbone.children())[:-2]
        self.encoder = nn.Sequential(*backbone_stages)

        # Layers are created in the same order as before so parameter
        # initialisation and state-dict keys stay unchanged.
        head_layers = [
            nn.Conv2d(2048, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(128, num_classes, kernel_size=1),
            nn.Upsample(scale_factor=8, mode='bilinear', align_corners=True),
        ]
        self.segmentation_head = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return raw logits (no sigmoid applied) for the input batch ``x``."""
        features = self.encoder(x)
        return self.segmentation_head(features)

In [None]:
!pip install torchsummary
from torchsummary import summary

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')

model = SetFormer(num_classes=1)
model = model.to(device)

# Layer-by-layer summary for a single 3 x 1024 x 1024 input.
summary(model, input_size=(3, 1024, 1024))

In [None]:
# Dice Loss Function
def dice_loss(pred, target):
    """Soft Dice loss computed over all elements of the batch at once.

    Parameters
    ----------
    pred : torch.Tensor
        Raw logits; a sigmoid is applied inside this function.
    target : torch.Tensor
        Ground-truth tensor with the same shape as ``pred``.

    Returns
    -------
    torch.Tensor
        Scalar ``1 - Dice``; a small constant keeps the ratio defined when both
        sums are zero.
    """
    eps = 1e-6
    probs = torch.sigmoid(pred)
    overlap = (probs * target).sum()
    numerator = 2. * overlap + eps
    denominator = probs.sum() + target.sum() + eps
    return 1 - numerator / denominator

# Adam optimiser (learning rate 1e-3) and a binary cross-entropy loss that
# takes raw logits.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
bce_loss = nn.BCEWithLogitsLoss()

In [None]:
# Train with one sample per step (each entry of images2 / masks2 is a batch of one).
epochs = 100
for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0
    num_batches = 0
    # The loop variables are deliberately not called `masks`: that name holds
    # the NumPy mask array the preprocessing cell reads, and rebinding it here
    # would break re-running that cell.
    for batch_images, batch_masks in zip(images2, masks2):
        batch_images, batch_masks = batch_images.to(device), batch_masks.to(device)

        optimizer.zero_grad()
        outputs = model(batch_images)

        loss = bce_loss(outputs, batch_masks)# + dice_loss(outputs, batch_masks)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Average over the batches actually processed; max() guards an empty dataset.
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / max(num_batches, 1)}")

In [None]:
# Show the class of the trained model object.
model_class = type(model)
print(model_class)

In [None]:
import torch

# Persist only the learned parameters (the state dict), not the module object.
checkpoint_path = '/kaggle/working/setformer_model.pth'
model_weights = model.state_dict()
torch.save(model_weights, checkpoint_path)

print("Model saved as 'setformer_model.pth'")

### ***Grad-CAM***

In [None]:
!pip install grad-cam

## ***Evaluation***

In [None]:
def predict(model, image, device, threshold=0.5):
    """Run the model on one image and return a thresholded binary mask.

    Parameters
    ----------
    model : torch.nn.Module
        Network producing logits; it is switched to evaluation mode.
    image : numpy.ndarray
        Input image, converted by ``preprocess_image``.
    device : torch.device
        Device on which inference is run.
    threshold : float
        Probability above which a pixel is marked as foreground.

    Returns
    -------
    numpy.ndarray
        Integer mask of 0s and 1s with singleton dimensions removed.
    """
    model.eval()
    batch = preprocess_image(image)
    batch = batch.to(device)
    with torch.no_grad():
        logits = model(batch)
        foreground = torch.sigmoid(logits) > threshold
    return foreground.int().squeeze().cpu().numpy()

In [None]:
# Directory path containing images
image_dir = "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR/DB1/Images"

# Get the list of all images and sort them to ensure order
image_paths = sorted([os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.png')])

# Select the first 25 images (fewer if the directory holds fewer)
selected_images = image_paths[:25]

# Create a 5x5 grid plot
fig, axes = plt.subplots(5, 5, figsize=(15, 15))

for i, ax in enumerate(axes.flat):
    ax.axis('off')  # Hide the axis for better visualization
    if i >= len(selected_images):
        # Fewer than 25 images available: leave the remaining panels empty
        # instead of indexing past the end of the list.
        continue
    test_image_path = selected_images[i]
    test_image = cv2.imread(test_image_path)
    predicted_mask = predict(model, test_image, device)
    ax.imshow(predicted_mask)
    ax.set_title(f"Image {i+1}")

plt.tight_layout()
plt.show()

In [None]:
# NOTE(review): this folder holds the combined masks written for the training
# images, while the previous cell predicts on the DB1 images. Confirm whether
# "/kaggle/working/TestCombinedMasksTask2" was intended here.
image_dir = "/kaggle/working/CombinedMasksTask2"

image_paths = sorted([os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.png')])

# Select the first 25 images (fewer if the directory holds fewer)
selected_images = image_paths[:25]

# Create a 5x5 grid plot
fig, axes = plt.subplots(5, 5, figsize=(15, 15))

for i, ax in enumerate(axes.flat):
    ax.axis('off')  # Hide the axis for better visualization
    if i >= len(selected_images):
        # Fewer than 25 masks available: leave the remaining panels empty
        # instead of indexing past the end of the list.
        continue
    test_image_path = selected_images[i]
    test_image = cv2.imread(test_image_path)
    ax.imshow(test_image)
    ax.set_title(f"Image {i+1}")

plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import cv2
import numpy as np

def visualize_predictions(images, predicted_masks):
    """
    Plots each input image (top row) above its predicted segmentation mask (bottom row).
    - images: Sequence of images in OpenCV BGR channel order; converted to RGB for display.
    - predicted_masks: Sequence of predicted masks, one per image, drawn with a gray colormap.
    """
    count = len(images)
    plt.figure(figsize=(15, 6))
    for position, bgr_image in enumerate(images, start=1):
        # Top row: the original image.
        plt.subplot(2, count, position)
        rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        plt.imshow(rgb_image)
        plt.title(f"Image {position}")
        plt.axis("off")

        # Bottom row: the mask predicted for that image.
        plt.subplot(2, count, position + count)
        plt.imshow(predicted_masks[position - 1], cmap='gray')
        plt.title(f"Predicted Mask {position}")
        plt.axis("off")

    plt.tight_layout()
    plt.show()

# Test images to visualize; extend this list to show more.
test_image_paths = [
    "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR/DB1/Images/image009.png",
    "/kaggle/input/diabetic-retinopathy-dataset/Daataset_DR/DB1/Images/image007.png",
    # Add paths for 8 more images...
]

# Read every listed image, then predict one mask per image.
test_images = list(map(cv2.imread, test_image_paths))
predicted_masks = []
for test_img in test_images:
    predicted_masks.append(predict(model, test_img, device))

# Show the images and their predicted masks in a two-row figure.
visualize_predictions(test_images, predicted_masks)


In [None]:
# `mask` is never defined at notebook level (it is only a local variable inside
# functions), so this cell raised NameError on a fresh kernel. The first
# preprocessed ground-truth mask is inspected instead.
np.unique(masks2[0].cpu().numpy())

In [None]:
# `mask` is never defined at notebook level, so the first preprocessed
# ground-truth mask is used. It is converted to an integer array because
# np.bincount requires non-negative integers.
true_mask = masks2[0].cpu().numpy().astype(np.int32)

# Flatten the mask and count the occurrences of each unique value
pixel_counts = np.bincount(true_mask.flatten())

# Print the pixel count for every class value present (class 0 is the background)
for class_id, count in enumerate(pixel_counts):
    print(f"Class {class_id}: {count} pixels")
