<a href="https://colab.research.google.com/github/atharvadesai1/BE-Project-Codes/blob/main/liver_tumor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so the dataset archive stored there is reachable from the
# Colab runtime under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Extract the dataset archive into /content/dataset. The leading "!" is an
# IPython shell escape, so this line only runs inside a notebook.
!unzip "/content/drive/MyDrive/liver_segmentation_dataset.zip" -d "/content/dataset"

Mounted at /content/drive
Archive:  /content/drive/MyDrive/liver_segmentation_dataset.zip
  inflating: /content/dataset/segmentations/segmentation-0.nii  
  inflating: /content/dataset/segmentations/segmentation-1.nii  
  inflating: /content/dataset/segmentations/segmentation-10.nii  
  inflating: /content/dataset/segmentations/segmentation-100.nii  
  inflating: /content/dataset/segmentations/segmentation-101.nii  
  inflating: /content/dataset/segmentations/segmentation-102.nii  
  inflating: /content/dataset/segmentations/segmentation-103.nii  
  inflating: /content/dataset/segmentations/segmentation-104.nii  
  inflating: /content/dataset/segmentations/segmentation-105.nii  
  inflating: /content/dataset/segmentations/segmentation-106.nii  
  inflating: /content/dataset/segmentations/segmentation-107.nii  
  inflating: /content/dataset/segmentations/segmentation-108.nii  
  inflating: /content/dataset/segmentations/segmentation-109.nii  
  inflating: /content/dataset/segmentations/

In [None]:
import glob
import os
import zipfile

import cv2
import matplotlib.pyplot as plt
import nibabel as nib
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, BatchNormalization, Activation
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
def load_data(ct_path, mask_path):
    """Pair each CT volume file with its segmentation file.

    Every ``*.nii`` file directly inside ``ct_path`` is visited in sorted
    order. The expected mask file name is the CT file name with ``"volume"``
    replaced by ``"segmentation"``, looked up inside ``mask_path``. CT files
    whose mask file does not exist are skipped.

    Args:
        ct_path: Directory containing the CT ``.nii`` files.
        mask_path: Directory containing the segmentation ``.nii`` files.

    Returns:
        A tuple ``(ct_scans, masks)`` of two equally long lists of file
        paths, where ``masks[i]`` belongs to ``ct_scans[i]``.
    """
    pairs = []
    for volume_file in sorted(glob.glob(ct_path + "/*.nii")):
        mask_name = os.path.basename(volume_file).replace("volume", "segmentation")
        candidate = os.path.join(mask_path, mask_name)
        if not os.path.exists(candidate):
            # No matching segmentation: leave this volume out of the dataset.
            continue
        pairs.append((volume_file, candidate))

    ct_scans = [volume for volume, _ in pairs]
    masks = [mask for _, mask in pairs]
    return ct_scans, masks

# The unzip listing shows the masks being extracted to
# /content/dataset/segmentations, so that is where they are looked up.
# NOTE(review): the visible part of the listing does not show where the CT
# volumes are extracted - confirm that /content/dataset/ct_scans exists.
ct_scans, masks = load_data("/content/dataset/ct_scans", "/content/dataset/segmentations")

# Fail here with a clear message instead of later inside train_test_split.
if not ct_scans:
    raise FileNotFoundError(
        "No CT/segmentation pairs found; check the dataset directories passed to load_data()."
    )

In [None]:
def preprocess_image(image, target_size=(256, 256)):
    """Rescale an image to the [0, 1] range and resize it.

    The input is min-max normalized to [0, 255] with ``cv2.normalize``,
    resized with ``cv2.resize`` (default interpolation) and divided by 255.

    Args:
        image: Array accepted by ``cv2.normalize`` / ``cv2.resize``.
        target_size: Passed unchanged as ``dsize`` to ``cv2.resize``, which
            OpenCV interprets as ``(width, height)``.

    Returns:
        The processed array with one extra trailing axis of length 1.
    """
    stretched = cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX)
    resized = cv2.resize(stretched, target_size)
    unit_range = resized / 255.0
    # Append the channel axis.
    return unit_range[..., np.newaxis]

def preprocess_mask(mask, target_size=(256, 256)):
    """Turn a label mask into a resized binary (0/1) float mask.

    Every label greater than 0 is treated as foreground. The mask is
    binarized *before* resizing and resized with nearest-neighbour
    interpolation: interpolating label values linearly and thresholding
    ``> 0`` afterwards turns every partially covered border pixel into
    foreground and therefore grows the mask.

    Args:
        mask: 2D label array; values ``> 0`` are foreground.
        target_size: Passed as ``dsize`` to ``cv2.resize``, which OpenCV
            interprets as ``(width, height)``.

    Returns:
        ``float32`` array containing only 0.0 and 1.0, with one extra
        trailing axis of length 1.
    """
    binary = (np.asarray(mask) > 0).astype(np.uint8)
    # Nearest-neighbour keeps the output strictly binary.
    binary = cv2.resize(binary, target_size, interpolation=cv2.INTER_NEAREST)
    return np.expand_dims(binary.astype(np.float32), axis=-1)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding ``(X, y)`` batches from NIfTI file pairs.

    Each (CT, mask) file pair contributes exactly one 2D sample per epoch.
    When the files hold 3D volumes, a single slice is selected from each
    volume (see ``_select_slice``) because the preprocessing functions and
    the network operate on 2D images.

    Args:
        ct_paths: List of CT file paths.
        mask_paths: List of mask file paths; ``mask_paths[i]`` belongs to
            ``ct_paths[i]``.
        batch_size: Maximum number of samples per batch.
        target_size: ``(height, width)`` of the generated samples.
        shuffle: Whether to reshuffle the sample order after every epoch.

    Raises:
        ValueError: If ``ct_paths`` and ``mask_paths`` differ in length.
    """

    def __init__(self, ct_paths, mask_paths, batch_size=8, target_size=(256, 256), shuffle=True):
        # Let the Keras base class initialise its own state.
        super().__init__()
        if len(ct_paths) != len(mask_paths):
            raise ValueError(
                f"ct_paths and mask_paths must have the same length, "
                f"got {len(ct_paths)} and {len(mask_paths)}"
            )
        self.ct_paths = ct_paths
        self.mask_paths = mask_paths
        self.batch_size = batch_size
        self.target_size = target_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch, including a final partial batch."""
        # Ceil instead of floor: no sample is silently dropped and a dataset
        # smaller than batch_size still yields one batch.
        return int(np.ceil(len(self.ct_paths) / self.batch_size))

    def __getitem__(self, index):
        """Load and preprocess batch ``index``.

        Returns:
            Tuple ``(X, y)`` of ``float32`` arrays shaped
            ``(n, height, width, 1)`` where ``n <= batch_size``.
        """
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        height, width = self.target_size
        # cv2.resize expects dsize as (width, height), the reverse of the
        # (rows, cols) layout used for the batch arrays.
        dsize = (width, height)

        # Size the arrays by the actual number of samples so the last,
        # possibly shorter, batch carries no zero padding.
        X = np.zeros((len(indexes), height, width, 1), dtype=np.float32)
        y = np.zeros((len(indexes), height, width, 1), dtype=np.float32)

        for i, k in enumerate(indexes):
            ct = nib.load(self.ct_paths[k]).get_fdata()
            mask = nib.load(self.mask_paths[k]).get_fdata()
            ct, mask = self._select_slice(ct, mask)

            X[i] = preprocess_image(ct, dsize)
            y[i] = preprocess_mask(mask, dsize)

        return X, y

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.ct_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    @staticmethod
    def _select_slice(ct, mask):
        """Reduce a 3D (CT, mask) volume pair to one matching 2D slice pair.

        The slice with the most foreground (mask ``> 0``) pixels is chosen so
        the sample contains the structure of interest; if the mask is empty
        the middle slice is used. Inputs that are not both 3D are returned
        unchanged.

        NOTE(review): assumes slices are stacked along the last axis and that
        ``ct`` and ``mask`` have the same shape - confirm for this dataset.
        Using a single slice per volume ignores most of the data; a per-slice
        index would be the next improvement.
        """
        if ct.ndim != 3 or mask.ndim != 3:
            return ct, mask

        foreground = np.count_nonzero(mask > 0, axis=(0, 1))
        if foreground.size and foreground.max() > 0:
            slice_idx = int(foreground.argmax())
        else:
            slice_idx = ct.shape[2] // 2

        # Slicing along the last axis yields a strided view; hand OpenCV a
        # contiguous copy.
        return (
            np.ascontiguousarray(ct[:, :, slice_idx]),
            np.ascontiguousarray(mask[:, :, slice_idx]),
        )

In [None]:
def unet_model(input_size=(256, 256, 1)):
    """Build a three-level U-Net with a single-channel sigmoid output.

    The encoder applies two 3x3 ReLU convolutions followed by 2x2 max
    pooling at 64, 128 and 256 filters; the bottleneck uses 512 filters.
    The decoder mirrors the encoder: 2x upsampling, concatenation with the
    matching encoder features along the channel axis, then two 3x3 ReLU
    convolutions at 256, 128 and 64 filters. A final 1x1 convolution with a
    sigmoid activation produces the output map.

    Args:
        input_size: Shape of one input sample, without the batch axis.

    Returns:
        An uncompiled Keras ``Model``.
    """
    def double_conv(tensor, filters):
        # Two consecutive 3x3 'same'-padded ReLU convolutions.
        for _ in range(2):
            tensor = Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    inputs = Input(input_size)

    # Encoder: keep the pre-pooling features of every level for the skips.
    skips = []
    features = inputs
    for filters in (64, 128, 256):
        features = double_conv(features, filters)
        skips.append(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    # Bottleneck
    features = double_conv(features, 512)

    # Decoder: deepest skip connection first.
    for filters, skip in zip((256, 128, 64), reversed(skips)):
        upsampled = UpSampling2D(size=(2, 2))(features)
        merged = concatenate([skip, upsampled], axis=3)
        features = double_conv(merged, filters)

    outputs = Conv2D(1, 1, activation='sigmoid')(features)

    return Model(inputs=inputs, outputs=outputs)

In [None]:
# Build the U-Net and train it with per-pixel binary cross-entropy.
model = unet_model()
model.compile(optimizer=Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# A fixed seed keeps the train/validation split identical across reruns, so
# results and the checkpoint stay comparable.
train_ct, val_ct, train_mask, val_mask = train_test_split(
    ct_scans, masks, test_size=0.2, random_state=42
)

train_gen = DataGenerator(train_ct, train_mask)
# Validation gains nothing from shuffling; a fixed order keeps it deterministic.
val_gen = DataGenerator(val_ct, val_mask, shuffle=False)

# Keep only the weights with the best validation accuracy on disk.
checkpoint = ModelCheckpoint('liver_tumor_segmentation.h5', monitor='val_accuracy', save_best_only=True)
# Stop after 10 epochs without val_loss improvement (val_loss is the Keras
# default monitor, spelled out here) and roll back to the best weights.
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

history = model.fit(train_gen, validation_data=val_gen, epochs=50, callbacks=[checkpoint, early_stop])

In [None]:
def grad_cam(model, img, layer_name="conv5_block3_out"):
    """Compute a Grad-CAM heatmap for a single input sample.

    The gradients of a scalar score with respect to the activations of the
    chosen layer are averaged per channel and used to weight those
    activations. The score depends on the model output:

    * rank-2 output ``(batch, classes)``: the score of the top class;
    * any other rank (e.g. a segmentation map): the sum of all output
      values, since there is no single class index to pick.

    Args:
        model: Keras model to explain.
        img: One input sample without the batch axis.
        layer_name: Name of the layer whose activations are visualised. If
            the model has no layer of that name, the last ``Conv2D`` layer
            with more than one filter is used instead.

    Returns:
        2D NumPy array with values in [0, 1] at the spatial resolution of
        the chosen layer; all zeros when no activation contributes
        positively.

    Raises:
        ValueError: If ``layer_name`` is not found and the model contains no
            ``Conv2D`` layer with more than one filter.
    """
    try:
        target_layer = model.get_layer(layer_name)
    except ValueError:
        # unet_model() never names its layers, so the default name is absent
        # there. Fall back to the last feature-producing convolution, skipping
        # single-filter output heads.
        conv_layers = [
            layer for layer in model.layers
            if isinstance(layer, tf.keras.layers.Conv2D) and layer.filters > 1
        ]
        if not conv_layers:
            raise ValueError(
                f"Layer {layer_name!r} not found and the model has no "
                "multi-filter Conv2D layer to fall back to."
            ) from None
        target_layer = conv_layers[-1]

    grad_model = Model(inputs=model.inputs, outputs=[target_layer.output, model.output])

    # Add the batch axis and use float32 for the forward pass.
    batch = np.asarray(img, dtype=np.float32)[np.newaxis]

    with tf.GradientTape() as tape:
        conv_output, pred = grad_model(batch)
        if len(pred.shape) == 2:
            pred_class = tf.argmax(pred[0])
            score = pred[:, pred_class]
        else:
            score = tf.reduce_sum(pred)

    grads = tape.gradient(score, conv_output)
    # One importance weight per channel.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Channel-weighted sum of the activations: (h, w, c) @ (c, 1) -> (h, w, 1).
    heatmap = conv_output[0] @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap, axis=-1)
    heatmap = tf.maximum(heatmap, 0)
    # divide_no_nan returns 0 instead of NaN when the maximum is 0.
    heatmap = tf.math.divide_no_nan(heatmap, tf.reduce_max(heatmap))

    return heatmap.numpy()

# Example Grad-CAM visualization
sample_volume = nib.load(val_ct[0]).get_fdata()
# The network takes single 2D images, so show the middle slice of a volume.
# NOTE(review): assumes slices are stacked along the last axis - confirm.
if sample_volume.ndim == 3:
    sample_volume = np.ascontiguousarray(sample_volume[:, :, sample_volume.shape[2] // 2])
sample_img = preprocess_image(sample_volume)

# Explain the layer that feeds the output convolution; the U-Net's layers are
# auto-named, so the name is looked up instead of hard-coded.
heatmap = grad_cam(model, sample_img, layer_name=model.layers[-2].name)

plt.imshow(sample_img[:, :, 0], cmap='gray')
# Upsample the heatmap to the image size; cv2.resize takes (width, height).
plt.imshow(cv2.resize(heatmap, (sample_img.shape[1], sample_img.shape[0])), alpha=0.5, cmap='jet')
plt.title("Grad-CAM Explanation")
plt.show()