In [1]:
import os
import PIL
import dlib
import matplotlib.pyplot as plt
import datetime
import numpy as np

from sklearn.metrics import confusion_matrix
import seaborn as sns
import pandas as pd

In [2]:
import torch
import torchvision.transforms as transforms
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, random_split, SubsetRandomSampler
import torch.optim as optim

import tensorboardX

2024-04-12 00:08:56.676999: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-04-12 00:08:56.731330: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# Fix the PyTorch and NumPy global RNG seeds so repeated runs start from the same state.
SEED = 0
torch.manual_seed(SEED)
np.random.seed(SEED)

In [4]:
# Training hyperparameters: epochs per fold, mini-batch size, Adam learning rate.
epochs, batch_size, lr = 30, 16, 1e-4

In [5]:
# Unique run name: the hyperparameters plus the wall-clock time at which this cell ran.
logname = f'resnet_{batch_size}_{epochs}_{lr}_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}'

# exist_ok=True replaces the separate existence check: the cell stays safe to
# re-run and there is no window between "check" and "create".
os.makedirs(os.path.join('models', logname), exist_ok=True)

In [6]:
# Use the first CUDA GPU when one is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [7]:
# dlib frontal-face detector and 68-point landmark predictor.
# The predictor weights are read from the working directory, so
# "shape_predictor_68_face_landmarks.dat" must be present there.
# NOTE(review): neither object is referenced by the other cells shown here —
# presumably used by a face-cropping step elsewhere; confirm or remove.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

In [8]:
# Data locations, relative to the notebook's working directory.
# NOTE(review): data_root is not read by the cells shown here — presumably the
# un-cropped source images; confirm.
data_root = 'images/dataset'
# Directory FaceDataset lists by default (it is the default for its root_dir argument).
save_path = 'images/cropped_faces0'

In [9]:
# Preprocessing applied to every image by FaceDataset (its default transform):
# resize, convert to a float tensor, then normalise each channel with the
# ImageNet mean/std that torchvision documents for its pretrained models.
# NOTE(review): Resize(256) with a single int scales the *shorter* side to 256
# and keeps the aspect ratio, so non-square inputs yield tensors of different
# shapes that the default DataLoader collate cannot stack — confirm that all
# files under save_path are square, or switch to Resize((256, 256)).
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [10]:
class FaceDataset(Dataset):
    """Image dataset whose class label is encoded in each file name.

    Every entry of ``root_dir`` must be named ``<int>_<rest>``: the integer in
    front of the first underscore is the 4-class label. For any ``n_classes``
    other than 4 that integer is floor-divided by 2 to form the label.

    Args:
        n_classes: 4 to use the file-name label as is; any other value maps
            the label with ``label // 2``.
        root_dir: directory that contains the image files.
        transform: callable applied to the loaded RGB ``PIL.Image``; ``None``
            returns the image untransformed.

    Attributes:
        files: sorted file names found in ``root_dir``.
        labels: label of ``files[i]`` at index ``i`` (already mapped according
            to ``n_classes``).

    Raises:
        ValueError: if a file name does not start with an integer followed by
            an underscore.
    """

    def __init__(self, n_classes=4, root_dir=save_path, transform=transform):
        self.root_dir = root_dir
        self.transform = transform
        self.n_classes = n_classes
        # os.listdir returns entries in arbitrary order; sorting pins the
        # index -> file mapping so that seeded splits are reproducible.
        self.files = sorted(os.listdir(self.root_dir))
        # Built for every n_classes so the attribute always exists.
        self.labels = [self._label_from_name(name) for name in self.files]

    def _label_from_name(self, file_name):
        """Parse the integer prefix of ``file_name`` and map it to a label."""
        label = int(file_name.split('_')[0])
        return label if self.n_classes == 4 else label // 2

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.files[idx])
        # The context manager closes the file handle; convert() loads the
        # pixels first and forces 3 channels, which the 3-channel Normalize in
        # the default transform requires (grayscale/RGBA files would fail).
        with PIL.Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')
        if self.transform is not None:
            img = self.transform(img)
        return img, self.labels[idx]

In [11]:
dataset = FaceDataset(n_classes=4)

# 80/20 train/test split; the test part takes whatever rounding leaves over.
len_data = len(dataset)
train_size = int(0.8 * len_data)
test_size = len_data - train_size

# A dedicated seeded generator makes the split independent of how much of the
# global RNG stream earlier (or re-run) cells have already consumed.
train_data, test_data = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(0),
)
# Shuffling has no effect on evaluation metrics; a fixed order keeps test runs
# comparable and does not draw from the global RNG.
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

print('Train:', len(train_data), 'Test:', len(test_data))

Train: 464 Test: 117


In [12]:
# ResNet-18 with pretrained weights from the torchvision hub.
model = torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', pretrained=True)

# transfer learning: every layer stays trainable (full fine-tuning)
for param in model.parameters():
    param.requires_grad = True

# Swap the classifier head for a 4-way linear layer, sized from the existing head.
in_features = model.fc.in_features
model.fc = nn.Linear(in_features, 4)

# Move to the device only after the head has been replaced: a layer assigned
# after .to(device) would stay on the CPU and fail with a device mismatch on CUDA.
model = model.to(device)

In [None]:
# Cross-entropy between the model's raw outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()
# Adam over every parameter of the model, using the learning rate set above.
optimizer = optim.Adam(params=model.parameters(), lr=lr)

In [None]:
# TensorBoard writer; the log directory is the run name followed directly by a
# second, compact timestamp taken when this cell runs.
writer_stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
writer = tensorboardX.SummaryWriter(logdir=f'runs/{logname}' + writer_stamp)

In [None]:
def train(train_loader, epoch):
    """Train the module-level ``model`` for one epoch.

    Uses the module-level ``model``, ``optimizer``, ``criterion``, ``writer``,
    ``device`` and ``epochs``.

    Args:
        train_loader: sized iterable yielding ``(inputs, labels)`` batches.
        epoch: zero-based epoch index, used for logging.

    Returns:
        ``(train_loss, train_accuracy)``: the mean per-sample loss and the
        accuracy in percent over all samples seen in this epoch. Both are NaN
        when the loader yields no samples.
    """
    model.train()
    loss_sum = 0.0  # sum of per-sample losses
    correct = 0
    total = 0
    n_batches = len(train_loader)

    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # criterion (nn.CrossEntropyLoss with default reduction) returns the
        # batch mean, so weight it by the batch size to get a per-sample sum.
        batch_n = labels.size(0)
        loss_sum += loss.item() * batch_n

        predicted = outputs.detach().argmax(dim=1)
        total += batch_n
        correct += (predicted == labels).sum().item()

        # A step that grows monotonically within and across epochs; the former
        # i*epoch was 0 for the whole first epoch and collided afterwards.
        step = epoch * n_batches + i
        writer.add_scalar('train_loss', loss.item(), step)
        writer.add_scalar('train_accuracy', 100*correct/total, step)

        # Progress line every 100 batches (the old `i % 10 == 99` could never
        # be true). Epoch accumulators are deliberately not reset here.
        if i % 100 == 99:
            print(f'[{i+1}, {loss_sum/total:.3f}]')

    if total == 0:
        train_loss = train_accuracy = float('nan')
    else:
        train_loss = loss_sum/total
        train_accuracy = 100*correct/total
    print(f'Training {epoch+1}/{epochs}:\tTrain loss: {train_loss:.3f},\tTrain acc: {train_accuracy:.3f}')

    return train_loss, train_accuracy

In [None]:
# y_pred = []
# y_true = []

def validate(model, val_data, epoch):
    """Evaluate ``model`` on a validation loader without updating weights.

    Uses the module-level ``criterion``, ``writer``, ``device`` and ``epochs``.

    Args:
        model: network to evaluate; it is switched to eval mode.
        val_data: sized iterable yielding ``(inputs, labels)`` batches.
        epoch: zero-based epoch index, used for logging.

    Returns:
        ``(val_loss, val_accuracy)``: the mean per-sample loss and the accuracy
        in percent. Both are NaN when the loader yields no samples.
    """
    model.eval()

    correct = 0
    total = 0
    loss_sum = 0.0  # sum of per-sample losses
    n_batches = len(val_data)

    with torch.no_grad():
        for batch_idx, (inputs, labels) in enumerate(val_data):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            predicted = outputs.argmax(dim=1)
            batch_n = labels.size(0)
            total += batch_n
            correct += (predicted == labels).sum().item()

            # Weight the batch-mean loss by the batch size so a short final
            # batch does not count as much as a full one.
            loss_sum += criterion(outputs, labels).item() * batch_n

            # Monotonic step; the former epoch*batch_idx was 0 for all of
            # epoch 0 and for every first batch.
            step = epoch * n_batches + batch_idx
            writer.add_scalar('val_accuracy', 100*correct/total, step)
            writer.add_scalar('val_loss', loss_sum/total, step)

    if total == 0:
        val_loss = val_accuracy = float('nan')
    else:
        val_loss = loss_sum/total
        val_accuracy = 100*correct/total
    print(f'Validating {epoch+1}/{epochs}:\tVal loss: {val_loss:.3f},\tVal acc: {val_accuracy:.3f}')

    return val_loss, val_accuracy

In [None]:
# Module-level so that the confusion-matrix cell can read them after test() ran.
y_pred = []
y_true = []

def test():
    """Evaluate the module-level ``model`` on ``test_loader``.

    Uses the module-level ``model``, ``test_loader``, ``criterion``,
    ``device`` and ``writer``. The module-level ``y_pred`` / ``y_true`` lists
    are emptied and refilled with the predicted and true labels of this call.

    Returns:
        ``(test_loss, test_acc)``: the mean per-sample loss and the accuracy
        as a fraction in ``[0, 1]``. Both are NaN when the loader yields no
        samples.
    """
    model.eval()

    # Start from empty lists so repeated calls do not accumulate predictions.
    y_pred.clear()
    y_true.clear()
    loss_sum = 0.0  # was never initialised before, which raised UnboundLocalError

    with torch.no_grad():

        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            predicted = outputs.argmax(dim=1)

            y_pred.extend(predicted.cpu().tolist())
            y_true.extend(labels.cpu().tolist())

            # Batch-mean loss weighted by the batch size -> per-sample sum.
            loss_sum += criterion(outputs, labels).item() * labels.size(0)

    n_samples = len(y_true)
    if n_samples == 0:
        test_loss = test_acc = float('nan')
    else:
        test_loss = loss_sum / n_samples
        # Element-wise comparison; `y_pred == y_true` on two lists is a single bool.
        test_acc = sum(p == t for p, t in zip(y_pred, y_true)) / n_samples
    print(f'Test:\tTest loss: {test_loss:.3f},\tTest acc: {test_acc:.3f}')

    writer.add_scalar('test_accuracy', test_acc)
    writer.add_scalar('test_loss', test_loss)

    return test_loss, test_acc

In [None]:
def train_val_curve(epoch, fold, train_losses, train_accuracies, val_losses, val_accuracies):
    """Plot the loss and accuracy histories and log the figure to TensorBoard.

    Uses the module-level ``writer`` and ``epochs``.

    Args:
        epoch: zero-based epoch index within the current fold.
        fold: zero-based fold index.
        train_losses: training loss per epoch so far.
        train_accuracies: training accuracy per epoch so far.
        val_losses: validation loss per epoch so far.
        val_accuracies: validation accuracy per epoch so far.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(2, 1, figsize=(10, 10))

    panels = (
        (loss_ax, 'Loss', train_losses, val_losses),
        (acc_ax, 'Accuracy', train_accuracies, val_accuracies),
    )
    for ax, title, train_series, val_series in panels:
        # The x axis is derived from each series' own length. The former
        # range(epochs*fold) was empty for fold 0 and never matched the number
        # of recorded epochs, which makes matplotlib raise a shape error.
        ax.plot(range(1, len(train_series) + 1), train_series, label='train')
        ax.plot(range(1, len(val_series) + 1), val_series, label='val')
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.legend()

    # fold*epochs + epoch is unique per (fold, epoch); epoch*fold was 0 for
    # the whole first fold. close=False keeps the figure open for plt.show().
    writer.add_figure('train_val_curve', fig, fold * epochs + epoch, close=False)

    plt.show()
    # Called once per epoch, so release the figure explicitly.
    plt.close(fig)

In [None]:
# Per-fold summary metrics; run() appends one value per fold to each list.
train_losses_folds, train_accs_folds = [], []
val_losses_folds, val_accs_folds = [], []

In [None]:
from sklearn.model_selection import KFold

# 5-fold splitter with shuffling and a fixed seed; run() iterates over its splits.
kfold = KFold(n_splits=5, shuffle=True, random_state=0)

In [None]:
def run():
    """Run k-fold cross-validation on ``train_data``, then evaluate on the test set.

    Uses the module-level ``kfold``, ``train_data``, ``model``, ``optimizer``,
    ``batch_size`` and ``epochs`` together with ``train``, ``validate``,
    ``train_val_curve`` and ``test``. The module-level ``*_folds`` lists are
    emptied and refilled with one mean value per fold.

    Every fold starts from the model and optimizer state that was current when
    ``run()`` was called, so no fold validates on data an earlier fold already
    trained on. The final test evaluation therefore uses the model as trained
    in the last fold.

    Returns:
        ``(final_train_loss, final_train_acc, final_val_loss, final_val_acc,
        test_loss, test_acc)``; the first four are means over the folds of the
        per-fold means over epochs.
    """
    import copy  # only needed for the state snapshots below

    n_folds = kfold.get_n_splits()

    # Snapshots used to reset between folds; without a reset, later folds are
    # validated on samples the model has already been trained on.
    initial_model_state = copy.deepcopy(model.state_dict())
    initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

    # Start from empty accumulators so calling run() again does not mix runs.
    for fold_history in (train_losses_folds, train_accs_folds, val_losses_folds, val_accs_folds):
        fold_history.clear()

    # KFold only needs the number of samples, so split an index array.
    for fold, (train_idx, val_idx) in enumerate(kfold.split(np.arange(len(train_data)))):
        print(f'Fold {fold+1}/{n_folds}')

        model.load_state_dict(initial_model_state)
        optimizer.load_state_dict(initial_optimizer_state)

        train_subset = SubsetRandomSampler(train_idx)
        val_subset = SubsetRandomSampler(val_idx)
        train_loader = DataLoader(train_data, batch_size=batch_size, sampler=train_subset)
        val_loader = DataLoader(train_data, batch_size=batch_size, sampler=val_subset)

        train_losses = []
        train_accuracies = []
        val_losses = []
        val_accuracies = []

        for epoch in range(epochs):

            train_loss, train_acc = train(train_loader, epoch)
            # validate() takes the model as its first argument.
            val_loss, val_acc = validate(model, val_loader, epoch)

            train_losses.append(train_loss)
            train_accuracies.append(train_acc)
            val_losses.append(val_loss)
            val_accuracies.append(val_acc)

            # for this epoch
            train_val_curve(epoch, fold, train_losses, train_accuracies, val_losses, val_accuracies)

        # Plain lists have no .mean(); use NumPy for the averages.
        train_losses_folds.append(float(np.mean(train_losses)))
        train_accs_folds.append(float(np.mean(train_accuracies)))
        val_losses_folds.append(float(np.mean(val_losses)))
        val_accs_folds.append(float(np.mean(val_accuracies)))

    final_train_loss = float(np.mean(train_losses_folds))
    final_train_acc = float(np.mean(train_accs_folds))
    final_val_loss = float(np.mean(val_losses_folds))
    final_val_acc = float(np.mean(val_accs_folds))

    test_loss, test_acc = test()

    return final_train_loss, final_train_acc, final_val_loss, final_val_acc, test_loss, test_acc

In [None]:
# plt.figure(figsize=(10, 5))
# plt.plot(train_accuracies, label='Train Accuracy')
# plt.plot(val_accuracies, label='Val Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.title('Train and Val Accuracy')
# plt.legend()
# plt.show()

# plt.figure(figsize=(10, 5))
# plt.plot(train_losses, label='Train Loss')
# plt.plot(val_losses, label='Val Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.title('Train and Val Loss')
# plt.legend()
# plt.show()

In [None]:
# Run the cross-validated training followed by the test evaluation and keep
# the six summary metrics that run() returns.
final_train_loss, final_train_acc, final_val_loss, final_val_acc, test_loss, test_acc = run()

In [None]:
# Display names for the class indices, chosen by the dataset's class count.
if dataset.n_classes == 4:
    classes = ['Spring', 'Summer', 'Fall', 'Winter']
else:
    classes = ['Warm', 'Cool']

In [None]:
# Pin the label set so the matrix is always len(classes) x len(classes), even
# when a class never occurs in y_true / y_pred; otherwise the DataFrame below
# fails on a shape mismatch with `classes`.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(classes))))
# Rows are true classes and columns are predicted classes.
df_cm = pd.DataFrame(cm, index=classes, columns=classes)

In [None]:
# Draw the confusion matrix as an annotated heatmap and log it to TensorBoard.
cm_fig = plt.figure(figsize=(10, 7))
cm_ax = sns.heatmap(df_cm, annot=True, cmap='Blues', fmt='g')
cm_ax.set_xlabel('Predicted')
cm_ax.set_ylabel('True')
writer.add_figure('confusion_matrix', cm_fig)