# Enviornment Setup


In [None]:
import os
import numpy as np
import pandas as pd

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
        

# Check Cuda

In [None]:
import torch

if torch.cuda.is_available():
    print("CUDA is available.")
else:
    print("No CUDA found.")

# Database Building and Analysis

In [None]:
import os
import random
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import torch
from transformers import (
    CLIPModel,
    CLIPProcessor,
    BlipProcessor,
    BlipForConditionalGeneration
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

class SimpleImageDatabase:
    def __init__(self, clip_model, clip_processor, device):
        self.clip_model = clip_model
        self.clip_processor = clip_processor
        self.device = device
        self.image_paths = []
        self.embeddings = []
        self.metadata = []

    def add_image(self, image_path, metadata=None):
        try:
            img = Image.open(image_path).convert("RGB")
            inputs = self.clip_processor(images=img, return_tensors="pt").to(self.device)
            with torch.no_grad():
                features = self.clip_model.get_image_features(**inputs)
            features = features / features.norm(dim=-1, keepdim=True)
            self.image_paths.append(image_path)
            self.embeddings.append(features.squeeze().cpu().numpy())
            self.metadata.append(metadata or {})
        except Exception as e:
            print(f"Error {image_path}: {e}")

    def search_similar(self, query_image_path, top_k=5):
        try:
            query_img = Image.open(query_image_path).convert("RGB")
            inputs = self.clip_processor(images=query_img, return_tensors="pt").to(self.device)
            with torch.no_grad():
                query_features = self.clip_model.get_image_features(**inputs)
            query_features = query_features / query_features.norm(dim=-1, keepdim=True)
            q_emb = query_features.squeeze().cpu().numpy()

            similarities = []
            for idx, emb in enumerate(self.embeddings):
                sim_v = float(np.dot(q_emb, emb))
                similarities.append((idx, sim_v))
            similarities.sort(key=lambda x: x[1], reverse=True)

            results = []
            for i, sim_val in similarities[:top_k]:
                results.append({
                    "path": self.image_paths[i],
                    "similarity": sim_val,
                    "metadata": self.metadata[i]
                })
            return results
        except Exception as e:
            print(f"Search error: {e}")
            return []

def build_image_database(dataset_path, subset_dir, clip_model, clip_processor, nutrition_df, subset_size=5000, device=device):
    all_imgs = glob.glob(os.path.join(dataset_path, "images", "*", "*.jpg"))
    if subset_size < len(all_imgs):
        chosen_files = random.sample(all_imgs, subset_size)
    else:
        chosen_files = all_imgs

    os.makedirs(subset_dir, exist_ok=True)
    for path in chosen_files:
        category_folder = os.path.basename(os.path.dirname(path))
        local_dir = os.path.join(subset_dir, category_folder)
        os.makedirs(local_dir, exist_ok=True)
        dest = os.path.join(local_dir, os.path.basename(path))
        if not os.path.exists(dest):
            try:
                os.system(f'cp "{path}" "{dest}"')
            except Exception as err:
                print(f"Copy error: {err}")

    db = SimpleImageDatabase(clip_model, clip_processor, device)
    final_imgs = glob.glob(os.path.join(subset_dir, "*", "*.jpg"))
    for img_path in final_imgs:
        cat_name = os.path.basename(os.path.dirname(img_path))
        meta = {"food_category": cat_name}
        row = nutrition_df[nutrition_df["food_name"] == cat_name.lower()]
        if not row.empty:
            row_data = row.iloc[0].to_dict()
            meta.update(row_data)
        db.add_image(img_path, meta)
    return db

def analyze_food_image(image_path, blip_model, blip_processor, similar_imgs, device=device):
    try:
        img = Image.open(image_path).convert("RGB")
        inputs = blip_processor(img, return_tensors="pt").to(device)
        with torch.no_grad():
            cap_ids = blip_model.generate(**inputs, max_new_tokens=50)
        caption = blip_processor.decode(cap_ids[0], skip_special_tokens=True)
    except Exception as e:
        caption = f"Error: {e}"

    if similar_imgs:
        top_candidate = similar_imgs[0]
        cat = top_candidate["metadata"].get("food_category", "Unknown")
        desc = top_candidate["metadata"].get("description", "No description")
    else:
        cat = "Unknown"
        desc = "No match"
    return f"\nImage: {os.path.basename(image_path)}\nCaption: {caption}\nCategory: {cat}\nInfo: {desc}"

def main():
    parent_dataset_path = "/kaggle/input/food101/food-101"
    local_subset_path = "/kaggle/working/food101_subset"
    clip_m = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
    clip_p = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    blip_proc = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip_mod = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)

    data_map = {
        "food_name": ["apple_pie", "pizza", "ramen", "sushi", "hamburger"],
        "calories": [300, 285, 450, 200, 350],
        "description": [
            "dessert with apples",
            "tomato sauce & cheese",
            "noodle soup",
            "vinegared rice",
            "bun with patty"
        ]
    }
    nutrition_df = pd.DataFrame(data_map)
    db = build_image_database(parent_dataset_path, local_subset_path, clip_m, clip_p, nutrition_df, 5000, device)

    all_subset_imgs = glob.glob(os.path.join(local_subset_path, "*", "*.jpg"))
    if not all_subset_imgs:
        print("No images found.")
        return
    candidate_img = random.choice(all_subset_imgs)
    top_similar = db.search_similar(candidate_img, top_k=3)
    outcome = analyze_food_image(candidate_img, blip_mod, blip_proc, top_similar, device)
    print("RESULT")
    print(outcome)
    for idx, item in enumerate(top_similar):
        print(f"{idx+1}. {item['path']} {item['similarity']:.3f}")

if __name__ == "__main__":
    main()

# Extra Visualization

In [None]:
import os
import random
import glob
import matplotlib.pyplot as plt
from PIL import Image
import torch
from transformers import (
    BlipProcessor,
    BlipForConditionalGeneration
)

def display_image(image_path):
    if not os.path.exists(image_path):
        print(f"Not found {image_path}.")
        return
    with Image.open(image_path) as img:
        plt.imshow(img)
        plt.axis("off")
        plt.show()

def get_random_image(image_list):
    if not image_list:
        raise ValueError("Empty list")
    return random.choice(image_list)

def quick_caption_check(image_path, blip_model, blip_processor, device):
    try:
        with Image.open(image_path) as img:
            inputs = blip_processor(img, return_tensors="pt").to(device)
            with torch.no_grad():
                caption_ids = blip_model.generate(**inputs, max_new_tokens=50)
            caption = blip_processor.decode(caption_ids[0], skip_special_tokens=True)
        print(f"\nImage: {image_path}\nCaption: {caption}\n")
    except Exception as e:
        print(f"Error: {e}")

def main():
    subset_dir = "/kaggle/working/food101_subset"
    files = glob.glob(os.path.join(subset_dir, "*", "*.jpg"))
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    blip_proc = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip_mod = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)

    try:
        picked = get_random_image(files)
        display_image(picked)
        quick_caption_check(picked, blip_mod, blip_proc, device)
    except ValueError as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

# 