In [None]:
import os
# Setting CUDA_VISIBLE_DEVICES to "-1" hides every GPU from the libraries imported
# after this point, so the rest of the notebook runs on CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [None]:
import os
import shutil
import numpy as np
import pickle
import glob
import tensorflow as tf
import matplotlib.image as mpimg
from PIL import Image
import matplotlib.pyplot as plt
import math
import cv2
import torch
from tensorflow import keras
from keras.optimizers import Adam, SGD
from keras.callbacks import ModelCheckpoint
from keras.models import Sequential, Model, load_model
from keras.layers import Dense, Input, Dropout, Activation, Flatten, BatchNormalization, ReLU, LeakyReLU, concatenate
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D, AveragePooling2D, GlobalAveragePooling2D, Add
from keras.metrics import MeanIoU

from os.path import join, isdir
from os import listdir, rmdir
from shutil import move, rmtree, make_archive

In [None]:
# Report whether PyTorch can see a CUDA device.
# NOTE(review): the first cell sets CUDA_VISIBLE_DEVICES="-1", so this is expected to
# be False; torch is not used anywhere else in the visible notebook.
use_cuda = torch.cuda.is_available()
use_cuda

In [None]:
# Root folder of the labelled dataset (placeholder value - replace with a real path).
# GT_DIR and IMG_DIR are built by plain string concatenation, so MAIN_DIR must end
# with a path separator for the derived paths to be valid.
MAIN_DIR = "path to labelled images"
GT_DIR = MAIN_DIR + "masks/"
IMG_DIR =  MAIN_DIR + "images/"

In [None]:
# Collect the mask and image file paths of the train and test splits.
# os.listdir() returns entries in arbitrary order, so every list is sorted: this keeps
# the image list and the mask list aligned by filename and makes runs reproducible.
gt_train_paths = sorted(GT_DIR+'training masks/' + path for path in listdir(GT_DIR+'training masks/'))
gt_test_paths = sorted(GT_DIR+'test masks/' + path for path in listdir(GT_DIR+'test masks/'))

im_train_paths = sorted(IMG_DIR+'training images/' + path for path in listdir(IMG_DIR+'training images/'))
im_test_paths = sorted(IMG_DIR+'test images/' + path for path in listdir(IMG_DIR+'test images/'))

In [None]:
# Target spatial size of every image and mask. The loaders compare IMG_SIZE1 against
# axis 0 (rows) and IMG_SIZE2 against axis 1 (columns) of the loaded arrays.
IMG_SIZE1= 256
IMG_SIZE2= 256
def load_and_preprocess_image(path):
    """Load an image file, resize it to the target size and scale it to [0, 1].

    Args:
        path: Path of the image file to open with PIL.

    Returns:
        A float tensor with IMG_SIZE1 rows and IMG_SIZE2 columns whose values are
        the original pixel values divided by 255.
    """
    # The context manager closes the underlying file once the pixels are copied out.
    with Image.open(path) as src:
        needs_resize = src.height != IMG_SIZE1 or src.width != IMG_SIZE2
        # PIL's resize() expects (width, height), i.e. (columns, rows) - the reverse
        # of the numpy (rows, columns) order that IMG_SIZE1/IMG_SIZE2 follow.
        img = src.resize((IMG_SIZE2, IMG_SIZE1)) if needs_resize else src
        imarray = np.array(img, dtype='float')
    imarray /= 255.0
    return tf.convert_to_tensor(imarray)

def load_and_preprocess_mask(path):
    """Read the file at `path` and decode it into a 3-channel image tensor.

    No resizing is applied: the returned tensor keeps the height and width
    stored in the file.
    """
    raw_bytes = tf.io.read_file(path)
    return tf.image.decode_jpeg(raw_bytes, channels=3)

def load_and_preprocess_segment(path):
    """Turn a colour-coded mask file into a one-hot tensor.

    Channel k of the result is 1 wherever the decoded mask pixel equals
    colors[k] on all three channels and 0 elsewhere. The result has shape
    (IMG_SIZE1, IMG_SIZE2, len(colors)) and dtype uint8.
    """
    rgb_mask = np.array(load_and_preprocess_mask(path))
    one_hot = np.zeros((IMG_SIZE1, IMG_SIZE2, len(colors)), dtype=np.uint8)
    for class_idx, class_color in enumerate(colors):
        # True only where every channel matches this class colour.
        matches = np.all(np.equal(rgb_mask, class_color), axis=-1)
        one_hot[:, :, class_idx] = matches * 1
    return tf.convert_to_tensor(one_hot)

def get_image_paths(dir):
    """Return the sorted paths of every entry inside directory `dir`.

    Args:
        dir: Directory to list, with or without a trailing path separator.

    Returns:
        A sorted list of `dir` joined with each entry name.
    """
    # os.path.join only inserts a separator when `dir` does not already end with one,
    # so both 'masks' and 'masks/' yield valid paths (plain concatenation did not).
    return sorted(os.path.join(dir, name) for name in listdir(dir))

In [None]:
# RGB colour assigned to each label. The position of a colour in this list is the
# class index: the mask loaders write colour k into one-hot channel k, and the
# visualisation helpers map channel k back to colour k.
colors = [[0, 0, 128],
 [0, 0, 255],
 [0, 128, 0],
 [0, 128, 128],
 [0, 192, 255],
 [0, 255, 0],
 [0, 255, 255],
 [128, 0, 0],
 [128, 0, 128],
 [128, 128, 0],
 [164, 160, 160],
 [192, 0, 255],
 [233, 233, 255],
 [240, 202, 166],
 [255, 0, 0],
 [255, 87, 90],
 [255, 255, 0]]

In [None]:
# One-hot encode every training mask (in sorted path order) and wrap the result in a
# tf.data.Dataset. The directory is given with a trailing slash so the listed paths
# are valid even when the path helper simply concatenates directory and file name.
gt_train_paths = get_image_paths(GT_DIR+'training masks/')
gt_train_ds = list(map(load_and_preprocess_segment,gt_train_paths))
gt_train_ds = tf.data.Dataset.from_tensor_slices(gt_train_ds)

In [None]:
# One-hot encode every test mask (in sorted path order) and wrap the result in a
# tf.data.Dataset. The directory is given with a trailing slash so the listed paths
# are valid even when the path helper simply concatenates directory and file name.
gt_test_paths = get_image_paths(GT_DIR+'test masks/')
gt_test_ds = list(map(load_and_preprocess_segment,gt_test_paths))
gt_test_ds = tf.data.Dataset.from_tensor_slices(gt_test_ds)

In [None]:
# Load and normalise every training image (sorted by path) into a tf.data.Dataset.
# The trailing slash keeps the listed paths valid when directory and file name are concatenated.
im_train_ds = tf.data.Dataset.from_tensor_slices([load_and_preprocess_image(i) for i in get_image_paths(IMG_DIR+'training images/')])

In [None]:
# Load and normalise every test image (sorted by path) into a tf.data.Dataset.
# The trailing slash keeps the listed paths valid when directory and file name are concatenated.
im_test_ds = tf.data.Dataset.from_tensor_slices([load_and_preprocess_image(i) for i in get_image_paths(IMG_DIR+'test images/')])

In [None]:
# Number of (image, mask) pairs per batch, and the tf.data constant passed as the
# prefetch buffer size further down.
BATCH_SIZE = 8
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
# Pair every training image with its mask, then cache, batch and prefetch the pairs.
train_ds = (
    tf.data.Dataset.zip((im_train_ds, gt_train_ds))
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)
print('Training Data:\n# of batches, Input batch shape, Ouput batch shape')
# Peek at the first batch and print the shapes of its image and mask tensors.
print(len(train_ds), *(part.shape for part in next(iter(train_ds))))

In [None]:
# Pair every test image with its mask, then cache, batch and prefetch the pairs.
test_ds = (
    tf.data.Dataset.zip((im_test_ds, gt_test_ds))
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)

print('Validation Data:\n# of batches, Input batch shape, Ouput batch shape')
# Peek at the first batch and print the shapes of its image and mask tensors.
print(len(test_ds), *(part.shape for part in next(iter(test_ds))))

In [None]:
from tensorflow.keras.losses import categorical_crossentropy
import tensorflow.keras.backend as K
import tensorflow as tf

def dice_coefficient(y_true, y_pred):
    """Smoothed Dice coefficient, averaged over the batch.

    For each sample the products and sums are reduced over axes 1, 2 and 3;
    a constant 1 is added to numerator and denominator before dividing.
    """
    reduce_axes = [1, 2, 3]
    truth = tf.cast(y_true, tf.float32)  # labels arrive as integers; match y_pred
    overlap = K.sum(truth * y_pred, axis=reduce_axes)
    total = K.sum(truth, axis=reduce_axes) + K.sum(y_pred, axis=reduce_axes)
    per_sample = (2. * overlap + 1.) / (total + 1.)
    return K.mean(per_sample)

def dice_loss(y_true, y_pred):
    """Dice loss: one minus the batch-averaged Dice coefficient."""
    coefficient = dice_coefficient(y_true, y_pred)
    return 1 - coefficient

def combined_loss(y_true, y_pred):
    """Equally weighted sum of categorical cross-entropy and Dice loss."""
    # Mixing weights; adjust to favour one term over the other.
    alpha = 0.5  # categorical cross-entropy
    beta = 0.5   # Dice loss

    cat_cross_loss = categorical_crossentropy(y_true, y_pred)
    dice_loss_value = dice_loss(y_true, y_pred)

    return alpha * cat_cross_loss + beta * dice_loss_value

In [None]:
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.optimizers import Adam

def new_model_with_resnet50_encoder():
    """Build and compile a U-Net style segmentation model on a frozen ResNet50 encoder.

    The decoder upsamples the encoder output five times by a factor of two. The first
    four stages concatenate a skip connection from the encoder before their
    convolutions; the last stage restores the input resolution and ends in a per-pixel
    softmax with one channel per entry of `colors`.

    Returns:
        A compiled Keras Model (Adam, learning rate 1e-4, `combined_loss`) whose
        metric is the mean IoU computed on one-hot labels.
    """
    # One output channel per label colour, matching the depth of the one-hot masks.
    num_classes = len(colors)

    # Pre-trained encoder without its classification head; weights stay frozen.
    # NOTE(review): inputs are scaled to [0, 1] by the image loader rather than passed
    # through the ResNet50 `preprocess_input` - confirm this is intended.
    resnet50_encoder = ResNet50(weights='imagenet', include_top=False, input_shape=(IMG_SIZE1, IMG_SIZE2, 3))
    for layer in resnet50_encoder.layers:
        layer.trainable = False

    # (skip-connection layer, filters of the convolutions applied after the concat)
    decoder_stages = [
        ("conv4_block6_out", (1024, 512)),
        ("conv3_block4_out", (512, 128)),
        ("conv2_block3_out", (256, 128)),
        ("conv1_relu", (128, 64, 32)),
    ]

    x = resnet50_encoder.output
    for skip_name, filters in decoder_stages:
        x = UpSampling2D(size=(2, 2))(x)
        x = concatenate([resnet50_encoder.get_layer(skip_name).output, x])
        for n_filters in filters:
            x = Conv2D(n_filters, 3, activation="relu", padding="same")(x)

    # Final stage: back to the input resolution, no skip connection available.
    x = UpSampling2D(size=(2, 2))(x)
    x = Conv2D(128, 3, activation="relu", padding="same")(x)
    x = Conv2D(64, 3, activation="relu", padding="same")(x)
    outputs = Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    model = Model(inputs=resnet50_encoder.input, outputs=outputs)

    # Labels are one-hot and predictions are softmax probabilities. Plain MeanIoU
    # expects integer class ids, so use the one-hot variant, which takes the argmax
    # of both before building the confusion matrix.
    opt = Adam(learning_rate=0.0001)
    model.compile(
        optimizer=opt,
        loss=combined_loss,
        metrics=[tf.keras.metrics.OneHotMeanIoU(num_classes=num_classes)],
    )

    return model

# Input resolution used to build the encoder; repeats the values assigned next to
# the image loaders so this cell also works on its own.
IMG_SIZE1, IMG_SIZE2 = 256, 256

# Build the compiled model once and print its layer-by-layer summary.
my_new_model_with_resnet50 = new_model_with_resnet50_encoder()
print("U-Net Model with ResNet50 Encoder Summary:\n")
my_new_model_with_resnet50.summary()


In [None]:
# Train for 100 epochs on the labelled training batches, using test_ds as validation data.
my_new_model_with_resnet50.fit(train_ds, epochs=100, validation_data=test_ds, verbose=1)

In [None]:
# Persist the trained network, then report its metric on the held-out set.
# The model object is `my_new_model_with_resnet50`; `my_new_model` is never defined.
my_new_model_with_resnet50.save(MAIN_DIR+'model_name')

# evaluate() returns [loss, metric]. The only compiled metric is a mean IoU, so the
# second value (kept under the name `acc`) is reported as such rather than as accuracy.
loss, acc = my_new_model_with_resnet50.evaluate(test_ds, verbose=2)
print("Retrained model, mean IoU: {:5.2f}%".format(100 * acc))

In [None]:
# One-hot encode the masks stored under 'predicted' (in sorted path order) and wrap
# them in a tf.data.Dataset. The trailing slash keeps the listed paths valid when
# directory and file name are concatenated.
gt_unl_paths = get_image_paths(GT_DIR+'predicted/')
gt_unl_ds = list(map(load_and_preprocess_segment,gt_unl_paths))
gt_unl_ds = tf.data.Dataset.from_tensor_slices(gt_unl_ds)

In [None]:
# Load and normalise every unlabelled image (sorted by path) into a tf.data.Dataset.
# The trailing slash keeps the listed paths valid when directory and file name are concatenated.
im_unl_ds = tf.data.Dataset.from_tensor_slices([load_and_preprocess_image(i) for i in get_image_paths(IMG_DIR+'unlabelled images/')])

In [None]:
# Pair every unlabelled image with its mask, then cache, batch and prefetch the pairs.
unl_ds = (
    tf.data.Dataset.zip((im_unl_ds, gt_unl_ds))
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)

print('Training Data:\n# of batches, Input batch shape, Output batch shape')
# Peek at the first batch and print the shapes of its image and mask tensors.
print(len(unl_ds), *(part.shape for part in next(iter(unl_ds))))

In [None]:
# Reload the saved network for inference only. compile=False skips restoring the
# training configuration, which would otherwise require the custom `combined_loss`
# to be supplied through `custom_objects`.
model = tf.keras.models.load_model(MAIN_DIR+"model_name", compile=False)

In [None]:
# Folder that will receive the predicted mask images (placeholder - replace with a real path).
directory = "path for predicted masks"

In [None]:
# Switch the working directory so files saved under a bare file name land in `directory`.
# Every relative path used after this cell is resolved against that folder.
os.chdir(directory)

In [None]:
# Class ids 0..16, one per one-hot channel.
a = list(range(17))
def pre_process(index_img):
    """Expand a 2-D map of class ids into a one-hot uint8 array.

    Channel k of the result is 1 where `index_img` equals a[k], 0 elsewhere.
    The result has shape (IMG_SIZE1, IMG_SIZE2, len(colors)).
    """
    one_hot = np.zeros((IMG_SIZE1, IMG_SIZE2, len(colors)), dtype=np.uint8)
    for channel, class_id in enumerate(a):
        one_hot[:, :, channel] = (index_img == class_id) * 1
    return one_hot

In [None]:
def lrgb(img):
    """Blend a stack of per-class layers into an RGB image with values scaled by 1/255.

    Layer k of `img` is weighted by colors[k] on each of the three colour
    channels; the weighted layers are summed and the total divided by 255.
    """
    height, width, n_layers = img.shape[0], img.shape[1], img.shape[2]
    rgb = np.zeros((height, width, 3))
    for layer_idx in range(n_layers):
        layer = img[:,:,layer_idx]
        layer_color = colors[layer_idx]
        for channel in range(3):
            rgb[:,:,channel] += layer_color[channel]*layer
    return rgb/255.0

In [None]:
def saveMasks(test_ds):
    """Predict every batch of `test_ds` and save each prediction as a colour image.

    Each prediction is reduced to its most likely class per pixel, converted to
    RGB, and written to the current working directory. File names are taken, in
    order, from the entries listed under GT_DIR + 'sep/'.
    """
    name_sources = get_image_paths(GT_DIR+'sep/')
    saved = 0
    for batch_imgs, _ in test_ds:
        probabilities = model.predict(batch_imgs)
        for sample in probabilities:
            class_map = np.argmax(sample, axis = -1)
            rgb = lrgb(pre_process(class_map))
            # Keep only the file name so the image is written into the working directory.
            filename = name_sources[saved].split("/")[-1]
            saved += 1
            plt.imsave(filename, rgb)

In [None]:
# Run the loaded model over every batch of the unlabelled dataset.
p = model.predict(unl_ds)

In [None]:
# Preview the prediction at index 5: take the arg-max class per pixel, expand it
# back to one-hot layers, colourise them and display the result.
arg = np.argmax(p[5],axis = -1)
_s = pre_process(arg)
_p = lrgb(_s)
plt.imshow(_p)

In [None]:
# Write a colourised predicted mask for every unlabelled image into the working directory.
saveMasks(unl_ds)

In [None]:
# Reload the saved network for evaluation. The working directory was changed earlier,
# so the bare relative name 'model_name' would no longer point at the file written by
# save(); use the same MAIN_DIR-based location as the other load cells.
# compile=False avoids having to supply the custom `combined_loss` via `custom_objects`.
model = keras.models.load_model(MAIN_DIR+'model_name', compile=False)

In [None]:
# Predict the whole test set, then keep only the index of the highest-scoring
# class along the last axis for each pixel.
pred = model.predict(test_ds)
pred = np.argmax(pred,axis=-1)

In [None]:
def one_hot_encoded_mask(path):
    """Load a colour-coded mask file and return it as a one-hot uint8 numpy array.

    Channel k of the result is 1 wherever the decoded mask pixel equals
    colors[k] on all three channels. The result has shape
    (IMG_SIZE1, IMG_SIZE2, len(colors)).
    """
    seg = np.array(load_and_preprocess_mask(path))
    # Size the channel axis from the palette instead of a hard-coded 17 so it always
    # matches the loop below (same convention as load_and_preprocess_segment).
    mask = np.zeros((IMG_SIZE1, IMG_SIZE2, len(colors)), dtype=np.uint8)
    for i, color in enumerate(colors):
        cmap = np.all(np.equal(seg, color), axis=-1)
        mask[:, :, i] = cmap * 1
    return mask

# One-hot encode every ground-truth test mask, then collapse the channel axis to a
# class-index map per image so it can be compared with `pred`.
truth_val_masks = np.array([one_hot_encoded_mask(i) for i in gt_test_paths])
truth_val_masks = np.argmax(truth_val_masks,axis=-1)

In [None]:
def get_fn(target_img,pred_img,i):
    """Count false negatives for class `i`.

    Args:
        target_img: Array of ground-truth class ids.
        pred_img: Array of predicted class ids, same shape as `target_img`.
        i: Class id to evaluate.

    Returns:
        Number of pixels labelled `i` in the target but not in the prediction.
    """
    tar = np.asarray(target_img) == i
    pre = np.asarray(pred_img) == i
    # np.sum reduces in one vectorised pass and also handles empty inputs,
    # unlike nesting the Python built-in sum().
    return np.sum(tar & ~pre)
def get_fp(target_img,pred_img,i):
    """Count false positives for class `i`.

    Args:
        target_img: Array of ground-truth class ids.
        pred_img: Array of predicted class ids, same shape as `target_img`.
        i: Class id to evaluate.

    Returns:
        Number of pixels predicted as `i` that are not `i` in the target.
    """
    tar = np.asarray(target_img) == i
    pre = np.asarray(pred_img) == i
    # np.sum reduces in one vectorised pass and also handles empty inputs,
    # unlike nesting the Python built-in sum().
    return np.sum(pre & ~tar)
def get_tp(target_img,pred_img,i):
    """Count true positives for class `i`.

    Args:
        target_img: Array of ground-truth class ids.
        pred_img: Array of predicted class ids, same shape as `target_img`.
        i: Class id to evaluate.

    Returns:
        Number of pixels labelled `i` in both the target and the prediction.
    """
    tar = np.asarray(target_img) == i
    pre = np.asarray(pred_img) == i
    # np.sum reduces in one vectorised pass and also handles empty inputs,
    # unlike nesting the Python built-in sum().
    return np.sum(tar & pre)

def get_detailed_iou(target_img,pred_img,NUM_CLASS = 17):
    """Compute the per-class IoU between one target map and one prediction map.

    Args:
        target_img: Array of ground-truth class ids.
        pred_img: Array of predicted class ids, same shape as `target_img`.
        NUM_CLASS: Number of classes; ids 0 .. NUM_CLASS-1 are evaluated.

    Returns:
        Float array of length NUM_CLASS. Entry k is TP / (TP + FP + FN) for class k
        when the class occurs in both maps, and 0 otherwise.
    """
    target_img = np.asarray(target_img)
    pred_img = np.asarray(pred_img)
    iou_classes = np.zeros(NUM_CLASS)
    for i in range(NUM_CLASS):
        tar = target_img == i
        pre = pred_img == i
        # Only score classes present in both maps; this also guarantees a non-zero union.
        if tar.any() and pre.any():
            true_pos = np.sum(tar & pre)
            union = np.sum(tar | pre)  # equals TP + FP + FN
            iou_classes[i] = true_pos / union
    return iou_classes

def present_iou(iou_classes):
    """Print every class colour next to its IoU value, one class per line."""
    for idx in range(len(iou_classes)):
        class_color, class_iou = colors[idx], iou_classes[idx]
        print(class_color, " : ", class_iou)


def get_Mean_IOU_of_each_class():
    """Average the per-image IoU of each class over the evaluated test images.

    Reads the module-level `truth_val_masks` and `pred` arrays. For each class the
    per-image IoUs are summed and divided by the number of images whose prediction
    contains that class; classes never predicted keep a value of 0.

    Returns:
        Float array with one mean IoU per class (17 entries).
    """
    num_classes = 17
    classes_iou = np.zeros(num_classes)
    num_images_to_each_class = np.zeros(num_classes)
    # Walk over the images that actually exist instead of a hard-coded count of 105,
    # which raised IndexError on smaller sets and ignored the tail of larger ones.
    for target_mask, pred_mask in zip(truth_val_masks, pred):
        classes_iou += get_detailed_iou(target_mask, pred_mask)
        # NOTE(review): only images where the class is *predicted* are counted, so a
        # class present in the target but missed entirely does not lower the mean.
        for j in np.unique(pred_mask):
            num_images_to_each_class[j] += 1
    seen = num_images_to_each_class != 0
    classes_iou[seen] /= num_images_to_each_class[seen]

    return classes_iou

def show_top_IoUs(n:int):
    """Print the `n` highest per-class mean IoU values, best first.

    Args:
        n: How many of the top values to print.
    """
    classes_ious = get_Mean_IOU_of_each_class()
    # Sort descending and keep the first n; previously `n` was ignored and the full
    # list was printed in ascending order.
    print(sorted(classes_ious, reverse=True)[:n])

In [None]:
# Print the mean IoU of every class next to its label colour.
present_iou(get_Mean_IOU_of_each_class())

In [None]:
# Summarise the per-class mean IoU values, asking for the top 8.
show_top_IoUs(8)

#### Visualization

In [None]:
# Reload the saved network for visualisation only. compile=False skips restoring the
# training configuration, which would otherwise require the custom `combined_loss`
# to be supplied through `custom_objects`.
model = tf.keras.models.load_model(MAIN_DIR+"model_name", compile=False)

In [None]:
def LayersToRGBImage(img):
    """Collapse per-class layers into a single RGB image with values scaled by 1/255.

    Every layer k of `img` contributes colors[k] weighted by the layer's
    values; contributions are accumulated per colour channel and the sum is
    divided by 255.
    """
    rgb = np.zeros((img.shape[0], img.shape[1], 3))
    layer_count = img.shape[2]
    for k in range(layer_count):
        weights = img[:,:,k]
        tint = colors[k]
        for channel in range(3):
            rgb[:,:,channel] += tint[channel]*weights
    return rgb/255.0

In [None]:
# Show at most `max_show` samples from the first batch: input image, stored mask
# and model prediction side by side.
max_show=6
for imgs, segs in unl_ds:
    p = model.predict(imgs)
    for i in range(p.shape[0]):
        # `>=` so that exactly max_show samples are drawn (indices 0 .. max_show-1);
        # the previous `>` test let one extra sample through.
        if i >= max_show:
            break
        _p = LayersToRGBImage(p[i])
        _s = LayersToRGBImage(segs[i])
        predimg = _p
        trueimg = _s

        fig = plt.figure(figsize=(15,5))
        plt.subplot(131)
        plt.title("Actual Image")
        plt.imshow(imgs[i])

        plt.subplot(132)
        plt.title("Masked Image")
        plt.imshow(_s)

        plt.subplot(133)
        plt.title("Predicted Image")
        plt.imshow(_p)

        plt.show()
        # Release the figure so repeated iterations do not accumulate open figures.
        plt.close(fig)
    # Only the first batch is visualised.
    break

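#### Semi-supervision

Retrain the model on the original labelled data combined with the most confident predicted masks.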

In [None]:
# One-hot encode the masks of the enlarged training set (original labels plus the
# most confident predictions) and wrap them in a tf.data.Dataset.
gt_train_paths = get_image_paths(GT_DIR+'training original labelled +most confident masks/')
gt_train_ds = tf.data.Dataset.from_tensor_slices(
    [load_and_preprocess_segment(mask_path) for mask_path in gt_train_paths]
)

In [None]:
# One-hot encode the masks of the enlarged test set (original labels plus the
# most confident predictions) and wrap them in a tf.data.Dataset.
gt_test_paths = get_image_paths(GT_DIR+'testing original labelled +most confident masks/')
gt_test_ds = tf.data.Dataset.from_tensor_slices(
    [load_and_preprocess_segment(mask_path) for mask_path in gt_test_paths]
)

In [None]:
# Load and normalise the images of the enlarged train and test sets, in sorted path
# order so they line up with the masks loaded from the matching mask folders.
im_train_ds = tf.data.Dataset.from_tensor_slices([load_and_preprocess_image(i) for i in get_image_paths(IMG_DIR+'training original labelled +most confident images/')])
im_test_ds = tf.data.Dataset.from_tensor_slices([load_and_preprocess_image(i) for i in get_image_paths(IMG_DIR+'testing original labelled +most confident images/')])

In [None]:
# Same batching settings as the supervised run, repeated so this section can be run on its own.
BATCH_SIZE = 8
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
# Pair every image of the enlarged training set with its mask, then cache, batch and prefetch.
train_ds = (
    tf.data.Dataset.zip((im_train_ds, gt_train_ds))
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)

print('Training Data:\n# of batches, Input batch shape, Ouput batch shape')
# Peek at the first batch and print the shapes of its image and mask tensors.
print(len(train_ds), *(part.shape for part in next(iter(train_ds))))

In [None]:
# Pair every image of the enlarged test set with its mask, then cache, batch and prefetch.
test_ds = (
    tf.data.Dataset.zip((im_test_ds, gt_test_ds))
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)

print('Validation Data:\n# of batches, Input batch shape, Ouput batch shape')
# Peek at the first batch and print the shapes of its image and mask tensors.
print(len(test_ds), *(part.shape for part in next(iter(test_ds))))

In [None]:
# Continue training the same model object for another 100 epochs, now on the
# enlarged dataset; the model is not rebuilt, so it starts from its current weights.
my_new_model_with_resnet50.fit(train_ds, epochs=100, validation_data=test_ds, verbose=1)

In [None]:
# Persist the retrained network (same location as the earlier save, which it replaces),
# then report its metric on the enlarged test set.
# The model object is `my_new_model_with_resnet50`; `my_new_model` is never defined.
my_new_model_with_resnet50.save(MAIN_DIR+'model_name')

# evaluate() returns [loss, metric]. The only compiled metric is a mean IoU, so the
# second value (kept under the name `acc`) is reported as such rather than as accuracy.
loss, acc = my_new_model_with_resnet50.evaluate(test_ds, verbose=2)
print("Retrained model, mean IoU: {:5.2f}%".format(100 * acc))