Классификация изображений
Основная идея этого решения: взять предобученую на ImageNet сеть Xception и дообучить под нашу задачу.  
По ходу решения мы будем давать вам рекомендации, которые помогут улучшить качество модели.

Удачи и Поехали!

In [None]:
!nvidia-smi

In [None]:
!pip freeze > requirements.txt

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import zipfile
import csv
import sys
import os


import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint, ReduceLROnPlateau 
from tensorflow.keras.callbacks import Callback, EarlyStopping
from tensorflow.keras.regularizers import l2
from tensorflow.keras import optimizers
import tensorflow.keras.models as Model
import tensorflow.keras.layers as Layer
from tensorflow.keras.applications.xception import Xception as xcp
from tensorflow.keras.applications import EfficientNetB7 as ebn7



import tensorflow.keras.backend as K

from sklearn.model_selection import train_test_split, StratifiedKFold

import PIL
from PIL import ImageOps, ImageFilter


In [None]:
# В setup выносим основные настройки: так удобнее их перебирать в дальнейшем.

EPOCHS               = 5  # эпох на обучение
BATCH_SIZE           = 64 # уменьшаем batch если сеть большая, иначе не влезет в память на GPU
LR                   = 1e-4
VAL_SPLIT            = 0.20 # сколько данных выделяем на тест = 20%

CLASS_NUM            = 10  # количество классов в нашей задаче
IMG_SIZE             = 224 # какого размера подаем изображения в сеть
IMG_CHANNELS         = 3   # у RGB 3 канала
input_shape          = (IMG_SIZE, IMG_SIZE, IMG_CHANNELS)

RANDOM_SEED          = 42 # для воспроизводимости

DATA_PATH = '../input/sf-dl-car-classification/'
PATH = "../working/car/" # рабочая директория

# EDA / Анализ данных

In [None]:
train_df = pd.read_csv(DATA_PATH+"train.csv")
sample_submission = pd.read_csv(DATA_PATH+"sample-submission.csv")
train_df.head()

In [None]:
train_df.Category.value_counts()

In [None]:
print('Распаковываем картинки')
# Will unzip the files so that you can see them..
for data_zip in ['train.zip', 'test.zip']:
    with zipfile.ZipFile(DATA_PATH+data_zip,"r") as z:
        z.extractall(PATH)
        
print(os.listdir(PATH))

In [None]:
print('Пример картинок (random sample)')
plt.figure(figsize=(12,8))

random_image = train_df.sample(n=9)
random_image_paths = random_image['Id'].values
random_image_cat = random_image['Category'].values

for index, path in enumerate(random_image_paths):
    im = PIL.Image.open(PATH+f'train/{random_image_cat[index]}/{path}')
    plt.subplot(3,3, index+1)
    plt.imshow(im)
    plt.title('Class: '+str(random_image_cat[index]))
    plt.axis('off')
plt.show()

## Подготовка данных

### Аугментация данных

In [None]:
train_datagen = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range = 5,
    width_shift_range=0.1,
    height_shift_range=0.1,
    validation_split=VAL_SPLIT, # set validation split
    horizontal_flip=False)

test_datagen = ImageDataGenerator(rescale=1. / 255)

In [None]:
# Завернем наши данные в генератор:

train_generator = train_datagen.flow_from_directory(
    PATH+'train/',      # директория где расположены папки с картинками 
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True, seed=RANDOM_SEED,
    subset='training') # set as training data

test_generator = train_datagen.flow_from_directory(
    PATH+'train/',
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True, seed=RANDOM_SEED,
    subset='validation') # set as validation data

test_sub_generator = test_datagen.flow_from_dataframe( 
    dataframe=sample_submission,
    directory=PATH+'test_upload/',
    x_col="Id",
    y_col=None,
    shuffle=False,
    class_mode=None,
    seed=RANDOM_SEED,
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,)

In [None]:
x,y = train_generator.next()
print('Пример картинок из test_generator')
plt.figure(figsize=(12,8))

for i in range(0,6):
    image = x[i]
    plt.subplot(3,3, i+1)
    plt.imshow(image)
plt.show()

## Построение модели

In [None]:
# В качестве базовой модели берем Xception, замораживаем все слои, кроме головы и надстраиваем простую голову 
def baseline_model():
    base_model = xcp(weights='imagenet', include_top=False, input_shape = input_shape)
    base_model.trainable = False
    model=Model.Sequential()
    model.add(base_model)
    model.add(Layer.GlobalAveragePooling2D())
    model.add(Layer.Dense(256, 
                          activation='relu'))
    model.add(Layer.Dense(CLASS_NUM, activation='softmax'))
    return model

In [None]:
# Определим callbacks для сохранения моделей в процессе обучения
def callbacks(filename):
    checkpoint = ModelCheckpoint(filename + '.hdf5', 
                             monitor = ['val_accuracy'], 
                             verbose = 1,
                             mode = 'max')
    earlystop = EarlyStopping(monitor = 'val_accuracy',
                              min_delta = 0.001,
                              patience = 3,
                              restore_best_weights = True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                                  factor=0.5,
                                  patience=2,
                                  min_lr=0.0000001,
                                  verbose=1,
                                  mode='auto')
    callbacks_list = [checkpoint, earlystop, reduce_lr]
    return callbacks_list

In [None]:
def compile_history(model, model_name):
    model.compile(loss="categorical_crossentropy", 
            optimizer=optimizers.Adam(lr=LR), 
            metrics=["accuracy"])
    history_model = model.fit(
        train_generator,
        steps_per_epoch = len(train_generator),
        validation_data = test_generator, 
        validation_steps = len(test_generator),
        epochs = EPOCHS,
        callbacks = callbacks(model_name)
    )
    return history_model

In [None]:
# Функция для отбражения графиков перфоманса модели

def show_graphs(history):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs = range(len(acc))

    plt.plot(epochs, acc, 'b', label='Training acc')
    plt.plot(epochs, val_acc, 'r', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.legend()

    plt.figure()

    plt.plot(epochs, loss, 'b', label='Training loss')
    plt.plot(epochs, val_loss, 'r', label='Validation loss')
    plt.title('Training and validation loss')
    plt.legend()

    plt.show()

In [None]:
# Для отслеживания результатов создадим переменную, куда будем вписывать перфоманс всех моделей
result ={}

In [None]:
baseline_model = baseline_model()

In [None]:
baseline_model.summary()

In [None]:
history_baseline_model = compile_history(baseline_model,'baseline_model')

In [None]:
show_graphs(history_baseline_model)

In [None]:
scores_baseline_model = baseline_model.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_baseline_model[1]*100))

In [None]:
result['baseline_model'] = scores_baseline_model[1]*100

In [None]:
# Добавим в "голову" batch normalization, сменим global varege pooling на max pooling и установим dropout.
def baseline_model_full_head():
    base_model = xcp(weights='imagenet', include_top=False, input_shape = input_shape)
    base_model.trainable = False
    model=Model.Sequential()
    model.add(base_model)
    model.add(Layer.BatchNormalization())
    model.add(Layer.GlobalMaxPooling2D())       
    model.add(Layer.Dense(256, 
                          activation='relu'))
    model.add(Layer.BatchNormalization())
    model.add(Layer.Dropout(0.25))
    model.add(Layer.Dense(CLASS_NUM, activation='softmax'))    
    return model

In [None]:
baseline_model_full_head=baseline_model_full_head()

In [None]:
history_baseline_model_full_head=compile_history(baseline_model_full_head,'baseline_model_full_head')

In [None]:
scores_baseline_model_full_head = baseline_model_full_head.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_baseline_model_full_head[1]*100))
result['baseline_model_full_head'] = round(scores_baseline_model_full_head[1]*100,4)

In [None]:
# Заменим базовую можель с Xceprion на EfficientNet B7, она лучше себя показывает на датасете imagenet.
def model_ebn7_bn():
    base_model = ebn7(weights='imagenet', include_top=False, input_shape = input_shape)
    base_model.trainable = False
    model=Model.Sequential()
    model.add(base_model)
    model.add(Layer.BatchNormalization())
    model.add(Layer.GlobalMaxPooling2D())       
    model.add(Layer.Dense(256, 
                          activation='relu'))
    model.add(Layer.BatchNormalization())
    model.add(Layer.Dropout(0.25))
    model.add(Layer.Dense(CLASS_NUM, activation='softmax'))    
    return model

In [None]:
model_ebn7_bn = model_ebn7_bn()
history_model_ebn7_bn=compile_history(model_ebn7_bn,'model_ebn7_bn')
scores_model_ebn7_bn = model_ebn7_bn.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_model_ebn7_bn[1]*100))
result['model_ebn7_bn'] = scores_model_ebn7_bn[1]*100


In [None]:
show_graphs(history_model_ebn7_bn)

In [None]:
# Разморозим модель с лучшим перфомансом
def xcp_bn_full():
    base_model = xcp(weights='imagenet', include_top=False, input_shape = input_shape)
    base_model.trainable = True
    model=Model.Sequential()
    model.add(base_model)
    model.add(Layer.BatchNormalization())
    model.add(Layer.GlobalMaxPooling2D())       
    model.add(Layer.Dense(256, 
                          activation='relu'))
    model.add(Layer.BatchNormalization())
    model.add(Layer.Dropout(0.25))
    model.add(Layer.Dense(CLASS_NUM, activation='softmax'))    
    return model

In [None]:
xcp_bn_full = xcp_bn_full()
history_xcp_bn_full=compile_history(xcp_bn_full,'xcp_bn_full')
scores_xcp_bn_full = xcp_bn_full.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_xcp_bn_full[1]*100))
result['xcp_bn_full'] = scores_xcp_bn_full[1]*100

In [None]:
show_graphs(history_xcp_bn_full)

## Удвоим размер изображений и добавим горизонтальное отражение в датасет

In [None]:
IMG_SIZE = 549
BATCH_SIZE = 16
# поменяем imagedatagenerator
train_datagen_mod = ImageDataGenerator(
    rescale=1. / 255,
    rotation_range = 5,
    width_shift_range=0.1,
    height_shift_range=0.1,
    validation_split=VAL_SPLIT, # set validation split
    horizontal_flip=True,
    fill_mode = 'reflect')
# Завернем наши данные в генератор:


train_generator = train_datagen.flow_from_directory(
    PATH+'train/',      # директория где расположены папки с картинками 
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True, seed=RANDOM_SEED,
    subset='training') # set as training data

test_generator = train_datagen.flow_from_directory(
    PATH+'train/',
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True, seed=RANDOM_SEED,
    subset='validation') # set as validation data

test_sub_generator = test_datagen.flow_from_dataframe( 
    dataframe=sample_submission,
    directory=PATH+'test_upload/',
    x_col="Id",
    y_col=None,
    shuffle=False,
    class_mode=None,
    seed=RANDOM_SEED,
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,)

In [None]:
xcp_bn_full = xcp_bn_full()
history_xcp_bn_full=compile_history(xcp_bn_full,'xcp_bn_full')
scores_xcp_bn_full = xcp_bn_full.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_xcp_bn_full[1]*100))
result['xcp_bn_full'] = scores_xcp_bn_full[1]*100

In [None]:
# Увеличим количество эпох
EPOCHS = 25

In [None]:
xcp_bn_full = xcp_bn_full()
history_xcp_bn_full=compile_history(xcp_bn_full,'xcp_bn_full')
scores_xcp_bn_full = xcp_bn_full.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_xcp_bn_full[1]*100))
result['xcp_bn_full'] = scores_xcp_bn_full[1]*100

In [None]:
scores_xcp_bn_full = xcp_bn_full.evaluate(test_generator, steps=len(test_generator), verbose=1)
print("Accuracy: %.2f%%" % (scores_xcp_bn_full[1]*100))
result['xcp_bn_full'] = scores_xcp_bn_full[1]*100

## Используем TTA (Test Time Augmentation).  
  
Финальное предсказание как среднее из 10 предсказаний.


In [None]:
tta_steps = 10
predictions = []

for i in range(tta_steps):
    preds = xcp_bn_full.predict(test_sub_generator, verbose=1) 
    predictions.append(preds)

pred = np.mean(predictions, axis=0)

In [None]:
predictions = np.argmax(pred, axis=-1) #multiple categories
label_map = (train_generator.class_indices)
label_map = dict((v,k) for k,v in label_map.items()) #flip k,v
predictions = [label_map[k] for k in predictions]
filenames_with_dir=test_sub_generator.filenames
submission = pd.DataFrame({'Id':filenames_with_dir, 'Category':predictions}, 
                          columns=['Id', 'Category'])

submission['Id'] = submission['Id'].replace('test_upload/','')

In [None]:
submission.to_csv('submission_TTA.csv', index=False)