<a href="https://colab.research.google.com/github/543877815/KDEF/blob/master/resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## google colab 磁盘挂载和位置调整

In [0]:
from google.colab import drive
drive.mount('/content/drive/')

In [0]:
cd drive/My\ Drive/KDEF

# 程序开始

In [0]:
import keras
keras.__version__

In [0]:
base_dir = 'data/KDEF_and_AKDEF/KDEF/'

In [0]:
import os
# 显示几个随机增强后的训练图像
from keras.preprocessing import image 

In [0]:
# 将文件名读入list中
def read(filename):
    data = []
    file = open(filename, 'r')
    for line in file:
        data.append(line.replace('\n',''))
    file.close()
    return data

In [0]:
training_data = read('data/train.txt')
test_data = read('data/test.txt')
validation_data = read('data/validation.txt')

In [0]:
len(training_data), len(test_data), len(validation_data)

In [0]:
import numpy as np
# 从文本中提取数据出来
def process(data):
    path = [item.split(',')[0] for item in data]
    expression = [int(item.split(',')[1]) for item in data]
    angle = [int(item.split(',')[2]) for item in data]
    return path, expression, angle

# one-hot 编码
def to_one_hot(labels, dimension=1):
    results = np.zeros((len(labels), dimension))
    for i, label in enumerate(labels):
        results[i, label] = 1.
    return results

In [0]:
training_path, training_expression, training_angle = process(training_data)
test_path, test_expression, test_angle = process(test_data)
validation_path, validation_expression, validation_angle = process(validation_data)

## 可视化aumentation图片

In [0]:
# 可视化data augmentation图片
from keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(
      rescale=1./255,
      rotation_range=40,
      width_shift_range=0.2,
      height_shift_range=0.2,
      shear_range=0.2,
      zoom_range=0.2,
      fill_mode='nearest')


img = image.load_img(base_dir + training_path[0], target_size=(200, 200)) # 读取图像并调整大小

x = image.img_to_array(img) # 将其转化为(img.size[0], img.size[1], 3)的Numpy数组

x = x.reshape((1,) + x.shape) # 将其转化为(1, img.size[0], img.size[1], 3)的Numpy

import matplotlib.pyplot as plt

i = 0
for batch in datagen.flow(x, batch_size=1):
    plt.figure(i)
    imgplot=plt.imshow(image.array_to_img(batch[0]))
    i += 1
    if i % 4 == 0:
        break

plt.show()
    

## 将表情图片分放

In [0]:
# 将数据分放
data_dir = 'data/expression/'

if not os.path.exists(data_dir):
    os.mkdir(data_dir)

label_range = np.unique(training_expression)

# training data dictionary
expression_train_dir = os.path.join(data_dir, 'train')
if not os.path.exists(expression_train_dir):
    os.mkdir(expression_train_dir)
    for i in label_range:
        train_i_dir = os.path.join(expression_train_dir, str(i))
        if not os.path.exists(train_i_dir):
            os.mkdir(train_i_dir)
            
# test data dictionary
expression_test_dir = os.path.join(data_dir, 'test')
if not os.path.exists(expression_test_dir):
    os.mkdir(expression_test_dir)
    for i in label_range:
        test_i_dir = os.path.join(expression_test_dir, str(i))
        if not os.path.exists(test_i_dir):
            os.mkdir(test_i_dir)
            
# validation data dictionary
expression_validation_dir = os.path.join(data_dir, 'validation')
if not os.path.exists(expression_validation_dir):
    os.mkdir(expression_validation_dir)
    for i in label_range:
        validation_i_dir = os.path.join(expression_validation_dir, str(i))
        if not os.path.exists(validation_i_dir):
            os.mkdir(validation_i_dir)

In [0]:
# copy file to the dictionary
import shutil
from tqdm import tqdm

# training data copy
for path, expression in tqdm(zip(training_path, training_expression)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(expression_train_dir, str(expression), file)
    shutil.copyfile(src, dst)

# test data copy
for path, expression in tqdm(zip(test_path, test_expression)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(expression_test_dir, str(expression), file)
    shutil.copyfile(src, dst)
    
for path, expression in tqdm(zip(validation_path, validation_expression)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(expression_validation_dir, str(expression), file)
    shutil.copyfile(src, dst)

## 将角度图片分放

In [0]:
# 将数据分放
data_dir = 'data/angle/'

if not os.path.exists(data_dir):
    os.mkdir(data_dir)

label_range = np.unique(training_angle)

# training data dictionary
angle_train_dir = os.path.join(data_dir, 'train')
if not os.path.exists(angle_train_dir):
    os.mkdir(angle_train_dir)
    for i in label_range:
        train_i_dir = os.path.join(angle_train_dir, str(i))
        if not os.path.exists(train_i_dir):
            os.mkdir(train_i_dir)
            
# test data dictionary
angle_test_dir = os.path.join(data_dir, 'test')
if not os.path.exists(angle_test_dir):
    os.mkdir(angle_test_dir)
    for i in label_range:
        test_i_dir = os.path.join(angle_test_dir, str(i))
        if not os.path.exists(test_i_dir):
            os.mkdir(test_i_dir)
            
# validation data dictionary
angle_validation_dir = os.path.join(data_dir, 'validation')
if not os.path.exists(angle_validation_dir):
    os.mkdir(angle_validation_dir)
    for i in label_range:
        validation_i_dir = os.path.join(angle_validation_dir, str(i))
        if not os.path.exists(validation_i_dir):
            os.mkdir(validation_i_dir)

In [0]:
# copy file to the dictionary
import shutil
from tqdm import tqdm

# training data copy
for path, angle in tqdm(zip(training_path, training_angle)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(angle_train_dir, str(angle), file)
    shutil.copyfile(src, dst)

# test data copy
for path, angle in tqdm(zip(test_path, test_angle)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(angle_test_dir, str(angle), file)
    shutil.copyfile(src, dst)
    
for path, angle in tqdm(zip(validation_path, validation_angle)):
    file = path.split('/')[1]
    src = os.path.join(base_dir, path)
    dst = os.path.join(angle_validation_dir, str(angle), file)
    shutil.copyfile(src, dst)

## 预训练模型作特征提取

In [0]:
# 引入预训练模型
from keras.applications import  ResNet50
conv_base = ResNet50(include_top=False, 
            weights='imagenet',
            input_shape=(224, 224, 3))

In [0]:
conv_base.summary()

In [0]:
batch_size = 20

def extract_features(directory, sample_count):
    features = np.zeros(shape=(sample_count, 7, 7, 2048))
    labels = np.zeros(shape=(sample_count))
    generator = datagen.flow_from_directory(
        directory,
        target_size=(224, 224),
        batch_size=batch_size,
        class_mode='binary')
    i = 0
    for inputs_batch, labels_batch in tqdm(generator):
        features_batch = conv_base.predict(inputs_batch)
        features[i * batch_size : (i + 1) * batch_size] = features_batch
        labels[i * batch_size : (i + 1) * batch_size] = labels_batch
        i += 1
        if i * batch_size >= sample_count:
            # Note that since generators yield data indefinitely in a loop,
            # we must `break` after every image has been seen once.
            #注意，这些生成器在循环中不断生成数据，所以你必须在读取完所有图像后终止循环
            break
    return features, labels

expression_train_features, expression_train_labels = extract_features(expression_train_dir, 2900)
expression_validation_features, expression_validation_labels = extract_features(expression_validation_dir, 1000)
expression_test_features, expression_test_labels = extract_features(expression_test_dir, 1000)

angle_train_features, angle_train_labels = extract_features(angle_train_dir, 2900)
angle_validation_features, angle_validation_labels = extract_features(angle_validation_dir, 1000)
angle_test_features, angle_test_labels = extract_features(angle_test_dir, 1000)

In [0]:
expression_train_features.shape, expression_train_labels.shape

In [0]:
expression_train_features = np.reshape(expression_train_features, (2900, 7 * 7 * 2048))
expression_validation_features = np.reshape(expression_validation_features, (1000, 7 * 7 * 2048))
expression_test_features = np.reshape(expression_test_features, (1000, 7 * 7 * 2048))

angle_train_features = np.reshape(angle_train_features, (2900, 7 * 7 * 2048))
angle_validation_features = np.reshape(angle_validation_features, (1000, 7 * 7 * 2048))
angle_test_features = np.reshape(angle_test_features, (1000, 7 * 7 * 2048))

In [0]:
# one_hot 编码
from keras.utils.np_utils import to_categorical

one_hot_expression_train_labels = to_categorical(expression_train_features)
one_hot_expression_test_labels = to_categorical(expression_test_features)
one_hot_expression_validation_labels = to_categorical(expression_validation_features)

one_hot_angle_train_labels = to_categorical(angle_train_features)
one_hot_angle_test_labels = to_categorical(angle_test_features)
one_hot_angle_validation_labels = to_categorical(angle_validation_features)

## 将提取出来的特征作为新的输入到一个新的神经网络中

In [0]:
# from keras import models
# from keras import layers
# from keras import optimizers

# model = models.Sequential()
# model.add(layers.Dense(256, activation='relu', input_dim=7 * 7 * 2048))
# model.add(layers.Dropout(0.5))
# model.add(layers.Dense(7, activation='sigmoid'))

# model.compile(optimizer=optimizers.RMSprop(lr=2e-5),
#               loss='binary_crossentropy',
#               metrics=['acc'])

# history = model.fit(train_features, one_hot_train_labels,
#                     epochs=10,
#                     batch_size=20,
#                     validation_data=(validation_features, one_hot_validation_labels))

In [0]:
from keras import models
from keras import layers
from keras import optimizers


input_tensor = layers.Input(shape=(7 * 7 * 2048,))
# hidden_input = layers.Dense(512, activation='relu')(input_tensor)
# hidden_input = layers.Dropout(0.5)(hidden_input)
# output_input = layers.Dense(32, activation='relu')(hidden_input)
# output_input = layers.Dropout(0.5)(output_input)
expression_output_tensor = layers.Dense(7, activation='sigmoid')(input_tensor)
angle_output_tensor = layers.Dense(5, activation='sigmoid')(input_tensor)

model = models.Model(inputs=input_tensor, outputs=[expression_output_tensor, angle_output_tensor])

model.compile(optimizer=optimizers.RMSprop(lr=2e-5),
              loss='binary_crossentropy',
              loss_weights=[1., 0.5],
              metrics=['acc'])

history = model.fit(train_features, {'expression_output': one_hot_expression_train_labels, 'angle_output': one_hot_angle_train_labels},
                    epochs=10,
                    batch_size=20,
                    validation_data=(validation_features, {'expression_output': one_hot_expression_validation_labels, 'angle_output': one_hot_angle_validation_labels}))

## 模型评估

In [0]:
result = model.evaluate(test_features, {'expression_output': one_hot_expression_test_labels, 'angle_output': one_hot_angle_test_labels})

In [0]:
result