In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model, Input, models
import numpy as np
import os
import time
import h5py
import matplotlib.pyplot as plt
from tqdm import tqdm
import datetime


In [None]:
class LiteEncoderDecoder:
    """Builder for a small U-Net-style encoder/decoder made of depthwise convolutions.

    This is a plain Python wrapper (not a Keras layer): the functional Keras
    model it builds is stored in ``self.model``.
    """

    def __init__(self, input_shape=(240, 240, 3)):
        """Store the input shape and build the Keras model immediately.

        Args:
            input_shape: (height, width, channels) of the input image.
        """
        self.input_shape = input_shape
        self.model = self.build_model()
        
    def depthwise_separable_conv(self, x, out_channels=None, strides=1, name=None):
        """Depthwise conv -> BN -> ReLU -> optional 1x1 conv -> depthwise conv -> BN -> ReLU.

        Not referenced by ``build_model`` in this file.

        Args:
            x: input feature map tensor.
            out_channels: if given, a 1x1 convolution projects to this many channels.
            strides: stride of the first depthwise convolution.
            name: name given to the final ReLU layer.
        """
        x = layers.DepthwiseConv2D(kernel_size=3, strides=strides, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        if out_channels is not None:
            x = layers.Conv2D(out_channels, kernel_size=1, padding='same')(x)
        x = layers.DepthwiseConv2D(kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU(name=name)(x)
        return x

    def lightweight_block(self, x):
        """Stem block: depthwise 3x3 -> 1x1 conv (8 ch) -> 3x3 conv (8 ch) -> BN -> ReLU.

        The final ReLU is always named "A", so this block can only be used once
        per model (Keras layer names must be unique within a model).
        """
        x = layers.DepthwiseConv2D(kernel_size=3, padding='same')(x)
        x = layers.Conv2D(8, kernel_size=1, padding='same')(x)
        x = layers.Conv2D(8, kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU(name="A")(x)
        return x

    def downsample_block(self, x, double_channels=False, name=None):
        """Halve the spatial resolution with a stride-2 depthwise conv.

        Args:
            x: input feature map tensor.
            double_channels: when True the 1x1 conv doubles the channel count,
                otherwise it keeps the input channel count.
            name: name given to the final ReLU layer.
        """
        # Channel count is read from the static shape of the input.
        channels = x.shape[-1]
        x = layers.DepthwiseConv2D(kernel_size=3, strides=2, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        out_channels = channels * 2 if double_channels else channels
        x = layers.Conv2D(out_channels, kernel_size=1, padding='same')(x)
        x = layers.DepthwiseConv2D(kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU(name=name)(x)
        return x

    def fuse_block(self, low_res, high_res, out_channels, name=None, use_conv=True):
        """Decoder step: upsample ``low_res`` 2x and merge it with the skip ``high_res``.

        Args:
            low_res: coarser feature map; bilinearly upsampled by a factor of 2.
            high_res: skip-connection feature map concatenated on the channel axis.
            out_channels: channel count produced by the 1x1 conv (and by the
                optional 3x3 conv).
            name: name given to the final ReLU layer.
            use_conv: when True a 3x3 conv is applied right after concatenation.
        """
        x = layers.UpSampling2D(size=(2, 2), interpolation='bilinear')(low_res)
        x = layers.Concatenate()([x, high_res])
        if use_conv:
            x = layers.Conv2D(out_channels, kernel_size=3, padding='same')(x)
        x = layers.DepthwiseConv2D(kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        x = layers.Conv2D(out_channels, kernel_size=1, padding='same')(x)
        x = layers.DepthwiseConv2D(kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU(name=name)(x)
        return x

    def build_model(self):
        """Assemble the functional Keras model named "LiteEncoderDecoder".

        The shape comments below hold for the default 240x240x3 input.
        The output is the last decoder stage (ReLU layer "out", 64 channels).
        """
        inputs = Input(shape=self.input_shape)

        # Encoder: one stem block, then four stride-2 stages that double the channels.
        x1 = self.lightweight_block(inputs)                              # 240x240x8
        x2 = self.downsample_block(x1, double_channels=True, name="B")  # 120x120x16
        x3 = self.downsample_block(x2, double_channels=True, name="C")  # 60x60x32
        x4 = self.downsample_block(x3, double_channels=True, name="D")  # 30x30x64
        x5 = self.downsample_block(x4, double_channels=True, name="E")  # 15x15x128

        # Decoder: each stage upsamples 2x and fuses with the matching encoder output.
        d4 = self.fuse_block(x5, x4, 96, name="J")
        d3 = self.fuse_block(d4, x3, 40, name="K", use_conv=False)
        d2 = self.fuse_block(d3, x2, 58, name="L")
        d1 = self.fuse_block(d2, x1, 64, name="out", use_conv=False)

        return Model(inputs, d1, name="LiteEncoderDecoder")

    def summary(self):
        """Print the Keras summary of the wrapped model."""
        self.model.summary()

class TransformerEncoderBlock(tf.keras.layers.Layer):
    """Post-norm transformer encoder block: self-attention and a two-layer MLP.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        embedding_dim: width of the token embeddings (input and output).
        num_heads: number of attention heads; each head uses
            ``embedding_dim // num_heads`` as its key dimension.
        feed_forward_dim: hidden width of the feed-forward network.
        dropout_rate: dropout rate applied after attention and after the MLP.
    """

    def __init__(self, embedding_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()

        # Self-attention sub-layer.
        per_head_dim = embedding_dim // num_heads
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=per_head_dim)

        # Position-wise feed-forward sub-layer (expand with ReLU, project back).
        expand = layers.Dense(feed_forward_dim, activation='relu')
        project = layers.Dense(embedding_dim)
        self.ffn = tf.keras.Sequential([expand, project])

        # One LayerNorm / Dropout pair per sub-layer.
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)

    def call(self, x, training=False):
        """Apply the block to ``x``; ``training`` only controls the dropout layers."""
        # Attention sub-layer with residual connection.
        attended = self.dropout1(self.att(x, x), training=training)
        normed = self.layernorm1(x + attended)

        # Feed-forward sub-layer with residual connection.
        transformed = self.dropout2(self.ffn(normed), training=training)
        return self.layernorm2(normed + transformed)
    


class PatchTransformerEncoder(tf.keras.Model):
    """Embed an image-like tensor into patch tokens and run transformer blocks on them.

    Args:
        in_channels: channel count of the input (only printed; the patch
            convolution infers it from the input).
        patch_size: side of the square, non-overlapping patches.
        embedding_dim: width of each patch token.
        num_heads: attention heads per transformer block.
        num_layers: number of stacked ``TransformerEncoderBlock`` layers.
    """

    def __init__(self, in_channels, patch_size, embedding_dim, num_heads, num_layers):
        super(PatchTransformerEncoder, self).__init__()

        print(f"in_channels : {in_channels} \npatch_size : {patch_size} \nembedding_dim : {embedding_dim} \nnum_heads : {num_heads} \nnum_layers : {num_layers}")

        # Stack of transformer encoder blocks (feed-forward width fixed at 512).
        self.transformer_layers = [
            TransformerEncoderBlock(embedding_dim=embedding_dim, num_heads=num_heads, feed_forward_dim=512)
            for _ in range(num_layers)
        ]

        # Patch embedding: a PxP convolution with stride P yields one token per patch.
        self.embedding_convPxP = layers.Conv2D(
            filters=embedding_dim,
            kernel_size=patch_size,
            strides=patch_size,
            padding='valid',
            use_bias=False
        )

        # Trainable positional encodings. The table has 225 rows (15 * 15, the
        # token count for a 240x240 input with patch_size 16); its shape is kept
        # unchanged so that existing checkpoints still restore.
        self.positional_encodings = self.add_weight(
            shape=(225, embedding_dim),
            initializer='random_normal',
            trainable=True
        )

    def call(self, x):
        """Return the encoded tokens with layout [tokens, batch, embedding_dim].

        Args:
            x: input tensor of layout [batch, H, W, C].
        """
        x = self.embedding_convPxP(x)  # [batch, H', W', embedding_dim]
        patch_shape = tf.shape(x)
        batch_size, h, w, c = patch_shape[0], patch_shape[1], patch_shape[2], patch_shape[3]
        num_tokens = h * w

        x = tf.reshape(x, [batch_size, num_tokens, c])  # [batch, tokens, embedding_dim]

        # Add (not concatenate) the positional encodings. Only the first
        # `num_tokens` rows are used, so inputs that yield fewer than 225 tokens
        # (e.g. patch_size=20 on 240x240 -> 144 tokens) no longer fail on a
        # shape mismatch. More than 225 tokens is still unsupported.
        x += self.positional_encodings[:num_tokens, :]

        # Callers in this file index the result token-first, so the layout is
        # switched to [tokens, batch, embedding_dim] here.
        # NOTE(review): Keras MultiHeadAttention treats axis 0 as batch and
        # axis 1 as the sequence axis, so with this layout the attention inside
        # the blocks runs across the batch axis rather than across tokens.
        # Left as is because existing checkpoints were presumably trained with
        # this layout; changing it needs retraining -- confirm before fixing.
        x = tf.transpose(x, perm=[1, 0, 2])

        for layer in self.transformer_layers:
            x = layer(x)

        return x  # [tokens, batch, embedding_dim]
        
class PixelWiseDotProduct(tf.keras.Model):
    """Dot product between every pixel feature vector and every query vector."""

    def call(self, X):
        """Compute one attention map per query.

        Args:
            X: pair ``(x, K)`` where ``x`` is a feature map of layout
                [batch, H, W, C] and ``K`` holds the queries with layout
                [batch, n_queries, C].

        Returns:
            Tensor of layout [batch, n_queries, H, W].
        """
        x, K = X[0], X[1]
        x_shape = tf.shape(x)
        batch_size, h, w, c = x_shape[0], x_shape[1], x_shape[2], x_shape[3]
        n_queries = tf.shape(K)[1]

        x_flat = tf.reshape(x, [batch_size, h * w, c])  # [batch, HW, C]
        y = tf.matmul(x_flat, K, transpose_b=True)      # [batch, HW, n_queries]

        y = tf.transpose(y, [0, 2, 1])                  # [batch, n_queries, HW]
        return tf.reshape(y, [batch_size, n_queries, h, w])

# from layers import PatchTransformerEncoder, PixelWiseDotProduct

# NOTE: export_full_tflite_model below calls build_mViT_model, which is not defined in the visible cells; define it before calling the export.
def export_full_tflite_model():
    """Build the mViT model, export it as a SavedModel and convert it to TFLite.

    Writes the SavedModel to ``full_mvit_savedmodel`` and the converted model
    to ``full_mvit_model.tflite`` in the working directory. Relies on
    ``build_mViT_model`` being defined elsewhere.
    """
    saved_model_dir = "full_mvit_savedmodel"
    tflite_model_path = "full_mvit_model.tflite"

    mvit_model = build_mViT_model(
        input_shape=(240, 240, 3),
        in_channels=3,
        n_query_channels=128,
        patch_size=20,
        dim_out=256,
        embedding_dim=64,
        num_heads=2,
        norm='linear'
    )

    # One forward pass on random data so that all weights get created.
    mvit_model(tf.random.normal([1, 240, 240, 3]))

    # SavedModel export is the input format for the TFLite converter below.
    mvit_model.export(saved_model_dir)
    print(f"✅ SavedModel exported to: {saved_model_dir}")

    # Converter setup: default optimisations, builtin ops plus TF-select ops
    # as a fallback, and resource variables enabled.
    converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    allowed_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
    converter.target_spec.supported_ops = allowed_ops
    converter.experimental_enable_resource_variables = True

    flatbuffer = converter.convert()

    with open(tflite_model_path, "wb") as tflite_file:
        tflite_file.write(flatbuffer)

    print(f"✅ Full mViT model converted to TFLite and saved as: {tflite_model_path}")


    

class mViT(tf.keras.Model):
    """Mini vision transformer head producing normalised bin widths and attention maps.

    Args:
        in_channels: channel count of the incoming feature map (forwarded to
            the patch transformer).
        n_query_channels: number of transformer output tokens used as queries.
        patch_size: patch side used by the patch transformer.
        dim_out: length of the regressed vector (number of bins).
        embedding_dim: token width and channel count of the 3x3 convolution.
        num_heads: attention heads in the patch transformer.
        norm: 'linear', 'softmax', or anything else for sigmoid normalisation.
    """

    def __init__(self, in_channels, n_query_channels=64, patch_size=20, dim_out=128, embedding_dim=64, num_heads=2, norm='linear'):
        super().__init__()
        self.norm = norm
        self.n_query_channels = n_query_channels

        self.patch_transformer = PatchTransformerEncoder(
            in_channels=in_channels,
            patch_size=patch_size,
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            num_layers=2,
        )

        self.dot_product_layer = PixelWiseDotProduct()

        self.conv3x3 = layers.Conv2D(filters=embedding_dim, kernel_size=3, strides=1, padding='same')

        # MLP applied to the first transformer token to regress the bin vector.
        self.regressor = tf.keras.Sequential([
            layers.Dense(256),
            layers.LeakyReLU(),
            layers.Dense(256),
            layers.LeakyReLU(),
            layers.Dense(dim_out),
        ])

    def call(self, x):
        """Return ``(y, range_attention_maps)`` for a feature map ``x``.

        Args:
            x: feature map of layout [batch, height, width, channels].

        Returns:
            y: [batch, dim_out]; rows sum to 1.
            range_attention_maps: [batch, n_query_channels, height, width].
        """
        tokens = self.patch_transformer(tf.identity(x))  # [tokens, batch, embedding_dim]
        feature_map = self.conv3x3(x)                    # [batch, height, width, embedding_dim]

        # Token 0 feeds the regressor; the following n_query_channels tokens are
        # the queries, moved to batch-first layout for the dot product.
        head_token = tokens[0, :, :]                     # [batch, embedding_dim]
        query_tokens = tf.transpose(
            tokens[1:self.n_query_channels + 1, :],      # [n_query_channels, batch, embedding_dim]
            perm=[1, 0, 2],
        )                                                # [batch, n_query_channels, embedding_dim]

        range_attention_maps = self.dot_product_layer((feature_map, query_tokens))

        y = self.regressor(head_token)                   # [batch, dim_out]

        # Softmax is already normalised, so it returns directly.
        if self.norm == 'softmax':
            return tf.nn.softmax(y, axis=1), range_attention_maps

        if self.norm == 'linear':
            # ReLU plus a small offset keeps every width strictly positive.
            y = tf.nn.relu(y) + 0.1
        else:
            y = tf.nn.sigmoid(y)

        normalised = y / tf.reduce_sum(y, axis=1, keepdims=True)
        return normalised, range_attention_maps



# from miniVit import mViT
# from encoder_decoder import LiteEncoderDecoder


class UnetAdaptiveBins(tf.keras.Model):
    """Depth estimator: encoder/decoder features -> adaptive bins -> per-pixel depth.

    Args:
        backend: unused; kept so existing positional callers keep working.
        n_bins: number of depth bins.
        min_val: smallest depth value (first bin edge).
        max_val: largest depth value (last bin edge).
        norm: normalisation mode forwarded to ``mViT``
            ('linear', 'softmax', anything else = sigmoid).
    """

    def __init__(self, backend, n_bins = 256, min_val = 0.1, max_val = 80, norm = "linear"):
        super(UnetAdaptiveBins, self).__init__()

        self.num_classes = n_bins
        self.min_val = min_val
        self.max_val = max_val

        self.encoder_decoder = LiteEncoderDecoder()
        self.adaptive_bins_layers = mViT(
            in_channels=64,
            n_query_channels=64,
            patch_size=16,
            dim_out=n_bins,
            embedding_dim=64,
            num_heads=4,
            norm=norm,  # was hard-coded to 'linear', silently ignoring the argument
        )
        self.conv_out = models.Sequential([
            layers.Conv2D(filters=n_bins, kernel_size=1, strides=1, padding='valid'),
            layers.Softmax(axis=3)  # softmax over the channel (bin) dimension
        ])

        # LiteEncoderDecoder is a plain Python object, so Keras does not track
        # the model stored inside it: its weights would be missing from
        # `trainable_variables` and from checkpoints. Holding the Keras model
        # directly as an attribute makes it a tracked sub-model. It is assigned
        # last so previously created sub-layers keep their position.
        # NOTE(review): checkpoints written before this change presumably do
        # not contain the encoder/decoder weights -- verify and retrain/re-save.
        self.encoder_decoder_model = self.encoder_decoder.model

    def call(self, x):
        """Run the network.

        Args:
            x: image batch of layout [batch, H, W, 3].

        Returns:
            bin_edges: [batch, n_bins + 1] cumulative bin edges starting at ``min_val``.
            pred: [batch, 1, H, W] depth map (expected value over bin centres).
        """
        encoder_decoder_out = self.encoder_decoder_model(x)

        bin_widths_normed, range_attention_maps = self.adaptive_bins_layers(encoder_decoder_out)

        # [batch, queries, H, W] -> [batch, H, W, queries] for the 1x1 convolution.
        range_attention_maps = tf.transpose(range_attention_maps, [0, 2, 3, 1])
        out = self.conv_out(range_attention_maps)  # per-pixel bin probabilities
        bin_widths = (self.max_val - self.min_val) * bin_widths_normed

        # Prepend min_val so the cumulative sum yields the bin edges. The cast
        # keeps the dtypes equal even when min_val is given as an int.
        min_vals = tf.fill(
            [tf.shape(bin_widths)[0], 1],
            tf.cast(self.min_val, bin_widths.dtype),
        )
        bin_widths = tf.concat([min_vals, bin_widths], axis=1)
        bin_edges = tf.math.cumsum(bin_widths, axis=1)

        centers = 0.5 * (bin_edges[:, :-1] + bin_edges[:, 1:])  # [batch, n_bins]
        centers = tf.reshape(centers, [tf.shape(centers)[0], tf.shape(centers)[1], 1, 1])

        out_permuted = tf.transpose(out, [0, 3, 1, 2])  # [batch, n_bins, H, W]

        # Depth = probability-weighted sum of the bin centres.
        pred = tf.reduce_sum(out_permuted * centers, axis=1, keepdims=True)  # [batch, 1, H, W]

        return bin_edges, pred


In [None]:
import cv2
import tensorflow as tf
import numpy as np
import os


def load_unet_from_checkpoint(checkpoint_dir="./checkpoints", num_bins=256, learning_rate=3e-4):
    """Create a ``UnetAdaptiveBins`` model and restore the latest checkpoint, if any.

    Args:
        checkpoint_dir: directory managed by ``tf.train.CheckpointManager``.
        num_bins: number of depth bins of the model. The value used to be
            passed positionally, where it landed in the unused ``backend``
            parameter, so the model always had 256 bins. It is now passed by
            keyword, and the default is 256 (the value that was effectively in
            use) so that default callers and existing checkpoints keep working.
        learning_rate: learning rate of the Adam optimizer that is created
            alongside the model.

    Returns:
        Tuple ``(model, optimizer, manager)``.
    """
    print(f"[INFO] Loading checkpoint from {checkpoint_dir}")
    model = UnetAdaptiveBins(backend=None, n_bins=num_bins)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
    manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=5)

    if manager.latest_checkpoint:
        # expect_partial(): silences warnings about checkpoint values that are
        # not consumed by this object graph.
        checkpoint.restore(manager.latest_checkpoint).expect_partial()
        print(f"[INFO] Restored from checkpoint: {manager.latest_checkpoint}")
    else:
        print("[WARNING] No checkpoint found. Starting fresh.")

    return model, optimizer, manager

def predict_depth(model, img):
    """Predict depth for one image and return it as a colour-mapped uint8 image.

    Args:
        model: callable returning ``(bin_edges, depth)`` for a batched input.
        img: single image tensor of layout [H, W, C].

    Returns:
        The depth map resized to 240x240, min-max scaled to 0..255 and
        coloured with OpenCV's PLASMA colour map.
    """
    # Resize to the 240x240 network input and add the batch axis.
    batch = tf.expand_dims(
        tf.image.resize(img, [240, 240], method='bilinear'),
        axis=0,
    )
    _, pred_depth = model(batch, training=False)

    # Drop the batch axis, then a remaining leading singleton axis if present.
    depth = tf.squeeze(pred_depth, axis=0)
    if len(depth.shape) == 3:
        depth = tf.squeeze(depth, axis=0)

    scaled = cv2.normalize(depth.numpy(), None, 0, 255, cv2.NORM_MINMAX)
    return cv2.applyColorMap(scaled.astype(np.uint8), cv2.COLORMAP_PLASMA)

def process_directory_to_video(model, image_dir, output_video_path, fps=10):
    """Run depth prediction on every image in a directory and write a side-by-side video.

    Each frame is 480x240: the resized input image on the left and the
    colour-mapped depth map on the right. Images are processed in sorted
    filename order; files that OpenCV cannot decode are skipped with a warning.

    Args:
        model: depth model passed on to ``predict_depth``.
        image_dir: directory containing .png/.jpg/.jpeg files.
        output_video_path: path of the mp4 file to write.
        fps: frame rate of the output video.
    """
    image_files = sorted(
        os.path.join(image_dir, f) for f in os.listdir(image_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )

    if not image_files:
        print(f"[ERROR] No image files found in {image_dir}")
        return

    frame_width = 240 * 2
    frame_height = 240
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    # A writer that failed to open drops every frame silently; report it instead.
    if not out.isOpened():
        out.release()
        print(f"[ERROR] Cannot open video writer for {output_video_path}")
        return

    print(f"[INFO] Writing video to: {output_video_path}")

    try:
        for img_path in image_files:
            img_bgr = cv2.imread(img_path)
            # cv2.imread returns None for unreadable/corrupt files instead of raising.
            if img_bgr is None:
                print(f"[WARNING] Skipping unreadable image: {img_path}")
                continue

            # OpenCV loads BGR; the model is fed RGB scaled to [0, 1].
            img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
            img_rgb = tf.convert_to_tensor(img_rgb, dtype=tf.float32) / 255.0

            depth_map = predict_depth(model, img_rgb)
            rgb_resized = cv2.resize(img_bgr, (240, 240))
            combined = np.hstack((rgb_resized, depth_map))

            out.write(combined)
    finally:
        # Always finalise the file, even if prediction fails half-way.
        out.release()

    print("[INFO] Video generation complete.")

def run_camera(model):
    """Show live webcam frames next to their predicted depth map until 'q' is pressed.

    Args:
        model: depth model passed on to ``predict_depth``.
    """
    cap = cv2.VideoCapture(0)

    if not cap.isOpened():
        print("[ERROR] Cannot open webcam.")
        return

    print("[INFO] Press 'q' to exit.")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV delivers BGR frames; convert to RGB so the model sees the
            # same channel order as in process_directory_to_video.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img_tf = tf.convert_to_tensor(frame_rgb, dtype=tf.float32) / 255.0
            depth_map = predict_depth(model, img_tf)

            # The display frame stays BGR, which is what cv2.imshow expects.
            frame_resized = cv2.resize(frame, (240, 240))
            combined = np.hstack((frame_resized, depth_map))

            cv2.imshow("Camera Feed (Left) | Depth Map (Right)", combined)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the device and close the window even if prediction raises.
        cap.release()
        cv2.destroyAllWindows()

# Polymorphic main function
def main(image_dir=None, chekcpointdir ="./checkpoints" ,output_video_path="output_depth_video.mp4"):
    """Load the model and run it on a directory of images or on the webcam.

    Args:
        image_dir: directory of images to turn into a video; when empty or
            None the live webcam mode is used instead.
        chekcpointdir: checkpoint directory to restore the model from.
        output_video_path: destination of the video (directory mode only).
    """
    model = load_unet_from_checkpoint(checkpoint_dir=chekcpointdir)[0]

    # No directory given: fall back to the live camera feed.
    if not image_dir:
        run_camera(model)
        return

    process_directory_to_video(model, image_dir, output_video_path)


In [None]:

# Directory with the input frames and the checkpoint directory to restore from.
img_dir = "test_data"
checkpoint = "versin 12/checkpoints"
# Pass image_dir so the directory is actually processed; without it main()
# falls back to the webcam and img_dir is never used.
main(image_dir=img_dir, chekcpointdir=checkpoint)  # Process directory


[INFO] Loading checkpoint from versin 12/checkpoints


2025-06-07 19:42:51.813290: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2
2025-06-07 19:42:51.813312: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2025-06-07 19:42:51.813315: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2025-06-07 19:42:51.813332: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2025-06-07 19:42:51.813347: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


in_channels : 64 
patch_size : 16 
embedding_dim : 64 
num_heads : 4 
num_layers : 2
[INFO] Restored from checkpoint: versin 12/checkpoints/ckpt-75
[INFO] Writing video to: output_depth_video.mp4




[INFO] Video generation complete.
