<a href="https://colab.research.google.com/github/SruthiSuresh12/diabetic-retinopathy/blob/main/sample.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import cv2
import numpy as np
import pandas as pd
import os
import zipfile

In [18]:
data_directory = "./diabetic-retinopathy-detection"
sample_zip = os.path.join(data_directory, "sample.zip")
labels_csv = os.path.join(data_directory, "trainLabels.csv")
# Extract sample.zip if not already extracted
if os.path.exists(sample_zip):
    print("Extracting sample.zip...")
    with zipfile.ZipFile(sample_zip, "r") as zip_ref:
        zip_ref.extractall(data_directory)
    print("✅ Sample data extracted")

sample_dir = os.path.join(data_directory, "sample")

# 2. Load labels and keep only sample images
train_labels_df = pd.read_csv(labels_csv)

# Add full image paths
train_labels_df["image_path"] = train_labels_df["image"].apply(
    lambda x: os.path.join(sample_dir, f"{x}.jpeg")
)

# Keep only rows where the image exists in sample/
train_labels_df = train_labels_df[train_labels_df["image_path"].apply(os.path.exists)].reset_index(drop=True)

print(f"✅ Found {len(train_labels_df)} labeled sample images")

Extracting sample.zip...
✅ Sample data extracted
✅ Found 10 labeled sample images


In [6]:
def preprocess_image(image_path, label):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.cast(img, tf.float32)

    # Extract green channel
    img_green = img[:, :, 1]

    # CLAHE using numpy_function
    img_clahe = tf.numpy_function(
        func=lambda x: cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(x.astype(np.uint8)),
        inp=[img_green],
        Tout=tf.uint8
    )
    img_clahe.set_shape(img_green.shape)
    img_processed = tf.image.grayscale_to_rgb(tf.expand_dims(img_clahe, axis=-1))

    # Circle crop (wrap numpy in numpy_function)
    def circle_crop(x):
        h, w, _ = x.shape
        center_x, center_y = w // 2, h // 2
        radius = min(center_x, center_y)
        Y, X = np.ogrid[:h, :w]
        mask = (X - center_x) ** 2 + (Y - center_y) ** 2 <= radius ** 2
        cropped = np.where(mask[..., None], x, 0)
        return cropped.astype(np.uint8)

    img_cropped = tf.numpy_function(circle_crop, [img_processed], tf.uint8)
    img_cropped.set_shape(img_processed.shape)

    # Resize & normalize
    img_resized = tf.image.resize(img_cropped, (224, 224))
    img_normalized = img_resized / 255.0

    return img_normalized, label


In [7]:
class FocalLoss(keras.losses.Loss):
    def __init__(self, gamma=2.0, alpha=0.25, **kwargs):
        super().__init__(**kwargs)
        self.gamma = gamma
        self.alpha = alpha

    def call(self, y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=5)
        epsilon = keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)

        cross_entropy = -y_true_one_hot * tf.math.log(y_pred)
        alpha_t = y_true_one_hot * self.alpha + (1 - y_true_one_hot) * (1 - self.alpha)
        p_t = y_true_one_hot * y_pred + (1 - y_true_one_hot) * (1 - y_pred)
        focal_loss = alpha_t * tf.pow((1. - p_t), self.gamma) * cross_entropy
        return tf.reduce_sum(focal_loss, axis=-1)

In [8]:
def build_model(num_classes=5):
    base_model = tf.keras.applications.ResNet50V2(
        input_shape=(224, 224, 3),
        include_top=False,
        weights='imagenet'
    )
    base_model.trainable = False

    inputs = keras.Input(shape=(224, 224, 3))
    x = base_model(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return keras.Model(inputs, outputs)

In [19]:
image_paths = train_labels_df["image_path"].tolist()
labels = train_labels_df["level"].tolist()

dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
dataset = dataset.shuffle(buffer_size=len(image_paths))

train_size = int(0.8 * len(image_paths))
train_ds_raw = dataset.take(train_size)
val_ds_raw = dataset.skip(train_size)

train_dataset = train_ds_raw.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_ds_raw.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)

BATCH_SIZE = 8
train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

model = build_model()
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    loss=FocalLoss(),
    metrics=["accuracy"]
)

history = model.fit(
    train_dataset,
    epochs=3,  # just a few epochs since sample is tiny
    validation_data=val_dataset
)

print("✅ Training complete (sample dataset)")

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94668760/94668760[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 0us/step
Epoch 1/3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 19s/step - accuracy: 0.5000 - loss: 0.6475 - val_accuracy: 1.0000 - val_loss: 9.9348e-04
Epoch 2/3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step - accuracy: 0.5000 - loss: 0.5479 - val_accuracy: 0.0000e+00 - val_loss: 0.8233
Epoch 3/3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 6s/step - accuracy: 0.3750 - loss: 0.3975 - val_accuracy: 1.0000 - val_loss: 0.0028
✅ Training complete (sample dataset)
