In [None]:
import os
import torch
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import numpy as np # linear algebra

# Sanity check: show what is inside the dataset root. os.listdir raises
# immediately if the MAFood121 folder is not present in the working directory.
print(os.listdir("MAFood121"))

# Run on the GPU only when it is both requested and actually available, so the
# notebook also works on CPU-only machines.
use_cuda = True
device = torch.device("cuda" if use_cuda and torch.cuda.is_available() else "cpu")
torch.manual_seed(42)  # try and make the results more reproducible
BASE_PATH = 'MAFood121'  # dataset root; annotations/ and images/ are read from below it

epochs = 35
batch_size = 64
MICRO_DATA = True  # very small subset (just 3 groups)
SAMPLE_TRAINING = False  # make train set smaller for faster iteration
IMG_SIZE = (384, 384)  # Try to change the model to U-net to avoid the resizing

# Dish class names, one per line.
with open(BASE_PATH + '/annotations/dishes.txt', "r") as f:
    classes = f.read().strip().split('\n')

# Ingredients of each food-group class: one line per class, holding a
# comma-separated list of base-ingredient names (split later, per sample).
with open(BASE_PATH + '/annotations/foodgroups.txt', "r") as f:
    ingredients = f.read().strip().split('\n')

# Base ingredient vocabulary, stored as a single ', '-separated list.
with open(BASE_PATH + '/annotations/baseIngredients.txt', "r") as f:
    base_ing = f.read().strip().split(', ')

# Recovery of annotations ML (multi-label: food-group id per image)
# NOTE: the contents are split on '\n' without stripping, so a file that ends
# with a newline yields a trailing '' element; the multi-label dataframes
# built from these lists drop their last row for that reason.

#train
with open(BASE_PATH + '/annotations/train.txt', "r") as f:
    train_images = f.read().split('\n')
with open(BASE_PATH + '/annotations/train_lbls_ff.txt', "r") as f:
    train_labels = f.read().split('\n')

#val
with open(BASE_PATH + '/annotations/val.txt', "r") as f:
    val_images = f.read().split('\n')
with open(BASE_PATH + '/annotations/val_lbls_ff.txt', "r") as f:
    val_labels = f.read().split('\n')

#test
with open(BASE_PATH + '/annotations/test.txt', "r") as f:
    test_images = f.read().split('\n')
with open(BASE_PATH + '/annotations/test_lbls_ff.txt', "r") as f:
    test_labels = f.read().split('\n')

# Recovery of annotations SL (single-label: dish id per image)
# The image lists are the same files as above, so they are copied instead of
# being read from disk a second time.

#train
train_imagessl = list(train_images)
with open(BASE_PATH + '/annotations/train_lbls_d.txt', "r") as f:
    train_labelssl = f.read().split('\n')

#val
val_imagessl = list(val_images)
with open(BASE_PATH + '/annotations/val_lbls_d.txt', "r") as f:
    val_labelssl = f.read().split('\n')

#test
test_imagessl = list(test_images)
with open(BASE_PATH + '/annotations/test_lbls_d.txt', "r") as f:
    test_labelssl = f.read().split('\n')

# Multi-label: image path + food-group id (still a string at this point)
train_images_ml = [BASE_PATH+"/images/" + s for s in train_images]
train_df_ml = pd.DataFrame({'path': train_images_ml, 'ml_class_id': train_labels})

val_images_ml = [BASE_PATH+"/images/" + s for s in val_images]
val_df_ml = pd.DataFrame({'path': val_images_ml, 'ml_class_id': val_labels})

test_images_ml = [BASE_PATH+"/images/" + s for s in test_images]
test_df_ml = pd.DataFrame({'path': test_images_ml, 'ml_class_id': test_labels})

# Single-Label: image path + dish id (still a string at this point)
train_images_sl = [BASE_PATH+"/images/" + s for s in train_imagessl]
train_df_sl = pd.DataFrame({'path': train_images_sl, 'sl_class_id': train_labelssl})

val_images_sl = [BASE_PATH+"/images/" + s for s in val_imagessl]
val_df_sl = pd.DataFrame({'path': val_images_sl, 'sl_class_id': val_labelssl})

test_images_sl = [BASE_PATH+"/images/" + s for s in test_imagessl]
test_df_sl = pd.DataFrame({'path': test_images_sl, 'sl_class_id': test_labelssl})

# Drop the last row of every multi-label frame (presumably the empty entry left
# by the trailing newline of the annotation files). An explicit copy is taken
# so the column assignments below write to an independent frame instead of a
# slice of the original (avoids pandas' chained-assignment warning).
train_df_ml = train_df_ml.iloc[:-1].copy()
val_df_ml = val_df_ml.iloc[:-1].copy()
test_df_ml = test_df_ml.iloc[:-1].copy()

# The dish name is the folder that directly contains the image file.
train_df_ml['class_name'] = train_df_ml['path'].map(lambda x: os.path.basename(os.path.dirname(x)))
val_df_ml['class_name'] = val_df_ml['path'].map(lambda x: os.path.basename(os.path.dirname(x)))
test_df_ml['class_name'] = test_df_ml['path'].map(lambda x: os.path.basename(os.path.dirname(x)))

import glob
#Recovery of annotations ML
from sklearn.preprocessing import MultiLabelBinarizer
mlb = MultiLabelBinarizer()


def _read_ingredient_labels(label_file):
    """Read a ``*_lbls_ff.txt`` file (one integer food-group id per line).

    Returns ``(class_ids, ingredient_ids)``: the integer id of every line, and
    for every line the list of base-ingredient indices (as strings) of that
    food group, looked up through ``ingredients`` and ``base_ing``.

    Raises FileNotFoundError if the file is missing and ValueError if a line
    is not an integer or names an ingredient that is not in ``base_ing``.
    """
    class_ids = []
    ingredient_ids = []
    with open(label_file) as handle:
        for line in handle:
            classid = int(line)
            class_ids.append(classid)
            ingredient_ids.append(
                [str(base_ing.index(ing)) for ing in ingredients[classid].strip().split(",")]
            )
    return class_ids, ingredient_ids


def _build_split_frame(label_file, split_df, fit=False):
    """Binary-encode the ingredients of one split.

    label_file: path of the split's ``*_lbls_ff.txt`` file.
    split_df:   the split's multi-label dataframe; only its ``path`` column is used.
    fit:        fit the shared ``mlb`` on this split (training split) instead
                of only transforming with it.

    Returns ``(food_dict, frame)``. ``food_dict`` holds one row per label line
    (binary ingredient columns, ``path``, ``ml_class_id``). ``frame`` holds,
    for every row of ``split_df`` in order, the first ``food_dict`` row with
    the same path. Raises KeyError if a path of ``split_df`` has no match.
    """
    class_ids, ingredient_ids = _read_ingredient_labels(label_file)
    encoded = mlb.fit_transform(ingredient_ids) if fit else mlb.transform(ingredient_ids)

    food_dict = pd.DataFrame(encoded, columns=mlb.classes_)  # binary encode ingredients
    food_dict["path"] = split_df["path"]  # aligned on the row index
    food_dict["ml_class_id"] = class_ids

    # One indexed lookup instead of a boolean-mask scan per image; keeping the
    # first occurrence of a path preserves which row wins on duplicates.
    first_by_path = food_dict.drop_duplicates(subset="path", keep="first").set_index("path", drop=False)
    frame = first_by_path.loc[split_df["path"].tolist()].reset_index(drop=True)
    return food_dict, frame


# Train: the binarizer is fitted here and reused unchanged for val and test.
df_train, train_df = _build_split_frame(BASE_PATH + '/annotations/train_lbls_ff.txt', train_df_ml, fit=True)
food_dict_train = df_train

# Val
food_dict_val, val_df = _build_split_frame(BASE_PATH + '/annotations/val_lbls_ff.txt', val_df_ml)

# Test
df_test, test_df = _build_split_frame(BASE_PATH + '/annotations/test_lbls_ff.txt', test_df_ml)
food_dict_test = df_test

col_names = list(test_df.columns.values)

# Attach the single-label dish id to every row by joining on the image path
# (inner join: rows whose path exists in only one of the two frames are dropped).
train_df = train_df.merge(train_df_sl, on='path')
val_df = val_df.merge(val_df_sl, on='path')
test_df = test_df.merge(test_df_sl, on='path')

# Persist the splits for the training cell. `key` is passed by keyword because
# passing it positionally is deprecated in recent pandas releases.
train_df.to_hdf('train_df.h5', key='df', mode='w', format='table', data_columns=True)
val_df.to_hdf('val_df.h5', key='df', mode='w', format='table', data_columns=True)
test_df.to_hdf('test_df.h5', key='df', mode='w', format='table', data_columns=True)

In [7]:
val_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,path,ml_class_id,sl_class_id
0,1,1,0,0,0,0,0,0,1,MAFood121/images/tostadas/65_2.jpg,9,10
1,1,0,0,0,0,0,0,0,1,MAFood121/images/guacamole/2264147.jpg,5,5
2,1,0,0,0,0,0,0,0,1,MAFood121/images/caesar_salad/3407801.jpg,5,0
3,1,0,0,1,0,0,0,0,1,MAFood121/images/tostadas/32_1.jpg,0,10
4,1,0,0,0,0,0,0,0,1,MAFood121/images/nachos/217585.jpg,5,7
...,...,...,...,...,...,...,...,...,...,...,...,...
273,1,0,0,0,0,0,0,0,1,MAFood121/images/chicken_quesadilla/1362306.jpg,5,1
274,0,0,0,0,0,0,0,1,1,MAFood121/images/pozole/20_1.jpg,7,8
275,1,0,0,0,0,0,0,0,1,MAFood121/images/nachos/2649179.jpg,5,7
276,1,0,0,0,0,0,0,0,1,MAFood121/images/tacos/304636.jpg,5,9


In [None]:
# metrics loss and accuracy

import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
import cv2
from tqdm import tqdm
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import models
from torch import nn
from torchsummary import summary

epochs = 1
batch_size = 16
SMALL_DATA = False  # True: keep only the first 128 rows of every split
IMG_SIZE = (384, 384)

# Load data from .h5 files
train_df = pd.read_hdf('train_df.h5')
val_df = pd.read_hdf('val_df.h5')
test_df = pd.read_hdf('test_df.h5')

if SMALL_DATA:
    train_df = train_df[:128]
    # Fixed: the validation subset used to be taken from test_df, which
    # silently replaced the validation data with test data.
    val_df = val_df[:128]
    test_df = test_df[:128]

# Every column except the last three is treated as an ingredient column.
col_names = list(train_df.columns.values)
ing_names = col_names[:-3]
targets = ing_names

class CustomDataset(Dataset):
    """Serve ``(image, ingredient_labels)`` pairs from a dataframe.

    ``df`` must have a ``path`` column and one column per base ingredient,
    named ``"0"`` .. ``str(len(base_ing) - 1)``.

    Each item is ``(x, ml_y)``: ``x`` is a float tensor of shape
    ``(3, IMG_SIZE[0], IMG_SIZE[1])`` in RGB order scaled to [0, 1] (the same
    layout the inference code gets from PIL + ``transforms.ToTensor``), and
    ``ml_y`` is a float32 numpy vector with one entry per base ingredient.
    """

    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def _load_image(self, image_path):
        """Read, validate, resize and tensorize one image; raise ValueError if unreadable."""
        image = cv2.imread(image_path, 1)
        if image is None:
            raise ValueError(f"Failed to read image at {image_path}")
        if image.shape[0] == 0 or image.shape[1] == 0:
            raise ValueError(f"Invalid image size for {image_path}")

        # OpenCV decodes to BGR with values in 0-255; convert so training sees
        # the same channel order and value range as inference.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, IMG_SIZE)
        return torch.from_numpy(image.transpose(2, 0, 1)).float() / 255.0

    def __getitem__(self, idx):
        row = self.df.iloc[idx]  # fetch the row once instead of once per ingredient
        image_path = row['path']
        try:
            x = self._load_image(image_path)
        except Exception as e:
            # Best effort: an unreadable image becomes an all-zero sample
            # (zero image, zero labels) so one bad file does not abort the run.
            print(f"Error reading image at {image_path}: {str(e)}")
            x = torch.zeros((3, IMG_SIZE[0], IMG_SIZE[1])).float()
            ml_y = np.zeros(len(base_ing), dtype=np.float32)
            return (x, ml_y)

        # Labels are read outside the try block on purpose: a missing
        # ingredient column is a data bug and must not be hidden behind
        # all-zero samples.
        ml_y = np.array([row[str(i)] for i in range(len(base_ing))], dtype=np.float32)
        return (x, ml_y)

# Wrap every split in a CustomDataset, then batch it. Only the training
# loader shuffles; validation and test keep the dataframe order.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(split_df) for split_df in (train_df, val_df, test_df)
)

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Pretrained ResNet50 backbone with every one of its parameters frozen
resnet = models.resnet50(pretrained=True)
for backbone_param in resnet.parameters():
    backbone_param.requires_grad = False

# Keep an alias to the original classifier layer; the new head is stacked on
# top of that layer's output, so its input width is fc.out_features.
resnet.last_linear = resnet.fc
n_features = resnet.fc.out_features

# Multi-label head: one sigmoid output per base ingredient
head_ml = nn.Sequential(*[
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(base_ing)),
    nn.Sigmoid(),
])

# Chain the backbone and the multi-label head into one module
class FoodModel(nn.Module):
    """Backbone followed by a multi-label head.

    ``forward`` returns whatever ``head_ml`` produces when it is fed the
    output of ``base_model``.
    """

    def __init__(self, base_model, head_ml):
        super().__init__()
        self.base_model = base_model
        self.head_ml = head_ml

    def forward(self, x):
        features = self.base_model(x)
        return self.head_ml(features)

# Pick the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Assemble the network and move it to the chosen device
model = FoodModel(resnet, head_ml)
model.to(device)

# Binary cross-entropy over the per-ingredient probabilities
ml_loss_fn = nn.BCELoss()

# Adam with its default hyper-parameters
optimizer = torch.optim.Adam(model.parameters())

# Exact-match accuracy for the multi-label task
def calculate_accuracy(preds, targets, task='sl'):
    """Return the fraction of samples whose thresholded prediction matches exactly.

    preds:   per-label probabilities, one row per sample; values above 0.5
             count as a predicted label.
    targets: 0/1 labels with the same shape as ``preds``.
    task:    only ``'ml'`` is supported. The default ``'sl'`` is kept for
             signature compatibility and raises ValueError, so callers must
             pass ``task='ml'`` explicitly.

    A sample is correct only if every one of its labels matches. An empty
    batch returns 0.0 instead of dividing by zero.
    """
    if task != 'ml':
        raise ValueError("Invalid task. It should be 'ml'.")

    total_samples = targets.size(0)
    if total_samples == 0:
        return 0.0

    predicted_labels = (preds > 0.5).float()  # threshold predictions for the multi-label task
    correct_predictions = (predicted_labels == targets).all(dim=1).sum().item()
    return correct_predictions / total_samples

def calculate_f1_ml(preds, targets):
    """Return the F1 score of the thresholded predictions, pooled over the whole batch.

    Predictions above 0.5 count as positive. True positives, false positives
    and false negatives are counted over all samples and labels together;
    a small epsilon keeps every division defined when a count is zero.
    """
    eps = 1e-8
    hard_preds = (preds > 0.5).float()
    delta = hard_preds - targets  # +1 where falsely predicted, -1 where missed

    true_pos = (hard_preds * targets).sum().item()
    false_pos = (delta == 1).sum().item()
    false_neg = (delta == -1).sum().item()

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)

    return 2 * (precision * recall) / (precision + recall + eps)

# One optimisation step on a single batch, with accuracy and F1 for the ML task
def train_step(model, optimizer, ml_loss_fn, data, device):
    """Run forward, backward and optimizer step for one batch.

    ``data`` is an ``(inputs, labels)`` pair as produced by the data loader.
    Returns ``(loss, accuracy, f1)`` for the batch: the loss as a Python
    float, the accuracy from ``calculate_accuracy(..., task='ml')`` and the
    F1 score from ``calculate_f1_ml``.
    """
    inputs, labels = data
    inputs, labels = inputs.to(device), labels.to(device)

    # Clear gradients left over from the previous step
    optimizer.zero_grad()

    predictions = model(inputs)
    batch_loss = ml_loss_fn(predictions, labels)

    batch_loss.backward()
    optimizer.step()

    # Metrics are computed on the predictions made before the weight update
    batch_accuracy = calculate_accuracy(predictions, labels, task='ml')
    batch_f1 = calculate_f1_ml(predictions, labels)

    return batch_loss.item(), batch_accuracy, batch_f1

# Per-epoch history of the training metrics, used for plotting afterwards
train_lossesml = []
train_accuraciesml = []
train_f1ml = []

for i in tqdm(range(epochs), desc='Epochs'):
    print("Epoch ", i)

    # Running sums over the batches of this epoch
    total_ml_loss = 0.0
    total_ml_accuracy = 0.0
    total_ml_f1 = 0.0
    total_batches = 0

    with tqdm(train_loader, desc='Training', total=len(train_loader), miniters=1) as pbar:
        for total_batches, data in enumerate(pbar, start=1):
            ML_loss, ML_accuracy, ML_f1 = train_step(model, optimizer, ml_loss_fn, data, device)

            total_ml_loss += ML_loss
            total_ml_accuracy += ML_accuracy
            total_ml_f1 += ML_f1

            # Show the running averages next to the progress bar
            # ('Total samples' is the number of batches processed so far)
            pbar.set_postfix({
                'Total samples': total_batches,
                'ML Loss': total_ml_loss/ total_batches,
                'ML Accuracy': total_ml_accuracy/ total_batches,
                'ML F1': total_ml_f1 / total_batches,
            })

    # Epoch averages: the per-batch values averaged over the number of batches
    n_batches = len(train_loader)
    avg_ml_loss = total_ml_loss / n_batches
    avg_ml_accuracy = total_ml_accuracy / n_batches
    avg_ml_f1 = total_ml_f1 / n_batches

    train_lossesml.append(avg_ml_loss)
    train_accuraciesml.append(avg_ml_accuracy)
    train_f1ml.append(avg_ml_f1)

# One figure per training metric: loss, accuracy and F1 against the epoch index
for curve, curve_label, y_label, fig_title in (
    (train_lossesml, 'Train Loss ML', 'Loss', 'Training Loss'),
    (train_accuraciesml, 'Train Accuracy ML', 'Accuracy', 'Training Accuracy'),
    (train_f1ml, 'Train F1 ML', 'F1', 'Training F1'),
):
    plt.figure(figsize=(10, 5))
    plt.plot(curve, label=curve_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(fig_title)
    plt.legend()
    plt.show()

# Open one image from disk as RGB and display it
img_path = '56_1.jpg'
img = Image.open(img_path).convert('RGB')
plt.imshow(img)

# Resize to the training resolution, convert to a tensor and add a batch axis
transform = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor()])
img = transform(img).unsqueeze(0)

# Inference only: evaluation mode, no gradient tracking
model.eval()
with torch.no_grad():
    ml_preds = model(img.to(device))

# Back to a flat numpy vector with one probability per base ingredient
ml_preds = ml_preds.cpu().numpy().squeeze()

# Bar chart of the predicted probability of every ingredient
plt.figure(figsize=(10, 5))
plt.bar(base_ing, ml_preds)
plt.title('Sigmoid Prediction')
plt.xticks(rotation=90)
plt.xlabel('Ingredient')
plt.ylabel('Probability')
plt.show()