In [49]:
import torch
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
import torch.nn.functional as F
import json
import os
from PIL import Image
import csv
import pandas as pd
import dnnlib
import torch_utils

import matplotlib.pyplot as plt
from pytorch_model_summary import summary
from tqdm import tqdm


# PyTorch TensorBoard support
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

In [84]:
class FacesDataset(Dataset):
    def __init__(self, data_folder, csv_folder, split='train_small'):
        self.data_folder = os.path.join(data_folder, split+'_faces')
        self.split = split
        if self.split != 'test':
            self.annotations = self.load_annotations(csv_folder, split)
        self.id2name = {}
        self.name2id = {}
        self.category_csv = os.path.join(csv_folder, 'category.csv')
        self.load_category(self.category_csv)
        
        
        
    def __len__(self):
        if self.split != 'test':
            return len(self.annotations)
        else:
            return 4977#len(os.listdir(self.data_folder))
    
    def __getitem__(self, idx):
        if self.split != 'test':
            annotation = self.annotations.iloc[idx]
            image_path = os.path.join(self.data_folder, annotation['File Name']).replace('\\', '/')
        else:
            image_path = os.path.join(self.data_folder, f"{idx}.jpg").replace('\\', '/')
            
        # Load image
        try:
            image = Image.open(image_path).convert('RGB')
        except Exception as e:
            image = Image.new("RGB", (256, 256), color=(0, 0, 0))
            
        transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        
        inverse_transform = transforms.Compose([
            transforms.Normalize(mean=[0.0], std=[2]),
            transforms.Normalize(mean=[-0.5], std=[1]),
            transforms.ToPILImage()
        ])
        
        image = transform(image)
        
        # plt.imshow(inverse_transform(image))

        if self.split != 'test':
            name = annotation['Category']
            label = self.name2id[name]
            return image, label
        else:
            return image
    
    def load_annotations(self, annotations_file, split):
        csv_path = os.path.join(annotations_file, split+'.csv')
        return pd.read_csv(csv_path, index_col='Unnamed: 0')
    
    def load_category(self, csv_file_path):
        with open(csv_file_path, 'r') as file:
            reader = csv.reader(file)
            next(reader)
            for row in reader:
                id = int(row[0])
                name = row[1]
                self.id2name[id] = name
                self.name2id[name] = id


In [85]:
# Train dataloader
data_folder = './'
csv_folder = './purdue-face-recognition-challenge-2024/'

dataset = FacesDataset(data_folder, csv_folder, split='train')

train_size = int(len(dataset)*0.9)
val_size = len(dataset)-train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

print(len(train_dataset), len(val_dataset))

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

62586 6954


## Finetuning pretrained models

In [86]:
model = torch.hub.load("pytorch/vision", "resnet18", weights="IMAGENET1K_V1")

for param in model.parameters():
    param.requires_grad = False

num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 100)


Using cache found in C:\Users\rc615/.cache\torch\hub\pytorch_vision_main


In [103]:
# model = torch.hub.load("pytorch/vision", "resnet18", weights="IMAGENET1K_V1")
model = models.resnet50(weights="IMAGENET1K_V2")
for param in model.parameters():
    param.requires_grad = False

for name, param in model.named_parameters():
    if 'layer4' in name or 'layer3' in name:
        param.requires_grad = True

num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 100)

print(summary(model, torch.zeros((1, 3, 256, 256)), show_input=False))

------------------------------------------------------------------------------
           Layer (type)          Output Shape         Param #     Tr. Param #
               Conv2d-1     [1, 64, 128, 128]           9,408               0
          BatchNorm2d-2     [1, 64, 128, 128]             128               0
                 ReLU-3     [1, 64, 128, 128]               0               0
            MaxPool2d-4       [1, 64, 64, 64]               0               0
           Bottleneck-5      [1, 256, 64, 64]          75,008               0
           Bottleneck-6      [1, 256, 64, 64]          70,400               0
           Bottleneck-7      [1, 256, 64, 64]          70,400               0
           Bottleneck-8      [1, 512, 32, 32]         379,392               0
           Bottleneck-9      [1, 512, 32, 32]         280,064               0
          Bottleneck-10      [1, 512, 32, 32]         280,064               0
          Bottleneck-11      [1, 512, 32, 32]         280,064  

In [118]:
# Training loop, Validation loop, and Inference loop
import csv

def run_inference(net, data_loader, output_file):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = net.to(device)
    
    predictions = []
    confidences = []

    with torch.no_grad():
        net.eval()
        for i, data in enumerate(data_loader):
            inputs = data
            inputs = inputs.to(device)
            outputs = net(inputs)
            
            prob = F.softmax(outputs, dim=1)
            
            confidence, prediction = prob.max(dim=1)
            predictions.extend(prediction.cpu().numpy())
            confidences.extend(confidence.cpu().numpy())

    
    # Write predictions to CSV file
    with open(output_file, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Id', 'Category'])  # Write header
        for i, prediction in enumerate(predictions):
            writer.writerow([i, data_loader.dataset.id2name[prediction]])  # Write prediction to CSV row
    
    with open(output_file[:-4]+'_wconfidence.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Id', 'Category', 'Confidence'])  # Write header
        for i, (prediction, confidence) in enumerate(zip(predictions, confidences)):
            if confidence < 0.3:
                writer.writerow([i, data_loader.dataset.id2name[prediction], confidence])  # Write prediction to CSV row

def run_validation(net, val_loader, criterion):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = net.to(device)

    with torch.no_grad():
        net.eval()
        correct_count = 0
        running_loss = 0.0
        for i, data in enumerate(val_loader):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss
            
            predictions = outputs.argmax(axis=1)
            
            correct_count += (predictions == labels).sum()

    total_samples = len(val_loader.dataset)
    total_batches = len(val_loader)
    # print(total_samples, len(val_loader), correct_count)
    return{'acc':correct_count/total_samples, 'loss':running_loss/total_batches}

def run_training(net, train_loader, val_loader, epochs=50, lr=1e-2, betas=(0.9, 0.99), add_info=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = net.to(device)
    
    criterion = nn.CrossEntropyLoss()
    
    optimizer = optim.Adam(net.parameters(), lr, betas)
    
    # Bookkeeping
    train_loss_history = []
    val_loss_history = []
    val_acc_history = []
    
    best_val_loss = 1e10
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
    
    print("Training Starts...")
    for epoch in range(epochs):
        running_loss = 0.0
        net.train()
        
        tqdm_loader = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}', leave=False)

        for i, data in enumerate(tqdm_loader):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            outputs = net(inputs)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            
            tqdm_loader.set_postfix(loss=f'{loss:.3f}')
            

        train_loss = running_loss/len(train_loader)
        train_loss_history.append(train_loss)
        
        val_metrics = run_validation(net, val_loader, criterion)
        val_loss = val_metrics["loss"]
        val_acc = val_metrics["acc"]
        
        val_loss_history.append(val_loss)
        val_acc_history.append(val_acc)
        
        writer.add_scalars('Losses', { 'Training' : train_loss, 'Validation' : val_loss },  epoch + 1)
        writer.flush()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            model_path = './model_{}_{}'.format(timestamp, add_info)
            torch.save(net.state_dict(), model_path)
        
        print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.3f}, Val Loss: {val_loss:.3f}, Val Acc: {val_acc:.3f}")
    
    return {'train_loss_history':train_loss_history, 'val_acc_history':val_acc_history, 'val_loss_history':val_loss_history}

In [None]:
# Train
run_training(model, train_loader, val_loader, lr=1e-2, add_info='resnet50_train')

In [107]:
# Finetune
model = torch.hub.load("pytorch/vision", "resnet50", weights="IMAGENET1K_V2")

for param in model.parameters():
    param.requires_grad = False
    
# for name, param in model.named_parameters():
#     if 'layer4.2' in name or 'layer4.1' in name:
#         param.requires_grad = True

for name, param in model.named_parameters():
    if 'layer4' in name or 'layer3' in name:
        param.requires_grad = True

num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 100)

model.load_state_dict(torch.load('./model_20240403_214306_resnet50_train'))

print(summary(model, torch.zeros((1, 3, 256, 256)), show_input=False))

# run_training(model, train_loader, val_loader, lr=1e-2, add_info='resnet50_train')

Using cache found in C:\Users\rc615/.cache\torch\hub\pytorch_vision_main


------------------------------------------------------------------------------
           Layer (type)          Output Shape         Param #     Tr. Param #
               Conv2d-1     [1, 64, 128, 128]           9,408               0
          BatchNorm2d-2     [1, 64, 128, 128]             128               0
                 ReLU-3     [1, 64, 128, 128]               0               0
            MaxPool2d-4       [1, 64, 64, 64]               0               0
           Bottleneck-5      [1, 256, 64, 64]          75,008               0
           Bottleneck-6      [1, 256, 64, 64]          70,400               0
           Bottleneck-7      [1, 256, 64, 64]          70,400               0
           Bottleneck-8      [1, 512, 32, 32]         379,392               0
           Bottleneck-9      [1, 512, 32, 32]         280,064               0
          Bottleneck-10      [1, 512, 32, 32]         280,064               0
          Bottleneck-11      [1, 512, 32, 32]         280,064  

In [108]:

data_folder = './'
csv_folder = './purdue-face-recognition-challenge-2024/'


test_dataset = FacesDataset(data_folder, csv_folder, split='test')

test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [115]:
# Validation
model = torch.hub.load("pytorch/vision", "resnet50")
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 100)

add_info = 'resnet50_train'
model.load_state_dict(torch.load(f'./model_20240403_214306_{add_info}'))


criterion = nn.CrossEntropyLoss()
val_data = run_validation(model, val_loader, criterion)
print(val_data)

Using cache found in C:\Users\rc615/.cache\torch\hub\pytorch_vision_main


{'acc': tensor(0.7183, device='cuda:0'), 'loss': tensor(1.4106, device='cuda:0')}


In [119]:
# Test
model = torch.hub.load("pytorch/vision", "resnet50")
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 100)

add_info = 'resnet50_train'
model.load_state_dict(torch.load(f'./model_20240403_214306_{add_info}'))

# print(summary(model, torch.zeros((1, 3, 256, 256)), show_input=False))
run_inference(model, test_loader, f'./test_result_{add_info}_try.csv')

Using cache found in C:\Users\rc615/.cache\torch\hub\pytorch_vision_main
