In [25]:
import torch
import torch.nn as nn
import torchvision.transforms as tr
import torchvision.models as models
import torchvision.utils as vutils
from torch.utils.data import random_split, DataLoader
from torch_snippets import *
from torchinfo import summary

from efficientnet_pytorch import EfficientNet

import albumentations as A
from albumentations.pytorch import ToTensorV2

import os
from glob import glob
import cv2
from PIL import Image

import numpy as np

from tqdm.notebook import tqdm

device = torch.device("mps")

In [26]:
def check_image_sizes(root_dir, print_uniformity=True):
    widths = []
    heights = []
    for subdir, dirs, files in os.walk(root_dir):
        for file in files:
            try:
                with Image.open(os.path.join(subdir, file)) as img:
                    width, height = img.size
                    widths.append(width)
                    heights.append(height)
            except IOError:
                # Skip files that are not images
                continue

    if not widths or not heights:
        print("No images found.")
        return None

    median_width = np.median(widths)
    median_height = np.median(heights)

    if print_uniformity:
        if len(set(widths)) == 1 and len(set(heights)) == 1:
            print("All images have the same size.")
        else:
            print("Images vary in size and may need to be resized.")

    return median_width, median_height

root_dir = 'data/data/train'  
median_width, median_height = check_image_sizes(root_dir)
print(f"Median Width: {median_width}, Median Height: {median_height}")


In [27]:
train_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=160),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
        A.RandomCrop(height=128, width=128),
        A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)


val_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=160),
        A.CenterCrop(height=128, width=128),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)

In [89]:
class HumanFaceDataset(
    torch.utils.data.Dataset
):
    def __init__(self, path, flag = "train", transforms = None):
        self.path = path
        self.flag = flag
        self.transforms = transforms
        if self.flag == "train":
            self.objects = sorted(glob(f"{self.path}/**/*.jpg", recursive=True))
        else:
            self.objects = sorted(glob(f"{self.path}/*.jpg", recursive=True))

            
        self.labels = self.find_labels(self.objects)
        self.label2idx = {label: idx for idx, label in enumerate(sorted(set(self.labels)))}
        self.idx2label = {idx: label for label, idx in self.label2idx.items()}
    
    def find_labels(self, list_paths):
        labels = []
        for path in list_paths:
            label = os.path.basename(os.path.dirname(path))
            labels.append(label)
        return labels
    def __len__(self):
        return len(self.objects)
    
    def __getitem__(self, idx):
        image_path = str(self.objects[idx])
        label = self.labels[idx]
        label = self.label2idx[label]
        
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.transforms is not None:
            image = self.transforms(image = image)["image"]
            
        label = torch.tensor(label, dtype = torch.long)
        if self.flag == "train":
            return image, label
        else:
            return image, image_path
    
full_ds = HumanFaceDataset(
    path = "data/data", transforms = train_transform
)

train_size = int(len(full_ds)  * 0.9)
val_size = len(full_ds) - train_size

train_ds, val_ds = random_split(full_ds, [train_size, val_size])


len(train_ds)

126000

In [73]:
train_dl = DataLoader(
    train_ds, batch_size = 32, shuffle = True, pin_memory = True
)

val_dl = DataLoader(
    val_ds, batch_size = 32, shuffle = False, pin_memory = True, drop_last = True
)

In [74]:
num_classes = len(full_ds.label2idx)
num_classes

7000

In [75]:
#we gonna try several models
def device_choose(var = "mps"):
    if var == "mps":
        device = torch.device("mps")
    elif var == "cpu":
        device = torch.device("cpu")
    elif var == "cuda":
        device = torch.device("cuda")
    return device
    
def choose_model(num_classes, var="en"):
    if var == "en":
        model = EfficientNet.from_pretrained("efficientnet-b0", num_classes=num_classes)
    elif var == "rs18":
        model = models.resnet18(pretrained=True)
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, num_classes)
    elif var == "vgg":
        model = models.vgg16(pretrained=True)
        in_features = model.classifier[6].in_features  
        model.classifier[6] = nn.Linear(in_features, num_classes)

    return model

num_classes = num_classes
model_type = "en"  
device = device_choose()  

model = choose_model(num_classes, model_type).to(device)
model = model.to(device)
summary(model, (1, 3, 224, 224))

Loaded pretrained weights for efficientnet-b0


Layer (type:depth-idx)                             Output Shape              Param #
EfficientNet                                       [1, 7000]                 --
├─Conv2dStaticSamePadding: 1-1                     [1, 32, 112, 112]         864
│    └─ZeroPad2d: 2-1                              [1, 3, 225, 225]          --
├─BatchNorm2d: 1-2                                 [1, 32, 112, 112]         64
├─MemoryEfficientSwish: 1-3                        [1, 32, 112, 112]         --
├─ModuleList: 1-4                                  --                        --
│    └─MBConvBlock: 2-2                            [1, 16, 112, 112]         --
│    │    └─Conv2dStaticSamePadding: 3-1           [1, 32, 112, 112]         288
│    │    └─BatchNorm2d: 3-2                       [1, 32, 112, 112]         64
│    │    └─MemoryEfficientSwish: 3-3              [1, 32, 112, 112]         --
│    │    └─Conv2dStaticSamePadding: 3-4           [1, 8, 1, 1]              264
│    │    └─MemoryEfficientSwish

In [76]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.000001)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

num_epochs = 10

lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode = "min", factor = 0.1, patience = 5, threshold = 0.0001, threshold_mode = "abs"
)

In [77]:
model = model.to(torch.device("mps"))
print(next(model.parameters()).device)

assert next(model.parameters()).device.type == "mps", "model params are not on mps device"

In [78]:
def check(train_ds, val_ds):
    train_indices = list(range(1000))
    train_ds = torch.utils.data.Subset(train_ds, train_indices)

    val_indices = list(range(1000))
    val_ds = torch.utils.data.Subset(val_ds, val_indices)
    return train_ds, val_ds

train_ds, val_ds = check(train_ds, val_ds)

train_dl = DataLoader(train_ds, batch_size=4) 
val_dl = DataLoader(val_ds, batch_size=4, drop_last = True)  

In [79]:
import numpy as np

# Early stopping parameters
best_val_loss = np.inf
patience = 5  # How many epochs to wait after last time validation loss improved.
patience_counter = 0  # Tracks how many epochs have passed since last improvement.

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for idx, (image, label) in tqdm(enumerate(train_dl), total=len(train_dl)):
        image = image.to(device)
        label = label.to(device)
        
        optimizer.zero_grad()
        output = model(image)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_train_loss = total_loss / len(train_dl)
    print(f"Epoch: [{epoch+1}/{num_epochs}], Avg Train Loss: {avg_train_loss:.4f}")

    total_val_loss = 0
    model.eval()
    with torch.no_grad():
        total = 0
        correct = 0
        for idx, (image, label) in tqdm(enumerate(val_dl), total=len(val_dl)):  
            image = image.to(device)
            label = label.to(device)
            output = model(image)
            val_loss = criterion(output, label)
            total_val_loss += val_loss.item()
            _, predicts = torch.max(output, 1)
            total += label.size(0)
            correct += (predicts == label).sum().item()
        avg_val_loss = total_val_loss / len(val_dl)
        val_acc = 100 * correct / total
    print(f"Validation Accuracy: {val_acc:.4f}%, Avg Validation Loss: {avg_val_loss:.4f}")

    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0  # Reset patience counter
        torch.save(model.state_dict(), f"model_weights/best_model.pt")  # Save the best model
        print("Validation loss decreased, saving model.")
    else:
        patience_counter += 1
        print(f"Validation loss did not decrease, patience counter: {patience_counter}/{patience}")

    if patience_counter >= patience:
        print("Early stopping triggered.")
        break

    lr_scheduler.step(avg_val_loss)


  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

  0%|          | 0/250 [00:00<?, ?it/s]

In [82]:
test_ds = HumanFaceDataset(
    path = "test", flag = "test", transforms = val_transform
)

print("the lenght of test ds:", len(test_ds))

test_dl = DataLoader(
    test_ds, batch_size = 32, shuffle = True
)

In [83]:
out_dict = {}

In [None]:

denormalize = tr.Normalize(
    mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
    std=[1/0.229, 1/0.224, 1/0.225]
)

out_dict = {}
model.load_state_dict(torch.load("model_weights/best_model.pt"))
model.eval()

with torch.no_grad():
    for idx, (images, paths) in tqdm(enumerate(test_dl), total=len(test_dl)):
        images = images.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        for i, image in enumerate(images):
            image_cpu = denormalize(image).cpu().squeeze(0)
            pred_class_idx = predicted[i].item()
            pred_class_label = full_ds.idx2label[pred_class_idx]
            image_path = paths[i] 
            out_dict[image_path] = {"image": image_cpu, "predicted": pred_class_label}



In [87]:
import csv

with open('predictions.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['image_name', 'label'])
    for image_path, data in out_dict.items():
        writer.writerow([os.path.basename(image_path), data['predicted']])