# Data Acquisition and Preprocessing

In [None]:
# Data manipulation
import pandas as pd

# Libraries for API querying
import json
import requests
from pprint import pprint

# Imports for image scaling and file access
import os
from PIL import Image
from tqdm import tqdm

# For splitting into train and test
import random
import shutil

# For PyTorch
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


To load API keys and other secret values, we will use a `.env` file.

In [None]:
# Load the variables defined in the local .env file so the API keys can be
# read later with os.getenv() instead of being hard-coded in the notebook.
from dotenv import load_dotenv

load_dotenv()

A helper function that will be useful later, during nutrition data retrieval, returns a list of all the food names present in our dataset.

In [None]:

def get_food_names(data_path='food-128'):
    """Return the human-readable food names found in the dataset folder.

    Every immediate sub-directory of ``data_path`` is treated as one food
    class; underscores in the folder name are replaced with spaces. Plain
    files are ignored.

    Args:
        data_path: Root folder holding one sub-directory per food class.
            Defaults to 'food-128'.

    Returns:
        List of food names, ordered by folder name.

    Raises:
        FileNotFoundError: If ``data_path`` does not exist.
    """
    # os.listdir() returns entries in arbitrary order, so sort them to get a
    # reproducible result across machines and runs.
    return [
        entry.replace('_', ' ')
        for entry in sorted(os.listdir(data_path))
        if os.path.isdir(os.path.join(data_path, entry))
    ]
# The resized dataset folder is only created by the downscaling step further
# down, so guard against a FileNotFoundError when this cell runs first on a
# fresh kernel.
if os.path.isdir('food-128'):
    print(get_food_names())
else:
    print("Dataset folder 'food-128' not found yet; run the downscaling cell first.")

Downscaling the images will help massively in getting a working model without needing to use the full image data. The method to do so is shown below.

In [None]:
# Downscale images (ChatGPT)
# Take our raw images and write 128x128 RGB copies into a parallel folder tree.

# Source and target directories
SRC_DIR = "images"        # original images folder with class subfolders
DST_DIR = "food-128"      # output folder

size = (128, 128)  # target resolution

# Create output root directory
os.makedirs(DST_DIR, exist_ok=True)

# Loop through every class folder
for class_name in os.listdir(SRC_DIR):
    src_class_path = os.path.join(SRC_DIR, class_name)
    dst_class_path = os.path.join(DST_DIR, class_name)

    # Skip non-directories (just in case)
    if not os.path.isdir(src_class_path):
        continue

    os.makedirs(dst_class_path, exist_ok=True)

    # Resize each image
    for img_name in tqdm(os.listdir(src_class_path), desc=f"Resizing {class_name[:20]}"):
        src_img_path = os.path.join(src_class_path, img_name)
        dst_img_path = os.path.join(dst_class_path, img_name)

        try:
            # The context manager closes the source file even when decoding
            # fails, so no file handles leak while looping over many images.
            with Image.open(src_img_path) as raw_img:
                img = raw_img.convert("RGB").resize(size, Image.Resampling.BILINEAR)
            img.save(dst_img_path, quality=90)
        except Exception as e:
            # Best effort: report unreadable or non-image files and carry on.
            print(f"Skipped {src_img_path}: {e}")

print("All images resized to 128x128 and saved in food-128")

## FoodData Central API Loading

For loading the API data, we'll first need to define some constants which will be used through data retrieval.

In [None]:
# FoodData Central (FDC) connection settings
FDC_API_KEY = os.environ.get('FDC_API_KEY')  # None when the variable is not set
FDC_BASE_URL = 'https://api.nal.usda.gov/fdc/v1'

To search the FoodData Central database, we'll use the following helper functions:

In [None]:
def search_food(query, data_type='Survey (FNDDS)', limit=10, timeout=10):
    """Search the FoodData Central database for foods matching ``query``.

    Args:
        query: Free-text search term, e.g. a dish name.
        data_type: FDC data type to search; any of 'Branded', 'Foundation',
            'Survey (FNDDS)', 'SR Legacy'.
        limit: Maximum number of results requested (sent as ``pageSize``).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    parameters = {
        'api_key': FDC_API_KEY,
        'query': query,
        'dataType': data_type,
        'pageSize': limit
    }
    response = requests.get(
        url=f'{FDC_BASE_URL}/foods/search',
        params=parameters,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()


def get_calories(food):
    """Return the energy value in kcal from one FDC food record.

    Args:
        food: Dict describing a single food; its 'foodNutrients' list is scanned.

    Returns:
        The 'value' of the first nutrient named 'Energy' with unit 'KCAL'
        that actually carries a value, or None if there is no such nutrient.
    """
    # `or []` also covers records where 'foodNutrients' is present but null.
    for nutrient in food.get('foodNutrients') or []:
        # TODO: test if {'foodNutrientId': 34350077, 'nutrientName': 'Energy'} is the correct dict
        if nutrient.get('nutrientName') == 'Energy' and nutrient.get('unitName') == 'KCAL':
            value = nutrient.get('value')
            # Skip an energy entry without a value instead of raising KeyError.
            if value is not None:
                return value
    return None


def simplify_results(data):
    """Reduce an FDC search response to name/calorie pairs.

    Args:
        data: Decoded search response; its 'foods' list is read.

    Returns:
        A list of ``{'name': ..., 'calories': ...}`` dicts, one for every food
        for which get_calories() found a value. Foods without a calorie value
        are left out.
    """
    simplified = []
    for entry in data.get('foods', []):
        kcal = get_calories(entry)
        if kcal is None:
            # No usable energy value for this food, so it is dropped
            continue
        simplified.append({'name': entry['description'], 'calories': kcal})
    return simplified


def estimate_calories_from_results(results, top_n=5):
    """Average the calorie values of the leading search results.

    Args:
        results: List of dicts that may carry a 'calories' key.
        top_n: How many of the first non-None calorie values to average.

    Returns:
        The mean of those values rounded to one decimal place, or None when
        ``results`` is empty or contains no calorie values.
    """
    if not results:
        return None

    # Collect every known calorie value in result order
    known = []
    for entry in results:
        kcal = entry.get('calories')
        if kcal is not None:
            known.append(kcal)

    # Keep only the leading `top_n` values
    sample = known[:top_n]
    if len(sample) == 0:
        return None

    mean = sum(sample) / len(sample)
    return round(mean, 1)

To test the helper functions' ability to query the API, we can search with a given dish name.

In [None]:
# Manual smoke test: look up one user-supplied food and show the simplified hits.
# NOTE: input() blocks until a value is typed, so this cell pauses a "Run All".
# Since the API calls could fail, use a try except block
try:
    food = input('Type a food:')
    response = search_food(food)
    simplified = simplify_results(response)
    # Number of hits that carried a calorie value, followed by the hits themselves
    print(len(simplified))
    pprint(simplified)
except Exception as e:
    print('Error:', e)

Now, we'll need to refine this data to contain only the relevant caloric information for the foods in our dataset. An important aspect of this to note is that the FoodData Central API is limited to 1000 requests per hour by IP address, and will block an IP address for one hour if this limit is exceeded.

In [None]:
# Query FoodData Central once per dish and keep a calorie estimate for each.
# Both names are initialised up front so later cells can always rely on them,
# even when something below fails.
food_names = []
all_calorie_data = []

try:
    food_names = get_food_names()
except Exception as e:
    print("Error:", e)

for food in food_names:
    # Handle failures per dish: a single failed request must not abort the
    # whole run or leave all_calorie_data shorter than food_names.
    try:
        response = search_food(food)
        simplified = simplify_results(response)
        estimate = estimate_calories_from_results(simplified, top_n=5)
    except Exception as e:
        print("Error:", e)
        # Record the dish without data so it is picked up as "missing" later.
        simplified, estimate = [], None
    else:
        print(f"\n{food}: ~{estimate} kcal/100g")

    all_calorie_data.append({
        'food': food,
        'estimate_calories_per_100g': estimate,
        'results': simplified
    })


With the previous code block, we now have a list of the results of the keyword search for all 101 dishes along with an estimate for kcal/100g. However, some of these contained no entries at all.

In [None]:
# Count and record missing dishes
# Pair every name with its entry via zip() instead of a shared index, so a
# shorter all_calorie_data cannot raise IndexError, and compare against None
# with `is` rather than `==`.
missing_dishes = [
    name
    for name, entry in zip(food_names, all_calorie_data)
    if entry.get('estimate_calories_per_100g') is None
]

print('Missing dish count:', len(missing_dishes))
pprint(missing_dishes)

To supplement the data which doesn't have an entry in the FoodData Central API, we'll be using the Nutritionix API.

In [None]:
# Nutritionix connection settings; the credentials are None when the
# corresponding environment variables are not set.
NUTRITIONIX_APP_ID = os.environ.get('NUTRITIONIX_APP_ID')
NUTRITIONIX_APP_KEY = os.environ.get('NUTRITIONIX_APP_KEY')
NUTRITIONIX_URL = 'https://trackapi.nutritionix.com/v2/natural/nutrients'

In [None]:
def get_calories_nutritionix(query, timeout=10):
    """Estimate kcal per 100 g for ``query`` using the Nutritionix API.

    Only the first food in the response is used.

    Args:
        query: Food description to look up.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Calories per 100 g rounded to one decimal place, or None when the
        request fails, no food is returned, or the first food lacks usable
        calorie or weight data.
    """
    headers = {
        'x-app-id': NUTRITIONIX_APP_ID,
        'x-app-key': NUTRITIONIX_APP_KEY,
        'Content-Type': 'application/json'
    }
    body = {
        "query": query
    }

    try:
        response = requests.post(NUTRITIONIX_URL, headers=headers, json=body, timeout=timeout)
        response.raise_for_status()
        # Decode inside the try block: a malformed body raises ValueError.
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print('Error:', e)
        return None

    foods = data.get("foods", [])
    if not foods:
        print("No nutrition data found for", query)
        return None

    # Extract calorie info from first result
    first = foods[0]
    cal = first.get("nf_calories")
    # A missing weight keeps the existing assumption of a 100 g serving.
    weight = first.get("serving_weight_grams", 100)

    # A missing calorie value must not be reported as 0 kcal, and a null or
    # non-positive weight cannot be scaled to 100 g.
    if cal is None or weight is None or weight <= 0:
        print("Incomplete nutrition data for", query)
        return None

    return round(cal * (100 / weight), 1)

When storing this with the rest of our data, which had FoodData Central simplified results included, we'll use `None` as the value for the `'results'` key in the food dictionary.

In [None]:
# Every missing dish already has an entry in all_calorie_data (with a None
# estimate). Update that entry instead of appending a second one, so the list
# holds one record per dish and re-running this cell does not add duplicates.
entries_by_food = {entry['food']: entry for entry in all_calorie_data}

for food in missing_dishes:
    estimate = get_calories_nutritionix(food)

    entry = entries_by_food.get(food)
    if entry is None:
        # Dish not recorded yet: create its record now.
        entry = {'food': food}
        all_calorie_data.append(entry)
        entries_by_food[food] = entry

    entry['estimate_calories_per_100g'] = estimate
    # None marks that this estimate did not come from FoodData Central results.
    entry['results'] = None
    print(f'{food}: ', estimate)

In [None]:
# Pretty-print every entry collected in all_calorie_data for a visual check
for food_entry in all_calorie_data:
    pprint(food_entry)

## Turn Image Data Into Train and Test Sets


In [None]:
SRC_DIR = 'food-128'

TRAIN_DIR = 'food-128-split/train'
TEST_DIR = 'food-128-split/test'

TRAIN_FRACTION = 0.8  # share of each class that goes into the training set
SPLIT_SEED = 42       # fixed seed so the split is identical on every run

# Output directories (exist_ok lets the cell be re-run without crashing)
os.makedirs(TRAIN_DIR, exist_ok=True)
os.makedirs(TEST_DIR, exist_ok=True)

# Dedicated generator: reproducible and independent of the global random state.
# NOTE: files left over from an earlier, unseeded run are not removed; delete
# 'food-128-split' once if such a run was made, to avoid train/test overlap.
split_rng = random.Random(SPLIT_SEED)

cntr = 0
# Loop through each food subclass; sorted because os.listdir() order is arbitrary
for class_name in sorted(os.listdir(SRC_DIR)):
    class_path = os.path.join(SRC_DIR, class_name)

    # Ensure its a folder
    if not os.path.isdir(class_path):
        continue

    # Sort first so the seeded shuffle always starts from the same order
    images = sorted(os.listdir(class_path))

    # Shuffle images
    split_rng.shuffle(images)

    # Set splitting point
    split_idx = int(len(images) * TRAIN_FRACTION)

    train_images = images[:split_idx]
    test_images = images[split_idx:]

    # Create new food folders in train and test
    train_class_dir = os.path.join(TRAIN_DIR, class_name)
    test_class_dir = os.path.join(TEST_DIR, class_name)
    os.makedirs(train_class_dir, exist_ok=True)
    os.makedirs(test_class_dir, exist_ok=True)

    for img in train_images:
        shutil.copy(os.path.join(class_path, img), os.path.join(train_class_dir, img))
    for img in test_images:
        shutil.copy(os.path.join(class_path, img), os.path.join(test_class_dir, img))

    cntr += 1
    print(f"{cntr}. Processesed {class_name}: {len(train_images)} train, {len(test_images)} test")

print("Dataset split successfully.")


## Convert Images to New Format Using PyTorch

In [None]:
# Image transformations: convert to a tensor, then normalise each RGB channel
# with the ImageNet mean and standard deviation.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load training and test sets from the split folders
train_dataset = datasets.ImageFolder(root='food-128-split/train', transform=transform)
test_dataset = datasets.ImageFolder(root='food-128-split/test', transform=transform)

# Data loaders for later: batches of 32 images, loaded by 4 worker processes.
# Training data is reshuffled every epoch; test data keeps a fixed order so
# benchmarks stay consistent.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Sanity check: number of classes and a sample of their names
class_cnt = len(train_dataset.classes)
print(f"Num Classes: {class_cnt}")
print(f"Sample Classes: {train_dataset.classes[:10]}")
