In [None]:
!pip install numpy pillow scikit-learn tensorflow

In [None]:
pip install tensorflow-gpu

In [None]:

print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))

TensorFlow version: 2.18.0
GPU available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [None]:
import os, glob
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from PIL import Image
from sklearn.metrics import precision_score, recall_score, f1_score

print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))

# Define paths
image_dir = "/content/drive/MyDrive/cnn-dhp/images"
mask_dir = "/content/drive/MyDrive/cnn-dhp/masks"
output_dir = "predictions"
os.makedirs(output_dir, exist_ok=True)

# Collect image and mask paths
image_paths = sorted(glob.glob(os.path.join(image_dir, "*.png")))
mask_paths = []

for img_path in image_paths:
    filename = os.path.basename(img_path)
    mask_filename = os.path.splitext(filename)[0] + ".png"
    mask_path = os.path.join(mask_dir, mask_filename)
    if os.path.exists(mask_path):
        mask_paths.append(mask_path)
    else:
        raise FileNotFoundError(f"Mask for {filename} not found at {mask_path}")

print(f"Found {len(image_paths)} images and {len(mask_paths)} masks.")

train_imgs, val_imgs, train_masks, val_masks = train_test_split(
    image_paths, mask_paths, test_size=0.2, random_state=42
)
print(f"Training images: {len(train_imgs)}, Validation images: {len(val_imgs)}")

# Full-size image dimensions
CROP_SIZE = 4032  # Updated to be divisible by 16
BATCH_SIZE = 1

# Preprocessing and augmentation
@tf.function
def preprocess_full_image(image_path, mask_path):
    image = tf.io.decode_png(tf.io.read_file(image_path), channels=3)
    mask = tf.io.decode_png(tf.io.read_file(mask_path), channels=1)

    image = tf.image.resize(image, [CROP_SIZE, CROP_SIZE], method='bilinear')
    mask = tf.image.resize(mask, [CROP_SIZE, CROP_SIZE], method='nearest')

    image = tf.cast(image, tf.float32) / 255.0
    mask = tf.cast(mask, tf.float32) / 255.0
    mask = tf.where(mask >= 0.5, 1.0, 0.0)

    return image, mask

@tf.function
def augment_image(image, mask):
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_up_down(image)
        mask = tf.image.flip_up_down(mask)
    k = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    image = tf.image.rot90(image, k)
    mask = tf.image.rot90(mask, k)
    image = tf.image.random_brightness(image, 0.1)
    image = tf.image.random_contrast(image, 0.9, 1.1)
    return image, mask

train_ds = tf.data.Dataset.from_tensor_slices((train_imgs, train_masks))
train_ds = train_ds.map(preprocess_full_image, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.shuffle(32).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

val_ds = tf.data.Dataset.from_tensor_slices((val_imgs, val_masks))
val_ds = val_ds.map(preprocess_full_image, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# U-Net architecture
def conv_block(x, num_filters):
    x = layers.Conv2D(num_filters, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(num_filters, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x

def build_unet(input_shape):
    inputs = layers.Input(shape=input_shape)

    c1 = conv_block(inputs, 16)
    p1 = layers.MaxPooling2D()(c1)

    c2 = conv_block(p1, 32)
    p2 = layers.MaxPooling2D()(c2)

    c3 = conv_block(p2, 64)
    p3 = layers.MaxPooling2D()(c3)

    c4 = conv_block(p3, 128)
    p4 = layers.MaxPooling2D()(c4)

    c5 = conv_block(p4, 256)

    u6 = layers.UpSampling2D()(c5)
    u6 = layers.Concatenate()([u6, c4])
    c6 = conv_block(u6, 128)

    u7 = layers.UpSampling2D()(c6)
    u7 = layers.Concatenate()([u7, c3])
    c7 = conv_block(u7, 64)

    u8 = layers.UpSampling2D()(c7)
    u8 = layers.Concatenate()([u8, c2])
    c8 = conv_block(u8, 32)

    u9 = layers.UpSampling2D()(c8)
    u9 = layers.Concatenate()([u9, c1])
    c9 = conv_block(u9, 16)

    outputs = layers.Conv2D(1, 1, activation='sigmoid')(c9)

    return models.Model(inputs, outputs)

model = build_unet((CROP_SIZE, CROP_SIZE, 3))

# Custom Dice + BCE Loss
from tensorflow.keras import backend as K
def dice_loss(y_true, y_pred):
    smooth = 1e-6
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return 1 - ((2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth))

def combined_loss(y_true, y_pred):
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    dsc = dice_loss(y_true, y_pred)
    return bce + dsc

model.compile(optimizer=optimizers.Adam(1e-3), loss=combined_loss, metrics=['accuracy'])
model.summary()

early_stop = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = callbacks.ModelCheckpoint("best_model_unet.keras", monitor='val_loss', save_best_only=True)

EPOCHS = 30
history = model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS, callbacks=[early_stop, checkpoint])

# Evaluate and Save predictions
iou_scores, precisions, recalls, f1s = [], [], [], []

for img_path, mask_path in zip(val_imgs, val_masks):
    img = np.array(Image.open(img_path).convert("RGB"))
    msk = np.array(Image.open(mask_path).convert("L"))

    img = np.array(Image.fromarray(img).resize((CROP_SIZE, CROP_SIZE)))
    msk = np.array(Image.fromarray(msk).resize((CROP_SIZE, CROP_SIZE)))

    img_tensor = tf.expand_dims(tf.cast(img, tf.float32) / 255.0, axis=0)
    pred = model.predict(img_tensor, verbose=0)[0]
    pred_mask = (pred.squeeze() >= 0.5).astype(np.uint8) * 255

    pred_bin = (pred_mask >= 128).flatten()
    mask_bin = (msk >= 128).flatten()

    intersection = np.logical_and(pred_bin, mask_bin).sum()
    union = np.logical_or(pred_bin, mask_bin).sum()
    iou = intersection / union if union != 0 else 1.0
    iou_scores.append(iou)

    precisions.append(precision_score(mask_bin, pred_bin, zero_division=1))
    recalls.append(recall_score(mask_bin, pred_bin, zero_division=1))
    f1s.append(f1_score(mask_bin, pred_bin, zero_division=1))

    name = os.path.splitext(os.path.basename(img_path))[0] + "_pred.png"
    Image.fromarray(pred_mask).save(os.path.join(output_dir, name))

print(f"\nEvaluation Metrics on Validation Set:")
print(f"Mean IoU:       {np.mean(iou_scores):.4f}")
print(f"Mean Precision: {np.mean(precisions):.4f}")
print(f"Mean Recall:    {np.mean(recalls):.4f}")
print(f"Mean F1 Score:  {np.mean(f1s):.4f}")
