In [1]:
from torchvision import transforms
from PIL import Image
import os
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from utils.preprocess import prepare_dataloaders

transform = transforms.Compose([
    transforms.Resize((299, 299)),  # Resize to Inception-compatible dimensions
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize if needed
])

import os


dataset_path = '../data/nutrition5k_reconstructed/'
prepared_path = './utils/data'

image_path = os.path.join(dataset_path, 'images')
train_labels = os.path.join(prepared_path, 'train_labels.csv')
val_labels= os.path.join(prepared_path, 'val_labels.csv')
test_labels = os.path.join(prepared_path, 'test_labels.csv')
mass_inputs = os.path.join(prepared_path, 'mass_inputs.csv')

train_loader, val_loader, test_loader = prepare_dataloaders(image_path, mass_inputs, train_labels, val_labels, test_labels, transform, batch_size = 32, shuffle = True, mass = True)
for (train_batch, val_batch, test_batch) in zip(train_loader, val_loader, test_loader):
    print(train_batch[0].shape, train_batch[1].shape, train_batch[2].shape)
    print(val_batch[0].shape, val_batch[1].shape, val_batch[2].shape)
    print(test_batch[0].shape, test_batch[1].shape, test_batch[2].shape)
    break

torch.Size([32, 3, 299, 299]) torch.Size([32, 1]) torch.Size([32, 4])
torch.Size([32, 3, 299, 299]) torch.Size([32, 1]) torch.Size([32, 4])
torch.Size([32, 3, 299, 299]) torch.Size([32, 1]) torch.Size([32, 4])


In [2]:
import torch
import torch.nn as nn
import torchvision.models as models

# Define the InceptionV2 backbone
class InceptionV2Backbone(nn.Module):
    def __init__(self, pretrained=True):
        super(InceptionV2Backbone, self).__init__()
        # Use InceptionV3 as a proxy for InceptionV2
        self.backbone = models.inception_v3(pretrained=pretrained, aux_logits=True)
        self.backbone.fc = nn.Identity()  # Remove the classification head

    def forward(self, x):
        # When aux_logits=True, the output is a tuple: (main_output, aux_output)
        x = self.backbone(x)
        if isinstance(x, tuple):  # Extract the main output
            x = x[0]
        return x


In [3]:
class NutritionModel(nn.Module):
    def __init__(self, num_tasks=3):
        """
        Args:
            num_tasks: Number of tasks (calories, macronutrients, and mass).
        """
        super(NutritionModel, self).__init__()
        self.backbone = InceptionV2Backbone()  # Use the corrected backbone
        
        # Shared image feature layers
        self.shared_fc1 = nn.Linear(2048, 4096)
        self.shared_fc2 = nn.Linear(4096, 4096)
        
        # Mass input processing
        self.mass_fc1 = nn.Linear(1, 128)
        self.mass_fc2 = nn.Linear(128, 256)
        
        # Task-specific heads
        self.task_heads = nn.ModuleList([
            nn.Sequential(
                nn.Linear(4096 + 256, 4096),  # Adjust input size to account for concatenation
                nn.ReLU(),
                nn.Linear(4096, 1)
            ) for _ in range(num_tasks)
        ])

    def forward(self, image, mass):
        # Process the image through the backbone
        image_features = self.backbone(image)
        image_features = nn.functional.relu(self.shared_fc1(image_features))
        image_features = nn.functional.relu(self.shared_fc2(image_features))
        
        # Process the mass input
        mass_features = nn.functional.relu(self.mass_fc1(mass))
        mass_features = nn.functional.relu(self.mass_fc2(mass_features))
        
        # Concatenate image and mass features
        combined_features = torch.cat([image_features, mass_features], dim=1)
        
        # Pass through task-specific heads
        outputs = [task_head(combined_features) for task_head in self.task_heads]
        return torch.cat(outputs, dim=1)

In [4]:
# # Define the loss function for multi-task learning
# def multi_task_loss(predictions, targets):
#     """
#     Args:
#         predictions: Tensor of shape (batch_size, num_tasks).
#         targets: Tensor of shape (batch_size, num_tasks).
#     Returns:
#         Combined loss for all tasks.
#     """
#     losses = torch.abs(predictions - targets)  # Mean Absolute Error (MAE)
#     return losses.mean()

In [None]:
def train_model(
    model, 
    dataloader, 
    val_dataloader, 
    criterion, 
    optimizer, 
    num_epochs=50, 
    save_path="best_model.pth"
):
    model = model.to(device)  # Move model to device
    current_best_loss = float('inf')

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0

        for batch in dataloader:
            # Unpack and move data to device
            images, masses, targets = (data.to(device) for data in batch)

            # Forward pass and optimization
            optimizer.zero_grad()
            outputs = model(images, masses)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        avg_train_loss = running_loss / len(dataloader)
        print(f"Epoch [{epoch + 1}/{num_epochs}] - Training Loss: {avg_train_loss:.4f}")

        # Validation phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for batch in val_dataloader:
                images, masses, targets = (data.to(device) for data in batch)
                outputs = model(images, masses)
                loss = criterion(outputs, targets)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_dataloader)
        print(f"Epoch [{epoch + 1}/{num_epochs}] - Validation Loss: {avg_val_loss:.4f}")

        # Save the best model
        if avg_val_loss < current_best_loss:
            current_best_loss = avg_val_loss
            torch.save(model.state_dict(), save_path)
            print(f"Model improved. Saved at epoch {epoch + 1} with validation loss: {avg_val_loss:.4f}")

    print("Training complete!")

        

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define the model
model = NutritionModel(num_tasks=4)  # Predicting 4 outputs: calories, fat, carbs, protein

loss_fn = nn.MSELoss()  # Mean Squared Error (MSE) loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# Train the model (testing with 1 epoch)
train_model(model, train_loader, loss_fn, optimizer, num_epochs=1)

In [None]:
# load the model from model.pth
model = NutritionModel(num_tasks=4)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model.load_state_dict(torch.load("model.pth", map_location=device))

# Evaluate the model
from sklearn.metrics import mean_absolute_error
import numpy as np

def test_model(model, dataloader):
    """
    Tests the model and calculates MAE for each nutritional fact.
    
    Args:
        model (torch.nn.Module): Trained model to evaluate.
        dataloader (DataLoader): DataLoader for the test dataset.
    
    Returns:
        dict: MAE for each nutritional fact.
    """
    model.eval()
    all_targets = []
    all_predictions = []

    with torch.no_grad():
        for images, masses, targets in dataloader:
            # Move data to device
            images = images.to(device)
            masses = masses.to(device)
            targets = targets.to(device)

            # Make predictions
            outputs = model(images, masses)

            # Store predictions and targets
            all_predictions.append(outputs.cpu().numpy())
            all_targets.append(targets.cpu().numpy())

    # Concatenate all predictions and targets
    all_predictions = np.concatenate(all_predictions, axis=0)
    # print a few predictions
    print(all_predictions[:10])
    all_targets = np.concatenate(all_targets, axis=0)

    # Calculate MAE and MAE percentage for each nutritional fact
    results = {}
    nutritional_facts = ["calories", "fat", "carb", "protein"]
    for i, fact in enumerate(nutritional_facts):
        mae = mean_absolute_error(all_targets[:, i], all_predictions[:, i])
        mean_value = np.mean(all_targets[:, i])
        mae_percent = (mae / mean_value) * 100 if mean_value != 0 else 0
        results[fact] = {
            "MAE": mae,
            "MAE (%)": mae_percent
        }

    return results

# Example usage:
print("Evaluating the model...")
results = test_model(model, test_loader)
print("MAE Results:")
for fact, metrics in results.items():
    print(f"{fact.capitalize()}: MAE = {metrics['MAE']:.4f}, MAE (%) = {metrics['MAE (%)']:.2f}%")

  model.load_state_dict(torch.load("model.pth", map_location=device))


Evaluating the model...
[[5.2857315e+01 2.5471659e+00 3.9730988e+00 1.0896251e+00]
 [5.1330708e+01 1.5975579e+00 4.1416597e+00 9.8164767e-01]
 [6.1485149e+01 2.8130555e-01 1.2798719e+01 1.3858711e+00]
 [3.1589348e+01 6.2669200e-01 1.4806970e+00 6.3902992e-01]
 [7.8282733e+02 4.1500912e+01 4.0511253e+01 5.1395893e+01]
 [5.2753632e+02 2.7451101e+01 2.9004736e+01 3.7224117e+01]
 [2.3498866e+02 2.7696066e+00 3.1352003e+01 1.3369636e+01]
 [3.6531082e+02 1.8845421e+01 2.0279406e+01 2.7657698e+01]
 [3.3646085e+02 1.7050840e+01 1.9057165e+01 2.6817871e+01]
 [5.2395947e+02 4.7897747e+01 1.7891827e+01 2.4275877e+01]]
MAE Results:
Calories: MAE = 60.8842, MAE (%) = 23.28%
Fat: MAE = 6.2410, MAE (%) = 47.09%
Carb: MAE = 9.3452, MAE (%) = 46.14%
Protein: MAE = 7.0701, MAE (%) = 38.43%
