In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# DEVICE = torch.device("mps" if torch.mps.is_available() else "cpu")

In [3]:
# LOCAL_BASE_DIR = "/Users/georgiikuznetsov/Desktop/coding/CNN_nutrition/nutrition5k"
LOCAL_BASE_DIR = "/users/eleves-b/2023/georgii.kuznetsov/CNN_nutrition/nutrition5k"

IMAGERY_DIR = os.path.join(LOCAL_BASE_DIR, "imagery/realsense_overhead")
METADATA_FILE_CAFE1 = os.path.join(LOCAL_BASE_DIR, "metadata/dish_metadata_cafe1.csv")
METADATA_FILE_CAFE2 = os.path.join(LOCAL_BASE_DIR, "metadata/dish_metadata_cafe2.csv")

assert(os.path.exists(LOCAL_BASE_DIR))
assert(os.path.exists(IMAGERY_DIR))
assert(os.path.exists(METADATA_FILE_CAFE1))
assert(os.path.exists(METADATA_FILE_CAFE2))

RGB_IMAGE_FILENAME = "rgb.png" 

BATCH_SIZE = 64
LEARNING_RATE = 1e-4
NUM_EPOCHS = 10
TARGET_COLUMNS = ['calories', 'weight', 'fat', 'carbs', 'protein']

In [4]:
# Cell 3: Simplified Multi-Task CNN Model
class SimpleNutritionNet(nn.Module):
    def __init__(self, num_outputs=5, backbone='resnet34', pretrained=True):
        super(SimpleNutritionNet, self).__init__()
        
        # Use a pre-trained ResNet as backbone (simpler than InceptionV2)
        if backbone == 'resnet34':
            self.backbone = models.resnet34(pretrained=pretrained)
            num_features = 512
        elif backbone == 'resnet50':
            self.backbone = models.resnet50(pretrained=pretrained)
            num_features = 2048
        else:
            raise ValueError(f"Unsupported backbone: {backbone}")
        
        # Remove the final classification layer
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])
        
        # Multi-task heads - one for each nutritional component
        self.shared_fc = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3)
        )
        
        # Individual heads for each target
        self.calorie_head = nn.Linear(128, 1)
        self.weight_head = nn.Linear(128, 1)
        self.fat_head = nn.Linear(128, 1)
        self.carb_head = nn.Linear(128, 1)
        self.protein_head = nn.Linear(128, 1)
        
    def forward(self, x):
        # Extract features using backbone
        features = self.backbone(x)
        features = features.view(features.size(0), -1)
        
        # Shared layers
        shared = self.shared_fc(features)
        
        # Task-specific predictions
        calories = self.calorie_head(shared)
        weight = self.weight_head(shared)
        fat = self.fat_head(shared)
        carbs = self.carb_head(shared)
        protein = self.protein_head(shared)
        
        # Stack predictions in the order of TARGET_COLUMNS
        return torch.cat([calories, weight, fat, carbs, protein], dim=1)

In [5]:
class MultiTaskLoss(nn.Module):
    def __init__(self, task_weights=None):
        super(MultiTaskLoss, self).__init__()
        if task_weights is None:
            self.task_weights = torch.ones(5)
        else:
            self.task_weights = torch.tensor(task_weights, dtype=torch.float32)
    
    def forward(self, predictions, targets):
        # Calculate MAE for each task
        losses = torch.abs(predictions - targets)
        
        # Ensure task weights are on the same device
        if self.task_weights.device != predictions.device:
            self.task_weights = self.task_weights.to(predictions.device)
        
        # Weight the losses
        weighted_losses = losses * self.task_weights
        
        # Return mean loss
        return weighted_losses.mean()

In [6]:
def parse_nutrition_csv(file_path):
    dishes = []
    ingredients_list = []
    
    with open(file_path, 'r') as f:
        for line in f:
            parts = line.strip().split(',')
            if not parts[0].startswith('dish_'):
                continue
                
            dish_id = parts[0]
            dish_calories = float(parts[1])
            dish_weight = float(parts[2])
            dish_fat = float(parts[3])
            dish_carbs = float(parts[4])
            dish_protein = float(parts[5])
            
            dishes.append({
                'dish_id': dish_id,
                'calories': dish_calories,
                'weight': dish_weight,
                'fat': dish_fat,
                'carbs': dish_carbs,
                'protein': dish_protein
            })
            
            # Extract ingredients (in groups of 7 fields)
            ingredient_data = parts[6:]
            i = 0
            while i < len(ingredient_data) - 6:
                if not ingredient_data[i].startswith('ingr_'):
                    break
                    
                ingredients_list.append({
                    'dish_id': dish_id,
                    'ingredient_id': ingredient_data[i],
                    'ingredient_name': ingredient_data[i+1],
                    'amount': float(ingredient_data[i+2]),
                    'calories': float(ingredient_data[i+3]),
                    'fat': float(ingredient_data[i+4]),
                    'carbs': float(ingredient_data[i+5]),
                    'protein': float(ingredient_data[i+6])
                })
                i += 7
    
    # Create two separate dataframes
    dish_df = pd.DataFrame(dishes)
    ingredient_df = pd.DataFrame(ingredients_list)
    
    return dish_df, ingredient_df

# Load and combine metadata
dish_df_cafe1, _ = parse_nutrition_csv(METADATA_FILE_CAFE1)
dish_df_cafe2, _ = parse_nutrition_csv(METADATA_FILE_CAFE2)
dish_metadata_df = pd.concat([dish_df_cafe1, dish_df_cafe2], ignore_index=True)

# Filter for dishes with available images
available_dishes = [d for d in os.listdir(IMAGERY_DIR) 
                   if os.path.isdir(os.path.join(IMAGERY_DIR, d)) and 
                   os.path.exists(os.path.join(IMAGERY_DIR, d, "rgb.png"))]
filtered_metadata = dish_metadata_df[dish_metadata_df['dish_id'].isin(available_dishes)]

# Remove any rows with NaN values in target columns
filtered_metadata = filtered_metadata.dropna(subset=TARGET_COLUMNS)

print(f"Found {len(filtered_metadata)} dishes with both metadata and images")

In [7]:

# Load and combine metadata
dish_df_cafe1, _ = parse_nutrition_csv(METADATA_FILE_CAFE1)
dish_df_cafe2, _ = parse_nutrition_csv(METADATA_FILE_CAFE2)
dish_metadata_df = pd.concat([dish_df_cafe1, dish_df_cafe2], ignore_index=True)

# Filter for dishes with available images
available_dishes = [d for d in os.listdir(IMAGERY_DIR) 
                   if os.path.exists(os.path.join(IMAGERY_DIR, d, "rgb.png"))]
filtered_metadata = dish_metadata_df[dish_metadata_df['dish_id'].isin(available_dishes)]

print(f"Found {len(filtered_metadata)} dishes with both metadata and images")

In [8]:
class NutritionDataset(Dataset):
    def __init__(self, dish_ids, labels, imagery_dir, transform=None):
        self.dish_ids = dish_ids
        self.labels = labels
        self.imagery_dir = imagery_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.dish_ids)
    
    def __getitem__(self, idx):
        dish_id = self.dish_ids[idx]
        
        # Load RGB image
        img_path = os.path.join(self.imagery_dir, dish_id, "rgb.png")
        image = Image.open(img_path).convert("RGB")
        
        if self.transform:
            image = self.transform(image)
        
        # Get labels
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        
        return image, label

# Define transforms
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [9]:
# Prepare datasets and dataloaders - Fixed for macOS
dish_ids = filtered_metadata['dish_id'].tolist()
labels = filtered_metadata[TARGET_COLUMNS].values.astype(np.float32)

# Split data
train_ids, val_ids, train_labels, val_labels = train_test_split(
    dish_ids, labels, test_size=0.2, random_state=42
)

# Create datasets
train_dataset = NutritionDataset(train_ids, train_labels, IMAGERY_DIR, train_transform)
val_dataset = NutritionDataset(val_ids, val_labels, IMAGERY_DIR, val_transform)

# Create dataloaders - Set num_workers=0 to avoid multiprocessing issues
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")

In [10]:
# Cell 8: Initialize model, loss, and optimizer
model = SimpleNutritionNet(num_outputs=5, backbone='resnet34').to(DEVICE)

# Use multi-task loss with task-specific weights
# Weights can be adjusted based on the scale of each target
task_weights = [1.0, 1.0, 1.0, 1.0, 1.0]  # Equal weights initially
criterion = MultiTaskLoss(task_weights)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

In [11]:
# Cell 9: Enhanced training and validation functions with progress tracking
from tqdm import tqdm
import time

def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    batch_losses = []
    
    # Create progress bar
    pbar = tqdm(loader, desc='Training', leave=False)
    
    for batch_idx, (images, labels) in enumerate(pbar):
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        batch_loss = loss.item()
        total_loss += batch_loss
        batch_losses.append(batch_loss)
        
        # Update progress bar
        pbar.set_postfix({
            'loss': f'{batch_loss:.4f}',
            'avg_loss': f'{np.mean(batch_losses):.4f}'
        })
    
    pbar.close()
    return total_loss / len(loader)

def validate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    all_predictions = []
    all_labels = []
    
    # Create progress bar
    pbar = tqdm(loader, desc='Validating', leave=False)
    
    with torch.no_grad():
        for images, labels in pbar:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            batch_loss = loss.item()
            total_loss += batch_loss
            
            all_predictions.append(outputs.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
            
            # Update progress bar
            pbar.set_postfix({'loss': f'{batch_loss:.4f}'})
    
    pbar.close()
    
    predictions = np.concatenate(all_predictions)
    labels = np.concatenate(all_labels)
    
    # Calculate percentage errors
    percentage_errors = {}
    for i, col in enumerate(TARGET_COLUMNS):
        mae = mean_absolute_error(labels[:, i], predictions[:, i])
        mean_val = labels[:, i].mean()
        percentage_error = (mae / mean_val) * 100 if mean_val != 0 else 0
        percentage_errors[col] = percentage_error
    
    return total_loss / len(loader), percentage_errors, predictions, labels

In [72]:
# Cell 10: Verbose Training loop with detailed progress tracking

best_val_loss = float('inf')
history = {'train_loss': [], 'val_loss': [], 'percentage_errors': [], 'lr': []}

# Print training configuration
print("=" * 60)
print("TRAINING CONFIGURATION")
print("=" * 60)
print(f"Model: SimpleNutritionNet with {model.backbone.__class__.__name__} backbone")
print(f"Device: {DEVICE}")
print(f"Batch Size: {BATCH_SIZE}")
print(f"Learning Rate: {LEARNING_RATE}")
print(f"Number of Epochs: {NUM_EPOCHS}")
print(f"Training Samples: {len(train_dataset)}")
print(f"Validation Samples: {len(val_dataset)}")
print(f"Target Columns: {TARGET_COLUMNS}")
print("=" * 60)
print()

try:
    for epoch in range(NUM_EPOCHS):
        epoch_start_time = time.time()
        
        # Get current learning rate
        current_lr = optimizer.param_groups[0]['lr']
        
        print(f"\nEPOCH {epoch+1}/{NUM_EPOCHS} | LR: {current_lr:.6f}")
        print("-" * 60)
        
        # Train
        print("Training phase:")
        train_loss = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
        
        # Validate
        print("\nValidation phase:")
        val_loss, percentage_errors, predictions, labels = validate(
            model, val_loader, criterion, DEVICE
        )
        
        # Calculate epoch time
        epoch_time = time.time() - epoch_start_time
        
        # Update scheduler
        old_lr = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        new_lr = optimizer.param_groups[0]['lr']
        
        # Save history
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['percentage_errors'].append(percentage_errors)
        history['lr'].append(current_lr)
        
        # Print detailed results
        print(f"\n{'='*60}")
        print(f"EPOCH {epoch+1} RESULTS:")
        print(f"{'='*60}")
        print(f"Train Loss: {train_loss:.4f}")
        print(f"Val Loss: {val_loss:.4f}")
        print(f"Epoch Time: {epoch_time:.2f} seconds")
        
        # Calculate improvement
        if epoch > 0:
            train_improvement = (history['train_loss'][-2] - train_loss) / history['train_loss'][-2] * 100
            val_improvement = (history['val_loss'][-2] - val_loss) / history['val_loss'][-2] * 100
            print(f"Train Loss Change: {train_improvement:+.2f}%")
            print(f"Val Loss Change: {val_improvement:+.2f}%")
        
        print("\nPERCENTAGE ERRORS BY NUTRIENT:")
        print("-" * 40)
        for nutrient, error in percentage_errors.items():
            # Show trend if we have history
            if len(history['percentage_errors']) > 1:
                prev_error = history['percentage_errors'][-2][nutrient]
                change = error - prev_error
                print(f"  {nutrient:12s}: {error:6.2f}% ({change:+.2f}%)")
            else:
                print(f"  {nutrient:12s}: {error:6.2f}%")
        
        # Calculate average percentage error
        avg_percentage_error = np.mean(list(percentage_errors.values()))
        print(f"  {'Average':12s}: {avg_percentage_error:6.2f}%")
        
        # Save best model
        if val_loss < best_val_loss:
            improvement_pct = (best_val_loss - val_loss) / best_val_loss * 100 if best_val_loss != float('inf') else 100
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_nutrition_model.pth')
            print(f"\n✓ NEW BEST MODEL SAVED! (Improvement: {improvement_pct:.2f}%)")
        else:
            epochs_since_best = epoch - history['val_loss'].index(min(history['val_loss']))
            print(f"\n  No improvement for {epochs_since_best} epoch(s)")
        
        # Check if learning rate changed
        if old_lr != new_lr:
            print(f"\n⚡ Learning rate reduced: {old_lr:.6f} → {new_lr:.6f}")
        
        print("=" * 60)
        
except KeyboardInterrupt:
    print("\n\n⚠️  Training interrupted by user!")
    print(f"Completed {epoch}/{NUM_EPOCHS} epochs")
    
except Exception as e:
    print(f"\n\n❌ Error during training: {e}")
    print(f"Failed at epoch: {epoch+1}")
    raise

finally:
    print("\n" + "="*60)
    print("TRAINING SUMMARY")
    print("="*60)
    if history['train_loss']:
        print(f"Final Train Loss: {history['train_loss'][-1]:.4f}")
        print(f"Final Val Loss: {history['val_loss'][-1]:.4f}")
        print(f"Best Val Loss: {best_val_loss:.4f}")
        print(f"Total Epochs Completed: {len(history['train_loss'])}")
        
        # Final percentage errors
        if history['percentage_errors']:
            print("\nFinal Percentage Errors:")
            final_errors = history['percentage_errors'][-1]
            for nutrient, error in final_errors.items():
                print(f"  {nutrient}: {error:.2f}%")

In [73]:
# Cell 10-continue
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Loss plot
ax1.plot(history['train_loss'], label='Train Loss')
ax1.plot(history['val_loss'], label='Val Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Validation Loss')
ax1.legend()

# Percentage error plot
percentage_df = pd.DataFrame(history['percentage_errors'])
for col in percentage_df.columns:
    ax2.plot(percentage_df[col], label=col)
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Percentage Error (%)')
ax2.set_title('Percentage Errors by Nutrient')
ax2.legend()

plt.tight_layout()
plt.show()

In [76]:
# Cell 11: Load best model and evaluate performance
import seaborn as sns
from sklearn.metrics import r2_score, mean_squared_error

# Load the best model
model.load_state_dict(torch.load('best_nutrition_model.pth'))
model.eval()

# Get predictions on validation set
print("Evaluating model on validation set...")
all_predictions = []
all_labels = []
all_dish_ids = []

with torch.no_grad():
    for i, (images, labels) in enumerate(tqdm(val_loader, desc='Predicting')):
        images = images.to(DEVICE)
        outputs = model(images)
        
        all_predictions.append(outputs.cpu().numpy())
        all_labels.append(labels.numpy())
        
        # Get dish IDs for this batch
        batch_start = i * BATCH_SIZE
        batch_end = min(batch_start + BATCH_SIZE, len(val_ids))
        all_dish_ids.extend(val_ids[batch_start:batch_end])

predictions = np.concatenate(all_predictions)
labels = np.concatenate(all_labels)

# Create DataFrame for easier analysis
results_df = pd.DataFrame({
    'dish_id': all_dish_ids[:len(predictions)],
    'calories_pred': predictions[:, 0],
    'calories_true': labels[:, 0],
    'weight_pred': predictions[:, 1],
    'weight_true': labels[:, 1],
    'fat_pred': predictions[:, 2],
    'fat_true': labels[:, 2],
    'carbs_pred': predictions[:, 3],
    'carbs_true': labels[:, 3],
    'protein_pred': predictions[:, 4],
    'protein_true': labels[:, 4]
})

print(f"\nPredictions completed for {len(results_df)} samples")

In [77]:
# Cell 12: Calculate detailed metrics
def calculate_metrics(true_values, pred_values, name):
    mae = mean_absolute_error(true_values, pred_values)
    mse = mean_squared_error(true_values, pred_values)
    rmse = np.sqrt(mse)
    r2 = r2_score(true_values, pred_values)
    
    # Percentage error
    mean_true = np.mean(true_values)
    percentage_error = (mae / mean_true) * 100 if mean_true != 0 else 0
    
    # Mean Absolute Percentage Error (MAPE) - per sample
    mape = np.mean(np.abs((true_values - pred_values) / true_values)) * 100
    
    return {
        'nutrient': name,
        'MAE': mae,
        'RMSE': rmse,
        'R²': r2,
        'Percentage Error': percentage_error,
        'MAPE': mape,
        'Mean True': mean_true,
        'Mean Pred': np.mean(pred_values)
    }

# Calculate metrics for each nutrient
metrics_list = []
for nutrient in TARGET_COLUMNS:
    true_col = f'{nutrient}_true'
    pred_col = f'{nutrient}_pred'
    metrics = calculate_metrics(
        results_df[true_col].values,
        results_df[pred_col].values,
        nutrient
    )
    metrics_list.append(metrics)

metrics_df = pd.DataFrame(metrics_list)

# Display metrics table
print("\n" + "="*80)
print("MODEL PERFORMANCE METRICS")
print("="*80)
print(metrics_df.to_string(index=False, float_format='%.3f'))
print("="*80)

# Highlight best and worst performing nutrients
best_nutrient = metrics_df.loc[metrics_df['Percentage Error'].idxmin(), 'nutrient']
worst_nutrient = metrics_df.loc[metrics_df['Percentage Error'].idxmax(), 'nutrient']
print(f"\n✓ Best prediction: {best_nutrient} ({metrics_df.loc[metrics_df['nutrient']==best_nutrient, 'Percentage Error'].values[0]:.2f}% error)")
print(f"✗ Worst prediction: {worst_nutrient} ({metrics_df.loc[metrics_df['nutrient']==worst_nutrient, 'Percentage Error'].values[0]:.2f}% error)")

In [78]:
# Cell 13: Visualize predictions vs actual values
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.flatten()

for i, nutrient in enumerate(TARGET_COLUMNS):
    ax = axes[i]
    true_col = f'{nutrient}_true'
    pred_col = f'{nutrient}_pred'
    
    # Get data
    x = results_df[true_col].values
    y = results_df[pred_col].values
    
    # Create scatter plot
    ax.scatter(x, y, alpha=0.5, s=50, edgecolors='k', linewidth=0.5)
    
    # Add perfect prediction line
    min_val = min(x.min(), y.min())
    max_val = max(x.max(), y.max())
    ax.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2, label='Perfect Prediction')
    
    # Add trend line
    z = np.polyfit(x, y, 1)
    p = np.poly1d(z)
    ax.plot(x, p(x), "b-", alpha=0.8, label=f'Trend: y={z[0]:.2f}x+{z[1]:.2f}')
    
    # Calculate R²
    r2 = r2_score(x, y)
    
    # Labels and title
    ax.set_xlabel(f'True {nutrient.capitalize()}')
    ax.set_ylabel(f'Predicted {nutrient.capitalize()}')
    ax.set_title(f'{nutrient.capitalize()} (R² = {r2:.3f})')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.suptitle('Predictions vs True Values', fontsize=16, y=1.02)
plt.show()

In [79]:
# Cell 14: Error distribution analysis
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
axes = axes.flatten()

for i, nutrient in enumerate(TARGET_COLUMNS):
    ax = axes[i]
    true_col = f'{nutrient}_true'
    pred_col = f'{nutrient}_pred'
    
    # Calculate errors
    errors = results_df[pred_col] - results_df[true_col]
    relative_errors = (errors / results_df[true_col]) * 100
    
    # Remove outliers for better visualization
    q1 = relative_errors.quantile(0.25)
    q3 = relative_errors.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    filtered_errors = relative_errors[(relative_errors >= lower_bound) & (relative_errors <= upper_bound)]
    
    # Create histogram
    ax.hist(filtered_errors, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
    ax.axvline(x=0, color='red', linestyle='--', linewidth=2, label='Zero Error')
    ax.axvline(x=filtered_errors.mean(), color='green', linestyle='-', linewidth=2, 
               label=f'Mean: {filtered_errors.mean():.1f}%')
    
    # Labels
    ax.set_xlabel('Relative Error (%)')
    ax.set_ylabel('Frequency')
    ax.set_title(f'{nutrient.capitalize()} Error Distribution')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [80]:
# Cell 15: Show sample predictions with images
def show_predictions_with_images(n_samples=6):
    # Get random samples
    sample_indices = np.random.choice(len(results_df), n_samples, replace=False)
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    for idx, ax in enumerate(axes):
        if idx >= n_samples:
            ax.axis('off')
            continue
            
        sample_idx = sample_indices[idx]
        dish_id = results_df.iloc[sample_idx]['dish_id']
        
        # Load and display image
        img_path = os.path.join(IMAGERY_DIR, dish_id, "rgb.png")
        img = Image.open(img_path)
        ax.imshow(img)
        ax.axis('off')
        
        # Create prediction text
        pred_text = "Predicted:\n"
        true_text = "Actual:\n"
        
        for nutrient in TARGET_COLUMNS:
            pred_val = results_df.iloc[sample_idx][f'{nutrient}_pred']
            true_val = results_df.iloc[sample_idx][f'{nutrient}_true']
            error = abs(pred_val - true_val) / true_val * 100
            
            if nutrient == 'calories':
                pred_text += f"Cal: {pred_val:.0f}\n"
                true_text += f"Cal: {true_val:.0f} ({error:.1f}%)\n"
            elif nutrient == 'weight':
                pred_text += f"Weight: {pred_val:.0f}g\n"
                true_text += f"Weight: {true_val:.0f}g ({error:.1f}%)\n"
            else:
                pred_text += f"{nutrient.capitalize()}: {pred_val:.1f}g\n"
                true_text += f"{nutrient.capitalize()}: {true_val:.1f}g ({error:.1f}%)\n"
        
        # Add text to image
        ax.text(0.02, 0.98, pred_text, transform=ax.transAxes, 
                verticalalignment='top', bbox=dict(boxstyle='round', 
                facecolor='lightblue', alpha=0.8), fontsize=9)
        ax.text(0.98, 0.98, true_text, transform=ax.transAxes, 
                verticalalignment='top', horizontalalignment='right',
                bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8), fontsize=9)
        
        ax.set_title(f"Dish: {dish_id}", fontsize=10)
    
    plt.tight_layout()
    plt.suptitle('Sample Predictions vs Ground Truth', fontsize=14, y=1.02)
    plt.show()

show_predictions_with_images(6)

In [81]:
# Cell 16: Analyze prediction quality by nutritional characteristics
# Analyze errors by meal size
results_df['meal_size'] = pd.cut(results_df['weight_true'], 
                                  bins=[0, 100, 200, 300, float('inf')],
                                  labels=['Small (<100g)', 'Medium (100-200g)', 
                                         'Large (200-300g)', 'Very Large (>300g)'])

# Calculate percentage errors
for nutrient in TARGET_COLUMNS:
    results_df[f'{nutrient}_error_pct'] = abs(
        (results_df[f'{nutrient}_pred'] - results_df[f'{nutrient}_true']) / 
        results_df[f'{nutrient}_true']
    ) * 100

# Analyze by meal size
print("\nERROR ANALYSIS BY MEAL SIZE")
print("="*60)
error_by_size = results_df.groupby('meal_size')[[f'{n}_error_pct' for n in TARGET_COLUMNS]].mean()
print(error_by_size.round(2))

# Analyze by calorie density
results_df['calorie_density'] = results_df['calories_true'] / results_df['weight_true']
results_df['density_category'] = pd.cut(results_df['calorie_density'],
                                        bins=[0, 1, 2, 3, float('inf')],
                                        labels=['Low (<1)', 'Medium (1-2)', 
                                               'High (2-3)', 'Very High (>3)'])

print("\n\nERROR ANALYSIS BY CALORIE DENSITY")
print("="*60)
error_by_density = results_df.groupby('density_category')[[f'{n}_error_pct' for n in TARGET_COLUMNS]].mean()
print(error_by_density.round(2))

In [82]:
# Cell 17: Create confusion matrix for macronutrient ratios
# Calculate macronutrient percentages
def calculate_macro_percentages(df, suffix):
    total_macros = df[f'fat_{suffix}'] + df[f'carbs_{suffix}'] + df[f'protein_{suffix}']
    df[f'fat_pct_{suffix}'] = (df[f'fat_{suffix}'] / total_macros) * 100
    df[f'carbs_pct_{suffix}'] = (df[f'carbs_{suffix}'] / total_macros) * 100
    df[f'protein_pct_{suffix}'] = (df[f'protein_{suffix}'] / total_macros) * 100
    return df

results_df = calculate_macro_percentages(results_df, 'true')
results_df = calculate_macro_percentages(results_df, 'pred')

# Visualize macronutrient prediction accuracy
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for i, macro in enumerate(['fat', 'carbs', 'protein']):
    ax = axes[i]
    
    # Create 2D histogram
    h = ax.hist2d(results_df[f'{macro}_pct_true'], 
                  results_df[f'{macro}_pct_pred'],
                  bins=20, cmap='Blues')
    
    # Add colorbar
    plt.colorbar(h[3], ax=ax, label='Count')
    
    # Add perfect prediction line
    ax.plot([0, 100], [0, 100], 'r--', lw=2)
    
    # Labels
    ax.set_xlabel(f'True {macro.capitalize()} %')
    ax.set_ylabel(f'Predicted {macro.capitalize()} %')
    ax.set_title(f'{macro.capitalize()} Percentage Prediction')
    ax.set_xlim(0, 100)
    ax.set_ylim(0, 100)

plt.tight_layout()
plt.show()

In [83]:
# Cell 18: Save detailed results and create summary report
# Save detailed predictions
results_df.to_csv('nutrition_predictions_detailed.csv', index=False)
print("Detailed predictions saved to 'nutrition_predictions_detailed.csv'")

# Create summary report
print("\n" + "="*80)
print("FINAL PERFORMANCE SUMMARY")
print("="*80)

# Overall performance
overall_mae = np.mean([metrics_df.loc[metrics_df['nutrient']==n, 'Percentage Error'].values[0] 
                      for n in TARGET_COLUMNS])
print(f"\nOverall Mean Percentage Error: {overall_mae:.2f}%")

# Performance compared to paper
print("\nComparison to Nutrition5k Paper Results:")
print("-"*40)
paper_results = {
    'calories': 26.1,
    'weight': 18.8,
    'fat': 34.2,
    'carbs': 31.9,
    'protein': 29.5
}

for nutrient in TARGET_COLUMNS:
    our_error = metrics_df.loc[metrics_df['nutrient']==nutrient, 'Percentage Error'].values[0]
    paper_error = paper_results.get(nutrient, 0)
    diff = our_error - paper_error
    
    if diff < 0:
        print(f"{nutrient:10s}: {our_error:5.1f}% (Paper: {paper_error:.1f}%) ✓ Better by {abs(diff):.1f}%")
    else:
        print(f"{nutrient:10s}: {our_error:5.1f}% (Paper: {paper_error:.1f}%) ✗ Worse by {diff:.1f}%")

# Nutritionist comparison (from paper: 41% error for professionals)
print(f"\n{'Nutritionist (Professional)':10s}: ~41% average error")
print(f"{'Our Model':10s}: {overall_mae:.1f}% average error")
if overall_mae < 41:
    print(f"\n✓ Model performs {41-overall_mae:.1f}% better than professional nutritionists!")