In [None]:
import os
import glob
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing import image
import tensorflow.keras.backend as K
import tensorflow_addons as tfa
# Enable on-demand GPU memory growth for every visible GPU (no-op when none is found).
gpus = tf.config.list_physical_devices('GPU')
for gpu_device in gpus:
    tf.config.experimental.set_memory_growth(gpu_device, True)

## Data preprocessing

In [None]:
# Create the folders that will receive the filtered annotation files.
# exist_ok=True makes the cell safely re-runnable without a separate
# (race-prone) existence check.
for edit_dir in (
    r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set_Edit",
    r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set_Edit",
):
    os.makedirs(edit_dir, exist_ok=True)

In [None]:
# Paths of every raw annotation file in the train and validation folders.
data_train, data_val = (
    glob.glob(pattern)
    for pattern in (
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set/*",
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set/*",
    )
)

In [None]:
# Copy each training annotation file into Train_Set_Edit, dropping every
# line that contains the substring "-1".
for src_path in data_train:
    with open(src_path, 'r') as fr:
        kept_lines = [line for line in fr.readlines() if "-1" not in line]
    dst_path = src_path.replace("Train_Set","Train_Set_Edit")
    with open(dst_path, 'w') as fw:
        fw.writelines(kept_lines)

In [None]:
# Copy each validation annotation file into Validation_Set_Edit, dropping
# every line that contains the substring "-1".
for src_path in data_val:
    with open(src_path, 'r') as fr:
        kept_lines = [line for line in fr.readlines() if "-1" not in line]
    dst_path = src_path.replace("Validation_Set","Validation_Set_Edit")
    with open(dst_path, 'w') as fw:
        fw.writelines(kept_lines)

## Histogram

In [None]:
# From here on the file lists point at the filtered ("_Edit") annotation copies.
data_train, data_val = (
    glob.glob(pattern)
    for pattern in (
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set_Edit/*",
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set_Edit/*",
    )
)
# Annotation file names that are left out of the training statistics.
exclude_list = [
    '10-60-1280x720.txt',
    '10-60-1280x720_right.txt',
    '135-24-1920x1080_left.txt',
    '135-24-1920x1080_right.txt',
    '46-30-484x360_left.txt',
    '46-30-484x360_right.txt',
    '86-24-1920x1080.txt',
]

In [None]:
# Accumulate, over every non-excluded training annotation file:
#   count      - total number of annotation rows
#   stas_train - per AU column, the number of rows whose value equals 1
count = 0
stas_train = {'AU1':0, 'AU2':0, 'AU4':0, 'AU6':0, 'AU7':0, 'AU10':0, 'AU12':0, 'AU15':0, 'AU23':0, 'AU24':0, 'AU25':0, 'AU26':0}
for annotation_path in data_train:
    if os.path.basename(annotation_path) in exclude_list:
        continue
    df = pd.read_csv(annotation_path, delimiter = ",")
    count += df.shape[0]
    # Count the positives directly: one vectorised pass per file, and an AU
    # that is never active in a file simply contributes 0 (no missing-label
    # lookup, no NaN handling).
    positives = df[list(stas_train)].eq(1).sum()
    for key in stas_train:
        stas_train[key] += int(positives[key])

In [None]:
# Accumulate, over every validation annotation file:
#   count    - total number of annotation rows (overwrites the training total)
#   stas_val - per AU column, the number of rows whose value equals 1
count = 0
stas_val = {'AU1':0, 'AU2':0, 'AU4':0, 'AU6':0, 'AU7':0, 'AU10':0, 'AU12':0, 'AU15':0, 'AU23':0, 'AU24':0, 'AU25':0, 'AU26':0}
for annotation_path in data_val:
    df = pd.read_csv(annotation_path, delimiter = ",")
    count += df.shape[0]
    # Count the positives directly: one vectorised pass per file, and an AU
    # that is never active in a file simply contributes 0 (no missing-label
    # lookup, no NaN handling).
    positives = df[list(stas_val)].eq(1).sum()
    for key in stas_val:
        stas_val[key] += int(positives[key])

In [None]:
# Grouped bar chart of the positive counts per AU for the train and validation sets.
au_names = list(stas_train.keys())
X = np.arange(len(au_names))
bar_width = 0.4
ax = plt.subplot(111)
# Offset the two series symmetrically so each tick label sits under the
# middle of its train/validation pair. Dict views are converted to lists
# before being handed to matplotlib.
ax.bar(X - bar_width / 2, list(stas_train.values()), width=bar_width, color='b', align='center', label='Train')
ax.bar(X + bar_width / 2, list(stas_val.values()), width=bar_width, color='g', align='center', label='Validation')
ax.set_xticks(X)
ax.set_xticklabels(au_names)
ax.set_ylabel("Number of positive samples")
ax.set_title("AU dataset distribution ", fontsize=17)
ax.legend()
plt.show()

## Calculate weights for positive and negative samples in each class

In [None]:
# File lists of the filtered ("_Edit") annotation copies, re-created for this section.
data_train, data_val = (
    glob.glob(pattern)
    for pattern in (
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set_Edit/*",
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set_Edit/*",
    )
)
# Annotation file names that are left out of the training statistics.
exclude_list = [
    '10-60-1280x720.txt',
    '10-60-1280x720_right.txt',
    '135-24-1920x1080_left.txt',
    '135-24-1920x1080_right.txt',
    '46-30-484x360_left.txt',
    '46-30-484x360_right.txt',
    '86-24-1920x1080.txt',
]

In [None]:
# Recompute the training totals used for the loss weights:
#   count      - total number of annotation rows in non-excluded files
#   stas_train - per AU column, the number of rows whose value equals 1
count = 0
stas_train = {'AU1':0, 'AU2':0, 'AU4':0, 'AU6':0, 'AU7':0, 'AU10':0, 'AU12':0, 'AU15':0, 'AU23':0, 'AU24':0, 'AU25':0, 'AU26':0}
for annotation_path in data_train:
    if os.path.basename(annotation_path) in exclude_list:
        continue
    df = pd.read_csv(annotation_path, delimiter = ",")
    count += df.shape[0]
    # Count the positives directly: one vectorised pass per file, and an AU
    # that is never active in a file simply contributes 0 (no missing-label
    # lookup, no NaN handling).
    positives = df[list(stas_train)].eq(1).sum()
    for key in stas_train:
        stas_train[key] += int(positives[key])

In [None]:
# One weight per AU, in the key order of stas_train:
# total row count divided by twice that AU's positive count.
pos_weights = [count/(2*positive_total) for positive_total in stas_train.values()]

## Create tf.data.Dataset

In [None]:
# File names (not full paths) of the filtered annotation files.
train_name, val_name = (
    os.listdir(annotation_dir)
    for annotation_dir in (
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set_Edit",
        r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set_Edit",
    )
)

In [None]:
# Build the training arrays: X_train holds image paths, y_train the matching
# annotation rows. Images and rows are paired purely by position, so a video
# is only used when its image count equals its annotation row count.
X_train = []
y_train = []
for file in train_name:
    df = pd.read_csv(r"./data/Third ABAW Annotations/AU_Detection_Challenge/Train_Set_Edit/"+file, delimiter = ",")
    image_dir = r"./data/cropped_aligned/"+file.replace(".txt","")
    # os.listdir returns entries in arbitrary order, so sort to get a
    # deterministic pairing with the annotation rows. Non-jpg entries are
    # removed BEFORE the length check so X_train and y_train cannot drift apart.
    # NOTE(review): assumes frame file names sort lexicographically in frame
    # order (e.g. zero-padded numbers) - confirm against the dataset.
    image_names = sorted(name for name in os.listdir(image_dir) if "jpg" in name)
    if len(image_names)!=df.shape[0]:
        continue
    y_train.extend(np.array(df))
    X_train.extend(image_dir+"/"+name for name in image_names)
X_train = np.array(X_train)
y_train = np.array(y_train,dtype=np.float32)
assert len(X_train) == len(y_train), "every training image needs exactly one label row"

In [None]:
# Build the validation arrays: X_val holds image paths, y_val the matching
# annotation rows. Images and rows are paired purely by position.
X_val = []
y_val = []
for file in val_name:
    df = pd.read_csv(r"./data/Third ABAW Annotations/AU_Detection_Challenge/Validation_Set_Edit/"+file, delimiter = ",")
    image_dir = r"./data/cropped_aligned/"+file.replace(".txt","")
    # os.listdir returns entries in arbitrary order, so sort to get a
    # deterministic pairing with the annotation rows.
    # NOTE(review): assumes frame file names sort lexicographically in frame
    # order (e.g. zero-padded numbers) - confirm against the dataset.
    image_names = sorted(name for name in os.listdir(image_dir) if "jpg" in name)
    # Same guard as the training cell: without it a single mismatching video
    # leaves X_val and y_val with different lengths (or silently misaligned).
    if len(image_names)!=df.shape[0]:
        continue
    y_val.extend(np.array(df))
    X_val.extend(image_dir+"/"+name for name in image_names)
X_val = np.array(X_val)
y_val = np.array(y_val, dtype=np.float32)
assert len(X_val) == len(y_val), "every validation image needs exactly one label row"

In [None]:
def load_image(image_path,label):
    """Load one JPEG file and pair it with its label.

    Args:
        image_path: Path of the image file, read with `tf.io.read_file`.
        label: Label for the image; returned unchanged.

    Returns:
        Tuple `(image, label)` where `image` is decoded as a 3-channel JPEG
        and resized to 112x112.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    return tf.image.resize(decoded, (112, 112)), label

In [None]:
# Training pipeline: shuffle the (path, label) pairs over the full dataset,
# load the images in parallel, batch by 256, and prefetch.
train_loader = tf.data.Dataset.from_tensor_slices((X_train,y_train))
train_dataset = (
    train_loader
    .shuffle(len(X_train))
    .map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(256)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# Validation pipeline: same loading as training but without shuffling.
val_loader = tf.data.Dataset.from_tensor_slices((X_val,y_val))
val_dataset = (
    val_loader
    .map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(256)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

## Define model and compile

In [None]:
def cnn_block(filters, kernel_size, input_shape, is_last):
    """Build a Conv2D -> BatchNormalization -> ReLU [-> MaxPool2D] block.

    Args:
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        input_shape: Input shape forwarded to the Conv2D layer, or None to
            leave it unspecified.
        is_last: When True and `input_shape` is None, the trailing 2x2
            max-pooling layer is omitted. Ignored when `input_shape` is given
            (such a block always pools).

    Returns:
        A `tf.keras.Sequential` containing the layers described above.
    """
    conv_kwargs = dict(filters=filters, kernel_size=kernel_size, padding="same", strides=1)
    if input_shape is not None:
        conv_kwargs["input_shape"] = input_shape

    layers = [
        tf.keras.layers.Conv2D(**conv_kwargs),
        tf.keras.layers.BatchNormalization(axis=-1),
        tf.keras.layers.ReLU(),
    ]
    # Every block down-samples except a final block built without an input shape.
    if input_shape is not None or not is_last:
        layers.append(tf.keras.layers.MaxPool2D((2, 2)))
    return tf.keras.Sequential(layers)

In [None]:
def get_model():
    """Build the AU detection network.

    The model takes a 112x112x3 image, passes it through six `cnn_block`
    stages, pools the resulting spatial positions with a learned softmax
    attention, and ends in a 12-unit sigmoid layer.

    Returns:
        An uncompiled `tf.keras.Model` mapping images to 12 sigmoid outputs.
    """
    inputs = tf.keras.Input(shape=(112, 112, 3))

    # Convolutional trunk: (filters, input_shape, is_last) for each block, in order.
    block_specs = (
        (32, (112, 112, 3), False),
        (64, None, False),
        (128, None, False),
        (128, None, False),
        (256, None, False),
        (256, None, True),
    )
    feature_map = inputs
    for n_filters, block_input_shape, last_block in block_specs:
        feature_map = cnn_block(n_filters, (3,3), block_input_shape, last_block)(feature_map)

    # Flatten the spatial grid into a sequence of per-position feature vectors.
    _, height, width, channels = feature_map.shape
    positions = tf.keras.layers.Reshape((height * width, channels))(feature_map)

    # Score every position, then normalise the scores over the position axis.
    scores = tf.keras.layers.Dense(units=128, activation='relu')(positions)
    # NOTE(review): this BatchNormalization is always called with
    # training=False, i.e. in inference mode even during fit - confirm intended.
    scores = tf.keras.layers.BatchNormalization()(scores, training=False)
    scores = tf.keras.layers.Activation("relu")(scores)
    scores = tf.keras.layers.Dense(units=1, activation=None)(scores)
    scores = tf.nn.softmax(scores, axis=1)

    # Attention-weighted sum of the position vectors -> one vector per image.
    pooled = tf.keras.layers.Dot(axes=1)([positions, scores])
    pooled = tf.keras.layers.Reshape((channels,))(pooled)

    # Classification head with regularised sigmoid outputs, one per AU.
    hidden = tf.keras.layers.Dense(units=128, activation='relu')(pooled)
    hidden = tf.keras.layers.Dropout(rate=0.5)(hidden)
    outputs = tf.keras.layers.Dense(12,activation = "sigmoid",
                                    kernel_regularizer=tf.keras.regularizers.l2(0.005),
                                    activity_regularizer=tf.keras.regularizers.l1(0.005))(hidden)
    return tf.keras.Model(inputs, outputs)

In [None]:
def weight_BCE_loss(y_true,y_pred):
    """Class-weighted binary cross-entropy for sigmoid model outputs.

    `get_model` ends in a sigmoid layer, so `y_pred` holds probabilities,
    whereas `tf.nn.weighted_cross_entropy_with_logits` expects raw logits.
    The probabilities are therefore converted back to logits first; feeding
    them in directly would apply the sigmoid twice.

    Args:
        y_true: Ground-truth labels, one column per AU.
        y_pred: Predicted probabilities, same shape as `y_true`.

    Returns:
        Per-sample loss: the weighted cross-entropy averaged over the last axis.
        Positive targets are weighted by the module-level `pos_weights`.
    """
    y_true = tf.cast(y_true, y_pred.dtype)
    # Clip away exact 0/1 so the logit stays finite.
    eps = K.epsilon()
    probs = tf.clip_by_value(y_pred, eps, 1.0 - eps)
    logits = tf.math.log(probs / (1.0 - probs))
    per_class = tf.nn.weighted_cross_entropy_with_logits(
        labels=y_true,
        logits=logits,
        pos_weight=tf.constant(pos_weights, dtype=y_pred.dtype))
    return tf.reduce_mean(per_class, axis=-1)
def scheduler(epoch, lr):
    """Step learning-rate schedule.

    Args:
        epoch: Zero-based epoch index.
        lr: Current learning rate (unused; the schedule is absolute).

    Returns:
        1e-3 for epochs 0-5, 1e-4 from epoch 6 onwards.
    """
    return 1e-3 if epoch < 6 else 1e-4

In [None]:
# Build the network, print its layer summary, and compile it for training.
model = get_model()
model.summary()
train_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
train_metrics = [
    tf.keras.metrics.BinaryAccuracy(),
    tfa.metrics.F1Score(num_classes=12, average='macro',threshold=0.5),
]
model.compile(
    optimizer=train_optimizer,
    loss=weight_BCE_loss,
    metrics=train_metrics,
)

In [None]:
# Save the weights after every epoch (file name carries the epoch number and
# validation F1) and apply the step learning-rate schedule during training.
filepath = r"./model" + r"/saved-model-BCE-{epoch:02d}-{val_f1_score:.4f}.hdf5"
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath=filepath,
    monitor="val_f1_score",
    save_weights_only=True,
    save_best_only=False,
    verbose=1,
)
lr_schedule_cb = tf.keras.callbacks.LearningRateScheduler(scheduler)
cp_callback = [checkpoint_cb, lr_schedule_cb]
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=20,
    callbacks=cp_callback,
)

## Predict on test set for submission

In [None]:
# Rebuild a fresh, identically configured model to receive the saved weights.
model = get_model()
inference_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
inference_metrics = [
    tf.keras.metrics.BinaryAccuracy(),
    tfa.metrics.F1Score(num_classes=12, average='macro',threshold=0.5),
]
model.compile(
    optimizer=inference_optimizer,
    loss=weight_BCE_loss,
    metrics=inference_metrics,
)

In [None]:
# Restore the weights of the checkpoint chosen for the submission.
checkpoint_path = r"./model/saved-model-BCE-03-0.4802.hdf5"
model.load_weights(checkpoint_path)

In [None]:
def predicted_test(results, pwd, img_num):
    """Write per-image AU predictions to `<pwd>.txt` as comma-separated text.

    The file starts with a fixed header line (`id` followed by the 12 AU
    names); every following line is an image name and its predicted values.

    Args:
        results: Iterable of per-image prediction sequences.
        pwd: Output path without the `.txt` extension.
        img_num: Iterable of image names (strings), aligned with `results`.
    """
    header = 'id,AU1,AU2,AU4,AU6,AU7,AU10,AU12,AU15,AU23,AU24,AU25,AU26\n'
    with open(f'{pwd}.txt', 'w') as file:
        file.write(header)
        for prediction, name in zip(results, img_num):
            fields = ','.join(map(str, prediction))
            file.write(name + ',' + fields + '\n')

In [None]:
# Predict the 12 AU labels for every .jpg of every folder under `test_path`
# and write one `<TARGET_DIR>/<folder>.txt` file per folder.

# `tqdm` is used below for progress bars; make sure the name resolves and
# degrade to a plain iterator when the package is not installed.
try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, **kwargs):
        return iterable

# Keep a TARGET_DIR defined earlier in the session; otherwise fall back to a
# local default so the cell runs on a fresh kernel.
try:
    TARGET_DIR
except NameError:
    TARGET_DIR = r"./predictions"
os.makedirs(TARGET_DIR, exist_ok=True)

test_path = r"./data/test_dir"
test_names = os.listdir(test_path)
for test_dir in test_names:
    video_dir = f'{test_path}/{test_dir}'
    # Stray files next to the per-video folders would make os.listdir fail.
    if not os.path.isdir(video_dir):
        continue
    # Keep only .jpg frames, in sorted order, so the progress bar and the
    # output rows cover exactly the predicted images.
    croped_dir_list = sorted(
        name for name in os.listdir(video_dir)
        if os.path.splitext(name)[1] == '.jpg')
    results = []
    img_num = []
    for img in tqdm(croped_dir_list, desc=test_dir):
        img_path = f'{video_dir}/{img}'
        # Same preprocessing as `load_image`: decode, resize to 112x112, add batch axis.
        input_img = tf.io.read_file(img_path)
        input_img = tf.image.decode_jpeg(input_img, channels=3)
        input_img = tf.image.resize(input_img, (112, 112))
        input_img = tf.expand_dims(input_img, 0)
        # Call the model directly: for single-image batches inside a Python
        # loop this avoids the per-call overhead of `Model.predict`.
        result = model(input_img, training=False).numpy()
        # Threshold the sigmoid outputs at 0.5 to get binary AU labels.
        predict = np.where(result[0] > 0.5, 1, 0)
        img_num.append(img)
        results.append(predict)
    predicted_test(results, f'{TARGET_DIR}/{test_dir}', img_num)