In [None]:
# Install the albumentations image-augmentation library into the environment.
# NOTE(review): the version is not pinned, and the augmentation code in this
# notebook depends on which transforms the installed release provides.
!pip install albumentations

In [215]:
import albumentations as A
# Import Image manipulation
from PIL import Image
import numpy as np

# Import data visualization
from matplotlib import pyplot as plt
import matplotlib.patches as patches
import matplotlib

import torch
import torch.nn as nn
from torchvision import transforms
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader

Аугментация позволяет искусственно увеличить количество обучающих примеров и тем самым снизить переобучение. Поэтому нейронную сеть следует обучать на аугментированных картинках из **обучающей выборки (train)**, проверять её качество на неаугментированных картинках из **валидационной выборки (val)** и делать предсказания на неаугментированных картинках из **тестовой выборки (test)**, по которой считается итоговый score.

In [496]:
class SimpsonsDataset(Dataset):
    """Image dataset that loads pictures from disk and yields torch tensors.

    Every sample is padded to a square, resized to ``RESCALE_SIZE`` and
    normalized.  Random augmentations (albumentations + rotation/crop) are
    applied in ``'train'`` mode only, so ``'val'`` and ``'test'`` samples go
    through the same deterministic preprocessing.

    Args:
        files: iterable of path objects exposing ``.parent.name``; for
            non-test modes the parent directory name is used as the label.
        mode: dataset kind, must be one of ``DATA_MODES``.

    Raises:
        NameError: if ``mode`` is not listed in ``DATA_MODES``.
    """

    # Channel statistics used for normalization in every mode.
    # NOTE(review): these look like the usual ImageNet mean/std expected by
    # torchvision's pretrained models - confirm for the model in use.
    NORMALIZE_MEAN = [0.485, 0.456, 0.406]
    NORMALIZE_STD = [0.229, 0.224, 0.225]

    def __init__(self, files, mode):
        super().__init__()
        # Files to load, in a stable (sorted) order
        self.files = sorted(files)
        # Dataset kind
        self.mode = mode

        if self.mode not in DATA_MODES:
            message = f"{self.mode} is not correct; correct modes: {DATA_MODES}"
            print(message)
            raise NameError(message)

        # Cached so that __len__ does not have to touch the file list
        self.len_ = len(self.files)

        # Maps class names to integer ids.
        # NOTE(review): LabelEncoder is expected to be
        # sklearn.preprocessing.LabelEncoder imported in another cell - confirm.
        self.label_encoder = LabelEncoder()

        # Labels of the test files are unknown
        if self.mode != 'test':
            self.labels = [path.parent.name for path in self.files]
            self.label_encoder.fit(self.labels)

            # Persist the fitted encoder. Every non-test dataset rewrites the
            # same file, so the last one constructed wins.
            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

        # The albumentations pipeline is built lazily on first use, so
        # val/test datasets never construct it.
        self._augment = None

        normalize = transforms.Normalize(self.NORMALIZE_MEAN, self.NORMALIZE_STD)

        # Train: receives the PIL image returned by augmentation_pipline()
        self._transform_train = transforms.Compose([
            transforms.Resize(size=(RESCALE_SIZE + 16, RESCALE_SIZE + 16)),
            transforms.RandomRotation(degrees=5),
            transforms.RandomCrop(size=(RESCALE_SIZE, RESCALE_SIZE)),
            transforms.ToTensor(),  # scales pixel values to [0, 1]
            normalize,
        ])

        # Val/test: receives the numpy array returned by prepare_sample()
        self._transform_eval = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(size=(RESCALE_SIZE, RESCALE_SIZE)),
            transforms.ToTensor(),  # scales pixel values to [0, 1]
            normalize,
        ])

    def __len__(self):
        return self.len_

    def load_sample(self, file):
        """Open ``file`` with PIL, force the pixel data to be read, return the image."""
        image = Image.open(file)
        image.load()
        return image

    def prepare_sample(self, image):
        """Pad a PIL image along its shorter side and return it as a numpy array.

        Half of the height/width difference (rounded) is added on each side
        of the shorter dimension, which makes the picture (nearly) square
        before it is resized.
        """
        H = image.size[1]
        W = image.size[0]
        half_gap = round(abs(H - W) / 2)

        if H > W:
            # taller than wide: pad left and right
            image = transforms.functional.pad(image, padding=[half_gap, 0])
        else:
            # wider than tall (or square): pad top and bottom
            image = transforms.functional.pad(image, padding=[0, half_gap])

        return np.array(image)

    @staticmethod
    def _build_augmentation():
        """Build the albumentations pipeline used for training samples."""
        return A.Compose(
            [
                A.HorizontalFlip(p=0.5),  # flip 50% of the images
                A.OneOf(
                    [
                        # one photometric change for 50% of the images;
                        # RandomBrightnessContrast replaces the deprecated
                        # RandomContrast / RandomBrightness transforms
                        A.RandomBrightnessContrast(brightness_limit=0, contrast_limit=0.2),
                        A.RandomGamma(),
                        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0),
                    ],
                    p=0.5
                ),
                A.OneOf(
                    [
                        # one geometric distortion for 50% of the images
                        # NOTE(review): `alpha_affine` and `shift_limit` may not
                        # be accepted by recent albumentations releases - confirm
                        # against the installed version.
                        A.ElasticTransform(
                            alpha=120,
                            sigma=120 * 0.09,
                            alpha_affine=120 * 0.03
                        ),
                        # A.GridDistortion(),
                        A.OpticalDistortion(
                            distort_limit=0.7,
                            shift_limit=0.5
                        ),
                    ],
                    p=0.5
                ),
            ],
            p=1
        )

    def augmentation_pipline(self, image):
        """Apply the random training augmentations and return a PIL image."""
        if self._augment is None:
            self._augment = self._build_augmentation()

        augmented = self._augment(image=np.asarray(image))
        return Image.fromarray(augmented["image"])

    def __getitem__(self, index):
        """Return ``tensor`` in test mode, otherwise ``(tensor, label_id)``."""
        # Identical padding in every mode keeps train/val/test inputs comparable
        image = self.prepare_sample(self.load_sample(self.files[index]))

        if self.mode == 'train':
            # Only training samples are augmented
            x = self._transform_train(self.augmentation_pipline(image))
        else:
            x = self._transform_eval(image)

        # Test files have no labels
        if self.mode == 'test':
            return x

        label_id = self.label_encoder.transform([self.labels[index]])
        return x, label_id.item()

In [None]:
# Save only the weights (state_dict) of the network `model`
MODEL_WEIGHTS_PATH = "path_to\\model_weights.pth"
torch.save(model.state_dict(), MODEL_WEIGHTS_PATH)

# or: store several models together with their optimizers in one checkpoint.
# The checkpoint gets its own file so it does not overwrite the weights above.
PATH = "path_to\\checkpoint.pth"
torch.save({
            'modelA_state_dict': modelA.state_dict(),
            'modelB_state_dict': modelB.state_dict(),
            'optimizerA_state_dict': optimizerA.state_dict(),
            'optimizerB_state_dict': optimizerB.state_dict(),
            }, PATH)

# Load the saved weights back into the network (same file that was written above)
model.load_state_dict(torch.load(MODEL_WEIGHTS_PATH))
model.train()  # switch the network to training mode

# Restore every model/optimizer from the combined checkpoint
checkpoint = torch.load(PATH)
modelA.load_state_dict(checkpoint['modelA_state_dict'])
modelB.load_state_dict(checkpoint['modelB_state_dict'])
optimizerA.load_state_dict(checkpoint['optimizerA_state_dict'])
optimizerB.load_state_dict(checkpoint['optimizerB_state_dict'])

In [None]:
import math
def find_lr(model, dataloaders, loss_fn, optimizer, init_value=1e-8, final_value=10.0, use_gpu=True):
    """Learning-rate range test over one pass of ``dataloaders['train']``.

    The learning rate of ``optimizer.param_groups[0]`` grows geometrically
    from ``init_value`` to ``final_value``, one step per batch, while the
    loss of every batch is recorded.  The sweep stops early when the loss
    becomes non-finite or exceeds four times the best loss seen so far.
    Adapted from Ian Pointer, "Programming PyTorch for Deep Learning"
    (O'Reilly Media, 2019).

    Args:
        model: network to probe; it is trained during the sweep, so its
            weights (and the optimizer state) are modified.
        dataloaders: mapping with a ``'train'`` entry that supports ``len()``
            and yields ``(inputs, labels)`` batches.
        loss_fn: callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        optimizer: optimizer whose first parameter group is swept; other
            groups keep their learning rates.
        init_value: starting learning rate.
        final_value: learning rate reached on the last batch.
        use_gpu: when true, batches are moved to the module-level ``DEVICE``.

    Returns:
        ``(log_lrs, losses)``: ``log10`` of the learning rates and the matching
        losses as plain floats, with the first 10 and last 5 recorded points
        dropped (both lists are empty when 15 or fewer points were recorded).
    """
    model.train()
    train_loader = dataloaders['train']
    # max(..., 1) keeps a single-batch loader from dividing by zero
    number_in_epoch = max(len(train_loader) - 1, 1)
    update_step = (final_value / init_value) ** (1 / number_in_epoch)
    lr = init_value
    optimizer.param_groups[0]["lr"] = lr
    best_loss = 0.0
    losses = []
    log_lrs = []
    for batch_num, (inputs, labels) in enumerate(train_loader, start=1):
        if use_gpu:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        # Work with a Python float: storing the tensor would keep its autograd
        # graph alive and cannot be plotted directly when it lives on the GPU.
        loss_value = loss.item()

        # Stop if the loss explodes or stops being a finite number
        if not math.isfinite(loss_value) or (batch_num > 1 and loss_value > 4 * best_loss):
            break

        # Track the best loss
        if batch_num == 1 or loss_value < best_loss:
            best_loss = loss_value

        # Record the point
        losses.append(loss_value)
        log_lrs.append(math.log10(lr))

        # Backward pass and optimizer step
        loss.backward()
        optimizer.step()

        # Raise the learning rate for the next batch
        lr *= update_step
        optimizer.param_groups[0]["lr"] = lr
    return log_lrs[10:-5], losses[10:-5]

# Search for a good learning rate for the classifier (model_vgg16_bn.classifier)
logs, losses = find_lr(
    model_vgg16_bn,
    dataloaders,
    loss_fn,
    optimizer,
    init_value=1e-8,
    final_value=10.0,
)

# Plot loss against log10(lr) to pick the learning rate by eye
fig, ax = plt.subplots(figsize=(10, 10))
ax.plot(logs, losses)
ax.set_xlabel("$10^x$")
ax.set_ylabel("loss")

In [None]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    """Create a torchvision model whose final layer outputs ``num_classes`` values.

    Args:
        model_name: one of ``"resnet"``, ``"alexnet"``, ``"vgg"``,
            ``"densenet"`` or ``"inception"``.
        num_classes: number of output features of the replaced final layer(s).
        feature_extract: forwarded to ``set_parameter_requires_grad`` (defined
            outside this cell; presumably freezes the backbone when true -
            TODO confirm).
        use_pretrained: forwarded as ``pretrained=`` to the torchvision
            constructor.

    Returns:
        ``(model_ft, input_size)``: the model and the input image side length
        set for it (224, or 299 for Inception v3).

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.
    """
    # Both are model specific and set in the matching branch below.
    model_ft = None
    input_size = 0

    if model_name == "resnet":
        # ResNet-18
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "alexnet":
        # AlexNet
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "vgg":
        # VGG-16 with batch normalization
        model_ft = models.vgg16_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "densenet":
        # DenseNet-121
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "inception":
        # Inception v3: expects (299, 299) images and has an auxiliary output
        model_ft = models.inception_v3(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        # Auxiliary head
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # Primary head
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 299

    else:
        # Raise instead of exit(): exiting would shut down the notebook kernel
        raise ValueError(
            f"Invalid model name {model_name!r}; expected one of: "
            "resnet, alexnet, vgg, densenet, inception"
        )

    return model_ft, input_size

# Initialize the model for this run.
# NOTE(review): model_name, num_classes and feature_extract are not defined in
# this cell - presumably they are set in an earlier one; confirm.
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)

In [None]:
# Move the model to the GPU (when one is available) BEFORE building the
# optimizer, as the torch.optim documentation recommends, so the optimizer is
# created from the parameters that will actually be trained.
if torch.cuda.is_available():
    model_vgg16_bn = model_vgg16_bn.cuda()

# Slices of the convolutional part, each trained with its own learning rate.
# NOTE(review): the names suggest the slices are grouped by channel width
# (64/128/256/512) - confirm against model_vgg16_bn.features.
param512 = model_vgg16_bn.features[27:44].parameters()
param256 = model_vgg16_bn.features[17:27].parameters()
param128 = model_vgg16_bn.features[10:17].parameters()
param64 = model_vgg16_bn.features[0:10].parameters()

# Cross-entropy is used as the cost function
loss_fn = nn.CrossEntropyLoss()

# Base learning rate for the classifier head
found_lr = 1e-3

# Differential learning rates: every group of layers gets its own lr,
# shrinking towards the input of the network
optimizer = torch.optim.AdamW([
    {'params': model_vgg16_bn.classifier.parameters(), 'lr': found_lr},
    {'params': param512, 'lr': found_lr / 3},
    {'params': param256, 'lr': found_lr / 10},
    {'params': param128, 'lr': found_lr / 50},
    {'params': param64, 'lr': found_lr / 100},
], lr=found_lr / 100, amsgrad=True)

In [None]:
def accurancy_for_each_class(y_test_all, predictions_all):
    """Print, for every class, the share of its samples that were predicted correctly.

    Samples are grouped by their TRUE label, so the printed value for a class
    is ``correct predictions / number of samples of that class``.

    Args:
        y_test_all: true class ids (tensor, array or sequence of integers).
        predictions_all: predicted class ids, same length as ``y_test_all``.

    Prints the per-class sample counts, the number of classes and one
    ``Accuracy of <name> : <percent> %`` line per class; ``-1`` is printed for
    classes without samples.  Relies on the module-level ``classes_number``
    and ``dataloaders['val'].dataset.labels`` for the class names.
    """
    feature_names = sorted(set(dataloaders['val'].dataset.labels))

    # Flatten to plain Python lists; reshape(-1) also copes with a single element
    true_ids = torch.as_tensor(y_test_all).reshape(-1).tolist()
    pred_ids = torch.as_tensor(predictions_all).reshape(-1).tolist()

    class_correct = [0] * classes_number
    class_total = [0] * classes_number
    for true_id, pred_id in zip(true_ids, pred_ids):
        true_id = int(true_id)
        # Index by the true label: indexing by the prediction would measure
        # how often a predicted class is right, not how well a class is recognised
        class_correct[true_id] += int(true_id == int(pred_id))
        class_total[true_id] += 1

    print(class_total)
    print(len(class_total))

    for i in range(classes_number):
        # Fall back to the numeric id if the validation set lacks this class name
        name = feature_names[i] if i < len(feature_names) else str(i)
        percent = (100 * class_correct[i] / class_total[i]) if class_total[i] != 0 else -1
        print('Accuracy of %5s : %2d %%' % (name, percent))

In [None]:
class MyEnsemble(nn.Module):
    """Two models whose outputs are concatenated and fed to a linear classifier.

    Each wrapped model must output ``classes_number`` values per sample
    (module-level constant); the classifier maps the ``2 * classes_number``
    concatenated values back to ``classes_number`` outputs.
    """

    def __init__(self, modelA, modelB):
        super().__init__()
        self.modelA = modelA
        self.modelB = modelB
        self.classifier = nn.Linear(2 * classes_number, classes_number)

    def forward(self, x):
        """Run both models on ``x`` and classify their concatenated outputs."""
        stacked = torch.cat((self.modelA(x), self.modelB(x)), dim=1)
        return self.classifier(stacked)

In [None]:
# Locations of the saved weights of both base models
path_vgg16_bn = '/путь_до_весов_модели/vgg16_bn.pth'
path_resnet50 = '/путь_до_весов_модели/resnet50.pth'

# Restore the state dicts of the base models
model_vgg16_bn.load_state_dict(torch.load(path_vgg16_bn))
model_resnet50.load_state_dict(torch.load(path_resnet50))

# Freeze every parameter of the ensemble ...
# NOTE(review): model_ensemble is not created in this cell - presumably it is
# built from the two models above in another cell; confirm.
for frozen_param in model_ensemble.parameters():
    frozen_param.requires_grad_(False)

# ... then make only the ensemble's own classifier trainable again
for trainable_param in model_ensemble.classifier.parameters():
    trainable_param.requires_grad_(True)

In [None]:
# Oversampling

# Class label of every file = name of the directory that contains it.
# NOTE(review): `files` is defined outside this cell, and `labels` is not used
# by the code below, which works with train_files / train_labels - confirm
# whether this line is still needed.
labels = [path.parent.name for path in files] 

def create_dct_path_labels(train_files, train_labels):
    """Group file paths by their label.

    Args:
        train_files: sequence of file paths.
        train_labels: sequence of labels, aligned with ``train_files``.

    Returns:
        dict mapping every distinct label (in the sorted order produced by
        ``np.unique``) to the list of its paths, in input order.
    """
    grouped = {label: [] for label in np.unique(train_labels).tolist()}
    for file_path, label in zip(train_files, train_labels):
        grouped[label].append(file_path)
    return grouped

def print_dct(dct_simpsons):
    """Print every key of the dictionary with its value, tab-separated, one per line."""
    for label, paths in dct_simpsons.items():
        print(f"{label}\t{paths}")
        
# Dictionary: Simpsons character -> list of paths to that character's pictures
dct_path_train = create_dct_path_labels(train_files, train_labels)

# Classes with fewer than 75 pictures are topped up to exactly 75 by repeating
# their own pictures: as many whole copies as fit, plus the first few paths.
for person, paths in dct_path_train.items():
    if len(paths) < 75:
        full_copies, remainder = divmod(75, len(paths))
        dct_path_train[person] = paths * full_copies + paths[:remainder]

# Flatten the per-class lists back into a single list of files ...
new_files = [path for paths in dct_path_train.values() for path in paths]

# ... and derive the matching labels from the parent directory names
new_labels = [path.parent.name for path in new_files]

In [None]:
# A very simple network
class SimpleCnn(nn.Module):
    """Five conv/ReLU/max-pool stages followed by a two-layer classifier.

    Every stage uses an unpadded 3x3 convolution and 2x2 max pooling; the
    channel widths are 3 -> 8 -> 16 -> 32 -> 64 -> 96.  The classifier expects
    96 * 5 * 5 flattened features, which is what the stages produce for
    e.g. 224x224 inputs.

    Args:
        n_classes: number of output logits.
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """One stage: 3x3 convolution, ReLU, 2x2 max pooling."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

    def __init__(self, n_classes):
        super().__init__()
        self.conv1 = self._conv_stage(3, 8)
        self.conv2 = self._conv_stage(8, 16)
        self.conv3 = self._conv_stage(16, 32)
        self.conv4 = self._conv_stage(32, 64)
        self.conv5 = self._conv_stage(64, 96)

        self.out = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=96 * 5 * 5, out_features=600),
            nn.ReLU(),
            nn.Linear(in_features=600, out_features=n_classes),
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = stage(x)

        # Flatten everything except the batch dimension
        flat = x.view(x.size(0), -1)
        return self.out(flat)

In [None]:
# Visualise how confident the network is in its answers.
# Handy for checking that the predictions make sense.

import matplotlib.patches as patches
from matplotlib.font_manager import FontProperties

# The font of the prediction text is the same for every panel
font = FontProperties()
font.set_family("fantasy")

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(12, 12),
                       sharey=True, sharex=True)
for fig_x in ax.flatten():
    # Draw the sample from the whole validation set. A hard-coded upper bound
    # (previously 1000) fails on smaller sets and, because the files are
    # sorted, only ever shows the first 1000 of them.
    random_characters = int(np.random.randint(0, len(val_dataset)))
    im_val, label = val_dataset[random_characters]
    # "homer_simpson" -> "Homer Simpson"
    img_label = " ".join(map(lambda x: x.capitalize(),
                val_dataset.label_encoder.inverse_transform([label])[0].split('_')))

    imshow(im_val.data.cpu(),
           title=img_label, plt_ax=fig_x)

    actual_text = "Actual : {}".format(img_label)

    # White box that serves as background for the prediction text
    fig_x.add_patch(patches.Rectangle((0, 53), 86, 35, color='white'))

    # Class probabilities for this single image
    prob_pred = predict_one_sample(eff_net, im_val.unsqueeze(0))
    predicted_proba = np.max(prob_pred) * 100
    y_pred = np.argmax(prob_pred)

    # NOTE(review): this uses the module-level `label_encoder`, while the actual
    # label above comes from `val_dataset.label_encoder` - confirm both encode
    # the classes identically.
    predicted_label = label_encoder.classes_[y_pred]
    # Split the name into two lines so it fits into the box
    predicted_label = predicted_label[:len(predicted_label)//2] + '\n' + predicted_label[len(predicted_label)//2:]
    predicted_text = "{} : {:.0f}%".format(predicted_label, predicted_proba)

    fig_x.text(1, 59, predicted_text, horizontalalignment='left', fontproperties=font,
               verticalalignment='top', fontsize=8, color='black', fontweight='bold')