In [1]:
import matplotlib.pyplot as plt
import numpy as np
import math
from matplotlib.lines import Line2D
import os
import json
import matplotlib.ticker as ticker
import seaborn as sns
from mpl_toolkits.axes_grid1.inset_locator import zoomed_inset_axes
from scipy.interpolate import make_interp_spline, BSpline

os.chdir("C:/Users/Zber/Documents/Dev_program/OpenRadar")
from FER.utils import MapRecord, get_label
from sklearn.model_selection import train_test_split


def rename(old_name, data_folder):
    """Rename a file in ``data_folder`` by replacing ``'_2_'`` with ``'_'`` in its name.

    The substitution is applied to ``old_name`` only. A ``'_2_'`` that happens
    to occur in ``data_folder`` itself is left untouched, so the file always
    stays in the directory it was found in.

    Args:
        old_name: File name, relative to ``data_folder``.
        data_folder: Directory that contains the file.

    Raises:
        OSError: Propagated from ``os.rename`` (e.g. the source is missing).
    """
    old_path = os.path.join(data_folder, old_name)
    # Build the target from the renamed file name instead of rewriting the
    # whole joined path, which would also alter matching directory names.
    new_path = os.path.join(data_folder, old_name.replace('_2_', '_'))
    os.rename(old_path, new_path)
    print("Rename from {} to {}".format(old_path, new_path))


def annotation_update(record_list, width=100, total_frame=300):
    """Point every record at its ``.npy`` template and enforce a 100-frame span.

    For each record, in order:
      * ``path`` gets ``"Raw_0.bin"`` replaced by ``"{}.npy"`` and every
        backslash replaced by a forward slash;
      * when ``num_frames`` is not 100, ``peak`` is incremented by one;
      * ``num_frames`` is then asserted to be exactly 100.

    ``width`` and ``total_frame`` are accepted but not used by the current
    implementation (a padding scheme that relied on them is disabled).

    Args:
        record_list: Iterable of records exposing ``path``, ``peak`` and
            ``num_frames``.
        width: Unused.
        total_frame: Unused.

    Returns:
        The same ``record_list`` object; its records are modified in place.

    Raises:
        AssertionError: If a record still does not span 100 frames after the
            one-frame adjustment.
    """
    required_frames = 100
    for rec in record_list:
        npy_template = rec.path.replace("Raw_0.bin", "{}.npy")
        rec.path = npy_template.replace("\\", "/")

        # A single-frame nudge is the only correction applied before the check.
        if rec.num_frames != required_frames:
            rec.peak += 1
        assert rec.num_frames == required_frames, 'the num of frames must equal to 100!'
    return record_list


def annotation_attention(record_list, width=30):
    """Rescale each record's onset by 3/10 and give it a fixed-width window.

    For each record:
      * ``onset`` becomes ``floor(onset * 3 / 10)``;
      * ``peak`` becomes ``onset + width - 1`` (using the new onset);
      * ``path`` becomes ``relative_path`` with ``"_{}.npy"`` removed.

    Args:
        record_list: Iterable of records exposing ``onset``, ``peak``,
            ``path`` and ``relative_path``.
        width: Number of frames in the resulting window.

    Returns:
        The same ``record_list`` object; its records are modified in place.
    """
    for rec in record_list:
        scaled_onset = rec.onset * 3 / 10
        rec.onset = math.floor(scaled_onset)
        rec.peak = rec.onset + width - 1
        rec.path = rec.relative_path.replace("_{}.npy", "")
    return record_list


def annotation_append(subs=['S6', 'S7']):
    """Build the 'Neutral' annotation lines for the given subjects.

    For every subject, 30 lines (indices 0..29) are produced in the form
    ``"<sub>/Neutral_<i>_{}.npy 0 31 130 -1 0 0 0"``.

    Args:
        subs: Subject folder names.

    Returns:
        List of annotation strings, grouped by subject in the order given.
    """
    emotion = 'Neutral'
    # Constant columns that follow the path: label, onset, peak, offset, e1, e2, e3.
    fixed_fields = ("0", 31, 130, -1, 0, 0, 0)
    line_template = "{} {} {} {} {} {} {} {}"

    lines = []
    for sub in subs:
        for idx in range(30):
            stem = os.path.join(sub, "{}_{}".format(emotion, idx))
            # Normalise Windows separators so the annotation is portable.
            npy_template = (stem + '_{}.npy').replace("\\", "/")
            lines.append(line_template.format(npy_template, *fixed_fields))
    return lines


def data_split(record_list):
    """Split records into train and test subsets, stratified by label.

    20% of the records go to the test subset. The split is seeded with
    ``random_state=25`` and stratified on each record's ``label``.

    Args:
        record_list: Sequence of records exposing a ``label`` attribute.

    Returns:
        Tuple ``(train, test)``.
    """
    strata = [rec.label for rec in record_list]
    train_part, test_part = train_test_split(
        record_list,
        test_size=0.2,
        random_state=25,
        stratify=strata,
    )
    return train_part, test_part


def hm_2_frame(root_path, hm_path, frame_path):
    """Convert a heatmap annotation file into a frame annotation file.

    Every line of ``hm_path`` is split on whitespace and wrapped in a
    ``MapRecord``. The records are passed through ``annotation_attention``
    and written to ``frame_path`` as ``"<path> <onset> <peak> <label>"``,
    one record per line. ``frame_path`` is overwritten.

    Args:
        root_path: Root directory handed to ``MapRecord``.
        hm_path: Path of the heatmap annotation file to read.
        frame_path: Path of the frame annotation file to write.
    """
    # Read inside a context manager so the input handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(hm_path) as hm_file:
        record_list = [MapRecord(line.strip().split(), root_path) for line in hm_file]

    new_record_list = annotation_attention(record_list)
    str_format = "{} {} {} {}\n"
    with open(frame_path, 'w') as f:
        for record in new_record_list:
            f.write(str_format.format(record.path, record.onset, record.peak, record.label))
    print("Write {} Records to txt file".format(len(new_record_list)))


def hm_2_landmark(root_path, hm_path, frame_path):
    """Convert a heatmap annotation file into a landmark annotation file.

    Every line of ``hm_path`` is split on whitespace and wrapped in a
    ``MapRecord``. The records are passed through ``annotation_attention``
    and written to ``frame_path`` as ``"<path>.npy <onset> <peak> <label>"``,
    one record per line, where ``<path>`` is the record path with any
    ``"_{}"`` removed. ``frame_path`` is overwritten.

    Args:
        root_path: Root directory handed to ``MapRecord``.
        hm_path: Path of the heatmap annotation file to read.
        frame_path: Path of the landmark annotation file to write.
    """
    # Read inside a context manager so the input handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(hm_path) as hm_file:
        record_list = [MapRecord(line.strip().split(), root_path) for line in hm_file]

    new_record_list = annotation_attention(record_list)
    str_format = "{} {} {} {}\n"
    with open(frame_path, 'w') as f:
        for record in new_record_list:
            # Drop the "_{}" placeholder (if present) and point at one .npy file.
            re_pa = record.path.replace("_{}", "") + ".npy"
            f.write(str_format.format(re_pa, record.onset, record.peak, record.label))
    print("Write {} Records to txt file".format(len(new_record_list)))


def append_record_to_file(record_list, file):
    """Append one annotation line per record to ``file``.

    Each line has eight space-separated columns:
    ``<path>_{}.npy label onset peak offset width_err height_err index_err``.
    The file is opened in append mode, so existing content is preserved.

    Args:
        record_list: Sized iterable of records exposing the attributes above.
        file: Path of the annotation file to append to.
    """
    line_template = "{} {} {} {} {} {} {} {}"
    with open(file, 'a') as out:
        for rec in record_list:
            columns = (
                rec.path + "_{}.npy",
                rec.label,
                rec.onset,
                rec.peak,
                rec.offset,
                rec.width_err,
                rec.height_err,
                rec.index_err,
            )
            out.write(line_template.format(*columns) + '\n')
    print("Write {} Records to txt file".format(len(record_list)))


def hm_2_landmark_v1(record_list, frame_path):
    """Write a landmark annotation file from already-loaded records.

    The records are passed through ``annotation_attention`` (which modifies
    them in place) and written to ``frame_path`` as
    ``"<path>.npy <onset> <peak> <label>"``, one record per line, where
    ``<path>`` is the record path with any ``"_{}"`` removed.
    ``frame_path`` is overwritten.

    Args:
        record_list: Records to convert.
        frame_path: Path of the landmark annotation file to write.
    """
    converted = annotation_attention(record_list)
    line_template = "{} {} {} {}\n"
    with open(frame_path, 'w') as out:
        for rec in converted:
            landmark_file = rec.path.replace("_{}", "") + ".npy"
            out.write(line_template.format(landmark_file, rec.onset, rec.peak, rec.label))
    print("Write {} Records to txt file".format(len(converted)))


def check_is_in_badfile(bad_file_list, sub, emo, idx):
    """Tell whether ``(sub, emo, idx)`` appears in a list of bad-file entries.

    Each entry is a comma-separated string ``"<sub>,<emo>,<idx>"``; newline
    characters are removed before splitting. The index field is converted
    with ``int`` only for entries whose subject and emotion already match.

    Args:
        bad_file_list: Iterable of entry strings.
        sub: Subject identifier to look for.
        emo: Emotion name to look for.
        idx: Integer index to look for.

    Returns:
        ``True`` if a matching entry exists, otherwise ``False``.
    """
    def _matches(entry):
        fields = entry.replace('\n', '').split(',')
        return sub == fields[0] and emo == fields[1] and idx == int(fields[2])

    return any(_matches(entry) for entry in bad_file_list)

In [12]:
# Merge several annotation text files into one.
# Inputs are named by formatting `tr` with each entry of `names`; they are read
# from `root_path` and concatenated, in order, into `res` (also in `root_path`).
root_path = "C:\\Users\\Zber\\Desktop\\Subjects_Heatmap"

# Earlier merge configurations, kept for reference:
# st = "heatmap_train_Standing.txt"
# sit = "heatmap_train_Sitting.txt"
# ground = "heatmap_train_Ground.txt"
# res = "heatmap_train_Pose.txt"


# st = "heatmap_test_Ground_S1.txt"
# sit = "heatmap_test_Ground_S2.txt"
# ground = "heatmap_test_Ground_S3.txt"
# res = "heatmap_test_Ground.txt"


# st = "heatmap_test_Standing.txt"
# sit = "heatmap_test_Sitting.txt"
# ground = "heatmap_test_Ground.txt"
# res = "heatmap_test_Pose.txt"

# st = "heatmap_test_Distance_70cm.txt"
# sit = "heatmap_test_Distance_100cm.txt"
# ground = "heatmap_test_Distance_150cm.txt"
# g1 = "heatmap_test_Distance_200cm.txt"
# res = "heatmap_test_Distance.txt"

# st = "heatmap_train_Distance_70cm.txt"
# sit = "heatmap_train_Distance_100cm.txt"
# ground = "heatmap_train_Distance_150cm.txt"
# g1 = "heatmap_train_Distance_200cm.txt"
# g2 = "heatmap_train_Distance_250cm.txt"
# res = "heatmap_train_Distance_v1.txt"

# st = "heatmap_test_Distance_70cm.txt"
# sit = "heatmap_test_Distance_100cm.txt"
# ground = "heatmap_test_Distance_150cm.txt"
# g1 = "heatmap_test_Distance_200cm.txt"
# g2 = "heatmap_test_Distance_250cm.txt"
# res = "heatmap_test_Distance_v1.txt"
tr = "heatmap_test_{}.txt"
res = "heatmap_test_Motion_v3.txt"
names = ['M1_0', 'M1_1', 'M1_2', 'M2_0', 'M2_1', 'M2_2', 'Distance_100cm']
# names = ['M3_0', 'M3_1', 'M3_2']

arr = [tr.format(n) for n in names]
txt = []


# Read every input completely before the output is opened for writing.
for a in arr:
    with open(os.path.join(root_path, a)) as fp:
        txt.append(fp.read())

with open(os.path.join(root_path, res), 'w') as fp:
    for t in txt:
        # An empty input contributes nothing (not a lone blank line).
        if not t:
            continue
        # Add a separator only when the input does not already end with a
        # newline; appending one unconditionally puts an empty line between
        # consecutive files.
        fp.write(t if t.endswith('\n') else t + '\n')


In [4]:
# Convert the heatmap train/test annotation files selected by `train_form` /
# `test_form` into frame annotation files.
# Previously used templates:
#   "{}_train_S6_7_8_9_v1.txt" / "{}_test_S6_7_8_9_v1.txt"
train_form = "{}_train_S10a.txt"
test_form = "{}_test_S10a.txt"

# Input (heatmap) and output (frames) directories.
root_path = "C:/Users/Zber/Desktop/Subjects_Heatmap"
frame_root_path = "C:/Users/Zber/Desktop/Subjects_Frames"

train_path = os.path.join(root_path, train_form.format('heatmap'))
frame_train_path = os.path.join(frame_root_path, train_form.format('frames'))

test_path = os.path.join(root_path, test_form.format('heatmap'))
frame_test_path = os.path.join(frame_root_path, test_form.format('frames'))

# Train split first, then test split.
for _hm_file, _frame_file in ((train_path, frame_train_path), (test_path, frame_test_path)):
    hm_2_frame("", _hm_file, _frame_file)

Write 1311 Records to txt file
Write 663 Records to txt file


### Check whether any file contains NaN or Inf values

In [2]:
# Scan every record of an annotation file and print the records whose 'ele'
# or 'azi' array has a non-finite (NaN or +/-Inf) mean.
arr = []
# annotaton_path = "C:\\Users\\Zber\\Desktop\\Subjects_Heatmap\\heatmap_annotation_full_S8.txt"
# annotaton_path = "C:\\Users\\Zber\\Desktop\\Subjects_Heatmap\\heatmap_train_S3_4_5.txt"
annotaton_path = "C:\\Users\\Zber\\Desktop\\Subjects_Heatmap\\heatmap_train_landmark_S5.txt"
root_path = "C:\\Users\\Zber\\Desktop\\Subjects_Heatmap"

# Read inside a context manager so the annotation file handle is closed.
with open(annotaton_path) as annotation_file:
    record_list = [MapRecord(x.strip().split(), root_path) for x in annotation_file]

for record in record_list:
    # record.path is used as a format template: the placeholder selects the
    # 'ele' or the 'azi' .npy file of the record.
    npy_path_ele = record.path.format('ele')
    npy_path_azi = record.path.format('azi')

    # The mean is the probe: a NaN element makes it NaN, and an Inf element
    # makes it Inf (or NaN when infinities of both signs are present).
    ele = np.mean(np.load(npy_path_ele).flatten())
    azi = np.mean(np.load(npy_path_azi).flatten())

    # One combined test, so a record is reported once even when one array
    # averages to NaN and the other to Inf.
    if not (np.isfinite(ele) and np.isfinite(azi)):
        print(record.path.replace(root_path+"\\",""))

S5/Disgust_14_{}.npy
S5/Disgust_15_{}.npy
