In [1]:
import os
import numpy as np
import cv2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input

In [2]:
# Define
def conv_block(inputs, num_filters):
  x = Conv2D(num_filters, 3, padding ="same")(inputs)
  x = BatchNormalization()(x)
  x = Activation("relu")(x)

  x = Conv2D(num_filters, 3, padding ="same")(x)
  x = BatchNormalization()(x)
  x = Activation("relu")(x)

  return x

In [3]:
def encoder_block(inputs, num_filters):
  x = conv_block(inputs, num_filters)
  p = MaxPool2D((2,2))(x)
  return x, p

In [4]:
def decoder_block(inputs, skip_features, num_filters):
  # print(inputs.shape)
  x = Conv2DTranspose(num_filters, 2, strides = 2, padding = "same")(inputs)
  x = Concatenate()([x, skip_features]) # changing (32,32,512) to (32,32,1024)
  x = conv_block(x, num_filters)
  # print(x.shape)
  return x

In [5]:
def build_unet(input_shape):
  inputs = Input(input_shape)
  s1, p1 = encoder_block(inputs, 64) #number of filter begins with 64 in U-Net
  s2, p2 = encoder_block(p1, 128)
  s3, p3 = encoder_block(p2, 256)
  s4, p4 = encoder_block(p3, 512)
  print(s1.shape, s2.shape, s3.shape, s4.shape)
  # print(p1.shape, p2.shape, p3.shape, p4.shape)

  b1 =conv_block(p4, 1024)
  # print(b1.shape)
  d1 = decoder_block(b1,s4, 512)
  d2 = decoder_block(d1,s3, 256)
  d3 = decoder_block(d2,s2, 128)
  d4 = decoder_block(d3,s1, 64)
  # print(d4.shape)
  outputs =Conv2D(1, 1, padding ="same", activation ="sigmoid")(d4)
  # print(outputs.shape)
  model = Model(inputs, outputs, name = "UNET")
  return model

In [6]:
input_shape = (256,256,3)
model= build_unet(input_shape)
model.summary()

(None, 256, 256, 64) (None, 128, 128, 128) (None, 64, 64, 256) (None, 32, 32, 512)
Model: "UNET"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 256, 256, 64)         1792      ['input_1[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 256, 256, 64)         256       ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 activation 

**Training Data**


In [7]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

In [8]:
# Define metrics
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K

smooth = 1e-15
def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

In [9]:
""" Global parameters """
H = 256
W = 256

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')
os.chdir('/content/drive/My Drive/Spring 2024/CS584/U-net_data/')
print("Current working directory:", os.getcwd())

In [None]:
from glob import glob
path = '/content/drive/My Drive/Spring 2024/CS584/U-net_data/'
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)
    return path
def load_dataset(path, split = 0.2):
  images = sorted(glob(os.path.join(path, "images", "*.png")))
  print(images)
  print(len(images))
  masks = sorted(glob(os.path.join(path, "masks", "*.png")))
  print(len(masks))
  # Ensure there are samples to split
  split_size = int(len(images) * split)

  train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
  train_y, valid_y = train_test_split(masks, test_size=split_size, random_state=42)

  train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)
  train_y, test_y = train_test_split(train_y, test_size=split_size, random_state=42)

  return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(path)

In [None]:
print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")
print(f"Test: {len(test_x)} -  {len(test_y)}")

In [None]:
create_dir(path)

In [None]:
def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    return x

In [None]:
def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)  ## (h, w)
    x = cv2.resize(x, (W, H))   ## (h, w)
    x = x / 255.0               ## (h, w)
    x = x.astype(np.float32)    ## (h, w)
    x = np.expand_dims(x, axis=-1)## (h, w, 1)
    return x

In [None]:
def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    return x, y

In [None]:
def tf_dataset(X, Y, batch=2):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset

In [None]:
batch_size = 16
lr = 1e-4
num_epochs = 500
model_path = os.path.join("files", "model.h5")
csv_path = os.path.join("files", "log.csv")

In [None]:
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

In [None]:
# for x,y in train_dataset:
#   print(x.shape, y.shape)

In [None]:
""" Model """
model = build_unet((H, W, 3))
model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=[dice_coef])

In [None]:
callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
    ]
model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks
    )

**Testing Data**

In [None]:
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score
from sklearn.model_selection import train_test_split


In [None]:
""" Global parameters """
H = 256
W = 256

""" Creating a directory """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

In [None]:
def save_results(image, mask, y_pred, save_image_path):
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    y_pred = y_pred * 255

    line = np.ones((H, 10, 3)) * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)


In [None]:
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(path)

In [None]:
SCORE = []
for x, y in tqdm(zip(test_x, test_y), total=len(test_y)):
  """ Extracting the name """
  name = x.split("/")[-1]

  """ Reading the image """
  image = cv2.imread(x, cv2.IMREAD_COLOR) ## [H, w, 3]
  image = cv2.resize(image, (W, H))       ## [H, w, 3]
  x = image/255.0                         ## [H, w, 3]
  x = np.expand_dims(x, axis=0)           ## [1, H, w, 3]

  """ Reading the mask """
  mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
  mask = cv2.resize(mask, (W, H))

  """ Prediction """
  y_pred = model.predict(x, verbose=0)[0]
  y_pred = np.squeeze(y_pred, axis=-1)
  y_pred = y_pred >= 0.5
  y_pred = y_pred.astype(np.int32)

  """ Saving the prediction """
  save_image_path = os.path.join("results", name)
  save_results(image, mask, y_pred, save_image_path)

  """ Flatten the array """
  mask = mask/255.0
  mask = (mask > 0.5).astype(np.int32).flatten()
  y_pred = y_pred.flatten()

  """ Calculating the metrics values """
  f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary")
  jac_value = jaccard_score(mask, y_pred, labels=[0, 1], average="binary")
  recall_value = recall_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
  precision_value = precision_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
  SCORE.append([name, f1_value, jac_value, recall_value, precision_value])

""" Metrics values """
score = [s[1:]for s in SCORE]
score = np.mean(score, axis=0)
print(f"F1: {score[0]:0.5f}")
print(f"Jaccard: {score[1]:0.5f}")
print(f"Recall: {score[2]:0.5f}")
print(f"Precision: {score[3]:0.5f}")

df = pd.DataFrame(SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
df.to_csv("files/score.csv")