<a href="https://colab.research.google.com/github/Eric-Manzi/UNETVariantsModels/blob/main/MSA_Unet_skincancer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
# Reset Keras' global state so that re-running the notebook in the same kernel
# does not accumulate layers/models from a previous run.
tf.keras.backend.clear_session()

In [None]:
!apt-get install graphviz
!pip install pydotplus
!pip install pydot

In [None]:
import datetime
import os
import zipfile

import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, UpSampling2D, MaxPooling2D, concatenate, Activation, BatchNormalization
from tensorflow.keras.layers import ConvLSTM2D, Add, Bidirectional, Lambda
from tensorflow.keras.layers import Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import plot_model
from tqdm import tqdm


In [None]:
# Unzip the dataset
zip_path = '/content/drive/MyDrive/Datasets/ham1000-segmentation-and-classification.zip'
extract_path = '/content/ham1000_dataset'

# The archive is read from a Google Drive path. Fail early with an actionable
# message when it is not reachable (e.g. Drive not mounted) instead of surfacing
# a bare error from inside zipfile.
if not os.path.isfile(zip_path):
    raise FileNotFoundError(
        f"Dataset archive not found at {zip_path!r}. "
        "Mount Google Drive and verify the path before running this cell."
    )

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("Dataset extracted to:", extract_path)


Dataset extracted to: /content/ham1000_dataset


In [None]:
# Images and masks are expected in the 'images' and 'masks' sub-folders of the
# extracted dataset directory.
extract_path = '/content/ham1000_dataset'
image_folder = os.path.join(extract_path, 'images')
mask_folder = os.path.join(extract_path, 'masks')

# Allowed file extensions (other files in the folders are ignored).
image_extensions = ('.jpg', '.jpeg')  # For image files
mask_extensions = ('.png',)  # For mask files

# Sorted listings, filtered by extension. Image i is paired with mask i, so the
# two lists must line up name-by-name (checked below).
image_files = sorted([f for f in os.listdir(image_folder) if f.lower().endswith(image_extensions)])
mask_files = sorted([f for f in os.listdir(mask_folder) if f.lower().endswith(mask_extensions)])

# Quick sanity output
print("Number of image files found:", len(image_files))
print("Number of mask files found:", len(mask_files))
print("First 5 image filenames:", image_files[:5])
print("First 5 mask filenames:", mask_files[:5])

# The counts must match. Only the counts are printed on failure: dumping every
# filename would flood the notebook output with thousands of entries.
if len(image_files) != len(mask_files):
    print(f"Number of image files: {len(image_files)}")
    print(f"Number of mask files: {len(mask_files)}")
    raise AssertionError("The number of images and masks must be equal.")

# Equal counts do not prove alignment. Each mask name must be the image stem,
# optionally followed by '_<suffix>' (e.g. 'ISIC_0024306_segmentation.png').
mismatched_pairs = []
for image_name, mask_name in zip(image_files, mask_files):
    image_stem = os.path.splitext(image_name)[0]
    mask_stem = os.path.splitext(mask_name)[0]
    if mask_stem != image_stem and not mask_stem.startswith(image_stem + '_'):
        mismatched_pairs.append((image_name, mask_name))
if mismatched_pairs:
    raise AssertionError(
        f"{len(mismatched_pairs)} image/mask pairs do not match by name, "
        f"e.g. {mismatched_pairs[:3]}"
    )

# Shuffle images and masks together (fixed seed for reproducibility)
image_files, mask_files = shuffle(image_files, mask_files, random_state=42)

# Split the dataset: 2500 for training, 500 for validation, 300 for testing
n_train, n_val, n_test = 2500, 500, 300
n_required = n_train + n_val + n_test
if len(image_files) < n_required:
    # Slicing would otherwise silently produce short (or empty) splits.
    raise ValueError(
        f"Need at least {n_required} image/mask pairs for the split, found {len(image_files)}."
    )

train_image_files = image_files[:n_train]
train_mask_files = mask_files[:n_train]

val_image_files = image_files[n_train:n_train + n_val]
val_mask_files = mask_files[n_train:n_train + n_val]

test_image_files = image_files[n_train + n_val:n_required]
test_mask_files = mask_files[n_train + n_val:n_required]

# Raw directory entry counts (these include files that were filtered out above)
print("Images folder:", image_folder)
print("Masks folder:", mask_folder)
print("Number of images:", len(os.listdir(image_folder)))
print("Number of masks:", len(os.listdir(mask_folder)))


Number of image files found: 10015
Number of mask files found: 10015
First 5 image filenames: ['ISIC_0024306.jpg', 'ISIC_0024307.jpg', 'ISIC_0024308.jpg', 'ISIC_0024309.jpg', 'ISIC_0024310.jpg']
First 5 mask filenames: ['ISIC_0024306_segmentation.png', 'ISIC_0024307_segmentation.png', 'ISIC_0024308_segmentation.png', 'ISIC_0024309_segmentation.png', 'ISIC_0024310_segmentation.png']
Images folder: /content/ham1000_dataset/images
Masks folder: /content/ham1000_dataset/masks
Number of images: 10017
Number of masks: 10015


In [None]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` yielding batches of (image, mask) arrays read from disk.

    Args:
        image_folder: Directory containing the image files.
        mask_folder: Directory containing the mask files.
        image_files: Image file names; entry ``i`` is paired with ``mask_files[i]``.
        mask_files: Mask file names, aligned with ``image_files``.
        batch_size: Number of samples per batch.
        img_size: Target size passed to ``cv2.resize`` (OpenCV expects (width, height)).
        shuffle: Reshuffle the sample order at the end of every epoch.
        augment: Apply the same random geometric transform to image and mask.

    Each batch is a tuple ``(images, masks)`` of float32 arrays:
    ``images`` has shape (batch, H, W, 3) scaled to [0, 1]; ``masks`` has shape
    (batch, H, W, 1) with values in {0, 1}.
    """

    def __init__(self, image_folder, mask_folder, image_files, mask_files, batch_size, img_size, shuffle=True, augment=True):
        # Initialise the Keras base class; newer Keras versions warn if this is skipped.
        super().__init__()
        self.image_folder = image_folder
        self.mask_folder = mask_folder
        self.image_files = image_files
        self.mask_files = mask_files
        self.batch_size = batch_size
        self.img_size = img_size
        self.shuffle = shuffle
        self.augment = augment  # Flag to control augmentation
        self.indices = np.arange(len(self.image_files))

        # Data augmentation parameters (only used when `augment` is True)
        self.datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            fill_mode='nearest'
        )

        # Produce the initial sample order.
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch, counting a final partial batch."""
        # Ceiling division: flooring would silently drop the tail samples, which
        # skews validation/test results when the split size is not a multiple
        # of the batch size.
        return (len(self.image_files) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, index):
        """Load, preprocess and return batch number ``index``."""
        start_index = index * self.batch_size
        end_index = min((index + 1) * self.batch_size, len(self.image_files))
        batch_indices = self.indices[start_index:end_index]

        images = []
        masks = []
        for i in batch_indices:
            image, mask = self._load_pair(i)
            images.append(image)
            masks.append(mask)

        return np.array(images, dtype=np.float32), np.array(masks, dtype=np.float32)

    def _load_pair(self, i):
        """Read, resize, optionally augment and normalise sample ``i``."""
        image_path = os.path.join(self.image_folder, self.image_files[i])
        mask_path = os.path.join(self.mask_folder, self.mask_files[i])

        # NOTE: cv2.imread returns channels in BGR order; the order is left unchanged here.
        image = cv2.imread(image_path)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread signals failure by returning None rather than raising.
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {image_path}")
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_path}")

        image = cv2.resize(image, self.img_size)
        # Nearest-neighbour keeps label values intact instead of blending them at edges.
        mask = cv2.resize(mask, self.img_size, interpolation=cv2.INTER_NEAREST)
        # Add a channel axis so the mask matches the model's (H, W, 1) output.
        mask = mask[..., np.newaxis]

        if self.augment:
            # The same seed gives image and mask the identical random transform.
            # int64 keeps the upper bound valid where the default integer is 32-bit.
            seed = int(np.random.randint(0, 2**32 - 1, dtype=np.int64))
            image = self.datagen.random_transform(image, seed=seed)
            mask = self.datagen.random_transform(mask, seed=seed)

        image = np.asarray(image, dtype=np.float32) / 255.0
        # Re-binarise: the affine augmentation interpolates, producing in-between values.
        # Assumes masks are stored as 0/255 images - TODO confirm for other datasets.
        mask = (np.asarray(mask, dtype=np.float32) > 127.5).astype(np.float32)
        return image, mask

    def on_epoch_end(self):
        """Reshuffle the sample order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

In [None]:
# Define the batch size and image size
batch_size = 8
img_size = (256, 256)

# Create one DataGenerator per split.
# Only the training generator shuffles and augments. Validation and test data
# must stay untouched so that their metrics are deterministic and comparable;
# `augment` defaults to True, so it has to be switched off explicitly here.
train_generator = DataGenerator(
    image_folder=image_folder,
    mask_folder=mask_folder,
    image_files=train_image_files,
    mask_files=train_mask_files,
    batch_size=batch_size,
    img_size=img_size,
    shuffle=True,
    augment=True
)

val_generator = DataGenerator(
    image_folder=image_folder,
    mask_folder=mask_folder,
    image_files=val_image_files,
    mask_files=val_mask_files,
    batch_size=batch_size,
    img_size=img_size,
    shuffle=False,
    augment=False
)

test_generator = DataGenerator(
    image_folder=image_folder,
    mask_folder=mask_folder,
    image_files=test_image_files,
    mask_files=test_mask_files,
    batch_size=batch_size,
    img_size=img_size,
    shuffle=False,
    augment=False
)

In [None]:
def dice_score(y_true, y_pred):
    """Soft Dice coefficient over the whole batch.

    Both tensors are flattened, so the score is computed across every pixel of
    the batch at once. Predictions are used as-is (no thresholding). A constant
    of 1 is added to numerator and denominator, which keeps the ratio defined
    when both tensors are all zeros.
    """
    smooth = 1
    truth = K.flatten(tf.cast(y_true, tf.float32))
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    total = K.sum(truth) + K.sum(pred)
    return (2. * overlap + smooth) / (total + smooth)

def iou_score(y_true, y_pred):
    """Soft Intersection-over-Union (Jaccard) score over the whole batch.

    Both tensors are flattened before the sums are taken, and predictions are
    used as-is (no thresholding). A constant of 1 is added to numerator and
    denominator, which keeps the ratio defined when both tensors are all zeros.
    """
    smooth = 1
    truth = K.flatten(tf.cast(y_true, tf.float32))
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    combined = K.sum(truth) + K.sum(pred) - overlap
    return (overlap + smooth) / (combined + smooth)

In [None]:
def pixel_accuracy(y_true, y_pred):
    """Fraction of pixels whose thresholded prediction matches the label.

    Predictions and labels are binarised at 0.5 before comparison; comparing
    raw sigmoid outputs to labels for exact equality would almost never match.
    Pixels labelled -1 are treated as "ignore" and excluded from both the
    numerator and the denominator.

    Args:
        y_true: Ground-truth mask tensor.
        y_pred: Predicted probabilities, same shape as ``y_true``.

    Returns:
        Scalar tensor in [0, 1].
    """
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)

    # 1.0 for pixels that take part in the score, 0.0 for ignored (-1) pixels.
    valid = tf.cast(tf.not_equal(y_true, -1), tf.float32)

    true_labels = tf.cast(y_true > 0.5, tf.float32)
    pred_labels = tf.cast(y_pred > 0.5, tf.float32)

    # The boolean comparison must be cast before summing: reduce_sum does not
    # accept boolean tensors.
    matches = tf.cast(tf.equal(true_labels, pred_labels), tf.float32) * valid

    correct_pixels = tf.reduce_sum(matches)
    total_pixels = tf.reduce_sum(valid)
    return correct_pixels / (total_pixels + K.epsilon())

def precision(y_true, y_pred):
    """Soft precision: TP / (TP + FP), computed on un-thresholded predictions.

    Both inputs are cast to float32. ``K.epsilon()`` in the denominator guards
    against division by zero when nothing is predicted positive.
    """
    truth = tf.cast(y_true, tf.float32)
    pred = tf.cast(y_pred, tf.float32)
    tp = K.sum(truth * pred)
    fp = K.sum((1 - truth) * pred)
    return tp / (tp + fp + K.epsilon())

def recall(y_true, y_pred):
    """Soft recall: TP / (TP + FN), computed on un-thresholded predictions.

    Both inputs are cast to float32. ``K.epsilon()`` in the denominator guards
    against division by zero when the ground truth has no positive pixels.
    """
    truth = tf.cast(y_true, tf.float32)
    pred = tf.cast(y_pred, tf.float32)
    tp = K.sum(truth * pred)
    fn = K.sum(truth * (1 - pred))
    return tp / (tp + fn + K.epsilon())

In [None]:
def attention_block(x, filters):
    """Multi-scale feature block followed by a bidirectional ConvLSTM.

    Three parallel Conv -> BatchNorm -> ReLU branches (1x1, 3x3 and 5x5
    kernels, ``filters`` channels each) are applied to ``x`` and concatenated
    along the channel axis. The result is given a leading time axis of length
    one and passed through ``Bidirectional(ConvLSTM2D)`` and a final
    BatchNormalization.

    Args:
        x: 4-D feature tensor; its spatial and channel sizes are read from
           ``.shape``, so they are expected to be statically known.
        filters: Number of filters per branch and for the ConvLSTM.

    Returns:
        The batch-normalised ConvLSTM output tensor.
    """
    def _conv_bn_relu(tensor, kernel_size):
        # One pyramid branch: convolution, batch norm, ReLU.
        out = Conv2D(filters, kernel_size, padding='same')(tensor)
        out = BatchNormalization()(out)
        return Activation('relu')(out)

    # Branches are built in increasing kernel size: 1x1, 3x3, 5x5.
    branches = [_conv_bn_relu(x, (k, k)) for k in (1, 3, 5)]
    pyramid = concatenate(branches, axis=-1)

    # ConvLSTM2D expects a time axis; insert one of length 1.
    height, width, channels = (pyramid.shape[axis] for axis in (1, 2, 3))
    sequence = Reshape((1, height, width, channels))(pyramid)

    recurrent = ConvLSTM2D(filters=filters, kernel_size=(3, 3), padding='same', return_sequences=False)
    features = Bidirectional(recurrent)(sequence)
    return BatchNormalization()(features)

In [None]:
def encoder_block(x, filters):
    """Two (3x3 Conv + ReLU -> BatchNorm) stages followed by 2x2 max pooling.

    Args:
        x: Input feature tensor.
        filters: Number of filters for both convolutions.

    Returns:
        Tuple ``(features, downsampled)``: the pre-pooling features (used as
        the skip connection) and the max-pooled tensor passed to the next stage.
    """
    features = x
    for _ in range(2):
        features = Conv2D(filters, (3, 3), padding='same', activation='relu')(features)
        features = BatchNormalization()(features)
    downsampled = MaxPooling2D((2, 2))(features)
    return features, downsampled


In [None]:
def decoder_block(x, skip, filters):
    """Upsample ``x`` by 2, merge it with ``skip`` and refine with two convolutions.

    Args:
        x: Feature tensor from the previous (coarser) stage.
        skip: Encoder feature tensor to concatenate after upsampling; it must
              have the same spatial size as the upsampled ``x``.
        filters: Number of filters for the transposed and regular convolutions.

    Returns:
        The refined feature tensor.
    """
    upsampled = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(x)
    merged = concatenate([upsampled, skip])
    for _ in range(2):
        merged = Conv2D(filters, (3, 3), padding='same', activation='relu')(merged)
        merged = BatchNormalization()(merged)
    return merged


In [None]:
def msau_net(input_shape=(256, 256, 3), num_classes=1):
    """Build the U-Net style segmentation model used in this notebook.

    Four encoder stages (64, 128, 256, 512 filters) feed a bottleneck built by
    ``attention_block`` (1024 filters). Four decoder stages (512, 256, 128, 64
    filters) then consume the encoder skip connections in reverse order. A
    final 1x1 convolution with sigmoid activation produces the output map.

    Args:
        input_shape: Shape of one input image, without the batch axis.
        num_classes: Number of output channels.

    Returns:
        An uncompiled ``Model``.
    """
    inputs = Input(input_shape)

    # Encoder: remember each stage's pre-pooling features for the skip connections.
    skips = []
    current = inputs
    for width in (64, 128, 256, 512):
        skip, current = encoder_block(current, width)
        skips.append(skip)

    # Bottleneck
    current = attention_block(current, 1024)

    # Decoder: deepest skip connection first.
    for width, skip in zip((512, 256, 128, 64), reversed(skips)):
        current = decoder_block(current, skip, width)

    # Output layer
    outputs = Conv2D(num_classes, (1, 1), activation='sigmoid')(current)
    return Model(inputs, outputs)


In [None]:
# Build the network with its default input shape / class count and attach the
# optimizer, loss and the metrics defined in the cells above.
model = msau_net()
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy', dice_score, iou_score, pixel_accuracy, precision, recall],
)


In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
# Render the model graph to 'model_plot.png'. Shapes and layer names are switched off here;
# pass show_shapes=True / show_layer_names=True for a more detailed diagram.
plot_model(model, to_file='model_plot.png', show_shapes=False, show_layer_names=False)

In [None]:
# Training callbacks.
# No tf.device scope is needed here: building callback objects creates no ops.

# Save the model to disk whenever the monitored validation loss improves.
checkpointer = tf.keras.callbacks.ModelCheckpoint('model_for_mau.keras', verbose=1, save_best_only=True)

# Write TensorBoard logs into a timestamped run directory under logs/fit,
# which is the directory the TensorBoard cell below points at.
tensorboard_log_dir = os.path.join("logs", "fit", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

callbacks = [
    checkpointer,  # was created but never passed to fit(), so nothing was ever saved
    tf.keras.callbacks.EarlyStopping(patience=5, monitor='val_loss', restore_best_weights=True),
    tf.keras.callbacks.TensorBoard(log_dir=tensorboard_log_dir),
]

In [None]:
# Fit on the training generator, validating after every epoch; the callbacks
# defined above are applied during training.
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=10,
    callbacks=callbacks,
)




Epoch 1/10


  self._warn_if_super_not_called()


[1m  1/312[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m16:55:53[0m 196s/step - accuracy: 0.5163 - dice_score: 0.2746 - iou_score: 0.1591 - loss: 0.8518

In [None]:
# Evaluate the model on the test set.
# The model is compiled with the loss plus six metrics, so unpacking the result
# into four names raises "too many values to unpack". Request a dict keyed by
# metric name instead and read the values by name.
test_results = model.evaluate(test_generator, return_dict=True)

test_loss = test_results["loss"]
test_accuracy = test_results["accuracy"]
test_dice = test_results["dice_score"]
test_iou = test_results["iou_score"]

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Dice Score: {test_dice}")
print(f"Test IoU Score: {test_iou}")

# Report any remaining compiled metrics (pixel accuracy, precision, recall, ...).
already_reported = {"loss", "accuracy", "dice_score", "iou_score"}
for metric_name, metric_value in test_results.items():
    if metric_name not in already_reported:
        print(f"Test {metric_name}: {metric_value}")


In [None]:
# Load the TensorBoard extension
%load_ext tensorboard
# Start TensorBoard and specify the log directory
%tensorboard --logdir logs/fit

!pip install -q pyngrok
from pyngrok import ngrok
# Start TensorBoard as a background process
import datetime

log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
%tensorboard --logdir logs/fit --port=6006 &

# Start ngrok to tunnel the port
public_url = ngrok.connect(port="6006")
print(f"TensorBoard URL: {public_url}")
#The code above will output a URL. Open this URL in your local browser to view the TensorBoard metrics running in Colab.

In [None]:
# Visualise the augmentation on a single training image.
# `train_image_files` holds file NAMES, so the image has to be read from disk
# first; feeding the name string to the generator raises
# "could not convert string to float".
sample_path = os.path.join(image_folder, train_image_files[0])  # Replace with any image in your dataset
sample_image = cv2.imread(sample_path)
if sample_image is None:
    raise FileNotFoundError(f"Could not read image file: {sample_path}")

# OpenCV loads BGR; convert to RGB so matplotlib shows the true colours.
sample_image = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)
sample_image = cv2.resize(sample_image, img_size)

# Add a batch dimension (required by ImageDataGenerator.flow)
sample_image = np.expand_dims(sample_image, 0)

# Reuse the augmentation settings of the training generator so the preview
# shows exactly what training sees.
datagen = train_generator.datagen
augmented_images = datagen.flow(sample_image, batch_size=1)

# Plot a few augmented images
plt.figure(figsize=(10, 10))
for i in range(9):  # Displaying 9 augmented samples
    aug_img = next(augmented_images)[0].astype('uint8')  # Get the next augmented image
    plt.subplot(3, 3, i + 1)
    plt.imshow(aug_img)
    plt.axis('off')
plt.suptitle('Augmented Images')
plt.show()


ValueError: could not convert string to float: 'ISIC_0025923.jpg'