In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Evaluates Combined Models
Prototype for Experiments
"""
__author__ = ["Leo S. Rüdian"]
__copyright__ = "2024, Rüdian"
__credits__ = ["Leo S. Rüdian"]
__license__ = "CC BY-NC-SA"
__version__ = "1.0.0"
__maintainer__ = ["Leo S. Rüdian"]
__email__ =["ruediasy@informatik.hu-berlin.de"]
__status__ = "Prototype"

In [None]:
from numpy import array
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten,Embedding,Dense
import pandas as pd
import json
import numpy as np
from numpy import ones
from numpy import zeros
from numpy import hstack
from numpy import argmax
from matplotlib import pyplot
from numpy.random import rand
from numpy.random import randn
from keras.layers import Dense, LSTM
from keras.layers import LeakyReLU, Dropout
from keras.layers import Conv1DTranspose
from keras.layers import Conv1D, MaxPooling1D, RepeatVector
from keras.layers import Reshape
from tensorflow.keras.layers import Embedding
from tensorflow import keras
from tensorflow.keras import layers

from numpy.random import randint
from keras.models import Sequential
from keras.models import load_model
from keras.utils.np_utils import to_categorical
import random

from tensorflow.keras.layers import LSTM, Bidirectional, Input, TimeDistributed,ZeroPadding1D
from tensorflow.keras.callbacks import ModelCheckpoint
from keras.models import Model

from os import walk

In [None]:
# Vocabularies of the dummy model

# a: the ten method labels
a = ['B', 'P', 'SCD', 'SCE', 'MC', 'H', 'W', 'SP', 'T_9', 'T_10']

# b: the 50 item ids, 0..49
b = list(range(50))

# b_cluster: the items partitioned into 17 runs of consecutive ids
b_cluster = [
    [0, 1, 2, 3], [4, 5], [6, 7, 8, 9], [10, 11, 12, 13],
    [14, 15, 16], [17, 18], [19, 20], [21, 22],
    [23, 24, 25], [26, 27, 28, 29], [30, 31, 32], [33, 34, 35, 36],
    [37, 38, 39, 40], [41, 42], [43, 44], [45, 46, 47],
    [48, 49],
]

# c: binary knowledge level (1: 'known', 0: 'unknown')
c = [0, 1]
# d: performance outcome
d = ['pass', 'fail']
# e: group preference level
e = [0, 1]
# f: collapsing parameter
f = [0, 1]

def convert_to_hotvector(X,a):
    """One-hot encode the values in X against the vocabulary a.

    Args:
        X: sequence of values, each of which must occur in ``a``.
        a: vocabulary; the position of a value in ``a`` is its class index
           (for duplicated entries the last position wins).

    Returns:
        (C, mapping): ``C`` is a float32 array of shape ``(len(X), len(a))``
        with exactly one 1 per row, ``mapping`` is the value -> index dict.

    Raises:
        KeyError: if X contains a value that is not part of ``a``.

    The input list is left untouched; the previous implementation replaced
    its entries by indices and appended padding entries to it.
    """
    mapping = {symbol: index for index, symbol in enumerate(a)}

    # integer representation, built in a fresh array so X is not modified
    indices = np.asarray([mapping[value] for value in X], dtype=int)

    # row i of the identity matrix is the one-hot vector of class i; indexing
    # with an empty index array yields the expected (0, len(a)) result
    C = np.eye(len(a), dtype='float32')[indices]
    return C,mapping

def getVector(s1,s2,s3,s4,s5):
    """One-hot encode the five input sequences of one sample.

    s1..s5 are encoded against the module-level vocabularies a, b, c, d, e
    (in that order). Each sequence is copied before encoding. As a side
    effect the module-level lookup tables mapA..mapE are (re)assigned.

    Returns:
        list with the five one-hot arrays, in the order of the arguments.
    """
    global mapA,mapB,mapC,mapD,mapE

    onehot_a, mapA = convert_to_hotvector(list(s1), a)
    onehot_b, mapB = convert_to_hotvector(list(s2), b)
    onehot_c, mapC = convert_to_hotvector(list(s3), c)
    onehot_d, mapD = convert_to_hotvector(list(s4), d)
    onehot_e, mapE = convert_to_hotvector(list(s5), e)

    return [onehot_a, onehot_b, onehot_c, onehot_d, onehot_e]

def getVectorY(s1,s2,s3,s4,s5):
    """One-hot encode the target sequences for the two model outputs.

    Only s1 (vocabulary a) and s2 (vocabulary b) are used; s3, s4 and s5 are
    accepted but ignored. The module-level tables mapA and mapB are
    (re)assigned as a side effect.

    Returns:
        list with one ``[one_hot_a, one_hot_b]`` pair per element of s1.
    """
    global mapA,mapB

    onehot_a, mapA = convert_to_hotvector(list(s1), a)
    onehot_b, mapB = convert_to_hotvector(list(s2), b)

    # pair the encodings position by position
    return [[onehot_a[pos], onehot_b[pos]] for pos in range(len(onehot_a))]

def model_rand(num):
    """Generate random data for evaluation.

    Builds ``num`` samples of the form ``[A, B, C, D, E]``:
      * A: 20 labels drawn from a
      * B: 20 item ids that walk through randomly chosen clusters of
           b_cluster from their first element onwards
      * C: 50 values drawn from c
      * D: 20 values drawn from d with weights 0.6 / 0.4
      * E: 5 values drawn from e, redrawn until at least one of them is 1
    """
    samples = []
    for _ in range(num):
        methods = random.choices(population=a, k=20)
        # placeholder draw; every entry is overwritten by the cluster walk below
        items = random.choices(population=b, k=20)
        knowledge = random.choices(population=c, k=50)
        outcomes = random.choices(population=d, k=20, weights=[0.6, 0.4])
        prefs = random.choices(population=e, k=5)
        while sum(prefs) == 0:  # at least 1 pref=1
            prefs = random.choices(population=e, k=5)

        cluster, offset = None, 0
        for slot in range(len(items)):
            # pick a fresh cluster whenever the previous one is exhausted
            if cluster is None:
                cluster = random.choice(b_cluster)

            items[slot] = cluster[offset]
            if offset == len(cluster) - 1:
                cluster, offset = None, 0
            else:
                offset += 1

        samples.append([methods, items, knowledge, outcomes, prefs])

    return samples

'''
Generate 1000 Samples
'''
# Each entry of raw_text is one [A, B, C, D, E] sample as returned by model_rand.
raw_text = model_rand(1000)

# NOTE: despite its name, n_chars holds the number of generated samples.
n_chars = len(raw_text)
# Vocabulary sizes of the five features.
n_vocab_a, n_vocab_b, n_vocab_c, n_vocab_d, n_vocab_e = len(a), len(b), len(c), len(d), len(e)

# Prepare the dataset of input -> output pairs: a window of seq_length steps
# is slid over every sample; the window is the input and the step right after
# it is the target. Inputs and targets are one-hot encoded via getVector /
# getVectorY.
seq_length = 2
dataX = []   # full getVector result per window
dataXa = []  # one-hot window of feature A
dataXb = []  # one-hot window of feature B
dataXd = []  # one-hot window of feature D
dataXc = []  # raw (not one-hot) C vector of the sample
dataXe = []  # raw (not one-hot) E vector of the sample
dataY = []   # not filled in this cell

trainCategoryY = []  # not filled in this cell
trainColorY = []     # not filled in this cell
# Per-window inputs/targets in the layout consumed by getprediction / testmerge.
dataX_test, dataY_test = [], []
ytrain_1 = []  # one-hot target for feature A
ytrain_2 = []  # one-hot target for feature B
for j in range(len(raw_text)):
    # i is the start of the window; the bound leaves room for the target step
    for i in range(0, len(raw_text[j][0]) - seq_length, 1):
        
        seq_in_A = raw_text[j][0][i:i + seq_length]
        seq_out_A = raw_text[j][0][i + seq_length]
        
        seq_in_B = raw_text[j][1][i:i + seq_length]
        seq_out_B = raw_text[j][1][i + seq_length]
        
        # C is not windowed: the complete vector of the sample is the input
        seq_in_C = raw_text[j][2]
        seq_out_C = raw_text[j][2][i + seq_length]
        
        seq_in_D = raw_text[j][3][i:i + seq_length]
        seq_out_D = raw_text[j][3][i + seq_length]
                
        # E is sliced with the same window; the slice is only passed to
        # getVector, the lists below store the complete raw_text[j][4] instead
        seq_in_E = raw_text[j][4][i:i + seq_length]
       
        v_x = getVector(seq_in_A,seq_in_B,seq_in_C,seq_in_D,seq_in_E)
        # getVectorY only encodes its first two arguments; [0] picks the single
        # [one_hot_A, one_hot_B] target pair
        v_y = getVectorY([seq_out_A],[seq_out_B],[seq_out_C],[seq_out_D],[])[0]

        # layout: [one-hot A, one-hot B, raw C, one-hot D, raw E]
        dataX_test.append([v_x[0],v_x[1],seq_in_C,v_x[3],raw_text[j][4]])
        dataY_test.append(v_y)
        
        dataX.append(v_x)
        dataXa.append(v_x[0])
        dataXb.append(v_x[1])
        dataXd.append(v_x[3])
        dataXe.append(raw_text[j][4])
        dataXc.append(seq_in_C) # known/unknown
        
        ytrain_1.append(v_y[0])
        ytrain_2.append(v_y[1])

# Number of (window, target) pairs collected above.
n_patterns = len(dataX)

ytrain_2 = np.array(ytrain_2)
ytrain_1 = np.array(ytrain_1)

# Flatten every one-hot window into a single axis and add a trailing axis of
# size 1, giving arrays of shape (n_patterns, features, 1).
Xa = np.reshape(dataXa, (n_patterns, seq_length*(len(a)), 1))
Xb = np.reshape(dataXb, (n_patterns, seq_length*(len(b)), 1))
Xd = np.reshape(dataXd, (n_patterns, seq_length*(len(d)), 1))
Xc = np.reshape(dataXc, (n_patterns, (len(b)), 1)) # length b = length of known/unknown vector
# 5 is the hard-coded length of the E vector
Xe = np.reshape(dataXe, (n_patterns, 5, 1))

'''
Define Model architecture
'''
def make_model(Xa):
    """Build the (uncompiled) five-input, two-output Keras model.

    Only the shape of input_a is taken from the ``Xa`` argument; the shapes
    of the other four inputs are read from the module-level arrays Xb, Xd,
    Xc and Xe, which must therefore exist before this function is called.

    Returns:
        keras Model with inputs [input_a, input_b, input_d, input_c, input_e]
        and outputs [output_a, output_b].
    """
    input_a=Input(shape=(Xa.shape[1], Xa.shape[2]), name="Input_a")
    input_b=Input(shape=(Xb.shape[1], Xb.shape[2]), name="Input_b")
    input_d=Input(shape=(Xd.shape[1], Xd.shape[2]), name="Input_d")
    input_c=Input(shape=(Xc.shape[1], Xc.shape[2]), name="Input_c")
    input_e=Input(shape=(Xe.shape[1], Xe.shape[2]), name="Input_e")
    
    # input_c is embedded and flattened
    # NOTE(review): input_dim is set to the vector length Xc.shape[1] and
    # input_length is hard-coded to 50 - confirm both are intended
    embed = Embedding(input_dim=Xc.shape[1], output_dim=10, input_length=50)(input_c)
    flat2 = Flatten()(embed)
    
    # input_e is passed through unchanged apart from flattening
    flat_pers = Flatten()(input_e)
    
    # one bidirectional LSTM per flattened one-hot window
    bi_lstm1 = Bidirectional(LSTM(16,return_sequences=False), name="LSTM_a")(input_a)
    bi_lstm2 = Bidirectional(LSTM(16,return_sequences=False), name="LSTM_b")(input_b)
    bi_lstm3 = Bidirectional(LSTM(16,return_sequences=False), name="LSTM_d")(input_d)
    
    all_input = keras.layers.concatenate([bi_lstm1, bi_lstm2, bi_lstm3, flat2,flat_pers])
        
    dropout1 = Dropout(0.4, name="Dropout")(all_input)

    # output widths are hard-coded; 10 and 50 equal len(a) and len(b) as
    # defined at the top of this notebook
    output_a = Dense(10, activation='softmax', name='output_a')(dropout1)    
    output_b = Dense(50, activation='softmax', name='output_b')(dropout1)
    
    model = Model(inputs = [input_a,input_b,input_d,input_c,input_e], outputs=[output_a,output_b])
 
    return model

# Module-level model instance; testmerge later overwrites its weights.
# NOTE(review): presumably the saved models were built with this exact
# architecture, so layers should not be changed or reordered - confirm.
model = make_model(Xa)

In [None]:
# Inverse lookup tables (class index -> class label), used to turn one-hot /
# categorical vectors back into the original classes.
mapAx = {index: label for label, index in mapA.items()}
mapBx = {index: label for label, index in mapB.items()}
mapCx = {index: label for label, index in mapC.items()}
mapDx = {index: label for label, index in mapD.items()}
mapEx = {index: label for label, index in mapE.items()}

def getprediction(pattern):
    """Get the prediction of the module-level model for one input pattern.

    Args:
        pattern: one entry in the layout
            [one-hot A window, one-hot B window, C vector, one-hot D window,
            E vector].

    Returns:
        The result of hotvector_to_list(..., True) for the most probable class
        of each output, i.e. ``[[label_a], [label_b], [], [], [], []]``.
    """
    batch = 1
    # reshape every feature to (1, features, 1), in the input order of the model
    in_a = np.reshape(pattern[0], (batch, seq_length*(len(a)), 1))
    in_b = np.reshape(pattern[1], (batch, seq_length*(len(b)), 1))
    in_d = np.reshape(pattern[3], (batch, seq_length*(len(d)), 1))
    in_c = np.reshape(pattern[2], (batch, (len(b)), 1))  # length b = length of known/unknown vector
    in_e = np.reshape(pattern[4], (batch, 5, 1))

    scores = model.predict([in_a, in_b, in_d, in_c, in_e], verbose=0)

    # winner-takes-all: turn both score vectors into one-hot vectors
    best_a = argmax(scores[0])
    best_b = argmax(scores[1])
    onehot_a = np.zeros((len(a),), dtype=int)
    onehot_b = np.zeros((len(b),), dtype=int)
    onehot_a[best_a] = 1
    onehot_b[best_b] = 1

    return hotvector_to_list([[onehot_a], [onehot_b]], True)

def hotvector_to_list(hot,ziel=False):
    """Convert a hot vector representation to lists of class labels.

    Args:
        hot: ``hot[0]`` and ``hot[1]`` are sequences of one-hot vectors for
            features A and B. Unless ``ziel`` is set, ``hot[2]`` is the C
            vector (stored as is) and ``hot[3]`` a sequence of one-hot
            vectors for feature D.
        ziel: ("target") when True only A and B are decoded.

    Returns:
        ``[course_A, course_B, course_C, course_D, course_E, course_F]``;
        the last two lists are always empty.
    """
    decoded_a = [mapAx[argmax(vec)] for vec in hot[0]]
    decoded_b = [mapBx[argmax(vec)] for vec in hot[1]]

    raw_c, decoded_d = [], []
    if not ziel:
        raw_c.append(hot[2])
        decoded_d.extend(mapDx[argmax(vec)] for vec in hot[3])

    return [decoded_a, decoded_b, raw_c, decoded_d, [], []]

def check_meshing(source,predict,preferences):
    """Identify whether the meshing hypothesis is fulfilled by the model.

    Preference slot ``j`` stands for the two methods ``a[2*j]`` and
    ``a[2*j+1]``. The prediction is a hit when the predicted method belongs
    to a slot whose preference flag is 1.

    Args:
        source: decoded input pattern (kept for interface compatibility, not
            used for the decision).
        predict: decoded prediction; ``predict[0][0]`` is the predicted method.
        preferences: sequence of 0/1 flags, one per preference slot.

    Returns:
        ``[hit, nhit]`` - exactly one of the two is 1. When no flag is set no
        method is allowed and the result is ``[0, 1]`` (previously this case
        raised a ValueError inside np.concatenate).
    """
    p_eoi = predict[0][0]

    # collect the methods of all preferred slots
    allowed = []
    for slot, flag in enumerate(preferences):
        if flag == 1:
            allowed.extend(a[slot*2:slot*2+2])

    # is predicted element in allowed list of methods?
    if p_eoi in allowed:
        return [1, 0]
    return [0, 1]

def check_on(source,predict):
    """Identify whether the model selects only new items.

    Args:
        source: decoded input pattern; ``source[2][0]`` is the knowledge
            vector (entry i is 0 when item i is still unknown).
        predict: decoded prediction; ``predict[1][0]`` is the predicted item.

    Returns:
        ``[1, 0]`` when the predicted item is marked unknown, else ``[0, 1]``.
    """
    # element of interest
    predicted_item = predict[1][0]
    knowledge = source[2][0]

    unknown_items = [idx for idx in range(len(knowledge)) if knowledge[idx] == 0]

    return [1, 0] if predicted_item in unknown_items else [0, 1]

def check_ros(source,predict):
    """Identify whether the model considers the learning progression.

    Looks up the cluster of b_cluster that contains the second item of the
    source item window and checks whether the predicted item is the next
    item of that cluster.

    Args:
        source: decoded input pattern; ``source[1][1]`` is the item of
            interest.
        predict: decoded prediction; ``predict[1][0]`` is the predicted item.

    Returns:
        ``[hit, nhit]``. ``[0, 0]`` means the sample is not countable: the
        item is the last one of its cluster, or it belongs to no cluster
        (the latter previously raised a TypeError).
    """
    s_eoi = source[1][1] #element of interest
    p_eoi = predict[1][0]

    cluster_oi = None
    position = -1
    # no early exit: if an item occurred in several clusters the last match wins
    for cluster in b_cluster:
        if s_eoi in cluster:
            cluster_oi = cluster
            position = cluster.index(s_eoi)

    # unknown item or end of cluster: there is no successor to compare with
    if cluster_oi is None or position+1 >= len(cluster_oi):
        return [0, 0]

    if cluster_oi[position+1] == p_eoi:
        return [1, 0]
    return [0, 1]


def _average_model_weights(weight_sets, weight):
    """Blend several Keras weight lists tensor by tensor with np.average."""
    if len({len(tensors) for tensors in weight_sets}) != 1:
        raise ValueError('models to merge must have the same number of weight tensors')
    # The tensors of one model have different shapes, so the weight lists
    # cannot be packed into a single array; average each tensor separately.
    return [
        np.average(np.stack(tensors), axis=0, weights=weight)
        for tensors in zip(*weight_sets)
    ]


def _accuracy(hits, counted):
    """Return hits/counted rounded to 4 digits, or 0.0 if nothing was counted."""
    return round(hits/counted, 4) if counted else 0.0


def testmerge(m1,m2,m3,weight=[.33,.33,.33],show=False):
    """Merge three saved models by weight averaging and evaluate the result.

    The weights of the three models are averaged (np.average, i.e. normalised
    by the sum of ``weight``), written into the module-level ``model``, saved,
    and the merged model is evaluated on 100 randomly drawn entries of
    dataX_test with check_meshing, check_on and check_ros.

    Args:
        m1, m2, m3: file names of the models below 'model/tesla/'.
        weight: three blending weights; must not sum to zero.
        show: print per-sample details when True.

    Returns:
        ``[acc_mesh, acc_on, acc_ros]``, each rounded to 4 digits. An accuracy
        is reported as 0.0 when none of the drawn samples was countable for it.
    """
    weight_sets = [
        load_model('model/tesla/'+name, compile=False).get_weights()
        for name in (m1, m2, m3)
    ]

    # override the weights of the shared model with the weighted mean
    model.set_weights(_average_model_weights(weight_sets, weight))
    # NOTE(review): models are loaded from 'model/tesla/' but the merge is
    # saved below 'tesla/final/' - confirm the target directory is intended
    model.save('tesla/final/merge.hdf5')

    losses ={'output_a':keras.losses.CategoricalCrossentropy(),'output_b':keras.losses.CategoricalCrossentropy()}
    optimizers = keras.optimizers.Adam(clipnorm=1)
    model.compile(optimizer=optimizers, loss=losses,metrics="accuracy")

    print('model compiled')

    hit_all_mesh, nhit_all_mesh, anz_mesh = 0,0,0
    hit_all_on, nhit_all_on, anz_on = 0,0,0
    hit_all_ros, nhit_all_ros, anz_ros = 0,0,0

    # test with 100 samples
    for test in range(100):
        # randint excludes the upper bound, so this can draw every pattern
        start = np.random.randint(0, len(dataX_test))
        pattern = dataX_test[start]
        source = hotvector_to_list(pattern)
        preferences = pattern[4]

        predicted = getprediction(pattern)
        if show: print('Y pred: ',predicted)

        # A: meshing hypothesis (every sample counts)
        hit_mesh,nhit_mesh = check_meshing(source,predicted,preferences)
        if show: print('Mesh hit',hit_mesh,', nhit',nhit_mesh)
        hit_all_mesh += hit_mesh
        nhit_all_mesh += nhit_mesh
        anz_mesh += 1

        # B: only new items
        hit_on,nhit_on = check_on(source,predicted)
        if show: print('ON hit',hit_on,', nhit',nhit_on)
        hit_all_on += hit_on
        nhit_all_on += nhit_on
        if hit_on + nhit_on > 0:
            anz_on += 1

        # C: learning progression (samples without a successor are skipped)
        hit_ros,nhit_ros = check_ros(source,predicted)
        if show: print('ROS hit',hit_ros,', nhit',nhit_ros)
        hit_all_ros += hit_ros
        nhit_all_ros += nhit_ros
        if hit_ros + nhit_ros>0:
            anz_ros += 1

        if show: print()

    acc_mesh = _accuracy(hit_all_mesh, anz_mesh)
    acc_on = _accuracy(hit_all_on, anz_on)
    acc_ros = _accuracy(hit_all_ros, anz_ros)

    print('Mesh',hit_all_mesh, nhit_all_mesh,'\tAcc:',acc_mesh, anz_mesh)
    print('ON',hit_all_on, nhit_all_on,'\tAcc:',acc_on, anz_on)
    print('ROS',hit_all_ros, nhit_all_ros,'\tAcc:',acc_ros, anz_ros)

    print()
    return [acc_mesh, acc_on, acc_ros]

'''
how to use:
[acc_e2h, acc_fl2, acc_cdfe] = testmerge('model_1.hdf5',
                                         'model_2.hdf5',
                                         'model_3.hdf5',
                                         [0,1,.0], False) 
print([acc_e2h, acc_fl2, acc_cdfe])
'''

# Grid search over all weight triples (i, j, k) with i + j + k = 1, in
# increments of `steps`. The grid is enumerated in whole steps so that
#   * i stops exactly at 0 (a float range down to -.1 also produced i = -0.05,
#     i.e. a negative model weight), and
#   * k is exact and non-negative; the same rounded triple is passed to
#     testmerge and stored in res_acc (k used to be rounded to an integer
#     before it was passed on).
steps = .05
res_acc = []
all_acc_mesh = []
all_acc_on = []
all_acc_ros = []
start = False
n_steps = int(round(1/steps))
for i_units in range(n_steps, -1, -1):
    i = round(i_units*steps, 2)
    for j_units in range(0, n_steps - i_units + 1):
        j = round(j_units*steps, 2)
        k = round((n_steps - i_units - j_units)*steps, 2)

        print([i, j, k])
        [acc_mesh, acc_on, acc_ros] = testmerge(
            'model_1.hdf5',
            'model_2.hdf5',
            'model_3.hdf5',
            [i, j, k], False)
        res_acc.append([i, j, k])

        all_acc_mesh.append(acc_mesh)
        all_acc_on.append(acc_on)
        all_acc_ros.append(acc_ros)


In [None]:
# Write the results to file as [weights, acc_mesh, acc_on, acc_ros].
# The file is overwritten: appending a second JSON document on a re-run
# would leave a file that can no longer be parsed as JSON.
res_acc_json = json.dumps([res_acc,all_acc_mesh,all_acc_on,all_acc_ros])
with open("evaluation.json", "w", encoding="utf-8") as outfile:
    outfile.write(res_acc_json)