In [None]:
%pip install tensorboard datasets -q

In [None]:
import torch
from torch import nn
from torch import optim
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms

import numpy as np
import pandas as pd

from datasets import load_dataset
from PIL import Image

In [None]:
# Seed PyTorch's global RNG so repeated runs draw the same random numbers.
torch.manual_seed(0)

# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
import torchvision.models as models

# Build the ResNet-50 architecture without fetching ImageNet weights: the
# checkpoint loaded below is applied with strict key matching, so it replaces
# every parameter and buffer and any downloaded weights would be discarded.
resnet_model = models.resnet50()

# Modify the final layer to match the number of attributes (40 in this case)
num_features = resnet_model.fc.in_features
resnet_model.fc = nn.Linear(num_features, 40) # 2048 in, 40 out

# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
checkpoint = torch.load("./trained/model_resnet.pth", map_location=device)

resnet_model.load_state_dict(checkpoint)


In [None]:
class SimpleMLP(nn.Module):
    """Feed-forward network mapping an ``input_dim``-wide vector to one logit.

    The body is eleven ``Linear -> BatchNorm1d -> LeakyReLU`` blocks with output
    widths 64, 128, 256, 512, 1024, 1024, 1024, 128, 64, 32 and 16, followed by
    a final ``Linear(16, 1)``. Sub-modules are registered as ``fc<i>``,
    ``bn<i>``, ``relu<i>`` (``i`` = 1..11) and ``fc_out``, so ``state_dict``
    keys are the same as with one explicit attribute assignment per layer.

    The forward pass used to add earlier activations onto later ones (for
    example the 64-wide block-1 output onto the 128-wide block-2 output). The
    operands of every one of those additions had different feature widths, so
    each call raised a ``RuntimeError``. The module holds no projection layers
    that could reconcile the widths, so the additions are removed and the
    blocks are applied strictly in sequence.
    NOTE(review): confirm this matches the forward pass that produced any
    checkpoint loaded into this class.

    Args:
        input_dim: Number of input features per sample.
    """

    # Output width of each hidden block, in application order.
    _HIDDEN_WIDTHS = (64, 128, 256, 512, 1024, 1024, 1024, 128, 64, 32, 16)

    def __init__(self, input_dim=40):
        super().__init__()

        in_width = input_dim
        for idx, out_width in enumerate(self._HIDDEN_WIDTHS, start=1):
            # setattr on an nn.Module registers the layer under that name,
            # which is what keeps the state_dict keys stable.
            setattr(self, f"fc{idx}", nn.Linear(in_width, out_width))
            setattr(self, f"bn{idx}", nn.BatchNorm1d(out_width))
            setattr(self, f"relu{idx}", nn.LeakyReLU())
            in_width = out_width

        self.fc_out = nn.Linear(in_width, 1)

    def forward(self, x):
        """Run ``x`` of shape ``(batch, input_dim)`` through all blocks.

        Returns:
            Tensor of shape ``(batch, 1)`` holding the raw (pre-sigmoid) output.
        """
        for idx in range(1, len(self._HIDDEN_WIDTHS) + 1):
            x = getattr(self, f"fc{idx}")(x)
            x = getattr(self, f"bn{idx}")(x)
            x = getattr(self, f"relu{idx}")(x)

        return self.fc_out(x)

# Instantiate the MLP and restore its saved weights.
mlp = SimpleMLP(input_dim=40)
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only machine.
checkpoint = torch.load("./trained/model_hairline.pth", map_location=device)
mlp.load_state_dict(checkpoint)


In [None]:
from PIL import Image
from io import BytesIO

# Image preprocessing pipeline: centre-crop to 178x178, resize to 128x128,
# then convert the image to a tensor.
custom_transform = transforms.Compose(
    [
        transforms.CenterCrop(size=(178, 178)),
        transforms.Resize(size=(128, 128)),
        transforms.ToTensor(),
    ]
)

from torch.utils.data import DataLoader, Dataset

class CelebADataset(Dataset):
    """Map-style dataset pairing images with their targets.

    Args:
        images: Indexable collection whose items are either a dict carrying the
            encoded image under the ``'bytes'`` key, or anything ``Image.open``
            accepts directly (e.g. a file path).
        targets: Indexable collection of targets aligned with ``images``.
        transform: Optional callable applied to the decoded RGB image.
    """

    def __init__(self, images, targets, transform=None):
        self.images = images  # List of image paths or byte data dictionaries
        self.targets = targets  # Corresponding targets
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        The image is decoded, converted to RGB and, when a transform was
        supplied, passed through it.
        """
        image_data = self.images[idx]

        if isinstance(image_data, dict):  # e.g. {'bytes': ...}
            source = BytesIO(image_data['bytes'])
        else:  # Otherwise, assume it's a file path
            source = image_data

        # Image.open is lazy and keeps the file open; convert() forces the
        # decode and returns an independent image, so the handle can be
        # released right away. Converting to RGB also guarantees three
        # channels for grayscale/palette/RGBA inputs.
        with Image.open(source) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, self.targets[idx]

In [None]:
#
def eval_model(model, val_loader, criterion, num_epochs=1):
    """Evaluate ``model`` on ``val_loader`` and log loss/accuracy to TensorBoard.

    For each of ``num_epochs`` passes the model is put in eval mode and run over
    every batch with gradients disabled. The mean per-batch loss and the
    element-wise accuracy (sigmoid of the outputs thresholded at 0.5, compared
    against the targets) are written to ``./tensorboard_logs`` and printed.

    Args:
        model: Module to evaluate; it is moved to the module-level ``device``.
        val_loader: Iterable yielding ``(images, targets)`` tensor batches.
        criterion: Callable ``criterion(outputs, targets)`` returning a scalar
            loss tensor.
        num_epochs: Number of evaluation passes to run.

    Returns:
        The same ``model`` object that was passed in.
    """
    writer = SummaryWriter(log_dir="./tensorboard_logs")

    # Batches are moved to ``device`` below, so the model has to live there too.
    model.to(device)

    try:
        for epoch in range(num_epochs):
            model.eval()

            # Accumulators are reset on every pass so epochs do not bleed
            # into each other.
            eval_loss = 0.0
            correct_predictions = 0
            total_predictions = 0
            num_batches = 0

            with torch.no_grad():
                for images, targets in val_loader:
                    images, targets = images.to(device), targets.to(device)

                    outputs = model(images)
                    loss = criterion(outputs, targets)
                    eval_loss += loss.item()

                    predictions = torch.sigmoid(outputs)
                    predicted_labels = (predictions > 0.5).float()
                    correct_predictions += (predicted_labels == targets).sum().item()
                    total_predictions += targets.numel()
                    num_batches += 1

            # An empty loader yields NaN metrics instead of a ZeroDivisionError.
            avg_eval_loss = eval_loss / num_batches if num_batches else float("nan")
            accuracy = (
                correct_predictions / total_predictions
                if total_predictions
                else float("nan")
            )

            # Log to TensorBoard
            writer.add_scalar("Loss/Eval", avg_eval_loss, epoch)
            writer.add_scalar("Accuracy", accuracy, epoch)

            print(f"Epoch {epoch + 1}, Eval Loss: {avg_eval_loss:.4f}, Accuracy: {accuracy:.4f}")
    finally:
        # Close the TensorBoard writer even if evaluation raised.
        writer.close()

    return model

In [None]:
data_dir = "./data"
celeba = load_dataset("tpremoli/CelebA-attrs", split="all", cache_dir=data_dir)

df = celeba.to_pandas()

# The first column is used as the image; every column between it and the last
# one is treated as an attribute (the last column is left out).
attr_names = df.columns[1:-1]
image_col_name = df.columns[0]

attributes = df[attr_names].values.astype(np.float32)

inputs = df[image_col_name].values

# Rescale attribute values with (a + 1) / 2.
# NOTE(review): this yields 0/1 labels only if the raw attributes are -1/+1 — confirm.
targets = (attributes + 1) / 2

train_percentage = 0.8
val_percentage = 0.1
test_percentage = 1 - train_percentage - val_percentage

training_amount = int(train_percentage * len(inputs))
validation_amount = int(val_percentage * len(inputs))
# The test split is whatever remains after the train and validation slices.
# (This used to be computed from train_percentage, which did not describe the
# test slice taken below.)
testing_amount = len(inputs) - training_amount - validation_amount

# Contiguous, order-preserving split: train | validation | test.
train_inputs = inputs[0:training_amount]
val_inputs = inputs[training_amount:training_amount+validation_amount]
test_inputs = inputs[training_amount+validation_amount:]


train_targets = targets[0:training_amount]
val_targets = targets[training_amount:training_amount+validation_amount]
test_targets = targets[training_amount+validation_amount:]

assert len(test_inputs) == testing_amount == len(test_targets)

train_targets = torch.tensor(train_targets, dtype=torch.float32).to(device)
val_targets = torch.tensor(val_targets, dtype=torch.float32).to(device)
test_targets = torch.tensor(test_targets, dtype=torch.float32).to(device)

# Custom dataset and dataloaders for ResNet and CNN
train_dataset = CelebADataset(train_inputs, train_targets, transform=custom_transform)
val_dataset = CelebADataset(val_inputs, val_targets, transform=custom_transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

In [None]:
class CombinedModel(nn.Module):
    """Chain two networks: the first one's output is the second one's input."""

    def __init__(self, resnet_model, mlp):
        super().__init__()

        # First stage; per the notebook its last layer is a 40-unit FC.
        self.resnet = resnet_model
        # Second stage; expects the 40 values produced by the first stage.
        self.mlp = mlp

    def forward(self, x):
        """Return ``mlp(resnet(x))``."""
        return self.mlp(self.resnet(x))

In [None]:
# Chain the two networks and place the result on the device the evaluation
# loop moves its batches to; otherwise a CUDA run fails with a device mismatch.
combined_model = CombinedModel(resnet_model, mlp).to(device)

# NOTE(review): SimpleMLP ends in Linear(16, 1), so the combined model emits one
# value per image, while val_loader's targets are built from all attribute
# columns. BCEWithLogitsLoss needs matching shapes — confirm which target
# column this evaluation is meant to use.
# The trailing semicolon keeps the returned model's repr out of the cell output.
eval_model(combined_model, val_loader, nn.BCEWithLogitsLoss());