In [None]:
# Shell escape: recursively delete the /content/sample_data/ directory.
!rm -r /content/sample_data/

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the `path` used by the rest of the
# notebook points inside this mount.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Project root on the mounted Drive: weight files, exported SavedModels and the
# prediction archive are read from / written to this directory.
path = '/content/drive/MyDrive/colab_data/diplom/'
# Data root; the driver cell reads the 'train' and 'test_kaggle/' entries below it.
path_data = path + 'data/'

In [None]:
# Constants
BATCH_SIZE = 128        # batch size of the training image generator
EPOCHS = 130            # number of epochs passed to fit_model
STEPS_PER_EPOCH = 60    # generator batches consumed per epoch
TEST_BATCH_SIZE = 100   # images per forward pass in predict_classes

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

import os

from tensorflow.keras import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import load_img

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.python.compiler.tensorrt import trt_convert as trt

In [None]:
class EmotionModel(tf.keras.Model):
  """Emotion classifier for 48x48 grayscale images with 9 output classes.

  The network itself is a Keras ``Sequential`` stored in ``self.model``; this
  wrapper adds the class-label mapping and image loading helpers around it.
  """

  def __init__(self, weight_path:str=None, atulapra_weight_path:str=None):
    """Build the network and optionally load weights.

    Args:
      weight_path: weights file for the complete 9-class network. It is loaded
        last, after the network is fully assembled. When None, no trained
        weights are loaded. (Previously this argument was ignored and the
        file ``path + 'mdl_wts.hdf5'`` was always read via a notebook global.)
      atulapra_weight_path: optional weights of the 7-class base model used
        for transfer learning. They are loaded with a temporary 7-unit head,
        which is then replaced by the 9-unit head.
    """
    super().__init__()

    # Class index -> emotion label.
    # NOTE(review): presumably this matches the class order assigned by
    # flow_from_directory for the training folders - verify.
    self.classes = {
        0: 'anger',
        1: 'contempt',
        2: 'disgust',
        3: 'fear',
        4: 'happy',
        5: 'neutral',
        6: 'sad',
        7: 'surprise',
        8: 'uncertain',
    }

    # Side length (pixels) of the square, single-channel network input.
    self.IMG_SIZE = 48

    self.model = Sequential()

    self.model.add(Conv2D(32, kernel_size=(3, 3), activation='relu',
                          input_shape=(self.IMG_SIZE, self.IMG_SIZE, 1)))
    self.model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
    self.model.add(MaxPooling2D(pool_size=(2, 2)))
    self.model.add(Dropout(0.25))

    self.model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    self.model.add(MaxPooling2D(pool_size=(2, 2)))
    self.model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    self.model.add(MaxPooling2D(pool_size=(2, 2)))
    self.model.add(Dropout(0.25))

    self.model.add(Flatten())
    self.model.add(Dense(1024, activation='relu'))
    self.model.add(Dropout(0.5))

    # Transfer learning, based on the model from
    # https://github.com/atulapra/Emotion-detection
    # A 7-unit head is attached only so the base weights can be loaded, then
    # it is removed again.
    if atulapra_weight_path is not None:
      self.model.add(Dense(7, activation='softmax'))
      self.model.load_weights(atulapra_weight_path)
      self.model.pop()

    # Final head for the 9 classes listed in self.classes.
    self.model.add(Dense(9, activation='softmax'))

    # Load the caller-supplied weights instead of a hard-coded global path.
    if weight_path is not None:
      self.model.load_weights(weight_path)

  def call(self, input):
    """Forward pass: delegate to the wrapped network."""
    return self.model(input)

  def create_generator(self, data_path, batch_size):
    """Create a stream of image batches read from a directory tree.

    Args:
      data_path: directory handed to ``flow_from_directory``.
      batch_size: number of images per batch.

    Returns:
      An iterator of grayscale IMG_SIZE x IMG_SIZE images rescaled by 1/255,
      with categorical labels.
    """
    train_datagen = ImageDataGenerator(rescale=1./255)

    return train_datagen.flow_from_directory(
        data_path,
        target_size=(self.IMG_SIZE, self.IMG_SIZE),
        batch_size=batch_size,
        color_mode="grayscale",
        class_mode='categorical')

  def preprocess_image(self, img_path):
    """Load one image the same way the generator does.

    The image is read as grayscale, resized to IMG_SIZE x IMG_SIZE, converted
    to float32, divided by 255 and given a trailing channel axis.
    """
    img = load_img(img_path,
                   color_mode='grayscale',
                   target_size=(self.IMG_SIZE, self.IMG_SIZE))
    return (np.array(img, dtype=np.float32) / 255.)[..., None]

  def class_by_index(self, index):
    """Return the emotion label stored for a class index."""
    return self.classes[index]

  def set_opt_model(self, opt_model):
    """Replace the wrapped network with ``opt_model`` (e.g. an optimized one)."""
    self.model = opt_model

In [None]:
def fit_model(model, generator, epochs, steps_per_epoch, weight_save_path):
  """Compile and train ``model`` on ``generator``, checkpointing the best epoch.

  Args:
    model: Keras model to train.
    generator: batch iterator passed to ``model.fit``.
    epochs: number of training epochs.
    steps_per_epoch: batches drawn from ``generator`` per epoch.
    weight_save_path: file the checkpoint callback writes to.
  """
  # Keep only the epoch with the best training accuracy.
  # NOTE(review): ModelCheckpoint is left at its default
  # save_weights_only=False; confirm that saving to this path works for the
  # model type passed in.
  checkpoint = ModelCheckpoint(weight_save_path,
                               save_best_only=True,
                               monitor='accuracy')

  # The `lr` and `decay` optimizer arguments are deprecated and rejected by
  # newer Keras versions. InverseTimeDecay with decay_steps=1 gives the same
  # rate: lr / (1 + decay * step).
  lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
      initial_learning_rate=0.0001,
      decay_steps=1,
      decay_rate=1e-6)
  optimizer = Adam(learning_rate=lr_schedule)

  model.compile(loss='categorical_crossentropy',
                optimizer=optimizer,
                metrics=['accuracy'])

  model.fit(generator,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            callbacks=[checkpoint])

In [None]:
# Optimize the model with TensorRT
def optimize_model(model, path_to_save):
  """Export ``model``, convert the export with TF-TRT and load the result.

  Two directories are written by appending to ``path_to_save``:
  'saved_model' (the plain export) and 'saved_model_trt' (the converted one).

  Returns:
    Whatever ``tf.keras.models.load_model`` yields for the converted directory.
  """
  export_dir = path_to_save+'saved_model'
  trt_dir = path_to_save+'saved_model_trt'

  model.save(export_dir)

  trt_converter = trt.TrtGraphConverterV2(input_saved_model_dir=export_dir)
  trt_converter.convert()
  trt_converter.save(trt_dir)

  # NOTE(review): confirm that load_model returns a usable callable for a
  # converter-written SavedModel.
  return tf.keras.models.load_model(trt_dir)

In [None]:
def predict(model, imgs, files):
  """Classify one batch of images and pair each file name with its label.

  Args:
    model: callable on a float32 batch; must also offer ``class_by_index``.
    imgs: sequence of equally shaped image arrays.
    files: file names, in the same order as ``imgs``.

  Returns:
    List of ``(file, label)`` tuples.
  """
  batch = np.array(imgs, dtype=np.float32)
  scores = model(batch)

  # The highest-scoring column of each row is the predicted class index.
  labels = []
  for class_idx in np.argmax(scores, axis=1):
    labels.append(model.class_by_index(class_idx))

  return list(zip(files, labels))

def predict_classes(model, path_data, test_batch_size):
  """Predict a label for every file in ``path_data``.

  Files are preprocessed with ``model.preprocess_image`` and classified in
  batches of at most ``test_batch_size`` via ``predict``.

  Args:
    model: object providing ``preprocess_image`` and whatever ``predict`` needs.
    path_data: directory whose entries are all treated as images. Each file
      name without its extension must be an integer (e.g. '17.jpg').
    test_batch_size: maximum number of images per forward pass.

  Returns:
    DataFrame with columns 'image_path' and 'emotion', ordered by the numeric
    value of the file name.

  Raises:
    ValueError: if a file name without extension is not an integer.
  """
  out = []
  imgs = []
  files = []

  # Run every file in the directory through the model, batch by batch.
  for filename in os.listdir(path_data):
    files.append(filename)
    # os.path.join avoids a doubled separator when path_data ends with '/'.
    imgs.append(model.preprocess_image(os.path.join(path_data, filename)))

    if len(imgs) == test_batch_size:
      out += predict(model, imgs, files)
      imgs = []
      files = []

  # Flush the final, partially filled batch.
  if imgs:
    out += predict(model, imgs, files)

  out = pd.DataFrame(out, columns=['image_path', 'emotion'])

  # Sort numerically ('2.jpg' before '10.jpg') using a temporary key column.
  # `columns=` replaces the positional axis argument, which pandas 2.x rejects.
  sort_key = out['image_path'].apply(lambda name: int(os.path.splitext(name)[0]))
  return (out
          .assign(sort_val=sort_key)
          .sort_values('sort_val')
          .drop(columns='sort_val'))

In [None]:
def save_out(out, path):
  """Write the DataFrame ``out`` to ``path`` as a zip holding 'out.csv'.

  The index is not written.
  """
  out.to_csv(
      path,
      index=False,
      compression={'method': 'zip', 'archive_name': 'out.csv'},
  )

In [None]:
# Mode switch: True trains the network, False runs prediction.
fit = False

# The same weights file is the training checkpoint and the inference input.
weights_file = path+'mdl_wts.hdf5'

if not fit:
  # Inference: classify the test images and write the result archive.
  model = EmotionModel(weights_file)
  # NOTE(review): opt_model is created here but not used afterwards - the
  # predictions below run through `model` itself. Confirm whether the
  # optimized model was meant to be installed with model.set_opt_model().
  opt_model = optimize_model(model.model, path)
  out = predict_classes(model, path_data+'test_kaggle/', TEST_BATCH_SIZE)
  save_out(out, path+'out.zip')
else:
  # Training: start from the base weights in 'model.h5' and fit on the train set.
  model = EmotionModel(weights_file, path+'model.h5')
  train_generator = model.create_generator(path_data+'train', BATCH_SIZE)
  fit_model(model, train_generator, EPOCHS, STEPS_PER_EPOCH, weights_file)