In [29]:
import warnings

warnings.filterwarnings("ignore")

## 1.1 Data preprocessing

In [30]:
# necessary imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from tqdm.notebook import tqdm

from torch.utils.tensorboard import SummaryWriter

import torchvision.transforms as transforms
from torchvision.io import read_image

In [31]:
from torchvision.transforms import v2


def load_img(fname, augment=True, size=(224, 224)):
    """
    Load an image from file, preprocess it and return it as a float torch.Tensor.

    The pixel values are scaled to [0, 1], the image is resized to `size`,
    optionally augmented (random horizontal flip + small rotation) and finally
    normalized per channel.

    :param fname: path to jpg image
    :param augment: when True (default) apply the random flip/rotation used for
        training; pass False to get deterministic preprocessing
    :param size: (height, width) of the returned image. The default 224x224 is
        the input size that CNNClassificationModel's first linear layer
        (6272 = 128 * 7 * 7 input features) is dimensioned for
    :return: tensor of shape (channels, size[0], size[1])
    """
    img = read_image(fname)
    # Dividing the decoded image by 255. yields floating-point values in [0, 1]
    x = img / 255.

    steps = [transforms.Resize(size)]
    if augment:
        # Geometric augmentation only; no color jitter is applied
        steps += [
            transforms.RandomHorizontalFlip(p=0.5),   # flip with 50% probability
            transforms.RandomRotation(degrees=15),    # rotate by up to +/-15 degrees
        ]
    steps += [
        # Per-channel normalization; expects a 3-channel image
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
        # Dtype guard: guarantees float32 output (x is already floating point here)
        v2.ToDtype(torch.float32, scale=True),
    ]

    return transforms.Compose(steps)(x)

In [32]:
from os import walk

# Class index -> age-range label (20 buckets).
decode = {
    0: "1-5",
    1: "5-10",
    2: "10-15",
    3: "15-20",
    4: "20-25",
    5: "25-30",
    6: "30-35",
    7: "35-40",
    8: "40-45",
    9: "45-50",
    10: "50-55",
    11: "55-60",
    12: "60-65",
    13: "65-70",
    14: "70-75",
    15: "75-80",
    16: "80-85",
    17: "85-90",
    18: "90-95",
    19: "95-100",
}
# Age-range label -> class index: the exact inverse of `decode`.
encode = {age_range: class_idx for class_idx, age_range in decode.items()}
# Root folder of the image dataset; the 'ID' values from data.csv are appended to it when loading images.
img_path = "C:/Users/pv010/OneDrive/Рабочий стол/Homework/PMDL/project/facial-age"

# Disabled snippet, kept for reference: it walks the per-age sub-folders of img_path
# and collects ID/Age columns (note that `to_range` is not defined in this notebook).
# _id = []
# _age = []
# dirs_ages = []
# for (dirpath, dirnames, filenames) in walk(img_path):
#     dirs_ages.extend(dirnames)
#     break
# for dire in dirs_ages:
#     for (dirpath, dirnames, filenames) in walk(img_path + "/" + dire):
#         _id.extend(list(map(lambda x:dire+"/"+x,filenames)))
#         _age.extend([to_range[int(dire) // 5] for _ in range(len(filenames))])
#         break
# 
# features_data = {"ID": _id, "Age": _age}

# Read the table and replace each age-range label by its class index
# (a label missing from `encode` raises KeyError).
features = pd.read_csv("data.csv")
features['Age'] = [encode[age_range] for age_range in features['Age']]


In [33]:
#img_path = "/kaggle/input/pmldl-week-2-dnn-training-with-tracking-tools/archive"

# Image attributes
#train_features = pd.read_csv(f"{img_path}/train.csv")

# Run every listed image through load_img and stack the results into one tensor
images = torch.stack([
    load_img(f"{img_path}/{row['ID']}")
    for _, row in features.iterrows()
])

# Labels are the encoded age classes from the 'Age' column.
# They are stored as float here; train() casts them to int64 before computing the loss.
labels = torch.from_numpy(features.get('Age').to_numpy()).float()

In [34]:
# Sanity check: display the shape of the stacked images and of the label vector
(images.shape, labels.shape)

(torch.Size([9773, 3, 224, 224]), torch.Size([9773]))

## 1.3 Data loaders creation

In [35]:
from torch.utils.data import TensorDataset, DataLoader

processed_dataset = TensorDataset(images, labels)

# NOTE: `proportion` does not take part in the split below (it is 70% / 15% / 15%);
# the name is only kept so that any code referring to it keeps working.
proportion = 0.9

# Derive the three lengths so that they always add up to len(processed_dataset):
# random_split raises a ValueError when they do not. The test part absorbs the
# rounding remainder, so this works for any dataset size.
n_total = len(processed_dataset)
n_train = int(n_total * 0.7)
n_val = int(n_total * 0.15)
n_test = n_total - n_train - n_val

# A seeded generator makes the train/val/test membership reproducible between runs.
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    processed_dataset,
    [n_train, n_val, n_test],
    generator=torch.Generator().manual_seed(42),
)


In [36]:
# Wrap the train and validation splits in DataLoaders (iterables that yield mini-batches).
# Only the training loader shuffles its samples.
batch_size = 64
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)


## 2. Training


In [89]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# class CNNClassificationModel(nn.Module):
#     """
#     MLP (multi-layer perceptron) based classification model for MNIST
#     """
# 
#     def __init__(self, num_classes=20):
#         super(CNNClassificationModel, self).__init__()
# 
#         # Add fully connected layers to nn.Sequential to create MLP
#         # First layer should take 28x28 vector
#         # last layer should return vector of size num_classes
#         # do not forget to add activation function between layers
# 
#         self.block1 = nn.Sequential(
#             nn.BatchNorm2d(3),
#             nn.Conv2d(3, 4, kernel_size=(3, 3), stride=2),
#             nn.ReLU(),
#             nn.BatchNorm2d(4),
#         )
# 
#         self.block2 = nn.Sequential(
#             nn.Conv2d(4, 8, kernel_size=(3, 3)),
#             nn.ReLU(),
#             nn.BatchNorm2d(8),
#         )
# 
#         self.out = nn.Sequential(
#             nn.Linear(95048, 1024),
#             nn.ReLU(),
#             nn.Dropout(0.3),
#             nn.Linear(1024, 32),
#             nn.ReLU(),
#             nn.Dropout(0.4),
#             nn.Linear(32, num_classes),
#         )
# 
#     def forward(self, x):
#         x = self.block1(x)
#         x = self.block2(x)
#         x = x.view(x.size(0), -1)
#         x = self.out(x)
#         return x




In [108]:

class CNNClassificationModel(nn.Module):
    """
    CNN-based classification model for age prediction with 20 classes.

    Four convolutional blocks (conv -> ReLU -> 2x2 max-pool -> batch-norm)
    followed by a three-layer fully connected head that outputs one logit per
    class.

    For a 224x224 input the convolutional stack produces a 128 x 7 x 7 feature
    map (6272 values), which is what the head was dimensioned for. An adaptive
    average pool in front of the head brings feature maps of other sizes to
    7x7 as well, so other input resolutions are accepted too; for 224x224
    inputs the pool leaves the feature map unchanged. The pool has no
    parameters, so the state_dict layout is not affected by it.

    :param num_classes: number of output logits (default 20)
    """

    def __init__(self, num_classes=20):
        super().__init__()

        # Block 1: the stride-2 convolution and the max-pool each halve the spatial size
        self.block1 = nn.Sequential(
            nn.BatchNorm2d(3),  # normalizes the 3 input channels
            nn.Conv2d(3, 16, kernel_size=(3, 3), stride=2, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(16)
        )

        # Blocks 2-4: size-preserving 3x3 convolution, then a halving max-pool
        self.block2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(32)
        )

        self.block3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(64)
        )

        self.block4 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(128)
        )

        # Brings any feature map to 7x7 so the flattened size is always 128 * 7 * 7
        self.pool = nn.AdaptiveAvgPool2d((7, 7))

        # Fully connected head for classification
        self.fc = nn.Sequential(
            nn.Linear(6272, 512),  # 6272 = 128 channels * 7 * 7 spatial positions
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, num_classes)  # one logit per class
        )

    def forward(self, x):
        """
        Compute class logits for a batch of images.

        :param x: float tensor of shape (batch, 3, height, width)
        :return: tensor of shape (batch, num_classes) with unnormalized logits
        """
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)

        x = self.pool(x)   # no-op when the feature map is already 7x7
        x = x.flatten(1)   # (batch, 128, 7, 7) -> (batch, 6272)
        return self.fc(x)


In [103]:
def train(
        model,
        optimizer,
        loss_fn,
        train_loader,
        val_loader,
        writer=None,
        epochs=1,
        device="cpu",
        ckpt_path="best.pt",
):
    """
    Train `model` and checkpoint the weights with the best validation accuracy.

    Every epoch runs one pass over `train_loader` (forward, loss, backward,
    optimizer step) followed by one pass over `val_loader` in evaluation mode.
    Whenever the validation accuracy improves, `model.state_dict()` is saved to
    `ckpt_path`.

    :param model: torch.nn.Module to train; it is moved to `device`
    :param optimizer: optimizer built over `model.parameters()`
    :param loss_fn: callable taking (outputs, labels); labels are cast to int64
        class indices before the call
    :param train_loader: iterable of (inputs, labels) batches used for training
    :param val_loader: iterable of (inputs, labels) batches used for validation
    :param writer: optional object with an `add_scalar(tag, value, step)` method
        (e.g. a tensorboard SummaryWriter); None disables logging
    :param epochs: number of passes over `train_loader`
    :param device: device on which the model and the batches are placed
    :param ckpt_path: file the best state_dict is written to
    """
    # Start below any reachable accuracy so the first validated epoch always
    # writes a checkpoint (otherwise a run stuck at 0% would leave either no
    # file or a stale one from an earlier run at ckpt_path).
    best = -1.0

    # Inputs are moved to `device` below, so the parameters must live there too
    model.to(device)

    for epoch in range(epochs):
        train_loop = tqdm(
            enumerate(train_loader, 0), total=len(train_loader), desc=f"Epoch {epoch}"
        )
        model.train()
        train_loss = 0.0
        for _, (inputs, labels) in train_loop:
            inputs = inputs.to(device)
            labels = labels.to(device).type(torch.int64)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_loop.set_postfix({"loss": loss.item()})

        # Mean batch loss of the epoch; max(..., 1) guards an empty loader
        if writer is not None:
            writer.add_scalar("Loss/train", train_loss / max(len(train_loader), 1), epoch)

        # Validation
        correct = 0
        total = 0
        model.eval()  # evaluation mode
        with torch.no_grad():
            val_loop = tqdm(enumerate(val_loader, 0), total=len(val_loader), desc="Val")
            for _, (inputs, labels) in val_loop:
                inputs = inputs.to(device)
                labels = labels.to(device).type(torch.int64)

                # Predicted class = index of the largest logit
                _, predicted = torch.max(model(inputs), 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                val_loop.set_postfix({"acc": correct / total})

        # An empty validation set counts as accuracy 0 instead of dividing by zero
        val_acc = correct / total if total else 0.0
        if writer is not None:
            writer.add_scalar("Accuracy/val", val_acc, epoch)

        if val_acc > best:
            torch.save(model.state_dict(), ckpt_path)
            best = val_acc


## 2.3 Combining everything together

In [109]:
import torch.optim as optim

# Write your code here
# Pick optimizer from torch.optim and loss function loss_fn from torch.nn that suits best the model
# SummaryWriter is used by tensorboard and could be set None
model = CNNClassificationModel()

# Keep a handle on the writer so it can be closed once training ends (also when it
# fails): closing flushes the pending tensorboard events to disk.
writer = SummaryWriter()
try:
    train(
        model=model,
        optimizer=optim.Adam(model.parameters(), lr=0.001),
        loss_fn=nn.CrossEntropyLoss(),  # expects raw logits and int64 class indices
        train_loader=train_loader,
        val_loader=val_loader,
        device='cpu',
        writer=writer,
        epochs=3
    )
finally:
    writer.close()


Epoch 0:   0%|          | 0/107 [00:00<?, ?it/s]

Val:   0%|          | 0/23 [00:00<?, ?it/s]

Epoch 1:   0%|          | 0/107 [00:00<?, ?it/s]

Val:   0%|          | 0/23 [00:00<?, ?it/s]

Epoch 2:   0%|          | 0/107 [00:00<?, ?it/s]

Val:   0%|          | 0/23 [00:00<?, ?it/s]

## 2.4 Inference
Here you need to run inference with the trained model on the test data.

Load the best checkpoint saved during training into the model, then run inference.

In [None]:
# Load the best checkpoint (the state_dict written by train()) into a fresh model
model = CNNClassificationModel()
# map_location="cpu" keeps the load working on a CPU-only machine even when the
# checkpoint holds GPU tensors; weights_only=True limits unpickling to tensors and
# plain containers, which is all a state_dict contains.
ckpt = torch.load("best.pt", map_location="cpu", weights_only=True)
model.load_state_dict(ckpt)

In [None]:
def predict(model, test_loader, device):
    """
    Run model inference on test data.

    :param model: torch.nn.Module producing one row of class logits per sample;
        it is moved to `device` and switched to evaluation mode
    :param test_loader: iterable of batches; a batch is either an input tensor
        or a list/tuple whose first element is the input tensor (for example
        the (inputs, labels) pairs of a TensorDataset)
    :param device: device on which the model and the batches are placed
    :return: list with the predicted class index (int) of every sample, in
        loader order
    """
    predictions = []
    # Batches are moved to `device` below, so the parameters must live there too
    model.to(device)
    model.eval()  # evaluation mode
    with torch.no_grad():
        test_loop = tqdm(enumerate(test_loader, 0), total=len(test_loader), desc="Test")

        for _, batch in test_loop:
            # Labelled batches carry the inputs first; labels are ignored here
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
            inputs = inputs.to(device)

            # Predicted class = index of the largest logit
            _, predicted = torch.max(model(inputs), 1)

            # Extend overall predictions by prediction for a batch
            predictions.extend(predicted.tolist())
    return predictions


def load_training_img(fname):
    """
    Load an image with deterministic preprocessing (no random augmentation).

    The image is scaled to [0, 1], resized to 224x224 (the input size that
    CNNClassificationModel's first linear layer, 6272 = 128 * 7 * 7 features,
    is dimensioned for) and normalized with the same channel statistics that
    load_img uses, so the model sees inputs on the scale it was trained on.

    :param fname: path to jpg image
    :return: tensor of shape (channels, 224, 224)
    """
    img = read_image(fname)
    # Dividing the decoded image by 255. yields floating-point values in [0, 1]
    x = img / 255.

    transform = transforms.Compose(
        [
            transforms.Resize((224, 224)),
            # Per-channel normalization; expects a 3-channel image
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
            # Dtype guard: guarantees float32 output
            v2.ToDtype(torch.float32, scale=True),
        ]
    )

    return transform(x)

In [None]:
# Read the test table and load every image it lists into one stacked tensor.
# NOTE(review): the `img_align_celeba/test` folder, the `image_id` column and the
# `Blond_Hair` header below look inherited from a different task, while this notebook
# predicts age-range classes; confirm they match the data actually used.
# NOTE(review): load_img applies a random flip/rotation, so the test inputs are
# augmented as well; confirm this is intended.
test_features = pd.read_csv(f"{img_path}/test.csv")
images = torch.stack([
    load_img(f"{img_path}/img_align_celeba/test/{row['image_id']}")
    for _, row in test_features.iterrows()
])

# Keep the original order (no shuffling) so predictions line up with the table rows
test_loader = DataLoader(dataset=images, batch_size=batch_size, shuffle=False)
predictions = predict(model=model, test_loader=test_loader, device='cpu')

# Write the submission file: one row per test sample (row index + predicted class)
submission_df = pd.DataFrame(columns=['ID', 'Blond_Hair'])
submission_df['ID'] = test_features.index
submission_df['Blond_Hair'] = predictions
submission_df.to_csv('submission.csv', index=False)
submission_df.head()