In [1]:
"""
训练脚本，由于数据集不大，这里一次性读入内存
"""
import os
import argparse
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
from sklearn.model_selection import train_test_split


In [2]:
"""
读取数据集
"""
from tqdm import tqdm
import os
import numpy as np
from tensorflow.keras.preprocessing.image import img_to_array, load_img


class Fer2013(object):
    """
    In-memory loader for a FER2013 dataset stored as image folders.

    The dataset root is expected to contain one folder per split
    (``Training``, ``PublicTest``, ``PrivateTest``), each holding one
    sub-folder per expression name. Every image is read as a 48x48
    grayscale array and scaled to the range [0, 1].
    """

    # The position of a name in this list is the integer label of that class.
    EXPRESSIONS = ['anger', 'disgust', 'fear', 'happy', 'sad', 'surprised', 'neutral', 'contempt']
    # Expressions that keep their label slot but have no image folder to read.
    SKIPPED = ('contempt',)

    def __init__(self, folder="../dataset/fer2013"):
        """
        :param folder: root folder of the dataset (contains the split folders)
        """
        self.folder = folder

    def _load_split(self, split):
        """
        Read every image of one split folder into memory.

        :param split: name of the split sub-folder, e.g. ``'Training'``
        :return expressions: list of expression names; index == label
        :return x: float32 array of images scaled to [0, 1]
        :return y: integer array with the label of each image
        :raises FileNotFoundError: if an expected expression folder is missing
        """
        split_folder = os.path.join(self.folder, split)
        # Hand out a fresh list so callers cannot mutate the class constant.
        expressions = list(self.EXPRESSIONS)
        images = []
        labels = []
        # tqdm renders one progress step per expression folder.
        for label in tqdm(range(len(expressions))):
            name = expressions[label]
            if name in self.SKIPPED:
                continue
            expression_folder = os.path.join(split_folder, name)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # sample order reproducible across runs and platforms.
            for file_name in sorted(os.listdir(expression_folder)):
                img = load_img(os.path.join(expression_folder, file_name),
                               target_size=(48, 48), color_mode="grayscale")
                images.append(img_to_array(img))
                labels.append(label)
        # float32 halves the memory footprint compared with float64.
        x = np.array(images).astype('float32') / 255.
        y = np.array(labels).astype('int')
        return expressions, x, y

    def gen_train(self):
        """
        Load the training split (``Training`` folder).

        :return expressions: expression names; index in the list == label
        :return x_train: training images
        :return y_train: training labels
        """
        return self._load_split('Training')

    def gen_valid(self):
        """
        Load the validation split (``PublicTest`` folder).

        :return: ``(expressions, x_valid, y_valid)``
        """
        return self._load_split('PublicTest')

    def gen_test(self):
        """
        Load the test split (``PrivateTest`` folder).

        :return: ``(expressions, x_test, y_test)``
        """
        return self._load_split('PrivateTest')

In [3]:
"""
构建CNN模型
"""
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, BatchNormalization, Flatten, Dense, AveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.layers import PReLU


def CNN1(input_shape=(48, 48, 1), n_classes=8):
    """
    First model, designed along the lines of VGG. The main design point is to
    keep the receptive field small so the network does not pick up too much
    noise.

    :param input_shape: shape of one input image
    :param n_classes: number of target classes
    :return: an uncompiled Keras ``Model``
    """
    input_layer = Input(shape=input_shape)

    features = input_layer
    # Three convolutional blocks with 32, 64 and 128 filters. Each block is
    # two 3x3 same-padded ReLU convolutions, a 2x2 max-pool and dropout.
    for n_filters in (32, 64, 128):
        for _ in range(2):
            features = Conv2D(n_filters, kernel_size=(3, 3), strides=1,
                              padding='same', activation='relu')(features)
        features = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(features)
        features = Dropout(0.5)(features)

    # Fully connected head ending in a softmax over the classes.
    features = Flatten()(features)
    features = Dense(1024, activation='relu')(features)
    features = Dropout(0.5)(features)
    features = Dense(128, activation='relu')(features)
    output_layer = Dense(n_classes, activation='softmax')(features)

    return Model(inputs=input_layer, outputs=output_layer)


def CNN2(input_shape=(48, 48, 1), n_classes=8):
    """
    Following the paper "Going deeper with convolutions", a 1x1 convolution is
    placed right after the input layer to add non-linear representation power.

    :param input_shape: shape of one input image
    :param n_classes: number of target classes
    :return: an uncompiled Keras ``Model``
    """
    # Settings shared by every convolution in this network.
    relu_conv = dict(padding='same', activation='relu')

    input_layer = Input(shape=input_shape)

    # Block 1: 1x1 convolution, then 5x5 convolution, then 2x2 max-pool.
    net = Conv2D(32, (1, 1), strides=1, **relu_conv)(input_layer)
    net = Conv2D(32, (5, 5), strides=1, **relu_conv)(net)
    net = MaxPooling2D(pool_size=(2, 2), strides=2)(net)

    # Blocks 2 and 3: a single convolution followed by 2x2 max-pool.
    for filters, kernel in ((32, (3, 3)), (64, (5, 5))):
        net = Conv2D(filters, kernel, **relu_conv)(net)
        net = MaxPooling2D(pool_size=(2, 2), strides=2)(net)

    # Fully connected head: two dense+dropout stages, then softmax.
    net = Flatten()(net)
    for units in (2048, 1024):
        net = Dense(units, activation='relu')(net)
        net = Dropout(0.5)(net)
    probabilities = Dense(n_classes, activation='softmax')(net)

    return Model(inputs=input_layer, outputs=probabilities)


def CNN3(input_shape=(48, 48, 1), n_classes=8):
    """
    Implementation based on the paper
    "A Compact Deep Learning Model for Robust Facial Expression Recognition"
    (write-up: https://blog.csdn.net/lynlindasy/article/details/100094333).

    :param input_shape: shape of one input image
    :param n_classes: number of target classes
    :return: an uncompiled Keras ``Model``
    """
    input_layer = Input(shape=input_shape)
    # Stem: 1x1 ReLU convolution, stride 1, 'same' padding.
    net = Conv2D(32, (1, 1), strides=1, padding='same', activation='relu')(input_layer)

    # Two identical blocks. Each block is a 3x3 and a 5x5 convolution, both
    # followed by PReLU (ReLU with a learned slope, see
    # https://blog.csdn.net/shuzfan/article/details/51345832), then a 2x2 max-pool.
    for _ in range(2):
        for kernel in ((3, 3), (5, 5)):
            net = Conv2D(64, kernel, strides=1, padding='same')(net)
            net = PReLU()(net)
        net = MaxPooling2D(pool_size=(2, 2), strides=2)(net)

    # Fully connected head: two dense layers with dropout, then softmax.
    # NOTE(review): the previous comment stated dropout = 0.6, but the code
    # uses 0.5; confirm which value is intended.
    net = Flatten()(net)
    for units in (2048, 1024):
        net = Dense(units, activation='relu')(net)
        net = Dropout(0.5)(net)
    probabilities = Dense(n_classes, activation='softmax')(net)

    return Model(inputs=input_layer, outputs=probabilities)


In [4]:
# Training on FER2013: read the train / validation / test splits into memory.
# One loader instance is shared by all three calls; only the first call's
# expression list is kept, since the label order is the same for every split.
fer2013 = Fer2013()
expressions, x_train, y_train = fer2013.gen_train()
_, x_valid, y_valid = fer2013.gen_valid()
_, x_test, y_test = fer2013.gen_test()


100%|████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:16<00:00,  2.02s/it]
100%|████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:01<00:00,  4.57it/s]
100%|████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:01<00:00,  5.80it/s]


In [5]:
# One-hot encode the integer labels.
# The class count is passed explicitly instead of being inferred from the
# largest label present, so the train and validation targets always have the
# same width even if one split lacks samples of the highest-numbered class.
# The last entry of `expressions` ('contempt') has no images in this dataset,
# hence the "- 1".
n_loaded_classes = len(expressions) - 1
y_train = to_categorical(y_train, num_classes=n_loaded_classes)
y_valid = to_categorical(y_valid, num_classes=n_loaded_classes)

In [6]:
# Sanity check: show the one-hot encoded training targets.
print(y_train)

[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]]


In [7]:
# Sanity check: show the one-hot encoded validation targets.
print(y_valid)

[[1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 1.]]


In [8]:
# To keep the target layout uniform across datasets, the one-hot targets get
# one column per entry in `expressions`. This dataset has no images for the
# last expression ('contempt'), so the missing trailing column(s) are filled
# with zeros for both the training and the validation targets.
# Padding by the missing width, rather than always appending one column,
# keeps this cell safe to re-run: a second run appends nothing.
target_width = len(expressions)
y_train = np.hstack((y_train, np.zeros((y_train.shape[0], target_width - y_train.shape[1]))))
y_valid = np.hstack((y_valid, np.zeros((y_valid.shape[0], target_width - y_valid.shape[1]))))
print("load fer2013 dataset successfully, it has {} train images and {} valid iamges".format(y_train.shape[0], y_valid.shape[0]))

load fer2013 dataset successfully, it has 28709 train images and 3589 valid iamges


In [None]:

# Number of samples per batch, shared by both generators and the step counts.
BATCH_SIZE = 32
# NOTE(review): the file name says "cnn2" although the model trained below is
# CNN3. The path is left unchanged because other code may load it by this name.
WEIGHTS_PATH = '../models/cnn2_best_weights.h5'

# Inputs are 48x48 images with a single (grayscale) channel.
model = CNN3(input_shape=(48, 48, 1), n_classes=8)
# Stochastic gradient descent with Nesterov momentum
# (https://blog.csdn.net/weixin_46301248/article/details/105883723);
# the loss is categorical cross-entropy.
# `learning_rate` replaces the deprecated `lr` alias, which triggered a warning.
# NOTE(review): `decay` may only be accepted by the legacy optimizer in newer
# TensorFlow releases - confirm against the installed version.
sgd = SGD(learning_rate=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])

# Create the checkpoint folder up front so saving the weights cannot fail
# because the folder does not exist.
os.makedirs(os.path.dirname(WEIGHTS_PATH), exist_ok=True)
callback = [
    # EarlyStopping guards against over-fitting; both callbacks below were
    # disabled because they gave worse results.
    # NOTE(review): monitor='lr' on ReduceLROnPlateau looks unintended - confirm
    # before re-enabling.
    #     EarlyStopping(monitor='val_loss', patience=50, verbose=True),
    #     ReduceLROnPlateau(monitor='lr', factor=0.1, patience=20, verbose=True),
    ModelCheckpoint(WEIGHTS_PATH, monitor='val_accuracy', verbose=True, save_best_only=True,
                    save_weights_only=True)]

# Light augmentation for the training images; validation images are used as-is.
train_generator = ImageDataGenerator(rotation_range=10,
                                      width_shift_range=0.05,
                                      height_shift_range=0.05,
                                      horizontal_flip=True,
                                      shear_range=0.2,
                                      zoom_range=0.2).flow(x_train, y_train, batch_size=BATCH_SIZE)
valid_generator = ImageDataGenerator().flow(x_valid, y_valid, batch_size=BATCH_SIZE)
# `Model.fit` accepts generators directly; `fit_generator` is deprecated.
history_fer2013 = model.fit(train_generator,
                            steps_per_epoch=len(y_train) // BATCH_SIZE,
                            epochs=32,
                            validation_data=valid_generator,
                            validation_steps=len(y_valid) // BATCH_SIZE,
                            callbacks=callback)
his = history_fer2013

# Evaluate on the held-out test split. `y_test` still holds integer labels,
# so it is compared against the arg-max of the predicted probabilities.
pred = model.predict(x_test)
pred = np.argmax(pred, axis=1)
print("test accuacy", np.sum(pred.reshape(-1) == y_test.reshape(-1)) / y_test.shape[0])

  super(SGD, self).__init__(name, **kwargs)


Epoch 1/32
 85/897 [=>............................] - ETA: 24:09 - loss: 2.0894 - accuracy: 0.093 - ETA: 13:06 - loss: 2.0841 - accuracy: 0.109 - ETA: 13:51 - loss: 2.0815 - accuracy: 0.114 - ETA: 15:38 - loss: 2.0781 - accuracy: 0.140 - ETA: 17:36 - loss: 2.0759 - accuracy: 0.156 - ETA: 18:20 - loss: 2.0703 - accuracy: 0.182 - ETA: 18:49 - loss: 2.0677 - accuracy: 0.187 - ETA: 18:50 - loss: 2.0613 - accuracy: 0.203 - ETA: 19:05 - loss: 2.0576 - accuracy: 0.211 - ETA: 19:04 - loss: 2.0480 - accuracy: 0.225 - ETA: 19:04 - loss: 2.0447 - accuracy: 0.221 - ETA: 19:11 - loss: 2.0318 - accuracy: 0.234 - ETA: 19:15 - loss: 2.0268 - accuracy: 0.233 - ETA: 18:51 - loss: 2.0074 - accuracy: 0.241 - ETA: 18:37 - loss: 2.0175 - accuracy: 0.239 - ETA: 19:01 - loss: 2.0074 - accuracy: 0.244 - ETA: 18:45 - loss: 1.9979 - accuracy: 0.242 - ETA: 18:25 - loss: 1.9914 - accuracy: 0.239 - ETA: 18:19 - loss: 1.9816 - accuracy: 0.238 - ETA: 18:02 - loss: 1.9737 - accuracy: 0.237 - ETA: 17:51 - loss: 1.9697 