In [1]:
import numpy as np
import pandas as pd

import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam
from torchinfo import summary

## Dataset

#### Generation

In [2]:
# Synthetic dataset generation for demonstration
np.random.seed(0)

In [3]:
num_users = 10000
X_features = []    # user feature vectors
Y_meals = []       # ground-truth meal class sequences (length 6 for each user)
target_EIs = []    # target daily energy intakes for each user
min_macros = []    # minimum recommended macronutrient values for each user (based on guidelines)
max_macros = []    # maximum recommended macronutrient values for each user

In [4]:
for i in range(num_users):
    # Random user profile
    weight = np.random.uniform(50, 100)   # kg
    height = np.random.uniform(150, 200)  # cm
    BMI = weight / ((height/100)**2)
    age = np.random.randint(18, 60)
    # Basal Metabolic Rate (BMR) using Mifflin-St Jeor formula (for a male user as an example)
    BMR = 10*weight + 6.25*height - 5*age + 5
    PAL = np.random.uniform(1.2, 2.0)     # Physical Activity Level (sedentary ~1.2 to very active ~2.0)
    # Medical conditions (binary flags for presence of cardiovascular disease, type-2 diabetes, iron deficiency)
    has_CVD = np.random.rand() < 0.1      # 10% chance
    has_T2D = np.random.rand() < 0.1      # 10% chance
    has_iron_def = np.random.rand() < 0.1 # 10% chance
    
    # Compose feature vector
    user_features = [weight, height, BMI, BMR, PAL, int(has_CVD), int(has_T2D), int(has_iron_def)]
    X_features.append(user_features)
    
    # Determine target daily energy intake using factorial method (BMR * PAL) adjusted by factor D for BMI (based on nutritional guidelines)
    D = 1.0
    if BMI < 18.5:    # underweight: increase target EI to encourage weight gain
        D = 1.1
    elif BMI > 25:    # overweight: decrease target EI for weight loss
        D = 0.9
    target_EI = BMR * PAL * D
    target_EIs.append(target_EI)
    
    # Recommended macronutrient intake ranges (based on nutritional guidelines)
    # For simplicity, use fixed percentage ranges of total energy for each macro:
    # Protein: 10-35%, Carbs: 45-65%, Fat: 20-35%, SFA: 0-10% of total energy.
    min_prot, max_prot = 0.10, 0.35
    min_carb, max_carb = 0.45, 0.65
    min_fat, max_fat   = 0.20, 0.35
    min_sfa, max_sfa   = 0.00, 0.10
    # Convert these fractions to absolute amounts (grams) using energy densities (4 kcal/g for protein & carbs, 9 kcal/g for fat & SFA)
    min_prot_g = min_prot * target_EI / 4.0;   max_prot_g = max_prot * target_EI / 4.0
    min_carb_g = min_carb * target_EI / 4.0;   max_carb_g = max_carb * target_EI / 4.0
    min_fat_g  = min_fat  * target_EI / 9.0;   max_fat_g  = max_fat  * target_EI / 9.0
    min_sfa_g  = min_sfa  * target_EI / 9.0;   max_sfa_g  = max_sfa  * target_EI / 9.0
    min_macros.append([min_prot_g, min_carb_g, min_fat_g, min_sfa_g])
    max_macros.append([max_prot_g, max_carb_g, max_fat_g, max_sfa_g])
    
    # Generate a synthetic "ground truth" meal plan (sequence of 6 meal class labels) for the user.
    # We bias the meal choices based on user's BMI category for realism:
    # Underweight users get more high-calorie meals, overweight get more low-calorie meals.
    meal_classes = []
    if BMI < 18.5:
        # Underweight: 80% chance to pick a high-calorie meal (class 0-4), 20% chance low-calorie (5-9)
        for t in range(6):
            if np.random.rand() < 0.8:
                meal_classes.append(np.random.randint(0, 5))   # high-calorie meal class
            else:
                meal_classes.append(np.random.randint(5, 10))  # low-calorie meal class
    elif BMI > 25:
        # Overweight: 80% chance low-calorie meal, 20% high-calorie meal
        for t in range(6):
            if np.random.rand() < 0.8:
                meal_classes.append(np.random.randint(5, 10))  # low-calorie meal class
            else:
                meal_classes.append(np.random.randint(0, 5))   # high-calorie meal class
    else:
        # Normal weight: no strong bias, random meals
        for t in range(6):
            meal_classes.append(np.random.randint(0, 10))
    Y_meals.append(meal_classes)

In [5]:
# Convert data lists to tensors for training
X_features = torch.tensor(X_features, dtype=torch.float32)
Y_meals = torch.tensor(Y_meals, dtype=torch.long)
target_EIs = torch.tensor(target_EIs, dtype=torch.float32)
min_macros = torch.tensor(min_macros, dtype=torch.float32)
max_macros = torch.tensor(max_macros, dtype=torch.float32)

#### Exploration

In [6]:
print(
    X_features.shape,
    Y_meals.shape,
    target_EIs.shape,
    min_macros.shape,
    max_macros.shape
)

torch.Size([10000, 8]) torch.Size([10000, 6]) torch.Size([10000]) torch.Size([10000, 4]) torch.Size([10000, 4])


In [7]:
target_EIs

tensor([3462.2251, 3184.7493, 2556.8049,  ..., 2154.7561, 2250.2383,
        2484.2988])

In [8]:
# Convert tensors to numpy arrays
X_features_np = X_features.numpy()
Y_meals_np = Y_meals.numpy()
target_EIs_np = target_EIs.numpy()
min_macros_np = min_macros.numpy()
max_macros_np = max_macros.numpy()

In [9]:
feature_columns = ["weight", "height", "BMI", "BMR", "PAL", "has_CVD", "has_T2D", "has_iron_def"]
df_features = pd.DataFrame(X_features_np, columns=feature_columns)
df_features["target_EI"] = target_EIs_np

In [10]:
meal_columns = [f"meal_{i+1}" for i in range(Y_meals_np.shape[1])]
df_meals = pd.DataFrame(Y_meals_np, columns=meal_columns)

In [11]:
min_macro_columns = ["min_prot", "min_carb", "min_fat", "min_sfa"]
df_min_macros = pd.DataFrame(min_macros_np, columns=min_macro_columns)

In [12]:
max_macro_columns = ["max_prot", "max_carb", "max_fat", "max_sfa"]
df_max_macros = pd.DataFrame(max_macros_np, columns=max_macro_columns)

In [13]:
df_full = pd.concat([df_features, df_meals, df_min_macros, df_max_macros], axis=1)
df_full.head()

Unnamed: 0,weight,height,BMI,BMR,PAL,has_CVD,has_T2D,has_iron_def,target_EI,meal_1,...,meal_5,meal_6,min_prot,min_carb,min_fat,min_sfa,max_prot,max_carb,max_fat,max_sfa
0,77.440674,185.759476,22.442291,1835.403442,1.886356,0.0,0.0,0.0,3462.225098,7,...,1,6,86.555626,389.500336,76.938339,0.0,302.944702,562.611572,134.64209,38.46917
1,90.608437,173.998856,29.927872,1823.577271,1.940477,1.0,1.0,1.0,3184.749268,0,...,6,2,79.618729,358.284302,70.772202,0.0,278.665558,517.521729,123.851357,35.386101
2,70.733093,163.227783,26.548166,1587.504639,1.789535,0.0,0.0,0.0,2556.804932,5,...,6,3,63.92012,287.640564,56.817886,0.0,223.720428,415.480804,99.431297,28.408943
3,60.519127,156.44632,24.726463,1427.980713,1.686265,0.0,1.0,0.0,2407.953369,6,...,4,4,60.198833,270.894745,53.510075,0.0,210.695908,391.292419,93.642632,26.755037
4,62.664581,173.315536,20.861576,1449.86792,1.698808,0.0,0.0,0.0,2463.047363,5,...,5,0,61.576183,277.092834,54.734386,0.0,215.516647,400.245178,95.785172,27.367193


In [14]:
X_features_np.shape

(10000, 8)

In [15]:
Y_meals_np.shape

(10000, 6)

In [16]:
df_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 23 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   weight        10000 non-null  float32
 1   height        10000 non-null  float32
 2   BMI           10000 non-null  float32
 3   BMR           10000 non-null  float32
 4   PAL           10000 non-null  float32
 5   has_CVD       10000 non-null  float32
 6   has_T2D       10000 non-null  float32
 7   has_iron_def  10000 non-null  float32
 8   target_EI     10000 non-null  float32
 9   meal_1        10000 non-null  int64  
 10  meal_2        10000 non-null  int64  
 11  meal_3        10000 non-null  int64  
 12  meal_4        10000 non-null  int64  
 13  meal_5        10000 non-null  int64  
 14  meal_6        10000 non-null  int64  
 15  min_prot      10000 non-null  float32
 16  min_carb      10000 non-null  float32
 17  min_fat       10000 non-null  float32
 18  min_sfa       10000 non-nul

In [17]:
df_full.to_csv("synthetic_nutrition_data.csv", index=False)

## Encoder

#### Code

In [18]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()
        # fc1: projects input features to a hidden representation.
        #   Input: [batch_size, input_dim] → Output: [batch_size, hidden_dim]
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        
        # fc2: further processes the hidden representation.
        #   Input: [batch_size, hidden_dim] → Output: [batch_size, hidden_dim]
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)

        # fc_mu: computes the mean of the latent distribution.
        #   Input: [batch_size, hidden_dim] → Output: [batch_size, latent_dim]
        self.fc_mu = nn.Linear(in_features=hidden_dim, out_features=latent_dim)
        
        # fc_logvar: computes the log-variance of the latent distribution.
        #   Input: [batch_size, hidden_dim] → Output: [batch_size, latent_dim]
        self.fc_logvar = nn.Linear(in_features=hidden_dim, out_features=latent_dim)
    
    def forward(self, x):
        # Apply fc1 with ReLU activation.
        #   x: [batch_size, input_dim] → [batch_size, hidden_dim]
        x = F.relu(self.fc1(x))
        
        # Apply fc2 with ReLU activation.
        #   x: [batch_size, hidden_dim] → h: [batch_size, hidden_dim]
        h = F.relu(self.fc2(x))
        
        # Compute the latent mean.
        #   h: [batch_size, hidden_dim] → mu: [batch_size, latent_dim]
        mu = self.fc_mu(h)
        
        # Compute the latent log-variance.
        #   h: [batch_size, hidden_dim] → logvar: [batch_size, latent_dim]
        logvar = self.fc_logvar(h)
        
        return mu, logvar

#### Summary

In [19]:
input_dim = 8
hidden_units = 62
latent_dim = 16

encoder = Encoder(input_dim, hidden_units, latent_dim)

In [20]:
summary(encoder, (input_dim,), col_names=["input_size", "output_size", "num_params"])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
Encoder                                  [8]                       [16]                      --
├─Linear: 1-1                            [8]                       [62]                      558
├─Linear: 1-2                            [62]                      [62]                      3,906
├─Linear: 1-3                            [62]                      [16]                      1,008
├─Linear: 1-4                            [62]                      [16]                      1,008
Total params: 6,480
Trainable params: 6,480
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.31
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.03
Estimated Total Size (MB): 0.03

## Decoder

#### Code

In [21]:
class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_units, num_classes, macro_dim):
        super(Decoder, self).__init__()
        self.hidden_units = hidden_units
        self.macro_dim = macro_dim

        # Projects latent vector (shape: [batch_size, latent_dim]) to hidden space (shape: [batch_size, hidden_dim])
        self.latent_to_hidden = nn.Linear(in_features=latent_dim, out_features=hidden_units)

        # GRUCell that takes an input of shape [batch_size, hidden_dim] and outputs a hidden state of the same shape
        self.gru1 = nn.GRUCell(input_size=hidden_units, hidden_size=hidden_units)
        self.gru2 = nn.GRUCell(input_size=hidden_units, hidden_size=hidden_units)

        # Classifier head: maps hidden state [batch_size, hidden_dim] to class logits [batch_size, num_classes]
        self.classifier = nn.Linear(in_features=hidden_units, out_features=num_classes)
        # Energy head: maps hidden state [batch_size, hidden_dim] to a scalar energy [batch_size, 1]
        self.energy_head = nn.Linear(in_features=hidden_units, out_features=1)
        # Macro head: maps hidden state [batch_size, hidden_dim] to macro outputs [batch_size, macro_dim]
        self.macro_head = nn.Linear(in_features=hidden_units, out_features=macro_dim) 
    
    def forward(self, z):
        """
        Args:
            z (torch.Tensor): Latent vector of shape [batch_size, latent_dim]
            
        Returns:
            class_logits_seq (torch.Tensor): Sequence of class logits, shape [batch_size, T, num_classes]
            total_energy (torch.Tensor): Summed energy over T time steps, shape [batch_size, 1]
            total_macros (torch.Tensor): Accumulated macro outputs over T time steps, shape [batch_size, macro_dim]
            energies_tensor (torch.Tensor): Sequence of energy values, shape [batch_size, T, 1]
        """
        batch_size = z.size(0)
        T = 6  # Number of GRU time steps


        # Initialize hidden state for GRUCell with zeros, shape: [batch_size, hidden_dim]
        h1 = torch.zeros(size=(batch_size, self.hidden_units), device=z.device)
        h2 = torch.zeros(size=(batch_size, self.hidden_units), device=z.device)
        h_prev = h2

        # Project latent vector to hidden space (input for GRU at t=0), shape: [batch_size, hidden_dim]
        z_projected = self.latent_to_hidden(z)
         
        class_logits_seq = []
        energies_list = []
        # Initialize accumulation for macro outputs, shape: [batch_size, macro_dim]
        total_macros = torch.zeros(batch_size, self.macro_dim, device=z.device)

        for t in range(T):
            # For t=0, use the projected latent vector; for t>0, use previous hidden state as input.
            if t == 0:
                z = z_projected
            else:
                z = h_prev

            # GRUCell update: input z and previous hidden state h, both of shape [batch_size, hidden_dim]
            h1 = self.gru1(z, h1)
            h2 = self.gru2(h1, h2)

            # Compute outputs from the current hidden state
            logits = self.classifier(h2)    # Shape: [batch_size, num_classes]
            energy = self.energy_head(h2)     # Shape: [batch_size, 1]
            macros = self.macro_head(h2)      # Shape: [batch_size, macro_dim]

            class_logits_seq.append(logits)
            energies_list.append(energy)
            total_macros += macros  # Accumulate macro outputs over time steps

            h_prev = h2  # Save current hidden state for next iteration

        # Stack list of tensors along a new time dimension: [batch_size, T, num_classes] and [batch_size, T, 1]
        class_logits_seq = torch.stack(class_logits_seq, dim=1)
        energies_tensor = torch.stack(energies_list, dim=1)
        # Sum energy values over the time dimension, resulting in shape: [batch_size, 1]
        total_energy = energies_tensor.sum(dim=1)

        return class_logits_seq, total_energy, total_macros, energies_tensor

#### Summary

In [22]:
latent_dim = 16
hidden_units = 25
num_classes = 140
macro_dim = 5
batch_size = 50

decoder = Decoder(latent_dim, hidden_units, num_classes, macro_dim)

In [23]:
summary(decoder, (batch_size, latent_dim), col_names=["input_size", "output_size", "num_params"])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
Decoder                                  [50, 16]                  [50, 6, 140]              --
├─Linear: 1-1                            [50, 16]                  [50, 25]                  425
├─GRUCell: 1-2                           [50, 25]                  [50, 25]                  3,900
├─GRUCell: 1-3                           [50, 25]                  [50, 25]                  3,900
├─Linear: 1-4                            [50, 25]                  [50, 140]                 3,640
├─Linear: 1-5                            [50, 25]                  [50, 1]                   26
├─Linear: 1-6                            [50, 25]                  [50, 5]                   130
├─GRUCell: 1-7                           [50, 25]                  [50, 25]                  (recursive)
├─GRUCell: 1-8                           [50, 25]                  [50, 25]                  (recursive)
├─Line

## Training

In [24]:
def adjust_meal_quantity(energies, batch_target_EI):
    pred_total_energy = energies.sum(dim=1)
    d = (target_EI - pred_total_energy) / pred_total_energy
    d_expanded = d.unsqueeze(1)
    adjusted_energies = energies * (1 + d_expanded)
    new_total_energy = adjusted_energies.sum(dim=1)
    return adjusted_energies, new_total_energy   

In [25]:
def compute_L_macro(batch_min_macros, batch_max_macros, pred_macros):
    diff_min = torch.abs(batch_min_macros - pred_macros)
    diff_max = torch.abs(batch_max_macros - pred_macros)
    macro_penalty = diff_min + diff_max
    L_macro = macro_penalty.mean()
    return L_macro

In [26]:
def compute_L_energy(pred_energy, batch_target_EI):
    L_energy = F.mse_loss(pred_energy, batch_target_EI)
    return L_energy

In [27]:
def compute_KLD(mu, logvar, batch_size):
    KLD_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    KLD_loss = KLD_loss / batch_size
    return KLD_loss

In [28]:
def compute_L_MC(class_logits, batch_Y):
    T = class_logits.size(1)
    CE_loss = 0.0
    for t in range(T):
        CE_loss += F.cross_entropy(class_logits[:, t, :], batch_Y[:, t])
    CE_loss = CE_loss / T
    return CE_loss

In [29]:
def train_one_epoch(encoder, decoder, X_features, Y_meals, target_EIs, min_macros, max_macros, batch_size, optimizer):
    encoder.train()
    decoder.train()

    # X_features: [num_samples, feature_dim]
    # Y_meals: [num_samples, num_meals]
    # target_EIs: [num_samples, 1]
    # min_macros and max_macros: [num_samples, macro_dim]
    num_samples = X_features.size(0)
    permutation = torch.randperm(num_samples)  # permutation: [num_samples]
    total_loss = 0.0

    for i in range(0, num_samples, batch_size):
        optimizer.zero_grad()

        indices = permutation[i: i+batch_size]      # indices: [batch_size] (or fewer for the last batch)
        batch_X = X_features[indices]                 # batch_X: [batch_size, feature_dim]
        batch_Y = Y_meals[indices]                    # batch_Y: [batch_size, num_meals]
        batch_target_EI = target_EIs[indices]         # batch_target_EI: [batch_size, 1]
        batch_min_macros = min_macros[indices]          # batch_min_macros: [batch_size, macro_dim]
        batch_max_macros = max_macros[indices]          # batch_max_macros: [batch_size, macro_dim]

        # Forward pass through the encoder:
        # mu and logvar each have shape [batch_size, latent_dim]
        mu, logvar = encoder(batch_X)
        epsilon = torch.randn_like(logvar)            # epsilon: [batch_size, latent_dim]
        z = mu + epsilon * logvar                     # z: [batch_size, latent_dim]

        # Forward pass through the decoder:
        # class_logits: [batch_size, T, num_classes] where T is the number of time steps (here T = 6)
        # pred_energy: [batch_size, 1]
        # pred_macros: [batch_size, macro_dim]
        # energies_tensor: [batch_size, T, 1]
        class_logits, pred_energy, pred_macros, energies_tensor = decoder(z)

        # Compute losses:
        # L_macro: scalar loss related to macro constraints
        L_macro = compute_L_macro(batch_min_macros, batch_max_macros, pred_macros)
        # L_energy: scalar loss comparing predicted energy to target energy
        L_energy = compute_L_energy(pred_energy, batch_target_EI)
        # L_kld: Kullback-Leibler divergence loss, scalar
        L_kld = compute_KLD(mu, logvar, batch_size)
        # L_mc: classification loss computed from the logits over T time steps
        L_mc = compute_L_MC(class_logits, batch_Y)
        
        loss = L_macro + L_energy + L_kld + L_mc
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
    avg_loss = total_loss / (num_samples / batch_size)
    return avg_loss

In [30]:
def training(csv_file, batch_size=64, hidden_dim=256, latent_dim=256, hidden_units=512, epochs=500):
    # Load the CSV file into a pandas DataFrame
    df = pd.read_csv(csv_file)
    
    feature_columns = ["weight", "height", "BMI", "BMR", "PAL", "has_CVD", "has_T2D", "has_iron_def"]
    X_features_np = df[feature_columns].values
    target_EIs_np = df["target_EI"].values
    
    # Deduce meal labels (6 meals per day)
    meal_columns = [f"meal_{i+1}" for i in range(6)]
    Y_meals_np = df[meal_columns].values
    
    # Deduce macronutrient ranges
    min_macro_columns = ["min_prot", "min_carb", "min_fat", "min_sfa"]
    max_macro_columns = ["max_prot", "max_carb", "max_fat", "max_sfa"]
    min_macros_np = df[min_macro_columns].values
    max_macros_np = df[max_macro_columns].values

    # Convert arrays to PyTorch tensors
    X_features = torch.tensor(X_features_np, dtype=torch.float32)
    Y_meals = torch.tensor(Y_meals_np, dtype=torch.long)
    target_EIs = torch.tensor(target_EIs_np, dtype=torch.float32)
    min_macros = torch.tensor(min_macros_np, dtype=torch.float32)
    max_macros = torch.tensor(max_macros_np, dtype=torch.float32)

    input_dim = X_features.shape[1]                # Number of user features (should be 8)
    num_classes = int(Y_meals.max().item() + 1)      # Meal classes, deduced from the maximum label + 1
    macro_dim = min_macros.shape[1]                  # Number of macronutrients (should be 4)
    
    print(f"Deduced dimensions: input_dim={input_dim}, num_classes={num_classes}, macro_dim={macro_dim}")
    
    # Instantiate Variation AutoEncoder
    encoder = Encoder(input_dim=input_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)
    decoder = Decoder(latent_dim=latent_dim, hidden_units=hidden_units, num_classes=num_classes, macro_dim=macro_dim)
    
    # Setup the optimizer (cf. paper)
    optimizer = Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-4)
    
    # Training loop
    loss_seq = []
    for epoch in range(epochs):
        avg_loss = train_one_epoch(encoder, decoder, X_features, Y_meals, target_EIs, min_macros, max_macros, batch_size, optimizer)
        if epoch % 10 == 0 or epoch == epochs - 1:
            loss_seq.append(avg_loss)
            print(f"Epoch {epoch+1}/{epochs} - Avg Loss: {avg_loss:.4f}")
    
    return encoder, decoder, loss_seq