In [None]:
import os
import time

import cv2
import numpy as np
import tensorflow as tf
import imgaug as ia
import imgaug.augmenters as iaa
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split

from utils.config import Config
from utils.data import SEED
from utils.data import C as IC
from utils.data import H as IH
from utils.data import W as IW
from utils.data import (data_import, img_add_label, img_plot, change_range,
                        test_data_import, img_save, img_read, batch_loader,
                        img_crop_by_label, subtitle_recognition)
from utils.video_editor import THRESH, pics_merge_into_video, video_split
from utils.common import touch_dir

In [None]:
# Every path used by this notebook is looked up from the project configuration.
CONFIG = Config()

# Directories: each configured path is passed through touch_dir before use.
TEST_IMG_DIR = touch_dir(CONFIG['test_video_imgs_dir'])
CROP_IMGS_DIR = touch_dir(CONFIG['crop_imgs_dir'])
LABEL_IMGS_DIR = touch_dir(CONFIG['label_imgs_dir'])

# Plain file paths, used exactly as configured.
MODEL_FILE = CONFIG['model_file']
TEST_VIDEO = CONFIG['test_video']
TEST_CACHE = CONFIG['test_cache']
TEST_OUT_VIDEO = CONFIG['test_out_video']
SUBTITLES_TXT = CONFIG['subtitles_txt']

# Define Loss and Metric

In [None]:
class BBoxError(tf.keras.losses.Loss):
    """Bounding-box loss for a 5-element prediction vector.

    Every sample is laid out as ``[pc, bx, by, bh, bw]``: column 0 is the
    object-presence score, columns 1-4 are the box terms.

    The loss is the sum of three parts, averaged over the batch:

    * no object (``pc_true == 0``): ``-log(1 - pc_pred)``, scaled by ``weights[0]``
    * object present: ``-log(pc_pred)``, scaled by ``weights[0]``
    * object present: weighted squared error of the four box terms,
      scaled element-wise by ``weights[1:]``
    """
    def __init__(self, weights=(1, 1, 1, 1, 1), **kwargs):
        """
        Args:
            weights: five per-component weights in ``[pc, bx, by, bh, bw]`` order.
            **kwargs: forwarded to ``tf.keras.losses.Loss`` (e.g. ``name``).
        """
        super().__init__(**kwargs)
        # Stored as float so that integer weights never clash with float tensors.
        self.weights = np.array(weights, dtype=np.float32)

    def call(self, y_true, y_pred):
        """Return the scalar batch-mean loss for ``y_true`` / ``y_pred`` of shape (batch, 5)."""
        y_pred = tf.convert_to_tensor(y_pred)
        # Labels often arrive as float64 numpy arrays; align everything on the
        # prediction dtype so the arithmetic below cannot fail on mixed dtypes.
        y_true = tf.cast(y_true, y_pred.dtype)
        weights = tf.cast(self.weights, y_pred.dtype)

        has_object = y_true[:, 0]
        score = y_pred[:, 0]

        # No object: penalise a high presence score. The 1e-10 keeps log() finite.
        # Earlier squared-error variant: (1 - has_object) * weights[0] * (has_object - score)**2
        loss_1 = (1 - has_object) * weights[0] * (-tf.math.log(1 - score + 1e-10))
        # Object present: penalise a low presence score.
        # Earlier squared-error variant: has_object * weights[0] * (has_object - score)**2
        loss_2_logis = has_object * weights[0] * (-tf.math.log(score + 1e-10))
        # Object present: weighted squared error of the box terms.
        box_error = (y_true[:, 1:] - y_pred[:, 1:]) ** 2
        loss_2_square = has_object * tf.reduce_sum(weights[1:] * box_error, axis=-1)
        return tf.reduce_mean(loss_1 + loss_2_logis + loss_2_square)

class IOUMeanMetric(tf.keras.metrics.Metric):
    """Mean intersection-over-union between labelled and predicted boxes.

    Rows are ``[pc, bx, by, bh, bw]``. Boxes are treated as centre/size: the
    code derives the edges as ``bx +/- bw/2`` and ``by +/- bh/2``. Only samples
    whose label has ``pc == 1`` contribute to the mean.
    """
    def __init__(self, name=None, **kwargs):
        """
        Args:
            name: optional metric name; ``None`` keeps the Keras default.
            **kwargs: forwarded to ``tf.keras.metrics.Metric``.
        """
        super().__init__(name=name, **kwargs)
        # Running sum of IoU values and running count of positive samples.
        self.iou_sum = self.add_weight(name='iou_sum', dtype=tf.float32, initializer=tf.zeros_initializer())
        self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the IoU of one batch. ``sample_weight`` is accepted but ignored."""
        # The accumulators are float32, so align both inputs on that dtype.
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)

        bx1, by1, bh1, bw1 = y_true[:, 1], y_true[:, 2], y_true[:, 3], y_true[:, 4]
        bx2, by2, bh2, bw2 = y_pred[:, 1], y_pred[:, 2], y_pred[:, 3], y_pred[:, 4]
        # Width and height of the overlap; not positive when the boxes are disjoint.
        cross_w = tf.minimum(bx1+bw1/2, bx2+bw2/2) - tf.maximum(bx1-bw1/2, bx2-bw2/2)
        cross_h = tf.minimum(by1+bh1/2, by2+bh2/2) - tf.maximum(by1-bh1/2, by2-bh2/2)
        # Count a sample only if it is labelled positive and the boxes overlap.
        mask = tf.cast(cross_w > 0, tf.float32) * tf.cast(cross_h > 0, tf.float32) * y_true[:, 0]
        s1 = bh1 * bw1
        s2 = bh2 * bw2
        sc = cross_w * cross_h
        # divide_no_nan: a zero union yields 0 instead of NaN, which would
        # otherwise poison the running sum for the rest of the epoch.
        iou = tf.math.divide_no_nan(mask * sc, s1 + s2 - sc)
        self.iou_sum.assign_add(tf.reduce_sum(iou))
        self.total.assign_add(tf.cast(tf.reduce_sum(y_true[:, 0]), dtype=tf.int32))

    def result(self):
        """Return the mean IoU over positive samples, or 0 if none have been seen."""
        return tf.math.divide_no_nan(self.iou_sum, tf.cast(self.total, dtype=tf.float32))

# Create and Build Model

In [None]:
# Backbone: ImageNet-pretrained DenseNet121 without its classification head.
# It is left trainable, so the whole network is fine-tuned.
base_model = tf.keras.applications.DenseNet121(
    include_top=False,
    weights='imagenet',
    input_shape=(IH, IW, IC),
)

# Head: flatten the backbone features and regress the 5-element vector
# [pc, bx, by, bh, bw]; the sigmoid squashes every component into (0, 1).
# (GlobalAveragePooling2D in place of Flatten was an alternative tried earlier.)
inputs = tf.keras.Input(shape=(IH, IW, IC))
features = base_model(inputs)
flat_features = tf.keras.layers.Flatten()(features)
raw_outputs = tf.keras.layers.Dense(5)(flat_features)
outputs = tf.keras.layers.Activation('sigmoid')(raw_outputs)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()

In [None]:
# Restore previously saved weights from MODEL_FILE into the model.
# The training loop writes this same file at the end of every epoch.
model.load_weights(MODEL_FILE)

In [None]:
# Wrap an augmenter so that it is applied to only 50% of the images.
sometimes = lambda aug: iaa.Sometimes(0.5, aug)

# Augmentation pipeline: each image gets between 0 and 5 of the augmenters
# below, applied in random order.
aug = iaa.SomeOf((0, 5), [
        iaa.OneOf([
            iaa.GaussianBlur((0, 1.0)), # blur images with a sigma between 0 and 1.0
            iaa.AverageBlur(k=(2, 3)), # blur image using local means with kernel sizes between 2 and 3
            iaa.MedianBlur(k=(3, 5)), # blur image using local medians with kernel sizes between 3 and 5
        ]),
        iaa.Sharpen(alpha=(0, 1.0), lightness=(0.75, 1.5)), # sharpen images
        iaa.Emboss(alpha=(0, 1.0), strength=(0, 1.0)), # emboss images
        iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05*255), per_channel=0.5), # add gaussian noise to images
        iaa.OneOf([
            iaa.Dropout((0.01, 0.1), per_channel=0.5), # randomly remove up to 10% of the pixels
            iaa.CoarseDropout((0.03, 0.15), size_percent=(0.02, 0.05), per_channel=0.2), # drop pixels in coarse rectangular patches
        ]),
        iaa.Invert(0.05, per_channel=True), # invert color channels
        iaa.Add((-10, 10), per_channel=0.5), # change brightness of images (by -10 to 10 of original value)
        iaa.AddToHueAndSaturation((-20, 20)), # change hue and saturation
        # scale the contrast by a factor between 0.5 and 2.0
        iaa.LinearContrast((0.5, 2.0), per_channel=0.5), # improve or worsen the contrast
        iaa.Grayscale(alpha=(0.0, 1.0)), # blend the image with its grayscale version
        sometimes(iaa.ElasticTransformation(alpha=(0.5, 2), sigma=0.1)), # move pixels locally around (with random strengths)
        sometimes(iaa.PiecewiseAffine(scale=(0.01, 0.03))), # sometimes move parts of the image around
    ], random_order=True)

# Load Data

In [None]:
# Labelled data: hold out 5% as the development set (fixed seed for reproducibility).
data_set = data_import()
X = data_set['X']
y = data_set['Y']
X_train, X_dev, y_train, y_dev = train_test_split(
    X, y,
    test_size=0.05,
    random_state=SEED,
)

# Test data: only the images are read from it here.
test_data_set = test_data_import()
X_test = test_data_set['X']

train_size, dev_size, test_size = (len(split) for split in (X_train, X_dev, X_test))
summary_template = "训练集数据 {} 条，开发集数据 {} 条，测试集数据 {} 条"
print(summary_template.format(train_size, dev_size, test_size))

In [None]:
# Training hyper-parameters.
num_epochs = 10000
batch_size = 16
learning_rate = 3e-5
use_gpu = True

# Loss weights are given in [pc, bx, by, bh, bw] order.
loss_object = BBoxError([1, 2, 5, 1, 3])
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Running metrics for the training pass ...
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_iou = IOUMeanMetric()

# ... and for the evaluation pass.
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_iou = IOUMeanMetric()

# GPU selection: "0" exposes the first GPU, "-1" hides all of them.
# NOTE(review): TensorFlow has already been used in earlier cells to build the
# model, so setting this variable here may have no effect — confirm, and
# consider moving it ahead of the first TensorFlow call.
os.environ['CUDA_VISIBLE_DEVICES'] = "0" if use_gpu else "-1"

# Check Data

In [None]:
# Visual sanity check: show a few training images next to their augmented
# version and to the labelled (255, 0, 0) / predicted (0, 0, 255) boxes.
num_check_imgs = 10
X_batch, y_batch = next(batch_loader(X_train, y_train, batch_size=num_check_imgs))
X_batch_aug = aug(images=X_batch)
y_batch_pred = model(change_range(X_batch)).numpy()

for idx in range(num_check_imgs):
    img, img_aug = X_batch[idx], X_batch_aug[idx]
    label, label_pred = y_batch[idx], y_batch_pred[idx]

    # The first two panels are drawn before any box is added to the image.
    plain_panels = (('Origin Image', img), ('Augmented Image', img_aug))
    for position, (title, picture) in enumerate(plain_panels, start=1):
        plt.subplot(1, 3, position)
        plt.title(title)
        plt.imshow(picture)

    # Third panel: ground-truth box first, then the prediction on top of it.
    plt.subplot(1, 3, 3)
    img_with_label = img_add_label(
        img_add_label(img, label, color=(255, 0, 0)),
        label_pred,
        color=(0, 0, 255),
    )
    plt.title('Origin Image with Label')
    plt.imshow(img_with_label)
    plt.show()

# Training

In [None]:
@tf.function
def train_on_batch(X_batch, y_batch):
    """Run one optimisation step on a batch and update the training metrics.

    Args:
        X_batch: batch of model inputs.
        y_batch: matching labels, one 5-element row per sample.

    Returns:
        The scalar loss of this batch.
    """
    with tf.GradientTape() as tape:
        predictions = model(X_batch, training=True)
        batch_loss = tf.reduce_mean(loss_object(y_true=y_batch, y_pred=predictions))

    variables = model.trainable_variables
    gradients = tape.gradient(batch_loss, variables)
    optimizer.apply_gradients(grads_and_vars=zip(gradients, variables))

    # Fold this batch into the running loss / IoU of the current epoch.
    train_loss(batch_loss)
    train_iou(y_batch, predictions)
    return batch_loss

@tf.function
def test_on_batch(X_batch, y_batch):
    """Evaluate one batch without updating the model and record the metrics.

    Args:
        X_batch: batch of model inputs.
        y_batch: matching labels, one 5-element row per sample.

    Returns:
        The loss of this batch.
    """
    predictions = model(X_batch, training=False)
    batch_loss = loss_object(y_batch, predictions)

    # Fold this batch into the running evaluation loss / IoU.
    test_loss(batch_loss)
    test_iou(y_batch, predictions)
    return batch_loss

# Progress-line templates and batch counts do not change between epochs.
train_template = '[Training] Epoch {}, Batch {}/{}, Loss: {}, IOU: {:.2%} '
dev_template = '[Testing] Epoch {}, Batch {}/{}, Loss: {}, IOU: {:.2%} '
epoch_template = 'Epoch {}, Loss: {}, IOU: {:.2%}, Test Loss: {}, Test IOU: {:.2%} '
train_batches = train_size // batch_size
# The evaluation pass iterates over the development split, so its progress
# denominator must come from dev_size (it previously used test_size, the size
# of the unrelated unlabelled test set).
dev_batches = dev_size // batch_size

for epoch in range(num_epochs):

    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_iou, test_loss, test_iou):
        metric.reset_states()

    # Training: augment each batch, rescale it, then take one optimisation step.
    for batch_index, (X_batch, y_batch) in enumerate(batch_loader(X_train, y_train, batch_size=batch_size)):
        X_batch = np.array(aug(images=X_batch))
        X_batch = change_range(X_batch)
        loss = train_on_batch(X_batch, y_batch)
        print(train_template.format(epoch+1,
                                    batch_index,
                                    train_batches,
                                    loss,
                                    train_iou.result()),
              end='\r')

    # Evaluation on the development split (no augmentation, no weight update).
    for batch_index, (X_batch, y_batch) in enumerate(batch_loader(X_dev, y_dev, batch_size=batch_size)):
        X_batch = change_range(X_batch)
        loss = test_on_batch(X_batch, y_batch)
        print(dev_template.format(epoch+1,
                                  batch_index,
                                  dev_batches,
                                  loss,
                                  test_iou.result()),
              end='\r')

    # Per-epoch summary line.
    print(epoch_template.format(epoch+1,
                                train_loss.result(),
                                train_iou.result(),
                                test_loss.result(),
                                test_iou.result()))

    # Checkpoint after every epoch so an interrupted run can be resumed.
    model.save_weights(MODEL_FILE)

# Test

In [None]:
# Predict on the whole test set in one forward pass, then print each
# prediction and plot it on its source image (drawn without noise).
encodings = model(change_range(X_test))
test_img_paths = test_data_set["img_paths"]
for sample_idx in range(test_size):
    img = img_read(test_img_paths[sample_idx])
    label = encodings[sample_idx].numpy()
    print(label)
    labelled_img = img_add_label(img, label, noise=(0, 0))
    img_plot(labelled_img, shape=None)

# Video Detection

In [None]:
# Split the test video into frame images under TEST_IMG_DIR and keep the ids
# that video_split reports as key frames.
keyframe_id_set = video_split(
    TEST_VIDEO,
    TEST_IMG_DIR,
    only_keyframes=False,
    compute_keyframe=THRESH,
)

# Load the extracted frames through the test-data importer, bypassing its cache.
video_test_data_set = test_data_import(test_img_dir=TEST_IMG_DIR, cache=None)
video_frames = video_test_data_set["X"]
num_test_imgs = len(video_frames)

In [None]:
# Forward pass over all video frames, one batch at a time, collecting the
# prediction vectors into a single pre-allocated array.
video_inputs = video_test_data_set["X"]
encodings = np.zeros((num_test_imgs,) + tuple(outputs.shape[1:]), dtype=np.float64)
for batch_start in range(0, num_test_imgs, batch_size):
    batch_stop = batch_start + batch_size
    print("encoding {}/{} ".format(batch_start, num_test_imgs), end="\r")
    batch_inputs = change_range(video_inputs[batch_start:batch_stop])
    encodings[batch_start:batch_stop] = model(batch_inputs)

In [None]:
# Draw the predicted box on every frame and, for key frames only, crop out
# the subtitle region.
signs = [">>   ", " >>  ", "  >> ", "   >>"]  # frames of the progress spinner
frame_paths = video_test_data_set["img_paths"]
box_noise = (0.05, 0.05)

for i in range(num_test_imgs):
    print("{} {:6d}/{} ".format(signs[i%len(signs)], i, num_test_imgs), end="\r")
    img_path = frame_paths[i]
    img_name = os.path.basename(img_path)
    img = img_read(img_path)
    label = encodings[i]

    # Crop before drawing, so the crop is taken from the frame without the box.
    if i in keyframe_id_set:
        crop_area = img_crop_by_label(img, label, noise=box_noise)
        if crop_area is not None:
            img_save(crop_area, os.path.join(CROP_IMGS_DIR, img_name))

    img_with_label = img_add_label(img, label, noise=box_noise)
    img_save(img_with_label, os.path.join(LABEL_IMGS_DIR, img_name))

In [None]:
# Merge the labelled frames in LABEL_IMGS_DIR into the output video TEST_OUT_VIDEO.
pics_merge_into_video(TEST_OUT_VIDEO, LABEL_IMGS_DIR)

In [None]:
# 利用百度 AIP 识别字幕
subtitles = subtitle_recognition(CROP_IMGS_DIR)
with open(SUBTITLES_TXT, "w", encoding="utf8") as f:
    for subtitle in subtitles:
        f.write(subtitle + '\n')