# Segédkönyvtár arcok alapján való osztályozáshoz

## Előkészületek és a háló elkészítése

Szükséges csomagok hivatkozása.

In [1]:
import sys
sys.path.append('c:\\users\\ifjto\\appdata\\local\\programs\\python\\python37\\lib\\site-packages')
import numpy as np
import torch
from torch import nn
from torch.utils.data import *
import torch.optim as optim
import torch.cuda
import torchvision.transforms as transforms
import cv2
import os
import time
from IPython.display import Image
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

Nézzük meg, hogy tudunk-e videókártyán futtatni.

In [2]:
if torch.cuda.is_available():
    print("CUDA available!")
else:
    print("CUDA not available!")

CUDA available!


A neurális háló reziduális blokkját megvalósító osztály. A neurális háló a ResNet architektúrát használja. Azért esett erre a híres típusra a választásom, mert ez a neurális hálók képességeit nagyban növelte, de bonyolultsága és számításigénye mégsem óriási. 

In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, nFeat, layersPerLevel, kernelSize):
        super(ResidualBlock, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(layersPerLevel):
            layer = nn.Sequential( 
                nn.Conv2d(nFeat, nFeat, kernelSize, padding=kernelSize//2, bias=False),
                nn.BatchNorm2d(nFeat),
                nn.ReLU()
            )
            self.layers.append(layer)
        self.under_scaling = nn.Conv2d(nFeat, 2*nFeat, kernelSize, padding=kernelSize//2, stride=2, bias=False)

    def forward(self, x):
        out = self.layers[0](x)
        for layer in self.layers[1:]:
            out = layer(out)
        out += x
        out = self.under_scaling(out)
        return out

A neurális hálót reziduális blokkokból összerakó osztály.

In [4]:
class NeuralNetwork(nn.Module):
    def __init__(self, inCh, nC, nFeat, nLevels, layersPerLevel, kernelSize, dropout):
        super(NeuralNetwork, self).__init__()
        
        self.first_layer = nn.Conv2d(inCh, nFeat, kernelSize, padding=kernelSize//2, bias=False)
        self.blocks = nn.ModuleList()
        for i in range(nLevels):
            level = ResidualBlock(nFeat*pow(2, i), layersPerLevel, kernelSize)
            self.blocks.append(level)
        self.pooling_layer = nn.AdaptiveAvgPool2d(10)
        self.dropout_layer = nn.Dropout2d(p=dropout)
        self.linear_layer = nn.Linear(100*pow(2, nLevels)*nFeat, nC)
    
    def forward(self, x):
        x = self.first_layer(x)
        for level in self.blocks:
            x = level(x)
        x = self.pooling_layer(x)
        x = x.view(x.shape[0], -1)
        x = self.dropout_layer(x)
        x = self.linear_layer(x)
        return x

## A tanulóadatok beolvasása és előfeldolgozása

Olvassuk be a képeket, és címkézzük fel. Erre hozzunk létre egy Dataset objektumot. Beolvasáskor normáljuk is a képeket a numerikus konvergencia támogatásáért. Mérjük le az egyes beolvasott adatok relatív gyakoriságának reciprokát, hogy később ezzel súlyozva a hálónk jobban tudjon tanulni.

In [5]:
class MyDataset(Dataset):
    def __init__(self, attribute, accepted_values):
        self.face = []
        self.label = []
        self.frequencies = [0]*len(accepted_values)

        pictures = {}
        for pic in os.listdir("cropped64"):
            image = cv2.imread("cropped64/"+pic, cv2.IMREAD_GRAYSCALE)
            norm_image = cv2.normalize(image, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
            pictures[pic] = torch.Tensor( norm_image ).unsqueeze(0)

        with open("processed.txt", "r") as file:
            not_accepted = {}
            for z, record in enumerate(file.readlines()):
                parts = record[:-1].split(";")
                attributes = {}
                for i in parts:
                    attributes[i.split(":")[0]] = i.split(":")[1]
                if attributes["pic"] != "None":
                    if attributes["pic"] in pictures.keys():
                        
                        if attributes.get(attribute) is None:
                            print("Missing attribute "+attribute)
                            continue
                        
                        # if the record is acceptable save it
                        if attributes[attribute] in accepted_values:
                            self.face.append(pictures[attributes["pic"]])
                            ID = accepted_values.index(attributes[attribute])
                            self.label.append(ID)
                            self.frequencies[ID] += 1
                        
                        # if the record isn't accepted but correct, log it
                        else:
                            if attributes[attribute] in not_accepted.keys():
                                not_accepted[attributes[attribute]] += 1
                            else:
                                not_accepted[attributes[attribute]] = 1
            
        # print logs
        for attr in not_accepted.keys():
            print("Encountered " + attr + " " + str(not_accepted[attr]) + " times but it isn't accepted")
        
        for i in range(len(self.frequencies)):
            print(accepted_values[i]+" is "+str(round(100*self.frequencies[i]/sum(self.frequencies), 2))+"% of data")
        
        # making frequencies relative
        summa = sum(self.frequencies)
        for i in self.frequencies:
            i = 1/i
        
        print("Dataset of ", len(self.face), " records initialised.")
    def __getitem__(self, index):
        return self.face[index], self.label[index]
    def __len__(self):
        return len(self.label)

Majd osszuk szét az adatot tanító-, teszt- és validációs adatbázisok között, 70:15:15 arányban. Használjunk az ellenőrzés kedvéért egy meghatározott random magot. Rendezzük a képeket kötegekbe.

In [6]:
def get_data(attribute, accepted_values, batch_size = 20):
    np.random.seed(42)
    torch.manual_seed(42)

    dataset = MyDataset(attribute, accepted_values)
    dataset_size = len(dataset)
    num_val = int(np.floor(0.15 * dataset_size))
    num_test = int(np.floor(0.15 * dataset_size))

    indices = list(range(dataset_size))
    np.random.shuffle(indices)
    val_indices = indices[:num_val]
    test_indices = indices[num_val:num_val+num_test]
    train_indices = indices[num_val+num_test:]

    train_sampler = SubsetRandomSampler(train_indices)
    test_sampler = SubsetRandomSampler(test_indices)
    valid_sampler = SubsetRandomSampler(val_indices)

    trainloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    testloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)
    validationloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,sampler=valid_sampler)
    return trainloader, testloader, validationloader, dataset.frequencies

## Tanítás

Osztály a validációs adatok reprezentációjára, és a validáció eredményének tárolására.

In [7]:
class Result:
    def __init__(self, correct, total, balanced_acc, acc_best_num, balanced_acc_best_num, confusion_matrix,
                 values, best_num):
        self.correct = correct
        self.total = total
        self.balanced_acc = balanced_acc
        self.confusion_matrix = confusion_matrix
        self.balanced_acc_best_num = balanced_acc_best_num
        self.values = values
        self.best_num = best_num
        self.acc_best_num = acc_best_num
    def __str__(self):
        to_print = ""
        to_print += 'Accuracy: ' + str(round(100 * self.correct / self.total, 4)) + "%\n"
        to_print += "Accuracy for balanced classes: " + str(round(100*self.balanced_acc, 2)) + "%\n"
        to_print += "Best " + str(self.best_num) + " accuracy: " + str(round(100*self.acc_best_num, 4)) + "%\n"
        to_print += "Best " + str(self.best_num) + " accuracy for balanced classes: "
        to_print += str(round(100*self.balanced_acc_best_num, 2)) + "%\n"
        for i in self.values:
            to_print += "\t"+i[:5]
        to_print += "\n"
        for row in range(len(self.confusion_matrix)):
            summa = sum(self.confusion_matrix[row])
            to_print += self.values[row][:5]+"\t"
            for i in range(len(self.confusion_matrix[row])):
                #confusion_matrix[row][i] = round(100*confusion_matrix[row][i]/summa, 2)
                to_print += str(round(100*self.confusion_matrix[row][i]/summa, 2)) + "\t"
            to_print += "\n"
        return to_print

Függvény a validációra. Fontos, hogy a tesztadatok és a validációs adatok elkülönüljenek a végső tesztelésnél, ám ez a függvény a tanulás közben, a tesztadatokon való tesztelésre is használható.

In [8]:
def validate(network, validationloader, values, best_num=1):
    correct = total = 0
    confusion_matrix = []
    best_num_stats = [0]*len(values)
    for i in range(len(values)):
        confusion_matrix.append([0]*len(values))
    network.train(mode=False)
    with torch.no_grad():
        for data in validationloader:
            images, labels = data
            if torch.cuda.is_available():
                network = network.cuda()
                images, labels = images.cuda(), labels.cuda()
            outputs = network(images)
            #print(outputs)
            _, predicted = torch.max(outputs.data, 1)
            total += images.size(0)
            correct += (predicted == labels).sum().item()
            if best_num > 1:
                for pred, lab in zip(outputs, labels):
                    #print(torch.topk(pred, best_num, sorted=False).indices)
                    if lab.item() in torch.topk(pred, best_num, sorted=False).indices:
                        best_num_stats[lab.item()] += 1
            for pred, label in zip(predicted, labels):
                confusion_matrix[label.item()][pred.item()] += 1
    
    balanced_acc = 0
    for i in range(len(confusion_matrix)):
        balanced_acc += (confusion_matrix[i][i] / sum(confusion_matrix[i]))
    balanced_acc /= len(confusion_matrix)
    
    balanced_acc_best_num = balanced_acc
    if best_num > 1:
        balanced_acc_best_num = 0
        for i in range(len(values)):
            balanced_acc_best_num += best_num_stats[i] / sum(confusion_matrix[i]) / len(values)
    acc_best_num = correct/total
    if best_num > 1:
        acc_best_num = sum(best_num_stats)/total
    
    return Result(correct, total, balanced_acc, acc_best_num, balanced_acc_best_num, confusion_matrix, values,
                  best_num)

Függvény egy háló betanítására.
Példányosítsuk a neurális hálót és a segédosztályait. A hiba kiszámítására CrossEntropyLosst-t használok, mivel az egyes osztályok elemszáma nagyon változó, és ez az osztály támogatja a súlyozást, így kiegyenlítve a különböző osztályok fontosságát, elkerülve, hogy mindig a leggyakoribb osztályt tippelje a háló.
És tanítsunk. Minden epoch után jegyezzük fel, hogy alakul a pontosság a tanulóadatokon és a tesztadatokon. Ha a tesztadatokon már romlik a pontosság, de a tanulóadaton még nő, akkor overfitting áll fent - a háló lényegében "bemagolja" az adatokat, ahelyett, hogy általános szabályokat találna ki. Emiatt a pontossága ismeretlen képeken csökken, így ilyenkor le kell állítani a tanítást.
Mentsük el a modellt, hogy később tanítás nélkül is lehessen használni.

In [9]:
def train(nC, nFeat, nLevels, layersPerLevel, kernelSize, dropout, trainloader, testloader, lr, wd, numEpoch,
          frequencies, stats_required = True):
    start_time = time.time()
    stats = []
    losses = []
    np.random.seed(4)
    torch.manual_seed(4)

    # initialising network
    myNet = NeuralNetwork(1, nC, nFeat, nLevels, layersPerLevel, kernelSize, dropout)
    optimizer = torch.optim.Adam(myNet.parameters(), lr=lr, weight_decay=wd)
    criterion = nn.CrossEntropyLoss(torch.Tensor(frequencies))
    if torch.cuda.is_available():
        torch.cuda.manual_seed(42)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        myNet = myNet.cuda()
        criterion = criterion.cuda()
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, numEpoch)
    
    # training
    for i in range(numEpoch):
        myNet.train(mode=True)
        running_loss = 0
        for data in trainloader:
            inputs = data[0]
            labels = data[1]
            if torch.cuda.is_available():
                inputs, labels = inputs.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = myNet(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            with torch.no_grad():
                running_loss += loss.sum().item()
        
        # testing the results
        losses.append(running_loss)
        result = None
        if stats_required or i == numEpoch-1:
            result = validate(myNet, testloader, frequencies)
            stats.append(result.balanced_acc)
            print( str(i+1) + " / " + str(numEpoch) + '\tbalanced accuracy: ' +
                  str(round(100*result.balanced_acc, 2)) + "%\tloss: " + str(round(running_loss, 2)), end="\r")
        scheduler.step()
        
    # printing results
    print( str(i+1) + " / " + str(numEpoch) + '\tbalanced accuracy: ' +
            str(round(100*result.balanced_acc, 2)) + "%\tloss: " + str(round(running_loss, 2)))
    print('Finished training after ', round(time.time() - start_time), " seconds")
    
    return myNet, stats, losses

Függvény a betanított neurális hálón tetszőleges kép osztályozásra, hogy a gyakorlatban is használhassuk az eredményt.

In [10]:
def classify(image, model, values, size=64):
    faceCascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml")
    gray = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
    for scale in range(11, 25):
        faces = faceCascade.detectMultiScale(gray, scaleFactor=scale/10, minNeighbors=5,
                                         minSize=(25, 25), flags = cv2.CASCADE_SCALE_IMAGE)
        if len(faces) == 1:
            break
    x, y, w, h = faces[0]
    size = max(w, h)
    cropped = gray[y:y + size, x:x + size]
    face = cv2.resize(cropped, (size, size))
    cv2.imwrite("test_face.png", face)
    Image(filename="test_face.png")
    face = cv2.normalize(face, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    batch = torch.Tensor(face).unsqueeze(0).unsqueeze(0)
    
    if torch.cuda.is_available():
        batch = batch.cuda()
    
    model.train(mode=False)
    with torch.no_grad():
        output = model(batch)
        to_print = ""
        for i in range(len(output[0])):
            to_print += values[i] + ":\t" + str(output[0][i].item()) + "\n"
        print(to_print)