需要使用的工具
- face_recognition
- cv2
- PIL
- tensorflow
- keras
- gc 
- glob

# Data Prepare

## 一、截取人脸

In [None]:
import gc
import glob
import os
import pickle

import numpy as np
from PIL import Image

import cv2
import face_recognition
import tensorflow as tf

In [None]:
class Preprocess(object):

    def __init__(self, scale_size=139):
        self.img_cant = []  # 失效图片路径
        self.scale_size = scale_size  # 统一尺寸

    @staticmethod
    def show_face(path):
        "test"
        image = face_recognition.load_image_file(path)
        face_locations = face_recognition.face_locations(image)  # list
        if len(face_locations) == 0:
            return -1
        for k, i in enumerate(face_locations):
            (a, b, c, d) = i
            image_spilt = image[a:c, d:b, :]
            cv2.imshow("img_{}".format(k), image_spilt)
            return 1

    def split_face(self, in_path, out_path, use_cnn):
        "main process"
        image = face_recognition.load_image_file(in_path)
        # 图片尺寸限制
        if max(image.shape) > 2000:
            if image.shape[0] > image.shape[1]:
                image = cv2.resize(
                    image, (2000, int(2000 * image.shape[1] / image.shape[0])))
            else:
                image = cv2.resize(
                    image, (int(2000 * image.shape[0] / image.shape[1]), 2000))

        if not use_cnn:
            face_locations = face_recognition.face_locations(image)
        else:
            face_locations = face_recognition.face_locations(
                image, number_of_times_to_upsample=1, model="cnn")

        img_name = os.path.basename(in_path)
        if len(face_locations) == 0:  # 失效部分
            self.img_cant.append(img_name)
            print(img_name)
            return

        for k, i in enumerate(face_locations):  # 截取识别出的人脸
            (a, b, c, d) = i
            image_spilt = image[a:c, d:b, :]
            image_spilt = self.scale_img(image_spilt)
            img = Image.fromarray(image_spilt)
            img.save(out_path + "/{}_{}.png".format(img_name, k))
            print("success")

    def scale_img(self, img):
        "等比缩放，截取"
        h, w = img.shape[:2]
        if h > w:
            new_h, new_w = self.scale_size * h / w, self.scale_size
        else:
            new_h, new_w = self.scale_size, self.scale_size * w / h

        new_h, new_w = int(new_h), int(new_w)
        img = cv2.resize(img, (new_w, new_h))

        # 截取
        if h == w:
            return img
        elif h < w:
            top = 0
            left = np.random.randint(0, new_w - self.scale_size)
        elif h > w:
            top = np.random.randint(0, new_h - self.scale_size)
            left = 0

        img = img[top:top + self.scale_size, left:left + self.scale_size]
        return img

    def preprocess(self, in_path, out_path, use_cnn=False):
        path_lst = glob.glob(in_path + "/*.jpg")
        for index, path in enumerate(path_lst):
            if index % 20 == 0:
                gc.collect()
            print(path)
            self.split_face(path, out_path, use_cnn)

In [None]:
# run preprocess
dir_lst = ["data/{}".format(i) for i in range(5)]

train_ins = Preprocess()

for in_path in dir_lst:
    out_path = os.path.basename(in_path) + "_face"
    if not os.path.exists(out_path):
        os.mkdir(out_path)

    train_ins.preprocess(in_path, out_path, use_cnn=False)
    with open("train_face_cant_{}.pkl".format(os.path.basename(in_path)),
              "wb") as f:
        pickle.dump(train_ins.img_cant, f)

## 生成tfrecord

In [None]:
import os
import sys

import numpy as np
from PIL import Image
from tqdm import tqdm

import tensorflow as tf

In [None]:
def image2tfrecord(image_list, label_list, filename):
    """
    image_list:image path list
    label_list:label list
    """
    length = len(image_list)
    writer = tf.python_io.TFRecordWriter(filename)
    for i in range(length):
        if i % 100 == 0:
            ratio = round(i / float(length), 4)
            sys.stdout.write("ratio:{}\r".format(ratio))
            sys.stdout.flush()
        image = Image.open(image_list[i])
        if "png" in image_list[i][-4:]:
            if image.mode == "RGB":
                r, g, b = image.split()
                image = Image.merge("RGB", (r, g, b))
            elif image.mode == "L":  # 灰度
                pass
            else:  # 透明通道处理
                r, g, b, a = image.split()
                image = Image.merge("RGB", (r, g, b))
        image = image.resize((139, 139))

        image_bytes = image.tobytes()
        features = {}
        features["image"] = tf.train.Feature(bytes_list=tf.train.BytesList(
            value=[image_bytes]))
        features["label"] = tf.train.Feature(int64_list=tf.train.Int64List(
            value=[int(label_list[i])]))
        tf_features = tf.train.Features(feature=features)
        tf_example = tf.train.Example(features=tf_features)
        tf_serialized = tf_example.SerializeToString()
        writer.write(tf_serialized)
    writer.close()

In [None]:
cwd = os.path.join(os.getcwd(), "data")

random_ratio = 0.05
sum_number = 0

for i in range(5):
    print(i)
    image_list = np.array(
        glob.glob(os.path.join(cwd, "{}_face".format(i)) + "/*.png"))

    test_choice = np.random.choice(range(len(image_list)),
                                   size=int(len(image_list) * random_ratio),
                                   replace=False)
    train_choice = list(set(range(len(image_list))).difference(test_choice))

    image_list_train = image_list[train_choice]
    image_list_test = image_list[test_choice]
    label_list = np.zeros(len(image_list)) + i

    image2tfrecord(
        image_list_train,
        np.zeros(len(image_list_train)) + i,
        "face_train_{}.tfrecords".format(i),
    )
    image2tfrecord(
        image_list_test,
        np.zeros(len(image_list_test)) + i,
        "face_test_{}.tfrecords".format(i),
    )

## 读取tfrecord

In [None]:
# 图像预处理
def pre_process(
    images,
    random_flip_up_down=False,
    random_flip_left_right=False,
    random_brightness=True,
    random_contrast=True,
    random_saturation=False,
    random_hue=False,
):
    if random_flip_up_down:
        images = tf.image.random_flip_up_down(images)
    if random_flip_left_right:
        images = tf.image.random_flip_left_right(images)
    if random_brightness:
        images = tf.image.random_brightness(images, max_delta=0.2)
    if random_contrast:
        images = tf.image.random_contrast(images, 0.9, 1.1)
    if random_saturation:
        images = tf.image.random_saturation(images, 0.3, 0.5)
    if random_hue:
        images = tf.image.random_hue(images, 0.2)
    new_size = tf.constant([28, 28], dtype=tf.int32)
    images = tf.image.resize_images(images, new_size)
    return images


# 解析函数
def pares_tf(example_proto):
    dics = {}
    dics["label"] = tf.FixedLenFeature((), dtype=tf.int64, default_value=0)
    dics["image"] = tf.FixedLenFeature((), dtype=tf.string, default_value="")

    parsed_example = tf.parse_single_example(serialized=example_proto,
                                             features=dics)
    image = tf.decode_raw(parsed_example["image"], out_type=tf.uint8)
    # image=tf.image.decode_jpeg(parsed_example['image'], channels=1)

    # 增强
    image = tf.reshape(image, (139, 139, 3))
    image = pre_process(image)
    image = tf.cast(image, tf.float32) / 255
    # 标签的操作
    label = parsed_example["label"]
    label = tf.cast(label, tf.int32)
    label = tf.one_hot(label, depth=5, on_value=1.0, off_value=0.0)
    return image, label


# dataset
def dataset(filenames, batch_size, epochs):
    dataset = tf.data.TFRecordDataset(filenames=filenames)
    new_dataset = dataset.map(pares_tf)
    shuffle_dataset = new_dataset.shuffle(buffer_size=(100000))
    batch_dataset = shuffle_dataset.batch(batch_size).repeat(epochs)
    batch_dataset = batch_dataset.prefetch(1)
    iterator = batch_dataset.make_one_shot_iterator()
    next_element = iterator.get_next()
    return next_element

<img src="pic/prefetch.png" style="zoom:30%" >

In [None]:
# tf.reset_default_graph()
# init = tf.global_variables_initializer()

# filenames = ["data/face_train_{}.tfrecords".format(i) for i in range(5)]
# next_batch = dataset(filenames, batch_size=5, epochs=1)

# Base Model

In [None]:
import glob
import sys

import tensorflow as tf

In [None]:
tf.reset_default_graph()

epochs = 15
batch_size = 32
total_sum = 0
epoch = 0
random_ratio = 0.05

filenames_train = ["data/face_train_{}.tfrecords".format(i) for i in range(5)]
filenames_test = ["data/face_test_{}.tfrecords".format(i) for i in range(5)]

test_num = sum([
    int(len(glob.glob("./face/{}_face/*.png".format(i))) * random_ratio)
    for i in range(5)
])
train_num = (
    sum([len(glob.glob("./face/{}_face/*.png".format(i))) for i in range(5)]) -
    test_num)

train_set = dataset(filenames_train, batch_size=batch_size, epochs=epochs)
next_element_trest = dataset(filenames_test, batch_size=batch_size, epochs=1)

In [None]:
input_data = tf.placeholder(tf.float32, shape=(None, 139, 139, 3))
input_label = tf.placeholder(tf.float32, shape=(None, 5))
# 139
hidden = tf.keras.layers.Conv2D(filters=16,
                                kernel_size=3,
                                strides=1,
                                padding="valid",
                                activation="relu")(input_data)
# 137
hidden = tf.keras.layers.MaxPool2D(pool_size=2)(hidden)
# 68
hidden = tf.keras.layers.Conv2D(filters=32,
                                kernel_size=3,
                                strides=2,
                                padding="same",
                                activation="relu")(hidden)
# 34
hidden = tf.keras.layers.MaxPool2D(pool_size=2)(hidden)
# 17
hidden = tf.keras.layers.Conv2D(filters=64,
                                kernel_size=3,
                                strides=2,
                                padding="valid",
                                activation="relu")(hidden)
# 8
hidden = tf.keras.layers.MaxPool2D(pool_size=2)(hidden)
# 4
hidden = tf.keras.layers.Conv2D(filters=128,
                                kernel_size=3,
                                strides=2,
                                padding="valid",
                                activation="relu")(hidden)
# 1
hidden = tf.layers.Flatten()(hidden)

output = tf.keras.layers.Dense(5, activation="softmax")(hidden)

In [None]:
# 损失函数
loss = tf.reduce_mean(
    tf.keras.losses.categorical_crossentropy(input_label, output))

# 优化器
opt = tf.train.AdamOptimizer()
train_op = opt.minimize(loss)

# 测试评估
acc = tf.reduce_mean(tf.keras.metrics.categorical_accuracy(input_label, output))
# correct_pred = tf.equal(tf.argmax(input_label,axis=1),tf.argmax(output,axis=1))
# acc = tf.reduce_mean(tf.cast(correct_pred,tf.float32))

tf.add_to_collection("my_op", input_data)
tf.add_to_collection("my_op", output)
tf.add_to_collection("my_op", loss)

In [None]:
init = tf.global_variables_initializer()
saver = tf.train.Saver()
with tf.Session() as sess:
    sess.run([init])
    while epoch < epochs:
        data, label = sess.run([train_set[0], train_set[1]])

        total_sum += batch_size
        _, loss_val = sess.run([train_op, loss],
                               feed_dict={
                                   input_data: data,
                                   input_label: label
                               })

        if total_sum % 20 == 0:
            sys.stdout.write("epoch:{},index:{}\\{},loss:{:.4f}\r".format(
                epoch, total_sum % train_num, train_num, loss_val))
            sys.stdout.flush()

        if total_sum // train_num > epoch:
            epoch = total_sum // train_num
            loss_val = sess.run([loss],
                                feed_dict={
                                    input_data: data,
                                    input_label: label
                                })

            test_set = dataset(filenames_test, batch_size=batch_size, epochs=1)
            total_test_sum = 0
            acc_test_lst = []
            while total_test_sum < test_num:
                total_test_sum += batch_size
                test_data, test_label = sess.run([test_set[0], test_set[1]])
                acc_test = sess.run([acc],
                                    feed_dict={
                                        input_data: test_data,
                                        input_label: test_label
                                    })
                acc_test_lst.append(acc_test[0])

            acc_val = sum(acc_test_lst) / len(acc_test_lst)
            saver.save(sess, save_path="./model/my_model.ckpt")
            print("epoch:{},train_loss:{:.4f},test_acc:{:.4f}".format(
                epoch, loss_val[0], acc_val))