In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from torchvision import transforms, datasets, models
import torch.utils.data as utils
from sklearn.model_selection import train_test_split
import pandas as pd
from PIL import Image
import random

seed_value = 42
random.seed(seed_value)
torch.manual_seed(seed_value)
np.random.seed(seed_value)

In [2]:
df = pd.read_csv("list_attr_celeba.csv")

df = df.loc[:, ["Photo", "Male"]]
df.loc[(df["Male"] == -1), "Male"] = 0
df

Unnamed: 0,Photo,Male
0,000001.jpg,0
1,000002.jpg,0
2,000003.jpg,1
3,000004.jpg,0
4,000005.jpg,0
...,...,...
202594,202595.jpg,0
202595,202596.jpg,1
202596,202597.jpg,1
202597,202598.jpg,0


In [3]:
df.loc[df["Photo"] == "000005.jpg", "Male"].values[0]

0

In [14]:
class CustomDataset(utils.Dataset):
    def __init__(self, file_paths, labels, transform=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        img_path = self.file_paths[idx]
        label = self.labels.loc[df["Photo"] == img_path[17:], "Male"].values[0]

        image = Image.open(img_path)
        image = self.transform(image)

        return image, label

In [15]:
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(25),
    transforms.CenterCrop(227),  
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

directory = "img_align_celeba"
file_names = os.listdir(directory)

file_paths = [os.path.join(directory, file_name) for file_name in file_names]

celeb_data = CustomDataset(file_paths, df, transform)
celeb_subset = torch.utils.data.Subset(celeb_data, range(5000))

celeb_split_data = utils.random_split(celeb_subset, lengths=[0.7, 0.3])

pre_train_dataloader = utils.DataLoader(dataset=celeb_split_data[0], batch_size=64, shuffle=True)
pre_val_dataloader = utils.DataLoader(dataset=celeb_split_data[1], batch_size=64, shuffle=True)

In [16]:
pre_train_dataloader.dataset[0]

(tensor([[[-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
          [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
          [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
          ...,
          [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
          [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
          [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179]],
 
         [[-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
          [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
          [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
          ...,
          [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
          [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
          [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357]],
 
         [[-1.8044, -1.8044, -1.8044,  ..., -1.8044, -1.8044, -1.8044],
          [-1.8044, -1.8044,

In [17]:
class CNN(nn.Module):
    def __init__(self, num_classes):
        super(CNN, self).__init__()
        
        # Convolutional Layers
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=7, stride=1, padding=0)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=4, stride=4)
        self.norm1 = nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2)
        
        self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, stride=1, padding=0)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=4, stride=4)
        self.norm2 = nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2)
        
        self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, stride=1, padding=0)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=6, stride=6)
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(384, 512)
        self.relu4 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.5)
        
        self.fc2 = nn.Linear(512, 512)
        self.relu5 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.5)
        
        self.fc3 = nn.Linear(512, num_classes)
        
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Convolutional Layers
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.norm1(x)
        
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.norm2(x)
        
        x = self.pool3(self.relu3(self.conv3(x)))
        
        # Flatten
        x = x.view(x.size(0), -1)
        
        # Fully Connected Layers
        x = self.dropout1(self.relu4(self.fc1(x)))
        x = self.dropout2(self.relu5(self.fc2(x)))
        x = self.fc3(x)
        
        x = self.sigmoid(x)
        
        return x


num_classes = 2
model = CNN(num_classes)

print(model)

CNN(
  (conv1): Conv2d(3, 96, kernel_size=(7, 7), stride=(1, 1))
  (relu1): ReLU()
  (pool1): MaxPool2d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (norm1): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
  (conv2): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1))
  (relu2): ReLU()
  (pool2): MaxPool2d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (norm2): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
  (conv3): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1))
  (relu3): ReLU()
  (pool3): MaxPool2d(kernel_size=6, stride=6, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=384, out_features=512, bias=True)
  (relu4): ReLU()
  (dropout1): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=512, out_features=512, bias=True)
  (relu5): ReLU()
  (dropout2): Dropout(p=0.5, inplace=False)
  (fc3): Linear(in_features=512, out_features=2, bias=True)
  (sigmoid): Sigmoid()
)


In [18]:
class EarlyStopping:
    def __init__(self, patience=5, delta=0, verbose=False):
        self.patience = patience
        self.delta = delta
        self.verbose = verbose
        self.counter = 0
        self.worst_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.worst_score is None:
            self.worst_score = val_loss
        elif val_loss < self.worst_score - self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.worst_score = val_loss
            self.counter = 0

        return self.early_stop

In [19]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [20]:
EPOCHS = 10
def train_loop(model, train_dataloader, val_dataloader, criterion, optimizer):
    val_acc = []
    earlyStopper = EarlyStopping()
    print('Training...')
    for epoch in range(EPOCHS):
        print(f'--Epoch {epoch+1}--')
        model.train()
        total_loss = 0
        correct = 0
        for batch_idx, (data, target) in enumerate(train_dataloader):
            print(f"Batch: {batch_idx}")
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

        train_accuracy = 100. * correct / len(train_dataloader.dataset)

        model.eval()
        val_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in val_dataloader:
                output = model(data)
                loss = criterion(output, target)
                val_loss += loss.item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()

        if earlyStopper(loss.item()):
            print("Early stopping")
            break

        val_accuracy = 100. * correct / len(val_dataloader.dataset)
        val_acc.append(val_accuracy)
        print(f'Epoch {epoch+1}, Loss: {total_loss/len(train_dataloader):.4f}, Accuracy: {train_accuracy:.2f}%, Test Loss: {val_loss/len(val_dataloader):.4f}, Test Accuracy: {val_accuracy:.2f}%')

In [21]:
def test_loop(model, test_dataloader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_dataloader:
            output = model(data)
            loss = criterion(output, target)
            test_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Test Loss: {test_loss/len(test_dataloader):.4f}, Test Accuracy: {100. * correct / len(test_dataloader.dataset):.2f}%')

In [22]:
train_loop(model, pre_train_dataloader, pre_val_dataloader, criterion, optimizer)

torch.save(model, 'celeba_pretrained_model_2.pth')

Training...
--Epoch 1--
Batch: 0
Batch: 1
Batch: 2
Batch: 3
Batch: 4
Batch: 5
Batch: 6
Batch: 7
Batch: 8
Batch: 9
Batch: 10
Batch: 11
Batch: 12
Batch: 13
Batch: 14
Batch: 15
Batch: 16
Batch: 17
Batch: 18
Batch: 19
Batch: 20
Batch: 21
Batch: 22
Batch: 23
Batch: 24
Batch: 25
Batch: 26
Batch: 27
Batch: 28
Batch: 29
Batch: 30
Batch: 31
Batch: 32
Batch: 33
Batch: 34
Batch: 35
Batch: 36
Batch: 37
Batch: 38
Batch: 39
Batch: 40
Batch: 41
Batch: 42
Batch: 43
Batch: 44
Batch: 45
Batch: 46
Batch: 47
Batch: 48
Batch: 49
Batch: 50
Batch: 51
Batch: 52
Batch: 53
Batch: 54
Epoch 1, Loss: 0.6787, Accuracy: 59.66%, Test Loss: 0.7018, Test Accuracy: 54.40%


In [23]:
gender_data = datasets.ImageFolder(root='gender', transform=transform)
gender_split_data = utils.random_split(gender_data, lengths=[0.4, 0.3, 0.3])

train_dataloader = utils.DataLoader(dataset=gender_split_data[0], batch_size=64, shuffle=True)
val_dataloader = utils.DataLoader(dataset=gender_split_data[1], batch_size=64, shuffle=True)
test_dataloader = utils.DataLoader(dataset=gender_split_data[2], batch_size=64, shuffle=True)

In [24]:
for param in model.parameters():
    param.requires_grad = False

model.fc3 = nn.Linear(model.fc3.in_features, 2)

In [25]:
train_loop(model, train_dataloader, val_dataloader, criterion, optimizer)
test_loop(model, test_dataloader, criterion)

Training...
--Epoch 1--
Batch: 0
Batch: 1
Batch: 2
Batch: 3
Batch: 4
Batch: 5
Batch: 6
Batch: 7
Batch: 8
Batch: 9
Batch: 10
Batch: 11
Batch: 12
Batch: 13
Batch: 14
Batch: 15
Batch: 16
Batch: 17
Batch: 18
Batch: 19
Batch: 20
Batch: 21
Batch: 22
Batch: 23
Batch: 24
Batch: 25
Batch: 26
Batch: 27
Batch: 28
Batch: 29
Batch: 30
Batch: 31
Batch: 32
Batch: 33
Batch: 34
Batch: 35
Batch: 36
Batch: 37
Batch: 38
Batch: 39
Batch: 40
Batch: 41
Batch: 42
Batch: 43
Batch: 44
Batch: 45
Batch: 46
Batch: 47
Batch: 48
Batch: 49
Batch: 50
Batch: 51
Batch: 52
Batch: 53
Batch: 54
Batch: 55
Batch: 56
Batch: 57
Batch: 58
Batch: 59
Batch: 60
Batch: 61
Epoch 1, Loss: 0.6953, Accuracy: 49.62%, Test Loss: 0.6939, Test Accuracy: 49.86%
Test Loss: 0.6944, Test Accuracy: 49.93%
