## Setup
Import the required modules, define helper functions, and (optionally, when running on Colab) mount Google Drive.

In [1]:
import os
import sys
import numpy as np
import cv2
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
%matplotlib inline

import tensorflow as tf
import keras
from keras import layers
from keras import callbacks
from keras.layers import Dense
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import numpy as np

# from keras import datasets
from keras.layers import (Dense, Flatten, Dropout, Activation, BatchNormalization,
                          Input, Conv2D, MaxPool2D, Lambda, Conv2DTranspose,
                          concatenate, UpSampling2D, PReLU, LeakyReLU, Add, Cropping2D)

from keras.models import Model

from IPython.display import clear_output

import sklearn as skl
from sklearn import datasets, linear_model, model_selection
from sklearn.model_selection import train_test_split


In [3]:
# Define some useful functions
class PlotLossAccuracy(keras.callbacks.Callback):
    """Keras callback that redraws the loss and accuracy curves after each epoch.

    The values are read from the ``logs`` dict passed by Keras under the keys
    ``'loss'``, ``'val_loss'``, ``'accuracy'`` and ``'val_accuracy'``. A key
    that is missing from ``logs`` is recorded as ``None``.
    """

    def on_train_begin(self, logs=None):
        """Reset every recorded series at the start of a training run."""
        self.i = 0
        self.x = []
        self.acc = []
        self.losses = []
        self.val_losses = []
        self.val_acc = []
        self.logs = []

    def on_epoch_end(self, epoch, logs=None):
        """Record this epoch's metrics and redraw both panels.

        Args:
            epoch: Epoch index supplied by Keras (unused; the callback keeps
                its own counter in ``self.i``).
            logs: Metric dict for the epoch; ``None`` is treated as empty.
        """
        # A fresh dict per call avoids the shared mutable default argument.
        logs = logs if logs is not None else {}

        self.logs.append(logs)
        self.x.append(int(self.i))
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        self.acc.append(logs.get('accuracy'))
        self.val_acc.append(logs.get('val_accuracy'))

        self.i += 1

        # Replace the previous epoch's figure instead of stacking outputs.
        clear_output(wait=True)
        fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(16, 6))
        self._draw_panel(ax_loss, self.x, self.losses, self.val_losses,
                         "train loss", "validation loss",
                         'loss', 'Model Loss')
        self._draw_panel(ax_acc, self.x, self.acc, self.val_acc,
                         "training accuracy", "validation accuracy",
                         'accuracy', 'Model Accuracy')
        plt.show()

    @staticmethod
    def _draw_panel(ax, epochs, train_values, val_values,
                    train_label, val_label, ylabel, title):
        """Plot one train/validation pair of curves on ``ax``."""
        ax.plot(epochs, train_values, label=train_label)
        ax.plot(epochs, val_values, label=val_label)
        # Epochs are whole numbers, so keep the x ticks on integers.
        ax.xaxis.set_major_locator(MaxNLocator(integer=True))
        ax.set_ylabel(ylabel)
        ax.set_xlabel('epoch')
        ax.set_title(title)
        ax.legend()

def display_learning_curves(history):
    """Plot training/validation accuracy and loss curves side by side.

    Args:
        history: Object whose ``history`` attribute is a dict holding the
            per-epoch lists ``"accuracy"``, ``"val_accuracy"``, ``"loss"``
            and ``"val_loss"`` (e.g. the value returned by ``model.fit``).

    Raises:
        KeyError: If one of the four expected keys is missing.
    """
    # Per-epoch accuracy on the training and validation data.
    acc = history.history["accuracy"]
    val_acc = history.history["val_accuracy"]

    # Per-epoch loss on the training and validation data.
    loss = history.history["loss"]
    val_loss = history.history["val_loss"]

    # Derive the x-axis from the recorded history rather than from a global
    # epoch count, so the function works on a fresh kernel and still matches
    # the data when training ran for fewer epochs than requested.
    epochs_range = range(len(acc))

    fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12, 6))

    # Left panel: accuracy.
    ax_acc.plot(epochs_range, acc, label="train accuracy")
    ax_acc.plot(epochs_range, val_acc, label="validation accuracy")
    ax_acc.set_title("Accuracy")
    ax_acc.set_xlabel("Epoch")
    ax_acc.set_ylabel("Accuracy")
    ax_acc.legend(loc="lower right")

    # Right panel: loss.
    ax_loss.plot(epochs_range, loss, label="train loss")
    ax_loss.plot(epochs_range, val_loss, label="validation loss")
    ax_loss.set_title("Loss")
    ax_loss.set_xlabel("Epoch")
    ax_loss.set_ylabel("Loss")
    ax_loss.legend(loc="upper right")

    # Adjust the layout and display the figure.
    fig.tight_layout()
    plt.show()


In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

## Data
Load the video frames and their ground-truth masks (from Google Drive on Colab, or from a local directory), then create the training, validation, and test sets.

In [6]:
# from google.colab.patches import cv2_imshow

# Set the working directory
# os.chdir('/content/drive/MyDrive/Colab Notebooks/Carrier')

# Root folder of the dataset. Set the CARRIER_DATA_DIR environment variable to
# run on another machine; the default is the original local path.
DATA_DIR = os.environ.get(
    'CARRIER_DATA_DIR',
    '/Users/meteore929/Documents/MAI Research/Videos/Carrier')
FRAME_DIR = os.path.join(DATA_DIR, 'carrier_numbered')
GT_DIR = os.path.join(DATA_DIR, 'GroundTruth')
NUM_FRAMES = 100  # frames are numbered 0001 .. 0100

# Initialize and store the frames
frames = []
gt_frames = []

for i in range(1, NUM_FRAMES + 1):
    # Reading frame
    frame_path = os.path.join(FRAME_DIR, f'carrier{i:04d}.tif')
    frame = cv2.imread(frame_path)
    # cv2.imread returns None instead of raising when a file is missing or
    # unreadable; fail here rather than later inside np.array / training.
    if frame is None:
        raise FileNotFoundError(f'Could not read frame: {frame_path}')
    frames.append(frame)

    # Reading ground truth frame
    gt_frame_path = os.path.join(GT_DIR, f'gt_carrier_binary{i:04d}.tiff')
    gt_frame = cv2.imread(gt_frame_path, cv2.IMREAD_GRAYSCALE)
    if gt_frame is None:
        raise FileNotFoundError(f'Could not read ground truth frame: {gt_frame_path}')
    gt_frames.append(gt_frame)

    # Print progress (carriage return keeps it on a single line)
    print(f'Reading frame {i} and GT frame {i}', end='\r')

print('\n')  # Move to the next line after all frames are read

print(frames[0].shape)
print(gt_frames[0].shape)


Reading frame 100 and GT frame 100

(1080, 1920, 3)
(1080, 1920)


In [7]:
# Fixed seed so the train/validation/test split is identical on every run.
SEED = 42

X_combined = np.array(frames)
y_combined = np.array(gt_frames)
assert len(X_combined) == len(y_combined), "each frame needs exactly one ground-truth mask"

# Scale 8-bit pixel values to [0, 1].
X = X_combined.astype('float32') / 255
# binary_crossentropy expects targets in [0, 1]. Binarizing works whether the
# masks are stored as 0/1 or 0/255.
# NOTE(review): assumes any non-zero pixel marks a blotch - confirm against the
# ground-truth files.
y = (y_combined > 0).astype('float32')

# Split the data into training and temporary sets (80:20)
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=SEED)

# Split the temporary set into validation and test sets (50:50)
X_validation, X_test, y_validation, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=SEED)

# No sample may be lost or duplicated by the two splits.
assert len(X_train) + len(X_validation) + len(X_test) == len(X)

# Create an instance of our callback functions class, to plot our loss function and accuracy with each epoch.
pltCallBack = PlotLossAccuracy()

print('X_train', X_train.shape)
print('Y_train', y_train.shape)
print('X_validation', X_validation.shape)
print('Y_validation', y_validation.shape)
print('X_test', X_test.shape)
print('Y_test', y_test.shape)

X_train (80, 1080, 1920, 3)
Y_train (80, 1080, 1920)
X_validation (10, 1080, 1920, 3)
Y_validation (10, 1080, 1920)
X_test (10, 1080, 1920, 3)
Y_test (10, 1080, 1920)


## U-Net Model Definition
Build the U-Net, compile it, and train it.

In [26]:
def unet_model(input_size=(1080, 1920, 3)):
    """Build a three-level U-Net that outputs a per-pixel sigmoid probability.

    The encoder halves the spatial resolution three times (16 -> 32 -> 64
    filters), the bottleneck uses 128 filters, and the decoder upsamples three
    times while concatenating the matching encoder feature maps.

    Args:
        input_size: ``(height, width, channels)`` of the input images. Height
            and width must be divisible by 8 (three 2x poolings) so that the
            upsampled decoder maps line up with the encoder skip connections.
            ``None`` entries are not checked.

    Returns:
        An uncompiled Keras ``Model`` mapping ``(H, W, C)`` inputs to
        ``(H, W, 1)`` outputs.

    Raises:
        ValueError: If the height or width is not divisible by 8.
    """
    # Validate first: a mismatch would otherwise only surface as an obscure
    # shape error inside one of the concatenate calls.
    for label, extent in (("height", input_size[0]), ("width", input_size[1])):
        if extent is not None and extent % 8 != 0:
            raise ValueError(
                f"input {label} must be divisible by 8, got {extent}")

    def conv_block(tensor, filters):
        """Two 5x5 same-padded ReLU convolutions followed by batch norm."""
        tensor = Conv2D(filters, 5, activation='relu', padding='same')(tensor)
        tensor = Conv2D(filters, 5, activation='relu', padding='same')(tensor)
        return BatchNormalization(synchronized=True)(tensor)

    inputs = Input(input_size)

    # inputs = keras.layers.RandomZoom(.2, .2)(inputs)

    # Encoder (conv1..conv3 are kept un-pooled for the skip connections)
    conv1 = conv_block(inputs, 16)
    pool1 = MaxPool2D(2)(conv1)
    pool1 = Dropout(.2)(pool1)

    conv2 = conv_block(pool1, 32)
    pool2 = MaxPool2D(2)(conv2)
    pool2 = Dropout(.2)(pool2)

    conv3 = conv_block(pool2, 64)
    pool3 = MaxPool2D(2)(conv3)
    pool3 = Dropout(.2)(pool3)

    # Bottom
    conv4 = conv_block(pool3, 128)
    conv4 = Dropout(.2)(conv4)

    # Decoder
    up5 = concatenate([UpSampling2D(2)(conv4), conv3], axis=-1)
    conv5 = conv_block(up5, 64)
    conv5 = Dropout(.2)(conv5)

    up6 = concatenate([UpSampling2D(2)(conv5), conv2], axis=-1)
    conv6 = conv_block(up6, 32)
    # Assigned to conv6 so this dropout is part of the graph; it was
    # previously stored under another name and overwritten before use.
    conv6 = Dropout(.2)(conv6)

    up7 = concatenate([UpSampling2D(2)(conv6), conv1], axis=-1)
    conv7 = conv_block(up7, 16)
    conv7 = Dropout(.2)(conv7)

    # Output layer: 1x1 convolution to a single probability per pixel
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(conv7)

    model = Model(inputs=inputs, outputs=outputs)

    return model

# Upper bound on trainable + non-trainable parameters for this exercise.
MAX_PARAMS = 3_000_000

# Build the U-Net model
model = unet_model()

# Select an optimizer
opt = keras.optimizers.Adam()

# Compile the model: per-pixel binary classification
model.compile(optimizer=opt,
              loss='binary_crossentropy',
              metrics=['accuracy'])


# Display the model summary
model.summary()

# Guard against accidentally oversized architectures. ValueError is still an
# Exception subclass, so existing `except Exception` handlers keep working.
n_params = model.count_params()
if n_params > MAX_PARAMS:
    raise ValueError(
        f"Your model is unnecessarily complex "
        f"({n_params:,} parameters > {MAX_PARAMS:,}), scale down!")

In [None]:
# Visualization of the U-Net architecture, with tensor shapes shown for each layer
keras.utils.plot_model(model, show_shapes=True)

In [27]:
num_epochs = 2
pltCallBack = PlotLossAccuracy()

# Run the training and store the training history.
# The returned History object is kept so the curves can be re-plotted later
# (e.g. with display_learning_curves) without retraining.
history = model.fit(X_train, y_train,
                    batch_size=1,
                    epochs=num_epochs,
                    validation_data=(X_validation, y_validation),
                    callbacks=[pltCallBack])

Epoch 1/2


## Predictions
Use a few samples from the test dataset to generate predictions with the trained U-Net model.

In [None]:
# Perform inference on test data giving the output probabilities.
# batch_size=1 matches training; predict's default batch size would push many
# full-resolution frames through the network at once.
predictions = model.predict(X_test, batch_size=1)

# To obtain a binary mask instead of probabilities, threshold the output:
#   predictions_binary = (predictions > 0.5).astype(np.uint8) * 255

sample_idx = 0  # which test sample to display

fig, (ax_image, ax_truth, ax_pred) = plt.subplots(1, 3, figsize=(20, 15))

# Original Image
# Frames were read with cv2.imread, which yields BGR channel order, while
# imshow expects RGB, so reverse the channel axis for display only.
ax_image.imshow(X_test[sample_idx][..., ::-1])
ax_image.set_title('Original Image')
ax_image.axis('off')

# Ground Truth Blotch Mask
ax_truth.imshow(y_test[sample_idx], cmap='gray')
ax_truth.set_title('Ground Truth Blotch Mask')
ax_truth.axis('off')

# Predicted Blotch Mask
# The model output has a trailing channel axis of size 1; drop it so imshow
# receives a plain 2-D array. vmin/vmax pin the gray scale to the sigmoid
# range so weak predictions are not stretched to full contrast.
ax_pred.imshow(np.squeeze(predictions[sample_idx], axis=-1),
               cmap='gray', vmin=0, vmax=1)
ax_pred.set_title('Predicted Blotch Mask')
ax_pred.axis('off')

plt.show()
