In [None]:
import os
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from loss import SSIM, CE, MSE, MAE, TV, PSNR, L1, L2, dice_coef_loss, binary_iou
from psf import PSFLayer, ZernPSF
import pix2pix

import os
import numpy as np
import cv2
from glob import glob
from tqdm import tqdm
from sklearn.model_selection import train_test_split

## Load Segmentation Dataset

In [None]:
# Height and width that tf_parse declares as the static shape of every image/mask tensor.
img_h, img_w = 128, 128


def shuffling(x, y):
    """Shuffle two parallel sequences together using a fixed seed.

    Both sequences are handed to sklearn's ``shuffle`` in a single call with
    ``random_state=42``, so repeated runs give the same order.

    Returns:
        Tuple ``(x, y)`` holding the shuffled sequences.
    """
    return tuple(shuffle(x, y, random_state=42))

def load_data(path):
    """Collect the image and mask file paths of one dataset split.

    Args:
        path: Directory containing an ``image`` and a ``mask`` sub-directory.

    Returns:
        Tuple ``(x, y)`` of sorted lists: paths under ``image`` and paths under
        ``mask`` whose names match the glob pattern ``*png``.

    Raises:
        ValueError: If the two folders hold a different number of files.
            Images and masks are paired purely by sorted position, so a
            count mismatch would silently pair an image with the wrong mask.
    """
    x = sorted(glob(os.path.join(path, "image", "*png")))
    y = sorted(glob(os.path.join(path, "mask", "*png")))
    if len(x) != len(y):
        raise ValueError(
            "Found {} images but {} masks under {!r}".format(len(x), len(y), path)
        )
    return x, y

def read_image(path):
    """Load an image file as a float32 array scaled by 1/255.

    Args:
        path: File path as ``bytes`` (what ``tf.numpy_function`` passes for a
            string tensor) or as ``str``.

    Returns:
        float32 array read with ``cv2.IMREAD_COLOR`` and divided by 255.

    Raises:
        OSError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise OSError("Could not read image file: {}".format(path))
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    """Load a mask file as a float32 array with a trailing channel axis.

    The pixel values are not rescaled; they are returned as stored in the file.

    Args:
        path: File path as ``bytes`` (what ``tf.numpy_function`` passes for a
            string tensor) or as ``str``.

    Returns:
        float32 array read with ``cv2.IMREAD_GRAYSCALE`` and expanded with a
        last axis of size 1.

    Raises:
        OSError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise OSError("Could not read mask file: {}".format(path))
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)

    return x

def tf_parse(x, y):
    """Map a pair of path tensors to decoded ``(image, mask)`` float32 tensors.

    Decoding is delegated to ``read_image`` / ``read_mask`` through
    ``tf.numpy_function``. The static shapes are then declared from
    ``img_h`` / ``img_w``; nothing here resizes the data, so the files are
    expected to already have that size.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([img_h, img_w, 3])
    mask.set_shape([img_h, img_w, 1])
    return image, mask

def tf_dataset(X, Y, batch=2):
    """Build the input pipeline: path pairs -> tf_parse -> batches -> prefetch.

    Args:
        X: Sequence of image paths.
        Y: Sequence of mask paths, aligned with ``X``.
        batch: Batch size.

    Returns:
        The batched dataset with a prefetch buffer of 10.
    """
    return (
        tf.data.Dataset.from_tensor_slices((X, Y))
        .map(tf_parse)
        .batch(batch)
        .prefetch(10)
    )

In [None]:
def save_results(image, mask, y_pred, save_image_path):
    """Write a side-by-side comparison strip for one sample.

    Panels, left to right: image | mask | prediction | image * prediction,
    separated by 10-pixel-wide gray bars.

    Args:
        image: (H, W, 3) array.
        mask: (H, W) array; multiplied by 255 for display, so it is assumed
            to hold 0/1 values (TODO confirm against the dataset).
        y_pred: (H, W) array of 0/1 predictions.
        save_image_path: Destination path handed to ``cv2.imwrite``.
    """
    # Separator height follows the image rather than a fixed 128, so the
    # concatenation also works for other image sizes.
    line = np.ones((image.shape[0], 10, 3)) * 128

    # Replicate the single-channel planes to 3 channels so they can sit next
    # to the colour image.
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)
    mask = mask * 255

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)

    # Must be computed while y_pred is still 0/1, before scaling for display.
    masked_image = image * y_pred
    y_pred = y_pred * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred, line, masked_image], axis=1)
    cv2.imwrite(save_image_path, cat_images)

def save_results_psf(image, mask, x_psf, y_pred, save_image_path):
    """Write a side-by-side comparison strip that includes the PSF output.

    Panels, left to right: image | mask | x_psf * 255 | prediction |
    image * prediction, separated by 10-pixel-wide gray bars.

    Args:
        image: (H, W, 3) array.
        mask: (H, W) array; multiplied by 255 for display, so it is assumed
            to hold 0/1 values (TODO confirm against the dataset).
        x_psf: (H, W, 3) array; scaled by 255 for display, so presumably in
            roughly [0, 1] (TODO confirm against the PSF layer output).
        y_pred: (H, W) array of 0/1 predictions.
        save_image_path: Destination path handed to ``cv2.imwrite``.
    """
    # Separator height follows the image rather than a fixed 128, so the
    # concatenation also works for other image sizes.
    line = np.ones((image.shape[0], 10, 3)) * 128

    # Replicate the single-channel planes to 3 channels so they can sit next
    # to the colour image.
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)
    mask = mask * 255

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)

    # Must be computed while y_pred is still 0/1, before scaling for display.
    masked_image = image * y_pred
    y_pred = y_pred * 255

    cat_images = np.concatenate([image, line, mask, line, (x_psf*255), line, y_pred, line, masked_image], axis=1)
    cv2.imwrite(save_image_path, cat_images)

## Model Training (without Physical Layer)

In [None]:
# MobileNetV2 backbone without its classification top, used as the encoder.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=[128, 128, 3],
    include_top=False,
)

# Backbone activations exposed to the decoder. unet_model uses the last entry
# as the decoder input and the earlier ones as skip connections.
layer_names = [
    'block_1_expand_relu',
    'block_3_expand_relu',
    'block_6_expand_relu',
    'block_13_expand_relu',
    'block_16_project',
]
base_model_outputs = [base_model.get_layer(tap).output for tap in layer_names]

# Multi-output feature extractor; frozen, so its weights are not trained.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

# Decoder: one pix2pix upsample block per skip connection (kernel size 3).
up_stack = [pix2pix.upsample(n_filters, 3) for n_filters in (512, 256, 128, 64)]

def unet_model(output_channels:int):
  """Assemble the baseline U-Net from the global ``down_stack`` / ``up_stack``.

  Args:
    output_channels: Number of filters of the final transposed convolution.

  Returns:
    A ``tf.keras.Model`` taking ``[128, 128, 3]`` inputs. The output comes from
    a stride-2 ``Conv2DTranspose`` with sigmoid activation.
  """
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])

  # Encoder: the last feature map seeds the decoder, the rest become skips.
  *skip_features, bottleneck = down_stack(inputs)

  # Decoder: upsample, then concatenate with the matching skip (deepest first).
  x = bottleneck
  for up, skip in zip(up_stack, skip_features[::-1]):
    upsampled = up(x)
    x = tf.keras.layers.Concatenate()([upsampled, skip])

  head = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same', activation='sigmoid')

  return tf.keras.Model(inputs=inputs, outputs=head(x))

In [None]:
# Fix the seeds so data order and weight initialisation repeat across runs.
np.random.seed(42)
tf.random.set_seed(42)

batch_size = 2
num_epochs = 2

dataset_path = "datasets/segmentation"
train_path = os.path.join(dataset_path, "train")
valid_path = os.path.join(dataset_path, "test")

train_x, train_y = load_data(train_path)
train_x, train_y = shuffling(train_x, train_y)
valid_x, valid_y = load_data(valid_path)

# tf_dataset already batches. The former `.take(len(train_x))` counted
# batches, not samples, so it never truncated anything and is dropped.
train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

# Report the real sample counts. `len(dataset) * batch_size` over-counts
# whenever the last batch is only partially filled.
print(f"Train size: {len(train_x)}")
print(f"Validation size: {len(valid_x)}")

unet = unet_model(1)

# NOTE(review): this metric object is created but not passed to compile().
MeanIou = tf.keras.metrics.MeanIoU(num_classes=2)

unet.compile(optimizer='adam',
                loss=dice_coef_loss,
                metrics=[binary_iou])
# The dataset is already batched, so `batch_size` must not be passed to fit().
unet.fit(train_dataset, epochs=num_epochs, validation_data=valid_dataset)

In [None]:
# Save the baseline U-Net trained above; the .h5 suffix requests Keras' HDF5 file format.
unet.save("checkpoints/unet_segmentation.h5")

In [None]:
## Save plots ##
# Writes one comparison strip per test image for the baseline U-Net.

np.random.seed(42)
tf.random.set_seed(42)

save_path = "results/segmentation_results/segmentation_without_psf"
# exist_ok makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(save_path, exist_ok=True)

dataset_path = "datasets/segmentation"

valid_path = os.path.join(dataset_path, "test")
test_x, test_y = load_data(valid_path)

print("Test size: ", len(test_x))

for i, (x, y) in enumerate(zip(test_x, test_y)):
    # basename() honours the platform's path separator; the name is still the
    # text before the first "." as before.
    img_name = os.path.basename(x).split(".")[0]

    image = cv2.imread(x, cv2.IMREAD_COLOR)
    # Same 1/255 scaling as read_image, plus a leading batch axis for predict().
    x = image/255.0
    x = np.expand_dims(x, axis=0)

    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)

    # Threshold the sigmoid output at 0.5 to get a 0/1 mask.
    y_pred = unet.predict(x)[0]
    y_pred = np.squeeze(y_pred, axis=-1)
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)

    save_image_path = "{}/{}.png".format(save_path, img_name)
    save_results(image, mask, y_pred, save_image_path)
    # Stops after the image with index 71, i.e. at most 72 strips are written.
    if i > 70:
        break


### Adding Zernike PSF Layer to the UNet model

In [None]:
# Fresh MobileNetV2 backbone (no classification top) for the PSF experiment.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=[128, 128, 3],
    include_top=False,
)

# Backbone activations exposed to the decoder; the last one is the decoder
# input and the others are skip connections (see unet_psf_model).
layer_names = [
    'block_1_expand_relu',
    'block_3_expand_relu',
    'block_6_expand_relu',
    'block_13_expand_relu',
    'block_16_project',
]
base_model_outputs = [base_model.get_layer(tap).output for tap in layer_names]

# Multi-output feature extractor; frozen, so its weights are not trained.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

# NOTE(review): the training cells iterate over `num_epochs`; `epochs` is not
# read by any visible cell.
epochs = 20

# Constructor arguments for psf.ZernPSF. Their exact meaning is defined in the
# psf module (presumably Zernike mode count, radius, first mode and image
# channels - TODO confirm).
nmodes, rad, modestart, n_channels = 6, 5, 1, 3

# NOTE(review): the last positional flag is False here and True in the later
# PSF setup cell; confirm its meaning against ZernPSF.
psf_layer = ZernPSF(nmodes, rad, modestart, n_channels, False)

# Decoder: one pix2pix upsample block per skip connection (kernel size 3).
up_stack = [pix2pix.upsample(n_filters, 3) for n_filters in (512, 256, 128, 64)]

def unet_psf_model(output_channels:int):
  """Assemble a U-Net whose input first passes through the global ``psf_layer``.

  The function reads the module-level ``psf_layer``, ``down_stack`` and
  ``up_stack`` at call time, so every model built from the same instances
  shares their weights.

  Args:
    output_channels: Number of filters of the final transposed convolution.

  Returns:
    A ``tf.keras.Model`` taking ``[128, 128, 3]`` inputs. The output comes from
    a stride-2 ``Conv2DTranspose`` with sigmoid activation.
  """
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])

  # The PSF layer is applied directly to the input, before the encoder.
  blurred = psf_layer(inputs)

  # Encoder: the last feature map seeds the decoder, the rest become skips.
  *skip_features, bottleneck = down_stack(blurred)

  # Decoder: upsample, then concatenate with the matching skip (deepest first).
  x = bottleneck
  for up, skip in zip(up_stack, skip_features[::-1]):
    upsampled = up(x)
    x = tf.keras.layers.Concatenate()([upsampled, skip])

  head = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same', activation='sigmoid')

  return tf.keras.Model(inputs=inputs, outputs=head(x))

In [None]:
# Reseed before building the model so its fresh layers initialise reproducibly.
np.random.seed(42)
tf.random.set_seed(42)

# train_dataset / valid_dataset were batched when they were created in the
# baseline training cell; batch_size here does not re-batch them.
batch_size, num_epochs = 2, 2

# PSF + U-Net model trained with the dice-minus-MSE objective below.
unet_psf_mse = unet_psf_model(1)
optimizer = tf.keras.optimizers.legacy.Adam()

@tf.function
def train_step(x, y):
    """Run one optimisation step of ``unet_psf_mse`` on a batch.

    Loss = dice_coef_loss(y, prediction) - MSE(psf_output, x). The MSE term is
    subtracted, so minimising the loss pushes the PSF output away from the
    input (MSE comes from the ``loss`` module; presumably mean squared error).

    Returns:
        Tuple ``(loss_value, train_iou)`` for the batch.
    """
    weights = unet_psf_mse.trainable_weights

    with tf.GradientTape() as tape:
        prediction = unet_psf_mse(x, training=True)
        # layers[1] is the layer applied directly to the model input in
        # unet_psf_model, i.e. the PSF layer.
        blurred = unet_psf_mse.layers[1](x)
        segmentation_term = dice_coef_loss(y, prediction)
        privacy_term = MSE(blurred, x)
        loss_value = segmentation_term - privacy_term

    gradients = tape.gradient(loss_value, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    return loss_value, binary_iou(y, prediction)

# Custom training loop: one pass over train_dataset per epoch, then validation.
for epoch in range(num_epochs):
    print("Epoch: {}".format(epoch+1))
    train_iou_ls = []

    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value, train_iou = train_step(x_batch_train, y_batch_train)
        train_iou_ls.append(train_iou)

        # Log only every 2000th step.
        if step % 2000 != 0:
            continue
        print("Training loss at step {} = {}".format(step, np.round(loss_value, 5)))
        print("Training IoU at step {} = {}".format(step, np.round(train_iou, 5)))

    print("Train IoU: {}".format(np.round(tf.reduce_mean(train_iou_ls), 5)))

    # Per-batch validation IoU with the model in inference mode.
    val_iou_ls = [
        binary_iou(y_batch_val, unet_psf_mse(x_batch_val, training=False))
        for x_batch_val, y_batch_val in valid_dataset
    ]

    print("Validation IoU: {}".format(np.round(tf.reduce_mean(val_iou_ls), 5)))

In [None]:
# Evaluate privacy using SSIM, MS-SSIM, and PSNR
# Each metric compares an input image with the PSF layer's output for it.
ssim_loss, ms_ssim_loss, psnr_loss = [], [], []

for x_batch_test, y_batch_test in valid_dataset:
    x_psf_pred = unet_psf_mse.layers[1](x_batch_test, training=False)
    # NOTE(review): the PSF output is used as-is here, while the "without
    # Sobel" evaluation cell min-max rescales it to [0, 1] first. Confirm
    # which is intended, since max_val=1.0 assumes that range.
    for clean, blurred in zip(x_batch_test, x_psf_pred):
        ssim_loss.append(SSIM(clean, blurred))
        ms_ssim_loss.append(tf.image.ssim_multiscale(clean, blurred, max_val=1.0, filter_size=8))
        psnr_loss.append(PSNR(clean, blurred))

print('SSIM Loss: ', np.mean(ssim_loss))
print('MS-SSIM Loss: ', np.mean(ms_ssim_loss))
print('PSNR Loss: ', np.mean(psnr_loss))

In [None]:
## Save plots ##
# Writes one comparison strip (including the PSF output) per test image for unet_psf_mse.

np.random.seed(42)
tf.random.set_seed(42)

save_path = "results/segmentation_results/segmentation_with_psf"
# exist_ok makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(save_path, exist_ok=True)

dataset_path = "datasets/segmentation"

valid_path = os.path.join(dataset_path, "test")
test_x, test_y = load_data(valid_path)

print("Test size: ", len(test_x))

for i, (x, y) in enumerate(zip(test_x, test_y)):
    # basename() honours the platform's path separator; the name is still the
    # text before the first "." as before.
    img_name = os.path.basename(x).split(".")[0]

    image = cv2.imread(x, cv2.IMREAD_COLOR)
    # Same 1/255 scaling as read_image, plus a leading batch axis for the model.
    x = image/255.0
    x = np.expand_dims(x, axis=0)

    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)

    # layers[1] is the PSF layer; keep its output for the visualisation.
    x_psf = unet_psf_mse.layers[1](x)
    x_psf = np.squeeze(x_psf, axis=0)
    # Threshold the sigmoid output at 0.5 to get a 0/1 mask.
    y_pred = unet_psf_mse.predict(x)[0]
    y_pred = np.squeeze(y_pred, axis=-1)
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)

    save_image_path = "{}/{}.png".format(save_path, img_name)
    save_results_psf(image, mask, x_psf, y_pred, save_image_path)
    # Stops after the image with index 71, i.e. at most 72 strips are written.
    if i > 70:
        break

In [None]:
# Fresh MobileNetV2 backbone (no classification top) for the next PSF experiment.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=[128, 128, 3],
    include_top=False,
)

# Backbone activations exposed to the decoder; the last one is the decoder
# input and the others are skip connections (see unet_psf_model).
layer_names = [
    'block_1_expand_relu',
    'block_3_expand_relu',
    'block_6_expand_relu',
    'block_13_expand_relu',
    'block_16_project',
]
base_model_outputs = [base_model.get_layer(tap).output for tap in layer_names]

# Multi-output feature extractor; frozen, so its weights are not trained.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False

# Constructor arguments for psf.ZernPSF. Their exact meaning is defined in the
# psf module (presumably Zernike mode count, radius, first mode and image
# channels - TODO confirm).
nmodes, rad, modestart, n_channels = 6, 5, 1, 3

# NOTE(review): the last positional flag is True here and False in the earlier
# PSF setup cell; confirm its meaning against ZernPSF.
psf_layer = ZernPSF(nmodes, rad, modestart, n_channels, True)

# Decoder: one pix2pix upsample block per skip connection (kernel size 3).
up_stack = [pix2pix.upsample(n_filters, 3) for n_filters in (512, 256, 128, 64)]

def unet_psf_model(output_channels:int):
  """Assemble a U-Net whose input first passes through the global ``psf_layer``.

  Same construction as the earlier ``unet_psf_model`` cell. The module-level
  ``psf_layer``, ``down_stack`` and ``up_stack`` are read at call time, so
  every model built from the same instances shares their weights.

  Args:
    output_channels: Number of filters of the final transposed convolution.

  Returns:
    A ``tf.keras.Model`` taking ``[128, 128, 3]`` inputs. The output comes from
    a stride-2 ``Conv2DTranspose`` with sigmoid activation.
  """
  inputs = tf.keras.layers.Input(shape=[128, 128, 3])

  # The PSF layer is applied directly to the input, before the encoder.
  blurred = psf_layer(inputs)

  # Encoder: the last feature map seeds the decoder, the rest become skips.
  *skip_features, bottleneck = down_stack(blurred)

  # Decoder: upsample, then concatenate with the matching skip (deepest first).
  x = bottleneck
  for up, skip in zip(up_stack, skip_features[::-1]):
    upsampled = up(x)
    x = tf.keras.layers.Concatenate()([upsampled, skip])

  head = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same', activation='sigmoid')

  return tf.keras.Model(inputs=inputs, outputs=head(x))

In [None]:
# Reseed before building the model so its fresh layers initialise reproducibly.
np.random.seed(42)
tf.random.set_seed(42)

# train_dataset / valid_dataset were batched when they were created in the
# baseline training cell; batch_size here does not re-batch them.
batch_size, num_epochs = 2, 2

# PSF + U-Net model trained with the dice - MSE + Sobel-gradient objective below.
unet_psf = unet_psf_model(1)
optimizer = tf.keras.optimizers.legacy.Adam()

def gradient_loss(x, x_psf):
    """Mean squared difference between the Sobel edge maps of ``x`` and ``x_psf``.

    Both inputs go through ``tf.image.sobel_edges``; the squared error between
    the two edge tensors is then averaged down to a single scalar.
    """
    edges_clean = tf.image.sobel_edges(x)
    edges_psf = tf.image.sobel_edges(x_psf)
    per_position_error = tf.keras.losses.mean_squared_error(edges_clean, edges_psf)
    return tf.reduce_mean(per_position_error)

@tf.function
def train_step(x, y):
    """Run one optimisation step of ``unet_psf`` on a batch.

    Loss = dice_coef_loss(y, prediction) - MSE(psf_output, x)
           + gradient_loss(x, psf_output).
    The MSE term is subtracted, so minimising the loss pushes the PSF output
    away from the input, while the Sobel term penalises differences between
    their edge maps (MSE comes from the ``loss`` module; presumably mean
    squared error).

    Returns:
        Tuple ``(loss_value, train_iou)`` for the batch.
    """
    weights = unet_psf.trainable_weights

    with tf.GradientTape() as tape:
        prediction = unet_psf(x, training=True)
        # layers[1] is the layer applied directly to the model input in
        # unet_psf_model, i.e. the PSF layer.
        blurred = unet_psf.layers[1](x)
        segmentation_term = dice_coef_loss(y, prediction)
        privacy_term = MSE(blurred, x)
        edge_term = gradient_loss(x, blurred)
        loss_value = segmentation_term - privacy_term + edge_term

    gradients = tape.gradient(loss_value, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    return loss_value, binary_iou(y, prediction)

# Custom training loop: one pass over train_dataset per epoch, then validation.
for epoch in range(num_epochs):
    print("Epoch: {}".format(epoch+1))
    # Keep the per-batch IoU lists under their own names. Previously the list
    # was called `train_iou`, the same name the per-step IoU tensor is unpacked
    # into, so `train_iou.append(train_iou)` failed on the first step.
    train_iou_ls = []
    val_iou_ls = []

    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value, train_iou = train_step(x_batch_train, y_batch_train)

        # Log only every 2000th step.
        if step % 2000 == 0:
            print("Training loss at step {} = {}".format(step, np.round(loss_value, 5)))
            print("Training IoU at step {} = {}".format(step, np.round(train_iou, 5)))
        train_iou_ls.append(train_iou)

    print("Train IoU: {}".format(np.round(tf.reduce_mean(train_iou_ls), 5)))


    # Per-batch validation IoU with the model in inference mode.
    for x_batch_val, y_batch_val in valid_dataset:
        val_output = unet_psf(x_batch_val, training=False)
        val_iou_ls.append(binary_iou(y_batch_val, val_output))


    print("Validation IoU: {}".format(np.round(tf.reduce_mean(val_iou_ls), 5)))

In [None]:
# Mean validation IoU of unet_psf over all batches (shown as the cell output).
val_iou = [
    binary_iou(y_batch_val, unet_psf(x_batch_val, training=False))
    for x_batch_val, y_batch_val in valid_dataset
]
tf.reduce_mean(val_iou)

In [None]:
## Save plots ##
# Writes one comparison strip (including the PSF output) per test image for unet_psf.

np.random.seed(42)
tf.random.set_seed(42)

# NOTE(review): this is the same directory the unet_psf_mse plotting cell
# writes to, so files with the same name are overwritten. Confirm whether a
# separate folder is wanted for this model.
save_path = "results/segmentation_results/segmentation_with_psf"
# exist_ok makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(save_path, exist_ok=True)

dataset_path = "datasets/segmentation"

valid_path = os.path.join(dataset_path, "test")
test_x, test_y = load_data(valid_path)

print("Test size: ", len(test_x))

for i, (x, y) in enumerate(zip(test_x, test_y)):
    # basename() honours the platform's path separator; the name is still the
    # text before the first "." as before.
    img_name = os.path.basename(x).split(".")[0]

    image = cv2.imread(x, cv2.IMREAD_COLOR)
    # Same 1/255 scaling as read_image, plus a leading batch axis for the model.
    x = image/255.0
    x = np.expand_dims(x, axis=0)

    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)

    # layers[1] is the PSF layer; keep its output for the visualisation.
    x_psf = unet_psf.layers[1](x)
    x_psf = np.squeeze(x_psf, axis=0)
    # Threshold the sigmoid output at 0.5 to get a 0/1 mask.
    y_pred = unet_psf.predict(x)[0]
    y_pred = np.squeeze(y_pred, axis=-1)
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)

    save_image_path = "{}/{}.png".format(save_path, img_name)
    save_results_psf(image, mask, x_psf, y_pred, save_image_path)
    # Stops after the image with index 71, i.e. at most 72 strips are written.
    if i > 70:
        break

In [None]:
# Evaluate privacy using SSIM, MS-SSIM, and PSNR
# Each metric compares an input image with unet_psf's PSF-layer output for it.
ssim_loss, ms_ssim_loss, psnr_loss = [], [], []

for x_batch_test, y_batch_test in valid_dataset:
    # layers[1] is the PSF layer of the model.
    x_psf_pred = unet_psf.layers[1](x_batch_test, training=False)
    for clean, blurred in zip(x_batch_test, x_psf_pred):
        ssim_loss.append(SSIM(clean, blurred))
        ms_ssim_loss.append(tf.image.ssim_multiscale(clean, blurred, max_val=1.0, filter_size=8))
        psnr_loss.append(PSNR(clean, blurred))

print('SSIM Loss: ', np.mean(ssim_loss))
print('MS-SSIM Loss: ', np.mean(ms_ssim_loss))
print('PSNR Loss: ', np.mean(psnr_loss))

In [None]:
## test without Sobel
np.random.seed(42)
tf.random.set_seed(42)

batch_size = 2
num_epochs = 2

# unet_psf_model() wires in the module-level `psf_layer` and `up_stack`
# instances. Those were already trained as part of `unet_psf` (the Sobel run),
# and reusing them would start this ablation from the Sobel-trained weights.
# Rebuild them with the same configuration so the model starts fresh. The
# frozen `down_stack` has no trained state to leak and is reused.
psf_layer = ZernPSF(nmodes, rad, modestart, n_channels, True)

up_stack = [
    pix2pix.upsample(512, 3),
    pix2pix.upsample(256, 3),
    pix2pix.upsample(128, 3),
    pix2pix.upsample(64, 3),
]

unet_psf_mse = unet_psf_model(1)

optimizer = tf.keras.optimizers.legacy.Adam()

@tf.function
def train_step(x, y):
    """Run one optimisation step of ``unet_psf_mse`` on a batch (no Sobel term).

    Loss = dice_coef_loss(y, prediction) - MSE(psf_output, x). The MSE term is
    subtracted, so minimising the loss pushes the PSF output away from the
    input (MSE comes from the ``loss`` module; presumably mean squared error).

    Returns:
        Tuple ``(loss_value, train_iou)`` for the batch.
    """
    weights = unet_psf_mse.trainable_weights

    with tf.GradientTape() as tape:
        prediction = unet_psf_mse(x, training=True)
        # layers[1] is the layer applied directly to the model input in
        # unet_psf_model, i.e. the PSF layer.
        blurred = unet_psf_mse.layers[1](x)
        segmentation_term = dice_coef_loss(y, prediction)
        privacy_term = MSE(blurred, x)
        loss_value = segmentation_term - privacy_term

    gradients = tape.gradient(loss_value, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    return loss_value, binary_iou(y, prediction)

# Custom training loop: one pass over train_dataset per epoch, then validation.
for epoch in range(num_epochs):
    print("Epoch: {}".format(epoch+1))
    train_iou_ls = []

    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value, train_iou = train_step(x_batch_train, y_batch_train)
        train_iou_ls.append(train_iou)

        # Log only every 2000th step.
        if step % 2000 != 0:
            continue
        print("Training loss at step {} = {}".format(step, np.round(loss_value, 5)))
        print("Training IoU at step {} = {}".format(step, np.round(train_iou, 5)))

    print("Train IoU: {}".format(np.round(tf.reduce_mean(train_iou_ls), 5)))

    # Per-batch validation IoU with the model in inference mode.
    val_iou_ls = [
        binary_iou(y_batch_val, unet_psf_mse(x_batch_val, training=False))
        for x_batch_val, y_batch_val in valid_dataset
    ]

    print("Validation IoU: {}".format(np.round(tf.reduce_mean(val_iou_ls), 5)))

In [None]:
# Save the "without Sobel" model. The path has no file extension, so the on-disk
# format is whatever this Keras version defaults to (presumably a SavedModel directory - TODO confirm).
unet_psf_mse.save("checkpoints/unet_psf_zern_mse")

In [None]:
# Recompute the mean validation IoU of unet_psf_mse over all batches.
val_iou_ls = [
    binary_iou(y_batch_val, unet_psf_mse(x_batch_val, training=False))
    for x_batch_val, y_batch_val in valid_dataset
]

print("Validation IoU: {}".format(np.round(tf.reduce_mean(val_iou_ls), 5)))

In [None]:
# Evaluate privacy using SSIM, MS-SSIM, and PSNR
# Each metric compares an input image with the rescaled PSF-layer output for it.
ssim_loss, ms_ssim_loss, psnr_loss = [], [], []

for x_batch_test, y_batch_test in valid_dataset:
    x_psf_pred = unet_psf_mse.layers[1](x_batch_test, training=False)

    # Min-max rescale to [0, 1]; the extrema are taken over the whole batch.
    psf_min = tf.reduce_min(x_psf_pred)
    psf_max = tf.reduce_max(x_psf_pred)
    x_psf_pred = (x_psf_pred - psf_min) / (psf_max - psf_min)

    for clean, blurred in zip(x_batch_test, x_psf_pred):
        ssim_loss.append(SSIM(clean, blurred))
        ms_ssim_loss.append(tf.image.ssim_multiscale(clean, blurred, max_val=1.0, filter_size=8))
        psnr_loss.append(PSNR(clean, blurred))

print('SSIM Loss: ', np.mean(ssim_loss))
print('MS-SSIM Loss: ', np.mean(ms_ssim_loss))
print('PSNR Loss: ', np.mean(psnr_loss))

In [None]:
## Save plots ##
# Writes one comparison strip (including the PSF output) per test image for
# the "without Sobel" unet_psf_mse model.

np.random.seed(42)
tf.random.set_seed(42)

save_path = "results/segmentation_results/segmentation_with_psf_without_sobel"
# exist_ok makes the cell safe to re-run and avoids the check-then-create race.
os.makedirs(save_path, exist_ok=True)

dataset_path = "datasets/segmentation"

valid_path = os.path.join(dataset_path, "test")
test_x, test_y = load_data(valid_path)

print("Test size: ", len(test_x))

for i, (x, y) in enumerate(zip(test_x, test_y)):
    # basename() honours the platform's path separator; the name is still the
    # text before the first "." as before.
    img_name = os.path.basename(x).split(".")[0]

    image = cv2.imread(x, cv2.IMREAD_COLOR)
    # Same 1/255 scaling as read_image, plus a leading batch axis for the model.
    x = image/255.0
    x = np.expand_dims(x, axis=0)

    mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)

    # layers[1] is the PSF layer; keep its output for the visualisation.
    x_psf = unet_psf_mse.layers[1](x)
    x_psf = np.squeeze(x_psf, axis=0)
    # Threshold the sigmoid output at 0.5 to get a 0/1 mask.
    y_pred = unet_psf_mse.predict(x)[0]
    y_pred = np.squeeze(y_pred, axis=-1)
    y_pred = y_pred > 0.5
    y_pred = y_pred.astype(np.int32)

    save_image_path = "{}/{}.png".format(save_path, img_name)
    save_results_psf(image, mask, x_psf, y_pred, save_image_path)
    # Stops after the image with index 71, i.e. at most 72 strips are written.
    if i > 70:
        break