# μPyD‑Net - Peluso et al.

## Imports

In [2]:
import numpy as np
import cv2
import os
from pathlib import Path
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, LeakyReLU, Conv2DTranspose, Concatenate, Lambda, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import TimeDistributed, ConvLSTM2D
import visualkeras
import random
import matplotlib.pyplot as plt


## Eigen Split (Eigen et al.)

In [35]:
def load_split_file(split_file_path, data_root, target_size=(64, 64), original_width=1242):
    """
    Load grayscale images and disparity maps listed in a split file.

    The same logic is used for the train, validation and test splits. Each
    non-blank line must start with a path whose second component is the
    sequence name and whose stem is the frame name. The frame is then read
    from ``<data_root>/<sequence>/images/<frame>.png`` and
    ``<data_root>/<sequence>/depths/<frame>.npy``. Entries whose files are
    missing, or whose image cannot be decoded, are skipped.

    Args:
        split_file_path: Text file listing the frames, one per line.
        data_root: Root folder of the preprocessed dataset.
        target_size: Output size in OpenCV order, i.e. ``(width, height)``.
        original_width: Reference width used to rescale disparity values.

    Returns:
        ``(images, disparities, sequences)`` where ``images`` holds the
        resized images scaled to [0, 1] with a trailing channel axis,
        ``disparities`` holds the resized, rescaled disparity maps with a
        trailing channel axis, and ``sequences`` is the set of sequence names
        found in the split file (including those whose files were missing).
    """
    with open(split_file_path, 'r') as f:
        lines = f.read().splitlines()

    data_root = Path(data_root)
    images = []
    disparities = []
    sequences = set()
    # cv2.resize takes dsize as (width, height); disparity is rescaled by the
    # ratio of the output width to the reference width.
    scale_factor = target_size[0] / original_width

    for line in lines:
        fields = line.split()
        if not fields:
            continue  # tolerate blank / whitespace-only lines

        # Drop the leading path component; what follows starts with the sequence.
        rel_path = Path(*Path(fields[0]).parts[1:])
        sequence = rel_path.parts[0]
        filename = rel_path.stem

        image_path = data_root / sequence / "images" / f"{filename}.png"
        disp_path = data_root / sequence / "depths" / f"{filename}.npy"
        sequences.add(sequence)

        if not image_path.exists() or not disp_path.exists():
            continue

        img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
        if img is None:
            continue  # file exists but could not be decoded

        img = cv2.resize(img, target_size).astype(np.float32) / 255.0
        img = img[..., np.newaxis]

        disp = np.load(disp_path)
        # Nearest-neighbour keeps original disparity values instead of blending neighbours.
        disp = cv2.resize(disp, target_size, interpolation=cv2.INTER_NEAREST).astype(np.float32)
        disp = disp * scale_factor
        disp = disp[..., np.newaxis]

        images.append(img)
        disparities.append(disp)

    return np.array(images), np.array(disparities), sequences

## Train-test split

In [37]:
# Replace these with your paths to the preprocessed dataset.
# data_root is the dataset root folder; the three *_file values are the split
# list files handed to load_split_file below.
data_root = r""
train_file = r""
val_file   = r""
test_file = r""


# Load every split at 32x32. The third return value is the set of sequence
# names encountered in the corresponding split file.
X_train_2, y_train_2, train_sequences = load_split_file(train_file, data_root, target_size=(32, 32))
X_val_2,   y_val_2,   val_sequences   = load_split_file(val_file, data_root, target_size=(32, 32))
X_test_2,  y_test_2,  test_sequences  = load_split_file(test_file, data_root, target_size=(32, 32))

## Sanity check

In [None]:
# Expected sizes, in order: 22600 (train), 888 (validation), 697 (test).
# Anything else means something went wrong while loading the splits.
print(len(X_train_2), len(X_val_2), len(X_test_2), sep="\n")

## Sample visualization

In [None]:
def visualize_sample(X, y, index=600):
    """
    Show one grayscale input next to its SGM disparity map.

    Args:
        X: Array of input images; ``X[index]`` is squeezed before plotting.
        y: Array of disparity maps aligned with ``X``.
        index: Which sample to display.
    """
    panels = (
        (X[index].squeeze(), 'gray', "Input Grayscale Image"),
        (y[index].squeeze(), 'inferno', "SGM Disparity Map"),
    )

    plt.figure(figsize=(10, 4))

    # One subplot per panel, left to right.
    for position, (data, cmap, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(data, cmap=cmap)
        plt.title(title)
        plt.axis("off")

    plt.tight_layout()
    plt.show()

visualize_sample(X_train_2, y_train_2)


## μPyD‑Net Model

In [46]:
def build_uPyDNet(input_shape=(32, 32, 1)):
    """
    Build the μPyD-Net encoder/decoder as an uncompiled Keras model.

    Encoder: five 3x3 conv + LeakyReLU blocks with 8, 8, 16, 16 and 32
    filters; the second and fourth use stride 2. Decoder: two stride-2
    transposed convolutions, each followed by concatenation with the encoder
    feature map of the same resolution and three 3x3 conv blocks. A final
    single-filter 3x3 convolution produces the output map.

    Args:
        input_shape: ``(height, width, channels)`` of the input. Height and
            width should be multiples of 4, otherwise the upsampled decoder
            tensors will not match the skip connections they are
            concatenated with.

    Returns:
        A ``Model`` mapping the input to a one-channel map.
    """
    def conv_block(x, filters, stride=1):
        x = Conv2D(filters, 3, stride, padding='same')(x)
        x = LeakyReLU(alpha=0.125)(x)
        return x

    def static_reshape(t):
        # Identity reshape to the tensor's own static (non-batch) shape. The
        # layer is kept so the graph has the same layers as before, but the
        # target shape is derived instead of hard-coded for 32x32 inputs.
        return Reshape(tuple(t.shape[1:]))(t)

    inputs = Input(shape=input_shape)

    # Encoder. Only the full-resolution and half-resolution stride-1 outputs
    # are reused by the decoder.
    x = conv_block(inputs, 8)
    skip_full = x

    x = conv_block(x, 8, stride=2)

    x = conv_block(x, 16)
    skip_half = x

    x = conv_block(x, 16, stride=2)

    x = conv_block(x, 32)

    # Decoder stage 1: quarter -> half resolution.
    x = Conv2DTranspose(32, 2, strides=2, padding='same')(x)
    skip = static_reshape(skip_half)
    x = Concatenate()([x, skip])
    x = conv_block(x, 32)
    x = conv_block(x, 32)
    x = conv_block(x, 32)

    # Decoder stage 2: half -> full resolution.
    x = Conv2DTranspose(16, 2, strides=2, padding='same')(x)
    skip = static_reshape(skip_full)
    x = Concatenate()([x, skip])
    x = conv_block(x, 16)
    x = conv_block(x, 16)
    x = conv_block(x, 16)

    output = Conv2D(1, 3, padding='same')(x)
    return Model(inputs, output)

In [47]:
def berHu_loss(y_true, y_pred, alpha=0.2):
    """
    Reverse Huber (berHu) loss averaged over all elements.

    Residuals up to a threshold ``c`` are penalised linearly; larger ones are
    penalised with ``(r^2 + c^2) / (2c)``. ``c`` is computed separately for
    every sample as ``alpha`` times that sample's largest absolute residual.

    Args:
        y_true: Target tensor; reduced over axes 1, 2 and 3, so it must have
            at least four dimensions.
        y_pred: Prediction tensor, broadcast-compatible with ``y_true``.
        alpha: Fraction of the per-sample maximum residual used as threshold.

    Returns:
        Scalar tensor with the mean loss.
    """
    residual = tf.abs(y_true - y_pred)
    # Small epsilon keeps the division below defined when a sample is predicted exactly.
    threshold = alpha * tf.reduce_max(residual, axis=[1, 2, 3], keepdims=True) + 1e-6
    quadratic = (tf.square(residual) + tf.square(threshold)) / (2 * threshold)
    per_element = tf.where(residual <= threshold, residual, quadratic)
    return tf.reduce_mean(per_element)

In [None]:
# Build the 32x32 single-channel μPyD-Net and compile it with Adam (lr 1e-4),
# the berHu loss, and mean absolute error as an additional metric.
model = build_uPyDNet(input_shape=(32, 32, 1))

model.compile(optimizer=Adam(learning_rate=1e-4), loss=berHu_loss, metrics=['mae'])

model.summary()

In [None]:
from collections import defaultdict

# Fill colour per layer class for the architecture diagram; layer classes
# not listed here get an empty style dict from the defaultdict.
color_map = defaultdict(dict, {
    Conv2D: {'fill': 'skyblue'},
    Conv2DTranspose: {'fill': 'lightgreen'},
    LeakyReLU: {'fill': 'orange'},
    Concatenate: {'fill': 'pink'},
})

# Render the layered view and write it to a PNG file.
visualkeras.layered_view(model, legend=True, color_map=color_map, to_file='upyDNet_diagram.png')

## Training μPyD‑Net

In [None]:
# EarlyStopping stops after 80 epochs without improvement and restores the best
# weights; ReduceLROnPlateau multiplies the learning rate by 0.7 after 50 such epochs.
# NOTE(review): with epochs=100 these patience values leave little room for
# either callback to trigger - confirm they are intended.
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=80, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(patience=50, factor=0.7, verbose=1)
]

# Train on the training split, validating on the validation split each epoch.
history = model.fit(
    X_train_2, y_train_2,
    validation_data=(X_val_2, y_val_2),
    epochs=100,
    batch_size=16,
    callbacks=callbacks
)

In [None]:
plt.figure(figsize=(12, 4))

# Left panel: loss curves. Right panel: MAE curves. Each panel overlays the
# training and validation series recorded in `history`.
for _position, (_key, _name) in enumerate((('loss', 'Loss'), ('mae', 'MAE')), start=1):
    plt.subplot(1, 2, _position)
    plt.plot(history.history[_key], label=f'Training {_name}')
    plt.plot(history.history[f'val_{_key}'], label=f'Validation {_name}')
    plt.title(f'Model {_name}')
    plt.xlabel('Epoch')
    plt.ylabel(_name)
    plt.legend()

plt.tight_layout()
plt.show()

## Test set evaluation (Monodepth Eigen test set - Godard et al.)

In [None]:
# evaluate() returns the loss followed by the compiled metrics (here: MAE).
loss, mae = model.evaluate(X_test_2, y_test_2)
print(f"\nTest loss: {loss:.4f}, Test MAE: {mae:.4f}")

## Model saving

In [53]:
# Change the model name if you want it updated
# NOTE: the temporal model later in this notebook is also saved as "model.h5",
# so running both sections overwrites this file.
model.save("model.h5")

## Prediction Visualization

In [54]:
# Needed to better visualize predictions. Otherwise, things seem blurry.
def normalize_for_display(arr, clip_percentile=2):
    """
    Contrast-stretch an array to 8-bit values for display.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentile range, rescaled to [0, 1) and converted to ``uint8``.

    Args:
        arr: Array of numeric values.
        clip_percentile: Percentile cut applied at both ends.

    Returns:
        ``uint8`` array with the same shape as ``arr``.
    """
    lower = np.percentile(arr, clip_percentile)
    upper = np.percentile(arr, 100 - clip_percentile)
    span = upper - lower + 1e-8  # epsilon avoids division by zero for constant input
    stretched = (np.clip(arr, lower, upper) - lower) / span
    return (stretched * 255).astype(np.uint8)

In [56]:
def visualize_predictions(model, X_test, y_test, num_samples=3, upscale_factor=1):
    """
    Plot input, ground truth and model prediction for the first samples.

    Args:
        model: Object with a ``predict`` method, called on ``X_test[:num_samples]``.
        X_test: Inputs; channel 0 of each sample is shown.
        y_test: Ground-truth maps; channel 0 of each sample is shown.
        num_samples: Number of rows (samples) to draw.
        upscale_factor: Integer enlargement applied before display when > 1.
    """
    predictions = model.predict(X_test[:num_samples])
    enlarge = upscale_factor > 1

    plt.figure(figsize=(15, 5 * num_samples))

    for row in range(num_samples):
        image = X_test[row, ..., 0]
        truth = y_test[row, ..., 0]
        pred_display = normalize_for_display(predictions[row, ..., 0])

        if enlarge:
            height, width = image.shape
            new_size = (width * upscale_factor, height * upscale_factor)
            image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            truth = cv2.resize(truth, new_size, interpolation=cv2.INTER_NEAREST)

        # Only strictly positive ground-truth values are stretched; the rest stay 0.
        valid = truth > 0
        gt_display = np.zeros_like(truth)
        gt_display[valid] = normalize_for_display(truth[valid])

        panels = (
            (image, 'gray', 'Input Image'),
            (gt_display, 'magma', 'Ground Truth Depth'),
            (pred_display, 'magma', 'Predicted Depth'),
        )
        for column, (data, cmap, title) in enumerate(panels, start=1):
            plt.subplot(num_samples, 3, row * 3 + column)
            plt.imshow(data, cmap=cmap)
            plt.title(title)
            plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Show the first 5 test samples with the predictions of the current `model`.
visualize_predictions(model, X_test_2, y_test_2, 5)

## Delta error calculation (non-quantized)

In [None]:
# Load original Keras model - uncomment this line if you want to test your saved model against the test set.
# model = tf.keras.models.load_model("model.h5", custom_objects={'berHu_loss':berHu_loss})

# Predict the whole test split; the result is indexed per sample below.
predictions = model.predict(X_test_2, batch_size=16, verbose=1)

# Constants (adjust to match your dataset / camera model if not using KITTI)
focal_length = 721.5377
baseline = 0.5327

def disparity_to_depth(disp):
    """
    Convert disparity to depth as ``focal_length * baseline / disparity``.

    The disparity is floored at 1e-6 first, so zero or negative inputs yield a
    very large positive depth instead of a division by zero.
    """
    safe_disp = np.clip(disp, 1e-6, None)
    return focal_length * baseline / safe_disp

def crop_for_eigen(gt):
    """
    Return a boolean mask that is True inside the evaluation crop.

    The crop is a fixed window expressed as fractions of the map's height
    (0.4081 to 0.9919) and width (0.0359 to 0.9640).

    Args:
        gt: 2-D array; only its shape is used.

    Returns:
        Boolean array with the same shape as ``gt``.
    """
    rows, cols = gt.shape
    top, bottom = int(0.4081 * rows), int(0.9919 * rows)
    left, right = int(0.0359 * cols), int(0.9640 * cols)
    mask = np.zeros_like(gt, dtype=bool)
    mask[top:bottom, left:right] = True
    return mask

# Per-image threshold accuracies: the fraction of valid pixels whose ratio
# max(pred/gt, gt/pred) is below 1.25, 1.25^2 and 1.25^3.
delta_1_all = []
delta_2_all = []
delta_3_all = []

for gt_map, pred_map in zip(y_test_2, predictions):
    gt_disp = gt_map.squeeze()
    pred_disp = pred_map.squeeze()

    # Validity is judged on the raw disparity. disparity_to_depth floors its
    # input at a positive value, so the converted depth is always > 0 and
    # cannot be used to detect missing (zero-disparity) ground truth.
    valid_mask = (gt_disp > 0) & crop_for_eigen(gt_disp)
    if not valid_mask.any():
        continue  # nothing to score; the mean of an empty selection would be NaN

    gt = disparity_to_depth(gt_disp[valid_mask])
    pred = disparity_to_depth(pred_disp[valid_mask])

    # Guard the gt / pred division against a zero predicted depth.
    pred = np.clip(pred, 1e-6, None)
    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

# Average the per-image scores and express them as percentages.
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%")



## TFLite Conversion

In [59]:
# Uncomment next line if you want to use this logic to fully quantize your own model.
# model = tf.keras.models.load_model("model.h5", custom_objects={'berHu_loss':berHu_loss})

converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Full-integer post-training quantization: default optimizations, only int8
# builtin ops allowed, and uint8 tensors at the model's input and output.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS_INT8
]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

def representative_dataset():
    """
    Yield calibration inputs for the TFLite converter.

    Draws up to 100 distinct random samples from ``X_train_2`` and yields each
    one as a single-element list holding a batch of size 1.
    """
    # Calibration data is required - otherwise, TFLite will refuse the quantization to int8
    calibration_count = min(100, len(X_train_2))
    chosen = np.random.choice(len(X_train_2), calibration_count, replace=False)

    for position in chosen:
        # Slicing (instead of indexing) keeps the leading batch dimension.
        yield [X_train_2[position:position + 1]]

converter.representative_dataset = representative_dataset

In [None]:
# Run the conversion with the settings configured on `converter`; the result is written to disk below.
tflite_model = converter.convert()

In [None]:
# Change the name of the file for more clarity.
# Writes the converted model to disk, replacing any existing file of that name.
Path('model.tflite').write_bytes(tflite_model)

## Inference with .tflite model on Test set

In [None]:
interpreter = tf.lite.Interpreter(model_path="") # Put the path to your TFLite model here
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_index = input_details[0]['index']
output_index = output_details[0]['index']

# 'quantization' is a (scale, zero_point) pair for the tensor.
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

input_dtype = input_details[0]['dtype']
output_dtype = output_details[0]['dtype']

# An integer tensor with a non-zero scale is quantized. This covers both uint8
# and int8 models, so the same loop works for either choice.
input_is_quantized = np.issubdtype(input_dtype, np.integer) and input_scale > 0
output_is_quantized = np.issubdtype(output_dtype, np.integer) and output_scale > 0
input_limits = np.iinfo(input_dtype) if input_is_quantized else None

predictions = []

for img in X_test_2:
    if input_is_quantized:
        # Round to the nearest level and clip to the dtype's range; a bare cast
        # would truncate and wrap around on out-of-range values.
        quantized = np.round(img / input_scale + input_zero_point)
        input_data = np.clip(quantized, input_limits.min, input_limits.max).astype(input_dtype)
    else:
        input_data = img.astype(input_dtype)

    input_data = np.expand_dims(input_data, axis=0)  # Add batch dimension

    interpreter.set_tensor(input_index, input_data)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_index)

    # Map quantized outputs back to real values.
    if output_is_quantized:
        output_data = (output_data.astype(np.float32) - output_zero_point) * output_scale

    predictions.append(output_data[0])

predictions = np.array(predictions)

In [65]:
def normalize_for_display(arr, clip_percentile=2):
    """
    Contrast-stretch an array to 8-bit values for display.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentile range, rescaled to [0, 1) and converted to ``uint8``.

    Args:
        arr: Array of numeric values.
        clip_percentile: Percentile cut applied at both ends.

    Returns:
        ``uint8`` array with the same shape as ``arr``.
    """
    lower = np.percentile(arr, clip_percentile)
    upper = np.percentile(arr, 100 - clip_percentile)
    span = upper - lower + 1e-8  # epsilon avoids division by zero for constant input
    stretched = (np.clip(arr, lower, upper) - lower) / span
    return (stretched * 255).astype(np.uint8)

def visualize_tflite_predictions(X_test, y_test, predictions, num_samples=3, upscale_factor=1):
    """
    Plot input, ground truth and precomputed prediction for the first samples.

    Args:
        X_test: Inputs; channel 0 of each sample is shown.
        y_test: Ground-truth maps; channel 0 of each sample is shown.
        predictions: Predicted maps aligned with ``X_test``.
        num_samples: Number of rows (samples) to draw.
        upscale_factor: Integer enlargement applied before display when > 1.
    """
    enlarge = upscale_factor > 1

    plt.figure(figsize=(15, 5 * num_samples))

    for row in range(num_samples):
        image = X_test[row, ..., 0]

        # Both maps are contrast-stretched over all of their pixels.
        pred_display = normalize_for_display(predictions[row, ..., 0])
        gt_display = normalize_for_display(y_test[row, ..., 0])

        if enlarge:
            height, width = image.shape
            new_size = (width * upscale_factor, height * upscale_factor)
            image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            gt_display = cv2.resize(gt_display, new_size, interpolation=cv2.INTER_NEAREST)

        panels = (
            (image, 'gray', 'Input Image'),
            (gt_display, 'magma', 'Ground Truth Depth'),
            (pred_display, 'magma', 'TFLite Prediction'),
        )
        for column, (data, cmap, title) in enumerate(panels, start=1):
            plt.subplot(num_samples, 3, row * 3 + column)
            plt.imshow(data, cmap=cmap)
            plt.title(title)
            plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Show the first 25 test samples with the predictions collected from the TFLite interpreter.
visualize_tflite_predictions(X_test_2, y_test_2, predictions, num_samples=25)

## Delta Error Calculation

In [None]:
# Constants (adjust to match your dataset or camera model if not using KITTI)
focal_length = 721.5377
baseline = 0.5327

def disparity_to_depth(disp):
    """
    Convert disparity to depth as ``focal_length * baseline / disparity``.

    The disparity is floored at 1e-6 first, so zero or negative inputs yield a
    very large positive depth instead of a division by zero.
    """
    safe_disp = np.clip(disp, 1e-6, None)
    return focal_length * baseline / safe_disp

def crop_for_eigen(gt):
    """
    Return a boolean mask that is True inside the evaluation crop.

    The crop is a fixed window expressed as fractions of the map's height
    (0.4081 to 0.9919) and width (0.0359 to 0.9640).

    Args:
        gt: 2-D array; only its shape is used.

    Returns:
        Boolean array with the same shape as ``gt``.
    """
    rows, cols = gt.shape
    top, bottom = int(0.4081 * rows), int(0.9919 * rows)
    left, right = int(0.0359 * cols), int(0.9640 * cols)
    mask = np.zeros_like(gt, dtype=bool)
    mask[top:bottom, left:right] = True
    return mask

# Per-image threshold accuracies over all valid pixels ...
delta_1_all = []
delta_2_all = []
delta_3_all = []

# ... and the 1.25 accuracy split by ground-truth disparity tercile.
delta_low_all = []
delta_mid_all = []
delta_high_all = []

for gt_map, pred_map in zip(y_test_2, predictions):
    gt_disp = gt_map.squeeze()
    pred_disp = pred_map.squeeze()

    # Validity is judged on the raw disparity. disparity_to_depth floors its
    # input at a positive value, so the converted depth is always > 0 and
    # cannot be used to detect missing (zero-disparity) ground truth.
    valid_mask = (gt_disp > 0) & crop_for_eigen(gt_disp)
    if not valid_mask.any():
        continue  # nothing to score; the mean of an empty selection would be NaN

    disp = gt_disp[valid_mask]
    gt = disparity_to_depth(disp)
    pred = disparity_to_depth(pred_disp[valid_mask])

    # Guard the gt / pred division against a zero predicted depth.
    pred = np.clip(pred, 1e-6, None)
    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

    # Rank valid pixels by ground-truth disparity and split them into thirds;
    # the last third also takes the remainder.
    sorted_indices = np.argsort(disp)
    thirds = len(disp) // 3
    if thirds == 0:
        continue  # fewer than 3 valid pixels: the low/mid groups would be empty

    low_idx = sorted_indices[:thirds]
    mid_idx = sorted_indices[thirds:2*thirds]
    high_idx = sorted_indices[2*thirds:]

    delta_low_all.append(np.mean(delta[low_idx] < 1.25))
    delta_mid_all.append(np.mean(delta[mid_idx] < 1.25))
    delta_high_all.append(np.mean(delta[high_idx] < 1.25))

# Average the per-image scores and express them as percentages.
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

delta_low = np.mean(delta_low_all) * 100
delta_mid = np.mean(delta_mid_all) * 100
delta_high = np.mean(delta_high_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%\n")

print("Delta < 1.25 by Disparity Region:")
print(f"  Low disparity (far):    {delta_low:.2f}%")
print(f"  Mid disparity:          {delta_mid:.2f}%")
print(f"  High disparity (close): {delta_high:.2f}%")

# Temporal μPyD‑Net

## Eigen split

In [40]:
def load_temporal_frames_from_split(split_file_path, data_root, target_size=(64, 64), original_width=1242, time_steps=3):
    """
    Build fixed-length frame sequences ending at each frame of a split file.

    For every non-blank line, the frame number is taken from the file stem and
    the ``time_steps - 1`` preceding frame numbers of the same sequence are
    looked up (names are zero-padded to 10 digits). A sequence is kept only if
    all ``time_steps`` images and disparity files exist and every image can be
    decoded. Frames are ordered oldest first, so index -1 along the time axis
    is the frame named in the split file.

    Args:
        split_file_path: Text file listing the frames, one per line.
        data_root: Root folder of the preprocessed dataset.
        target_size: Output size in OpenCV order, i.e. ``(width, height)``.
        original_width: Reference width used to rescale disparity values.
        time_steps: Number of consecutive frames per sequence (>= 1).

    Returns:
        ``(image_seqs, disp_seqs)``, both shaped ``(N, T, H, W, 1)`` when at
        least one sequence was found.
    """
    assert time_steps >= 1, "time_steps must be >= 1"
    with open(split_file_path, 'r') as f:
        lines = f.read().splitlines()

    data_root = Path(data_root)
    image_seqs = []
    disp_seqs = []
    # cv2.resize takes dsize as (width, height); disparity is rescaled by the
    # ratio of the output width to the reference width.
    scale_factor = target_size[0] / original_width

    for line in lines:
        fields = line.split()
        if not fields:
            continue  # tolerate blank / whitespace-only lines

        # Drop the leading path component; what follows starts with the sequence.
        rel_path = Path(*Path(fields[0]).parts[1:])
        seq = rel_path.parts[0]
        frame_id = int(rel_path.stem)

        frames = []
        # Largest offset first, so frames are collected oldest -> newest.
        for offset in reversed(range(time_steps)):
            frame_index = frame_id - offset
            if frame_index < 0:
                break  # not enough earlier frames before this one
            frame_name = f"{frame_index:010d}"

            image_path = data_root / seq / "images" / f"{frame_name}.png"
            disp_path = data_root / seq / "depths" / f"{frame_name}.npy"

            if not image_path.exists() or not disp_path.exists():
                break

            img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
            if img is None:
                break

            img = cv2.resize(img, target_size).astype(np.float32) / 255.0
            img = img[..., np.newaxis]

            disp = np.load(disp_path)
            # Nearest-neighbour keeps original disparity values instead of blending neighbours.
            disp = cv2.resize(disp, target_size, interpolation=cv2.INTER_NEAREST).astype(np.float32)
            disp *= scale_factor
            disp = disp[..., np.newaxis]

            frames.append((img, disp))

        # Any break above leaves the window incomplete; such windows are dropped.
        if len(frames) == time_steps:
            imgs, disps = zip(*frames)
            image_seqs.append(np.stack(imgs, axis=0))
            disp_seqs.append(np.stack(disps, axis=0))

    return np.array(image_seqs), np.array(disp_seqs)

In [41]:
data_root = r"" # This should be the path to your preprocessed dataset (KITTI or anything else, as long as it maintains folder structure)

# Load 3-frame sequences (the loader's default time_steps) at 32x32 for each
# split. Only the disparity of the last frame in each sequence is the target.
X_train, y_train_seq = load_temporal_frames_from_split("eigen_train_files.txt", data_root, target_size=(32, 32))
y_train = y_train_seq[:, -1]

X_val, y_val_seq = load_temporal_frames_from_split("eigen_val_files.txt", data_root, target_size=(32, 32))
y_val = y_val_seq[:, -1]

X_test, y_test_seq = load_temporal_frames_from_split("eigen_test_files.txt", data_root, target_size=(32, 32))
y_test = y_test_seq[:, -1]

## Pico-compatible Model

In [73]:
import tensorflow.keras.backend as K
import tensorflow as tf
from tensorflow.keras.layers import Layer, Input, Conv2D, Conv2DTranspose, TimeDistributed, LeakyReLU, Concatenate
from tensorflow.keras.models import Model

In [140]:
def build_temporal_uPyDNet_tflm(input_shape=(3, 32, 32, 1)):
    """
    Build the temporal μPyD-Net variant as an uncompiled Keras model.

    Every frame goes through the same time-distributed conv encoder. The
    bottleneck features are averaged over the time axis, and the decoder
    upsamples them twice, each time concatenating the last frame's encoder
    features of the matching resolution.

    NOTE(review): the decoder stages use 16 and then 32 filters, the opposite
    order from build_uPyDNet (32 then 16) - confirm this is intended.

    Args:
        input_shape: ``(time, height, width, channels)`` of one input sequence.

    Returns:
        A ``Model`` mapping a frame sequence to a single one-channel map.
    """

    def conv_block_td(x, filters, stride=1):
        # 3x3 conv + LeakyReLU applied independently to every time step.
        x = TimeDistributed(Conv2D(filters, 3, strides=stride, padding='same'))(x)
        return TimeDistributed(LeakyReLU(alpha=0.125))(x)

    inputs = Input(shape=input_shape)
    skips = []

    # Encoder: (filters, stride) per block; every output is kept as a skip candidate.
    x = inputs
    for filters, stride in ((8, 1), (8, 2), (16, 1), (16, 2)):
        x = conv_block_td(x, filters, stride=stride)
        skips.append(x)

    x = conv_block_td(x, 32, stride=1)

    # Collapse the time axis by averaging the bottleneck features.
    x = Lambda(lambda t: K.mean(t, axis=1))(x)

    decoder_channels = [32, 16]

    for level in (1, 0):
        width = decoder_channels[level]
        x = Conv2DTranspose(width, 2, strides=2, padding='same')(x)

        # Skip connection: the stride-1 encoder output at this resolution, last frame only.
        last_frame_skip = Lambda(lambda s: s[:, -1, :, :, :])(skips[level * 2])

        x = Concatenate(axis=-1)([x, last_frame_skip])
        for _ in range(2):
            x = Conv2D(width, 3, padding='same')(x)
            x = LeakyReLU(alpha=0.125)(x)

    output = Conv2D(1, 3, padding='same')(x)
    return Model(inputs, output)


## Training

In [None]:
# Build the temporal model with its default input shape and compile it with
# Adam (lr 1e-4), the berHu loss, and mean absolute error as an extra metric.
# This rebinds the notebook-level `model` name.
model = build_temporal_uPyDNet_tflm()

model.compile(optimizer=Adam(learning_rate=1e-4), loss=berHu_loss, metrics=['mae'])

model.summary()

In [None]:
from collections import defaultdict

# Fill colour per layer class for the architecture diagram; layer classes
# not listed here get an empty style dict from the defaultdict.
color_map = defaultdict(dict, {
    Conv2D: {'fill': 'skyblue'},
    Conv2DTranspose: {'fill': 'lightgreen'},
    LeakyReLU: {'fill': 'orange'},
    Concatenate: {'fill': 'pink'},
})

# Render the layered view and write it to a PNG file.
visualkeras.layered_view(model, legend=True, color_map=color_map, to_file='lstm_upyDNet_diagram.png')

In [None]:
# EarlyStopping stops after 15 epochs without improvement and restores the best
# weights; ModelCheckpoint keeps only the best model seen so far on disk.
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=15, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint('model_checkpoint.h5', save_best_only=True)
]

# Inputs are frame sequences; targets are the last frame's disparity maps.
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=16,
    callbacks=callbacks
)

In [None]:
plt.figure(figsize=(12, 4))

# Left panel: loss curves. Right panel: MAE curves. Each panel overlays the
# training and validation series recorded in `history`.
for _position, (_key, _name) in enumerate((('loss', 'Loss'), ('mae', 'MAE')), start=1):
    plt.subplot(1, 2, _position)
    plt.plot(history.history[_key], label=f'Training {_name}')
    plt.plot(history.history[f'val_{_key}'], label=f'Validation {_name}')
    plt.title(f'Model {_name}')
    plt.xlabel('Epoch')
    plt.ylabel(_name)
    plt.legend()

plt.tight_layout()
plt.show()

## Test set evaluation

In [None]:
# evaluate() returns the loss followed by the compiled metrics (here: MAE).
loss, mae = model.evaluate(X_test, y_test)
print(f"\nTest loss: {loss:.4f}, Test MAE: {mae:.4f}")

## Model saving

In [154]:
# NOTE: this is the same filename the μPyD-Net section saves to, so it
# overwrites that model. Change the name to keep both.
model.save("model.h5")

## Test set predictions

In [128]:
def normalize_for_display(arr, clip_percentile=2):
    """
    Contrast-stretch an array to 8-bit values for display.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentile range, rescaled to [0, 1) and converted to ``uint8``.

    Args:
        arr: Array of numeric values.
        clip_percentile: Percentile cut applied at both ends.

    Returns:
        ``uint8`` array with the same shape as ``arr``.
    """
    lower = np.percentile(arr, clip_percentile)
    upper = np.percentile(arr, 100 - clip_percentile)
    span = upper - lower + 1e-8  # epsilon avoids division by zero for constant input
    stretched = (np.clip(arr, lower, upper) - lower) / span
    return (stretched * 255).astype(np.uint8)

In [129]:
def visualize_temporal_predictions(model, X_seq_test, y_seq_test, num_samples=3, upscale_factor=1):
    """
    Plot last input frame, ground truth and prediction of a temporal model.

    Args:
        model: Object with a ``predict`` method, called on ``X_seq_test[:num_samples]``.
        X_seq_test: Input sequences; the last frame of each sequence is shown.
        y_seq_test: Ground-truth disparity maps, one per sequence.
        num_samples: Number of rows (samples) to draw.
        upscale_factor: Integer enlargement applied before display when > 1.
    """
    predictions = model.predict(X_seq_test[:num_samples])
    enlarge = upscale_factor > 1

    plt.figure(figsize=(15, 5 * num_samples))

    for row in range(num_samples):
        image = X_seq_test[row, -1].squeeze()
        truth = y_seq_test[row].squeeze()
        pred_display = normalize_for_display(predictions[row].squeeze())

        if enlarge:
            height, width = image.shape
            new_size = (width * upscale_factor, height * upscale_factor)
            image = cv2.resize(image, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            truth = cv2.resize(truth, new_size, interpolation=cv2.INTER_NEAREST)

        # Non-positive ground truth is set to 0 (via NaN) before the stretch.
        masked_truth = np.where(truth > 0, truth, np.nan)
        gt_display = normalize_for_display(np.nan_to_num(masked_truth, nan=0.0))

        panels = (
            (image, 'gray', 'Input Image (last frame)'),
            (gt_display, 'magma', 'Ground Truth Depth'),
            (pred_display, 'magma', 'Predicted Depth'),
        )
        for column, (data, cmap, title) in enumerate(panels, start=1):
            plt.subplot(num_samples, 3, row * 3 + column)
            plt.imshow(data, cmap=cmap)
            plt.title(title)
            plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Show the first 3 test sequences (the function's default) with the Keras model's predictions.
visualize_temporal_predictions(model, X_test, y_test)

In [None]:
# Predict the whole temporal test split; the result is indexed per sample below.
predictions = model.predict(X_test, batch_size=16, verbose=1)

# Constants (adjust to match your dataset or camera model if not using KITTI)
focal_length = 721.5377
baseline = 0.5327

def disparity_to_depth(disp):
    """
    Convert disparity to depth as ``focal_length * baseline / disparity``.

    The disparity is floored at 1e-6 first, so zero or negative inputs yield a
    very large positive depth instead of a division by zero.
    """
    safe_disp = np.clip(disp, 1e-6, None)
    return focal_length * baseline / safe_disp

def crop_for_eigen(gt):
    """
    Return a boolean mask that is True inside the evaluation crop.

    The crop is a fixed window expressed as fractions of the map's height
    (0.4081 to 0.9919) and width (0.0359 to 0.9640).

    Args:
        gt: 2-D array; only its shape is used.

    Returns:
        Boolean array with the same shape as ``gt``.
    """
    rows, cols = gt.shape
    top, bottom = int(0.4081 * rows), int(0.9919 * rows)
    left, right = int(0.0359 * cols), int(0.9640 * cols)
    mask = np.zeros_like(gt, dtype=bool)
    mask[top:bottom, left:right] = True
    return mask

# Per-image threshold accuracies: the fraction of valid pixels whose ratio
# max(pred/gt, gt/pred) is below 1.25, 1.25^2 and 1.25^3.
delta_1_all = []
delta_2_all = []
delta_3_all = []

for gt_map, pred_map in zip(y_test, predictions):
    gt_disp = gt_map.squeeze()
    pred_disp = pred_map.squeeze()

    # Validity is judged on the raw disparity. disparity_to_depth floors its
    # input at a positive value, so the converted depth is always > 0 and
    # cannot be used to detect missing (zero-disparity) ground truth.
    valid_mask = (gt_disp > 0) & crop_for_eigen(gt_disp)
    if not valid_mask.any():
        continue  # nothing to score; the mean of an empty selection would be NaN

    gt = disparity_to_depth(gt_disp[valid_mask])
    pred = disparity_to_depth(pred_disp[valid_mask])

    # Guard the gt / pred division against a zero predicted depth.
    pred = np.clip(pred, 1e-6, None)
    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

# Average the per-image scores and express them as percentages.
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%")



## TFLite Conversion

In [176]:
# Uncomment the line below if you want to convert your own model using this code.
# model = tf.keras.models.load_model("model.h5", custom_objects={'berHu_loss':berHu_loss})

converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Full-integer post-training quantization: default optimizations, only int8
# builtin ops allowed, and uint8 tensors at the model's input and output.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS_INT8
]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

def representative_dataset():
    """
    Yield calibration inputs for the TFLite converter.

    Draws up to 100 distinct random sequences from ``X_train`` and yields each
    one as a single-element list holding a batch of size 1.
    """
    calibration_count = min(100, len(X_train))
    chosen = np.random.choice(len(X_train), calibration_count, replace=False)

    for position in chosen:
        # Slicing (instead of indexing) keeps the leading batch dimension.
        yield [X_train[position:position + 1]]

converter.representative_dataset = representative_dataset

In [None]:
# Run the conversion with the settings configured on `converter`; the result is written to disk below.
tflite_model = converter.convert()

In [None]:
# Writes the converted model to disk, replacing any existing file of that name.
Path('model.tflite').write_bytes(tflite_model)

## TFLite inference

In [None]:
interpreter = tf.lite.Interpreter(model_path="") # Replace this with the path to your TFLite model
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_index = input_details[0]['index']
output_index = output_details[0]['index']

# 'quantization' is a (scale, zero_point) pair for the tensor.
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

input_dtype = input_details[0]['dtype']
output_dtype = output_details[0]['dtype']

# An integer tensor with a non-zero scale is quantized. This covers both uint8
# and int8 models, so the same loop works for either choice.
input_is_quantized = np.issubdtype(input_dtype, np.integer) and input_scale > 0
output_is_quantized = np.issubdtype(output_dtype, np.integer) and output_scale > 0
input_limits = np.iinfo(input_dtype) if input_is_quantized else None

predictions = []

for img in X_test:
    if input_is_quantized:
        # Round to the nearest level and clip to the dtype's range; a bare cast
        # would truncate and wrap around on out-of-range values.
        quantized = np.round(img / input_scale + input_zero_point)
        input_data = np.clip(quantized, input_limits.min, input_limits.max).astype(input_dtype)
    else:
        input_data = img.astype(input_dtype)

    input_data = np.expand_dims(input_data, axis=0)  # Add batch dimension

    interpreter.set_tensor(input_index, input_data)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_index)

    # Map quantized outputs back to real values.
    if output_is_quantized:
        output_data = (output_data.astype(np.float32) - output_zero_point) * output_scale

    predictions.append(output_data[0])

predictions = np.array(predictions)

In [112]:
def normalize_for_display(arr, clip_percentile=2):
    """
    Contrast-stretch an array to 8-bit values for display.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentile range, rescaled to [0, 1) and converted to ``uint8``.

    Args:
        arr: Array of numeric values.
        clip_percentile: Percentile cut applied at both ends.

    Returns:
        ``uint8`` array with the same shape as ``arr``.
    """
    lower = np.percentile(arr, clip_percentile)
    upper = np.percentile(arr, 100 - clip_percentile)
    span = upper - lower + 1e-8  # epsilon avoids division by zero for constant input
    stretched = (np.clip(arr, lower, upper) - lower) / span
    return (stretched * 255).astype(np.uint8)

In [113]:
def visualize_temporal_predictions(predictions, y_seq_test, num_samples=3, upscale_factor=1, X_seq_test=None):
    """
    Plot last input frame, ground truth and precomputed prediction per sample.

    Args:
        predictions: Predicted maps, one per sequence (e.g. from the TFLite interpreter).
        y_seq_test: Ground-truth disparity maps aligned with ``predictions``.
        num_samples: Number of rows (samples) to draw.
        upscale_factor: Integer enlargement applied before display when > 1.
        X_seq_test: Input sequences whose last frame is shown. When omitted,
            the notebook-level ``X_test`` is used, which is what this function
            always read before the parameter existed.
    """
    if X_seq_test is None:
        X_seq_test = X_test  # backward-compatible fallback to the notebook global

    plt.figure(figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        input_img = X_seq_test[i, -1].squeeze()
        gt = y_seq_test[i].squeeze()
        pred = predictions[i].squeeze()

        pred_display = normalize_for_display(pred)

        if upscale_factor > 1:
            h, w = input_img.shape
            new_size = (w * upscale_factor, h * upscale_factor)
            input_img = cv2.resize(input_img, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            gt_resized = cv2.resize(gt, new_size, interpolation=cv2.INTER_NEAREST)
        else:
            gt_resized = gt

        # Non-positive ground truth is set to 0 (via NaN) before the stretch.
        gt_mask = gt_resized > 0
        gt_valid = np.where(gt_mask, gt_resized, np.nan)
        gt_display = normalize_for_display(np.nan_to_num(gt_valid, nan=0.0))

        plt.subplot(num_samples, 3, i * 3 + 1)
        plt.imshow(input_img, cmap='gray')
        plt.title('Input Image (last frame)')
        plt.axis('off')

        plt.subplot(num_samples, 3, i * 3 + 2)
        plt.imshow(gt_display, cmap='magma')
        plt.title('Ground Truth Depth')
        plt.axis('off')

        plt.subplot(num_samples, 3, i * 3 + 3)
        plt.imshow(pred_display, cmap='magma')
        plt.title('Predicted Depth')
        plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Show the first 5 test sequences with the predictions collected from the TFLite interpreter.
visualize_temporal_predictions(predictions, y_test, num_samples=5)

## Delta error calculation

In [None]:
# Constants (adjust to match your dataset or camera model if not using KITTI)
focal_length = 721.5377
baseline = 0.5327

def disparity_to_depth(disp):
    """
    Convert disparity to depth as ``focal_length * baseline / disparity``.

    The disparity is floored at 1e-6 first, so zero or negative inputs yield a
    very large positive depth instead of a division by zero.
    """
    safe_disp = np.clip(disp, 1e-6, None)
    return focal_length * baseline / safe_disp

def crop_for_eigen(gt):
    """
    Return a boolean mask that is True inside the evaluation crop.

    The crop is a fixed window expressed as fractions of the map's height
    (0.4081 to 0.9919) and width (0.0359 to 0.9640).

    Args:
        gt: 2-D array; only its shape is used.

    Returns:
        Boolean array with the same shape as ``gt``.
    """
    rows, cols = gt.shape
    top, bottom = int(0.4081 * rows), int(0.9919 * rows)
    left, right = int(0.0359 * cols), int(0.9640 * cols)
    mask = np.zeros_like(gt, dtype=bool)
    mask[top:bottom, left:right] = True
    return mask

# Per-image threshold accuracies over all valid pixels ...
delta_1_all = []
delta_2_all = []
delta_3_all = []

# ... and the 1.25 accuracy split by ground-truth disparity tercile.
delta_low_all = []
delta_mid_all = []
delta_high_all = []

for gt_map, pred_map in zip(y_test, predictions):
    gt_disp = gt_map.squeeze()
    pred_disp = pred_map.squeeze()

    # Validity is judged on the raw disparity. disparity_to_depth floors its
    # input at a positive value, so the converted depth is always > 0 and
    # cannot be used to detect missing (zero-disparity) ground truth.
    valid_mask = (gt_disp > 0) & crop_for_eigen(gt_disp)
    if not valid_mask.any():
        continue  # nothing to score; the mean of an empty selection would be NaN

    disp = gt_disp[valid_mask]
    gt = disparity_to_depth(disp)
    pred = disparity_to_depth(pred_disp[valid_mask])

    # Guard the gt / pred division against a zero predicted depth.
    pred = np.clip(pred, 1e-6, None)
    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

    # Rank valid pixels by ground-truth disparity and split them into thirds;
    # the last third also takes the remainder.
    sorted_indices = np.argsort(disp)
    thirds = len(disp) // 3
    if thirds == 0:
        continue  # fewer than 3 valid pixels: the low/mid groups would be empty

    low_idx = sorted_indices[:thirds]
    mid_idx = sorted_indices[thirds:2*thirds]
    high_idx = sorted_indices[2*thirds:]

    delta_low_all.append(np.mean(delta[low_idx] < 1.25))
    delta_mid_all.append(np.mean(delta[mid_idx] < 1.25))
    delta_high_all.append(np.mean(delta[high_idx] < 1.25))

# Average the per-image scores and express them as percentages.
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

delta_low = np.mean(delta_low_all) * 100
delta_mid = np.mean(delta_mid_all) * 100
delta_high = np.mean(delta_high_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%\n")

print("Delta < 1.25 by Disparity Region:")
print(f"  Low disparity (far):    {delta_low:.2f}%")
print(f"  Mid disparity:          {delta_mid:.2f}%")
print(f"  High disparity (close): {delta_high:.2f}%")


# L-EfficientUNet

## Imports

In [184]:
from tensorflow.keras.layers import DepthwiseConv2D, BatchNormalization, ReLU, MaxPooling2D, UpSampling2D

## Model

In [185]:
def depthwise_separable_conv(x, filters, kernel_size=3, strides=1):
    """Apply a depthwise convolution followed by a 1x1 pointwise convolution.

    Both convolutions are bias-free and each is followed by batch
    normalisation and a ReLU.  ``kernel_size`` and ``strides`` configure the
    depthwise stage only; ``filters`` is the channel count produced by the
    pointwise stage.
    """
    # Depthwise (per-channel spatial) stage.
    depthwise = DepthwiseConv2D(
        kernel_size, strides=strides, padding='same', use_bias=False
    )(x)
    depthwise = BatchNormalization()(depthwise)
    depthwise = ReLU()(depthwise)

    # Pointwise (1x1 channel-mixing) stage.
    pointwise = Conv2D(filters, kernel_size=1, padding='same', use_bias=False)(depthwise)
    pointwise = BatchNormalization()(pointwise)
    return ReLU()(pointwise)

def encoder_block(x, filters):
    """Run one encoder stage and return ``(pooled, skip)``.

    ``skip`` is the depthwise-separable convolution output before pooling;
    ``pooled`` is the same tensor after 2x2 max pooling.
    """
    features = depthwise_separable_conv(x, filters)
    pooled = MaxPooling2D(pool_size=(2, 2))(features)
    return pooled, features

def decoder_block(x, skip, filters):
    """Run one decoder stage: 2x bilinear upsampling, skip concat, separable conv."""
    upsampled = UpSampling2D(size=(2, 2), interpolation='bilinear')(x)
    merged = Concatenate()([upsampled, skip])
    return depthwise_separable_conv(merged, filters)

def build_l_efficientunet(input_shape=(64, 64, 3), num_classes=1):
    """Assemble the L-EfficientUNet and return it as a Keras ``Model``.

    Four encoder stages (16, 32, 64, 128 filters) feed a 256-filter bottleneck,
    followed by four decoder stages that consume the encoder skips in reverse
    order.  A final 1x1 convolution produces ``num_classes`` output channels.
    The total parameter count is printed before the model is returned.
    """
    inputs = Input(shape=input_shape)

    # Encoder: keep every pre-pooling feature map for the decoder.
    skips = []
    x = inputs
    for width in (16, 32, 64, 128):
        x, skip = encoder_block(x, width)
        skips.append(skip)

    # Bottleneck at the coarsest resolution.
    x = depthwise_separable_conv(x, 256)

    # Decoder: the most recent (deepest) skip is used first.
    for width in (128, 64, 32, 16):
        x = decoder_block(x, skips.pop(), width)

    outputs = Conv2D(num_classes, kernel_size=1)(x)

    model = Model(inputs, outputs, name='L_EfficientUNet')
    print(f"\nTotal params: {model.count_params():,}")
    return model

In [None]:
# Build the L-EfficientUNet for 32x32 single-channel input with one output
# channel, then compile it with Adam(1e-4), the berHu loss and MAE as a
# tracked metric.  berHu_loss is not defined in this section of the notebook.
model = build_l_efficientunet(input_shape=(32, 32, 1), num_classes=1)
model.compile(optimizer=Adam(1e-4), loss=berHu_loss, metrics=['mae'])

In [None]:
from collections import defaultdict

# Fill colour per layer class; layer classes without an entry get an empty
# dict from the defaultdict.
color_map = defaultdict(dict, {
    Conv2D: {'fill': 'skyblue'},
    Conv2DTranspose: {'fill': 'lightgreen'},
    LeakyReLU: {'fill': 'orange'},
    Concatenate: {'fill': 'pink'},
})

# Render the architecture diagram with a legend and write it to disk.
visualkeras.layered_view(
    model,
    legend=True,
    color_map=color_map,
    to_file='l-efficientUNet_diagram.png',
)

## Model training

In [None]:
# Early stopping (patience 15, best weights restored) plus a checkpoint file
# that is only overwritten when the monitored value improves.
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        patience=15,
        restore_best_weights=True,
    ),
    tf.keras.callbacks.ModelCheckpoint(
        "model_checkpoint.h5",
        save_best_only=True,
    ),
]

# Train for at most 100 epochs, validating on the held-out validation split.
history = model.fit(
    x=X_train_2,
    y=y_train_2,
    validation_data=(X_val_2, y_val_2),
    epochs=100,
    batch_size=16,
    callbacks=callbacks,
)

In [None]:
# Training curves: loss in the left panel, MAE in the right panel, each with
# the training and validation series from the fit history.
plt.figure(figsize=(12, 4))

for position, (metric, label) in enumerate((('loss', 'Loss'), ('mae', 'MAE')), start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric], label=f'Training {label}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {label}')
    plt.title(f'Model {label}')
    plt.xlabel('Epoch')
    plt.ylabel(label)
    plt.legend()

plt.tight_layout()
plt.show()

## Test set evaluation

In [None]:
# Score the model on the test split; the two returned values are unpacked as
# the loss and the MAE metric and printed with four decimals.
loss, mae = model.evaluate(X_test_2, y_test_2)
print(f"\nTest loss: {loss:.4f}, Test MAE: {mae:.4f}")

## Model saving

In [66]:
# Save the current model to "model.h5" in the working directory.
model.save("model.h5")

## Prediction visualization

In [64]:
def normalize_for_display(arr, clip_percentile=2):
    """Stretch ``arr`` to an 8-bit image using a percentile window.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentiles, rescaled to 0..255 and truncated to ``uint8``.  The 1e-8 term
    keeps the division defined when the window has zero width.
    """
    lo = np.percentile(arr, clip_percentile)
    hi = np.percentile(arr, 100 - clip_percentile)
    span = hi - lo + 1e-8
    scaled = (np.clip(arr, lo, hi) - lo) / span
    return (scaled * 255).astype(np.uint8)

In [65]:
def visualize_predictions(model, X_test, y_test, num_samples=3, upscale_factor=1):
    """Plot input, ground truth and model prediction for the first samples.

    One row of three panels is drawn per sample.  Predictions come from
    ``model.predict`` on the first ``num_samples`` inputs.  The prediction is
    contrast-stretched over all of its pixels, the ground truth only over its
    positive pixels (the remaining pixels stay at zero).  With
    ``upscale_factor > 1`` all three images are enlarged before plotting.
    """
    predictions = model.predict(X_test[:num_samples])

    plt.figure(figsize=(15, 5 * num_samples))

    for row in range(num_samples):
        input_img = X_test[row, ..., 0]
        gt_map = y_test[row, ..., 0]
        pred_display = normalize_for_display(predictions[row, ..., 0])

        if upscale_factor > 1:
            height, width = input_img.shape
            target = (width * upscale_factor, height * upscale_factor)
            input_img = cv2.resize(input_img, target, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, target, interpolation=cv2.INTER_CUBIC)
            # Nearest-neighbour keeps the zero-valued pixels from bleeding
            # into their neighbours.
            gt_map = cv2.resize(gt_map, target, interpolation=cv2.INTER_NEAREST)

        # Stretch only the positive ground-truth pixels.
        valid = gt_map > 0
        gt_display = np.zeros_like(gt_map)
        gt_display[valid] = normalize_for_display(gt_map[valid])

        panels = (
            (input_img, 'gray', 'Input Image'),
            (gt_display, 'magma', 'Ground Truth Depth'),
            (pred_display, 'magma', 'Predicted Depth'),
        )
        for col, (image, cmap, title) in enumerate(panels, start=1):
            plt.subplot(num_samples, 3, row * 3 + col)
            plt.imshow(image, cmap=cmap)
            plt.title(title)
            plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Plot the first 5 test samples with their ground truth and model predictions.
visualize_predictions(model, X_test_2, y_test_2, 5)

## TFLite conversion

In [69]:
# Uncomment the below line if you want to use this code to quantize your own model.
# model = tf.keras.models.load_model("model.h5", custom_objects={'berHu_loss':berHu_loss})

def representative_dataset():
    """Yield up to 100 randomly chosen training samples, one per batch of size 1."""
    sample_count = min(100, len(X_train_2))
    chosen = np.random.choice(len(X_train_2), sample_count, replace=False)

    for start in chosen:
        # Slicing (rather than indexing) keeps the leading batch dimension.
        yield [X_train_2[start:start + 1]]


# Integer-only conversion with uint8 input and output tensors, calibrated on
# the representative samples above.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
converter.representative_dataset = representative_dataset

In [None]:
# Run the conversion; `tflite_model` holds the converted model.
tflite_model = converter.convert()

In [None]:
# Write the converted model to model.tflite in the working directory.
Path('model.tflite').write_bytes(tflite_model)

In [None]:
interpreter = tf.lite.Interpreter(model_path="") # Replace this with the path to your TFLite model
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_index = input_details[0]['index']
output_index = output_details[0]['index']

# (scale, zero_point) pairs linking stored integers to real values:
# real = (stored - zero_point) * scale.
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

input_dtype = input_details[0]['dtype']
output_dtype = output_details[0]['dtype']

# A tensor is treated as quantized when it has an integer dtype and a non-zero
# scale, so uint8 and int8 models are both handled without editing this cell.
input_is_quantized = np.issubdtype(input_dtype, np.integer) and input_scale != 0
output_is_quantized = np.issubdtype(output_dtype, np.integer) and output_scale != 0

predictions = []

for img in X_test_2:
    if input_is_quantized:
        # Round to the nearest level and clamp to the dtype range; a bare
        # astype() truncates towards zero and wraps around on overflow.
        limits = np.iinfo(input_dtype)
        quantized = np.round(img / input_scale + input_zero_point)
        input_data = np.clip(quantized, limits.min, limits.max).astype(input_dtype)
    else:
        input_data = img.astype(input_dtype)

    # The interpreter expects a leading batch dimension.
    input_data = np.expand_dims(input_data, axis=0)

    interpreter.set_tensor(input_index, input_data)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_index)

    # Map the stored integers back to real values.
    if output_is_quantized:
        output_data = (output_data.astype(np.float32) - output_zero_point) * output_scale

    predictions.append(output_data[0])


predictions = np.array(predictions)

In [97]:
def normalize_for_display(arr, clip_percentile=2):
    """Stretch ``arr`` to an 8-bit image using a percentile window.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentiles, rescaled to 0..255 and truncated to ``uint8``.  The 1e-8 term
    keeps the division defined when the window has zero width.
    """
    lo = np.percentile(arr, clip_percentile)
    hi = np.percentile(arr, 100 - clip_percentile)
    span = hi - lo + 1e-8
    scaled = (np.clip(arr, lo, hi) - lo) / span
    return (scaled * 255).astype(np.uint8)

def visualize_tflite_predictions(X_test, y_test, predictions, num_samples=3, upscale_factor=1):
    """Plot input, ground truth and TFLite prediction for the first samples.

    One row of three panels is drawn per sample.  The prediction is
    contrast-stretched over all of its pixels.  The ground truth is stretched
    over its positive pixels only, the same way ``visualize_predictions``
    does it, so pixels without ground truth (value 0) stay at zero and do not
    skew the percentile window.

    Args:
        X_test: Input images, indexed as ``X_test[i, ..., 0]``.
        y_test: Ground-truth maps, indexed the same way.
        predictions: Precomputed model outputs, indexed the same way.
        num_samples: Number of rows requested; capped at the number of
            available samples.
        upscale_factor: Integer enlargement factor applied when it exceeds 1.
    """
    # Never index past the shortest of the three inputs.
    num_samples = min(num_samples, len(X_test), len(y_test), len(predictions))
    if num_samples <= 0:
        return

    plt.figure(figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        input_img = X_test[i, ..., 0]
        gt = y_test[i, ..., 0]
        pred = predictions[i, ..., 0]

        pred_display = normalize_for_display(pred)

        if upscale_factor > 1:
            h, w = input_img.shape
            new_size = (w * upscale_factor, h * upscale_factor)
            input_img = cv2.resize(input_img, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            # Nearest-neighbour keeps the zero-valued pixels exactly zero.
            gt = cv2.resize(gt, new_size, interpolation=cv2.INTER_NEAREST)

        gt_mask = gt > 0
        gt_display = np.zeros_like(gt)
        if gt_mask.any():
            # Percentiles of an empty selection are undefined, hence the guard.
            gt_display[gt_mask] = normalize_for_display(gt[gt_mask])

        plt.subplot(num_samples, 3, i * 3 + 1)
        plt.imshow(input_img, cmap='gray')
        plt.title('Input Image')
        plt.axis('off')

        plt.subplot(num_samples, 3, i * 3 + 2)
        plt.imshow(gt_display, cmap='magma')
        plt.title('Ground Truth Depth')
        plt.axis('off')

        plt.subplot(num_samples, 3, i * 3 + 3)
        plt.imshow(pred_display, cmap='magma')
        plt.title('TFLite Prediction')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Plot the first 25 test samples with the TFLite outputs held in `predictions`.
visualize_tflite_predictions(X_test_2, y_test_2, predictions, num_samples=25)

## Delta accuracy evaluation

In [None]:
# Per-frame threshold accuracies: share of valid pixels whose ratio
# max(pred / gt, gt / pred) is below 1.25, 1.25^2 and 1.25^3.
delta_1_all = []
delta_2_all = []
delta_3_all = []

# Iterate over the arrays that are actually scored (y_test_2 / predictions);
# sizing the loop by another test set risks an IndexError or a partial run.
for gt_sample, pred_sample in zip(y_test_2, predictions):
    # Extract predicted and ground truth disparities
    gt = gt_sample.squeeze()
    pred = pred_sample.squeeze()

    # Mask out invalid ground truth pixels
    valid_mask = gt > 0
    if not valid_mask.any():
        # np.mean over an empty selection is NaN and would poison the averages.
        continue

    gt = gt[valid_mask]
    pred = pred[valid_mask]

    # Avoid divide-by-zero
    pred = np.clip(pred, 1e-6, None)

    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

# Convert to percentages
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%")


# Pico-friendly L-ENet

## Imports

In [75]:
from tensorflow.keras.layers import Add

## Model

In [76]:
def depthwise_separable_conv(x, out_channels, stride=1):
    """Apply a 3x3 depthwise convolution followed by a 1x1 pointwise convolution.

    Both convolutions are bias-free and each is followed by batch
    normalisation and a ReLU capped at 6.  ``stride`` applies to the depthwise
    stage; ``out_channels`` is the channel count of the pointwise stage.
    """
    # Depthwise (per-channel spatial) stage.
    depthwise = DepthwiseConv2D(3, strides=stride, padding='same', use_bias=False)(x)
    depthwise = BatchNormalization()(depthwise)
    depthwise = ReLU(max_value=6)(depthwise)

    # Pointwise (1x1 channel-mixing) stage.
    pointwise = Conv2D(out_channels, 1, padding='same', use_bias=False)(depthwise)
    pointwise = BatchNormalization()(pointwise)
    return ReLU(max_value=6)(pointwise)


def initial_block(x, out_channels):
    """Halve the spatial size with parallel strided-conv and max-pool branches.

    The convolution branch has ``out_channels - 3`` filters and the pooled
    branch keeps the channels of ``x``; the two are concatenated.  The result
    therefore has exactly ``out_channels`` channels only when ``x`` has three
    channels.
    """
    conv_branch = Conv2D(out_channels - 3, 3, strides=2, padding='same')(x)
    pool_branch = MaxPooling2D(pool_size=2, strides=2, padding='same')(x)
    return Concatenate()([conv_branch, pool_branch])


def bottleneck(x, out_channels, downsample=False):
    """Residual block built around a depthwise-separable convolution.

    The main path is one separable convolution (stride 2 when ``downsample``
    is set).  The shortcut is max-pooled when downsampling and projected with
    a 1x1 convolution when its channel count differs from ``out_channels``.
    The two paths are added and passed through a ReLU capped at 6.
    """
    main = depthwise_separable_conv(x, out_channels, stride=2 if downsample else 1)

    shortcut = x
    if downsample:
        # Match the spatial size of the strided main path.
        shortcut = MaxPooling2D(pool_size=2, strides=2, padding='same')(shortcut)

    if shortcut.shape[-1] != out_channels:
        # Match the channel count so the two paths can be added.
        shortcut = Conv2D(out_channels, 1, padding='same')(shortcut)

    merged = Add()([main, shortcut])
    return ReLU(max_value=6)(merged)


def upsample_block(x, skip, out_channels):
    """Upsample, convolve, and concatenate ``skip`` when the sizes agree.

    ``x`` is upsampled and passed through a 3x3 ReLU convolution with
    ``out_channels`` filters.  ``skip`` is concatenated only if its height and
    width equal those of the convolved tensor; otherwise the convolved tensor
    is returned on its own.
    """
    upsampled = UpSampling2D()(x)
    features = Conv2D(out_channels, 3, padding='same', activation='relu')(upsampled)

    if features.shape[1:3] == skip.shape[1:3]:
        return Concatenate()([features, skip])
    return features


def create_lenet_fixed(input_shape=(64, 64, 3), num_classes=1):
    """Assemble the L-ENet encoder/decoder and return it as a Keras ``Model``.

    Encoder: the initial block, then three downsampling bottlenecks (16, 24,
    32 channels) and one more 32-channel bottleneck at the coarsest scale.
    Decoder: three upsample blocks (24, 16, 12 filters) that consume the
    stored skips from deepest to shallowest, one more upsampling with a
    12-filter 3x3 convolution, and a 1x1 convolution producing
    ``num_classes`` channels.  The parameter count is printed before return.
    """
    inputs = Input(shape=input_shape)

    # Encoder; the outputs of the first three stages are kept as skips.
    x = initial_block(inputs, out_channels=12)
    skips = [x]
    for width in (16, 24):
        x = bottleneck(x, width, downsample=True)
        skips.append(x)

    x = bottleneck(x, 32, downsample=True)
    x = bottleneck(x, 32)

    # Decoder; skips are consumed in reverse order of creation.
    for width in (24, 16, 12):
        x = upsample_block(x, skips.pop(), width)

    x = UpSampling2D()(x)
    x = Conv2D(12, 3, padding='same', activation='relu')(x)

    outputs = Conv2D(num_classes, 1, padding='same')(x)

    model = Model(inputs, outputs)
    print(f"Total parameters: {model.count_params():,}")
    return model

In [None]:
# Build the L-ENet for 64x64 single-channel input with one output channel.
# NOTE(review): the L-EfficientUNet earlier in this notebook is built for
# 32x32 input, yet both models are fit on X_train_2 - confirm the data shape
# matches this model.
model = create_lenet_fixed(input_shape=(64, 64, 1), num_classes=1)

# Compile with the Adam optimizer, the berHu loss and MAE as a tracked metric.
# berHu_loss is not defined in this section of the notebook.
model.compile(optimizer='adam', loss=berHu_loss, metrics=['mae'])

# Print the layer-by-layer summary.
model.summary()

In [None]:
from collections import defaultdict

# Fill colour per layer class; layer classes without an entry get an empty
# dict from the defaultdict.
color_map = defaultdict(dict, {
    Conv2D: {'fill': 'skyblue'},
    Conv2DTranspose: {'fill': 'lightgreen'},
    LeakyReLU: {'fill': 'orange'},
    Concatenate: {'fill': 'pink'},
})

# Create the visual representation
visualkeras.layered_view(
    model,
    legend=True,
    color_map=color_map,
    to_file='l-ENet_diagram.png',
)

In [None]:
# Early stopping (patience 80, best weights restored) plus a verbose
# checkpoint to "model.h5" that is only overwritten on improvement.
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        patience=80,
        restore_best_weights=True,
    ),
    tf.keras.callbacks.ModelCheckpoint(
        "model.h5",
        save_best_only=True,
        verbose=1,
    ),
]

# Train for at most 100 epochs, validating on the held-out validation split.
history = model.fit(
    x=X_train_2,
    y=y_train_2,
    validation_data=(X_val_2, y_val_2),
    epochs=100,
    batch_size=16,
    callbacks=callbacks,
)

In [None]:
# Training curves: loss in the left panel, MAE in the right panel, each with
# the training and validation series from the fit history.
plt.figure(figsize=(12, 4))

for position, (metric, label) in enumerate((('loss', 'Loss'), ('mae', 'MAE')), start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric], label=f'Training {label}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {label}')
    plt.title(f'Model {label}')
    plt.xlabel('Epoch')
    plt.ylabel(label)
    plt.legend()

plt.tight_layout()
plt.show()

## Test Set Evaluation

In [None]:
# Score the model on the test split; the two returned values are unpacked as
# the loss and the MAE metric and printed with four decimals.
loss, mae = model.evaluate(X_test_2, y_test_2)
print(f"\nTest loss: {loss:.4f}, Test MAE: {mae:.4f}")

## Model saving

In [79]:
# Save the current model to "model.h5" (the same file name the training
# checkpoint callback writes to).
model.save("model.h5")

## Prediction visualization

In [82]:
def normalize_for_display(arr, clip_percentile=2):
    """Stretch ``arr`` to an 8-bit image using a percentile window.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentiles, rescaled to 0..255 and truncated to ``uint8``.  The 1e-8 term
    keeps the division defined when the window has zero width.
    """
    lo = np.percentile(arr, clip_percentile)
    hi = np.percentile(arr, 100 - clip_percentile)
    span = hi - lo + 1e-8
    scaled = (np.clip(arr, lo, hi) - lo) / span
    return (scaled * 255).astype(np.uint8)

In [83]:
def visualize_predictions(model, X_test, y_test, num_samples=3, upscale_factor=1):
    """Plot input, ground truth and model prediction for the first samples.

    One row of three panels is drawn per sample.  Predictions come from
    ``model.predict`` on the first ``num_samples`` inputs.  The prediction is
    contrast-stretched over all of its pixels, the ground truth only over its
    positive pixels (the remaining pixels stay at zero).  With
    ``upscale_factor > 1`` all three images are enlarged before plotting.
    """
    predictions = model.predict(X_test[:num_samples])

    plt.figure(figsize=(15, 5 * num_samples))

    for row in range(num_samples):
        input_img = X_test[row, ..., 0]
        gt_map = y_test[row, ..., 0]
        pred_display = normalize_for_display(predictions[row, ..., 0])

        if upscale_factor > 1:
            height, width = input_img.shape
            target = (width * upscale_factor, height * upscale_factor)
            input_img = cv2.resize(input_img, target, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, target, interpolation=cv2.INTER_CUBIC)
            # Nearest-neighbour keeps the zero-valued pixels from bleeding
            # into their neighbours.
            gt_map = cv2.resize(gt_map, target, interpolation=cv2.INTER_NEAREST)

        # Stretch only the positive ground-truth pixels.
        valid = gt_map > 0
        gt_display = np.zeros_like(gt_map)
        gt_display[valid] = normalize_for_display(gt_map[valid])

        panels = (
            (input_img, 'gray', 'Input Image'),
            (gt_display, 'magma', 'Ground Truth Depth'),
            (pred_display, 'magma', 'Predicted Depth'),
        )
        for col, (image, cmap, title) in enumerate(panels, start=1):
            plt.subplot(num_samples, 3, row * 3 + col)
            plt.imshow(image, cmap=cmap)
            plt.title(title)
            plt.axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Plot the first 25 test samples with their ground truth and model predictions.
visualize_predictions(model, X_test_2, y_test_2, 25)

## TFLite Conversion

In [89]:
# Uncomment the line below if you want to use this code to quantize your own model
# model = tf.keras.models.load_model("model.h5", custom_objects={'berHu_loss':berHu_loss})

def representative_dataset():
    """Yield up to 100 randomly chosen training samples, one per batch of size 1."""
    sample_count = min(100, len(X_train_2))
    chosen = np.random.choice(len(X_train_2), sample_count, replace=False)

    for start in chosen:
        # Slicing (rather than indexing) keeps the leading batch dimension.
        yield [X_train_2[start:start + 1]]


# Integer-only conversion with uint8 input and output tensors, calibrated on
# the representative samples above.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
converter.representative_dataset = representative_dataset

In [None]:
# Run the conversion; `tflite_model` holds the converted model.
tflite_model = converter.convert()

In [None]:
# Write the converted model to model.tflite in the working directory.
Path('model.tflite').write_bytes(tflite_model)

In [None]:
interpreter = tf.lite.Interpreter(model_path="") # Replace this with a path to your TFLite model
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_index = input_details[0]['index']
output_index = output_details[0]['index']

# (scale, zero_point) pairs linking stored integers to real values:
# real = (stored - zero_point) * scale.
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

input_dtype = input_details[0]['dtype']
output_dtype = output_details[0]['dtype']

# A tensor is treated as quantized when it has an integer dtype and a non-zero
# scale, so uint8 and int8 models are both handled without editing this cell.
input_is_quantized = np.issubdtype(input_dtype, np.integer) and input_scale != 0
output_is_quantized = np.issubdtype(output_dtype, np.integer) and output_scale != 0

predictions = []

for img in X_test_2:
    if input_is_quantized:
        # Round to the nearest level and clamp to the dtype range; a bare
        # astype() truncates towards zero and wraps around on overflow.
        limits = np.iinfo(input_dtype)
        quantized = np.round(img / input_scale + input_zero_point)
        input_data = np.clip(quantized, limits.min, limits.max).astype(input_dtype)
    else:
        input_data = img.astype(input_dtype)

    # The interpreter expects a leading batch dimension.
    input_data = np.expand_dims(input_data, axis=0)

    interpreter.set_tensor(input_index, input_data)
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_index)

    # Map the stored integers back to real values.
    if output_is_quantized:
        output_data = (output_data.astype(np.float32) - output_zero_point) * output_scale

    predictions.append(output_data[0])

predictions = np.array(predictions)

In [93]:
def normalize_for_display(arr, clip_percentile=2):
    """Stretch ``arr`` to an 8-bit image using a percentile window.

    Values are clipped to the ``clip_percentile`` / ``100 - clip_percentile``
    percentiles, rescaled to 0..255 and truncated to ``uint8``.  The 1e-8 term
    keeps the division defined when the window has zero width.
    """
    lo = np.percentile(arr, clip_percentile)
    hi = np.percentile(arr, 100 - clip_percentile)
    span = hi - lo + 1e-8
    scaled = (np.clip(arr, lo, hi) - lo) / span
    return (scaled * 255).astype(np.uint8)

def visualize_tflite_predictions(X_test, y_test, predictions, num_samples=3, upscale_factor=1):
    """Plot input, ground truth and TFLite prediction for the first samples.

    One row of three panels is drawn per sample.  The prediction is
    contrast-stretched over all of its pixels.  The ground truth is stretched
    over its positive pixels only, the same way ``visualize_predictions``
    does it, so pixels without ground truth (value 0) stay at zero and do not
    skew the percentile window.

    Args:
        X_test: Input images, indexed as ``X_test[i, ..., 0]``.
        y_test: Ground-truth maps, indexed the same way.
        predictions: Precomputed model outputs, indexed the same way.
        num_samples: Number of rows requested; capped at the number of
            available samples.
        upscale_factor: Integer enlargement factor applied when it exceeds 1.
    """
    # Never index past the shortest of the three inputs.
    num_samples = min(num_samples, len(X_test), len(y_test), len(predictions))
    if num_samples <= 0:
        return

    plt.figure(figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        input_img = X_test[i, ..., 0]
        gt = y_test[i, ..., 0]
        pred = predictions[i, ..., 0]

        pred_display = normalize_for_display(pred)

        if upscale_factor > 1:
            h, w = input_img.shape
            new_size = (w * upscale_factor, h * upscale_factor)
            input_img = cv2.resize(input_img, new_size, interpolation=cv2.INTER_CUBIC)
            pred_display = cv2.resize(pred_display, new_size, interpolation=cv2.INTER_CUBIC)
            # Nearest-neighbour keeps the zero-valued pixels exactly zero.
            gt = cv2.resize(gt, new_size, interpolation=cv2.INTER_NEAREST)

        gt_mask = gt > 0
        gt_display = np.zeros_like(gt)
        if gt_mask.any():
            # Percentiles of an empty selection are undefined, hence the guard.
            gt_display[gt_mask] = normalize_for_display(gt[gt_mask])

        # Input image
        plt.subplot(num_samples, 3, i * 3 + 1)
        plt.imshow(input_img, cmap='gray')
        plt.title('Input Image')
        plt.axis('off')

        # Ground truth
        plt.subplot(num_samples, 3, i * 3 + 2)
        plt.imshow(gt_display, cmap='magma')
        plt.title('Ground Truth Depth')
        plt.axis('off')

        # Prediction
        plt.subplot(num_samples, 3, i * 3 + 3)
        plt.imshow(pred_display, cmap='magma')
        plt.title('TFLite Prediction')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Plot the first 5 test samples with the TFLite outputs held in `predictions`.
visualize_tflite_predictions(X_test_2, y_test_2, predictions, num_samples=5)

## Delta accuracy evaluation

In [None]:
# Per-frame threshold accuracies: share of valid pixels whose ratio
# max(pred / gt, gt / pred) is below 1.25, 1.25^2 and 1.25^3.
delta_1_all = []
delta_2_all = []
delta_3_all = []

# Iterate over the arrays that are actually scored (y_test_2 / predictions);
# sizing the loop by another test set risks an IndexError or a partial run.
for gt_sample, pred_sample in zip(y_test_2, predictions):
    gt = gt_sample.squeeze()
    pred = pred_sample.squeeze()

    # Only pixels with a positive ground-truth value are scored.
    valid_mask = gt > 0
    if not valid_mask.any():
        # np.mean over an empty selection is NaN and would poison the averages.
        continue

    gt = gt[valid_mask]
    pred = pred[valid_mask]

    # Keep the ratio below finite for zero or negative predictions.
    pred = np.clip(pred, 1e-6, None)

    delta = np.maximum(pred / gt, gt / pred)

    delta_1_all.append(np.mean(delta < 1.25))
    delta_2_all.append(np.mean(delta < 1.25 ** 2))
    delta_3_all.append(np.mean(delta < 1.25 ** 3))

# Average the per-frame scores and express them as percentages.
delta_1 = np.mean(delta_1_all) * 100
delta_2 = np.mean(delta_2_all) * 100
delta_3 = np.mean(delta_3_all) * 100

print(f"Delta < 1.25: {delta_1:.2f}%")
print(f"Delta < 1.25²: {delta_2:.2f}%")
print(f"Delta < 1.25³: {delta_3:.2f}%")


# Conversion logic for model to .cpp array

In [4]:
def convert_tflite_to_c_array(tflite_path, output_cpp_path, array_name="model"):
    """Dump a binary file as a C ``unsigned char`` array definition.

    The output file defines ``const unsigned char <array_name>[]`` with the
    bytes of ``tflite_path`` written as two-digit hex literals, twelve per
    row, followed by ``const unsigned int <array_name>_len`` holding the byte
    count.  A confirmation line is printed when the file has been written.
    """
    bytes_per_row = 12

    with open(tflite_path, "rb") as src:
        data = src.read()

    rows = []
    for start in range(0, len(data), bytes_per_row):
        chunk = data[start:start + bytes_per_row]
        row = "  " + "".join(f"0x{byte:02x}, " for byte in chunk)
        # Only complete rows end with a newline; the line break after a
        # partial last row comes from the closing-brace write below.
        if len(chunk) == bytes_per_row:
            row += "\n"
        rows.append(row)

    with open(output_cpp_path, "w") as dst:
        dst.write(f"const unsigned char {array_name}[] = {{\n")
        dst.write("".join(rows))
        dst.write("\n};\n")
        dst.write(f"const unsigned int {array_name}_len = {len(data)};\n")

    print(f"Saved C array to {output_cpp_path}")


In [None]:
tflite_path = r"" # Change this to your TFLite model path

# Write the model bytes to model.cpp as a C array named `tflite_model`
# (with a companion `tflite_model_len` constant).
convert_tflite_to_c_array(tflite_path, "model.cpp", "tflite_model")

# Image to array conversion

In [None]:
data_root = Path(r"") # This should be a path to your preprocessed data folder.
split_file = Path(r"") # This should be a path to your test set paths.
OUTPUT_CPP = "test_image_data.cpp"
ARRAY_NAME = "g_test_image_data"
WIDTH, HEIGHT = 32, 32
INDEX = 0 # Change this index to reflect the image you want to convert.

with open(split_file, 'r') as f:
    lines = f.read().splitlines()

# The first whitespace-separated field of the chosen line is the image path;
# its leading path component is dropped, the next one names the sequence and
# the file stem names the frame.
line = lines[INDEX]
left_path = line.strip().split()[0]
rel_path = Path(*Path(left_path).parts[1:])
sequence, filename = rel_path.parts[0], rel_path.stem

image_path = data_root / sequence / "images" / f"{filename}.png"
if not image_path.exists():
    raise FileNotFoundError(f"Image not found: {image_path}")

img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
if img is None:
    raise RuntimeError(f"Failed to load image: {image_path}")

# Resize, scale to [0, 1], then map back to 0..255 bytes and flatten.
img_resized = cv2.resize(img, (WIDTH, HEIGHT)).astype(np.float32) / 255.0
img_quantized = np.clip(np.round(img_resized * 255.0), 0, 255).astype(np.uint8).flatten()

with open(OUTPUT_CPP, "w") as f:
    f.write('#include <cstdint>\n\n')
    f.write(f'const unsigned int {ARRAY_NAME}_size = {len(img_quantized)};\n')
    f.write(f'alignas(16) const unsigned char {ARRAY_NAME}[] = {{\n')

    # Two-digit hex literals, with a line break after every twelfth value.
    f.writelines(
        f'0x{val:02x}, \n' if position % 12 == 0 else f'0x{val:02x}, '
        for position, val in enumerate(img_quantized, start=1)
    )

    f.write('\n};\n')

# Image sequence to array conversion

In [None]:
data_root = Path(r"") # This should be a path to your preprocessed data folder.
split_file = Path(r"") # This should be a path to your test set paths.
OUTPUT_CPP = "test_sequence_data.cpp"
ARRAY_NAME = "g_test_sequence_data"
WIDTH, HEIGHT = 32, 32
TIME_STEPS = 3 # This should be the length of the sequence used in your model
TARGET_INDEX = -1 # Change this to an available index, >= TIME_STEPS - 1. Otherwise, the code will fail.

with open(split_file, 'r') as f:
    lines = f.read().splitlines()

# The first whitespace-separated field of the chosen line is the image path;
# its leading path component is dropped, the next one names the sequence and
# the file stem is the integer id of the last frame in the window.
line = lines[TARGET_INDEX]
left_path = line.strip().split()[0]
rel_path = Path(*Path(left_path).parts[1:])

seq = rel_path.parts[0]
frame_id = int(rel_path.stem)

# Collect the TIME_STEPS frames ending at frame_id, oldest first.
frames = []
for frame_index in range(frame_id - TIME_STEPS + 1, frame_id + 1):
    if frame_index < 0:
        raise ValueError("Not enough previous frames for temporal sequence.")

    # Frame files are named by their zero-padded 10-digit index.
    image_path = data_root / seq / "images" / f"{frame_index:010d}.png"
    if not image_path.exists():
        raise FileNotFoundError(f"Missing image: {image_path}")

    img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise RuntimeError(f"Failed to load: {image_path}")

    # Resize, scale to [0, 1], then map back to 0..255 bytes.
    img_resized = cv2.resize(img, (WIDTH, HEIGHT)).astype(np.float32) / 255.0
    img_quantized = np.clip(np.round(img_resized * 255.0), 0, 255).astype(np.uint8)
    frames.append(img_quantized)

# Frames are laid out back to back in time order.
sequence_array = np.stack(frames, axis=0).flatten()

with open(OUTPUT_CPP, "w") as f:
    f.write('#include <cstdint>\n\n')
    f.write(f'const unsigned int {ARRAY_NAME}_size = {len(sequence_array)};\n')
    f.write(f'alignas(16) const unsigned char {ARRAY_NAME}[] = {{\n')

    # Two-digit hex literals, with a line break after every twelfth value.
    f.writelines(
        f'0x{val:02x}, \n' if position % 12 == 0 else f'0x{val:02x}, '
        for position, val in enumerate(sequence_array, start=1)
    )

    f.write('\n};\n')