In [None]:
!pip install tensorflow-addons

In [None]:
import cv2
import os
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Activation, MaxPool2D, Concatenate
from tensorflow.keras.losses import MeanSquaredError
import imageio.v3 as imageio
import numpy as np
from matplotlib import pyplot as plt
from google.colab import drive

In [None]:
# Accessing My Google Drive
drive.mount('/content/drive')

# Get $I_1$, $I_{1}^w$ and $I_2$ images

In [None]:
path = os.getcwd() + '/drive/My Drive/My Final Year Project Folder 2023/'

In [None]:
# Get I1 images
path_to_I1 = path + 'I1_updated/'
list_dir = [int(file.split(".")[0]) for file in os.listdir(path_to_I1)]
list_dir.sort()
list_I1_images = []
for file_name in list_dir:
    img = cv2.imread(filename=path_to_I1 + str(file_name) + ".png")
    img = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2RGB)
    img = cv2.resize(src=img, dsize=(336, 176), interpolation=cv2.INTER_LINEAR)
    list_I1_images.append(img)
list_I1_images = np.asarray(list_I1_images)

In [None]:
# Get I1w images
path_to_I1w = path + 'I1w/'
list_dir = [int(file.split(".")[0]) for file in os.listdir(path_to_I1w)]
list_dir.sort()
list_I1w_images = []
for file_name in list_dir:
    img = cv2.imread(filename=path_to_I1w + str(file_name) + ".png")
    img = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2RGB)
    img = cv2.resize(src=img, dsize=(336, 176), interpolation=cv2.INTER_LINEAR)
    list_I1w_images.append(img)
list_I1w_images = np.asarray(list_I1w_images)

In [None]:
# Get I2 images
path_to_I2 = path + 'I2_updated/'
list_dir = [int(file.split(".")[0]) for file in os.listdir(path_to_I2)]
list_dir.sort()
list_I2_images = []
for file_name in list_dir:
    img = cv2.imread(filename=path_to_I2 + str(file_name) + ".png")
    img = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2RGB)
    img = cv2.resize(src=img, dsize=(336, 176), interpolation=cv2.INTER_LINEAR)
    list_I2_images.append(img)
list_I2_images = np.asarray(list_I2_images)

# Translation between camera frames

In [None]:
translation_between_frames = [0, 1, 1.25, 1.3, 1.85, 2.25, 3.78, 1.593, 1.3275, 1.239, 1.416, 0.885, 1.593, 3.009, 2.655, 1.239, 1.77, 1.46, 0.708, 1.593,
                              1.77, 1.593, 1.239, 1.593, 1.947, 1.77, 1.416, 1.239, 1.239, 1.062, 2.124, 2.301, 2.655, 2.124, 1.947, 1.062, 1.416, 1.239, 
                              1.593, 1.416, 2.124, 4.779, 1.239, 1.593, 1.593, 1.77, 1.239, 1.416, 1.593, 1.593, 2.124, 1.77, 3.009, 1.947, 1.593, 2.124,
                              1.947, 1.062, 2.301, 2.124]
translation_between_frames = np.asarray(a=translation_between_frames)

# Definition of epipole generator function

In [None]:
def epipole_generator(ndarray_I1w, ndarray_I2):
    epipole_list = []
    grouped_data = tf.data.Dataset.from_tensor_slices((ndarray_I1w, ndarray_I2))

    for element in grouped_data:
        I1w, I2 = element
        I1w = I1w.numpy()
        I2 = I2.numpy()

        sift = cv2.SIFT_create()
        # find the keypoints and descriptors with SIFT
        kp1, des1 = sift.detectAndCompute(I1w,None)
        kp2, des2 = sift.detectAndCompute(I2,None)

        # FLANN parameters
        FLANN_INDEX_KDTREE = 1
        index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
        search_params = dict(checks=50)
        flann = cv2.FlannBasedMatcher(index_params,search_params)
        matches = flann.knnMatch(des1,des2,k=2)
        pts1 = []
        pts2 = []

        # ratio test as per Lowe's paper
        for i,(m,n) in enumerate(matches):
            if m.distance < 0.8*n.distance:
                pts2.append(kp2[m.trainIdx].pt)
                pts1.append(kp1[m.queryIdx].pt)

        # Now we have the list of best matches from both the images. Let's find the Fundamental Matrix.
        pts1 = np.int32(pts1)
        pts2 = np.int32(pts2)
        F, mask = cv2.findFundamentalMat(pts1,pts2, cv2.FM_LMEDS)

        # We select only inlier points
        pts1 = pts1[mask.ravel()==1]
        pts2 = pts2[mask.ravel()==1]

        # Find epilines corresponding to points in I2 (second image) and
        # drawing its lines on I1w
        lines1 = cv2.computeCorrespondEpilines(pts2.reshape(-1,1,2), 2,F)
        lines1 = lines1.reshape(-1,3)

        # Find epilines corresponding to points in I1w (first image)
        # drawing its lines on I2
        lines2 = cv2.computeCorrespondEpilines(pts1.reshape(-1,1,2), 1,F)
        lines2 = lines2.reshape(-1,3)
        
        epipole_left = np.cross(lines1[0], lines1[1])
        epipole_right = np.cross(lines2[0], lines2[1])

        if epipole_left[2] != 0:
            epipole_left = epipole_left/epipole_left[2]
        else:
            epipole_left = epipole_left
            
        if epipole_right[2] != 0:
            epipole_right = epipole_right/epipole_right[2]
        else:
            epipole_right = epipole_right
        
        epipole_list.append(epipole_right)

    # The epipoles are returned in homogenous coordinates
    
    return np.asarray(epipole_list)

# Definition of Optical flow generator function

In [None]:
@tf.function
def optical_flow_generator(struct_param_tensor, epipole_tensor, translation_between_frames):
    """
    @param struct_param_tensor is a tensor of shape [1, 176, 336, 1]
    @param epipole_tensor is a tensor of shape [3, ]
    @returns optical flow a tensor of [1, 176, 336, 2]
    """
    epipole = [epipole_tensor[0], epipole_tensor[1]]
    struct_param_tensor_copy = tf.identity(struct_param_tensor)
    struct_param_tensor_copy = tf.cast(struct_param_tensor_copy, tf.float32)  # Convert to float32
    height_of_camera = .152 # in meters
    translation_between_frames = tf.cast(x=translation_between_frames, dtype=tf.float32)
    b = translation_between_frames/height_of_camera
    
    # i_range represents the height dimension in struct_param_tensor_copy
    i_range = tf.range(struct_param_tensor_copy.shape[1], dtype=tf.int32)

    # j_range represents the width dimension in struct_param_tensor_copy
    j_range = tf.range(struct_param_tensor_copy.shape[2], dtype=tf.int32)

    I, J = tf.meshgrid(i_range, j_range, indexing='ij')

    # Assuming you have the tensors struct_param, I, and J defined
    
    # Modify the range of values in I and J to match the valid indices
    I_modified = tf.clip_by_value(I, 0, struct_param_tensor_copy.shape[1] - 1)
    J_modified = tf.clip_by_value(J, 0, struct_param_tensor_copy.shape[2] - 1)
    
    # Expand dimensions of I_modified and J_modified to match struct_param
    I_expanded = tf.expand_dims(I_modified, axis=-1)  # Shape: [176, 336, 1]
    J_expanded = tf.expand_dims(J_modified, axis=-1)  # Shape: [176, 336, 1]
    
    # Use tf.concat to create indices tensor
    indices = tf.concat([I_expanded, J_expanded], axis=-1)  # Shape: [176, 336, 2]
    
    # Use tf.gather_nd to access elements of struct_param based on indices
    result = tf.gather_nd(struct_param_tensor_copy[0], indices)[:,:,0]  # Shape: [176, 336, 1]
    
    new_u_x = ((tf.math.multiply(x=result, y=b)) / (tf.math.multiply(x=result, y=b) - 1)) * (tf.cast(epipole[0], tf.float32) - tf.cast(J, tf.float32)) # Shape: [176, 336]
    new_u_y = ((tf.math.multiply(x=result, y=b)) / (tf.math.multiply(x=result, y=b) - 1)) * (tf.cast(epipole[1], tf.float32) - tf.cast(I, tf.float32)) # Shape: [176, 336]

    new_u_x = tf.reshape(new_u_x, (176, 336, 1))
    new_u_y = tf.reshape(new_u_y, (176, 336, 1))
    
    return tf.expand_dims(tf.concat([new_u_x, new_u_y], axis=2), axis=0)

# Definition of the optical flow warp function

In [None]:
@tf.function
def optical_flow_warp(optical_flow_tensor, I1w_tensor):
    """ 
    @param optical_flow_tensor is a tensor of shape [1, 480, 640, 2]
    @param I1w_tensor is a tensor of shape [1, 480, 640, 3]
    @returns warped image a tensor of [1, 480, 640, 3]
    
    """
    height  = tf.shape(optical_flow_tensor)[1]
    width = tf.shape(optical_flow_tensor)[2]

    flow_x,flow_y = tf.split(optical_flow_tensor,[1,1], axis =3)
    coord_x, coord_y = tf.meshgrid(tf.range(width), tf.range(height))
    pos_x = tf.cast(tf.expand_dims(tf.expand_dims(coord_x, axis = 2), axis=0), dtype=tf.float32) + flow_x
    pos_y = tf.cast(tf.expand_dims(tf.expand_dims(coord_y, axis = 2), axis=0), dtype=tf.float32) + flow_y

    warped_points = tf.concat([pos_x, pos_y], axis=3, name='warped_points')
    I1w_tensor = tf.cast(I1w_tensor, dtype=tf.float32)

    return tfa.image.resampler(I1w_tensor, warped_points)

# Create neural network
## Function definitions of building blocks for an autoencoder

In [None]:
# This code snippet was found here: https://youtu.be/azM57JuQpQI?list=PLZsOBAyNTZwbR08R959iCvYT3qzhxvGOE
#Convolutional block to be used in autoencoder and U-Net
def conv_block(input_representation, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input_representation)
    x = BatchNormalization()(x)   #Not in the original network. 
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)  #Not in the original network
    x = Activation("relu")(x)

    return x

#Encoder block: Conv block followed by maxpooling
def encoder_block(input_representation, num_filters):
    x = conv_block(input_representation, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p   

#Decoder block for autoencoder (no skip connections)
def decoder_block(input_representation, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input_representation)
    x = conv_block(x, num_filters)
    return x

## Functional definition for an encoder

In [None]:
# This code snippet was found here: https://youtu.be/azM57JuQpQI?list=PLZsOBAyNTZwbR08R959iCvYT3qzhxvGOE
#Encoder will be the same for Autoencoder and U-net
#We are getting both conv output and maxpool output for convenience.
#we will ignore conv output for Autoencoder. It acts as skip connections for U-Net
def build_encoder(input_representation):
    #inputs = Input(input_shape)

    s1, p1 = encoder_block(input_representation, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)
    
    encoded = conv_block(p4, 1024) #Bridge
    
    return encoded

## Functional definition for a decoder

In [None]:
# This code snippet was found here: https://youtu.be/azM57JuQpQI?list=PLZsOBAyNTZwbR08R959iCvYT3qzhxvGOE
#Decoder for Autoencoder ONLY. 
def build_decoder(encoded):
    d1 = decoder_block(encoded, 512)
    d2 = decoder_block(d1, 256)
    d3 = decoder_block(d2, 128)
    d4 = decoder_block(d3, 64)
    
    decoded = Conv2D(1, 3, padding="same", activation="relu")(d4)
    return decoded

## Functional definition of an Autoencoder

In [None]:
# This code snippet was found here: https://youtu.be/azM57JuQpQI?list=PLZsOBAyNTZwbR08R959iCvYT3qzhxvGOE
#Use encoder and decoder blocks to build the autoencoder. 
def build_autoencoder(input_shape):
    input_img = Input(shape=input_shape)
    autoencoder = Model(input_img, build_decoder(build_encoder(input_img)))
    return(autoencoder)

# Functional definition of U-Net

In [None]:
# This code snippet was found here: https://youtu.be/azM57JuQpQI?list=PLZsOBAyNTZwbR08R959iCvYT3qzhxvGOE
#Decoder block for unet
#skip features gets input from encoder for concatenation
def decoder_block_for_unet(input_representation, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input_representation)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

#Build Unet using the blocks
def build_unet(input_shape):
    inputs = Input(input_shape)

    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b1 = conv_block(p4, 1024) #Bridge

    d1 = decoder_block_for_unet(b1, s4, 512)
    d2 = decoder_block_for_unet(d1, s3, 256)
    d3 = decoder_block_for_unet(d2, s2, 128)
    d4 = decoder_block_for_unet(d3, s1, 64)

    outputs = Conv2D(1, 1, padding="same", activation="relu")(d4)

    model = Model(inputs, outputs, name="U-Net")
    return model

# Model

In [None]:
model = build_autoencoder((176, 336, 3))

# Training Loop

In [None]:
# Define the Robust Charbonnier Photometric loss
class PhotometricLoss(tf.keras.losses.Loss):
    # initialize instance attributes
    def __init__(self, epsilon=0.1, reduction=tf.keras.losses.Reduction.AUTO):
        super().__init__(reduction=reduction)
        self.epsilon = epsilon
    # Compute loss
    def call(self, y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)  # Convert y_true to float32
        y_pred = tf.cast(y_pred, tf.float32)  # Convert y_pred to float32
        delta = y_true - y_pred
        return tf.math.reduce_mean(tf.math.sqrt(tf.math.square(delta) + tf.math.square(self.epsilon)))

In [None]:
# This code snippet was found here: https://github.com/daniilidis-group/EV-FlowNet
@tf.function
def charbonnier_loss(delta, alpha=0.5, epsilon=0.1):
    loss = tf.reduce_mean(tf.pow(tf.math.square(delta) + tf.math.square(epsilon), alpha))
    return loss

In [None]:
# This code snippet was found here: https://github.com/daniilidis-group/EV-FlowNet
@tf.function
def compute_smoothness_loss(struct_param):
    struct_param_ucrop = struct_param[..., 1:, :]
    struct_param_dcrop = struct_param[..., :-1, :]
    struct_param_lcrop = struct_param[:, 1:, ...]
    struct_param_rcrop = struct_param[:, :-1, ...]

    struct_param_ulcrop = struct_param[:, 1:, 1:, :]
    struct_param_drcrop = struct_param[:, :-1, :-1, :]
    struct_param_dlcrop = struct_param[:, :-1, 1:, :]
    struct_param_urcrop = struct_param[:, 1:, :-1, :]

    smoothness_loss = charbonnier_loss(struct_param_lcrop - struct_param_rcrop) + charbonnier_loss(struct_param_ucrop - struct_param_dcrop) + charbonnier_loss(struct_param_ulcrop - struct_param_drcrop) + charbonnier_loss(struct_param_dlcrop - struct_param_urcrop)
    smoothness_loss /= 4.
    return smoothness_loss

In [None]:
# Instantiate an optimizer.
optimizer = Adam(learning_rate=0.00001)
# Instantiate a loss function.
loss_fn = PhotometricLoss()

In [None]:
dataset_voxel = np.load(path + "voxel_grids.npy")

In [None]:
epipoles = epipole_generator(list_I1_images, list_I2_images)

In [None]:
event_volumes = tf.keras.utils.normalize(x=dataset_voxel[:60, :176, :336, :])

In [None]:
translation_between_frames = tf.expand_dims(translation_between_frames, axis=0)

In [None]:
translation_between_frames = tf.broadcast_to(input=translation_between_frames, shape=[60, 60])

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((event_volumes, list_I1w_images, list_I2_images, epipoles, translation_between_frames))

In [None]:
epochs = 50
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    hold_loss = []
     # Iterate over the dataset.
    for step, element in enumerate(train_dataset):
        first_element, second_element, third_element, fourth_element, fifth_element = element
        voxel = tf.Variable(initial_value=first_element, trainable=True)
        Iw1_im = tf.Variable(initial_value=second_element, trainable=True)
        I2_im = tf.Variable(initial_value=third_element, trainable=True)
        I2_im_epipole = tf.Variable(initial_value=fourth_element, trainable=True)

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:
            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.

            tape.watch(model.trainable_weights)
            structure_parameter = model(tf.expand_dims(input=voxel, axis=0), training=True)  # Logits for this minibatch

            #structure has shape [1, 480, 640, 1]

            Iw1_im = tf.expand_dims(input=Iw1_im, axis=0)
            I2_im = tf.expand_dims(input=I2_im, axis=0)

            # Both I1w and I2 images have shape [1, 480, 640, 3]

            op_flow = optical_flow_generator(structure_parameter, I2_im_epipole, fifth_element[step])

            # optical flow a tensor of [1, 480, 640, 2]

            I2_hat = optical_flow_warp(op_flow, Iw1_im)

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(I2_im, I2_hat) + 0.2*compute_smoothness_loss(structure_parameter)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights, unconnected_gradients=tf.UnconnectedGradients.ZERO)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        hold_loss.append(loss_value)
    print('Average loss', ":", tf.math.reduce_mean(tf.convert_to_tensor(hold_loss)).numpy())
    #print(loss_value)