In [9]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Generate Learning Content for Language Teaching Materials
Prototype for Experiments, work-in-progress with limited comments
"""
__author__ = ["Leo S. Rüdian"]
__copyright__ = "2024, Rüdian"
__credits__ = ["Leo S. Rüdian"]
__license__ = "CC BY-NC-SA"
__version__ = "1.0.0"
__maintainer__ = ["Leo S. Rüdian"]
__email__ =["ruediasy@informatik.hu-berlin.de"]
__status__ = "Work-in-Progress"

import json
from tensorflow import keras
from keras.models import load_model, Model
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import LSTM, Bidirectional, Input, Flatten, Dropout, Dense
#from keras.layers import Dense
from keras.utils.np_utils import to_categorical
import numpy as np
from numpy import argmax

# Locations of the two trained models that are soft-merged further down.
filename1 = "model/sim/model1.hdf5"  # model1
filename2 = "model/sim/model2.hdf5"  # model2
# Mixing factors for the soft-merge: first entry weights model 1, second model 2.
weight = [0.7, 0.3]

# Helper Definitions

In [10]:
# define mappings, functions, and cache

# Training sequences. JSON text is UTF-8 by specification, so the encoding is
# given explicitly instead of relying on the platform's locale default.
with open('newsequences-mesh.json', 'r', encoding='utf-8') as f:
    course = json.load(f)

# Vocabulary of teaching methods; the position in this list is the class index
# used for the one-hot encoding (and therefore for the model's output units).
methods_list = [
    'introduction',
    'memorization-challenge',
    '#motivation',
    'translation-challenge',
    'translation-correction',
    '#dashboard',
    'listening-correction',
    'writing',
    'end',
    'translation',
    'listening',
    'memorization',
    'listening-challenge',
    'flashcard',
    'memorization-correction',
]

def convert_to_hotvector(X,a):
    """One-hot encode a sequence of labels against a fixed vocabulary.

    Args:
        X: sequence of labels; every label must occur in ``a``.
           The sequence is only read, never modified.
        a: vocabulary; the position of a label in ``a`` is its class index.
           If a label occurs more than once in ``a``, its last position wins.

    Returns:
        (C, mapping) where ``C`` is a float32 array of shape
        ``(len(X), len(a))`` with exactly one 1 per row, and ``mapping`` is the
        ``label -> class index`` dict that was used.

    Raises:
        KeyError: if ``X`` contains a label that is not in ``a``.
    """
    mapping = {label: index for index, label in enumerate(a)}

    # integer representation (built as a new list so the caller's X stays intact)
    indices = [mapping[label] for label in X]

    # Row i of the identity matrix is the one-hot vector of class i, so the
    # width is always len(a) without having to pad X with every class first.
    C = np.eye(len(a), dtype="float32")[np.asarray(indices, dtype=np.intp)]
    return C,mapping

def getVector(s1,s2,s3,s4,s5):
    """One-hot encode five label sequences and return the first four encodings.

    Each sequence ``s1``..``s5`` is encoded against the module-level
    vocabularies ``a``..``e`` (not defined in this part of the notebook; they
    must exist before the call). As a side effect the module-level
    ``mapA``..``mapE`` are replaced by the label->index mappings that were used.

    Returns:
        ``[enc(s1), enc(s2), enc(s3), enc(s4)]``; the encoding of ``s5`` is
        computed (which refreshes ``mapE``) but is not part of the result.
    """
    global a,b,c,d,e
    global mapA,mapB,mapC,mapD,mapE

    encoded = []

    onehot, mapA = convert_to_hotvector(list(s1), a)
    encoded.append(onehot)

    onehot, mapB = convert_to_hotvector(list(s2), b)
    encoded.append(onehot)

    onehot, mapC = convert_to_hotvector(list(s3), c)
    encoded.append(onehot)

    onehot, mapD = convert_to_hotvector(list(s4), d)
    encoded.append(onehot)

    # s5 is encoded only for its mapping; its vectors are discarded.
    _, mapE = convert_to_hotvector(list(s5), e)

    return encoded

def getVectorY(s1,s2,s3,s4,s5):
    """Pair up the one-hot encodings of ``s1`` and ``s2`` position by position.

    All five sequences are encoded against the module-level vocabularies
    ``a``..``e`` (not defined in this part of the notebook; they must exist
    before the call), which refreshes the module-level ``mapA``..``mapE`` as a
    side effect. Only the encodings of ``s1`` and ``s2`` end up in the result.

    Returns:
        A list with one ``[enc(s1)[i], enc(s2)[i]]`` pair per element of ``s1``.

    Raises:
        KeyError: if a sequence contains a label missing from its vocabulary.
        IndexError: if ``s2`` is shorter than ``s1``.
    """
    global a,b,c,d,e
    global mapA,mapB,mapC,mapD,mapE
    A_c,mapA = convert_to_hotvector(list(s1),a)
    B_c,mapB = convert_to_hotvector(list(s2),b)
    # The encodings of s3..s5 are not used here; the calls are kept because
    # they validate the labels and refresh the module-level mappings.
    _,mapC = convert_to_hotvector(list(s3),c)
    _,mapD = convert_to_hotvector(list(s4),d)
    _,mapE = convert_to_hotvector(list(s5),e)

    # The former zero-filled scratch arrays and the element-wise copy of A_c
    # were never read, and the copy raised IndexError whenever vocabulary `a`
    # was wider than `b`; they are dropped.
    return [[A_c[l], B_c[l]] for l in range(len(A_c))]

# prepare the dataset of input to output pairs encoded as integers
seq_length = 2
dataX, dataXa, dataXb, dataXc, dataXd = [], [], [], [], []
dataY = []

trainCategoryY, trainColorY = [], []
dataX_test, dataY_test = [], []
ytrain_1, ytrain_2 = [], []

dataX_personality, dataX_usermodel, dataX_method = [], [], []

# Flatten all courses into three parallel lists. `cache` holds a string key per
# (personality, usermodel, method) combination so each one is collected once.
cache = {}
for entry in course:
    personality = entry[0]
    content = entry[1]
    for step in content:
        method = step[0]
        usermodel = step[1]
        cachetest = ''.join(map(str, personality)) + ''.join(map(str, usermodel)) + method
        if cachetest in cache:
            continue
        dataX_personality.append(personality)
        dataX_usermodel.append(usermodel)
        dataX_method.append(method)
        cache[cachetest] = 1

# From here on dataX_method is the one-hot matrix, no longer the label list.
dataX_method, dataX_method_map = convert_to_hotvector(list(dataX_method), methods_list)

# prepare Y: sample j is the input, the method of sample j+1 is its target,
# so the last collected sample has no target and is left out.
n_patterns = max(len(dataX_method) - 1, 0)
all_dataX_personality = dataX_personality[:n_patterns]
all_dataX_usermodel = dataX_usermodel[:n_patterns]
all_dataX_method = list(dataX_method[:n_patterns])
all_dataY_method = list(dataX_method[1:n_patterns + 1])

# Inputs get a trailing feature axis of size 1; the target stays 2-D.
n_methods = len(methods_list)
X_method = np.reshape(all_dataX_method, (n_patterns, n_methods, 1))
Y_method = np.reshape(all_dataY_method, (n_patterns, n_methods))
X_personality = np.reshape(all_dataX_personality, (n_patterns, 3, 1))
X_usermodel = np.reshape(all_dataX_usermodel, (n_patterns, 4, 1))

print(Y_method.shape)

(8130, 15)


# Define the Model

In [11]:
def make_model():
    """Build the (uncompiled) next-method classifier.

    Three inputs, shaped after the module-level training arrays ``X_method``,
    ``X_usermodel`` and ``X_personality``:

    * the one-hot method vector goes through a bidirectional LSTM (16 units
      per direction, final state only),
    * user model and personality are simply flattened.

    The three parts are concatenated, passed through dropout (rate 0.4) and
    mapped to a softmax over all methods in ``methods_list``.

    Returns:
        A Keras ``Model`` with inputs ``[method, usermodel, personality]`` and
        the single output ``output_method``.
    """
    input_method=Input(shape=(X_method.shape[1], X_method.shape[2]), name="Input_method")
    input_usermodel=Input(shape=(X_usermodel.shape[1], X_usermodel.shape[2]), name="Input_usermodel")
    input_personality=Input(shape=(X_personality.shape[1], X_personality.shape[2]), name="Input_personality")

    bi_lstm_method = Bidirectional(LSTM(16,return_sequences=False), name="LSTM_method")(input_method)

    f_usermodel = Flatten()(input_usermodel)
    f_personality = Flatten()(input_personality)

    all_input = keras.layers.concatenate([bi_lstm_method, f_usermodel, f_personality])

    dropout1 = Dropout(0.4, name="Dropout")(all_input)

    # One output unit per known method. Deriving the width from methods_list
    # (currently 15 entries) keeps it in step with the one-hot targets instead
    # of repeating the number as a literal.
    output_a = Dense(len(methods_list), activation='softmax', name='output_method')(dropout1)

    model = Model(inputs = [input_method,input_usermodel,input_personality], outputs=[output_a])

    return model

model = make_model()

# The loss is keyed by the name of the output layer.
losses = {'output_method': keras.losses.CategoricalCrossentropy()}
# clipnorm=1 clips each gradient to an L2 norm of at most 1.
optimizers = keras.optimizers.Adam(clipnorm=1)

# `metrics` is documented as a list; a bare string is not accepted by newer
# Keras releases, so pass the single metric wrapped in a list.
model.compile(optimizer=optimizers, loss=losses, metrics=["accuracy"])
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Input_method (InputLayer)      [(None, 15, 1)]      0           []                               
                                                                                                  
 Input_usermodel (InputLayer)   [(None, 4, 1)]       0           []                               
                                                                                                  
 Input_personality (InputLayer)  [(None, 3, 1)]      0           []                               
                                                                                                  
 LSTM_method (Bidirectional)    (None, 32)           2304        ['Input_method[0][0]']           
                                                                                            

# Soft-Merge Models

In [None]:
# Only the weights of the two source models are needed, so skip compilation.
model1 = load_model(filename1, compile=False)
weights_1 = model1.get_weights()

model2 = load_model(filename2, compile=False)
weights_2 = model2.get_weights()

if len(weights_1) != len(weights_2):
    raise ValueError(
        "cannot merge models with a different number of weight tensors: "
        f"{len(weights_1)} vs {len(weights_2)}"
    )

# Soft-merge tensor by tensor. get_weights() returns arrays of different
# shapes, so stacking both complete lists into a single np.array is ragged
# (deprecated, and a ValueError on NumPy >= 1.24). Averaging each pair of
# equally-shaped tensors gives the same weighted mean without that.
weights_merge = [
    np.average([w_1, w_2], axis=0, weights=weight)
    for w_1, w_2 in zip(weights_1, weights_2)
]

model.set_weights(weights_merge)
model.save('model/sim/merge.hdf5')

# Evaluation "Amplifying/Unlearning"

In [None]:
# Held-out sequences for the evaluation. JSON text is UTF-8 by specification,
# so the encoding is given explicitly instead of relying on the locale default.
with open('newsequences-test-cv.json', 'r', encoding='utf-8') as f:
    course_test = json.load(f)

testdataX_personality = []
testdataX_usermodel = []
testdataX_method = []

# Flatten all test courses into three parallel lists. Unlike the training set,
# repeated (personality, usermodel, method) combinations are kept.
for entry in course_test:
    personality = entry[0]
    content = entry[1]
    for step in content:
        method = step[0]
        usermodel = step[1]
        testdataX_personality.append(personality)
        testdataX_usermodel.append(usermodel)
        testdataX_method.append(method)

# From here on testdataX_method is the one-hot matrix, no longer the label list.
testdataX_method,testdataX_method_map = convert_to_hotvector(list(testdataX_method),methods_list)
# class index -> method name, to turn predictions back into labels
testdataX_method_map_reverse = {index: label for label, index in testdataX_method_map.items()}

# prepare Y: sample j is the input, the method of sample j+1 is its target,
# so the last sample has no target and is left out.
n_test_pairs = max(len(testdataX_method) - 1, 0)
testall_dataX_personality = testdataX_personality[:n_test_pairs]
testall_dataX_usermodel = testdataX_usermodel[:n_test_pairs]
testall_dataX_method = list(testdataX_method[:n_test_pairs])
testall_dataY_method = list(testdataX_method[1:n_test_pairs + 1])

def getprediction(pattern):
    """Predict the next method for one sample and return it one-hot encoded.

    Args:
        pattern: indexable with three entries, in this order:
            ``pattern[0]`` one-hot method vector (``len(methods_list)`` values),
            ``pattern[1]`` user model (4 values),
            ``pattern[2]`` personality (3 values).

    Returns:
        Integer array of length ``len(methods_list)`` that is 1 at the class
        with the highest predicted score and 0 elsewhere.

    Uses the module-level ``model``.
    """
    batch_size = 1  # the model is queried with a single sample

    method_in = np.reshape(pattern[0], (batch_size, len(methods_list), 1))
    personality_in = np.reshape(pattern[2], (batch_size, 3, 1))
    usermodel_in = np.reshape(pattern[1], (batch_size, 4, 1))

    # Input order must match the order used when the model was built.
    scores = model.predict([method_in, usermodel_in, personality_in], verbose=0)

    # make one-hot encoding
    best_class = argmax(list(scores[0]))
    one_hot = np.zeros((len(methods_list),), dtype=int)
    one_hot[best_class] = 1

    return one_hot

def testmeshing(pattern,current_method,predicted):
    personality = pattern[2]
    hit, nhit = 0, 0
    
    if predicted != 'end' and current_method != 'end':
        if personality[0]==1: # correction
            if predicted != 'writing' and predicted != 'introduction':# and cnt>4:
                if current_method in ['sentenceconstruct','writing']:
                    if '-correction' in predicted: hit += 1
                    else: nhit += 1

        if personality[2]==1: # challenge
            if predicted != 'writing' and predicted != 'introduction' and not 'correction' in predicted:
                if '-challenge' in predicted: hit += 1
                else: nhit += 1
        
        if personality[1]==1 and current_method != '#dashboard' and current_method != 'introduction' and current_method != 'flashcard' and current_method != '#motivation'and predicted != 'writing': # gamification
            if predicted == '#dashboard' or predicted == '#motivation':
                hit += 1
            else: nhit += 1
        
    if hit > 1: hit = 1
    if nhit > 1: nhit = 1
    
    if hit == 1: nhit = 0
    
    return hit, nhit
    
def testantimeshing(pattern,current_method,predicted):
    personality = pattern[2]    
    hit, nhit = 0, 0
    
    if predicted != 'end' and current_method != 'end':
        if personality[0]==0: # correction
            if predicted != 'writing' and predicted != 'introduction':# and cnt>4:
                if current_method in ['sentenceconstruct','writing']:
                    if '-correction' in predicted: hit += 1
                    else: nhit += 1

        if personality[2]==0: # challenge
            if predicted != 'writing' and predicted != 'introduction' and not 'correction' in predicted:
                if '-challenge' in predicted: hit += 1
                else: nhit += 1
        
        if personality[1]==0 and current_method != '#dashboard' and current_method != 'introduction' and current_method != 'flashcard' and current_method != '#motivation'and predicted != 'writing': # gamification
            if predicted == '#dashboard' or predicted == '#motivation':
                hit += 1
            else: nhit += 1
        
    if hit > 1: hit = 1
    if nhit > 1: nhit = 1
    
    if hit == 1: nhit = 0
    
    return hit, nhit

# use merged model
filename = 'model/sim/merge.hdf5'
model.load_weights(filename)
# `metrics` is documented as a list; newer Keras releases reject a bare string.
model.compile(optimizer=optimizers, loss=losses, metrics=["accuracy"])

teststeps = 1000  # number of randomly drawn test samples
hit_all, nhit_all, anz = 0,0,0                                # plain next-method accuracy
mesh_hit_all, mesh_nhit_all, mesh_anz = 0,0,0                 # traits the learner has
antimesh_hit_all, antimesh_nhit_all, antimesh_anz = 0,0,0     # traits the learner lacks


def _rate(hits, considered):
    """Hit rate rounded to 4 digits; NaN when no sample was considered."""
    return round(hits/considered, 4) if considered else float('nan')


for num in range(teststeps):
    # get random element; randint's upper bound is exclusive, so the full
    # length is passed to keep the last sample eligible as well
    start = np.random.randint(0, len(testall_dataX_method))
    usermodel = testall_dataX_usermodel[start]
    personality = testall_dataX_personality[start]
    # A plain list: the three parts have different lengths, and wrapping them
    # in np.array builds a ragged array (ValueError on NumPy >= 1.24).
    pattern = [testall_dataX_method[start], usermodel, personality]
    ziel = np.array(testall_dataY_method[start])  # target: one-hot of the true next method

    current_method = testdataX_method_map_reverse[argmax(testall_dataX_method[start])]

    predicted = getprediction(pattern)
    if argmax(ziel) == argmax(predicted):
        hit_all += 1
    else:
        nhit_all += 1
    anz += 1

    predicted_method = testdataX_method_map_reverse[argmax(predicted)]

    # evaluation: a sample only counts when at least one rule was applicable
    mesh_hit, mesh_nhit = testmeshing(pattern,current_method,predicted_method)
    if mesh_hit + mesh_nhit > 0:
        mesh_hit_all += mesh_hit
        mesh_nhit_all += mesh_nhit
        mesh_anz += 1

    antimesh_hit, antimesh_nhit = testantimeshing(pattern,current_method,predicted_method)
    if antimesh_hit + antimesh_nhit > 0:
        antimesh_hit_all += antimesh_hit
        antimesh_nhit_all += antimesh_nhit
        antimesh_anz += 1

print('---- Evaluation after first merge -----')

print('mesh:',mesh_hit_all, mesh_nhit_all)
print('mesh:',_rate(mesh_hit_all, mesh_anz), mesh_anz, 'not considered:',(teststeps-mesh_anz))

print('antimesh:',antimesh_hit_all, antimesh_nhit_all)
print('antimesh:',_rate(antimesh_hit_all, antimesh_anz), antimesh_anz, 'not considered:',(teststeps-antimesh_anz))