In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import CSVLogger
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, concatenate, UpSampling2D
from sklearn.model_selection import train_test_split
from PIL import Image
from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras import backend as K
from tensorflow.keras.losses import BinaryCrossentropy
import matplotlib.pyplot as plt
from keras.models import load_model
from keras.preprocessing import image

def parse_image(img_path, image_size):
    image_rgb = cv2.imread(img_path, 1)
    h, w, _ = image_rgb.shape
    if (h == image_size) and (w == image_size):
        pass
    else:
        image_rgb = cv2.resize(image_rgb, (image_size, image_size))
    image_rgb = image_rgb/255.0
    return image_rgb

def parse_mask(mask_path, image_size):
    mask = cv2.imread(mask_path, -1)
    h, w = mask.shape
    if (h == image_size) and (w == image_size):
        pass
    else:
        mask = cv2.resize(mask, (image_size, image_size))
    mask = np.expand_dims(mask, -1)
    mask = mask/255.0

    return mask

def data_gen(image_size, images_path, masks_path, batch_size=8):
    def on_epoch_end():
        pass

    def __len__():
        return int(np.ceil(len(images_path)/float(batch_size)))

    def __getitem__(index):
        if (index + 1) * batch_size > len(images_path):
            batch_size_dynamic = len(images_path) - index * batch_size

        images_batch = []
        masks_batch = []

        images_path_batch = images_path[index * batch_size: (index + 1) * batch_size]
        masks_path_batch = masks_path[index * batch_size: (index + 1) * batch_size]

        for i in range(len(images_path_batch)):
            image = parse_image(images_path_batch[i], image_size)
            mask = parse_mask(masks_path_batch[i], image_size)

            images_batch.append(image)
            masks_batch.append(mask)

        return np.array(images_batch), np.array(masks_batch)

    return {
        'on_epoch_end': on_epoch_end,
        '__len__': __len__,
        '__getitem__': __getitem__
    }


"""
ResUNet++ architecture in Keras TensorFlow
"""

def global_context_block(input_tensor):
    # Global average pooling to obtain global context information
    global_context = GlobalAveragePooling2D()(input_tensor)
    global_context = Reshape((1, 1, input_tensor.shape[-1]))(global_context)  # Reshape to match input tensor shape

    # Add a 1x1 convolution to capture channel-wise dependencies
    g_conv = Conv2D(filters=input_tensor.shape[-1], kernel_size=1, activation='relu')(global_context)  # Keep the same number of channels

    # Spatial upsampling to match input tensor shape
    g_conv = UpSampling2D(size=(input_tensor.shape[1], input_tensor.shape[2]))(g_conv)

    # Multiply global context with input feature map to compute attention
    scale = Multiply()([input_tensor, g_conv])

    # Add the scaled feature map to the input tensor
    scaled_features = Add()([input_tensor, scale])

    return scaled_features

def squeeze_excite_block(inputs, ratio=8):
    init = inputs
    channel_axis = -1
    filters = init.shape[channel_axis]
    se_shape = (1, 1, filters)

    se = GlobalAveragePooling2D()(init)
    se = Reshape(se_shape)(se)
    se = Dense(filters // ratio, activation='relu', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', use_bias=False)(se)

    x = Multiply()([init, se])
    return x

def stem_block(x, n_filter, strides):
    x_init = x

    ## Conv 1
    x = Conv2D(n_filter, (3, 3), padding="same", strides=strides)(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(n_filter, (3, 3), padding="same")(x)

    ## Shortcut
    s  = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    s = BatchNormalization()(s)

    ## Add
    x = Add()([x, s])
    x = squeeze_excite_block(x)
    return x


def resnet_block(x, n_filter, strides=1):
    x_init = x

    ## Conv 1
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(n_filter, (3, 3), padding="same", strides=strides)(x)
    ## Conv 2
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv2D(n_filter, (3, 3), padding="same", strides=1)(x)

    ## Shortcut
    s  = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    s = BatchNormalization()(s)

    ## Add
    s = global_context_block(s)
    x = Add()([x, s])
    x = squeeze_excite_block(x)
    return x

def aspp_block(x, num_filters, rate_scale=1):
    x1 = Conv2D(num_filters, (3, 3), dilation_rate=(4 * rate_scale, 4 * rate_scale), padding="same")(x)
    x1 = BatchNormalization()(x1)

    x2 = Conv2D(num_filters, (3, 3), dilation_rate=(8 * rate_scale, 8 * rate_scale), padding="same")(x)
    x2 = BatchNormalization()(x2)

    x3 = Conv2D(num_filters, (3, 3), dilation_rate=(16 * rate_scale, 16 * rate_scale), padding="same")(x)
    x3 = BatchNormalization()(x3)

    x4 = Conv2D(num_filters, (3, 3), padding="same")(x)
    x4 = BatchNormalization()(x4)

    y = Add()([x1, x2, x3, x4])
    y = Conv2D(num_filters, (1, 1), padding="same")(y)
    return y

def attetion_block(g, x):
    """
        g: Output of Parallel Encoder block
        x: Output of Previous Decoder block
    """

    filters = x.shape[-1]

    g_conv = BatchNormalization()(g)
    g_conv = Activation("relu")(g_conv)
    g_conv = Conv2D(filters, (3, 3), padding="same")(g_conv)

    g_pool = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(g_conv)

    x_conv = BatchNormalization()(x)
    x_conv = Activation("relu")(x_conv)
    x_conv = Conv2D(filters, (3, 3), padding="same")(x_conv)

    gc_sum = Add()([g_pool, x_conv])

    gc_conv = BatchNormalization()(gc_sum)
    gc_conv = Activation("relu")(gc_conv)
    gc_conv = Conv2D(filters, (3, 3), padding="same")(gc_conv)

    gc_mul = Multiply()([gc_conv, x])
    return gc_mul

def decoder_block(inputs, concat_tensor, filters):
    decoder = layers.Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(inputs)
    decoder = layers.concatenate([concat_tensor, decoder], axis=-1)
    decoder = layers.BatchNormalization()(decoder)
    decoder = layers.Activation('relu')(decoder)

    conv11 = layers.Conv2D(filters, 3, padding = 'same')(decoder)
    bn11 = layers.BatchNormalization(axis=3)(conv11)
    a11 = layers.Activation("relu")(bn11)

    #filters-line-conv=2
    conv21 = layers.Conv2D(filters, 3, padding = 'same')(decoder)
    bn21 = layers.BatchNormalization(axis=3)(conv21)
    a21 = layers.Activation("relu")(bn21)
    conv22 = layers.Conv2D(filters, 3, padding = 'same')(a21)
    bn22 = layers.BatchNormalization(axis=3)(conv22)
    a22 = layers.Activation("relu")(bn22)

    #filters-line-conv=3
    conv31 = layers.Conv2D(filters, 3, padding = 'same')(decoder)
    bn31 = layers.BatchNormalization(axis=3)(conv31)
    a31 = layers.Activation("relu")(bn31)
    conv32 = layers.Conv2D(filters, 3, padding = 'same')(a31)
    bn32 = layers.BatchNormalization(axis=3)(conv32)
    a32 = layers.Activation("relu")(bn32)
    conv33 = layers.Conv2D(filters, 3, padding = 'same')(a32)
    bn33 = layers.BatchNormalization(axis=3)(conv33)
    a33 = layers.Activation("relu")(bn33)


    resout = layers.concatenate([a11,a22,a33,decoder])
    resout = layers.Activation("relu")(resout)

    return resout


def resunet_u_net(input_size=192):
    n_filters = [16, 32, 64, 128, 256]
    inputs = Input((input_size, input_size, 3))

    # ResUNet++ Encoder
    c0 = inputs
    c1 = stem_block(c0, n_filters[0], strides=1)
    c2 = resnet_block(c1, n_filters[1], strides=2)
    c3 = resnet_block(c2, n_filters[2], strides=2)
    c4 = resnet_block(c3, n_filters[3], strides=2)

     ## Bridge
    b1 = aspp_block(c4, n_filters[4])

    # U-Net Decoder
    d1 = attetion_block(c3, b1)
    d1 = decoder_block(d1, c3, n_filters[3])

    d2 = attetion_block(c2, d1)
    d2 = decoder_block(d2, c2, n_filters[2])

    d3 = attetion_block(c1, d2)
    d3 = decoder_block(d3, c1, n_filters[1])

    # Output
    outputs = aspp_block(d3, n_filters[0])
    outputs = Conv2D(1, (1, 1), padding="same")(outputs)
    outputs = Activation("sigmoid")(outputs)

    # Model
    model = Model(inputs, outputs)
    return model

smooth = 1.
def dice_coef(y_true, y_pred):
    y_true_f = tf.keras.layers.Flatten()(y_true)
    y_pred_f = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)


def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)


In [None]:
data_dir = '/content/drive/MyDrive/codeJournal/dwi1_aug_jpg/'
mask_dir = '/content/drive/MyDrive/codeJournal/mask1_aug_j/'
all_images = os.listdir(data_dir)

to_train = 1 # ratio of number of tr*ain set images to use
total_train_images = all_images[:int(len(all_images)*to_train)]
num_augmented_images_dwi1_aug = len(os.listdir(data_dir))
num_augmented_images_mask2_aug = len(os.listdir(mask_dir))
print(f"Number of augmented images in dwi1_aug: {num_augmented_images_dwi1_aug}")
print(f"Number of augmented images in mask2_aug: {num_augmented_images_mask2_aug}")

WIDTH = 192
HEIGHT = 192
BATCH_SIZE = 8


Number of augmented images in dwi1_aug: 1510
Number of augmented images in mask2_aug: 1510


In [1]:
train_images, validation_images = train_test_split(total_train_images, train_size=0.8, test_size=0.2,random_state = 0)


NameError: ignored

In [2]:
# generator that we will use to read the data from the directory
def data_gen_small(data_dir, mask_dir, images, batch_size, dims):
        """
        data_dir: where the actual images are kept
        mask_dir: where the actual masks are kept
        images: the filenames of the images we want to generate batches from
        batch_size: self explanatory
        dims: the dimensions in which we want to rescale our images, tuple
        """
        while True:
            ix = np.random.choice(np.arange(len(images)), batch_size)
            imgs = []
            labels = []
            for i in ix:
                # images
                original_img = load_img(data_dir + images[i])
                resized_img = original_img.resize(dims)
                array_img = img_to_array(resized_img)/255
                imgs.append(array_img)

                # masks
                original_mask = load_img(mask_dir + images[i].split(".")[0] + '.jpg')
                resized_mask = original_mask.resize(dims)
                array_mask = img_to_array(resized_mask)/255
                labels.append(array_mask[:, :, 0])

            imgs = np.array(imgs)
            labels = np.array(labels)
            yield imgs, labels.reshape(-1, dims[0], dims[1], 1)

# generator that we will use to read the data from the directory with random augmentation

# generator that we will use to read the data from the directory with random augmentation
def data_gen_aug(data_dir, mask_dir, images, batch_size, dims):
        """
        data_dir: where the actual images are kept
        mask_dir: where the actual masks are kept
        images: the filenames of the images we want to generate batches from
        batch_size: self explanatory
        dims: the dimensions in which we want to rescale our images, tuple
        """
        while True:
            ix = np.random.choice(np.arange(len(images)), batch_size)
            imgs = []
            labels = []
            for i in ix:
                # read images and masks
                original_img = load_img(data_dir + images[i])
                original_mask = load_img(mask_dir + images[i].split(".")[0] + '.jpg')

                # transform into ideal sizes
                resized_img = original_img.resize(dims)
                resized_mask = original_mask.resize(dims)


                array_img = img_to_array(resized_img)/255
                array_mask = img_to_array(resized_mask)/255

                imgs.append(array_img)
                labels.append(array_mask[:, :, 0])

            imgs = np.array(imgs)

            labels = np.array(labels)
            yield imgs, labels.reshape(-1, dims[0], dims[1], 1)
'''
                # add random augmentation > here we only flip horizontally
                if np.random.random() < 1:
                  resized_img = resized_img.transpose(Image.FLIP_LEFT_RIGHT)
                  resized_mask = resized_mask.transpose(Image.FLIP_LEFT_RIGHT)

'''

'\n                # add random augmentation > here we only flip horizontally\n                if np.random.random() < 1:\n                  resized_img = resized_img.transpose(Image.FLIP_LEFT_RIGHT)\n                  resized_mask = resized_mask.transpose(Image.FLIP_LEFT_RIGHT)\n\n'

In [None]:
#generator for train and validation data set   data_gen_aug(data_dir, mask_dir, images, batch_size, dims, seed=42)

train_gen = data_gen_aug(data_dir, mask_dir, train_images, BATCH_SIZE, (WIDTH, HEIGHT))
val_gen = data_gen_aug(data_dir, mask_dir, validation_images, BATCH_SIZE, (WIDTH, HEIGHT))


In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8,
                                              restore_best_weights=False
                                              )

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                   factor=0.2,
                                   patience=3,
                                   verbose=1,
                                   min_delta=1e-3,min_lr = 1e-6
                                   )

adam = tf.keras.optimizers.Adam(learning_rate=0.0001)

In [None]:
arch = resunet_u_net()
arch.summary()
tf.keras.utils.plot_model(arch,to_file='model.png')
model = arch
image_size = 192
lr = 1e-4


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 192, 192, 3)]        0         []                            
                                                                                                  
 conv2d_53 (Conv2D)          (None, 192, 192, 16)         448       ['input_2[0][0]']             
                                                                                                  
 batch_normalization_49 (Ba  (None, 192, 192, 16)         64        ['conv2d_53[0][0]']           
 tchNormalization)                                                                                
                                                                                                  
 activation_41 (Activation)  (None, 192, 192, 16)         0         ['batch_normalization_49

In [3]:
model.compile(optimizer = adam, loss = BinaryCrossentropy(), metrics=['accuracy',dice_coef])

NameError: ignored

In [None]:
hist=model.fit(train_gen,
                    steps_per_epoch=np.ceil(float(len(train_images)) / float(BATCH_SIZE)),
                    epochs=100,
                    validation_steps=np.ceil(float(len(validation_images)) / float(BATCH_SIZE)),
                    validation_data = val_gen)
model.save('mon_modele.h5')

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

  saving_api.save_model(


In [None]:
!ls '/content/drive/MyDrive/codeJournal/dwi1_aug_jpg/sub-strokecase0218__aug_1.jpg'

/content/drive/MyDrive/codeJournal/dwi1_aug_jpg/sub-strokecase0218__aug_1.jpg


In [None]:
train_dice = hist.history['dice_coef']
val_dice = hist.history['val_dice_coef']
#epochs = np.arange(1, 200 + 1)


In [None]:
import matplotlib.pyplot as plt
import numpy as np
plt.plot(epochs, train_dice, label='Training Dice')


NameError: ignored

In [4]:
# Assuming you have already trained your model and saved the history in histt

# Extract the metrics from the history
dice = hist.history['dice_coef']
val_dice = hist.history['val_dice_coef']
acc = hist.history['accuracy']
val_acc = hist.history['val_accuracy']
epochs_range = range(len(dice))  # Use the actual number of epochs

# Create subplots
plt.figure(figsize=(15, 5))  # Adjust the figsize to your preference

# Plot Dice Coefficients
plt.subplot(1, 2, 1)  # Changed from 1, 3, 1 to 1, 2, 1 for two plots side by side
plt.plot(epochs_range, dice, label='Training Dice Coeff.', color='b')
plt.plot(epochs_range, val_dice, label='Validation Dice Coeff.', color='r')
plt.legend(loc='lower right')
plt.title('Training and Validation Dice Coefficient')
plt.xlabel('Epochs')
plt.ylabel('Dice Coefficient')
plt.grid(True)

# Plot Accuracy
plt.subplot(1, 2, 2)  # Changed from 1, 3, 2 to 1, 2, 2 for two plots side by side
plt.plot(epochs_range, acc, label='Training Accuracy', color='g')
plt.plot(epochs_range, val_acc, label='Validation Accuracy', color='m')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.grid(True)

plt.tight_layout()  # Ensures the plots don't overlap
plt.show()


NameError: ignored

In [None]:
# Load the trained model
model = load_model('path/to/your/model.h5')  # Replace with the path to your model file

# Function to preprocess an input image
def preprocess_image(img_path, target_size):
    img = image.load_img(img_path, target_size=target_size)
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array /= 255.0  # Normalize pixel values to be between 0 and 1
    return img_array

# Specify the path to the test image you want to use
test_image_path = 'path/to/your/test/image.jpg'  # Replace with the path to your test image

# Preprocess the test image
input_image = preprocess_image(test_image_path, target_size=(224, 224))  # Adjust target_size based on your model's input size

# Make predictions
predictions = model.predict(input_image)

# Decode and print the prediction
# Example assumes a binary classification task; adjust accordingly for your problem
if predictions[0][0] > 0.5:
    print("Class: Positive")
else:
    print("Class: Negative")
