In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
warnings.filterwarnings('ignore')
from tqdm.notebook import tqdm
%matplotlib inline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Check for Apple Silicon (Metal / MPS) GPU acceleration
import torch
# Report whether PyTorch can use the Metal (MPS) backend. When it can, place a
# one-element tensor on that device and print it as a quick smoke test.
if not torch.backends.mps.is_available():
    print ("MPS device not found.")
else:
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)

import tensorflow as tf
# Print the GPU device name and the physical GPU list as seen by TensorFlow.
print(tf.test.gpu_device_name())
print(tf.config.list_physical_devices('GPU'))

tensor([1.], device='mps:0')
/device:GPU:0
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


2023-11-09 13:12:04.457496: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M1 Pro
2023-11-09 13:12:04.457518: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2023-11-09 13:12:04.457521: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2023-11-09 13:12:04.457553: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-11-09 13:12:04.457565: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [52]:
#Load the Dataset

# Dataset locations, all expressed relative to the CelebA root directory.
CELEBA_DATA_PATH = './Data/celeba'
# Directory used for the non-cropped images.
IMG_PATH = os.path.join(CELEBA_DATA_PATH, 'img_align_celeba/img_align_celeba')
# Directory used for the cropped / processed images.
CROPPED_IMG_PATH = os.path.join(CELEBA_DATA_PATH, 'processed_img/')
# Attribute table; the gender and age labels are derived from it.
ATTR_PATH = os.path.join(CELEBA_DATA_PATH, 'list_attr_celeba.csv')


def getImagePath(image_id):
    """Return the path of `image_id` inside the non-cropped image directory."""
    full_path = os.path.join(IMG_PATH, image_id)
    return full_path


def getCroppedPath(image_id):
    """Return the path of `image_id` inside the cropped image directory."""
    full_path = os.path.join(CROPPED_IMG_PATH, image_id)
    return full_path

import pandas as pd
from sklearn.model_selection import train_test_split

# Load the attribute table and turn the +1 / -1 flags of the 'Male' and
# 'Young' columns into readable 'Gender' and 'Age' labels.
attributes_df = pd.read_csv(ATTR_PATH)
attributes_df['Gender'] = attributes_df['Male'].map({1: 'Male', -1: 'Female'})
attributes_df['Age'] = attributes_df['Young'].map({1: 'Young', -1: 'Old'})
attributes_df = attributes_df[['image_id', 'Gender', 'Age']]

# Keep only the first 50k rows
cropped_images_df = attributes_df.head(50000)

# Split into train / validation / test (80 / 10 / 10): first hold out 20 %,
# then cut that hold-out in half. The fixed random_state keeps it reproducible.
train_df, val_test_df = train_test_split(cropped_images_df, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(val_test_df, test_size=0.5, random_state=42)

# Assign partition labels: 0 = train, 1 = validation, 2 = test.
# assign() returns new frames, so nothing is written into a slice of
# cropped_images_df (the in-place form triggers SettingWithCopyWarning).
train_df = train_df.assign(partition=0)
val_df = val_df.assign(partition=1)
test_df = test_df.assign(partition=2)

# Combine back to a single dataframe
partitioned_df = pd.concat([train_df, val_df, test_df])

PARTITION_OUTPUT_PATH = os.path.join(CELEBA_DATA_PATH,"partitioned.csv")

# Export the partition data to a new CSV file. Later cells read this file
# back, so a failed write is reported and re-raised instead of being swallowed
# (otherwise a missing or stale file would be used silently).
print(PARTITION_OUTPUT_PATH)
try:
    partitioned_df.to_csv(PARTITION_OUTPUT_PATH, index=False)
except OSError as e:
    print(e)
    raise

# 'partitioned.csv' now holds the image_id, Gender, Age, and partition columns
partitioned_df.head()


./Data/celeba/partitioned.csv


Unnamed: 0,image_id,Gender,Age,partition
39087,039088.jpg,Female,Old,0
30893,030894.jpg,Male,Old,0
45278,045279.jpg,Female,Young,0
16398,016399.jpg,Female,Young,0
13653,013654.jpg,Male,Old,0


In [86]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import pandas as pd

# Define a custom dataset
class CelebADataset(Dataset):
    """Dataset that yields ``(image, label)`` pairs described by a DataFrame.

    Columns are addressed by position: column 0 must hold the image file name
    and column 1 a numeric label (here the 0/1 encoded 'Gender' column).

    Args:
        dataframe: Table with one row per sample.
        img_dir: Directory that contains the image files.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, dataframe, img_dir, transform=None):
        self.dataframe = dataframe
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the backing DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load sample `idx` and return ``(image, label)``.

        The label is returned as a float32 scalar tensor; the image is the
        RGB PIL image, or the result of `transform` when one was given.
        """
        img_name = os.path.join(self.img_dir, self.dataframe.iloc[idx, 0])

        # Always hand a 3-channel image to the transform: the pipeline used in
        # this notebook normalises with three per-channel statistics and the
        # model's first convolution expects 3 input channels, so grayscale,
        # palette or RGBA files would otherwise fail downstream.
        image = Image.open(img_name).convert('RGB')

        # Column 1 is expected to already contain numeric values.
        label = torch.tensor(self.dataframe.iloc[idx, 1], dtype=torch.float32)

        if self.transform is not None:
            image = self.transform(image)

        return image, label


# Read back the partition table written to PARTITION_OUTPUT_PATH.
df = pd.read_csv(PARTITION_OUTPUT_PATH)

# Encode the 'Gender' column as a binary target: Male -> 1, Female -> 0.
df['Gender'] = df['Gender'].map({'Male': 1, 'Female': 0})

# Select the rows of each partition (0 = train, 1 = validation, 2 = test).
train_df = df.loc[df['partition'] == 0]
val_df = df.loc[df['partition'] == 1]
test_df = df.loc[df['partition'] == 2]

# Preprocessing pipeline applied to every image: resize to 224x224, convert to
# a tensor, then normalise each channel with the given mean / std.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Function to create a data loader given the image paths
def create_data_loader(df, img_dir, transform, batch_size=32, shuffle=True):
    """Wrap `df` in a CelebADataset and return a DataLoader over it.

    Args:
        df: DataFrame describing the samples (file name in column 0, numeric
            label in column 1, as CelebADataset expects).
        img_dir: Directory holding the image files.
        transform: Callable applied to each loaded image.
        batch_size: Number of samples per batch.
        shuffle: Whether to reshuffle the samples every epoch. Defaults to
            True (the previous hard-coded behaviour); pass False for
            validation / test data.

    Returns:
        A torch.utils.data.DataLoader.
    """
    dataset = CelebADataset(dataframe=df, img_dir=img_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

# Loaders over the non-cropped images. Only the training data is shuffled;
# validation and test loaders keep a fixed order.
train_loader = create_data_loader(train_df, IMG_PATH, transform)
val_loader = create_data_loader(val_df, IMG_PATH, transform, shuffle=False)
test_loader = create_data_loader(test_df, IMG_PATH, transform, shuffle=False)

# Same partitions, read from the cropped image directory.
cropped_train_loader = create_data_loader(train_df, CROPPED_IMG_PATH, transform)
cropped_val_loader = create_data_loader(val_df, CROPPED_IMG_PATH, transform, shuffle=False)
cropped_test_loader = create_data_loader(test_df, CROPPED_IMG_PATH, transform, shuffle=False)

In [87]:
import torch.nn as nn
import torch.optim as optim

# Define a simple CNN model
class SimpleCNN(nn.Module):
    """Three conv + max-pool stages followed by two fully connected layers.

    The network emits one raw logit per sample (no sigmoid is applied here).
    `fc1` is sized for 64 x 28 x 28 features, i.e. for 3 x 224 x 224 inputs
    after three 2x2 poolings; other spatial sizes raise a shape error.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 28 * 28, 512)
        self.fc2 = nn.Linear(512, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return logits of shape ``(batch, 1)`` for a batch of images `x`."""
        x = self.pool(nn.functional.relu(self.conv1(x)))
        x = self.pool(nn.functional.relu(self.conv2(x)))
        x = self.pool(nn.functional.relu(self.conv3(x)))
        # Flatten everything except the batch dimension. Unlike
        # view(-1, 64 * 28 * 28), this can never fold extra spatial positions
        # into the batch dimension when the input is not 224x224; a wrong
        # input size now fails loudly in fc1 instead.
        x = torch.flatten(x, 1)
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [88]:
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm

class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, checkpoint_name="checkpoint.pt"):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            checkpoint_name (str): Name of the checkpoint file. 
                            Default: "checkpoint.pt"
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0            # consecutive calls without improvement
        self.best_score = None      # best (negated) validation loss seen so far
        self.early_stop = False     # set to True once patience is exhausted
        self.val_loss_min = float('inf')
        self.delta = delta
        self.checkpoint_name = checkpoint_name

    def __call__(self, val_loss, model):
        """Record one validation result.

        Saves a checkpoint when `val_loss` counts as an improvement; otherwise
        increments the counter and sets `early_stop` once it reaches
        `patience`. The very first call always saves a checkpoint.

        A NaN loss is never treated as an improvement.
        """
        # NaN is the only value that differs from itself. Every `<` comparison
        # against NaN is False, so without this check a NaN loss would fall
        # into the "improved" branch, overwrite the checkpoint and leave
        # best_score as NaN, which disables stopping from then on.
        loss_is_nan = val_loss != val_loss

        if self.best_score is None:
            # First call: always write a checkpoint so that one exists, but do
            # not let a NaN become the reference score.
            self.best_score = float('-inf') if loss_is_nan else -val_loss
            self.save_checkpoint(val_loss, model)
        elif loss_is_nan or -val_loss < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = -val_loss
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Saves model when validation loss decreases."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.checkpoint_name)
        self.val_loss_min = val_loss

        
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, checkpoint_name):
    """Train `model`, validate after every epoch and return it with the best weights loaded.

    Each epoch runs one optimisation pass over `train_loader` and one
    gradient-free pass over `val_loader`. Labels are cast to float and reshaped
    to ``(batch, 1)`` before being passed to `criterion`. The per-sample mean
    validation loss drives both an EarlyStopping tracker (patience 5, which
    writes checkpoints to `checkpoint_name`) and `scheduler.step`.

    Returns:
        `model`, after loading the state dict stored at `checkpoint_name`.
    """
    # Tracks the best validation loss and saves the matching checkpoint.
    stopper = EarlyStopping(patience=5, verbose=True, checkpoint_name=checkpoint_name)

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        train_bar = tqdm(train_loader, unit="batch")
        for batch_images, batch_labels in train_bar:
            train_bar.set_description(f"Epoch {epoch+1}/{num_epochs}")
            batch_images = batch_images.to(device)
            targets = batch_labels.to(device).float().view(-1, 1)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_images), targets)
            batch_loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch figure is a per-sample mean.
            loss_value = batch_loss.item()
            train_loss_sum += loss_value * batch_images.size(0)
            train_bar.set_postfix(loss=loss_value)

        train_loss = train_loss_sum / len(train_loader.dataset)

        # ---- validation pass ----
        model.eval()
        val_loss_sum = 0.0
        val_bar = tqdm(val_loader, unit="batch")
        with torch.no_grad():
            for batch_images, batch_labels in val_bar:
                val_bar.set_description(f"Val Epoch {epoch+1}/{num_epochs}")
                batch_images = batch_images.to(device)
                targets = batch_labels.to(device).float().view(-1, 1)

                batch_loss = criterion(model(batch_images), targets)

                loss_value = batch_loss.item()
                val_loss_sum += loss_value * batch_images.size(0)
                val_bar.set_postfix(loss=loss_value)

        val_loss = val_loss_sum / len(val_loader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs} Train loss: {train_loss:.4f} Val loss: {val_loss:.4f}')

        # Update the early-stopping tracker; leave the loop once it fires.
        stopper(val_loss, model)
        if stopper.early_stop:
            print("Early stopping")
            break

        # Let the scheduler react to the validation loss.
        scheduler.step(val_loss)

    # Restore the weights from the checkpoint file before returning.
    best_state = torch.load(checkpoint_name)
    model.load_state_dict(best_state)
    return model

In [89]:
import torch
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch import nn

# Pick the best available accelerator and fall back to the CPU, so the cell
# also runs on machines that have neither CUDA nor Apple's MPS backend.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# For non-cropped images
model_non_cropped = SimpleCNN().to(device)
optimizer_non_cropped = torch.optim.Adam(model_non_cropped.parameters(), lr=0.001)
scheduler_non_cropped = ReduceLROnPlateau(optimizer_non_cropped, 'min', patience=3, verbose=True)
# The model outputs raw logits, so the sigmoid is applied inside the loss.
criterion = torch.nn.BCEWithLogitsLoss()

# For cropped images
model_cropped = SimpleCNN().to(device)
optimizer_cropped = torch.optim.Adam(model_cropped.parameters(), lr=0.001)
scheduler_cropped = ReduceLROnPlateau(optimizer_cropped, 'min', patience=3, verbose=True)

# Define the number of epochs
num_epochs = 1

# Call the training function for non-cropped images
trained_model_non_cropped = train_model(
    model_non_cropped,
    train_loader,
    val_loader,
    criterion,
    optimizer_non_cropped,
    scheduler_non_cropped,
    num_epochs,
    device,
    checkpoint_name='model_non_cropped_checkpoint.pt'
)

# Call the training function for cropped images
trained_model_cropped = train_model(
    model_cropped,
    cropped_train_loader,
    cropped_val_loader,
    criterion,
    optimizer_cropped,
    scheduler_cropped,
    num_epochs,
    device,
    checkpoint_name='model_cropped_checkpoint.pt'
)


Epoch 1/1:   4%|▌              | 50/1250 [00:06<02:26,  8.20batch/s, loss=0.415]


KeyboardInterrupt: 

In [91]:
from sklearn.metrics import accuracy_score

def evaluate_model(model, test_loader, device):
    """Compute the binary classification accuracy of `model` on `test_loader`.

    Args:
        model: Network expected to emit one logit per sample.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the model lives on; inputs are moved there.

    Returns:
        ``(accuracy, y_true, y_pred)`` where `y_true` and `y_pred` are flat
        lists holding one scalar (0. or 1.) per sample.
    """
    model.eval()  # Set the model to evaluation mode
    y_true = []
    y_pred = []

    with torch.no_grad():  # No need to track the gradients
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))

            # Sigmoid + round turns each logit into a 0./1. prediction.
            # Flatten before collecting so y_pred holds one scalar per sample,
            # matching y_true, instead of one 1-element array per sample.
            predictions = torch.sigmoid(outputs).round().reshape(-1).cpu().numpy()
            y_pred.extend(predictions)

            # Labels are only compared on the host, so they are not moved to
            # the device first.
            y_true.extend(labels.float().reshape(-1).cpu().numpy())

    # Calculate accuracy
    accuracy = accuracy_score(y_true, y_pred)
    return accuracy, y_true, y_pred

# Rebuild both networks and restore their weights from the checkpoint files.
# map_location remaps the stored tensors onto `device`, so checkpoints written
# on another backend (e.g. MPS or CUDA) still load on this machine.
checkpoint = 'model_non_cropped_checkpoint.pt'
model = SimpleCNN().to(device)
model.load_state_dict(torch.load(checkpoint, map_location=device))

checkpoint = 'model_cropped_checkpoint.pt'
cropped_model = SimpleCNN().to(device)
cropped_model.load_state_dict(torch.load(checkpoint, map_location=device))

# test both against cropped
# First printed figure: model trained on the non-cropped images.
accuracy, true_labels, predicted_labels = evaluate_model(model, cropped_test_loader, device)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Second printed figure: model trained on the cropped images.
accuracy, true_labels, predicted_labels = evaluate_model(cropped_model, cropped_test_loader, device)
print(f"Test Accuracy: {accuracy * 100:.2f}%")


Test Accuracy: 61.44%
Test Accuracy: 91.18%
