<a href="https://colab.research.google.com/github/SVashishta1/DL_Final_Project/blob/main/final_project_sarabu_vashishta_sharma_dl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive; the dataset paths used by this notebook
# live under /content/drive/MyDrive. Requires the Colab runtime (google.colab).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Paths to the dataset on the mounted Drive:
#   original_folder     -> input images
#   ground_truth_folder -> segmentation masks
# The loader defined later in the notebook globs these folders for "*.tif" files.
original_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Original'
ground_truth_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Ground Truth'

In [None]:

import os
import numpy as np
import tensorflow as tf
from glob import glob
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt
from tensorflow.keras.layers import (
    Conv2D, BatchNormalization, Activation, MaxPool2D,
    UpSampling2D, Concatenate, Input
)
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tifffile import imread  # Replacing PIL with tifffile for TIFF images

# Load data function
def load_data(original_folder, ground_truth_folder, split=0.1):
    """Collect image/mask path pairs and split them into train/valid/test sets.

    Both folders are globbed for ``*.tif`` files and sorted independently, so
    the i-th image is paired with the i-th mask purely by sort order.

    Args:
        original_folder: Directory containing the input images.
        ground_truth_folder: Directory containing the segmentation masks.
        split: Value forwarded as ``test_size`` to both splits: first to carve
            the test set out of the full dataset, then to carve the validation
            set out of what remains.

    Returns:
        ``(train_x, train_y), (valid_x, valid_y), (test_x, test_y)`` where each
        element is a list of file paths.

    Raises:
        ValueError: If no images are found, or if the number of images and
            masks differ (the pairing by sort order would be meaningless).
    """
    images = sorted(glob(os.path.join(original_folder, "*.tif")))
    masks = sorted(glob(os.path.join(ground_truth_folder, "*.tif")))

    # Fail early with an actionable message instead of an opaque splitter error.
    if not images:
        raise ValueError(f"No .tif images found in {original_folder!r}")
    if len(images) != len(masks):
        raise ValueError(
            f"Found {len(images)} images in {original_folder!r} but "
            f"{len(masks)} masks in {ground_truth_folder!r}"
        )

    # Split dataset into train, validation, and test sets (fixed seed for reproducibility)
    train_x, test_x, train_y, test_y = train_test_split(images, masks, test_size=split, random_state=42)
    train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, test_size=split, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

# Image reading function (RGB for images)
def read_image(path):
    """Read a TIFF image and return it as a (256, 256, 3) float32 array.

    The previous implementation used ``np.resize``, which does not rescale an
    image: it truncates/repeats the flattened buffer, scrambling the pixels.
    This version normalises the channel layout first and then resamples the
    spatial axes with nearest-neighbour index sampling.

    Args:
        path: File path as ``str``, ``bytes`` or a scalar string ``tf.Tensor``.

    Returns:
        ``np.ndarray`` of shape (256, 256, 3), dtype float32, pixel values
        divided by 255. On any read/format error a zero-filled array of the
        same shape is returned and the error is printed.
    """
    if isinstance(path, tf.Tensor):  # Decode TensorFlow tensor
        path = path.numpy().decode("utf-8")
    elif isinstance(path, bytes):  # Decode bytes
        path = path.decode("utf-8")

    try:
        img = np.asarray(imread(path))  # Read image using tifffile

        # Bring the array to (H, W, 3) regardless of how the file was stored.
        if img.ndim == 2:  # grayscale without channel axis
            img = np.repeat(img[..., np.newaxis], 3, axis=-1)
        elif img.ndim == 3 and img.shape[-1] == 1:  # grayscale with channel axis
            img = np.repeat(img, 3, axis=-1)
        elif img.ndim == 3 and img.shape[-1] in (3, 4):  # RGB / RGBA (drop alpha)
            img = img[..., :3]
        else:
            raise ValueError(f"unsupported image shape {img.shape}")

        # Nearest-neighbour resample: pick the source row/column for each target pixel.
        rows = (np.arange(256) * img.shape[0]) // 256
        cols = (np.arange(256) * img.shape[1]) // 256
        img = img[rows][:, cols]
    except Exception as e:
        print(f"Error opening image {path}: {e}")
        return np.zeros((256, 256, 3), dtype=np.float32)  # Return a blank image in case of error

    # NOTE(review): dividing by 255 assumes 8-bit samples — confirm for this dataset.
    x = img.astype(np.float32) / 255.0  # Normalize to [0, 1]
    return x

# Mask reading function (Grayscale for masks)
def read_mask(path):
    """Read a TIFF mask and return it as a (256, 256, 1) float32 array.

    ``np.resize`` (used previously) truncates/repeats the flattened buffer
    instead of rescaling, so the mask no longer lined up with its image. The
    mask is now resampled with nearest-neighbour index sampling, which also
    avoids introducing interpolated in-between values.

    Args:
        path: File path as ``str``, ``bytes`` or a scalar string ``tf.Tensor``.

    Returns:
        ``np.ndarray`` of shape (256, 256, 1), dtype float32, values divided
        by 255. On any read/format error a zero-filled array of the same shape
        is returned and the error is printed.
    """
    if isinstance(path, tf.Tensor):  # Decode TensorFlow tensor
        path = path.numpy().decode("utf-8")
    elif isinstance(path, bytes):  # Decode bytes
        path = path.decode("utf-8")

    try:
        img = np.asarray(imread(path))  # Read mask using tifffile

        # Collapse a trailing channel axis, if present, to a single plane.
        if img.ndim == 3:
            img = img[..., 0]
        if img.ndim != 2:
            raise ValueError(f"unsupported mask shape {img.shape}")

        # Nearest-neighbour resample: pick the source row/column for each target pixel.
        rows = (np.arange(256) * img.shape[0]) // 256
        cols = (np.arange(256) * img.shape[1]) // 256
        img = img[rows][:, cols][..., np.newaxis]
    except Exception as e:
        print(f"Error opening mask {path}: {e}")
        return np.zeros((256, 256, 1), dtype=np.float32)  # Return a blank image in case of error

    # NOTE(review): dividing by 255 assumes 8-bit samples — confirm for this dataset.
    y = img.astype(np.float32) / 255.0  # Normalize to [0, 1]
    return y

# TensorFlow parsing function
def tf_parse(x, y):
    """Load one (image, mask) pair inside a tf.data pipeline.

    Wraps the Python readers with ``tf.numpy_function`` and then pins the
    static shapes, which are otherwise unknown after the wrapper.

    Args:
        x: Image path element of the dataset.
        y: Mask path element of the dataset.

    Returns:
        Tuple ``(image, mask)`` of float32 tensors with static shapes
        (256, 256, 3) and (256, 256, 1).
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([256, 256, 3])  # RGB input
    mask.set_shape([256, 256, 1])  # Grayscale mask
    return image, mask

# Dataset preparation function
def tf_dataset(x, y, batch=8):
    """Build a batched, endlessly repeating dataset of (image, mask) pairs.

    Args:
        x: Sequence of image paths.
        y: Sequence of mask paths, aligned with ``x``.
        batch: Batch size.

    Returns:
        A ``tf.data.Dataset`` that maps each path pair through ``tf_parse``,
        batches the results and repeats indefinitely.
    """
    return (
        tf.data.Dataset.from_tensor_slices((x, y))
        .map(tf_parse)
        .batch(batch)
        .repeat()
    )

# UNet model definition
def conv_block(x, num_filters):
    """Apply two consecutive Conv2D(3x3, same) -> BatchNormalization -> ReLU stages.

    Args:
        x: Input Keras tensor.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The Keras tensor produced by the second stage.
    """
    for _ in range(2):
        x = Conv2D(num_filters, (3, 3), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
    return x

def build_model():
    """Assemble the U-Net style encoder/decoder used for segmentation.

    The encoder runs four conv blocks (16, 32, 48, 64 filters), each followed
    by 2x2 max pooling. A bridge conv block with 64 filters follows, then the
    decoder upsamples 2x, concatenates the matching encoder output and applies
    a conv block, walking the filter list in reverse. A 1x1 convolution with a
    sigmoid produces the single-channel output.

    Returns:
        A ``Model`` mapping a (256, 256, 3) input to a (256, 256, 1) output.
    """
    input_size = 256
    encoder_filters = [16, 32, 48, 64]
    inputs = Input((input_size, input_size, 3))  # RGB input

    ## Encoder: remember each block's output for the skip connections
    skips = []
    features = inputs
    for width in encoder_filters:
        features = conv_block(features, width)
        skips.append(features)
        features = MaxPool2D((2, 2))(features)

    ## Bridge
    features = conv_block(features, encoder_filters[-1])

    ## Decoder: deepest skip connection first
    for width, skip in zip(reversed(encoder_filters), reversed(skips)):
        features = UpSampling2D((2, 2))(features)
        features = Concatenate()([features, skip])
        features = conv_block(features, width)

    ## Output
    logits = Conv2D(1, (1, 1), padding="same")(features)
    outputs = Activation("sigmoid")(logits)

    return Model(inputs, outputs)

# Evaluation and visualization
def mask_parse(mask):
    """Return ``mask`` with all length-1 axes removed (e.g. the channel axis)."""
    return np.squeeze(mask)

if __name__ == "__main__":
    # Paths to the dataset
    original_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Original'
    ground_truth_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Ground Truth'

    # Load dataset (lists of file paths for each split)
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(original_folder, ground_truth_folder)

    # Hyperparameters
    batch_size = 8
    epochs = 50
    lr = 1e-4

    # Prepare datasets. Each dataset repeats forever, so the step counts below
    # define what one epoch means. test_dataset is not consumed in this cell.
    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)
    test_dataset = tf_dataset(test_x, test_y, batch=batch_size)

    # Model setup
    model = build_model()
    model.compile(
        loss="binary_crossentropy",
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        metrics=["accuracy", tf.keras.metrics.Recall(), tf.keras.metrics.Precision()]
    )

    # Callbacks: keep the best checkpoint, lower the LR on plateaus, stop early
    callbacks = [
        ModelCheckpoint("model.keras", save_best_only=True),
        ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=5),
        EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
    ]

    # Train model. Ceiling division so a final partial batch still counts as a step.
    train_steps = (len(train_x) + batch_size - 1) // batch_size
    valid_steps = (len(valid_x) + batch_size - 1) // batch_size

    model.fit(
        train_dataset,
        validation_data=valid_dataset,
        epochs=epochs,
        steps_per_epoch=train_steps,
        validation_steps=valid_steps,
        callbacks=callbacks
    )

    # Evaluate and visualize results: one PNG per test sample
    results_dir = "results/"
    os.makedirs(results_dir, exist_ok=True)

    for i, (x_path, y_path) in tqdm(enumerate(zip(test_x, test_y)), total=len(test_x)):
        original_image = read_image(x_path)
        ground_truth = mask_parse(read_mask(y_path))

        # verbose=0 keeps Keras from printing a progress bar per image under tqdm.
        predicted_mask = model.predict(np.expand_dims(original_image, axis=0), verbose=0)[0]
        predicted_mask = mask_parse((predicted_mask > 0.5).astype(np.float32))

        panels = [
            ("Original Image", original_image),
            ("Ground Truth Mask", ground_truth),
            ("Predicted Mask", predicted_mask),
        ]
        fig, axes = plt.subplots(1, 3, figsize=(12, 4))
        for ax, (title, panel) in zip(axes, panels):
            ax.set_title(title)
            ax.imshow(panel, cmap="gray")  # cmap only affects the 2-D mask panels
            ax.axis("off")

        fig.tight_layout()
        # os.path.join avoids the doubled slash produced by "results/" + "/" + name.
        fig.savefig(os.path.join(results_dir, f"{i}.png"))
        plt.close(fig)

Epoch 1/50
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4s/step - accuracy: 0.5318 - loss: 0.7646 - precision: 0.0971 - recall: 0.5699

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Relies on `test_x`, `model` and `read_image` defined by the training cell above.
# Load the test images
test_data = [read_image(x_path) for x_path in test_x]  # Load test images using the read_image function

# Get predictions for the whole test set in one batched call, then threshold at 0.5
predictions = []
if test_data:
    probabilities = model.predict(np.stack(test_data), batch_size=8, verbose=0)
    predictions = [(prob > 0.5).astype(np.float32) for prob in probabilities]

# Display images (top row) and their predicted masks (bottom row)
num_images = min(5, len(test_data))  # Never ask for more samples than exist

if num_images:
    fig, axes = plt.subplots(2, num_images, figsize=(3 * num_images, 6), squeeze=False)
    for i in range(num_images):
        axes[0, i].imshow(test_data[i])
        axes[0, i].set_title(f"Image {i}")
        axes[0, i].axis('off')

        axes[1, i].imshow(predictions[i].squeeze(), cmap="gray")  # Drop the channel axis for display
        axes[1, i].set_title("Predicted mask")
        axes[1, i].axis('off')
    fig.tight_layout()
    plt.show()

In [None]:
# Paths to the dataset, now pointing at the "_jpg" folders. This rebinds the
# same two names that the earlier cell set to the TIFF folders.
original_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Original_jpg'
ground_truth_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/GroundTruth_jpg'

In [None]:
import os
import numpy as np
import tensorflow as tf
from glob import glob
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt
from tensorflow.keras.layers import (
    Conv2D, BatchNormalization, Activation, MaxPool2D,
    UpSampling2D, Concatenate, Input
)
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from PIL import Image  # Using PIL to open JPG images

# Load data function
def load_data(original_folder, ground_truth_folder, split=0.1):
    """Collect image/mask path pairs and split them into train/valid/test sets.

    Both folders are globbed for ``*.jpg`` files and sorted independently, so
    the i-th image is paired with the i-th mask purely by sort order.

    Args:
        original_folder: Directory containing the input images.
        ground_truth_folder: Directory containing the segmentation masks.
        split: Value forwarded as ``test_size`` to both splits: first to carve
            the test set out of the full dataset, then to carve the validation
            set out of what remains.

    Returns:
        ``(train_x, train_y), (valid_x, valid_y), (test_x, test_y)`` where each
        element is a list of file paths.

    Raises:
        ValueError: If no images are found, or if the number of images and
            masks differ (the pairing by sort order would be meaningless).
    """
    images = sorted(glob(os.path.join(original_folder, "*.jpg")))  # Now loading .jpg files
    masks = sorted(glob(os.path.join(ground_truth_folder, "*.jpg")))  # Now loading .jpg files

    # Fail early with an actionable message instead of an opaque splitter error.
    if not images:
        raise ValueError(f"No .jpg images found in {original_folder!r}")
    if len(images) != len(masks):
        raise ValueError(
            f"Found {len(images)} images in {original_folder!r} but "
            f"{len(masks)} masks in {ground_truth_folder!r}"
        )

    # Split dataset into train, validation, and test sets (fixed seed for reproducibility)
    train_x, test_x, train_y, test_y = train_test_split(images, masks, test_size=split, random_state=42)
    train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, test_size=split, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

# Image reading function (RGB for images)
def read_image(path):
    """Read an image with PIL and return it as a (256, 256, 3) float32 array.

    The previous implementation called ``np.resize`` on the decoded array,
    which truncates/repeats the flattened buffer rather than rescaling the
    picture. The image is now rescaled with ``PIL.Image.resize`` before being
    converted to NumPy.

    Args:
        path: File path as ``str``, ``bytes`` or a scalar string ``tf.Tensor``.

    Returns:
        ``np.ndarray`` of shape (256, 256, 3), dtype float32, pixel values
        divided by 255. On any read error a zero-filled array of the same
        shape is returned and the error is printed.
    """
    if isinstance(path, tf.Tensor):  # Decode TensorFlow tensor
        path = path.numpy().decode("utf-8")
    elif isinstance(path, bytes):  # Decode bytes
        path = path.decode("utf-8")

    try:
        # The context manager releases the file handle as soon as decoding is done.
        with Image.open(path) as handle:
            resized = handle.convert("RGB").resize((256, 256), Image.BILINEAR)
        img = np.array(resized)  # (256, 256, 3) uint8
    except Exception as e:
        print(f"Error opening image {path}: {e}")
        return np.zeros((256, 256, 3), dtype=np.float32)  # Return a blank image in case of error

    x = img.astype(np.float32) / 255.0  # Normalize to [0, 1]
    return x

# Mask reading function (Grayscale for masks)
def read_mask(path):
    """Read a mask with PIL and return it as a (256, 256, 1) float32 array.

    ``np.resize`` (used previously) truncates/repeats the flattened buffer
    instead of rescaling, so the mask no longer lined up with its image. The
    mask is now rescaled with ``PIL.Image.resize`` using nearest-neighbour
    resampling so no new intermediate gray levels are introduced.

    Args:
        path: File path as ``str``, ``bytes`` or a scalar string ``tf.Tensor``.

    Returns:
        ``np.ndarray`` of shape (256, 256, 1), dtype float32, values divided
        by 255. On any read error a zero-filled array of the same shape is
        returned and the error is printed.
    """
    if isinstance(path, tf.Tensor):  # Decode TensorFlow tensor
        path = path.numpy().decode("utf-8")
    elif isinstance(path, bytes):  # Decode bytes
        path = path.decode("utf-8")

    try:
        # The context manager releases the file handle as soon as decoding is done.
        with Image.open(path) as handle:
            resized = handle.convert("L").resize((256, 256), Image.NEAREST)
        img = np.array(resized)[..., np.newaxis]  # (256, 256) -> (256, 256, 1)
    except Exception as e:
        print(f"Error opening mask {path}: {e}")
        return np.zeros((256, 256, 1), dtype=np.float32)  # Return a blank mask in case of error

    # NOTE(review): JPEG compression may leave values other than 0/255 in the
    # mask; consider thresholding here if strictly binary targets are expected.
    y = img.astype(np.float32) / 255.0  # Normalize to [0, 1]
    return y

# TensorFlow parsing function
def tf_parse(x, y):
    """Turn a (image path, mask path) dataset element into decoded tensors.

    The Python readers are invoked through ``tf.numpy_function``. Because that
    wrapper loses static shape information, the shapes are set explicitly.

    Args:
        x: Image path element of the dataset.
        y: Mask path element of the dataset.

    Returns:
        Tuple ``(image, mask)`` of float32 tensors with static shapes
        (256, 256, 3) and (256, 256, 1).
    """
    decoded = tf.numpy_function(
        lambda image_path, mask_path: (read_image(image_path), read_mask(mask_path)),
        [x, y],
        [tf.float32, tf.float32],
    )
    image, mask = decoded
    image.set_shape([256, 256, 3])  # RGB input
    mask.set_shape([256, 256, 1])  # Grayscale mask
    return image, mask

# Dataset preparation function
def tf_dataset(x, y, batch=8):
    """Create the input pipeline: parse path pairs, batch them, repeat forever.

    Args:
        x: Sequence of image paths.
        y: Sequence of mask paths, aligned with ``x``.
        batch: Batch size.

    Returns:
        A repeating ``tf.data.Dataset`` of batched (image, mask) tensors.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((x, y))
    return path_pairs.map(tf_parse).batch(batch).repeat()

# UNet model definition
def conv_block(x, num_filters):
    """Run ``x`` through two identical Conv2D(3x3) -> BatchNorm -> ReLU stages.

    Args:
        x: Input Keras tensor.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The Keras tensor produced by the second stage.
    """
    def _stage(tensor):
        tensor = Conv2D(num_filters, (3, 3), padding="same")(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation("relu")(tensor)

    return _stage(_stage(x))

def build_model():
    """Build the U-Net style segmentation network.

    Four encoder conv blocks (16, 32, 48, 64 filters) are each followed by 2x2
    max pooling, then a 64-filter bridge block. The decoder mirrors the
    encoder: upsample 2x, concatenate the corresponding encoder output, apply
    a conv block. A 1x1 convolution with sigmoid gives the one-channel output.

    Returns:
        A ``Model`` mapping a (256, 256, 3) input to a (256, 256, 1) output.
    """
    side = 256
    widths = [16, 32, 48, 64]
    inputs = Input((side, side, 3))  # RGB input

    ## Encoder: push every block output onto a stack for later skip connections
    skip_stack = []
    x = inputs
    for width in widths:
        x = conv_block(x, width)
        skip_stack.append(x)
        x = MaxPool2D((2, 2))(x)

    ## Bridge
    x = conv_block(x, widths[-1])

    ## Decoder: pop skips so the deepest encoder output is used first
    for width in widths[::-1]:
        x = UpSampling2D((2, 2))(x)
        x = Concatenate()([x, skip_stack.pop()])
        x = conv_block(x, width)

    ## Output
    x = Conv2D(1, (1, 1), padding="same")(x)
    x = Activation("sigmoid")(x)

    return Model(inputs, x)

# Evaluation and visualization
def mask_parse(mask):
    """Strip every length-1 axis from ``mask`` (such as the channel axis) for plotting."""
    return np.squeeze(mask)

if __name__ == "__main__":
    # Paths to the dataset
    original_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/Original_jpg'
    ground_truth_folder = '/content/drive/MyDrive/dl_final_project_files/CVC-ClinicDB/GroundTruth_jpg'

    # Load dataset (lists of file paths for each split)
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(original_folder, ground_truth_folder)

    # Hyperparameters
    batch_size = 8
    epochs = 50
    lr = 1e-4

    # Prepare datasets. Each dataset repeats forever, so the step counts below
    # define what one epoch means. test_dataset is not consumed in this cell.
    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)
    test_dataset = tf_dataset(test_x, test_y, batch=batch_size)

    # Model setup
    model = build_model()
    model.compile(
        loss="binary_crossentropy",
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        metrics=["accuracy", tf.keras.metrics.Recall(), tf.keras.metrics.Precision()]
    )

    # Callbacks: keep the best checkpoint, lower the LR on plateaus, stop early
    callbacks = [
        ModelCheckpoint("model.keras", save_best_only=True),
        ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=5),
        EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
    ]

    # Train model. Ceiling division so a final partial batch still counts as a step.
    train_steps = (len(train_x) + batch_size - 1) // batch_size
    valid_steps = (len(valid_x) + batch_size - 1) // batch_size

    model.fit(
        train_dataset,
        validation_data=valid_dataset,
        epochs=epochs,
        steps_per_epoch=train_steps,
        validation_steps=valid_steps,
        callbacks=callbacks
    )

    # Evaluate and visualize results: one PNG per test sample
    results_dir = "results/"
    os.makedirs(results_dir, exist_ok=True)

    for i, (x_path, y_path) in tqdm(enumerate(zip(test_x, test_y)), total=len(test_x)):
        original_image = read_image(x_path)
        ground_truth = mask_parse(read_mask(y_path))

        # verbose=0 keeps Keras from printing a progress bar per image under tqdm.
        predicted_mask = model.predict(np.expand_dims(original_image, axis=0), verbose=0)[0]
        predicted_mask = mask_parse((predicted_mask > 0.5).astype(np.float32))

        panels = [
            ("Original Image", original_image),
            ("Ground Truth Mask", ground_truth),
            ("Predicted Mask", predicted_mask),
        ]
        fig, axes = plt.subplots(1, 3, figsize=(12, 4))
        for ax, (title, panel) in zip(axes, panels):
            ax.set_title(title)
            ax.imshow(panel, cmap="gray")  # cmap only affects the 2-D mask panels
            ax.axis("off")

        fig.tight_layout()
        # os.path.join avoids the doubled slash produced by "results/" + "/" + name.
        fig.savefig(os.path.join(results_dir, f"{i}.png"))
        plt.close(fig)