In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

from keras.models import load_model

from utils.custom_loss import mae_wrap_angle, mse_wrap_angle, angle_diff_deg
from utils.load_data_raw import DataGenerator_raw, load_raw_all_h5
from utils.dataset_split import *
from utils.eval import *
from utils.plot import *
from utils.utils import get_filelist, open_json

# worker count setting for generator-based Keras calls
NUM_WORKER = 4

# custom loss functions handed to keras `load_model` as `custom_objects`
custom_obj = {
    'mse_wrap_angle': mse_wrap_angle,
    'mae_wrap_angle': mae_wrap_angle,
}

In [None]:
# directory with the trained models; its file names are collected for later matching
model_dir = '../data/models_trained/'
filelist = get_filelist(model_dir)

# scaled dataset: features, targets, ID reference, position table, condition table, parameters
dset_dir = '/media/feliximmohr/Storage/master_thesis/generated/database/raw/raw_nf10_mid/database_raw_nf10_scaledMM.h5'
feat, targ, ID_ref, pos_t, cond_t, par = load_raw_all_h5(dset_dir)

# unscaled dataset: only features and targets are kept, the remaining outputs are dropped
dset_nsc_dir = '/media/feliximmohr/Storage/master_thesis/generated/database/raw/raw_nf10_mid/database_raw_nf10.h5'
feat_nsc, targ_nsc, *_ = load_raw_all_h5(dset_nsc_dir)

# parameter dicts for the (unshuffled) test batch generators
params = create_test_params(feat, targ, par, shuffle=False)
params_nsc = create_test_params(feat_nsc, targ_nsc, par, shuffle=False)

# feature column names
cn = feat.columns.tolist()

# sample indices per condition, keyed by the first field of each row of the condition table
part = {}
for row_idx in range(len(cond_t)):
    cond_name = cond_t.iloc[row_idx][0]
    cond_ids_test, pos_ids_test, subject_ids_test = get_subset_ids([cond_name], cond_t)
    part[cond_name] = get_subset_sample_idx(ID_ref, cond_ids_test, pos_ids_test, subject_ids_test)

In [None]:
# extract model and corresponding history and test partition filenames from filelist
m_flist = [x for x in filelist if ('history' not in x) and ('partition_test' not in x)]

# position of every model file, so companion files are matched without rescanning
# the list; setdefault keeps the first occurrence, exactly like list.index would
m_pos = {}
for model_pos, model_file in enumerate(m_flist):
    m_pos.setdefault(model_file, model_pos)

# companion lists aligned with m_flist; '_' marks "no companion file found"
h_flist = ['_'] * len(m_flist)
pt_flist = ['_'] * len(m_flist)
companions = (
    ('history', '_history.json', h_flist),
    ('partition_test', '_partition_test.json', pt_flist),
)
for file in filelist:
    for tag, suffix, target in companions:
        if tag not in file:
            continue
        model_pos = m_pos.get(file.replace(suffix, '.h5'))
        if model_pos is None:
            # a companion file without its model previously aborted the cell
            # with a ValueError raised by list.index; report it and carry on
            print('skipping {} file without matching model: {}'.format(tag, file))
        else:
            target[model_pos] = file

# separate model list based on test type (topology/wrapping/normalization)
m_flist_tt = [x for x in m_flist if 'toptest' in x]
m_flist_nt = [x for x in m_flist if 'normtest' in x]
m_flist_wt = [x for x in m_flist if 'wraptest' in x]

# history files belonging to each group, in the same order as the group's models
h_flist_tt = [h_flist[m_pos[x]] for x in m_flist_tt]
h_flist_nt = [h_flist[m_pos[x]] for x in m_flist_nt]
h_flist_wt = [h_flist[m_pos[x]] for x in m_flist_wt]

# test partition files belonging to each group, in the same order as the group's models
pt_flist_tt = [pt_flist[m_pos[x]] for x in m_flist_tt]
pt_flist_nt = [pt_flist[m_pos[x]] for x in m_flist_nt]
pt_flist_wt = [pt_flist[m_pos[x]] for x in m_flist_wt]

In [None]:
def print_filelist(flist):
    """Print the entries of one filelist, or of a tuple of filelists, with their index.

    A single list is treated as a one-element tuple. For every index of the
    first list, the entry at that index is printed for each list in turn.
    """
    lists = flist if isinstance(flist, tuple) else (flist,)

    for idx in range(len(lists[0])):
        # extra leading space keeps single-digit indices aligned with two-digit ones
        sep = '-' if idx >= 10 else ' -'
        for entries in lists:
            print(idx, sep, entries[idx])

# print model list
#print_filelist((m_flist,h_flist,pt_flist))

In [None]:
def load_m_h_pt(model_dir, m_flist, h_flist, pt_flist):
    """Load every model in `m_flist` plus its history and test partition, if present.

    `h_flist` and `pt_flist` are aligned with `m_flist`; an entry of '_' means
    there is no such file for that model, and '_' is returned in its place.
    Returns the tuple (models, histories, partitions), each a list aligned
    with `m_flist`.
    """
    n_models = len(m_flist)
    models = []
    histories = ['_'] * n_models
    partitions = ['_'] * n_models
    for idx, model_file in enumerate(m_flist):
        # custom_obj supplies the custom losses needed to deserialize the model
        models.append(load_model(model_dir + model_file, custom_objects=custom_obj))
        hist_file = h_flist[idx]
        if hist_file != '_':
            histories[idx] = open_json(model_dir, hist_file)
        part_file = pt_flist[idx]
        if part_file != '_':
            partitions[idx] = open_json(model_dir, part_file)
    return models, histories, partitions

# load models
# Only the 'toptest' models are loaded here; their history and test partition
# data are discarded. The commented-out calls load the 'normtest' (nt) and
# 'wraptest' (wt) groups in the same way.
#m_nt, h_nt, pt_nt = load_m_h_pt(model_dir, m_flist_nt, h_flist_nt, pt_flist_nt)
#m_wt, h_wt, pt_wt = load_m_h_pt(model_dir, m_flist_wt, h_flist_wt, pt_flist_wt)
m_tt, _, _ = load_m_h_pt(model_dir, m_flist_tt, h_flist_tt, pt_flist_tt)

## Prediction Evaluation

##### Print model names

In [None]:
# list the 'toptest' model files with their index; given unique file names, the
# same index addresses the model in `k`, which the plotting cells use for selection
print_filelist(m_flist_tt)

##### Predict on models

In [None]:
#part_x_M027, pos_ids = get_pos_IDs(part['NFCHOA_M027'],ID_ref)
#part_x_R006, _ = get_pos_IDs(part['NFCHOA_R006'],ID_ref)

# batch size shared by all prediction generators
pred_batch_size = 1000

# generator parameters for the scaled data, copied so `params` itself stays unchanged
params_t = params.copy()
params_t['batch_size'] = pred_batch_size

# same for the unscaled data
params_t_nsc = params_nsc.copy()
params_t_nsc['batch_size'] = pred_batch_size

# parameters built on the first 64 feature columns only (selected for 'no-ic' models)
params_t_nic = create_test_params(
    feat[cn[:64]],
    targ,
    par,
    batch_size=pred_batch_size,
    shuffle=False,
)

def model_pred_pos(model, part, params, ID_ref):
    """Return per-position predictions of `model` and the matching targets.

    Parameters
    ----------
    model : loaded Keras model providing `predict_generator`.
    part : test partition, split into one sample set per position by `get_pos_IDs`.
    params : dict of keyword arguments forwarded to `DataGenerator_raw`.
    ID_ref : ID reference passed through to `get_pos_IDs`.

    Returns
    -------
    pred : list with one prediction result per position.
    y : list with the targets read back from each position's generator
        via `get_y_gen`, in the same order as `pred`.
    """
    # the position IDs returned alongside the split are not needed here
    part_x, _ = get_pos_IDs(part, ID_ref)
    pred = []
    y = []
    for j in range(len(part_x)):
        b_gen = DataGenerator_raw(part_x[j], **params)
        # worker count comes from the notebook-level NUM_WORKER setting
        # instead of a second, hard-coded copy of the same number
        pred.append(model.predict_generator(b_gen, verbose=0, use_multiprocessing=True, workers=NUM_WORKER))
        y.append(get_y_gen(b_gen))
    return pred, y

In [None]:
# predictions and targets per model file name, for both test conditions
pred_R006, pred_M027 = {}, {}
y_R006, y_M027 = {}, {}

# filename tags that call for special generator parameters, in order of precedence
tagged_params = (('no-ic', params_t_nic), ('nsc', params_t_nsc))

for idx, md in enumerate(m_flist_tt):
    model = m_tt[idx]
    # first matching tag wins; models without a tag use the default parameters
    params_tmp = next((p for tag, p in tagged_params if tag in md), params_t)

    pred_R006[md], y_R006[md] = model_pred_pos(model, part['NFCHOA_R006'], params_tmp, ID_ref)
    pred_M027[md], y_M027[md] = model_pred_pos(model, part['NFCHOA_M027'], params_tmp, ID_ref)

# model names in prediction order, used to select models by index when plotting
k = list(pred_M027.keys())

##### Generate Plots

In [None]:
# generate plots
# IPython magic that enables interactive matplotlib figures (no backend specified)
%matplotlib
# all positions overview
# k[11:12] selects only the model at index 11 of the model list; widen the
# slice to plot more models (an out-of-range slice silently plots nothing)
for name in k[11:12]:
    # tdata and bs are returned by get_model_info but are not used in this cell
    model_name, loss_name, tdata, special, bs, _ = get_model_info(name)
    f1 = plot_locaz_all(pred_R006[name],
                        l=True,
                        title='Model: {}{} | Loss: {} | Test-Data: {}'.format(model_name, special, loss_name, 'NFCHOA_R006'))
    #f2 = plot_locaz_all(pred_M027[name],
    #                    l=True,
    #                    title='Model: {}{} | Loss: {} | Test-Data: {}'.format(model_name, special, loss_name, 'NFCHOA_M027'))

In [None]:
# individual positions incl. gt comparison
# IPython magic that enables interactive matplotlib figures (no backend specified)
%matplotlib

# m_idx picks the model (index into k), pos_idx picks the position
# (index into that model's per-position prediction / target lists)
m_idx = 12
pos_idx = 0

#model_name, loss_name, tdata, special, bs, _ = get_model_info(k[m_idx])
# manually wrap angles
def wrap(a):
    """Return a copy of `a` with all angles (in degrees) wrapped into [-180, 180].

    Values already inside [-180, 180] (including both boundaries) are returned
    unchanged. Values outside are shifted by whole multiples of 360, so angles
    that are off by more than one revolution are handled as well.

    The input is not modified: the stored predictions stay intact and the
    plotting cell gives the same result no matter how often it is re-run.

    Parameters
    ----------
    a : array-like of angles in degrees, any shape.

    Returns
    -------
    numpy.ndarray of the same shape as `a`.
    """
    wrapped = np.array(a, copy=True)
    out_of_range = (wrapped > 180) | (wrapped < -180)
    # numpy's % takes the sign of the divisor, so the shifted value lies in
    # [0, 360) and the result in [-180, 180)
    wrapped[out_of_range] = (wrapped[out_of_range] + 180) % 360 - 180
    return wrapped

# model key shared by both test conditions
model_key = k[m_idx]

# NFCHOA_M027: wrapped predictions vs. targets at the selected position
#title = 'Model: {}{} | Loss: {} | Test-Data: {}\n Position: {}'.format(model_name, special, loss_name, 'NFCHOA_M027', pos_idx)
f1, ax_p, ax_y = plot_locaz(wrap(pred_M027[model_key][pos_idx]), y_M027[model_key][pos_idx], l=True)
#f.suptitle(title, fontsize='x-large')
# artists a legend would refer to (only needed for the commented-out legend call)
p_artists = ax_p.get_children()
lines = (p_artists[0], ax_y.get_children()[0], p_artists[1], p_artists[2], p_artists[4])
#f.legend(lines, ('predictions', 'human / gt', 'mean over subjects/repititions at 0° head rotation', '± 180°','± 180°'))

# NFCHOA_R006: same plot for the second test condition
#title = 'Model: {}{} | Loss: {} | Test-Data: {}\n Position: {}'.format(model_name, special, loss_name, 'NFCHOA_R006', pos_idx)
f2, ax_p, ax_y = plot_locaz(wrap(pred_R006[model_key][pos_idx]), y_R006[model_key][pos_idx], l=True)
#f.suptitle(title, fontsize='x-large')
p_artists = ax_p.get_children()
lines = (p_artists[0], ax_y.get_children()[0], p_artists[1], p_artists[2], p_artists[4])
#f.legend(lines, ('predictions', 'human / gt', 'mean over subjects/repititions at 0° head rotation', '± 180°','± 180°'))

In [None]:
# resize and export the two per-position figures (f1 is reassigned by the
# per-position cell, so run that cell before this one)
for fig, fname in ((f1, 'f1.png'), (f2, 'f2.png')):
    fig.set_size_inches(10.5, 5)
    fig.savefig(fname, dpi=200)