# Primera parte

In [None]:
import os
import torch
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import numpy as np # linear algebra

print(os.listdir("MAFood121"))

use_cuda = True
device = torch.device("cuda" if use_cuda else "cpu")
torch.manual_seed(42) # try and make the results more reproducible
BASE_PATH = 'MAFood121/'

epochs = 35
batch_size = 64
MICRO_DATA = True # very small subset (just 3 groups)
SAMPLE_TRAINING = False # make train set smaller for faster iteration
IMG_SIZE = (384, 384) # Try to change the model to U-net to avoid the resizing

#Classes of dishes
f = open(BASE_PATH + '/annotations/dishes.txt', "r")
classes = f.read().strip().split('\n')
f.close()
print("***** classes = dishes.txt: ***** " + str(classes))
print("#######################################################################################")

#Ingredients for each class
f = open(BASE_PATH + '/annotations/foodgroups.txt', "r")
ingredients = list(set(f.read().strip().split('\n')))
f.close()
print("***** ingredients = foodgroups.txt: ***** " + str(ingredients))
print("#######################################################################################")

#Base Ingredients
f = open(BASE_PATH + '/annotations/baseIngredients.txt', "r")
base_ing = f.read().strip().split('\n')
f.close()
print("***** base_ing = baseIngredients.txt: ***** " + str(base_ing))
print("#######################################################################################")

#Recovery of annotations ML
from sklearn.preprocessing import MultiLabelBinarizer
mlb = MultiLabelBinarizer()

#train
f = open(BASE_PATH + '/annotations/train.txt', "r")
train_images = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/train_lbls_ff.txt', "r")
train_labels = f.read().split('\n')
f.close()

#test
f = open(BASE_PATH + '/annotations/test.txt', "r")
test_images = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/test_lbls_ff.txt', "r")
test_labels = f.read().split('\n')
f.close()

#val
f = open(BASE_PATH + '/annotations/val.txt', "r")
val_images = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/val_lbls_ff.txt', "r")
val_labels = f.read().split('\n')
f.close()

#Recovery of annotations SL
#train
f = open(BASE_PATH + '/annotations/train.txt', "r")
train_imagessl = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/train_lbls_d.txt', "r")
train_labelssl = f.read().split('\n')
f.close()

#val
f = open(BASE_PATH + '/annotations/val.txt', "r")
val_imagessl = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/val_lbls_d.txt', "r")
val_labelssl = f.read().split('\n')
f.close()

#test
f = open(BASE_PATH + '/annotations/test.txt', "r")
test_imagessl = f.read().split('\n')
f.close()
f = open(BASE_PATH + '/annotations/test_lbls_d.txt', "r")
test_labelssl = f.read().split('\n')
f.close()

# Single-Label
train_images_sl = ["MAFood121/images/" + s for s in train_imagessl]
train_df_sl = pd.DataFrame({'path': train_images_sl, 'sl_class_id': train_labelssl})

val_images_sl = ["MAFood121/images/" + s for s in val_imagessl]
val_df_sl = pd.DataFrame({'path': val_images_sl, 'sl_class_id': val_labelssl})
print(val_df_sl)

test_images_sl = ["MAFood121/images/" + s for s in test_imagessl]
test_df_sl = pd.DataFrame({'path': test_images_sl, 'sl_class_id': test_labelssl})

# Multi-label
train_images_ml = ["MAFood121/images/" + s for s in train_images]
train_df_ml = pd.DataFrame({'path': train_images_ml, 'ml_class_id': train_labels})

val_images_ml = ["MAFood121/images/" + s for s in val_images]
val_df_ml = pd.DataFrame({'path': val_images_ml, 'ml_class_id': val_labels})

test_images_ml = ["MAFood121/images/" + s for s in test_images]
test_df_ml = pd.DataFrame({'path': test_images_ml, 'ml_class_id': test_labels})

# Dataframe for train images
import glob

train_ingredients = []
train_classid = []

# busca ambos archivos en el directorio de anotaciones
for file_path in glob.glob(BASE_PATH + '/annotations/train_lbls_*.txt'):
    with open(file_path) as f1:
        for line in f1:
            idx_ingredients = []
            classid = int(line)
            train_classid.append(classid)
            for ing in ingredients[classid].strip().split(","):
                idx_ingredients.append(str(base_ing.index(ing)))
            train_ingredients.append(idx_ingredients)

df_train = pd.DataFrame(mlb.fit_transform(train_ingredients), columns=mlb.classes_) #binary encode ingredients
df_train["path"] = train_df_ml['path'] #train_img_df['path']
df_train["ml_class_id"] = train_classid 
food_dict_train = df_train

new_data = []
for index, row in train_df_ml.iterrows():
    #food = row["class_name"]
    path = row["path"]
    class_id = row["ml_class_id"]
    
    binary_encod = food_dict_train.loc[food_dict_train["path"] == path]
    new_data.append(np.array(binary_encod)[0])

col_names = list(binary_encod.columns.values)
train_df = pd.DataFrame(new_data, columns = col_names)

#Dataframe for val images
val_ingredients = []
val_classid = []

# busca ambos archivos en el directorio de anotaciones
for file_path in glob.glob(BASE_PATH + '/annotations/val_lbls_*.txt'):
    with open(file_path) as f1:
        for line in f1:
            idx_ingredients = []
            classid = int(line)
            val_classid.append(classid)
            for ing in ingredients[classid].strip().split(","):
                idx_ingredients.append(str(base_ing.index(ing)))
            val_ingredients.append(idx_ingredients)

df_val = pd.DataFrame(mlb.fit_transform(val_ingredients), columns=mlb.classes_) #binary encode ingredients
df_val["path"] = val_df_ml['path']
df_val["ml_class_id"] = val_classid 
food_dict_val = df_val


new_data = []
for index, row in val_df_ml.iterrows():
    #food = row["class_name"]
    path = row["path"]
    class_id = row["ml_class_id"]
    
    binary_encod = food_dict_val.loc[food_dict_val["path"] == path]
    new_data.append(np.array(binary_encod)[0])

col_names = list(binary_encod.columns.values)
val_df = pd.DataFrame(new_data, columns = col_names)

#Dataframe for test images
test_ingredients = []
test_classid = []

# busca ambos archivos en el directorio de anotaciones
for file_path in glob.glob(BASE_PATH + '/annotations/test_lbls_*.txt'):
    with open(file_path) as f1:
        for line in f1:
            idx_ingredients = []
            classid = int(line)
            test_classid.append(classid)
            for ing in ingredients[classid].strip().split(","):
                idx_ingredients.append(str(base_ing.index(ing)))
            test_ingredients.append(idx_ingredients)

df_test = pd.DataFrame(mlb.fit_transform(test_ingredients), columns=mlb.classes_) #binary encode ingredients
df_test["path"] = test_df_ml['path']
df_test["ml_class_id"] = test_classid 
food_dict_test = df_test


new_data = []
for index, row in test_df_ml.iterrows():
    #food = row["class_name"]
    path = row["path"]
    class_id = row["ml_class_id"]
    
    binary_encod = food_dict_test.loc[food_dict_test["path"] == path]
    new_data.append(np.array(binary_encod)[0])

col_names = list(binary_encod.columns.values)
test_df = pd.DataFrame(new_data, columns = col_names)

train_df = train_df.merge(train_df_sl, left_on='path', right_on='path')
val_df = val_df.merge(val_df_sl, left_on='path', right_on='path') 
test_df = test_df.merge(test_df_sl, left_on='path', right_on='path')

train_df.to_hdf('train_df.h5','df',mode='w',format='table',data_columns=True)
val_df.to_hdf('val_df.h5','df',mode='w',format='table',data_columns=True)
test_df.to_hdf('test_df.h5','df',mode='w',format='table',data_columns=True)

# segunda parte

In [None]:
# metrics loss and accuracy

import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
import cv2
from tqdm import tqdm
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import models
from torch import nn
from torchsummary import summary

# Load data from .h5 files
train_df = pd.read_hdf('train_df.h5', 'df')
val_df = pd.read_hdf('val_df.h5', 'df')
test_df = pd.read_hdf('test_df.h5', 'df')

epochs = 10
batch_size = 16
SMALL_DATA = False
IMG_SIZE = (224, 224)

if SMALL_DATA:
    train_df = train_df[:128]
    val_df = test_df[:128]
    test_df = test_df[:128]

col_names = list(train_df.columns.values)
ing_names = col_names[:-3]
targets = ing_names

class CustomDataset(Dataset):
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        image_path = self.df.iloc[idx]['path']
        try:
            image = cv2.imread(image_path, 1)
            if image is None:
                raise ValueError(f"Failed to read image at {image_path}")
            if image.shape[0] == 0 or image.shape[1] == 0:
                raise ValueError(f"Invalid image size for {image_path}")

            x = cv2.resize(image, IMG_SIZE)
            x = torch.from_numpy(x.transpose(2, 0, 1)).float()

            sl_class_id = int(self.df.iloc[idx]['sl_class_id'])
            sl_onehot = np.array(sl_class_id)
            sl_onehot = (np.arange(len(classes)) == sl_onehot).astype(np.float32)
            sl_y = torch.from_numpy(sl_onehot)

            ml_y = []
            for i in range(len(base_ing)):
                ml_y.append(self.df.iloc[idx][str(i)])
            ml_y = np.array(ml_y, dtype=np.float32)

            return (x, sl_y, ml_y)
        except Exception as e:
            print(f"Error reading image at {image_path}: {str(e)}")
            # Devuelve un valor predeterminado o imagen vacía
            x = torch.zeros((3, IMG_SIZE[0], IMG_SIZE[1])).float()
            sl_y = torch.zeros(len(classes)).float()
            ml_y = np.zeros(len(base_ing), dtype=np.float32)
            return (x, sl_y, ml_y)

# Define batch size
batch_size = 64

# Create DataLoader objects for training, validation, and testing sets
train_dataset = CustomDataset(train_df)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = CustomDataset(val_df)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

test_dataset = CustomDataset(test_df)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# ResNet50 Model
resnet = models.resnet50(pretrained=True)
# Disable grad for all conv layers
for param in resnet.parameters():
    param.requires_grad = False

# Add two heads
resnet.last_linear = resnet.fc
n_features = resnet.fc.out_features
head_sl = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(classes))
)
head_ml = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(base_ing)),
    nn.Sigmoid()
)  

# Connect two heads
class FoodModel(nn.Module):
    def __init__(self, base_model, head_sl, head_ml):
        super().__init__()
        self.base_model = base_model
        self.head_sl = head_sl
        self.head_ml = head_ml

    def forward(self, x):
        x = self.base_model(x)
        sl = self.head_sl(x)
        ml = self.head_ml(x)
        return sl, ml

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = FoodModel(resnet, head_sl, head_ml)
model.to(device)

# Define Loss
sl_loss_fn = nn.CrossEntropyLoss()
ml_loss_fn = nn.BCELoss()

# Define Optimizer
optimizer = torch.optim.Adam(model.parameters())

# Define function to calculate accuracy for both SL and ML tasks
def calculate_accuracy(preds, targets, task='sl'):
    if task == 'sl':
        predicted_labels = torch.argmax(preds, dim=1)
        correct_predictions = (predicted_labels == targets).sum().item()
    elif task == 'ml':
        predicted_labels = (preds > 0.5).float()  # Threshold predictions for multi-label task
        correct_predictions = (predicted_labels == targets).all(dim=1).sum().item()
    else:
        raise ValueError("Invalid task. It should be 'sl' or 'ml'.")

    total_samples = targets.size(0)
    accuracy = correct_predictions / total_samples
    return accuracy

# Modify the train_step function to include accuracy calculation
def train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device):
    # Retrieve data
    x, sl_y, ml_y = data

    # Convert to device
    x = x.to(device)
    sl_y = sl_y.to(device)
    ml_y = ml_y.to(device)

    # Zero out gradients
    optimizer.zero_grad()

    # Forward pass
    sl_preds, ml_preds = model(x)

    # Calculate losses
    sl_loss = sl_loss_fn(sl_preds, torch.argmax(sl_y, dim=1))
    ml_loss = ml_loss_fn(ml_preds, ml_y)
    loss = sl_loss + ml_loss

    # Backward pass
    loss.backward()

    # Step optimizer
    optimizer.step()

    # Calculate accuracies
    sl_accuracy = calculate_accuracy(sl_preds, torch.argmax(sl_y, dim=1), task='sl')
    ml_accuracy = calculate_accuracy(ml_preds, ml_y, task='ml')

    # Return losses and accuracies
    return sl_loss.item(), ml_loss.item(), sl_accuracy, ml_accuracy

# Lists to store losses and accuracies for plotting
train_lossessl = []
train_lossesml = []
train_accuraciessl = []
train_accuraciesml = []

epochs = 10
for i in tqdm(range(epochs), desc='Epochs'):
    print("Epoch ", i)
    total_sl_loss = 0.0
    total_ml_loss = 0.0
    total_sl_accuracy = 0.0
    total_ml_accuracy = 0.0
    total_samples = 0

    with tqdm(train_loader, desc='Training', total=len(train_loader), miniters=1) as pbar:
        for data in pbar: 
            SL_loss, ML_loss, SL_accuracy, ML_accuracy = train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device)

            total_sl_loss += SL_loss
            total_ml_loss += ML_loss
            total_sl_accuracy += SL_accuracy
            total_ml_accuracy += ML_accuracy
            total_samples += data[0].size(0)

            # Update progress bar
            pbar.set_postfix({
                'SL Loss': total_sl_loss / total_samples,
                'ML Loss': total_ml_loss / total_samples,
                'SL Accuracy': total_sl_accuracy / total_samples,
                'ML Accuracy': total_ml_accuracy / total_samples,
            })

    # Calculate average losses and accuracies
    avg_sl_loss = total_sl_loss / len(train_loader)
    avg_ml_loss = total_ml_loss / len(train_loader)
    avg_sl_accuracy = total_sl_accuracy / len(train_loader)
    avg_ml_accuracy = total_ml_accuracy / len(train_loader)

    # Append losses and accuracies to the lists
    train_lossessl.append(avg_sl_loss)
    train_lossesml.append(avg_ml_loss)
    train_accuraciessl.append(avg_sl_accuracy)
    train_accuraciesml.append(avg_ml_accuracy)

# Plot loss and accuracy
plt.figure(figsize=(10, 5))
plt.plot(train_lossessl, label='Train Loss SL')
plt.plot(train_lossesml, label='Train Loss ML')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.plot(train_accuraciessl, label='Train Accuracy SL')
plt.plot(train_accuraciesml, label='Train Accuracy ML')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training Accuracy')
plt.legend()
plt.show()

# Load a test image
img_path = '2352494.jpg'
img = Image.open(img_path).convert('RGB')
plt.imshow(img)

# Resize image and convert to tensor
transform = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor()])
img = transform(img)
img = img.unsqueeze(0)

# Get model predictions
model.eval()
with torch.no_grad():
    sl_preds, ml_preds = model(img.to(device))

sl_preds = torch.nn.functional.softmax(sl_preds)
sl_preds = sl_preds.cpu().numpy()
ml_preds = ml_preds.cpu().numpy()

# Plot prediction results
sl_preds = sl_preds.squeeze()
plt.figure(figsize=(10, 5))
plt.bar(classes, sl_preds)
plt.title('Softmax Prediction')
plt.xticks(rotation=90)
plt.xlabel('Food Category')
plt.ylabel('Probability')
plt.show()

ml_preds = ml_preds.squeeze()
plt.figure(figsize=(10, 5))
plt.bar(base_ing, ml_preds)
plt.title('Sigmoid Prediction')
plt.xticks(rotation=90)
plt.xlabel('Ingredient')
plt.ylabel('Probability')
plt.show()