In [1]:
!curl -L -o data.csv.gz "https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv.gz"
!gunzip 'data.csv.gz'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   145  100   145    0     0    267      0 --:--:-- --:--:-- --:--:--   267
100 1042M  100 1042M    0     0  26.4M      0  0:00:39  0:00:39 --:--:-- 30.9M


In [2]:
!gdown 1cjR3vKZkwt_pzIy3oBWTCHczBnywF06l

Downloading...
From: https://drive.google.com/uc?id=1cjR3vKZkwt_pzIy3oBWTCHczBnywF06l
To: /content/unhealthy_ingredients.json
  0% 0.00/9.14k [00:00<?, ?B/s]100% 9.14k/9.14k [00:00<00:00, 27.3MB/s]


In [5]:
!pip install torch transformers accelerate sentencepiece bitsandbytes gradio accelerate

Collecting gradio
  Downloading gradio-5.29.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<25.0,>=22.0 (from gradio)
  Downloading aiofiles-24.1.0-py3-none-any.whl.metadata (10 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.10.0 (from gradio)
  Downloading gradio_client-1.10.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.9.3 (from gradio)
  Downloading ruff-0.11.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (25 kB)
Collecting safehttpx<0.2.0,>=0.1.6

In [8]:
import pandas as pd
import csv
import sys
import json
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# ----------- CONFIGURATION ----------- #
DATA_PATH = "data.csv"
UNHEALTHY_INGREDIENT_PATH = "/content/unhealthy_ingredients.json"
MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta"
MAX_NEW_TOKENS = 256

# ----------- MODEL LOADING ----------- #
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto", torch_dtype="auto")
chatbot = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=MAX_NEW_TOKENS,
                   do_sample=False, pad_token_id=tokenizer.eos_token_id)

# ----------- LOAD METADATA ----------- #
def load_metadata(path, chunksize=100000, max_chunks=3):
    sample = pd.read_csv(path, sep="\t", nrows=5, dtype=str, encoding='utf-8', on_bad_lines='skip')
    sample.columns = sample.columns.str.strip().str.lower()
    product_col = next((col for col in sample.columns if "product" in col and "name" in col), None)
    brand_col = next((col for col in sample.columns if "brand" in col), None)

    metadata = []
    for i, chunk in enumerate(pd.read_csv(path, sep="\t", chunksize=chunksize, dtype=str, encoding='utf-8', on_bad_lines='skip')):
        for idx, row in chunk.iterrows():
            metadata.append({
                "line": i * chunksize + idx + 1,
                "product_name": row.get(product_col, ""),
                "brands": row.get(brand_col, "")
            })
        if i + 1 >= max_chunks:
            break
    return pd.DataFrame(metadata)

PRODUCT_METADATA = load_metadata(DATA_PATH)

def read_row_by_index(csv_path, index):
    csv.field_size_limit(sys.maxsize)
    with open(csv_path, encoding="utf-8") as f:
        reader = csv.reader(f, delimiter="\t")
        header = next(reader)
        for i, row in enumerate(reader, start=1):
            if i == index:
                return {k.strip().lower(): v for k, v in zip(header, row)}
    return {}


Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

Device set to use cuda:0


In [13]:
# ----------- HEALTH ANALYSIS ----------- #
with open(UNHEALTHY_INGREDIENT_PATH, "r", encoding="utf-8") as f:
    UNHEALTHY_INGREDIENT_DATA = json.load(f)

UNHEALTHY_LOOKUP = {
    alias.lower(): (entry["ingredient"], entry["reason"])
    for entry in UNHEALTHY_INGREDIENT_DATA
    for alias in [entry["ingredient"]] + entry.get("aliases", [])
}

def assess_ingredient_healthiness(ingredients_text):
    ingredients = ingredients_text.lower()
    found = []
    for alias, (canonical, reason) in UNHEALTHY_LOOKUP.items():
        if alias in ingredients:
            found.append(f"{canonical} → {reason}")
    ingredients_list = [i.strip() for i in ingredients.split(",")][:5]
    if any("sugar" in item for item in ingredients_list):
        found.append("Sugar is one of the first ingredients")
    return "No obvious unhealthy ingredients found." if not found else "This product may be unhealthy due to:\n- " + "\n- ".join(found)

# ----------- ALLERGEN CHECK ----------- #
KNOWN_ALLERGENS = [
    "milk", "soy", "nuts", "peanuts", "almond", "cashew", "hazelnut", "walnut", "egg",
    "wheat", "gluten", "shellfish", "fish", "sesame", "mustard", "lupin", "celery", "sulfite"
]

def check_allergen_presence(ingredients_text):
    ingredients = ingredients_text.lower()
    found = [a for a in KNOWN_ALLERGENS if a in ingredients]
    return "No common allergens detected." if not found else "May contain: " + ", ".join(sorted(set(found)))

# ----------- NUTRITION CONTEXT ----------- #
def get_full_context(product):
    ingredients = product.get("ingredients_text", "")
    product["allergen_analysis"] = check_allergen_presence(ingredients)
    product["health"] = assess_ingredient_healthiness(ingredients)
    return product

# ----------- PROMPT BUILDING ----------- #
def build_prompt(product, health, user_input):
    return f"""### Instruction:
Answer concisely using the facts provided.

### Product:
Name: {product.get("product_name")}
Brand: {product.get("brands")}
Energy: {product.get("energy-kcal_100g")} kcal
Fat: {product.get("fat_100g")}g | Sat Fat: {product.get("saturated-fat_100g")}g
Sugars: {product.get("sugars_100g")}g | Fiber: {product.get("fiber_100g")}g
Protein: {product.get("proteins_100g")}g | Salt: {product.get("salt_100g")}g
Ingredients: {product.get("ingredients_text")[:300]}...
Allergens (label): {product.get("allergens")}
Allergen Check: {product.get("allergen_analysis")}
Labels: {product.get("labels_tags")}
Health Insight: {health}

### Question:
{user_input}

### Answer:"""

# ----------- GRADIO LOGIC ----------- #
def get_matching_products(product_name):
    matches = PRODUCT_METADATA[
        PRODUCT_METADATA["product_name"].str.lower().str.contains(product_name.lower(), na=False)
    ]
    if matches.empty:
        return [], "❌ No matching products found."
    options = [f"{row['product_name']} (line {row['line']})" for _, row in matches.iterrows()]
    return options, "✅ Select a product from the dropdown."

def answer_question(selected_product_label, question):
    if not selected_product_label or "line" not in selected_product_label:
        return "❌ Please select a valid product."
    try:
        line = int(selected_product_label.split("line")[-1].strip(" )"))
        product_row = read_row_by_index(DATA_PATH, line)
        if not product_row:
            return "❌ Could not load product details."
        context = get_full_context(product_row)
        prompt = build_prompt(context, context["health"], question)
        input_ids = tokenizer(prompt)["input_ids"]
        if len(input_ids) > 3000:
            prompt = tokenizer.decode(input_ids[-3000:], skip_special_tokens=True)
        result = chatbot(prompt)
        output = result[0]["generated_text"]
        answer = output[len(prompt):].split("User:")[0].strip()
        return answer or "⚠️ No answer generated."
    except Exception as e:
        return f"❌ Error: {e}"

# ----------- GRADIO UI ----------- #
with gr.Blocks(title="Nutrition Chatbot") as demo:
    gr.Markdown("## 🍫 Nutrition Chatbot\nSearch and analyze food products from Open Food Facts.")

    with gr.Row():
        product_input = gr.Textbox(label="Enter product name", placeholder="e.g. chocolate bar")
        search_button = gr.Button("🔍 Search")

    product_dropdown = gr.Dropdown(choices=[], label="Matching Products", interactive=True)

    with gr.Row():
        question_box = gr.Textbox(label="Ask a Question", placeholder="e.g. Is this product high in sugar?")
        ask_button = gr.Button("💬 Ask")

    answer_box = gr.Textbox(label="Response", lines=5)

    def handle_search(product_name):
        options, status = get_matching_products(product_name)
        return gr.update(choices=options, value=None), status

    search_button.click(fn=handle_search, inputs=product_input, outputs=[product_dropdown, answer_box])
    ask_button.click(fn=answer_question, inputs=[product_dropdown, question_box], outputs=answer_box)

demo.launch()


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://3ce4b05fbdbd35e8d3.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [12]:
import time
import torch

def answer_question(selected_product_label, question):
    if not selected_product_label or "line" not in selected_product_label:
        return "❌ Please select a valid product."

    try:
        line = int(selected_product_label.split("line")[-1].strip(" )"))
        product_row = read_row_by_index(DATA_PATH, line)
        if not product_row:
            return "❌ Could not load product details."

        context = get_full_context(product_row)
        prompt = build_prompt(context, context["health"], question)

        input_ids = tokenizer(prompt)["input_ids"]
        if len(input_ids) > 3000:
            prompt = tokenizer.decode(input_ids[-3000:], skip_special_tokens=True)

        print(f"\n🧠 Prompt Tokens: {len(input_ids)}")

        # Start timer
        start_time = time.time()
        result = chatbot(prompt)
        end_time = time.time()

        output = result[0]["generated_text"]
        response_tokens = tokenizer(output[len(prompt):])["input_ids"]
        duration = end_time - start_time

        # Print metrics
        print(f"✅ Response Tokens: {len(response_tokens)}")
        print(f"⏱️ Inference Time: {duration:.2f} seconds")
        print(f"🖥️ CUDA Memory Allocated: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
        print(f"🖥️ CUDA Max Memory Used: {torch.cuda.max_memory_allocated() / 1e6:.1f} MB")

        return output[len(prompt):].split("User:")[0].strip()

    except Exception as e:
        return f"❌ Error: {e}"

question = "Is this product high in sugar?"
selected_product_label = "Milk Chocolate (line 1234)"  # 🔁 Replace with a real product line from dropdown

print(answer_question(selected_product_label, question))




🧠 Prompt Tokens: 128
✅ Response Tokens: 102
⏱️ Inference Time: 4.21 seconds
🖥️ CUDA Memory Allocated: 14517.5 MB
🖥️ CUDA Max Memory Used: 28966.9 MB
Based on the provided information, it is unclear whether this product is high in sugar as the amount of sugars is not specified as high or low. However, the amount of sugars is listed as grams (g), and the value provided is....g. It is up to personal preference and dietary needs to determine if this amount of sugar is considered high or not. Without further context or information, it is best to assume that the product is not excessively high in sugar.
