## Import Libraries

In [None]:
import os
import csv
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras.initializers import HeNormal, Constant
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from natsort import natsorted
import datetime

# Add matplotlib inline
import matplotlib.pyplot as plt
%matplotlib inline

## Configure GPU

In [None]:
# List the GPUs TensorFlow can see. If none are reported, enable one using
# Edit -> Notebook Settings -> Hardware Accelerator -> GPU
physical_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs:", len(physical_devices))
# With Num GPUs >= 1 the rest of the notebook can run GPU enabled
print(physical_devices)

In [None]:
# Configure GPU to allow dynamic memory growth
# NOTE: the former bare `tf.device('/GPU:0')` call was dropped. It only creates a
# context manager and has no effect unless it is entered with a `with` statement.
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
# The config must be handed to the session, otherwise allow_growth is never applied
sess = tf.compat.v1.Session(config=config)

## Input Pipeline
- The long-term aim is to replace ImageDataGenerator with a much faster tf.data pipeline

In [None]:
# Set base data path for dataset.
# Keep the trailing '/': sub-folder names are appended to it by plain string concatenation.
data_path = 'D:/Complex/Rotation/' 

In [None]:
# One-hot encoding helper for masks and depth maps
def encode_one_hot(img, levels):
    """Return `img` one-hot encoded into `levels` classes as a uint8 NumPy array.

    The encoding itself is delegated to Keras' `to_categorical`.
    """
    return np.asarray(to_categorical(img, levels, dtype='uint8'))


# One hot decode pixel level
def decode_one_hot(img):
    """Collapse a one-hot encoded image to per-pixel class indices.

    Args:
        img: Array-like with the one-hot (class) scores on axis 2, e.g. a
            single (height, width, levels) mask, depth map or prediction.

    Returns:
        NumPy integer array holding, for every pixel, the index of the largest
        value along axis 2.
    """
    # NumPy's argmax avoids a round-trip through a TensorFlow tensor for every image
    return np.argmax(np.asarray(img), axis=2)

In [None]:
# Image generators: frame pixels are multiplied by 1/255, while masks and
# depth buffers are passed through with no rescaling
frames_gen = ImageDataGenerator(rescale=1.0 / 255)
masks_gen = ImageDataGenerator()
zbuffs_gen = ImageDataGenerator()

In [None]:
def datagen(folder, batch_size, shuffle, seed = 5, target_size = (224,224), levels = 256):
    """Yield endless batches of stereo frames with one-hot mask and depth targets.

    Five directory iterators (left/right frames, left/right masks and left
    depth buffers) are created with the same seed, batch size, target size and
    shuffle flag, and are advanced together on every step.

    Args:
        folder: Dataset split folder below `data_path`, including the trailing
            slash (e.g. 'training/').
        batch_size: Number of images requested per batch.
        shuffle: Whether the directory iterators shuffle their files.
        seed: Seed shared by all five iterators.
        target_size: (height, width) the images are resized to.
        levels: Number of one-hot classes used for masks and depth maps.

    Yields:
        Tuple `([left_frames, right_frames], [left_masks, right_masks, depths])`
        of NumPy arrays; masks and depths are one-hot encoded over `levels`.
    """
    root = data_path + folder

    def directory_flow(generator, subdir, **extra):
        # Every stream shares the same batching/shuffling settings so batches stay aligned
        return generator.flow_from_directory(root + subdir, seed = seed, batch_size = batch_size,
                                             target_size = target_size, shuffle = shuffle, **extra)

    left_gen = directory_flow(frames_gen, 'frames/left/')
    right_gen = directory_flow(frames_gen, 'frames/right/')
    left_mask = directory_flow(masks_gen, 'masks/left/', color_mode = 'grayscale')
    right_mask = directory_flow(masks_gen, 'masks/right/', color_mode = 'grayscale')
    depth_gen = directory_flow(zbuffs_gen, 'zbuffs/left/', color_mode = 'grayscale')

    while True:
        # Each iterator returns (images, labels); only the images (index 0) are used.
        # The builtin next() works on any iterator and does not rely on a `.next()` method.
        left_frames = next(left_gen)[0]
        right_frames = next(right_gen)[0]
        left_target = next(left_mask)[0]
        right_target = next(right_mask)[0]
        depth_target = next(depth_gen)[0]

        # One hot encode whole batches at once (same result as encoding image by image)
        l_mask = encode_one_hot(left_target, levels)
        r_mask = encode_one_hot(right_target, levels)
        depth = encode_one_hot(depth_target, levels)

        yield [np.asarray(left_frames), np.asarray(right_frames)], [l_mask, r_mask, depth]


## Model Definition

In [None]:
# Define class for convolution block
class ConvBlock(layers.Layer):
    """ Defines convolution block: (Conv2D with ELU activation -> BatchNormalization) x2
        Returns output of convolution block

        Args:
            num_filters: number of filters used by both 3x3 convolutions
            **kwargs: standard Keras layer arguments (e.g. name), forwarded to layers.Layer
    """
    def __init__(self, num_filters, **kwargs):
        super(ConvBlock, self).__init__(**kwargs)
        # Kept so get_config can return the constructor argument
        self.num_filters = num_filters
        # Conv -> (Activation) -> BN
        self.conv1 = layers.Conv2D(num_filters, kernel_size = (3, 3), activation = 'elu',
                                  kernel_initializer = tf.keras.initializers.HeNormal(),
                                  use_bias = True, bias_initializer = tf.keras.initializers.Constant(0.1),
                                  padding = 'same')
        self.bn1 = layers.BatchNormalization()
        # Conv -> (Activation) -> BN
        self.conv2 = layers.Conv2D(num_filters, kernel_size = (3, 3), activation = 'elu',
                                  kernel_initializer = tf.keras.initializers.HeNormal(),
                                  use_bias = True, bias_initializer = tf.keras.initializers.Constant(0.1),
                                  padding = 'same')
        self.bn2 = layers.BatchNormalization()

    # Forward pass
    def call(self, input_tensor, training=False):
        """ Apply both Conv -> BN stages; `training` is forwarded to the batch norm layers """
        x = self.conv1(input_tensor)
        x = self.bn1(x, training=training)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        return x

    # Define config for custom layer
    def get_config(self):
        """ Return the base layer config plus the constructor argument.
            A config must hold constructor arguments (not the sub-layer objects)
            so that from_config can rebuild the layer.
        """
        config = super(ConvBlock, self).get_config()
        config.update({'num_filters': self.num_filters})
        return config
    
    
# Define class for encoder layer
class EncoderLayer(layers.Layer):
    """ Defines encoder layer: ConvBlock -> MaxPool2D -> SpatialDropout2D
        Returns layer and pooling output for next downward layer input and skip connection

        Args:
            num_filters: number of filters of the convolution block
            dropout: rate passed to SpatialDropout2D
            **kwargs: standard Keras layer arguments (e.g. name), forwarded to layers.Layer
    """
    def __init__(self, num_filters, dropout=0., **kwargs):
        super(EncoderLayer, self).__init__(**kwargs)
        # Kept so get_config can return the constructor arguments
        self.num_filters = num_filters
        self.dropout_rate = dropout
        self.conv_block = ConvBlock(num_filters)
        self.max_pool = layers.MaxPooling2D((2, 2))
        self.dropout = layers.SpatialDropout2D(dropout)
    
    # Forward pass
    def call(self, input_tensor, training=False):
        """ Return (x, p): the convolution block output (used as skip connection)
            and its pooled, dropout-regularised version (input of the next layer)
        """
        x = self.conv_block(input_tensor, training=training)
        p = self.max_pool(x)
        # Pass the training flag explicitly, consistent with the other sub-layer calls
        p = self.dropout(p, training=training)
        return x, p

    # Define config for custom layer
    def get_config(self):
        """ Return the base layer config plus the constructor arguments.
            A config must hold constructor arguments (not the sub-layer objects)
            so that from_config can rebuild the layer.
        """
        config = super(EncoderLayer, self).get_config()
        config.update({'num_filters': self.num_filters, 'dropout': self.dropout_rate})
        return config
    

# Define class for decoder layer
class DecoderLayer(layers.Layer):
    """ Defines Decoder layer: UpSample2D -> Concatenate -> ConvBlock
        Returns layer output for next upward layer

        Args:
            num_filters: number of filters of the convolution block
            **kwargs: standard Keras layer arguments (e.g. name), forwarded to layers.Layer
    """
    def __init__(self, num_filters, **kwargs):
        super(DecoderLayer, self).__init__(**kwargs)
        # Kept so get_config can return the constructor argument
        self.num_filters = num_filters
        self.upsample = layers.UpSampling2D((2, 2))
        self.concat = layers.Concatenate()
        self.conv_block = ConvBlock(num_filters)
    
    # Forward pass
    def call(self, input_tensor, skip_conn, training=False):
        """ Upsample `input_tensor` by 2, concatenate it with `skip_conn` and
            run the result through the convolution block
        """
        x = self.upsample(input_tensor)
        x = self.concat([x, skip_conn])
        x = self.conv_block(x, training=training)
        return x

    # Define config for custom layer
    def get_config(self):
        """ Return the base layer config plus the constructor argument.
            A config must hold constructor arguments (not the sub-layer objects)
            so that from_config can rebuild the layer.
        """
        config = super(DecoderLayer, self).get_config()
        config.update({'num_filters': self.num_filters})
        return config


# Define class for upsampling with skip connections
class UpsampleSkipConn(layers.Layer):
    """ Defines upsample for merged inputs with skip connections for (224, 224) image
        Returns a merged depth input for second half of double u-net

        Args:
            **kwargs: standard Keras layer arguments (e.g. name), forwarded to layers.Layer
    """
    def __init__(self, **kwargs):
        super(UpsampleSkipConn, self).__init__(**kwargs)
        self.upsample = layers.UpSampling2D((2, 2))
        self.concat = layers.Concatenate()
    
    # Forward pass
    def call(self, input_tensor, skip_conn, training=False):
        """ Upsample `input_tensor` by 2 and concatenate it with the two skip
            connections given as skip_conn[0] and skip_conn[1].
            `training` is accepted for a uniform call signature but not used.
        """
        x = self.upsample(input_tensor)
        x = self.concat([x, skip_conn[0], skip_conn[1]])
        return x
    
    # Define config for custom layer
    def get_config(self):
        """ Return the base layer config; this layer has no constructor arguments of its own """
        # The class name was previously misspelled here (`UpSampleSkipConn`), raising NameError
        return super(UpsampleSkipConn, self).get_config()

In [None]:
class StereoDepthNet(Model):
    """ Build double U-Net for segmentation and depth estimation

        A left and a right frame encoder meet in a shared bridge. Two
        segmentation decoders produce the left/right masks, while the merged
        bridge features are upsampled (with the encoder skip connections) into
        the input of a second U-Net that produces the depth estimate.

        Args:
            seg_channels: number of output channels (classes) of both mask outputs
            depth_channels: number of output channels (levels) of the depth output
            **kwargs: standard Keras model arguments (e.g. name), forwarded to Model
    """
    def __init__(self, seg_channels = 256, depth_channels = 256, **kwargs):
        super(StereoDepthNet, self).__init__(**kwargs)
        # Kept so get_config can return the constructor arguments
        self.seg_channels = seg_channels
        self.depth_channels = depth_channels
        self.filters = [16, 32, 64, 128, 256]
        self.softmax_kernel = (1, 1)
        self.activate = 'softmax'
        self.padding = 'same'

        # Frame encoder block layers with downscale pooling
        # Input H&W: 224 -> 112 -> 56 -> 28 -> 14 Bridge layer
        self.left_frame1 = EncoderLayer(self.filters[0], dropout=0.1)
        self.left_frame2 = EncoderLayer(self.filters[1], dropout=0.1)
        self.left_frame3 = EncoderLayer(self.filters[2], dropout=0.2)
        self.left_frame4 = EncoderLayer(self.filters[3], dropout=0.3)

        self.right_frame1 = EncoderLayer(self.filters[0], dropout=0.1)
        self.right_frame2 = EncoderLayer(self.filters[1], dropout=0.1)
        self.right_frame3 = EncoderLayer(self.filters[2], dropout=0.2)
        self.right_frame4 = EncoderLayer(self.filters[3], dropout=0.3)

        # Bridge layer
        self.left_bridge_in = ConvBlock(self.filters[4])
        self.right_bridge_in = ConvBlock(self.filters[4])
        self.merged = layers.Concatenate()
        self.left_bridge_out = ConvBlock(self.filters[4])
        self.right_bridge_out = ConvBlock(self.filters[4])

        # Segmentation decoder block layers with upsampling & skip connections
        self.left_seg4 = DecoderLayer(self.filters[3])
        self.left_seg3 = DecoderLayer(self.filters[2])
        self.left_seg2 = DecoderLayer(self.filters[1])
        self.left_seg1 = DecoderLayer(self.filters[0])

        self.right_seg4 = DecoderLayer(self.filters[3])
        self.right_seg3 = DecoderLayer(self.filters[2])
        self.right_seg2 = DecoderLayer(self.filters[1])
        self.right_seg1 = DecoderLayer(self.filters[0])

        # Segmentation mask outputs
        self.left_mask = layers.Conv2D(seg_channels, self.softmax_kernel, activation = self.activate, padding = self.padding)
        self.right_mask = layers.Conv2D(seg_channels, self.softmax_kernel, activation = self.activate, padding = self.padding)

        
        # Upsample self.merged to full size (224) to form a depth input
        self.upsample_merge4 = UpsampleSkipConn()
        self.upsample_merge3 = UpsampleSkipConn()
        self.upsample_merge2 = UpsampleSkipConn()
        self.upsample_merge1 = UpsampleSkipConn()


        # Depth encoder block layers with downscale pooling
        # Input H&W: 224 -> 112 -> 56 -> 28 -> 14 Bridge layer
        self.depth_in1 = EncoderLayer(self.filters[0], dropout=0.1)
        self.depth_in2 = EncoderLayer(self.filters[1], dropout=0.1)
        self.depth_in3 = EncoderLayer(self.filters[2], dropout=0.2)
        self.depth_in4 = EncoderLayer(self.filters[3], dropout=0.3)

        # Depth bridge
        self.depth_bridge_in = ConvBlock(self.filters[4])
        self.depth_bridge1 = ConvBlock(self.filters[4])
        self.depth_bridge2 = ConvBlock(self.filters[4])
        self.depth_bridge3 = ConvBlock(self.filters[4])
        self.depth_bridge_out = ConvBlock(self.filters[4])

        # Depth decoder block layers with upsampling & skip connections
        self.depth_out4 = DecoderLayer(self.filters[3])
        self.depth_out3 = DecoderLayer(self.filters[2])
        self.depth_out2 = DecoderLayer(self.filters[1])
        self.depth_out1 = DecoderLayer(self.filters[0])

        # Depth estimate output
        self.depth = layers.Conv2D(depth_channels, self.softmax_kernel, activation = self.activate, padding = self.padding)

        
    # Forward pass
    def call(self, input_tensor, training=False):
        """ Run the network on a pair of frames.

            Args:
                input_tensor: sequence whose element 0 is the left frame batch
                    and element 1 the right frame batch
                training: forwarded to every custom sub-layer

            Returns:
                [left_mask, right_mask, depth] softmax outputs
        """
        # Segmentation encoder block layers with skip connection, pooling and spatial dropout
        skl1, xl1 = self.left_frame1(input_tensor[0], training=training)
        skl2, xl2 = self.left_frame2(xl1, training=training)
        skl3, xl3 = self.left_frame3(xl2, training=training)
        skl4, xl4 = self.left_frame4(xl3, training=training)
        
        skr1, xr1 = self.right_frame1(input_tensor[1], training=training)
        skr2, xr2 = self.right_frame2(xr1, training=training)
        skr3, xr3 = self.right_frame3(xr2, training=training)
        skr4, xr4 = self.right_frame4(xr3, training=training)
        
        # Bridge layer
        xl5 = self.left_bridge_in(xl4, training=training)
        xr5 = self.right_bridge_in(xr4, training=training)
        merged = self.merged([xl5, xr5])
        left_bridge_out = self.left_bridge_out(merged, training=training)
        right_bridge_out = self.right_bridge_out(merged, training=training)

        # Segmentation decoder block layers
        ls4 = self.left_seg4(left_bridge_out, skl4, training=training)
        ls3 = self.left_seg3(ls4, skl3, training=training)
        ls2 = self.left_seg2(ls3, skl2, training=training)
        ls1 = self.left_seg1(ls2, skl1, training=training)
        # Left segmentation output defined at end of function

        rs4 = self.right_seg4(right_bridge_out, skr4, training=training)
        rs3 = self.right_seg3(rs4, skr3, training=training)
        rs2 = self.right_seg2(rs3, skr2, training=training)
        rs1 = self.right_seg1(rs2, skr1, training=training)
        # Right segmentation output defined at end of function
        
        # Upsample merged and add skip connections for merged depth input (224, 224)
        merge4 = self.upsample_merge4(merged, [skl4, skr4], training=training)
        merge3 = self.upsample_merge3(merge4, [skl3, skr3], training=training)
        merge2 = self.upsample_merge2(merge3, [skl2, skr2], training=training)
        depth_input = self.upsample_merge1(merge2, [skl1, skr1], training=training)
        
        # Depth encoder block layers with skip connection, pooling and spatial dropout
        skd1, xd1 = self.depth_in1(depth_input, training=training)
        skd2, xd2 = self.depth_in2(xd1, training=training)
        skd3, xd3 = self.depth_in3(xd2, training=training)
        skd4, xd4 = self.depth_in4(xd3, training=training)

        # Depth bridge
        xd5 = self.depth_bridge_in(xd4, training=training)
        bx1 = self.depth_bridge1(xd5, training=training)
        bx2 = self.depth_bridge2(bx1, training=training)
        bx3 = self.depth_bridge3(bx2, training=training)
        de5 = self.depth_bridge_out(bx3, training=training)

        # Depth decoder block layers with upsampling and skip connection
        de4 = self.depth_out4(de5, skd4, training=training)
        de3 = self.depth_out3(de4, skd3, training=training)
        de2 = self.depth_out2(de3, skd2, training=training)
        de1 = self.depth_out1(de2, skd1, training=training)
        # Depth estimation output defined at end of function

        # Outputs
        left_mask = self.left_mask(ls1)
        right_mask = self.right_mask(rs1)
        depth = self.depth(de1)
        return [left_mask, right_mask, depth]
    
    
    def get_config(self):
        """ Return the constructor arguments so from_config can rebuild the model.
            (The previous version read a non-existent `hidden_units` attribute.)
        """
        return {"seg_channels": self.seg_channels, "depth_channels": self.depth_channels}

    
    @classmethod
    def from_config(cls, config):
        """ Rebuild the model from the dictionary returned by get_config """
        return cls(**config)

In [None]:
# Build the network with 256 output channels for the masks and for the depth map
model = StereoDepthNet(seg_channels=256, depth_channels=256)

# Compile with the Adam optimiser and one categorical cross-entropy loss per
# output (left mask, right mask, depth); accuracy is reported as metric
model.compile(optimizer='adam',
              loss=['categorical_crossentropy'] * 3,
              metrics=['accuracy'])

## Model Training

In [None]:
# Define parameters
early_stop = 40
experiment = '_Summary_ComplexRotation_Updated_Depth'
best_model = experiment + '_StereoDepthNetv2_' + datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
# Make sure the checkpoint folder exists before the first checkpoint is written
os.makedirs('Models', exist_ok=True)
# Flags are real booleans: the former strings 'True' only worked because any
# non-empty string is truthy (so 'False' would have enabled them as well)
mc = ModelCheckpoint(mode='min', filepath=('Models/'+best_model+'.h5'), monitor='val_loss',
                     save_best_only=True, save_weights_only=True, verbose=2)
es = EarlyStopping(mode='min', monitor='val_loss', patience=early_stop, verbose=2)
#tb = TensorBoard(log_dir='TensorBoard/SDNv2//{}'.format(best_model), write_graph=True, histogram_freq=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, verbose=2, mode='auto', min_delta=0.00001,
                              cooldown=0, min_lr=0)

# callbacks = [tb, mc, es, reduce_lr]
callbacks = [mc, es, reduce_lr]
# NOTE(review): `smooth` is not used anywhere in the visible cells - confirm before removing
smooth = 1

# Calculate step counts per epoch from the batch size and the number of
# training / validation images found on disk
num_epochs = 200
batch_size = 4
train_steps_path = data_path + 'training/frames/left/left/'
val_steps_path = data_path + 'validation/frames/left/left/'

train_size = len(os.listdir(train_steps_path))
val_size = len(os.listdir(val_steps_path))

train_steps_epoch = int(np.ceil(train_size / batch_size))
val_steps_epoch = int(np.ceil(val_size / batch_size))

In [None]:
# Train model
train_batches = datagen(folder='training/', batch_size=batch_size, shuffle=True)
val_batches = datagen(folder='validation/', batch_size=batch_size, shuffle=False)
# Both generators are endless, so the number of steps per epoch is given explicitly
DepthResult = model.fit(train_batches,
                        validation_data=val_batches,
                        steps_per_epoch=train_steps_epoch,
                        validation_steps=val_steps_epoch,
                        epochs=num_epochs,
                        callbacks=callbacks)

## Best Model
Epoch 189/200
900/900 [==============================] - ETA: 0s - loss: 0.4883 - output_1_loss: 0.0495 - output_2_loss: 0.0501 - output_3_loss: 0.3888 - output_1_accuracy: 0.9786 - output_2_accuracy: 0.9786 - output_3_accuracy: 0.8727
Epoch 189: val_loss improved from 0.57406 to 0.57254, saving model to Models\_Summary_ComplexRotation_Updated_Depth_StereoDepthNetv2_20220324_201552.h5
900/900 [==============================] - 183s 203ms/step - loss: 0.4883 - output_1_loss: 0.0495 - output_2_loss: 0.0501 - output_3_loss: 0.3888 - output_1_accuracy: 0.9786 - output_2_accuracy: 0.9786 - output_3_accuracy: 0.8727 - val_loss: 0.5725 - val_output_1_loss: 0.0803 - val_output_2_loss: 0.0767 - val_output_3_loss: 0.4155 - val_output_1_accuracy: 0.9719 - val_output_2_accuracy: 0.9716 - val_output_3_accuracy: 0.8667 - lr: 2.5000e-04

In [None]:
# Temporary workaround: run this fit call to initialise (build) the model until model.save is fixed
# DepthResult = model.fit(datagen(folder='training/', batch_size=batch_size, shuffle=True),
#                         validation_data = datagen(folder='validation/', batch_size=batch_size, shuffle=False),
#                         steps_per_epoch = train_steps_epoch, validation_steps = val_steps_epoch,
#                         epochs = num_epochs, callbacks = callbacks)

In [None]:
# Print the model summary (layer names, output shapes and number of parameters).
# The model must have been built (e.g. by training) before this is called.
model.summary()

## Training Accuracy (TensorBoard removed for training speed)

In [None]:
# Plot the total training and validation loss for every epoch
for curve in ('loss', 'val_loss'):
    plt.plot(DepthResult.history[curve], label=curve)
plt.ylim([0, 4])
plt.legend()


In [None]:
# Plot the training and validation accuracy of the third output (depth) per epoch
depth_curves = (('output_3_accuracy', 'depth_accuracy'),
                ('val_output_3_accuracy', 'val_depth_accuracy'))
for history_key, legend_label in depth_curves:
    plt.plot(DepthResult.history[history_key], label=legend_label)
plt.legend()

## Save and Load Models

#### Reload best training model and save full model execution graph

In [None]:
# Name (path) under which the full model is saved
model_name = 'redacted'
# Save the complete model with model.save in TensorFlow SavedModel format
# (used because of the custom layers); warnings can be ignored
model.save(model_name, save_format='tf')

#### Load full model for standalone predictions

In [None]:
# Load the full model with compile=False, as it is only used for predictions, not training
# NOTE(review): the path "sdnv2" is hard-coded and is not taken from `model_name` - confirm they match
model = tf.keras.models.load_model("sdnv2", compile=False)

#### Load best weights if continuing on from training

In [None]:
# Load the best checkpoint weights written during training for testing.
# Requires `best_model` from the training parameters cell of the same session.
model.load_weights('Models/'+best_model+'.h5')

## Predictions
Check data_path at start of Input Pipeline for dataset

In [None]:
# Unpacks predictions upfront due to excessive memory consumption on large datasets
batch_size = 4

# Prediction generator
testing_gen = datagen(folder='testing/', batch_size=batch_size, shuffle=False)

# Calculate test steps
test_steps_path = data_path + 'testing/frames/left/left/'
test_size = len(os.listdir(test_steps_path))
test_steps = int(np.ceil(test_size / batch_size))

# Create empty lists for truths and predictions
true_left_frames, true_left_masks, true_depths = [], [], []
pred_left_masks, pred_right_masks, pred_depths = [], [], []

# Start prediction loop
start = datetime.datetime.now()
for _ in range(test_steps):

    # Get next batch of data from data generator
    [left_imgs, right_imgs], [true_left_mask, true_right_mask, true_depth] = next(testing_gen)

    # De-batch relevant input data. The final batch can hold fewer than batch_size
    # images, so iterate over what was actually returned instead of indexing
    # range(batch_size) and swallowing the resulting errors with a bare except.
    true_left_frames.extend(left_imgs)
    true_left_masks.extend(decode_one_hot(mask) for mask in true_left_mask)
    true_depths.extend(decode_one_hot(depth_map) for depth_map in true_depth)

    # Make predictions (preds[left, right, depth] = [3, (batch_size, 224, 224, 256)])
    preds = model.predict([left_imgs, right_imgs])

    # De-batch prediction outputs (right mask not used in analysis)
    pred_left_masks.extend(decode_one_hot(mask) for mask in preds[0])
    pred_depths.extend(decode_one_hot(depth_map) for depth_map in preds[2])

end = datetime.datetime.now()
print("\nPredictions complete, time taken: ", end-start)
print(f"Input data lengths: true left frames {len(true_left_frames)}")
print(f"Input data lengths: true left masks {len(true_left_masks)}; true depths {len(true_depths)}")
print(f"Prediction lengths: pred left masks {len(pred_left_masks)}; pred depths {len(pred_depths)}")

In [None]:
# Create depth errors list (256 depth levels): signed error = true level - predicted level
depth_errors = [np.asarray(true_map - pred_map)
                for true_map, pred_map in zip(true_depths, pred_depths)]

# Calculate histogram of errors for every image.
# 511 bins over (-255.5, 255.5) are exactly one unit wide and centred on the
# integer errors -255..255, so bin index 255 counts the error value 0.
hist_all = [np.histogram(errors, bins=511, range=(-255.5, 255.5))[0]
            for errors in depth_errors]

# Exact hit accuracy: share of pixels with zero error (pixel count taken from
# the image itself instead of a hard-coded 224*224)
test_acc = [hist[255] / errors.size for hist, errors in zip(hist_all, depth_errors)]

In [None]:
# Save the histograms of errors as csv: one header row with the error values
# -255..255, followed by one row of counts per test image
err_dist = list(range(-255, 256))
with open("D:/RawPixels/ComplexRotation Updated Depth/histogram_of_errors.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows([err_dist] + list(hist_all))

# Save the test accuracies (from the histograms of errors) as a single csv row
with open("D:/RawPixels/ComplexRotation Updated Depth/test_accuracy.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(test_acc)

In [None]:
# Show overall test accuracy
print(f"Min accuracy:\t{round(np.min(test_acc)*100, 1)}%")
print(f"Max accuracy:\t{round(np.max(test_acc)*100, 1)}%")
print(f"Mean accuracy:\t{round(np.mean(test_acc)*100, 1)}%\n")

# Accuracies as percentages for plotting
chart_acc = [acc * 100 for acc in test_acc]
# Plot accuracy for each image
plt.title("Accuracy across Test Set\n")
plt.xlabel("Test Set Image (idx)")
plt.ylabel("Accuracy (%)")
plt.ylim([0, 100])
plt.plot(chart_acc)
plt.show()
print()
fig, axs = plt.subplots(1, 1)
# Outlier points are drawn (boxplot default), as stated in the title
plt.boxplot(chart_acc)
plt.title("Test accuracy boxplot with outliers")
# White tick labels hide the meaningless x tick of the single box
plt.xticks(color='w')
# Report the actual number of test images instead of a hard-coded count
plt.xlabel(f"Test Set: {len(test_acc)} images")
plt.ylabel("Accuracy (%)")
plt.show()

## Visualise Predictions
Visualise a bank of results for a user-defined index range

In [None]:
# Visualise predictions for test images start_idx .. end_idx-1
start_idx = 1
end_idx = start_idx + 4

for i in range(start_idx, end_idx):
    
    fig = plt.figure(figsize=(35,12))

    # (title, image, colour map) for the six panels of one row; None keeps the default colour map
    panels = [
        ('Actual Left Frame', true_left_frames[i], None),
        ('Ground Truth Labels', true_left_masks[i], None),
        ('Predicted Labels', pred_left_masks[i], None),
        ('Ground Truth Depth', true_depths[i], 'gray'),
        ('Predicted Depth', pred_depths[i], 'gray'),
        ('Depth Errors', abs(depth_errors[i]), 'gray'),
    ]

    # Draw every panel on its own axes object. Previously some axes variables were
    # overwritten by the image returned from plt.imshow, and the last panel did
    # not switch the grid off like the others.
    for position, (panel_title, image, cmap) in enumerate(panels, start=1):
        ax = fig.add_subplot(1, 6, position)
        ax.set_title(panel_title)
        ax.imshow(image, cmap=cmap)
        ax.grid(False)

# plt.savefig('StereoSegResultsRight.png')

## Individual Image Analysis

In [None]:
# Report the first test image with the highest and with the lowest accuracy
best_acc = np.max(test_acc)
worst_acc = np.min(test_acc)
print(f"Index of max accuracy: {test_acc.index(best_acc)}\tAccuracy: {round(best_acc*100, 1)}%")
print(f"Index of min accuracy: {test_acc.index(worst_acc)}\tAccuracy: {round(worst_acc*100, 1)}%")

In [None]:
# Select image index from test set (starts at zero) - Summary documents image number = idx+1 (position in dataset)
image_idx = 86

print(f"\nImage accuracy: {round(test_acc[image_idx]*100, 1)}%\n")

# Histogram of the signed depth errors of the selected image.
# 511 unit-wide bins over (-255.5, 255.5) are centred on the integer errors, and
# the counts are plotted against those integers. Plotting against the left bin
# edges of a (-255, 255) range slowly shifts the x positions, because those
# bins are 510/511 wide.
error_values = np.arange(-255, 256)
histogram, bin_edges = np.histogram(depth_errors[image_idx], bins=511, range=(-255.5, 255.5))

# Full view first, then zoomed in on the y-axis to see the distribution of errors
histogram_views = (("Grayscale Histogram (50,176 Pixels Predictions)", None),
                   ("ZOOM: Grayscale Histogram (50,176 Pixels Predictions)", [0, 100]))
for view_title, y_limits in histogram_views:
    plt.figure()
    plt.title(view_title)
    plt.xlabel("Grayscale Error")
    plt.ylabel("Pixel Count")
    plt.xlim([-255, 255])
    if y_limits is not None:
        plt.ylim(y_limits)
    plt.plot(error_values, histogram)
    plt.show()
    print()

# Show larger images: one row for the frame and its labels, one row for depth.
# Each entry is (title, image, colour map); None keeps the default colour map.
image_rows = (
    (("Actual Left Frame\n", true_left_frames[image_idx], None),
     ("Ground Truth Labels\n", true_left_masks[image_idx], None),
     ("Predicted Labels\n", pred_left_masks[image_idx], None)),
    (("Ground Truth Depth\n", true_depths[image_idx], 'gray'),
     ("Predicted Depth\n", pred_depths[image_idx], 'gray'),
     ("Depth Errors\n", abs(depth_errors[image_idx]), 'gray')),
)
for row in image_rows:
    fig = plt.figure(figsize=(35,12))
    for position, (panel_title, image, cmap) in enumerate(row, start=1):
        # Use the axes object directly instead of rebinding it to plt.* return values
        ax = fig.add_subplot(1, 3, position)
        ax.set_title(panel_title, fontsize=24)
        ax.imshow(image, cmap=cmap)

## Individual Image Labels

In [None]:
# Check that the sets of unique pixel values match between truth and prediction
true_unique = set()
pred_unique = set()
# Iterate over the masks directly: using `image_idx` as the loop variable here
# would overwrite the image selected for the individual image analysis
for true_mask, pred_mask in zip(true_left_masks, pred_left_masks):
    true_unique.update(np.unique(true_mask))
    pred_unique.update(np.unique(pred_mask))

# Set of unique pixels across all images should match
print(natsorted(true_unique))
print(natsorted(pred_unique))

# Convert to a sorted list for future use
true_unique = natsorted(tuple(true_unique))

In [None]:
# [0, 63, 76, 88, 97, 99, 127, 132, 150, 155, 156, 166, 176, 215, 255]
# labels = ['Background', 'FireSupport1Prefab', 'Barrel1Prefab', 'BoxPrefab', 'TowerPrefab', 'SandBarricadePrefab', 'CanopyPrefab', 'HangerPrefab', 'FuelTankPrefab', 'Car5Prefab', 'HighWallPrefab', 'Terrain', 'Foliage', 'CabinPrefab']
labels = ['list of redacted labels']
seg_labels = ['list of redacted labels']

# Unique grey values of the true and predicted left mask of image `image_idx`
a = list(np.unique(true_left_masks[image_idx]))
b = list(np.unique(pred_left_masks[image_idx]))
print("Unique true greyscale values:\t\t", a)
print("Unique predicted greyscale values:\t", b)
print()

# Unique values of the true mask with their pixel counts
unique_true, occurCount = np.unique(true_left_masks[image_idx], return_counts=True)
true_masks = tuple(zip(unique_true, occurCount))
print("True left mask breakdown")
for value, pixels in true_masks:
    print(f"Value  {value}:\t{pixels} pixels\tProportion of image: {round(pixels/50176*100, 1)}%")
print()

# Unique values of the predicted mask with their pixel counts
unique_pred, occurCount = np.unique(pred_left_masks[image_idx], return_counts=True)
pred_masks = tuple(zip(unique_pred, occurCount))
print("Predicted left mask breakdown")
for value, pixels in pred_masks:
    print(f"Value  {value}:\tProportion of image: {round(pixels/50176*100, 1)}%")
print()

## Confusion Matrices

#### Depth Confusion Matrix

In [None]:
# Create empty confusion matrix: cm_depth[pred, true] counts the pixels with
# predicted depth level `pred` and true depth level `true`
cm_depth = np.zeros((256, 256), dtype='int')

# Verify from test_accuracy calculations (direct hits on the diagonal are included)
print("Calculating confusion matrix. Please wait...\n")
for true_map, pred_map in zip(true_depths, pred_depths):
    # Encode every (pred, true) pixel pair as one integer and count the codes in
    # a single vectorised pass instead of looping over all pixels in Python
    pair_codes = np.asarray(pred_map).ravel() * 256 + np.asarray(true_map).ravel()
    cm_depth += np.bincount(pair_codes, minlength=256 * 256).reshape(256, 256)
            
print("Total pixels:", np.sum(cm_depth))

In [None]:
# Calcs and print summary
# Count the pixels directly; `len(true_depths*224*224)` built a list repeated 50,176 times
tot_pix = sum(np.asarray(depth_map).size for depth_map in true_depths)

# Keep only the non-direct hits: zero the diagonal (true == predicted) on a copy,
# so the figures below really describe the error population
errors_only = np.array(cm_depth, copy=True)
np.fill_diagonal(errors_only, 0)
n_errors = int(np.sum(errors_only))

print(f"Total number of pixels in non-direct hits:\t{n_errors}")
print(f"Total number of pixels in dataset:\t\t{tot_pix}")
print(f"Percentage of non-direct hits:\t\t\t{round(n_errors/max(tot_pix, 1)*100, 1)}%")
print("\nColour bar represents percentage of non-direct hits from within the error population.")
print("Thin red line represents axis of direct hits.")

# Convert numbers to percentage of the error population (guard against no errors at all)
cm2 = errors_only / max(n_errors, 1) * 100

# Plot error matrix
plt.figure(figsize=(35,12))
plt.title("Monocular Complex Rotation - Depth Confusion Matrix Across All Test Images (0-200m)\n", fontsize=14)
plt.xlabel("Predicted", fontsize=12)
plt.ylabel("True", fontsize=12)
plt.xlim([-5, 260])
plt.ylim([260, -5])
plt.plot(range(200), color='red', linestyle = 'dotted', lw=0.4)
# cm_depth is indexed [pred, true] and imshow draws rows along the y-axis, so the
# matrix is transposed to put "True" on the y-axis and "Predicted" on the x-axis
plt.imshow(cm2.T, cmap='binary')
plt.colorbar()
plt.show()

In [None]:
# Tabulate the depth confusion matrix with the depth levels 0-255 as row and
# column labels, ready for export to csv
# NOTE(review): this rebinds `labels`, replacing the label list assigned in an earlier cell
labels = list(range(256))
df = pd.DataFrame(data=cm_depth, index=labels, columns=labels)
display(df)

In [None]:
# Export the depth confusion matrix table (including its row and column labels) to CSV
df.to_csv("D:/ConfusionMatrix/_Summary - Depth Error ComplexRotation Confusion Matrix.csv")  

#### Segmentation Confusion Matrix

In [None]:
# Plot confusion matrix
from sklearn.metrics import confusion_matrix
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title=f'\nSegmentation Confusion Matrix ({len(true_depths)} images)\n',
                          cmap=plt.cm.Blues):
    """
    Draw a confusion matrix as an annotated heat map and show it.

    cm        -- 2-D array; its rows run down the y axis ("True label") and its
                 columns along the x axis ("Predicted label").
    classes   -- tick labels, used on both axes.
    normalize -- when True, every row is divided by its own sum before plotting.
    title     -- figure title (the default is built once, when this def is executed).
    cmap      -- matplotlib colour map for the heat map.
    """
    if not normalize:
        print('\nConfusion matrix, without normalization')
    else:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = cm.astype('float') / row_totals
        print("Normalized confusion matrix")

    # Heat map with one tick per class on each axis
    plt.figure(figsize=(35,12))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_positions = np.arange(len(classes))
    plt.xticks(tick_positions, classes, rotation=45)
    plt.yticks(tick_positions, classes)

    # Write each cell's value on top of it; cells above half the maximum get white text
    cell_format = '.2f' if normalize else 'd'
    cutoff = cm.max() / 2.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            value = cm[row, col]
            text_colour = "white" if value > cutoff else "black"
            plt.text(col, row, format(value, cell_format),
                     horizontalalignment="center",
                     color=text_colour)

    plt.ylabel("\nTrue label")
    plt.xlabel("\nPredicted label")
    plt.show()

In [None]:
# Create empty confusion matrix (first index = predicted class, second index = true class)
cm_seg = np.zeros((len(true_unique), len(true_unique)), dtype='int')

# Verify from test_accuracy calculations
print("Calculating confusion matrix. Please wait...\n")

# Position of each pixel value within true_unique. setdefault keeps the first
# position of a repeated value, which is what list.index returns.
class_position = {}
for position, value in enumerate(true_unique):
    class_position.setdefault(value, position)


def _to_class_positions(pixels):
    """Map a flat array of pixel values to their positions in true_unique."""
    values, inverse = np.unique(pixels, return_inverse=True)
    values = values.tolist()
    unknown = [v for v in values if v not in class_position]
    if unknown:
        # Same exception type list.index raises for a value that is not in the list
        raise ValueError(f"pixel values {unknown} are not in true_unique")
    positions = np.array([class_position[v] for v in values], dtype=np.intp)
    return positions[inverse.reshape(-1)]


n_masks = len(true_left_masks)
if n_masks:
    # One flat array per side, restricted to the 224x224 window that is being scored
    true_px = np.asarray(true_left_masks)[:, :224, :224].ravel()
    pred_px = np.asarray(pred_left_masks)[:n_masks, :224, :224].ravel()

    # np.add.at accumulates repeated (pred, true) pairs; a fancy-indexed "+= 1" would not
    np.add.at(cm_seg, (_to_class_positions(pred_px), _to_class_positions(true_px)), 1)
print("Total pixels:", np.sum(cm_seg))

# Plot confusion matrix
title=f'\nSegmentation Confusion Matrix ({len(true_depths)} images)\n'
# plot_confusion_matrix labels rows "True" and columns "Predicted"; cm_seg is indexed
# [pred, true], so hand over the transpose to make the picture match its axis labels.
plot_confusion_matrix(cm_seg.T, seg_labels, title=title)

In [None]:
# Build one summary row per class: pixel value, diagonal count, row total and
# diagonal/row-total as a percentage
# NOTE(review): `cm` is not assigned in the surrounding cells (the matrix built for
# segmentation is `cm_seg`) - confirm which matrix this table is meant to read.
def _class_summary(idx):
    hits = cm[idx][idx]
    row_total = np.sum(cm[idx])
    return (true_unique[idx], hits, row_total, round(hits/row_total*100, 2))


mydata = [_class_summary(idx) for idx in range(len(true_unique))]
head = ["Pixel Value", "Predicted", "Total", "Accuracy"]
df = pd.DataFrame(data=mydata, index=seg_labels, columns=head)
display(df)

In [None]:
# Percentage of each row of `cm` that lies on the diagonal, one entry per class
# NOTE(review): `cm` is not assigned in the surrounding cells - confirm it is the
# intended matrix (the segmentation matrix built above is `cm_seg`).
seg_pred_acc = [
    round(cm[k][k]/np.sum(cm[k])*100, 2)
    for k in range(len(true_unique))
]

# Bar positions: the first len(true_unique) entries of `labels`
# NOTE(review): `labels` was last set to 0..255 for the depth table; `seg_labels`
# may have been intended here - confirm.
bar_positions = [labels[pos] for pos in range(len(true_unique))]

# Plot accuracy for each class
plt.title("Segmentation class accuracy across test set\n")
plt.xlabel("\nLabel")
plt.xticks(rotation=90)
plt.ylabel("Accuracy (%)")
plt.ylim([0, 100])
plt.bar(bar_positions, seg_pred_acc)
plt.show()

In [None]:
# Tabulate the matrix with the segmentation labels on both axes, ready for export to csv
# NOTE(review): reads `cm`, which is not assigned in the surrounding cells - confirm.
df = pd.DataFrame(data=cm, index=seg_labels, columns=seg_labels)
display(df)

In [None]:
# Write the current table (df) to disk as CSV
seg_csv_path = "D:/ConfusionMatrix/_Summary - Segmentation Error ComplexRotation Confusion Matrix.csv"
df.to_csv(seg_csv_path)

## Real-Time Inference with Threaded Solution
When setting "sliding_frames" variable (used in ShowFrames class):
- False: Uses frame pairs (RightCam, LeftCam)
- True: Uses sliding frames (In -> RightCam -> LeftCam -> Out)

In [None]:
from threading import Thread
from datetime import datetime
from queue import Queue
from time import sleep
import cv2


# Decode for video
def decode_one_hot_vid(img):
    """Return, as a uint8 array, the index of the largest value along axis 2 of `img`."""
    class_ids = tf.math.argmax(img, axis=2)
    return np.asarray(class_ids, dtype=np.uint8)


# Define class object for recording frame rate
class FPS:
    """
    Wall-clock timer plus a frame counter, used to report frames per second.

    Typical use: ``fps = FPS().start()``, call ``update(n)`` as frames are processed,
    read ``now()`` for a running figure, then ``stop()`` and read ``fps()``.
    """

    def __init__(self):
        # store the start time, end time, and total number of frames
        # that were examined between the start and end intervals
        self._start = None
        self._end = None
        self._numFrames = 0

    def start(self):
        """Start (or restart) the timer and return self so the call can be chained."""
        self._start = datetime.now()
        # Forget any earlier stop() so a restarted timer cannot report a negative interval
        self._end = None
        return self

    def stop(self):
        """Stop the timer."""
        self._end = datetime.now()

    def now(self):
        """Return the seconds elapsed since start(); 0.0 if the timer was never started."""
        if self._start is None:
            return 0.0
        return (datetime.now() - self._start).total_seconds()

    def update(self, step):
        """Add `step` to the number of frames examined."""
        self._numFrames += step

    def elapsed(self):
        """
        Return the seconds between start() and stop().

        While the timer is still running the current time is used as the end point;
        0.0 is returned if the timer was never started.
        """
        if self._start is None:
            return 0.0
        end = self._end if self._end is not None else datetime.now()
        return (end - self._start).total_seconds()

    def fps(self):
        """Return the (approximate) frames per second; 0.0 when no time has elapsed."""
        seconds = self.elapsed()
        if seconds <= 0:
            return 0.0
        return self._numFrames / seconds
    
    
class LoadFrames:
    """
    Class that loads frames from a VideoCapture object.
    Uses a dedicated thread.

    Frames are converted to RGB, resized to 224x224 and reshaped to (1, 224, 224, 3)
    before being put on the queue `Q`; `count` is the number of frames queued so far.
    """
    def __init__(self, source=0, queue_size=1024):
        """Open `source` with cv2.VideoCapture and create a queue holding at most `queue_size` frames."""
        self.thread = Thread(target=self.fill_queue, args=())
        self.thread.daemon = True
        self.stream = cv2.VideoCapture(source)
        self.Q = Queue(maxsize=queue_size)
        self.grabbed = False
        self.frame = None
        self.count = 0

    def start(self):
        """Start the loading thread and return self so the call can be chained."""
        self.thread.start()
        return self

    def fill_queue(self):
        """Read, pre-process and queue frames until the stream has none left, then release it."""
        try:
            while True:
                # Fill Queue as long as room exists, else sleep to create room
                if self.Q.full():
                    sleep(0.1)
                    continue

                # read the next frame from the file
                (self.grabbed, self.frame) = self.stream.read()

                # If no frames to grab assume end of file
                if not self.grabbed:
                    break

                # LoadFrames quicker than ShowFrames, pre-process frames for model here
                self.frame = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB)
                self.frame = cv2.resize(self.frame, (224, 224))
                self.frame = self.frame.reshape(1, 224, 224, 3)

                # Add the frame to the queue
                self.Q.put(self.frame)
                self.count += 1
        finally:
            # Release the stream even if reading or pre-processing raised.
            # No self.thread.join() here: this method is the thread's own target,
            # and a thread joining itself raises RuntimeError.
            self.stream.release()

    
class ShowFrames:
    """
    Class to retrieve frames from a pre-loaded queue and make DL predictions.
    Uses a dedicated thread.

    Reads the notebook-level objects `get_frames` (its queue `Q` and `count`),
    `fps` (the FPS timer) and `model`.

    sliding_frames:
        False - frames are consumed in pairs (right, left).
        True  - the previous left frame is reused as the next right frame.
    """
    def __init__(self, sliding_frames = False):
        self.thread = Thread(target=self.show, args=())
        self.thread.daemon = True
        self.stopped = False
        self.sliding_frames = sliding_frames
        self.l_frame = None
        self.l_frame_slide = None
        self.r_frame = None
        self.l_mask = None
        self.depth = None

    def start(self):
        """Start the display thread and return self so the call can be chained."""
        self.thread.start()
        return self

    def show(self):
        """Predict on queued frames and display/save the results until frames run out or 'q' is pressed."""

        # Sliding mode needs one frame up front to act as the first right frame.
        # Pair mode reads both of its frames inside the loop, so nothing is pre-read
        # (pre-reading a pair here threw the first two frames away).
        if self.sliding_frames:
            self.l_frame_slide = get_frames.Q.get()

        # Start loop to show frames
        while not self.stopped:

            if self.sliding_frames:
                # Grab sliding frames: previous left becomes right, new frame becomes left
                self.r_frame = self.l_frame_slide
                self.l_frame = get_frames.Q.get()
                # Keep a reference to the queued frame; self.l_frame is rebound further down
                self.l_frame_slide = self.l_frame
            else:
                # Grab frame pairs (right first, then left)
                self.r_frame = get_frames.Q.get()
                self.l_frame = get_frames.Q.get()
            fps.update(step=1)

            # Make depth prediction (if error, add depth = depth.reshape(224, 224, 1))
            self.l_mask, _, self.depth = model.predict([self.l_frame/255., self.r_frame/255.])
            # Returns shape (1, 224, 224, 256)
            self.l_mask = self.l_mask.reshape((224, 224, 256))
            self.l_mask = decode_one_hot_vid(self.l_mask)
            self.depth = self.depth.reshape((224, 224, 256))
            self.depth = decode_one_hot_vid(self.depth)
            self.depth = cv2.applyColorMap(self.depth, cv2.COLORMAP_RAINBOW)
            self.l_mask = cv2.applyColorMap(self.l_mask, cv2.COLORMAP_DEEPGREEN)

            # Reshape input frame
            self.l_frame = self.l_frame.reshape((224, 224, 3))
            self.l_frame = cv2.cvtColor(self.l_frame, cv2.COLOR_RGB2BGR)

            # Add frame rate and text (rate shown as 0 until some time has elapsed)
            seconds = fps.now()
            running_fps = fps._numFrames / seconds if seconds else 0.0
            cv2.putText(self.l_frame, "FPS: {:.2f}".format(running_fps),
            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            cv2.putText(self.l_frame, "Left Camera", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            cv2.putText(self.l_mask, "Segmentation", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            cv2.putText(self.depth, "Depth", (10, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1)

            # Resize for clarity
#             self.l_frame = cv2.resize(self.l_frame, (450, 450))
#             self.l_mask = cv2.resize(self.l_mask, (450, 450))
#             self.depth = cv2.resize(self.depth, (450, 450))

            # Show frames
            concat = np.concatenate((self.l_frame, self.l_mask, self.depth), axis=1)
            cv2.imshow("Left Frame - Left Mask - Depth (with colourmaps)", concat)

            # Write frame for creating video
            cv2.imwrite(f'D:/Video/complex/real_life/output_frames/{fps._numFrames}.png', concat)

            # Check if queue contains enough frames for another iteration:
            # frame pairs need 2 queued frames, sliding frames need 1.
            # (qsize must be called; comparing the method itself to 0 is never true.)
            frames_needed = 1 if self.sliding_frames else 2
            if get_frames.Q.qsize() < frames_needed:
                self.stopped = True

            # Escape video feed
            key = cv2.waitKey(1) & 0xff
            if key == ord('q'):
                break

        # Clean up
        cv2.destroyAllWindows()
        self.stop()

    def stop(self):
        """Flag the loop to stop, stop the FPS timer and print the run summary."""
        self.stopped = True
        fps.stop()
        print(f"Frames loaded:\t{get_frames.count}")
        print(f"Average FPS:\t{int(round(fps.fps(), 2))}")

        
# Define function to allow initialisation of threading
def init_get_frames(source, Qsize):
    """Create (without starting) a LoadFrames for `source` whose queue holds at most `Qsize` frames."""
    return LoadFrames(source, Qsize)


# Define function to allow initialisation of threading
def init_show_frames(sliding_frames):
    """Create (without starting) a ShowFrames in pair mode (False) or sliding mode (True)."""
    return ShowFrames(sliding_frames)

In [None]:
# # Load manually named saved model
# model.load_weights('Models/_Summary_SemiComplexRotation_StereoDepthNetv2_20220215_180725.h5')

In [None]:
# Define source and initialise objects.  Start timer.
# The order matters: ShowFrames reads the globals `fps` and `get_frames` from its
# own thread, so both must exist before that thread is started.
# source = 'D:/Video/videotest.mp4'
source = 'D:/Video/complex/real_life/yesnaby_008.mp4'

# Start the frame-rate timer
fps = FPS().start()

# Initialise and start frame loading; the queue holds at most 8 pre-processed frames
get_frames = init_get_frames(source, 8).start()

# Initialise and start the frame showing thread (sliding_frames=False -> frame pairs)
show_frames = init_show_frames(sliding_frames=False).start()

## Additional Code

In [None]:
# Write frames to video
import cv2
import os
from natsort import natsorted

image_folder = 'D:/Video/complex/real_life/output_frames/'
video_name = 'D:/Video/complex/real_life/yesnaby_008_COMPLEX.mp4'

# Saved frames in natural order (1.png, 2.png, ..., 10.png)
images = natsorted([img for img in os.listdir(image_folder) if img.endswith(".png")])
if not images:
    raise FileNotFoundError(f"No .png frames found in {image_folder}")

# The first frame fixes the video size. Only height and width are unpacked: binding
# the channel count to `layers` would overwrite the `layers` imported from
# tensorflow.keras at the top of the notebook.
frame = cv2.imread(os.path.join(image_folder, images[0]))
height, width = frame.shape[:2]

video = cv2.VideoWriter(video_name, 0, 15, (width,height))
try:
    for image in images:
        video.write(cv2.imread(os.path.join(image_folder, image)))
finally:
    # Always close the writer, even if a frame fails to load or write
    cv2.destroyAllWindows()
    video.release()

In [None]:
# Close any OpenCV windows that are still open
cv2.destroyAllWindows()