In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import pandas as pd
import numpy as np
import cv2
import os
import math
from sklearn.model_selection import train_test_split

# Dataset locations. The train directories are joined with each sample ID to
# form "<dir>/<ID>/rgb.png" and "<dir>/<ID>/depth_raw.png" further down; the
# test directories are not referenced in the visible part of this script.
train_img_dir   = "Nutrition5K/train/color"
train_depth_dir = "Nutrition5K/train/depth_raw"
test_img_dir    = "Nutrition5K/test/color"
test_depth_dir  = "Nutrition5K/test/depth_raw"
# CSV read with pandas; the code below uses its "ID" and "Value" columns.
label_file      = "Nutrition5K/nutrition5k_train.csv"

# Training hyperparameters.
epochs = 70               # passed to model.fit
image_size = (224, 224)   # target size handed to tf.image.resize in preprocessing
batch_size = 64           # batch size for both the training and validation datasets
AUTOTUNE = tf.data.AUTOTUNE  # used for map parallelism and prefetch buffer size

def check_file(path):
    """Return True when *path* is an existing, non-empty regular file.

    Directories are rejected even though they report a non-zero size on most
    filesystems: the callers feed accepted paths straight to image decoders,
    which cannot read a directory.

    Args:
        path: Filesystem path to test.

    Returns:
        bool: True only for a regular file whose size is greater than zero.
        A file that vanishes or cannot be stat'ed counts as missing.
    """
    try:
        return os.path.isfile(path) and os.path.getsize(path) > 0
    except OSError:
        # The file disappeared (or became unreadable) between the two calls.
        return False

# Load the label table and derive each sample's RGB and depth file path from
# its "ID" column.
label_df = pd.read_csv(label_file)
label_df["img_path"] = [
    os.path.join(train_img_dir, str(sample_id), "rgb.png")
    for sample_id in label_df["ID"]
]
label_df["depth_path"] = [
    os.path.join(train_depth_dir, str(sample_id), "depth_raw.png")
    for sample_id in label_df["ID"]
]
# Disabled outlier filter, kept for reference:
# label_df = label_df[(label_df["Value"] < 2500) & (label_df["Value"] > 0)]

# Keep only rows whose RGB *and* depth files both pass check_file.
label_df = label_df.loc[
    label_df["img_path"].map(check_file) & label_df["depth_path"].map(check_file)
].reset_index(drop=True)
# print(f"Valid samples after filtering: {len(label_df)}")

# Constants for the depth-based volume estimate.
# NOTE(review): CAMERA_DISTANCE_CM is not referenced in the visible code; the
# name suggests the camera-to-surface distance in centimetres — confirm.
CAMERA_DISTANCE_CM = 35.9
# Multiplied with each pixel's height above the plate in estimate_food_volume;
# the name suggests the area covered by one pixel in cm^2 — confirm.
PIXEL_AREA_CM2 = 5.957e-3

def estimate_food_volume(depth_path):
    """Estimate a food-volume feature from a raw depth image.

    The lower edge of the most populated depth-histogram bin (100 bins) is
    taken as the plate/table depth. Pixels whose depth is more than 10 raw
    units smaller than that are treated as food, and their height above the
    plate is integrated.

    Args:
        depth_path: Path to the depth image; it is read unchanged, i.e. at the
            bit depth stored in the file.

    Returns:
        The sum over food pixels of ``height * PIXEL_AREA_CM2``, divided by
        1000. Returns 0.0 when the image cannot be read or is empty.
    """
    raw = cv2.imread(depth_path, cv2.IMREAD_UNCHANGED)
    # cv2.imread reports failure by returning None, so the check has to happen
    # before any array method (such as astype) is called on the result.
    if raw is None or raw.size == 0:
        return 0.0
    depth = raw.astype(np.float32)

    hist, bins = np.histogram(depth.ravel(), bins=100)
    plate_depth = bins[np.argmax(hist)]

    # Height of each pixel above the estimated plate surface, in raw depth units.
    # NOTE(review): the result is only a relative volume proxy unless the raw
    # depth unit is known and matches PIXEL_AREA_CM2 — confirm.
    height = plate_depth - depth
    mask = height > 10  # ignore small deviations from the plate level
    food_depth = np.clip(height, 0, None)
    total_volume = np.sum(food_depth * PIXEL_AREA_CM2 * mask)
    return total_volume / 1000.0

# Attach the depth-derived volume estimate to every sample as a float32 column.
label_df["Volume"] = (
    label_df["depth_path"]
    .map(estimate_food_volume)
    .astype("float32")
)

# Hold out 15% of the samples for validation (fixed seed for repeatability).
train_df, val_df = train_test_split(
    label_df,
    random_state=42,
    test_size=0.15,
)


def preprocess_rgb_depth(img_path, depth_path, label, volume, image_size=(224,224)):
    """Load one RGB/depth pair and build the model's input dictionary.

    Args:
        img_path: Path (string tensor) of the RGB PNG.
        depth_path: Path (string tensor) of the depth PNG.
        label: Regression target; returned unchanged.
        volume: Scalar volume feature; a trailing axis is added to it.
        image_size: (height, width) both images are resized to.

    Returns:
        A ``(features, label)`` tuple where ``features`` holds
        ``"fused_input"`` (the RGB channels followed by one depth channel,
        both scaled to [0, 1]) and ``"volume_input"`` (``volume`` with a
        trailing axis of length 1).
    """
    rgb = tf.image.decode_png(tf.io.read_file(img_path), channels=3)
    rgb = tf.image.resize(rgb, image_size)
    rgb = tf.cast(rgb, tf.float32) / 255.0

    # Decode at 16 bits: with the default uint8 dtype a 16-bit depth PNG is
    # reduced to its high byte, discarding most of the depth resolution.
    # Dividing by 65535 keeps the values on roughly the same [0, 1] scale as
    # the old 8-bit path. 8-bit files still decode; they are widened to 16 bits.
    depth = tf.image.decode_png(
        tf.io.read_file(depth_path), channels=1, dtype=tf.uint16
    )
    depth = tf.image.resize(tf.cast(depth, tf.float32), image_size)
    depth = depth / 65535.0

    fused = tf.concat([rgb, depth], axis=-1)
    return {"fused_input": fused, "volume_input": tf.expand_dims(volume, -1)}, label

def build_dataset(df, batch_size=32, augment=False, shuffle=None):
    """Build a batched, prefetched tf.data pipeline from a sample table.

    Args:
        df: DataFrame with the columns "img_path", "depth_path", "Value"
            (regression target) and "Volume" (scalar feature).
        batch_size: Number of samples per batch.
        augment: When True, each sample's fused image is randomly flipped
            left-right.
        shuffle: Whether to shuffle the samples. ``None`` (the default)
            follows ``augment``, which matches the previous behaviour.

    Returns:
        A ``tf.data.Dataset`` yielding ``(features, label)`` batches as
        produced by ``preprocess_rgb_depth``.
    """
    if shuffle is None:
        shuffle = augment

    ds = tf.data.Dataset.from_tensor_slices((
        df["img_path"].values,
        df["depth_path"].values,
        df["Value"].astype("float32").values,
        df["Volume"].astype("float32").values,
    ))

    if shuffle:
        # Shuffle the lightweight path/label records *before* decoding, with a
        # buffer covering the whole table: this gives a uniform shuffle and
        # avoids holding a buffer full of decoded images in memory.
        ds = ds.shuffle(buffer_size=max(len(df), 1))

    ds = ds.map(
        lambda rgb_path, depth_path, y, v: preprocess_rgb_depth(
            rgb_path, depth_path, y, v, image_size
        ),
        num_parallel_calls=AUTOTUNE,
    )

    if augment:
        def aug_fn(features, y):
            # Work on a copy rather than mutating the incoming dict in place.
            features = dict(features)
            features["fused_input"] = tf.image.random_flip_left_right(
                features["fused_input"]
            )
            return features, y
        ds = ds.map(aug_fn, num_parallel_calls=AUTOTUNE)

    return ds.batch(batch_size).prefetch(AUTOTUNE)

# Training pipeline uses augmentation; the validation pipeline does not.
train_ds = build_dataset(
    train_df,
    augment=True,
    batch_size=batch_size,
)
val_ds = build_dataset(
    val_df,
    augment=False,
    batch_size=batch_size,
)



def inception_block(x, f1=32, f3r=32, f3=48, f5r=16, f5=32, fpool=32):
    """Apply a four-branch Inception-style block to ``x``.

    Branches: a 1x1 conv; a 1x1 reduce followed by a 3x3 conv; a 1x1 reduce
    followed by a 5x5 conv; and a 3x3 stride-1 max-pool followed by a 1x1
    conv. All convs use ReLU and 'same' padding. The branch outputs are
    concatenated, batch-normalised and passed through a final ReLU.

    Args:
        x: Input feature map.
        f1: Filters of the 1x1 branch.
        f3r: Filters of the 1x1 reduction before the 3x3 conv.
        f3: Filters of the 3x3 conv.
        f5r: Filters of the 1x1 reduction before the 5x5 conv.
        f5: Filters of the 5x5 conv.
        fpool: Filters of the 1x1 conv after pooling.

    Returns:
        The block's output tensor.
    """
    def conv_relu(tensor, filters, kernel):
        # Create the layer and apply it immediately so that layers are
        # instantiated in the same order in which they are wired.
        return layers.Conv2D(filters, kernel, activation='relu', padding='same')(tensor)

    branches = [
        conv_relu(x, f1, 1),
        conv_relu(conv_relu(x, f3r, 1), f3, 3),
        conv_relu(conv_relu(x, f5r, 1), f5, 5),
        conv_relu(layers.MaxPooling2D(3, strides=1, padding='same')(x), fpool, 1),
    ]
    merged = layers.Concatenate()(branches)
    normalised = layers.BatchNormalization()(merged)
    return layers.ReLU()(normalised)

def build_unified_inception(input_shape=(224,224,4), dropout=0.3):
    """Build the two-input regression model (fused image + scalar volume).

    The image branch is a small conv stem, two inception blocks and global
    average pooling, followed by a 128-unit dense layer. The scalar volume
    goes through a 16-unit dense layer and is concatenated with the image
    features before the regression head.

    Args:
        input_shape: Shape of the "fused_input" image tensor.
        dropout: Dropout rate used after both dense feature layers.

    Returns:
        A Keras ``Model`` with inputs "fused_input" and "volume_input" and a
        single-unit output layer named "total_value".
    """
    image_in = layers.Input(shape=input_shape, name = "fused_input")

    # --- Image branch: stem -> inception x2 -> pooled feature vector ---
    feat = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(image_in)
    feat = layers.Conv2D(32, 3, activation='relu', padding='same')(feat)
    feat = layers.MaxPooling2D(3, strides=2, padding='same')(feat)
    feat = inception_block(feat, 32, 32, 48, 16, 32, 32)
    feat = layers.MaxPooling2D(2)(feat)
    feat = inception_block(feat, 64, 48, 64, 24, 48, 48)
    feat = layers.GlobalAveragePooling2D()(feat)
    feat = layers.Dense(128, activation='relu')(feat)
    feat = layers.Dropout(dropout)(feat)

    # --- Volume branch: scalar feature embedded by one dense layer ---
    volume_in = layers.Input(shape=(1,), name="volume_input")
    volume_feat = layers.Dense(16, activation='relu')(volume_in)

    # --- Late fusion and regression head ---
    head = layers.Concatenate()([feat, volume_feat])
    head = layers.Dense(64, activation='relu')(head)
    head = layers.Dropout(dropout)(head)
    # NOTE(review): a ReLU on the single output unit keeps predictions
    # non-negative but yields zero gradient whenever the pre-activation is
    # negative — confirm this is intended.
    prediction = layers.Dense(1, activation='relu', name="total_value")(head)

    return models.Model(inputs=[image_in, volume_in], outputs=prediction)

# Build the network and print its layer summary.
model = build_unified_inception()
model.summary()

# Adam with weight decay; optimise MSE and track MAE.
optimizer = tf.keras.optimizers.Adam(
    learning_rate=1e-4,
    weight_decay=1e-4,
)
model.compile(
    loss="mse",
    metrics=["mae"],
    optimizer=optimizer,
)

# Save a checkpoint only when the monitored validation loss improves.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        "best_inception.keras",
        monitor="val_loss",
        save_best_only=True,
    ),
]

history = model.fit(
    train_ds,
    epochs=epochs,
    validation_data=val_ds,
    callbacks=callbacks,
)


Epoch 1/70
[1m37/44[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m4s[0m 619ms/step - loss: 90645.6094 - mae: 217.2798 