# Insert Consumables into Database

This notebook provides a modular system for inserting food data from the MM-Food-100K dataset into the consumable table.

## Overview

- **Source**: MM-Food-100K unique dishes dataset  
- **Target**: PostgreSQL consumable table via Supabase
- **Features**: Batch processing, error handling, data validation

## Setup & Configuration

In [35]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_batch
import json
import uuid
import os
from dotenv import load_dotenv
from typing import List, Dict, Any
from tqdm import tqdm

# Load environment variables
load_dotenv()

print("✓ Libraries imported successfully")
print("✓ Environment variables loaded")

✓ Libraries imported successfully
✓ Environment variables loaded


In [36]:
# Load the unique dishes dataset
data_path = "/Users/luvsuneja/Documents/repos/masala-embed/esci-dataset/data/mm-food-100k-unique-dishes.parquet"
df_unique_dishes = pd.read_parquet(data_path)

print(f"✓ Loaded {len(df_unique_dishes):,} unique dishes")
print(f"✓ Dataset shape: {df_unique_dishes.shape}")
print(f"✓ Memory usage: {df_unique_dishes.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

✓ Loaded 19,289 unique dishes
✓ Dataset shape: (19289, 10)
✓ Memory usage: 11.3 MB


## Data Preparation Module

In [37]:
def prepare_consumable_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prepare DataFrame for insertion into consumable table.
    
    Args:
        df: Source DataFrame with food data
        
    Returns:
        DataFrame ready for database insertion
    """
    # Create a copy to avoid modifying original
    df_prep = df.copy()
    
    # Map columns to database schema (no ID needed - auto-increment)
    column_mapping = {
        'image_url': 'image_url', 
        'dish_name': 'consumable_name',
        'food_type': 'consumable_type',
        'ingredients': 'consumable_ingredients',
        'portion_size': 'consumable_portion_size',
        'nutritional_profile': 'consumable_nutritional_profile',
        'cooking_method': 'consumable_cooking_method'
    }
    
    # Select and rename columns
    df_prep = df_prep[column_mapping.keys()].rename(columns=column_mapping)
    
    # Clean and validate data
    df_prep = clean_data(df_prep)
    
    print(f"✓ Prepared {len(df_prep)} records for insertion")
    return df_prep

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and validate data for database insertion."""
    
    # Remove rows with missing required fields
    required_fields = ['image_url', 'consumable_name']
    initial_count = len(df)
    df = df.dropna(subset=required_fields)
    dropped_count = initial_count - len(df)
    
    if dropped_count > 0:
        print(f"⚠ Dropped {dropped_count} rows with missing required fields")
    
    # Handle JSON fields - convert string representation to actual JSON
    if 'consumable_nutritional_profile' in df.columns:
        df['consumable_nutritional_profile'] = df['consumable_nutritional_profile'].apply(parse_json_field)
    
    # Clean text fields
    text_fields = ['consumable_name', 'consumable_type', 'consumable_cooking_method']
    for field in text_fields:
        if field in df.columns:
            df[field] = df[field].astype(str).str.strip()
            df[field] = df[field].replace('nan', None)
    
    return df

def parse_json_field(value) -> Dict[str, Any]:
    """Parse JSON field, handling string representations."""
    if pd.isna(value):
        return None
    
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return None
    
    return value

# Test the preparation function
df_prepared = prepare_consumable_data(df_unique_dishes)
print("\\nSample prepared data:")
print(df_prepared.head(2))

⚠ Dropped 1 rows with missing required fields
✓ Prepared 19288 records for insertion
\nSample prepared data:
                                           image_url consumable_name  \
0  https://file.b18a.io/7843322356500104680_44354...   Fried Chicken   
1  https://file.b18a.io/7833227147700100732_67487...             Pho   

   consumable_type                             consumable_ingredients  \
0  Restaurant food                       ["chicken","breading","oil"]   
1  Restaurant food  ["noodles","beef","basil","lime","green onions...   

                         consumable_portion_size  \
0                               ["chicken:300g"]   
1  ["noodles:200g","beef:100g","vegetables:50g"]   

                      consumable_nutritional_profile consumable_cooking_method  
0  {'fat_g': 25.0, 'protein_g': 30.0, 'calories_k...                    Frying  
1  {'fat_g': 15.0, 'protein_g': 25.0, 'calories_k...                    boiled  


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['consumable_nutritional_profile'] = df['consumable_nutritional_profile'].apply(parse_json_field)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[field] = df[field].astype(str).str.strip()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[field] = df[field].replace('nan', None)
A value is tryin

In [38]:
## Database Connection Module

In [39]:
def get_db_connection():
    """Create database connection using environment variables."""
    try:
        connection = psycopg2.connect(
            user=os.getenv("user"),
            password=os.getenv("password"),
            host=os.getenv("host"),
            port=os.getenv("port"),
            dbname=os.getenv("dbname")
        )
        return connection
    except Exception as e:
        print(f"❌ Failed to connect to database: {e}")
        raise

def test_connection():
    """Test database connection and basic operations."""
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                # Test basic query
                cursor.execute("SELECT NOW();")
                result = cursor.fetchone()
                
                # Check if consumable table exists
                cursor.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables 
                        WHERE table_name = 'consumable'
                    );
                """)
                table_exists = cursor.fetchone()[0]
                
                if table_exists:
                    # Get current record count
                    cursor.execute("SELECT COUNT(*) FROM consumable;")
                    count = cursor.fetchone()[0]
                    print(f"✓ Connected to database. Consumable table has {count:,} records")
                else:
                    print("⚠ Consumable table does not exist")
                
                return table_exists
                
    except Exception as e:
        print(f"❌ Connection test failed: {e}")
        return False

# Test the connection
connection_ok = test_connection()

✓ Connected to database. Consumable table has 1 records


In [40]:
df_prepared.iloc[0]

image_url                         https://file.b18a.io/7843322356500104680_44354...
consumable_name                                                       Fried Chicken
consumable_type                                                     Restaurant food
consumable_ingredients                                 ["chicken","breading","oil"]
consumable_portion_size                                            ["chicken:300g"]
consumable_nutritional_profile    {'fat_g': 25.0, 'protein_g': 30.0, 'calories_k...
consumable_cooking_method                                                    Frying
Name: 0, dtype: object

## Insertion Module

In [41]:
# Simple single record insertion test
def insert_single_record():
    """Insert a single test record to verify the schema works."""
    if not connection_ok:
        raise Exception("Database connection not available")
    
    # Get first record from prepared data
    test_record = df_prepared.iloc[0]
    
    sql = """
    INSERT INTO consumable (
        image_url, consumable_name, consumable_type, 
        consumable_ingredients, consumable_portion_size, 
        consumable_nutritional_profile, consumable_cooking_method
    ) VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cursor:
                # Prepare data
                nutrition = test_record['consumable_nutritional_profile']
                if nutrition is not None:
                    nutrition = json.dumps(nutrition) if not isinstance(nutrition, str) else nutrition
                
                # Execute insert
                cursor.execute(sql, (
                    test_record['image_url'],
                    test_record['consumable_name'],
                    test_record['consumable_type'],
                    test_record['consumable_ingredients'],
                    test_record['consumable_portion_size'],
                    nutrition,
                    test_record['consumable_cooking_method']
                ))
                
                conn.commit()
                print(f"✓ Successfully inserted record")
                print(f"✓ Record name: {test_record['consumable_name']}")
                
                # Verify insertion
                cursor.execute("SELECT COUNT(*) FROM consumable")
                count = cursor.fetchone()[0]
                print(f"✓ Total records in database: {count}")
                
    except Exception as e:
        print(f"❌ Failed to insert record: {e}")
        raise

# Test single record insertion
insert_single_record()

✓ Successfully inserted record
✓ Record name: Fried Chicken
✓ Total records in database: 2


In [42]:
# Batch insertion module
def insert_consumables_batch(df: pd.DataFrame, batch_size: int = 1000) -> Dict[str, int]:
    """
    Insert consumables data in batches with progress tracking.
    
    Args:
        df: Prepared DataFrame with consumable data
        batch_size: Number of records per batch
        
    Returns:
        Dictionary with insertion statistics
    """
    if not connection_ok:
        raise Exception("Database connection not available")
    
    # Prepare SQL statement (no ID column - auto-increment)
    sql = """
    INSERT INTO consumable (
        image_url, consumable_name, consumable_type, 
        consumable_ingredients, consumable_portion_size, 
        consumable_nutritional_profile, consumable_cooking_method
    ) VALUES %s
    """
    
    stats = {
        'total_records': len(df),
        'successful_batches': 0,
        'failed_batches': 0,
        'total_inserted': 0,
        'errors': []
    }
    
    total_batches = (len(df) + batch_size - 1) // batch_size
    
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            # Process data in batches with progress bar
            pbar = tqdm(total=len(df), desc="Inserting records", unit="records")
            
            for i in range(0, len(df), batch_size):
                batch_df = df.iloc[i:i+batch_size]
                
                try:
                    # Prepare batch data (no ID - auto-increment)
                    batch_data = []
                    for _, row in batch_df.iterrows():
                        # Convert nutritional profile to JSON string for PostgreSQL
                        nutrition = row['consumable_nutritional_profile']
                        if nutrition is not None:
                            nutrition = json.dumps(nutrition) if not isinstance(nutrition, str) else nutrition
                        
                        batch_data.append((
                            row['image_url'],
                            row['consumable_name'],
                            row['consumable_type'],
                            row['consumable_ingredients'],
                            row['consumable_portion_size'],
                            nutrition,
                            row['consumable_cooking_method']
                        ))
                    
                    # Execute batch insert
                    from psycopg2.extras import execute_values
                    execute_values(cursor, sql, batch_data, template=None, page_size=batch_size)
                    
                    stats['successful_batches'] += 1
                    stats['total_inserted'] += len(batch_data)
                    
                    pbar.update(len(batch_data))
                
                except Exception as e:
                    stats['failed_batches'] += 1
                    stats['errors'].append(f"Batch failed: {str(e)}")
                    pbar.update(len(batch_data))
                    # Continue with next batch instead of breaking
                    continue
            
            pbar.close()
            
            # Commit all changes
            conn.commit()
            print(f"✓ Insertion completed: {stats['total_inserted']:,}/{stats['total_records']:,} records")
            if stats['failed_batches'] > 0:
                print(f"⚠ {stats['failed_batches']} batches failed")
    
    return stats

# Function to preview what will be inserted
def preview_insertion(df: pd.DataFrame, num_rows: int = 3):
    """Preview the data that will be inserted."""
    print(f"Preview of {num_rows} records to be inserted:")
    print("=" * 50)
    
    for i, (_, row) in enumerate(df.head(num_rows).iterrows()):
        print(f"{i+1}. {row['consumable_name']} ({row['consumable_type']})")
        if row['consumable_cooking_method']:
            print(f"   Cooking: {row['consumable_cooking_method']}")

# Preview the prepared data
preview_insertion(df_prepared, num_rows=3)

Preview of 3 records to be inserted:
1. Fried Chicken (Restaurant food)
   Cooking: Frying
2. Pho (Restaurant food)
   Cooking: boiled
3. Pan-fried Dumplings (Restaurant food)
   Cooking: Pan-frying


In [43]:
# Execute batch insertion
# Run this to insert all remaining records (skip first one already inserted)
df_to_insert = df_prepared.iloc[1:]  # Skip first record already inserted

print(f"About to insert {len(df_to_insert):,} records in batches...")
print("This may take several minutes...")

# Execute the batch insertion
insertion_stats = insert_consumables_batch(df_to_insert, batch_size=500)

# Display final statistics
print("\n" + "="*50)
print("FINAL INSERTION STATISTICS")
print("="*50)
for key, value in insertion_stats.items():
    if key == 'errors' and value:
        print(f"{key}: {len(value)} errors")
        for error in value[:5]:  # Show first 5 errors
            print(f"  - {error}")
        if len(value) > 5:
            print(f"  ... and {len(value) - 5} more errors")
    else:
        print(f"{key}: {value}")

# Final verification
if connection_ok:
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM consumable")
            final_count = cursor.fetchone()[0]
            print(f"\n✓ Final database count: {final_count:,} records")

About to insert 19,287 records in batches...
This may take several minutes...


Inserting records: 100%|██████████| 19287/19287 [00:04<00:00, 4337.09records/s]


✓ Insertion completed: 19,287/19,287 records

FINAL INSERTION STATISTICS
total_records: 19287
successful_batches: 39
failed_batches: 0
total_inserted: 19287
errors: []

✓ Final database count: 19,287 records
