In [1]:
import torch
import torch.nn as nn  
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision
import torchvision.transforms as transforms
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import os
from sklearn.metrics import classification_report
import ResNet as resnet
import DenseNet as densenet
import EfficientNet as efficientnet
import MobileNetV2 as mobilenet
import ViT as vit
import solver.solver as solver
import solver.solver_v2 as solver_v2
import solver.solver_v3 as solver_v3

In [2]:
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

n_epochs = 30
batch_size = 64
learning_rate = 1e-4
diff_drop = "v0"

model = vit.ViT(num_classes=8, diff_drop=diff_drop).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [3]:
dataset_path = "/media/data2/data/CelebAMask-HQ/CelebA-HQ-img/"
annotation_path = "/media/data2/data/CelebAMask-HQ/CelebAMask-HQ-attribute-anno.txt"

In [4]:
class CelebA(Dataset):
    def __init__(self, root, annotation_path, transform=None):
        self.transform = transform
        self.root_dir = root
        self.classes = [
            "Female, Not smiling, Young", "Female, Not smiling, Old", 
            "Female, Smiling, Young", "Female, Smiling, Old", 
            "Male, Not smiling, Young", "Male, Not smiling, Old", 
            "Male, Smiling, Young", "Male, Smiling, Old",
        ]
        
        self.image_paths = []
        self.image_labels = []

        anno_file = open(annotation_path, "r")
        num = int(anno_file.readline())
    
        line = anno_file.readline()
        attributes = list(line[:-1].split(' '))
        
        sex_index = attributes.index('Male') + 2
        smile_index = attributes.index('Smiling') + 2
        young_index = attributes.index('Young') + 2
        for _ in range(num):
            line = anno_file.readline()
            record = list(line[:-1].split(' '))
            self.image_paths.append(record[0])
            if record[sex_index] == "1":
                if record[smile_index] == "1":
                    if record[young_index] == "1":
                        self.image_labels.append(6)
                    else:
                        self.image_labels.append(7)
                else:
                    if record[young_index] == "1":
                        self.image_labels.append(4)
                    else:
                        self.image_labels.append(5)
            else:
                if record[smile_index] == "1":
                    if record[young_index] == "1":
                        self.image_labels.append(2)
                    else:
                        self.image_labels.append(3)
                else:
                    if record[young_index] == "1":
                        self.image_labels.append(0)
                    else:
                        self.image_labels.append(1)
        anno_file.close()
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, index):
        image_file_path = os.path.join(self.root_dir, self.image_paths[index])
        image = Image.open(image_file_path)

        label = self.image_labels[index]
        
        if self.transform is not None:
            image = self.transform(image)
        
        return image, label

In [5]:
trans = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

celebAset = CelebA(root=dataset_path, annotation_path=annotation_path, transform=trans)
train_size = int(0.7 * len(celebAset))
trainset, testset = random_split(celebAset, [train_size, len(celebAset) - train_size])

train_loader = DataLoader(celebAset, batch_size=batch_size, shuffle=True, drop_last=False, )
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=True, drop_last=False, )

classes = celebAset.classes
print(classes)
print(len(trainset), len(testset), len(celebAset))

['Female, Not smiling, Young', 'Female, Not smiling, Old', 'Female, Smiling, Young', 'Female, Smiling, Old', 'Male, Not smiling, Young', 'Male, Not smiling, Old', 'Male, Smiling, Young', 'Male, Smiling, Old']
21000 9000 30000


In [6]:
def train_loop(dataloader, model, criterion, optimizer, epoch):
    size = len(dataloader.dataset)
    total_correct = 0
    total_loss = 0
    
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        model.train()
        correct = 0
        
        pred = None
        p = 0.0
        if diff_drop == "v3":
            pred, p = model(X.to(device), epoch)
        else:
            pred, p = model(X.to(device))
        
        loss = criterion(pred, y.to(device))
        correct += (torch.argmax(pred, dim=1) == y.to(device)).type(torch.float).sum().item()
        total_correct += correct
        
        model.train()
        total_loss += loss.item()
        
        optimizer.zero_grad()
        loss.backward()
        
        if diff_drop == "v1":
            solver.PseudoPruning(model.classification_head.fc, p)
        elif diff_drop == "v2":
            solver_v2.PseudoPruning(model.classification_head.fc, p)
        elif diff_drop == "v3":
            solver_v3.PseudoPruning(model.classification_head.fc, p)
        
        optimizer.step()
        
        if (batch % 50 == 0):
            print(f"Batch {batch:>5d}: Loss per batch is {loss.item():>7.6f} and Accuracy is {correct / batch_size:>7.6f}")
    print(f"For Iteration: Loss is {(total_loss / (size // batch_size)):>7.6f} and Accuracy is {total_correct / size:>7.6f}")

In [7]:
def test_loop(dataloader, model, criterion):
    test_loss = 0
    correct = 0

    model.eval()
    
    y_true = []
    y_pred = []
    
    with torch.no_grad():
        for X, y in dataloader:
            pred, _ = model(X.to(device))
            test_loss += criterion(pred, y.to(device)).item()
            
            pred = torch.argmax(pred, dim=1)
            correct += (pred == y.to(device)).type(torch.float).sum().item()
            
            pred = pred.cpu().numpy()
            y = y.cpu().numpy()
            y_true.extend(y)
            y_pred.extend(pred)
    
    print(classification_report(y_true=y_true, y_pred=y_pred, target_names=classes))
    
    return correct / len(dataloader.dataset)

In [8]:
max = 0.0
for epoch in range(n_epochs):
    print(f"Training: Epoch {epoch + 1} ------------------")
    train_loop(train_loader, model, criterion, optimizer, epoch + 1)
    print(f"----Test: Epoch {epoch + 1} ------------------")
    temp = test_loop(test_loader, model, criterion)
    
    # if temp > max:
    #     max = temp
    #     torch.save(model, './celebAHQ_split_effib0_v3.pt')

Training: Epoch 1 ------------------
Batch     0: Loss per batch is 2.510577 and Accuracy is 0.093750


In [None]:
torch.save(model, './celebAHQ_all_vit_nothing.pt')