# --------------------------Part two-------------------------------

# In [None]:
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
import cv2
from tqdm import tqdm
from torchvision import transforms
import numpy as np
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import models
from torch import nn
from torchsummary import summary

# Load the train / validation / test splits from their HDF5 files (key 'df').
train_df = pd.read_hdf('train_df.h5', 'df')
val_df = pd.read_hdf('val_df.h5', 'df')
test_df = pd.read_hdf('test_df.h5', 'df')

# NOTE: `epochs` and `batch_size` are reassigned further down in this cell;
# the later assignments are the values that are actually used.
epochs = 8
batch_size = 16
SMALL_DATA = False  # set to True to keep only the first 128 rows of each split
IMG_SIZE = (224, 224)

if SMALL_DATA:
    train_df = train_df[:128]
    # Slice the validation split itself. Taking these rows from test_df (as
    # the code previously did) made validation a copy of the test subset.
    val_df = val_df[:128]
    test_df = test_df[:128]

col_names = list(train_df.columns.values)
# Every column except the last three is kept as a target name.
# NOTE(review): presumably the last three columns are metadata such as
# 'path' and 'sl_class_id' - confirm against the file that wrote the .h5.
ing_names = col_names[:-3]
targets = ing_names

class CustomDataset(Dataset):
    """Dataset yielding ``(image, single-label one-hot, multi-label vector)``.

    Each row of ``df`` is read for a ``'path'`` column (image file), an
    ``'sl_class_id'`` column (integer class index) and one column per
    ingredient named ``'0'``, ``'1'``, ... up to ``len(base_ing) - 1``.
    The module-level names ``classes``, ``base_ing`` and ``IMG_SIZE`` are
    looked up when an item is accessed.
    """

    def __init__(self, df):
        """Store the dataframe that describes the samples."""
        self.df = df

    def __len__(self):
        """Return the number of rows (samples) in the dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load sample ``idx``; on any error return an all-zero sample.

        Returns:
            tuple: ``(x, sl_y, ml_y)`` where ``x`` is a float tensor of shape
            ``(3, H, W)`` in RGB order scaled to [0, 1], ``sl_y`` is a float
            one-hot tensor of length ``len(classes)`` and ``ml_y`` is a
            float32 numpy array of length ``len(base_ing)``.
        """
        # Fetch the row once instead of re-indexing the dataframe per column.
        row = self.df.iloc[idx]
        image_path = row['path']
        try:
            image = cv2.imread(image_path, 1)
            if image is None:
                raise ValueError(f"Failed to read image at {image_path}")
            if image.shape[0] == 0 or image.shape[1] == 0:
                raise ValueError(f"Invalid image size for {image_path}")

            x = cv2.resize(image, IMG_SIZE)
            # cv2 decodes to BGR with values in 0..255, whereas the inference
            # code in this cell feeds the model RGB values in [0, 1]
            # (PIL + ToTensor). Use the same convention for training.
            x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
            x = torch.from_numpy(x.transpose(2, 0, 1)).float() / 255.0

            sl_class_id = int(row['sl_class_id'])
            sl_onehot = (np.arange(len(classes)) == sl_class_id).astype(np.float32)
            sl_y = torch.from_numpy(sl_onehot)

            ml_y = np.array(
                [row[str(i)] for i in range(len(base_ing))],
                dtype=np.float32,
            )

            return (x, sl_y, ml_y)
        except Exception as e:
            print(f"Error reading image at {image_path}: {str(e)}")
            # Return a default (empty) sample so one bad file does not abort
            # the whole epoch.
            # NOTE(review): the all-zero sl_y is turned into class 0 by the
            # argmax in train_step, so failed samples are trained as class 0.
            # cv2.resize takes (width, height) and yields (height, width, 3),
            # so the fallback must be (3, IMG_SIZE[1], IMG_SIZE[0]).
            x = torch.zeros((3, IMG_SIZE[1], IMG_SIZE[0])).float()
            sl_y = torch.zeros(len(classes)).float()
            ml_y = np.zeros(len(base_ing), dtype=np.float32)
            return (x, sl_y, ml_y)

# Mini-batch size shared by all three loaders (this supersedes the value
# assigned near the top of the cell).
batch_size = 64

# Wrap every split in a dataset first ...
train_dataset = CustomDataset(train_df)
val_dataset = CustomDataset(val_df)
test_dataset = CustomDataset(test_df)

# ... then build the loaders; only the training loader shuffles its samples.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

# ResNet50 backbone initialised with pretrained weights.
resnet = models.resnet50(pretrained=True)
# Freeze every backbone parameter; only the two new heads below are trained.
for param in resnet.parameters():
    param.requires_grad = False

# Keep the original classifier reachable as `last_linear`, then remove it
# from the forward path so the heads receive the pooled backbone features
# (fc.in_features wide) rather than the frozen ImageNet class logits
# (fc.out_features wide) that the code previously built on.
resnet.last_linear = resnet.fc
n_features = resnet.fc.in_features
resnet.fc = nn.Identity()

# Single-label head: raw logits, one per entry of `classes`.
head_sl = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(classes))
)
# Multi-label head: one sigmoid probability per entry of `base_ing`.
head_ml = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(base_ing)),
    nn.Sigmoid()
)

# Shared backbone feeding two independent prediction heads
class FoodModel(nn.Module):
    """Backbone followed by a single-label head and a multi-label head."""

    def __init__(self, base_model, head_sl, head_ml):
        """Register the backbone and both heads as sub-modules."""
        super().__init__()
        self.base_model = base_model
        self.head_sl = head_sl
        self.head_ml = head_ml

    def forward(self, x):
        """Return ``(single_label_output, multi_label_output)`` for ``x``."""
        # The backbone runs once; both heads consume the same features.
        features = self.base_model(x)
        return self.head_sl(features), self.head_ml(features)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Assemble the two-headed network and move it onto the selected device.
model = FoodModel(resnet, head_sl, head_ml)
model = model.to(device)

# Losses: cross-entropy for the single-label head, binary cross-entropy for
# the multi-label head.
sl_loss_fn, ml_loss_fn = nn.CrossEntropyLoss(), nn.BCELoss()

# Adam with its default hyper-parameters over all of the model's parameters.
optimizer = torch.optim.Adam(params=model.parameters())

def train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device):
    """Run one optimisation step on a single batch and report its metrics.

    Args:
        model: module returning ``(sl_preds, ml_preds)`` for a batch.
        optimizer: optimiser that updates ``model``'s parameters.
        sl_loss_fn: loss called as ``sl_loss_fn(sl_preds, class_indices)``.
        ml_loss_fn: loss called as ``ml_loss_fn(ml_preds, ml_y)``.
        data: ``(x, sl_y, ml_y)`` batch; ``sl_y`` is one-hot along dim 1.
        device: device the batch is moved to before the forward pass.

    Returns:
        tuple: ``(sl_loss, ml_loss, sl_f1, ml_f1)`` for this batch only;
        both F1 scores are macro-averaged.
    """
    # Guarantee training mode (dropout active) even if the model was switched
    # to eval() earlier, e.g. when this cell is re-run after inference.
    model.train()

    # Retrieve data and move it to the target device
    x, sl_y, ml_y = data
    x = x.to(device)
    sl_y = sl_y.to(device)
    ml_y = ml_y.to(device)

    # Zero out gradients left over from the previous step
    optimizer.zero_grad()

    # Forward pass
    sl_preds, ml_preds = model(x)

    # The single-label loss expects class indices, so decode the one-hot once
    # and reuse it for the F1 computation below.
    sl_target = torch.argmax(sl_y, dim=1)

    # Calculate losses
    sl_loss = sl_loss_fn(sl_preds, sl_target)
    ml_loss = ml_loss_fn(ml_preds, ml_y)
    loss = sl_loss + ml_loss

    # Calculate F1 scores (multi-label predictions are thresholded at 0.5)
    sl_f1 = f1_score(sl_target.cpu().numpy(), torch.argmax(sl_preds, dim=1).cpu().numpy(), average='macro')
    ml_f1 = f1_score(ml_y.cpu().numpy(), (ml_preds > 0.5).cpu().numpy(), average='macro')

    # Backward pass
    loss.backward()

    # Step optimizer
    optimizer.step()

    # Return losses and F1 scores
    return sl_loss.item(), ml_loss.item(), sl_f1, ml_f1

# Per-epoch history used for plotting once training has finished.
train_losses = []
train_accs = []  # receives the single-label macro F1 of each epoch

# NOTE: this supersedes the `epochs` value assigned near the top of the cell.
epochs = 300
for i in tqdm(range(epochs), desc='Epochs'):
    print("Epoch ", i)
    total_sl_loss = 0.0
    total_ml_loss = 0.0
    total_sl_f1 = 0.0
    total_ml_f1 = 0.0
    total_samples = 0

    with tqdm(train_loader, desc='Training', total=len(train_loader), miniters=1) as pbar:
        for data in pbar:
            SL_loss, ML_loss, SL_f1, ML_f1 = train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device)

            # train_step reports per-batch values. Weight them by the batch
            # size so that dividing by the number of samples seen gives a
            # real running mean; summing unweighted batch means and dividing
            # by the sample count under-reported every figure by roughly a
            # factor of the batch size.
            batch_n = data[0].size(0)
            total_sl_loss += SL_loss * batch_n
            total_ml_loss += ML_loss * batch_n
            total_sl_f1 += SL_f1 * batch_n
            total_ml_f1 += ML_f1 * batch_n
            total_samples += batch_n

            # Update progress bar
            pbar.set_postfix({
                'SL Loss': total_sl_loss / total_samples,
                'ML Loss': total_ml_loss / total_samples,
                'SL F1': total_sl_f1 / total_samples,
                'ML F1': total_ml_f1 / total_samples
            })

    # Epoch averages over all samples; max() avoids a ZeroDivisionError when
    # the loader yields no batches.
    seen = max(total_samples, 1)
    avg_sl_loss = total_sl_loss / seen
    avg_ml_loss = total_ml_loss / seen
    avg_sl_f1 = total_sl_f1 / seen
    avg_ml_f1 = total_ml_f1 / seen

    # Append the combined loss and the single-label F1 to the history
    train_losses.append(avg_sl_loss + avg_ml_loss)
    train_accs.append(avg_sl_f1)

# Plot the per-epoch training loss together with the single-label macro F1.
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
# train_accs is filled with avg_sl_f1 (macro F1), so label it as F1 rather
# than accuracy.
plt.plot(train_accs, label='Train SL F1 (macro)')
plt.xlabel('Epoch')
plt.ylabel('Loss / F1')
plt.title('Training Loss and SL F1')
plt.legend()
plt.show()

# Load a single test image for a qualitative check and display it.
img_path = '82439.jpg'
img = Image.open(img_path).convert('RGB')
plt.imshow(img)

# Resize to the network input size, convert to a float tensor and add the
# leading batch dimension.
# NOTE(review): this path feeds RGB values as produced by ToTensor - confirm
# that the training dataset uses the same channel order and value scaling.
transform = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor()])
img = transform(img)
img = img.unsqueeze(0)

# Get model predictions (eval mode disables dropout; no gradients needed)
model.eval()
with torch.no_grad():
    sl_preds, ml_preds = model(img.to(device))

# Normalise the single-label logits over the class dimension. `dim` is given
# explicitly because calling softmax without it is deprecated.
sl_preds = torch.nn.functional.softmax(sl_preds, dim=1)
sl_preds = sl_preds.cpu().numpy()
ml_preds = ml_preds.cpu().numpy()

# Drop the batch dimension of both prediction arrays before plotting.
sl_preds = sl_preds.squeeze()
ml_preds = ml_preds.squeeze()

# Bar chart of the softmax output, one bar per food category.
plt.figure(figsize=(20, 5))
plt.title('Softmax Prediction')
plt.xlabel('Food Category')
plt.ylabel('Probability')
plt.bar(classes, sl_preds)
plt.xticks(rotation=90)
plt.show()

# Bar chart of the sigmoid output, one bar per ingredient.
plt.figure(figsize=(10, 5))
plt.title('Sigmoid Prediction')
plt.xlabel('Ingredient')
plt.ylabel('Probability')
plt.bar(base_ing, ml_preds)
plt.xticks(rotation=90)
plt.show()

# In [None]:
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
import cv2
from tqdm import tqdm
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import models
from torch import nn
from torchsummary import summary

# Read the three dataset splits from their HDF5 files (stored under key 'df').
train_df = pd.read_hdf('train_df.h5', 'df')
val_df = pd.read_hdf('val_df.h5', 'df')
test_df = pd.read_hdf('test_df.h5', 'df')

# NOTE: both `epochs` and `batch_size` are assigned again later in this cell,
# and those later values are the ones that take effect.
epochs = 8
batch_size = 16
SMALL_DATA = False  # True restricts every split to its first 128 rows
IMG_SIZE = (224, 224)

if SMALL_DATA:
    train_df = train_df[:128]
    # Take the subset from val_df; slicing test_df here (as before) made the
    # validation data identical to the test subset.
    val_df = val_df[:128]
    test_df = test_df[:128]

col_names = list(train_df.columns.values)
# All columns but the last three are kept as target names.
# NOTE(review): presumably the trailing three columns hold metadata such as
# 'path' and 'sl_class_id' - confirm against the code that produced the .h5.
ing_names = col_names[:-3]
targets = ing_names

class CustomDataset(Dataset):
    """Dataset yielding ``(image, single-label one-hot, multi-label vector)``.

    Each row of ``df`` is read for a ``'path'`` column (image file), an
    ``'sl_class_id'`` column (integer class index) and one column per
    ingredient named ``'0'``, ``'1'``, ... up to ``len(base_ing) - 1``.
    The module-level names ``classes``, ``base_ing`` and ``IMG_SIZE`` are
    looked up when an item is accessed.
    """

    def __init__(self, df):
        """Store the dataframe that describes the samples."""
        self.df = df

    def __len__(self):
        """Return the number of rows (samples) in the dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load sample ``idx``; on any error return an all-zero sample.

        Returns:
            tuple: ``(x, sl_y, ml_y)`` where ``x`` is a float tensor of shape
            ``(3, H, W)`` in RGB order scaled to [0, 1], ``sl_y`` is a float
            one-hot tensor of length ``len(classes)`` and ``ml_y`` is a
            float32 numpy array of length ``len(base_ing)``.
        """
        # Fetch the row once instead of re-indexing the dataframe per column.
        row = self.df.iloc[idx]
        image_path = row['path']
        try:
            image = cv2.imread(image_path, 1)
            if image is None:
                raise ValueError(f"Failed to read image at {image_path}")
            if image.shape[0] == 0 or image.shape[1] == 0:
                raise ValueError(f"Invalid image size for {image_path}")

            x = cv2.resize(image, IMG_SIZE)
            # cv2 decodes to BGR with values in 0..255, whereas the inference
            # code in this cell feeds the model RGB values in [0, 1]
            # (PIL + ToTensor). Use the same convention for training.
            x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
            x = torch.from_numpy(x.transpose(2, 0, 1)).float() / 255.0

            sl_class_id = int(row['sl_class_id'])
            sl_onehot = (np.arange(len(classes)) == sl_class_id).astype(np.float32)
            sl_y = torch.from_numpy(sl_onehot)

            ml_y = np.array(
                [row[str(i)] for i in range(len(base_ing))],
                dtype=np.float32,
            )

            return (x, sl_y, ml_y)
        except Exception as e:
            print(f"Error reading image at {image_path}: {str(e)}")
            # Return a default (empty) sample so one bad file does not abort
            # the whole epoch.
            # NOTE(review): the all-zero sl_y is turned into class 0 by the
            # argmax in train_step, so failed samples are trained as class 0.
            # cv2.resize takes (width, height) and yields (height, width, 3),
            # so the fallback must be (3, IMG_SIZE[1], IMG_SIZE[0]).
            x = torch.zeros((3, IMG_SIZE[1], IMG_SIZE[0])).float()
            sl_y = torch.zeros(len(classes)).float()
            ml_y = np.zeros(len(base_ing), dtype=np.float32)
            return (x, sl_y, ml_y)

# Batch size used by every loader below (replaces the value assigned near
# the top of the cell).
batch_size = 64

# One dataset per split ...
train_dataset = CustomDataset(train_df)
val_dataset = CustomDataset(val_df)
test_dataset = CustomDataset(test_df)

# ... and one loader per dataset; shuffling is enabled for training only.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

# ResNet50 backbone initialised with pretrained weights.
resnet = models.resnet50(pretrained=True)
# Freeze every backbone parameter; only the two new heads below are trained.
for param in resnet.parameters():
    param.requires_grad = False

# Keep the original classifier reachable as `last_linear`, then remove it
# from the forward path so the heads receive the pooled backbone features
# (fc.in_features wide) rather than the frozen ImageNet class logits
# (fc.out_features wide) that the code previously built on.
resnet.last_linear = resnet.fc
n_features = resnet.fc.in_features
resnet.fc = nn.Identity()

# Single-label head: raw logits, one per entry of `classes`.
head_sl = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(classes))
)
# Multi-label head: one sigmoid probability per entry of `base_ing`.
head_ml = nn.Sequential(
    nn.Linear(n_features, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.2),
    nn.Linear(512, len(base_ing)),
    nn.Sigmoid()
)

# Shared backbone feeding two independent prediction heads
class FoodModel(nn.Module):
    """Backbone followed by a single-label head and a multi-label head."""

    def __init__(self, base_model, head_sl, head_ml):
        """Register the backbone and both heads as sub-modules."""
        super().__init__()
        self.base_model = base_model
        self.head_sl = head_sl
        self.head_ml = head_ml

    def forward(self, x):
        """Return ``(single_label_output, multi_label_output)`` for ``x``."""
        # The backbone runs once; both heads consume the same features.
        features = self.base_model(x)
        return self.head_sl(features), self.head_ml(features)

# Pick the GPU if CUDA is available; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the two-headed network and place it on the chosen device.
model = FoodModel(resnet, head_sl, head_ml)
model = model.to(device)

# Cross-entropy for the single-label head, binary cross-entropy for the
# multi-label head.
sl_loss_fn, ml_loss_fn = nn.CrossEntropyLoss(), nn.BCELoss()

# Adam with default hyper-parameters over all of the model's parameters.
optimizer = torch.optim.Adam(params=model.parameters())

def train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device):
    """Run one optimisation step on a single batch.

    Args:
        model: module returning ``(sl_preds, ml_preds)`` for a batch.
        optimizer: optimiser that updates ``model``'s parameters.
        sl_loss_fn: loss called as ``sl_loss_fn(sl_preds, class_indices)``.
        ml_loss_fn: loss called as ``ml_loss_fn(ml_preds, ml_y)``.
        data: ``(x, sl_y, ml_y)`` batch; ``sl_y`` is one-hot along dim 1.
        device: device the batch is moved to before the forward pass.

    Returns:
        tuple: ``(sl_loss, ml_loss)`` as Python floats for this batch.
    """
    # Guarantee training mode (dropout active) even if the model was switched
    # to eval() earlier, e.g. when this cell is re-run after inference.
    model.train()

    # Retrieve data and move it to the target device
    x, sl_y, ml_y = data
    x = x.to(device)
    sl_y = sl_y.to(device)
    ml_y = ml_y.to(device)

    # Zero out gradients left over from the previous step
    optimizer.zero_grad()

    # Forward pass
    sl_preds, ml_preds = model(x)

    # The single-label loss expects class indices, so decode the one-hot.
    sl_target = torch.argmax(sl_y, dim=1)

    # Calculate losses
    sl_loss = sl_loss_fn(sl_preds, sl_target)
    ml_loss = ml_loss_fn(ml_preds, ml_y)
    loss = sl_loss + ml_loss

    # Backward pass
    loss.backward()

    # Step optimizer
    optimizer.step()

    # Return losses
    return sl_loss.item(), ml_loss.item()

# Per-epoch history used for plotting once training has finished.
train_losses = []
train_accs = []  # not filled in this cell (no metric is computed here)

# NOTE: this supersedes the `epochs` value assigned near the top of the cell.
epochs = 300
for i in tqdm(range(epochs), desc='Epochs'):
    print("Epoch ", i)
    total_sl_loss = 0.0
    total_ml_loss = 0.0
    total_samples = 0

    with tqdm(train_loader, desc='Training', total=len(train_loader), miniters=1) as pbar:
        for data in pbar:
            SL_loss, ML_loss = train_step(model, optimizer, sl_loss_fn, ml_loss_fn, data, device)

            # train_step reports per-batch losses. Weight them by the batch
            # size so that dividing by the number of samples seen gives a
            # real running mean; summing unweighted batch means and dividing
            # by the sample count under-reported the loss by roughly a factor
            # of the batch size.
            batch_n = data[0].size(0)
            total_sl_loss += SL_loss * batch_n
            total_ml_loss += ML_loss * batch_n
            total_samples += batch_n

            # Update progress bar
            pbar.set_postfix({
                'SL Loss': total_sl_loss / total_samples,
                'ML Loss': total_ml_loss / total_samples
            })

    # Epoch averages over all samples; max() avoids a ZeroDivisionError when
    # the loader yields no batches.
    seen = max(total_samples, 1)
    avg_sl_loss = total_sl_loss / seen
    avg_ml_loss = total_ml_loss / seen

    # Append the combined loss to the history
    train_losses.append(avg_sl_loss + avg_ml_loss)

# Draw the per-epoch training loss curve.
plt.figure(figsize=(10, 5))
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.plot(train_losses, label='Train Loss')
plt.legend()  # must follow plot() so the labelled line exists
plt.show()

# Load a single test image for a qualitative check and display it.
img_path = '82439.jpg'
img = Image.open(img_path).convert('RGB')
plt.imshow(img)

# Resize to the network input size, convert to a float tensor and add the
# leading batch dimension.
# NOTE(review): this path feeds RGB values as produced by ToTensor - confirm
# that the training dataset uses the same channel order and value scaling.
transform = transforms.Compose([transforms.Resize(IMG_SIZE), transforms.ToTensor()])
img = transform(img)
img = img.unsqueeze(0)

# Get model predictions (eval mode disables dropout; no gradients needed)
model.eval()
with torch.no_grad():
    sl_preds, ml_preds = model(img.to(device))

# Normalise the single-label logits over the class dimension. `dim` is given
# explicitly because calling softmax without it is deprecated.
sl_preds = torch.nn.functional.softmax(sl_preds, dim=1)
sl_preds = sl_preds.cpu().numpy()
ml_preds = ml_preds.cpu().numpy()

# Remove the batch dimension from both prediction arrays before plotting.
sl_preds = sl_preds.squeeze()
ml_preds = ml_preds.squeeze()

# Softmax output: one bar per food category.
plt.figure(figsize=(20, 5))
plt.title('Softmax Prediction')
plt.xlabel('Food Category')
plt.ylabel('Probability')
plt.bar(classes, sl_preds)
plt.xticks(rotation=90)
plt.show()

# Sigmoid output: one bar per ingredient.
plt.figure(figsize=(10, 5))
plt.title('Sigmoid Prediction')
plt.xlabel('Ingredient')
plt.ylabel('Probability')
plt.bar(base_ing, ml_preds)
plt.xticks(rotation=90)
plt.show()
