# Single-image food volume estimation
Using a  monocular depth estimation network and a segmentation network, we will estimate the volume of the food displayed in the input image.

In [None]:
import sys
print(sys.executable)

In [None]:
from food_volume_estimation.depth_estimation.custom_modules import *
from food_volume_estimation.volume_estimator import VolumeEstimator
from FoodSAM.FoodSAM_tools.enhance_semantic_masks import enhance_masks
from FoodSAM.FoodSAM_tools.predict_semantic_mask import semantic_predict
from segment_anything import SamAutomaticMaskGenerator, sam_model_registry
import os
import sys
import json
import time
import shutil
import argparse
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import torch
import tensorflow as tf
import uvicorn
import nest_asyncio
import httpx

from fastapi import FastAPI, HTTPException, UploadFile, File, Depends
from pydantic import BaseModel
from keras import backend as K
from keras.models import Model, model_from_json
from pyngrok import ngrok
from fastapi.middleware.cors import CORSMiddleware

# Th√™m ƒë∆∞·ªùng d·∫´n c√°c th∆∞ vi·ªán t√πy ch·ªânh
sys.path.append('.')
sys.path.append('./SAM')
sys.path.append('./mmseg')

# Import c√°c module c·∫ßn thi·∫øt

# √Åp d·ª•ng nest_asyncio ƒë·ªÉ ch·∫°y FastAPI
nest_asyncio.apply()

# Ki·ªÉm tra GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
if device == "cuda":
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")

# Bi·∫øn to√†n c·ª•c cho estimator, graph, session
estimator = None
global_graph = None
global_session = None


def init_estimator():
    """Kh·ªüi t·∫°o estimator v·ªõi graph v√† session m·ªõi"""
    global estimator, global_graph, global_session
    # X√≥a graph v√† session hi·ªán t·∫°i
    K.clear_session()
    tf.reset_default_graph()

    # T·∫°o graph v√† session m·ªõi
    global_graph = tf.Graph()
    global_session = tf.Session(graph=global_graph)

    with global_graph.as_default():
        with global_session.as_default():
            K.set_session(global_session)
            depth_model_architecture = './models/fine_tune_food_videos/monovideo_fine_tune_food_videos.json'
            depth_model_weights = './models/fine_tune_food_videos/monovideo_fine_tune_food_videos.h5'
            estimator = VolumeEstimator(arg_init=False)
            try:
                with open(depth_model_architecture, 'r') as read_file:
                    custom_losses = Losses()
                    objs = {
                        'ProjectionLayer': ProjectionLayer,
                        'ReflectionPadding2D': ReflectionPadding2D,
                        'InverseDepthNormalization': InverseDepthNormalization,
                        'AugmentationLayer': AugmentationLayer,
                        'compute_source_loss': custom_losses.compute_source_loss
                    }
                    model_architecture_json = json.load(read_file)
                    estimator.monovideo = model_from_json(
                        model_architecture_json, custom_objects=objs)
                estimator._VolumeEstimator__set_weights_trainable(
                    estimator.monovideo, False)
                global_session.run(tf.global_variables_initializer())
                estimator.monovideo.load_weights(depth_model_weights)
                estimator.model_input_shape = estimator.monovideo.inputs[0].shape.as_list()[
                    1:]
                depth_net = estimator.monovideo.get_layer('depth_net')
                estimator.depth_model = Model(
                    inputs=depth_net.inputs, outputs=depth_net.outputs, name='depth_model')
                MIN_DEPTH = 0.01
                MAX_DEPTH = 10
                estimator.min_disp = 1 / MAX_DEPTH
                estimator.max_disp = 1 / MIN_DEPTH
                estimator.gt_depth_scale = 0.35
                estimator.relax_param = 0.01
                print('[*] Loaded depth estimation model.')
                # B·ªè graph.finalize() ƒë·ªÉ tr√°nh l·ªói
            except Exception as e:
                print(f"Error loading model: {str(e)}")
                raise

# H√†m ghi log


def write_log(message):
    log_file_path = "debug/log.txt"

    # T·∫°o th∆∞ m·ª•c n·∫øu ch∆∞a c√≥
    os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

    # Th√™m timestamp cho log
    timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")

    # Format n·ªôi dung log
    if isinstance(message, (dict, list)):
        message_str = json.dumps(message, ensure_ascii=False, indent=2)
    else:
        message_str = str(message)

    # Ghi v√†o file
    with open(log_file_path, "a", encoding="utf-8") as log_file:
        log_file.write(f"{timestamp} {message_str}\n")

    # In ra console
    print(f"{timestamp} {message_str}")

# H√†m x√≥a th∆∞ m·ª•c


def clear_folder(folder_path: str):
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        for item in os.listdir(folder_path):
            item_path = os.path.join(folder_path, item)
            try:
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.remove(item_path)
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)
            except Exception as e:
                print(f"L·ªói khi x√≥a {item_path}: {e}")
        print(f"‚úÖ ƒê√£ x√≥a to√†n b·ªô n·ªôi dung trong th∆∞ m·ª•c '{folder_path}'.")
    else:
        print(f"‚ö†Ô∏è Th∆∞ m·ª•c '{folder_path}' kh√¥ng t·ªìn t·∫°i.")

# H√†m l∆∞u mask t·ª´ SAM


def write_masks_to_folder(masks: List[Dict[str, Any]], path: str) -> None:
    header = "id,area,bbox_x0,bbox_y0,bbox_w,bbox_h,point_input_x,point_input_y,predicted_iou,stability_score,crop_box_x0,crop_box_y0,crop_box_w,crop_box_h"
    metadata = [header]
    os.makedirs(os.path.join(path, "sam_mask"), exist_ok=True)
    masks_array = []
    for i, mask_data in enumerate(masks):
        mask = mask_data["segmentation"]
        masks_array.append(mask.copy())
        filename = f"{i}.png"
        cv2.imwrite(os.path.join(path, "sam_mask", filename), mask * 255)
        mask_metadata = [
            str(i),
            str(mask_data["area"]),
            *[str(x) for x in mask_data["bbox"]],
            *[str(x) for x in mask_data["point_coords"][0]],
            str(mask_data["predicted_iou"]),
            str(mask_data["stability_score"]),
            *[str(x) for x in mask_data["crop_box"]],
        ]
        row = ",".join(mask_metadata)
        metadata.append(row)
    masks_array = np.stack(masks_array, axis=0)
    np.save(os.path.join(path, "sam_mask", "masks.npy"), masks_array)
    metadata_path = os.path.join(path, "sam_metadata.csv")
    with open(metadata_path, "w") as f:
        f.write("\n".join(metadata))

# H√†m ch·∫°y FoodSAM (PyTorch)


def run_foodsam(image_path):
    args = argparse.Namespace(
        img_path=image_path,
        output="Output/Semantic_Results",
        device=device,
        SAM_checkpoint="ckpts/sam_vit_h_4b8939.pth",
        semantic_config="configs/SETR_MLA_768x768_80k_base.py",
        semantic_checkpoint="ckpts/SETR_MLA/iter_80000.pth",
        model_type="vit_h",
        color_list_path="FoodSAM/FoodSAM_tools/color_list.npy",
        category_txt="FoodSAM/FoodSAM_tools/category_id_files/foodseg103_category_id.txt",
        num_class=104
    )

    os.makedirs(args.output, exist_ok=True)
    write_log("Running SAM...")

    # Load m√¥ h√¨nh SAM
    sam = sam_model_registry[args.model_type](checkpoint=args.SAM_checkpoint)
    sam.to(device=args.device)
    write_log(f"SAM model loaded on {args.device}")

    # Thi·∫øt l·∫≠p b·ªô t·∫°o mask
    output_mode = "binary_mask"
    generator = SamAutomaticMaskGenerator(sam, output_mode=output_mode)

    # ƒê·ªçc ·∫£nh v√† th·ª±c hi·ªán segmentation
    targets = [args.img_path]
    for t in targets:
        write_log(f"Processing {t}...")
        image = cv2.imread(t)
        if image is None:
            write_log(f"Kh√¥ng th·ªÉ t·∫£i {t}, b·ªè qua...")
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        masks = generator.generate(image)

        # L∆∞u k·∫øt qu·∫£
        base = os.path.basename(t).split('.')[0]
        save_base = os.path.join(args.output, base)
        os.makedirs(save_base, exist_ok=True)
        write_masks_to_folder(masks, save_base)
        shutil.copyfile(t, os.path.join(save_base, "input.jpg"))

    write_log("SAM processing done!")
    write_log("Running semantic segmentation model...")
    semantic_predict(args.semantic_config, args.semantic_checkpoint,
                     args.output, args.color_list_path, args.img_path, device=args.device)
    write_log("Semantic segmentation done!")
    write_log("Enhancing semantic masks...")
    enhance_masks(args.output, args.category_txt,
                  args.color_list_path, num_class=args.num_class)
    write_log("Enhancement done!")
    write_log(f"Results saved in {args.output}!")
    return base

# H√†m ch·∫°y depth estimation v√† t√≠nh kh·ªëi l∆∞·ª£ng (TensorFlow)


def run_depth_estimation(image_path, base):
    global estimator, global_graph, global_session
    # Kh·ªüi t·∫°o l·∫°i estimator v·ªõi graph/session m·ªõi
    init_estimator()
    with global_graph.as_default():
        with global_session.as_default():
            K.set_session(global_session)
            # B·ªè log summary c·ªßa depth model theo y√™u c·∫ßu

            # Ch·∫°y ∆∞·ªõc l∆∞·ª£ng th·ªÉ t√≠ch
            plate_diameter = 0  # B·ªè qua ph√°t hi·ªán ƒëƒ©a
            outputs_list, food_volumes = estimator.estimate_volume(
                image_path, fov=70, plate_diameter_prior=plate_diameter,
                plot_results=True, para_folder_path=f"Output/Semantic_Results/{base}/masks/"
            )

            # Chuy·ªÉn ƒë·ªïi th·ªÉ t√≠ch sang kh·ªëi l∆∞·ª£ng
            food_masses = estimator.convert_volume_to_mass(
                'Density_sub_90.xlsx', food_volumes)
            write_log(f"Food masses: {food_masses}")

            return food_masses

# H√†m x·ª≠ l√Ω ·∫£nh ƒë·∫ßu v√†o


def process_image_input(image: UploadFile, image_path: str):
    if image:
        input_folder = "Input"
        os.makedirs(input_folder, exist_ok=True)
        clear_folder("Input")
        clear_folder("debug")
        clear_folder("Output")
        temp_file_path = os.path.join(input_folder, image.filename)
        with open(temp_file_path, "wb") as temp_file:
            shutil.copyfileobj(image.file, temp_file)
        return temp_file_path, True
    elif image_path:
        clear_folder("Input")
        clear_folder("debug")
        clear_folder("Output")
        if not os.path.exists(image_path):
            raise ValueError(
                f"The image at path '{image_path}' does not exist.")
        write_log(f"Using image path: {image_path}")
        return image_path, False
    else:
        raise ValueError(
            "Please provide either an uploaded image or an image path.")


# Kh·ªüi t·∫°o FastAPI
app = FastAPI()

# Th√™m c·∫•u h√¨nh CORS ngay sau d√≤ng `app = FastAPI()`
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8081"],  # Cho ph√©p URL c·ª• th·ªÉ
    allow_credentials=True,
    allow_methods=["*"],  # Cho ph√©p t·∫•t c·∫£ c√°c ph∆∞∆°ng th·ª©c (GET, POST, v.v.)
    allow_headers=["*"],  # Cho ph√©p t·∫•t c·∫£ c√°c header
)

# ƒê·∫£m b·∫£o th∆∞ m·ª•c debug t·ªìn t·∫°i
os.makedirs("debug", exist_ok=True)

# Endpoint FastAPI (chuy·ªÉn sang ƒë·ªìng b·ªô)


@app.post("/food-mass/", dependencies=[Depends(lambda: None)])
def process_image(image: UploadFile = File(None), image_path: str = None):
    try:
        # X·ª≠ l√Ω ·∫£nh ƒë·∫ßu v√†o
        image_path_to_process, is_uploaded = process_image_input(
            image, image_path)

        # B∆∞·ªõc 1: Ch·∫°y FoodSAM (PyTorch)
        base = run_foodsam(image_path_to_process)

        # B∆∞·ªõc 2: Ch·∫°y depth estimation (TensorFlow)
        food_masses = run_depth_estimation(image_path_to_process, base)

        # X√≥a file t·∫°m n·∫øu c√≥
        if is_uploaded:
            os.remove(image_path_to_process)

        return {"food_masses_gram": food_masses}
    except Exception as e:
        write_log(f"Error: {str(e)}")
        return {"error": str(e)}

# Start Get Nutrtion #####################################################################################################################

# ƒê·ªãnh nghƒ©a m√¥ h√¨nh d·ªØ li·ªáu ƒë·∫ßu v√†o


class FoodItem(BaseModel):
    food: str  # T√™n th·ª±c ph·∫©m, v√≠ d·ª•: "rice"
    mass: float  # Kh·ªëi l∆∞·ª£ng th·ª±c ph·∫©m (gram), v√≠ d·ª•: 258


class FoodInput(BaseModel):
    food_masses_gram: List[FoodItem]  # Danh s√°ch c√°c th·ª±c ph·∫©m v√† kh·ªëi l∆∞·ª£ng
    token: str  # Token ƒë·ªÉ x√°c th·ª±c khi g·ªçi API input_manual


# Kh√≥a API c·ªßa USDA ƒë·ªÉ truy c·∫≠p d·ªØ li·ªáu dinh d∆∞·ª°ng
API_KEY = "gP07Q5U7ULsiXCXLbCGVqk4c2Y6Yx39nMWJiuJxx"


async def get_nutrition(food_name: str, weight: float) -> dict:
    """
    L·∫•y d·ªØ li·ªáu dinh d∆∞·ª°ng t·ª´ USDA API cho m·ªôt th·ª±c ph·∫©m.
    - ƒê·∫ßu v√†o: food_name (t√™n th·ª±c ph·∫©m), weight (kh·ªëi l∆∞·ª£ng t√≠nh b·∫±ng gram)
    - ƒê·∫ßu ra: Dictionary ch·ª©a th√¥ng tin dinh d∆∞·ª°ng theo ƒë·ªãnh d·∫°ng foodLogData
    """
    try:
        # G·ª≠i y√™u c·∫ßu HTTP b·∫•t ƒë·ªìng b·ªô t·ªõi USDA API
        async with httpx.AsyncClient() as client:
            print(f"ƒêang g·ª≠i y√™u c·∫ßu t·ªõi USDA API cho th·ª±c ph·∫©m: {food_name}")
            url = f"https://api.nal.usda.gov/fdc/v1/foods/search?query={food_name}&api_key={API_KEY}"
            response = await client.get(url)
            response.raise_for_status()  # N√©m l·ªói n·∫øu y√™u c·∫ßu th·∫•t b·∫°i
            data = response.json()

            # Ki·ªÉm tra xem c√≥ d·ªØ li·ªáu th·ª±c ph·∫©m hay kh√¥ng
            if 'foods' in data and len(data['foods']) > 0:
                food = data['foods'][0]  # L·∫•y th·ª±c ph·∫©m ƒë·∫ßu ti√™n
                nutrients = {nutrient['nutrientName']: nutrient['value']
                             for nutrient in food['foodNutrients']}

                # Ch·ªçn c√°c ch·∫•t dinh d∆∞·ª°ng c·∫ßn thi·∫øt v√† ƒë·ªïi t√™n ƒë·ªÉ kh·ªõp v·ªõi foodLogData
                selected_nutrients = {
                    # NƒÉng l∆∞·ª£ng (kcal)
                    "calories": nutrients.get("Energy", 0),
                    "protein": nutrients.get("Protein", 0),  # Ch·∫•t ƒë·∫°m (g)
                    # Tinh b·ªôt (g)
                    "carbs": nutrients.get("Carbohydrate, by difference", 0),
                    # Ch·∫•t b√©o (g)
                    "fat": nutrients.get("Total lipid (fat)", 0),
                }

                # T√≠nh to√°n dinh d∆∞·ª°ng theo kh·ªëi l∆∞·ª£ng (USDA cung c·∫•p d·ªØ li·ªáu cho 100g)
                factor = weight / 100
                nutrients_scaled = {k: round(v * factor, 2)
                                    for k, v in selected_nutrients.items()}

                # Tr·∫£ v·ªÅ ƒë·ªãnh d·∫°ng kh·ªõp v·ªõi foodLogData
                result = {
                    "name": food_name,
                    "grams": weight,
                    **nutrients_scaled
                }
                write_log(f"D·ªØ li·ªáu dinh d∆∞·ª°ng cho {food_name}: {result}")
                return result
            else:
                write_log(
                    f"Kh√¥ng t√¨m th·∫•y d·ªØ li·ªáu cho th·ª±c ph·∫©m '{food_name}'")
                raise ValueError(
                    f"Kh√¥ng t√¨m th·∫•y d·ªØ li·ªáu cho th·ª±c ph·∫©m '{food_name}'")
    except httpx.HTTPStatusError as e:
        write_log(f"Y√™u c·∫ßu USDA API th·∫•t b·∫°i cho '{food_name}': {str(e)}")
        raise ValueError(f"Kh√¥ng th·ªÉ l·∫•y d·ªØ li·ªáu cho '{food_name}'")
    except Exception as e:
        write_log(f"L·ªói kh√¥ng mong mu·ªën khi x·ª≠ l√Ω '{food_name}': {str(e)}")
        raise ValueError(f"L·ªói khi x·ª≠ l√Ω '{food_name}'")


async def post_to_input_manual(food_log_data: list, token: str) -> dict:
    """
    G·ª≠i d·ªØ li·ªáu dinh d∆∞·ª°ng t·ªõi API input_manual.
    - ƒê·∫ßu v√†o: food_log_data (danh s√°ch d·ªØ li·ªáu dinh d∆∞·ª°ng), token (chu·ªói x√°c th·ª±c)
    - ƒê·∫ßu ra: Dictionary b√°o tr·∫°ng th√°i th√†nh c√¥ng ho·∫∑c th·∫•t b·∫°i
    """
    api_url = "https://chat.aaateammm.online/api/food-items"  # URL API th·ª±c t·∫ø
    headers = {
        "Accept": "*/*",
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"  # Th√™m token v√†o header
    }

    try:
        write_log(f"ƒêang g·ª≠i d·ªØ li·ªáu t·ªõi API input_manual: {food_log_data}")
        async with httpx.AsyncClient() as client:
            response = await client.post(api_url, json=food_log_data, headers=headers)
            response.raise_for_status()  # N√©m l·ªói n·∫øu y√™u c·∫ßu th·∫•t b·∫°i
            print("G·ª≠i d·ªØ li·ªáu t·ªõi API input_manual th√†nh c√¥ng")
            return {"status": "success", "message": "G·ª≠i d·ªØ li·ªáu th√†nh c√¥ng"}
    except httpx.HTTPStatusError as e:
        write_log(f"Y√™u c·∫ßu API input_manual th·∫•t b·∫°i: {str(e)}")
        raise HTTPException(status_code=e.response.status_code, detail=str(e))
    except Exception as e:
        prwrite_logint(
            f"L·ªói kh√¥ng mong mu·ªën khi g·ª≠i t·ªõi API input_manual: {str(e)}")
        raise HTTPException(status_code=500, detail="L·ªói m√°y ch·ªß n·ªôi b·ªô")


@app.post("/food-nutrition/")
async def get_nutrition_endpoint(food_input: FoodInput):
    """
    X·ª≠ l√Ω danh s√°ch th·ª±c ph·∫©m, l·∫•y d·ªØ li·ªáu dinh d∆∞·ª°ng t·ª´ USDA API, v√† g·ª≠i t·ªõi API input_manual.
    - ƒê·∫ßu v√†o: JSON ch·ª©a 'food_masses_gram' (danh s√°ch {food, mass}) v√† 'token'
    - ƒê·∫ßu ra: Tr·∫°ng th√°i x·ª≠ l√Ω v√† d·ªØ li·ªáu dinh d∆∞·ª°ng
    """
    try:
        write_log(food_input)
        # L·∫•y d·ªØ li·ªáu dinh d∆∞·ª°ng cho t·ª´ng th·ª±c ph·∫©m
        food_log_data = []

        for item in food_input.food_masses_gram:
            nutrition = await get_nutrition(item.food, item.mass)
            food_log_data.append(nutrition)

        if food_input.token.startswith("Bearer "):
            food_input.token = food_input.token[len("Bearer "):]

        # G·ª≠i d·ªØ li·ªáu t·ªõi API input_manual
        result = await post_to_input_manual(food_log_data, food_input.token)

        write_log("X·ª≠ l√Ω y√™u c·∫ßu th√†nh c√¥ng")
        return {
            "status": "success",
            "nutrition_data": food_log_data,
            "api_result": result
        }

    except ValueError as e:
        write_log(f"L·ªói: {str(e)}")
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        write_log(f"L·ªói m√°y ch·ªß: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

# End Get Nutrtion #####################################################################################################################

# H√†m ch·∫°y FastAPI server


def start_uvicorn():
    uvicorn.run(app, host="127.0.0.1", port=8000)


# Ch·∫°y FastAPI trong thread ri√™ng
threading.Thread(target=start_uvicorn, daemon=True).start()

# Thi·∫øt l·∫≠p Ngrok
ngrok.set_auth_token("2wftyZRfXxzSxGhlz427E47c3aW_7RdqiCxQ6CswwCVfvBcDM")
public_url = ngrok.connect(8000)

# Log v√† hi·ªÉn th·ªã URL c√¥ng c·ªông
write_log(f"Public URL: {public_url}")
print("üöÄ Public FastAPI server is running at:", public_url)

# Gi·ªØ thread ch√≠nh ch·∫°y
while True:
    time.sleep(1)

In [None]:
#777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777

In [None]:
#777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777#

In [None]:
########################################################################################################################################################

In [None]:
########################################################################################################################################################

In [None]:
image_path = r"rice_1.jpg"

In [None]:
import torch
print(torch.__version__)
print(torch.version.cuda)

SEGMENTATION

In [None]:
import sys
import os
import json
import shutil
import logging
import cv2
import torch
import numpy as np
from typing import Any, Dict, List
import argparse

# Th√™m ƒë∆∞·ªùng d·∫´n c√°c th∆∞ vi·ªán t√πy ch·ªânh
sys.path.append('.')
sys.path.append('./SAM')
sys.path.append('./mmseg')

# Import c√°c module c·∫ßn thi·∫øt
from segment_anything import SamAutomaticMaskGenerator, sam_model_registry
from FoodSAM.FoodSAM_tools.predict_semantic_mask import semantic_predict
from FoodSAM.FoodSAM_tools.enhance_semantic_masks import enhance_masks
from FoodSAM.FoodSAM_tools.evaluate_foodseg103 import evaluate


# T·∫°o ƒë·ªëi t∆∞·ª£ng args thay th·∫ø argparse
args = argparse.Namespace(
    img_path = image_path,  # C·∫≠p nh·∫≠t ƒë∆∞·ªùng d·∫´n ·∫£nh
    output="Output/Semantic_Results",
    device="cuda" if torch.cuda.is_available() else "cpu",
    SAM_checkpoint="ckpts/sam_vit_h_4b8939.pth",
    semantic_config="configs/SETR_MLA_768x768_80k_base.py",
    semantic_checkpoint="ckpts/SETR_MLA/iter_80000.pth",
    model_type="vit_h",
    color_list_path="FoodSAM/FoodSAM_tools/color_list.npy",
    category_txt="FoodSAM/FoodSAM_tools/category_id_files/foodseg103_category_id.txt",
    num_class=104
)


def write_masks_to_folder(masks: List[Dict[str, Any]], path: str) -> None:
    """L∆∞u c√°c mask c·ªßa SAM v√†o th∆∞ m·ª•c"""
    header = "id,area,bbox_x0,bbox_y0,bbox_w,bbox_h,point_input_x,point_input_y,predicted_iou,stability_score,crop_box_x0,crop_box_y0,crop_box_w,crop_box_h"  # noqa
    metadata = [header]
    os.makedirs(os.path.join(path, "sam_mask"), exist_ok=True)
    masks_array = []
    for i, mask_data in enumerate(masks):
        mask = mask_data["segmentation"]
        masks_array.append(mask.copy())
        filename = f"{i}.png"
        cv2.imwrite(os.path.join(path, "sam_mask", filename), mask * 255)
        mask_metadata = [
            str(i),
            str(mask_data["area"]),
            *[str(x) for x in mask_data["bbox"]],
            *[str(x) for x in mask_data["point_coords"][0]],
            str(mask_data["predicted_iou"]),
            str(mask_data["stability_score"]),
            *[str(x) for x in mask_data["crop_box"]],
        ]
        row = ",".join(mask_metadata)
        metadata.append(row)

    masks_array = np.stack(masks_array, axis=0)
    np.save(os.path.join(path, "sam_mask", "masks.npy"), masks_array)
    metadata_path = os.path.join(path, "sam_metadata.csv")
    with open(metadata_path, "w") as f:
        f.write("\n".join(metadata))



def create_logger(save_folder):
    """T·∫°o logger ƒë·ªÉ ghi log trong qu√° tr√¨nh ch·∫°y"""
    log_file = "sam_process.log"
    final_log_file = os.path.join(save_folder, log_file)

    logging.basicConfig(
        format='[%(asctime)s] [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s',
        level=logging.INFO,
        handlers=[
            logging.FileHandler(final_log_file, mode='w'),
            logging.StreamHandler()
        ])
    logger = logging.getLogger()
    print(f"Logger created: {final_log_file}")
    return logger


def main(args):
    """Ch·∫°y qu√° tr√¨nh segmentation"""
    os.makedirs(args.output, exist_ok=True)
    # logger = create_logger(args.output)
    # logger.info("Running SAM...")

    # Ki·ªÉm tra thi·∫øt b·ªã c√≥ h·ªó tr·ª£ CUDA kh√¥ng
    if args.device == "cuda" and not torch.cuda.is_available():
        # logger.warning("CUDA is not available. Switching to CPU.")
        args.device = "cpu"

    # Load m√¥ h√¨nh SAM
    sam = sam_model_registry[args.model_type](checkpoint=args.SAM_checkpoint)
    _ = sam.to(device=args.device)

    # Thi·∫øt l·∫≠p b·ªô t·∫°o mask
    output_mode = "binary_mask"
    generator = SamAutomaticMaskGenerator(sam, output_mode=output_mode)

    assert args.img_path, "B·∫°n ph·∫£i cung c·∫•p ƒë∆∞·ªùng d·∫´n ·∫£nh."
    
    # ƒê·ªçc ·∫£nh v√† th·ª±c hi·ªán segmentation
    targets = [args.img_path]
    for t in targets:
        # logger.info(f"Processing {t}...")
        image = cv2.imread(t)
        if image is None:
            # logger.error(f"Kh√¥ng th·ªÉ t·∫£i {t}, b·ªè qua...")
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        masks = generator.generate(image)

        # L∆∞u k·∫øt qu·∫£
        base = os.path.basename(t).split('.')[0]
        save_base = os.path.join(args.output, base)
        os.makedirs(save_base, exist_ok=True)
        write_masks_to_folder(masks, save_base)
        shutil.copyfile(t, os.path.join(save_base, "input.jpg"))

    # logger.info("SAM processing done!\n")

    # Ch·∫°y m√¥ h√¨nh ph√¢n ƒëo·∫°n ng·ªØ nghƒ©a
    # logger.info("Running semantic segmentation model...")
    semantic_predict(args.semantic_config, args.semantic_checkpoint,
                     args.output, args.color_list_path, args.img_path, device=args.device)
    # logger.info("Semantic segmentation done!\n")

    # TƒÉng c∆∞·ªùng segmentation mask
    # logger.info("Enhancing semantic masks...")
    enhance_masks(args.output, args.category_txt, args.color_list_path, num_class=args.num_class)
    # logger.info("Enhancement done!\n")

    # logger.info(f"Results saved in {args.output}!\n")



def clear_folder(folder_path: str):
    """
    X√≥a to√†n b·ªô n·ªôi dung b√™n trong th∆∞ m·ª•c (t·∫•t c·∫£ file, th∆∞ m·ª•c con, symbolic link),
    nh∆∞ng v·∫´n gi·ªØ l·∫°i th∆∞ m·ª•c g·ªëc.

    Args:
        folder_path (str): ƒê∆∞·ªùng d·∫´n th∆∞ m·ª•c c·∫ßn x√≥a n·ªôi dung.
    
    Returns:
        None
    """
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        # Duy·ªát qua t·∫•t c·∫£ file v√† th∆∞ m·ª•c con trong folder
        for item in os.listdir(folder_path):
            item_path = os.path.join(folder_path, item)
            try:
                if os.path.isfile(item_path) or os.path.islink(item_path):
                    os.remove(item_path)  # X√≥a file ho·∫∑c symbolic link
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)  # X√≥a th∆∞ m·ª•c con v√† n·ªôi dung b√™n trong
            except Exception as e:
                print(f"L·ªói khi x√≥a {item_path}: {e}")
        print(f"‚úÖ ƒê√£ x√≥a to√†n b·ªô n·ªôi dung trong th∆∞ m·ª•c '{folder_path}'.")
    else:
        print(f"‚ö†Ô∏è Th∆∞ m·ª•c '{folder_path}' kh√¥ng t·ªìn t·∫°i.")


# clear_folder("masks")
# clear_folder("Output")
# # Ch·∫°y ch∆∞∆°ng tr√¨nh
# main(args)
print("doneeeeee")


FOOD VOLUME

In [None]:
import sys
import json
from keras.models import Model, model_from_json
from food_volume_estimation.volume_estimator import VolumeEstimator
from food_volume_estimation.depth_estimation.custom_modules import *
from food_volume_estimation.food_segmentation.food_segmentator import FoodSegmentator
import matplotlib.pyplot as plt
from pyntcloud import PyntCloud

# Paths to model archiecture/weights
depth_model_architecture = './models/fine_tune_food_videos/monovideo_fine_tune_food_videos.json'
depth_model_weights = './models/fine_tune_food_videos/monovideo_fine_tune_food_videos.h5'
print("loaded model estimate volume")

In [None]:
# Create estimator object and intialize
estimator = VolumeEstimator(arg_init=False)
with open(depth_model_architecture, 'r') as read_file:
    custom_losses = Losses()
    objs = {'ProjectionLayer': ProjectionLayer,
            'ReflectionPadding2D': ReflectionPadding2D,
            'InverseDepthNormalization': InverseDepthNormalization,
            'AugmentationLayer': AugmentationLayer,
            'compute_source_loss': custom_losses.compute_source_loss}
    model_architecture_json = json.load(read_file)
    estimator.monovideo = model_from_json(model_architecture_json, custom_objects=objs)
estimator._VolumeEstimator__set_weights_trainable(estimator.monovideo, False)
estimator.monovideo.load_weights(depth_model_weights)
estimator.model_input_shape = estimator.monovideo.inputs[0].shape.as_list()[1:]
depth_net = estimator.monovideo.get_layer('depth_net')
estimator.depth_model = Model(inputs=depth_net.inputs, outputs=depth_net.outputs, name='depth_model')
print('[*] Loaded depth estimation model.')

# Depth model configuration
MIN_DEPTH = 0.01
MAX_DEPTH = 10
estimator.min_disp = 1 / MAX_DEPTH
estimator.max_disp = 1 / MIN_DEPTH
estimator.gt_depth_scale = 0.35 # Ground truth expected median depth

# Create segmentator object
# estimator.segmentator = FoodSegmentator(segmentation_model_weights)

# Set plate adjustment relaxation parameter
estimator.relax_param = 0.01
print("done set up model")

NUTRITION INFORMATION RETRIEAVAL

In [None]:
# H√†m ghi log v√†o file
def write_log(message):
    log_dir = "debug"
    log_file_path = os.path.join(log_dir, "log.txt")

    # T·∫°o th∆∞ m·ª•c n·∫øu ch∆∞a t·ªìn t·∫°i
    os.makedirs(log_dir, exist_ok=True)

    # Ghi log
    with open(log_file_path, "a") as log_file:
        log_file.write(message + "\n")
        
def run(image_path_run):
    write_log(f"Starting processing for image: {image_path_run}")
    # T·∫°o ƒë·ªëi t∆∞·ª£ng args thay th·∫ø argparse
    args = argparse.Namespace(
        img_path=image_path_run,  # C·∫≠p nh·∫≠t ƒë∆∞·ªùng d·∫´n ·∫£nh
        output="Output/Semantic_Results",
        device="cuda" if torch.cuda.is_available() else "cpu",
        SAM_checkpoint="ckpts/sam_vit_h_4b8939.pth",
        semantic_config="configs/SETR_MLA_768x768_80k_base.py",
        semantic_checkpoint="ckpts/SETR_MLA/iter_80000.pth",
        model_type="vit_h",
        color_list_path="FoodSAM/FoodSAM_tools/color_list.npy",
        category_txt="FoodSAM/FoodSAM_tools/category_id_files/foodseg103_category_id.txt",
        num_class=104
    )
    

    clear_folder("masks")
    clear_folder("Output")
    # Ch·∫°y ch∆∞∆°ng tr√¨nh
    main(args)

    name_without_ext = os.path.splitext(image_path_run)[0]
    write_log(f"Processed image name without extension: {name_without_ext}")
    # name_without_ext_without_input = name_without_ext.split("Input/")[1]
    # write_log(f"name_without_ext_without_input: {name_without_ext_without_input}")

    plate_diameter = 0  # Set as 0 to ignore plate detection and scaling
    outputs_list, food_volumes = estimator.estimate_volume(image_path_run, fov=70, plate_diameter_prior=plate_diameter,
                                                        plot_results=True, para_folder_path=rf"Output/Semantic_Results/{name_without_ext}/masks/")

    food_masses = estimator.convert_volume_to_mass(
        r'Density_sub_90.xlsx', food_volumes)
    
    return food_masses

In [None]:
print(run(r"rice_1.jpg"))