<a href="https://colab.research.google.com/github/Ostroverkhov/master-thesis/blob/master/UTKClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
from google.colab import drive

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

from torch.utils.data.sampler import SubsetRandomSampler

from torchsummary import summary

from tqdm import tqdm_notebook

import matplotlib.pyplot as plt
%matplotlib inline

# 1. Подготовка датасета

In [0]:
from torchvision.datasets import ImageFolder

# Image-folder dataset whose labels are parsed from the image file names
class UTKImageFolder(ImageFolder):
    """ImageFolder variant that relabels every sample from its file name.

    The label is the third underscore-separated field of the file name,
    interpreted as an integer. The dataset has five classes in total, but the
    fifth one (label 4) has no clear distinguishing features and carries no
    meaning for this task, so its images are dropped.

    Files whose name does not contain an integer third field are skipped
    instead of aborting the whole dataset construction.

    Args:
        root: directory passed to ``ImageFolder``.
        transform: optional transform applied to each loaded image.
        target_transform: optional transform applied to each label.
        is_valid_file: optional predicate forwarded to ``ImageFolder``.
    """

    def __init__(self, root, transform=None, target_transform=None, is_valid_file=None):
        super(UTKImageFolder, self).__init__(root,
                                             transform=transform,
                                             target_transform=target_transform,
                                             loader=torchvision.datasets.folder.default_loader,
                                             is_valid_file=is_valid_file)

        self.classes = ['white', 'black', 'asian', 'indian']
        self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}

        kept = []
        for path, _ in self.samples:
            race = self._parse_race(path)
            # Keep only labels that index into self.classes; this drops the
            # ignored fifth class (4) as well as unparsable names.
            if race is None or not 0 <= race < len(self.classes):
                continue
            kept.append((path, race))

        self.targets = [label for _, label in kept]
        self.imgs = kept
        self.samples = self.imgs

    @staticmethod
    def _parse_race(path):
        """Return the integer third `_`-separated field of the file name, or None."""
        fields = os.path.basename(path).split("_")
        if len(fields) < 3:
            return None
        try:
            return int(fields[2])
        except ValueError:
            return None

In [0]:
# Dataset built from the (path, label) pairs of an image folder
class UTKImageDataset(torch.utils.data.Dataset):
  """Map-style dataset exposing the ``imgs`` list of an image folder.

  Args:
    image_folder: any object with an ``imgs`` attribute that supports
      indexing and ``len()``.
  """

  def __init__(self, image_folder):
    # Must be spelled __init__: a method named `init` is never called on
    # construction, which would leave `self.array` unset.
    self.array = image_folder.imgs

  def __getitem__(self, index):
    """Return the entry stored at position `index`."""
    return self.array[index]

  def __len__(self):
    """Return the number of stored entries."""
    return len(self.array)

In [0]:
# Mount Google Drive at /content/drive (the dataset root used below lives under it)
drive.mount('/content/drive')

In [4]:
# Image preprocessing applied before the data is fed to the network:
# resize, convert to a tensor, then normalise each channel with mean 0.5 / std 0.5
transform = transforms.Compose([
                                transforms.Resize(200),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

dataset = UTKImageFolder(root="/content/drive/My Drive/UTKDataset", transform=transform)

# Random 80/20 split into a training part and a held-out part
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=64,
                                           shuffle=True, num_workers=2)

testloader = torch.utils.data.DataLoader(test_dataset, batch_size=64,
                                           shuffle=False, num_workers=2)


classes = dataset.classes

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
def create_data_loaders(root, transform, ratio, batch_size):
  """Build the train/test loaders for the UTK image folder at `root`.

  Args:
    root: dataset directory handed to ``UTKImageFolder``.
    transform: transform applied to every image.
    ratio: fraction of the samples that goes into the training split.
    batch_size: batch size used by both loaders.

  Returns:
    Tuple ``(trainloader, testloader, classes)``; only the training loader
    shuffles its data.
  """
  full_set = UTKImageFolder(root=root, transform=transform)

  # Random split into a training part and a held-out part
  n_train = int(ratio * len(full_set))
  n_test = len(full_set) - n_train
  train_part, test_part = torch.utils.data.random_split(full_set, [n_train, n_test])

  def make_loader(subset, shuffle):
    return torch.utils.data.DataLoader(
        subset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=2)

  train_loader_obj = make_loader(train_part, True)
  test_loader_obj = make_loader(test_part, False)

  return (train_loader_obj, test_loader_obj, full_set.classes)

In [0]:
# Same preprocessing as before: resize, tensor conversion, per-channel
# normalisation with mean 0.5 / std 0.5
transform = transforms.Compose([
                                transforms.Resize(200),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

root = "/content/drive/My Drive/UTKDataset"

# 80 % of the samples for training, batches of 64
trainloader, testloader, classes = create_data_loaders(root=root, transform=transform, ratio=0.8, batch_size=64)

<torch.utils.data.sampler.RandomSampler at 0x7f4c7df413c8>

# Визуализация данных

In [0]:

# helper that renders a normalised image tensor
def imshow(img):
    """Display `img` in a 20x20 inch matplotlib figure.

    `img` is expected to be a channel-first tensor; the first step undoes a
    mean 0.5 / std 0.5 normalisation.
    """
    restored = img / 2 + 0.5  # unnormalize
    pixels = restored.numpy()
    plt.figure(figsize=(20, 20))
    # matplotlib wants channel-last data, so move axis 0 to the end
    channel_last = np.transpose(pixels, (1, 2, 0))
    plt.imshow(channel_last)
    plt.show()


# Fetch one batch of training images.
# The loader created above is named `trainloader`; `train_loader` is not defined.
dataiter = iter(trainloader)
# The builtin next() works on every PyTorch version; the iterator's own
# .next() method is not available in current releases.
images, labels = next(dataiter)
print(images.shape)
# show images
imshow(torchvision.utils.make_grid(images))
# print one label per image actually present in the batch
# (a batch may hold fewer than 64 samples)
print(' '.join('%5s' % classes[labels[j]] for j in range(len(labels))))

# Конструирование сети


### Создание модулей


In [0]:
# First version of the fully connected network
class MyFCNet1(nn.Module):
    """Five-layer fully connected classifier.

    The first layer takes 3072 features per sample (e.g. a 3x32x32 image).

    Args:
        num_classes: number of output logits. Defaults to 5, the value that
            was previously hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.fc1 = nn.Linear(3072, 100)
        self.fc2 = nn.Linear(100, 120)
        self.fc3 = nn.Linear(120, 100)
        self.fc4 = nn.Linear(100, 84)
        self.fc5 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return the logits for a batch `x`, one row per input sample."""
        # Flatten per sample. With view(-1, 3072) an input of the wrong size
        # is silently reshaped into extra rows, so the output batch size no
        # longer matches the labels; keeping the batch axis makes fc1 raise
        # a shape error instead.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        x = self.fc5(x)
        return x


class MyConvNet1(nn.Module):
    """Small convolutional classifier: two conv + max-pool stages, three linear layers.

    The first linear layer takes 16 * 5 * 5 features per sample, which is
    what the two conv/pool stages produce for a 32x32 input.

    Args:
        num_classes: number of output logits. Defaults to 5, the value that
            was previously hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=16, kernel_size=5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return the logits for a batch `x`, one row per input sample."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample. With view(-1, 16 * 5 * 5) a larger feature map
        # is silently reshaped into extra rows; keeping the batch axis makes
        # fc1 raise a shape error instead.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [0]:
# Second version of the convolutional network
class MyConvNet2(nn.Module):
    """Deeper convolutional classifier: five conv layers and five linear layers.

    The first linear layer takes 5 * 5 * 1024 features per sample, which is
    what the conv/pool stack produces for a 32x32 input.

    Args:
        num_classes: number of output logits. Defaults to 5, the value that
            was previously hard-coded.
    """

    def __init__(self, num_classes=5):
        # call the parent constructor
        super().__init__()
        # Only the number of input channels has to be known in advance
        # (3 here); the conv layers need nothing else about the images.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3)
        self.conv5 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=2)

        # The linear stack depends on the spatial size left by the conv stack.
        self.fc1 = nn.Linear(5 * 5 * 1024, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 64)
        self.fc5 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the logits for a batch `x`, one row per input sample.

        The size comments below are the spatial sizes for a 32x32 input.
        """
        # in: 32x32
        x = F.relu(self.conv1(x))
        # 28x28
        x = F.relu(self.conv2(x))
        # 26x26
        x = F.relu(self.conv3(x))
        # 24x24 -> conv4 gives 22x22 -> pool gives 11x11
        x = self.pool(F.relu(self.conv4(x)))
        # 11x11 -> conv5 gives 10x10 -> pool gives 5x5
        x = self.pool(F.relu(self.conv5(x)))
        # Flatten per sample. With view(-1, 5 * 5 * 1024) a larger feature map
        # could be silently reshaped into extra rows; keeping the batch axis
        # makes fc1 raise a shape error instead.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        x = self.fc5(x)
        return x

### Инициализация и переход на GPU

In [0]:
# Network under training; MyConvNet1() is the smaller alternative
net = MyConvNet2()

# Use the first CUDA device when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

### Вывод параметров сети

In [0]:
# Only the function `summary` is imported from torchsummary; the module name
# `torchsummary` itself is not bound. Run the summary on the device selected
# above, so that the cell also works on a CPU-only runtime.
summary(net.to(device), (3, 32, 32), device=device.type)

### Обучение


In [0]:
loss_fn = torch.nn.CrossEntropyLoss()

learning_rate = 1e-4
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

# make sure the network is in training mode
net.train()
# iterate over the epochs
for epoch in tqdm_notebook(range(1)):

    running_loss = 0.0
    # The loader created above is named `trainloader`; `train_loader` is not defined.
    for i, batch in enumerate(tqdm_notebook(trainloader)):
        # move the current batch to the training device
        X_batch, y_batch = batch[0].to(device), batch[1].to(device)

        # reset the gradients accumulated by the previous step
        optimizer.zero_grad()

        # forward + backward + optimize
        y_pred = net(X_batch)
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        optimizer.step()

        # accumulate the running loss
        running_loss += loss.item()
        # report the average loss every 2000 batches
        if i % 2000 == 1999:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0

# persist the trained weights
PATH = '/content/drive/My Drive/UTKDataset/utkface_net3.pth'
torch.save(net.state_dict(), PATH)
print('Обучение закончено')

### Accuracy

In [0]:
# Load the saved model for evaluation on the CPU
PATH = '/content/drive/My Drive/UTKDataset/utkface_net3.pth'
device = torch.device("cpu")
net = MyConvNet2()
# map_location remaps the stored tensors, so a checkpoint written from a CUDA
# model can also be loaded on a runtime without a GPU
net.load_state_dict(torch.load(PATH, map_location=device))
net.to(device)
# switch to inference mode; eval() returns the module, so the cell still
# displays the network structure
net.eval()

In [0]:
# Accuracy over all classes, measured on the held-out split.
# The report below speaks of test images, so iterate `testloader`
# (the previous code looped over the training loader).
correct = 0
total = 0
# run on whatever device the network currently lives on
eval_device = next(net.parameters()).device
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(eval_device), labels.to(eval_device)
        outputs = net(images)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# report the real number of evaluated images instead of a fixed "10000"
print('Accuracy of the network on the %d test images: %d %%' % (
    total, 100 * correct / total))

In [0]:
# Accuracy per class, measured on the held-out split
class_correct = [0.0] * len(classes)
class_total = [0.0] * len(classes)
# run on whatever device the network currently lives on
eval_device = next(net.parameters()).device
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(eval_device), labels.to(eval_device)
        predicted = net(images).argmax(dim=1)
        hits = (predicted == labels)
        # Count every sample of the batch. The previous `range(4)` looked at
        # the first four samples only, and `.squeeze()` turned a one-sample
        # batch into a scalar that cannot be indexed.
        for label, hit in zip(labels.tolist(), hits.tolist()):
            class_correct[label] += hit
            class_total[label] += 1


for i in range(len(classes)):
    if class_total[i] == 0:
        # no sample of this class in the split: avoid dividing by zero
        print('Accuracy of %5s : n/a (no samples)' % classes[i])
        continue
    print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))
  

### Train and test functions

In [0]:
def train(model, optim, criterion, dataloader, epoch, device):
    """Train `model` for one pass over `dataloader`.

    Args:
        model: network to optimise; switched to training mode.
        optim: optimizer that updates the parameters of `model`.
        criterion: loss function called as ``criterion(output, label)``.
            It is assumed to return the mean loss of the batch (the default
            reduction of ``nn.CrossEntropyLoss``).
        dataloader: iterable of ``(data, label)`` batches.
        epoch: index of the current epoch (not used by the computation).
        device: device the batches are moved to.

    Returns:
        Tuple ``(mean loss per sample, accuracy in percent)``.
    """
    total = 0
    correct = 0
    train_loss = 0.0

    model.train()
    for data, label in dataloader:
        data, label = data.to(device), label.to(device)
        optim.zero_grad()
        output = model(data)
        loss = criterion(output, label)
        loss.backward()
        optim.step()

        batch_size = output.shape[0]
        # Weight the batch mean by the batch size, so that dividing by the
        # number of samples below yields a true per-sample mean. Summing the
        # batch means and dividing by the sample count shrinks the value by
        # roughly the batch size.
        train_loss += loss.item() * batch_size
        pred = output.argmax(1)
        total += batch_size
        correct += pred.eq(label).sum().item()
    return train_loss / total, 100. * correct / total

In [0]:
def test(model, criterion, dataloader, epoch, device, best_acc, model_name='model'):
    total = 0
    correct = 0
    test_loss = 0
    
    model.eval()
    with torch.no_grad():
        for data, label in dataloader:
            data, label = data.to(device), label.to(device)
            output = model(data)
            
            loss = criterion(output, label)
            
            test_loss += loss
            pred = output.argmax(1)
            correct += torch.eq(pred, label).sum().item()
            total += data.shape[0]
    
    acc = 100.*correct/total
    if acc > best_acc:
        # print('Saving..')
        state = {
            'net': model.state_dict(),
            'acc': acc,
            'epoch': epoch,
        }
        PATH = '/content/drive/My Drive/UTKDataset/checkpoint/ckpt_{}.pth'.format(model_name)
        torch.save(state, PATH)
    
    return test_loss / total, acc

### Train


In [0]:
from IPython.display import clear_output
import numpy as np

In [0]:
# Loss and optimizer for the custom network `net`


criterion = nn.CrossEntropyLoss()
# Alternative kept for reference: SGD with lr=0.001, momentum=0.9,
# weight_decay=5e-4
learning_rate = 1e-3
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
# No learning-rate scheduler is used for this run

In [0]:
# Train the custom network and redraw the loss curves after every epoch
train_losses = []
test_losses = []
best_acc = 0
best_epoch = -1
for i in range(500):
    # The loaders created above are named `trainloader` and `testloader`;
    # `train_loader` and `validation_loader` are not defined in this notebook.
    train_loss, train_acc = train(net, optimizer, criterion, trainloader, i, device)
    train_losses.append(train_loss)
    test_loss, test_acc = test(net, criterion, testloader, i, device, best_acc)
    # Store a plain float: a tensor in the history (a CUDA tensor in
    # particular) cannot be plotted by matplotlib.
    test_loss = float(test_loss)
    test_losses.append(test_loss)
    best_acc = max(best_acc, test_acc)
    best_epoch = i if best_acc == test_acc else best_epoch
    clear_output()
    plt.figure(figsize=(18, 9))
    plt.plot(np.arange(len(train_losses)), train_losses, label=f'Train, loss: {train_loss:.4f}, Acc: {train_acc}')
    plt.plot(np.arange(len(test_losses)), test_losses, label=f'Test, loss: {test_loss:.4f}, Acc: {test_acc}')
    plt.title(f'Epoch {i}')
    plt.legend(loc='best')
    plt.show()

In [0]:
# Checks for a directory named 'checkpoint' relative to the current working
# directory.
# NOTE(review): test() writes its checkpoints under
# '/content/drive/My Drive/UTKDataset/checkpoint' — confirm which path is meant.
os.path.isdir('checkpoint')

### Pretrained

In [0]:
from torchvision.models import resnet18

In [0]:
# ResNet-18 created with pretrained=True, used as the starting point for fine-tuning
model = resnet18(pretrained=True)

In [0]:
# Only the function `summary` is imported from torchsummary; the module name
# `torchsummary` itself is not bound. Pick the device explicitly, so that the
# cell also works on a CPU-only runtime.
summary_device = "cuda" if torch.cuda.is_available() else "cpu"
summary(model.to(summary_device), (3, 32, 32), device=summary_device)

In [0]:
# Display the module structure as the cell output
model

In [0]:
# Freeze every existing parameter: no gradients are computed for them
for param in model.parameters():
    param.requires_grad = False

In [0]:
# Replace the final layer with a fresh, trainable one. The sizes are derived
# from the model and from the class list instead of the literals 512 and 4,
# so the cell stays correct if either of them changes.
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=len(classes))

In [0]:
# Number of parameter elements that currently require gradients
sum([p.numel() for p in model.parameters() if p.requires_grad])

In [0]:
# Use CUDA when it is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = model.to(device)

criterion = nn.CrossEntropyLoss()
# The optimizer is given all parameters of the model, including those whose
# requires_grad flag is currently False
optimizer = optim.SGD(model.parameters(), 
                      lr=0.001, momentum=0.9, weight_decay=5e-4)
# StepLR with step_size=2 and gamma=0.5
scheduler = optim.lr_scheduler.StepLR(optimizer, 2, gamma=0.5)

In [0]:
# First stage: three epochs with the current requires_grad settings
train_losses = []
test_losses = []
best_acc = 0
best_epoch = -1
for i in range(3):
    train_loss, train_acc = train(model, optimizer, criterion, trainloader, i, device)
    train_losses.append(train_loss)
    test_loss, test_acc = test(model, criterion, testloader, i, device, best_acc, 'resnet')
    # Store a plain float: a tensor in the history (a CUDA tensor in
    # particular) cannot be plotted by matplotlib.
    test_loss = float(test_loss)
    # StepLR counts epochs by itself. The argument of StepLR.step() is an
    # epoch index, not a metric, so passing the loss distorts the schedule.
    scheduler.step()
    test_losses.append(test_loss)
    best_acc = max(best_acc, test_acc)
    best_epoch = i if best_acc == test_acc else best_epoch
    clear_output()
    plt.figure(figsize=(18, 9))
    plt.plot(np.arange(len(train_losses)), train_losses, label=f'Train, loss: {train_loss:.4f}, Acc: {train_acc}')
    plt.plot(np.arange(len(test_losses)), test_losses, label=f'Test, loss: {test_loss:.4f}, Acc: {test_acc}')
    plt.title(f'Epoch {i}')
    plt.legend(loc='best')
    plt.show()

In [0]:
# Unfreeze all parameters, so that the whole network is updated from here on
for param in model.parameters():
    param.requires_grad = True

In [0]:
# Number of parameter elements that currently require gradients
sum([p.numel() for p in model.parameters() if p.requires_grad])

In [0]:
# Second stage: epochs 3..9, continuing the loss histories of the first stage
for i in range(3, 10):
    train_loss, train_acc = train(model, optimizer, criterion, trainloader, i, device)
    train_losses.append(train_loss)
    test_loss, test_acc = test(model, criterion, testloader, i, device, best_acc, 'resnet')
    # Store a plain float: a tensor in the history (a CUDA tensor in
    # particular) cannot be plotted by matplotlib.
    test_loss = float(test_loss)
    # StepLR counts epochs by itself. The argument of StepLR.step() is an
    # epoch index, not a metric, so passing the loss distorts the schedule.
    scheduler.step()
    test_losses.append(test_loss)
    best_acc = max(best_acc, test_acc)
    best_epoch = i if best_acc == test_acc else best_epoch
    clear_output()
    plt.figure(figsize=(18, 9))
    plt.plot(np.arange(len(train_losses)), train_losses, label=f'Train, loss: {train_loss:.4f}, Acc: {train_acc}')
    plt.plot(np.arange(len(test_losses)), test_losses, label=f'Test, loss: {test_loss:.4f}, Acc: {test_acc}')
    plt.title(f'Epoch {i}')
    plt.legend(loc='best')
    plt.show()

In [0]:
# Best test accuracy (in percent) reached during the loops above
best_acc

In [0]:
# Final loss curves, with a red vertical marker at the best epoch
plt.figure(figsize=(18, 9))

train_x = np.arange(len(train_losses))
train_label = f'Train, loss: {train_loss:.4f}, Acc: {train_acc}'
plt.plot(train_x, train_losses, label=train_label)

test_x = np.arange(len(test_losses))
test_label = f'Test, loss: {test_loss:.4f}, Acc: {test_acc}'
plt.plot(test_x, test_losses, label=test_label)

# The marker runs from the smallest train loss up to the largest test loss
marker_x = np.ones(10)*best_epoch
marker_bottom = np.min(train_losses).item()
marker_top = np.max(test_losses).item()
marker_y = np.linspace(marker_bottom, marker_top, 10)
plt.plot(marker_x, marker_y, color='r', label='best epoch')

plt.title(f'Epoch {i}')
plt.legend(loc='best')

In [0]:
import torchvision.models as models

In [0]:
# Untrained reference architectures for comparing layer and parameter counts.
# The ResNet instance gets its own name: rebinding `resnet18` would shadow the
# constructor imported from torchvision.models, so re-running the cell that
# calls `resnet18(pretrained=True)` would then call a model instance.
resnet18_scratch = models.resnet18(pretrained=False)
googlenet = models.googlenet(pretrained=False)
net1 = MyConvNet1()
net2 = MyConvNet2()
# Only the function `summary` is imported from torchsummary; the module name
# `torchsummary` itself is not bound.
summary(net1, (3, 32, 32), device='cpu')
summary(net2, (3, 32, 32), device='cpu')
summary(resnet18_scratch, (3, 32, 32), device='cpu')
summary(googlenet, (3, 32, 32), device='cpu')

In [0]:
def trainLoop(model, optim, criterion, traindataloader, testdataloader, device, model_name='model', num_epochs=10):
  """Train and evaluate `model`, redrawing the loss curves after every epoch.

  Args:
    model: network to train.
    optim: optimizer that updates the parameters of `model`.
    criterion: loss function passed on to `train` and `test`.
    traindataloader: batches used for training.
    testdataloader: batches used for evaluation.
    device: device the batches are moved to.
    model_name: name passed to `test` for its checkpoint file.
    num_epochs: number of epochs to run. Defaults to 10, the value that was
      previously hard-coded.
  """
  train_losses = []
  test_losses = []
  best_acc = 0
  best_epoch = -1

  for i in range(num_epochs):
    train_loss, train_acc = train(model, optim, criterion, traindataloader, i, device)
    train_losses.append(train_loss)
    test_loss, test_acc = test(model, criterion, testdataloader, i, device, best_acc, model_name)
    # Store a plain float: a tensor in the history (a CUDA tensor in
    # particular) cannot be plotted by matplotlib.
    test_loss = float(test_loss)
    test_losses.append(test_loss)
    best_acc = max(best_acc, test_acc)
    best_epoch = i if best_acc == test_acc else best_epoch
    clear_output()
    plt.figure(figsize=(18, 9))
    plt.plot(np.arange(len(train_losses)), train_losses, label=f'Train, loss: {train_loss:.4f}, Acc: {train_acc}')
    plt.plot(np.arange(len(test_losses)), test_losses, label=f'Test, loss: {test_loss:.4f}, Acc: {test_acc}')
    plt.title(f'Epoch {i}')
    plt.legend(loc='best')
    plt.show()

# Пример 1. Полносвязная нс

In [0]:
# Use CUDA when it is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): MyFCNet1 reshapes its input to 3072 features per row, while the
# transform defined earlier in this notebook uses Resize(200) — confirm the
# intended input resolution before running this example.
model = MyFCNet1()
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), 
                      lr=0.001, momentum=0.9, weight_decay=5e-4)
# No learning-rate scheduler is used in this example

# Checkpoints of this run are written by test() under the name 'fc1'
trainLoop(model=model, optim=optimizer, criterion=criterion, traindataloader=trainloader, testdataloader=testloader, device=device, model_name='fc1')