## ABSTRACT

This notebook contains the code accompanying the article that presents <b>the IKE-XAI methodology for Implicit Knowledge Extraction with eXplainable Artificial Intelligence</b>. We use the <b>Tower of Hanoi (TOH)</b> task as a guiding showcase of the proposed IKE-XAI methodology.<br/>
<b>[Chraibi Kaadoud et al, 2022]</b> 
CHRAIBI KAADOUD, Ikram, BENNETOT, Adrien, MAWHIN, Barbara, DIAZ-RODRIGUEZ, Natalia. Explaining <i>Aha!</i> moments in artificial agents through IKE-XAI: Implicit Knowledge Extraction for eXplainable AI. <i>Neural Networks</i>, 2022. https://doi.org/10.1016/j.neunet.2022.08.002

<p align = "center">
<img src="Figures/ExperimentalDesign.png" alt="drawing" width="700"/>
</p>
<p align = "center">
    <b>Figure 2 of Chraibi Kaadoud et al (2022):</b> Experimental design of the IKE-XAI methodology, which makes explicit the knowledge construction process of the autonomous agent (AA) in three steps:<br/>
<b>STEP 1) RL Phase:</b> a Q-learning agent learns to perform the TOH task. At several stages of the learning process, training is suspended and the AA’s move sequences are recorded while it plays with the policy learned so far. This step yields: a) sequences of moves and b) an AA trained to perform TOH whose behavior is observable through its sequences of moves, which reveal the solution chosen by the AA to reach the solution state.<br/>
<b>STEP 2) Moves Sequence Learning Phase:</b> the recorded sequences of moves of the AA are used to train an LSTM to predict the AA’s next move at time t based on the current and past ones. This step returns a dataset of recorded hidden patterns (i.e., the activity vectors of the hidden layer generated by the network at each input). Through the learned sequences, the trained LSTM model has encoded an implicit representation of the TOH rules. Note that the LSTM model is trained on sequences generated from the TOH abstract representation (Figure B.13).<br/>
<b>STEP 3) XAI Phase:</b> a post-hoc implicit rule extraction algorithm and a graph visualization technique are applied to the dataset of recorded hidden patterns to extract graphs of AA behavior at different stages of training.
</p>


This notebook presents the main steps of the IKE-XAI methodology. <br/>
A brief presentation of the TOH task is provided in <b>[Chraibi Kaadoud et al, 2022], Appendix B</b>.


#### Installation of requirements


In [None]:
!pip install numpy
!pip install pandas
!pip install matplotlib
!pip install networkx
!pip install pydot
!pip install tensorflow
!pip install simplejson
!pip install scikit-learn==1.0.1

## STEP 1 - Train the Q-learning Artificial Agent (AA) on the TOH task

The RL Phase consists of training a Q-learning agent to perform the TOH task. At several stages of the learning process, training is suspended and the AA’s move sequences are recorded while it plays with the policy learned so far. <br/>
This step yields: <br/>
<b>a)</b> sequences of moves <br/>
<b>b)</b> an AA trained to perform TOH whose behavior is observable through its sequences of moves, which reveal the solution chosen by the AA to reach the solution state.<br/>
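
The update rule behind this phase can be illustrated on a toy problem. The sketch below is not the notebook's `learn_Q`. It is a stand-alone illustration that assumes a hypothetical four-state chain and reuses the defaults found further down (learning rate α = 1, discount factor γ = 0.8). The names `toy_R`, `toy_q_learning` and `greedy` are made up for this example.

```python
import numpy as np

def toy_q_learning(R, gamma=0.8, alpha=1.0, n_episodes=2000, seed=0):
    """Tabular Q-learning with purely random exploration.

    R[s, s2] is the reward for moving from s to s2; -inf marks a forbidden move.
    """
    rng = np.random.default_rng(seed)
    Q = np.zeros(R.shape)
    n_states = R.shape[0]
    for _ in range(n_episodes):
        s = rng.integers(n_states)                 # random start state
        allowed = np.where(R[s] >= 0)[0]           # states reachable from s
        s2 = rng.choice(allowed)                   # random exploratory move
        target = R[s, s2] + gamma * Q[s2].max()    # one-step bootstrapped return
        Q[s, s2] = (1 - alpha) * Q[s, s2] + alpha * target
    return Q

# Hypothetical chain 0 - 1 - 2 - 3 (assumption): entering state 3 is rewarded.
toy_R = np.full((4, 4), -np.inf)
for a, b in [(0, 1), (1, 2), (2, 3)]:
    toy_R[a, b] = toy_R[b, a] = 0.0
toy_R[2, 3] = 100.0

toy_Q = toy_q_learning(toy_R)
# Greedy policy: best allowed successor for each non-terminal state
greedy = [int(np.argmax(np.where(toy_R[s] >= 0, toy_Q[s], -np.inf))) for s in range(3)]
print(greedy)
```

Masking forbidden moves with `-inf` before taking the `argmax` ensures the greedy policy never selects a transition that the reward matrix marks as impossible.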

In [None]:
import itertools
import os
import random
from os import path

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd

import tools  # helper module used below for JSON I/O

#### Declaration of all functions for step 1
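
Before reading the functions, it may help to see the state encoding in isolation. The sketch below assumes the same convention as the `TowersOfHanoi` class: a state is a tuple whose entry `d` is the peg holding disc `d`, with disc 0 the smallest. The helper names `top_disc` and `legal_moves` are hypothetical and are not part of the notebook.

```python
from itertools import permutations, product

def top_disc(state, peg):
    """Smallest disc index on `peg`, or None if the peg is empty."""
    discs = [d for d, p in enumerate(state) if p == peg]
    return min(discs) if discs else None

def legal_moves(state, pegs=(1, 2, 3)):
    """All (from_peg, to_peg) pairs that respect the TOH rules in `state`."""
    moves = []
    for src, dst in permutations(pegs, 2):
        moving, blocking = top_disc(state, src), top_disc(state, dst)
        # A disc may land on an empty peg or on a larger disc only
        if moving is not None and (blocking is None or moving < blocking):
            moves.append((src, dst))
    return moves

n_discs = 3
all_states = list(product((1, 2, 3), repeat=n_discs))
print(len(all_states))            # 3 ** n_discs states
print(legal_moves((1, 1, 1)))     # only the smallest disc can move
print(2 ** n_discs - 1)           # length of the shortest solution
```

With every disc on peg 1, only the smallest disc is free, so two moves are legal. Every other configuration that is not a single full stack offers three.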

In [None]:

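class TowersOfHanoi:
    def __init__(self, state):
        # "state" is a tuple of length N, where N is the number of discs; state[d] is the peg
        # holding disc d (disc 0 is the smallest). Peg labels are the ones used in
        # generate_reward_matrix (1, 2, 3).
        self.state = state
        self.discs = len(self.state)

    def discs_on_peg(self, peg):
        # Indices of the discs currently stacked on the given peg
        return [disc for disc in range(self.discs) if self.state[disc] == peg]

    def move_allowed(self, move):
        # move is a (from_peg, to_peg) pair: only the top (smallest) disc of from_peg can move,
        # and it may only land on an empty peg or on a larger disc
        discs_from = self.discs_on_peg(move[0])
        discs_to = self.discs_on_peg(move[1])
        if discs_from:
            return (min(discs_to) > min(discs_from)) if discs_to else True
        else:
            return False

    def get_moved_state(self, move):
        # Fail explicitly instead of hitting an undefined variable on an illegal move
        if not self.move_allowed(move):
            raise ValueError("Move %s is not allowed from state %s" % (move, self.state))
        disc_to_move = min(self.discs_on_peg(move[0]))
        moved_state = list(self.state)
        moved_state[disc_to_move] = move[1]
        return tuple(moved_state)
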


def generate_grammar_from_df(N, R,G,result_dir_for_N_disks, debug=False):
    df_R = R.replace(0, 1)
    df_R = df_R.replace(-np.inf, 0)
    if debug:
        print("type(df_R) : ", type(df_R))
        print("df_R : ", df_R)
        print(" df_R column : ", df_R.columns)
        print(" df_R rows : ", df_R.index)

    df_cols_tuple = df_R.columns.tolist()
    df_cols = [str(x) for x in df_cols_tuple]

    toh_grammar={}
    for i in range(len(df_R.columns)):
        node = str(df_R.columns[i])
        if debug : print("\n node : ", node)
        list_transitions=[]
        for one_neighbor in G.neighbors(node):
            index_one_neighbor= list(df_cols).index(one_neighbor)
            diff = ""
            assert(len(node)==len(one_neighbor))
            for t in range(len(node)):
                if node[t] != one_neighbor[t]:
                    diff = str(node[t]) + "-" + str(one_neighbor[t])
            transition = (diff, index_one_neighbor)  # avoid shadowing the built-in tuple
            list_transitions.append(transition)
            if debug : print(" =====>one_neighbor : ", one_neighbor, " -  diff : ", diff)

        toh_grammar[i]={"transitions": list_transitions, "label":node.replace(" ","").replace(",","").replace("(","").replace(")","")}

    if debug : print ("toh_grammar : ", toh_grammar)
    res_file =  result_dir_for_N_disks+"/TOH_"+str(N)+"_disks_grammar.json"
    tools.save_dict_in_json(toh_grammar,res_file)
    print("TOH grammar with, ",str(N), "disks saved in :" ,res_file)
    return toh_grammar

def generate_graph_from_reward_matrix(N,R, result_dir_for_N_disks,debug=False):
    from networkx.drawing.nx_agraph import graphviz_layout
    from networkx.drawing.nx_agraph import write_dot

    #1) transform the reward matrix into a numpy array matrices with 0 and 1 that represents the links between the states so that
    #we can plot a graph from it to visualize the links
    df_R =  R.replace(0, 1)
    df_R = df_R.replace(-np.inf, 0)

    if debug:
        print ("type(df_R) : ", type(df_R))
        print ("df_R : ", df_R)
        print(" df_R column : ", df_R.columns)
        print(" df_R rows : ", df_R.index)

    G = nx.Graph()
    labels = {}
    for i in range(len(df_R.columns)):
        a_col= str(df_R.columns[i])
        labels[str(a_col)]= a_col


    for i in range(len(df_R.columns)) :
        if debug: print("i : ", i, " -df_R.columns[i] :  ",df_R.columns[i])
        for j in range(len(df_R.index)):
            if debug :
                print("j : ", j, " -df_R.index[j] :  ",df_R.index[j])
                print("df_R[df_R.columns[i]] : ", df_R[df_R.columns[i]])
            cell_val = df_R.iat[i,j]
            if cell_val>0:

                src_node = str(df_R.columns[i])
                target_node = str(df_R.index[j])
                G.add_node(src_node, label=src_node)
                G.add_node(target_node, label=target_node)
                G.add_edge(src_node, target_node)

    plt.figure(figsize=(15, 8))

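    # Compute the layout once so that nodes, edges and labels share the same positions
    pos = nx.spring_layout(G)
    nx.draw(G, pos=pos)
    labels = nx.draw_networkx_labels(G, pos=pos)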
    plt.tight_layout()
    plt.axis("off")
    res_file = result_dir_for_N_disks+"/TOH_"+str(N)+"_disks_rules.png"
    plt.savefig(res_file)
    print("TOH representation with "+str(N)+" disks saved : ", res_file)

    if debug:
        print("G.nodes(data=True)) : ", G.nodes(data=True))
        print("G.nodes()) : ", G.nodes())
        print("G.edges(data=True) : ", G.edges(data=True))
        print ("labels : ", labels)


    return G

# Generates the reward matrix for the Towers of Hanoi game as a Pandas DataFrame
def generate_reward_matrix(N, debug=False):      # N is the number of discs

    states = list(itertools.product(list(range(1,4)), repeat=N))
    moves = list(itertools.permutations(list(range(1,4)), 2))
    R = pd.DataFrame(index=states, columns=states, data=-np.inf)
    for state in states:
        tower = TowersOfHanoi(state=state)
        for move in moves:
            if tower.move_allowed(move):
                next_state = tower.get_moved_state(move)
                R[state][next_state] = 0
    final_state = tuple([3]*N)          # Final state: all discs on the last peg (pegs are labelled 1, 2, 3), matching end_state in play()
    R[final_state] += 100               # Add a reward for all moves leading to the final state

    if debug:
        print("states : ", states)
        print("moves : ", moves)
        print("R all : ", R)

    return R, R.values, states, moves

# Learn the Q matrix
def learn_Q(R, gamma=0.8, alpha=1.0, N_episodes=1000, random=False):
    # Defaults: learning rate α = 1, discount factor γ = 0.8.
    # With random=True no update is applied, so Q stays at zero (untrained baseline).
    Q = np.zeros(R.shape)
    states=list(range(R.shape[0]))
    for n in range(N_episodes):
        state = np.random.choice(states)                # Randomly select initial state
        next_states = np.where(R[state,:] >= 0)[0]      # Generate a list of possible next states
        next_state = np.random.choice(next_states)      # Randomly select next state from the list of possible next states
        V = np.max(Q[next_state,:])                     # Maximum Q-value of the states accessible from the next state
        if not random:
            Q[state, next_state] = (1-alpha)*Q[state, next_state] + alpha*(R[state, next_state] + gamma*V)      # Update Q-values
    if np.max(Q) > 0:
        Q /= np.max(Q)      # Normalize Q to its maximum value
    return Q

#Get the policy for each state
def get_policy(Q, R):
    Q_allowed = pd.DataFrame(Q)[pd.DataFrame(R) >= 0].values
    
    policy = []
    for i in range(Q_allowed.shape[0]):
        row = Q_allowed[i,:]
        sorted_vals = np.sort(row)
        sorted_vals = sorted_vals[~np.isnan(sorted_vals)][::-1]
        sorted_args = row.argsort()[np.where(~np.isnan(sorted_vals))][::-1]
        max_vals = [val for val in sorted_vals if val==sorted_vals[0]]
        max_args = [sorted_args[i] for i,val in enumerate(sorted_vals) if val==sorted_vals[0]]
        policy.append(max_args)
        best_policy_value = max_vals[0]
    return policy, best_policy_value

#Play the defined policy
def play(policy, allowed, epsilon=0, last_move=100, debug=False):

    start_state = 0
    end_state = len(policy)-1
    state = start_state
    moves = 0
    number_asked = 0
    ask_for_help = False

    seq_states=[]
    seq_states.append(state)
    while state != end_state:
        if last_move < 3:
            test_eps = 100
        else:
            test_eps = random.uniform(0, 1)
        
        if test_eps < epsilon:
            state = np.random.choice(allowed[state])
        else:
            state = np.random.choice(policy[state])
        moves += 1

        seq_states.append(state)
        # To avoid very long sequences, each game is capped at 10000 moves.
        # We advise raising this threshold as N grows, since sequences get longer with more disks.
        if moves > 10000:
            print("Maximal number of moves reached: ", moves)
            break


    if debug :
        print("play")
        print("len(seq_states) : ", len(seq_states))
        print("seq_states : ",seq_states)
    return moves, number_asked,seq_states

def play_average(policy, allowed, play_times=100, 
                 epsilon=0, debug=False):

    moves = np.zeros(play_times)
    seq_states = {}
    number_asked = np.zeros(play_times)
    if debug :
        print("play_average")
        print("allowed : ", allowed)
        print("moves : ", moves)

    for n in range(play_times):
        if debug : print("\n PLAY_TIME n° : ", n)
        last_move = np.mean(moves)                
        moves[n], number_asked[n],seq_states[n] = play(policy, allowed,epsilon, last_move)

    if debug : print("moves with update : ", moves)
    return np.mean(moves), np.std(moves), np.mean(number_asked), np.std(number_asked), seq_states

def Q_performance(R, episodes, play_times=100, 
                  random=False, epsilon=0,debug=False):
    if debug : print("\n\n Q_performance-------------------------------------")
    means = np.zeros(len(episodes))
    seq_states = {}
    stds = np.zeros(len(episodes))
    asked = np.zeros(len(episodes))
    asked_stds = np.zeros(len(episodes))
    for n, N_episodes in enumerate(episodes):
        if debug: print(" ==> Training session :", n, " - N_episodes : ", N_episodes)
        Q = learn_Q(R, N_episodes = N_episodes, random=random)
        policy, best_policy_value = get_policy(Q,R)
        if debug:
            print("n :", n, " - N_episodes : ", N_episodes)
            print("policy : ", policy, " - best_policy_value : ", best_policy_value)
        if n == 0:
            allowed, best_policy_value = get_policy(Q,R)
        means[n], stds[n], asked[n], asked_stds[n],seq_states[n] = play_average(policy, allowed, play_times, epsilon, debug)


    return means, stds, asked, asked_stds,seq_states

#Calculate the average Q learning performance
def Q_performance_average(R, episodes, learn_times = 100, play_times=100, 
                          random=False, 
                          epsilon=0,
                          debug=False):
    if debug : print("Q_performance_average")
    means_times = np.zeros((learn_times, len(episodes)))
    seq_states = {}
    stds_times = np.zeros((learn_times, len(episodes)))
    asked_times = np.zeros((learn_times, len(episodes)))
    asked_stds_times = np.zeros((learn_times, len(episodes)))
    for n in range(learn_times):
        print(" learn time : ", n)                                                     
        means_times[n,:], stds_times[n,:], asked_times[n,:], asked_stds_times[n,:], seq_states[n]  = Q_performance(R, episodes, play_times=play_times,
                                                                                                                   random=random, 
                                                                                                                   epsilon=epsilon) 
    means_averaged = np.mean(means_times, axis = 0)
    stds_averaged = np.mean(stds_times, axis = 0)
    asked_averaged = np.mean(asked_times, axis = 0)
    asked_stds_averaged = np.mean(asked_stds_times, axis = 0)
    return means_averaged, stds_averaged,seq_states



def plot_Qlearning_average_number_of_moves(N, episodes,means_averaged,stds_averaged, result_dir):
    fig = plt.figure()
    optimum_moves = 2 ** N - 1
    plt.plot(episodes, means_averaged, 'b-', linewidth=2, label='Average Performance')
    plt.plot(episodes, means_averaged + stds_averaged, 'b-', alpha=0.5)
    plt.plot(episodes, means_averaged - stds_averaged, 'b-', alpha=0.5)
    plt.fill_between(episodes, means_averaged - stds_averaged, means_averaged + stds_averaged, facecolor='blue',
                     alpha=0.5)
    plt.axhline(y=optimum_moves, color='g', label='Optimum')
    plt.xlabel('Number of training episodes')
    plt.ylabel('Number of moves')
    plt.grid('on', which='both')
    plt.title('Q-learning - Towers of Hanoi game with %s discs' % N)
    handles, labels = plt.gca().get_legend_handles_labels()
    plt.legend(handles, labels, fontsize=10)
    res_file = result_dir+ "/TOH_"+str(N)+"_disks_average_number_of_moves.png"
    plt.savefig(res_file)
    plt.show()



#Prepare the final plot

def prepare_plot(episodes, N):
    fig = plt.figure()
    optimum_moves = 2**N - 1
    plt.axhline(y=optimum_moves, color='g', label='Optimum (=%s moves)' % optimum_moves)
    plt.xlabel('Number of training episodes')
    plt.ylabel('Number of moves')
    plt.grid('on', which='both')
    plt.title('Q-learning of the Towers of Hanoi game with %s discs' % N)
    handles, labels = plt.gca().get_legend_handles_labels()
    plt.legend(handles, labels)

def prepare_plot_aksed(episodes, N):
    fig = plt.figure()
    optimum_moves = 2**N - 1
    plt.axhline(y=optimum_moves, color='g', label='Optimum (=%s moves)' % optimum_moves)
    plt.xlabel('Number of training episodes')
    plt.ylabel('Number of asked for help')
    plt.grid('on', which='both')
    plt.title('Q-learning of the Towers of Hanoi game with %s discs' % N)
    handles, labels = plt.gca().get_legend_handles_labels()
    plt.legend(handles, labels)

#Plot mean results
def add_mean_results(episodes, means_averaged, color, label):
    plt.loglog(episodes, means_averaged, color, label=label)

#Plot ask-for-help results
def add_ask_results(episodes, asked_averaged, color, label):
    plt.loglog(episodes, asked_averaged, color, label=label)


#Plot standard deviations
def add_stds_results(episodes, means_averaged, stds_averaged, color, facecolor, label):
    plt.loglog(episodes, means_averaged + stds_averaged, color, alpha=0.5)
    plt.loglog(episodes, means_averaged - stds_averaged, color, alpha=0.5)
    plt.fill_between(episodes, means_averaged-stds_averaged, means_averaged+stds_averaged, facecolor=facecolor, alpha=0.5)

#### MAIN CODE to Launch step 1
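
Part of this step turns each recorded sequence of states into a sequence of move labels of the form `from-to`. The sketch below shows that conversion in isolation. It assumes that exactly one disc changes peg between two consecutive states and raises an error otherwise, which is stricter than the loop used in this notebook. The names `move_label`, `states_to_moves` and `solution` are hypothetical.

```python
def move_label(current_state, next_state):
    """Label a single move as 'from_peg-to_peg' by locating the disc that changed peg."""
    changed = [(a, b) for a, b in zip(current_state, next_state) if a != b]
    if len(changed) != 1:
        raise ValueError("Exactly one disc must move between consecutive states")
    src, dst = changed[0]
    return f"{src}-{dst}"

def states_to_moves(sequence_of_states):
    """Convert a list of states into the list of move labels linking them."""
    return [move_label(a, b) for a, b in zip(sequence_of_states, sequence_of_states[1:])]

# Shortest solution for 3 discs, from peg 1 to peg 3 (state[d] is the peg of disc d)
solution = [(1, 1, 1), (3, 1, 1), (3, 2, 1), (2, 2, 1),
            (2, 2, 3), (1, 2, 3), (1, 3, 3), (3, 3, 3)]
print(states_to_moves(solution))
```

A label records only the source and destination pegs, not which disc moved, so the same symbol (for example `1-3`) can appear at several points of a solution.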

In [None]:
N = 3 #Number of disks in TOH
if not path.exists("Results/"):
    os.mkdir("Results/")
    
print("\n\n *********************************************************************** ")
print(" TOH with ", N, " disks ")
print("****************************************************************************\n\n")

result_dir_for_N_disks= "Results/"+str(N)+"_disks_Results"
if not path.exists(result_dir_for_N_disks):
    os.mkdir(result_dir_for_N_disks)


print("\n\n-----------------------------------------------------------------------------------------------")
print("STEP 0 - TOH :  Define game variables and rules")
print("-----------------------------------------------------------------------------------------------")

R, R_values, states, moves = generate_reward_matrix(N)

debug = False
if debug :
    print("R.shape : ", R_values.shape)
    print("R : ", R_values)

G = generate_graph_from_reward_matrix(N,R,result_dir_for_N_disks)
toh_dict_grammar = generate_grammar_from_df(N, R, G,result_dir_for_N_disks)

In [None]:
R.shape
R.describe()

In [None]:
#Advised parameters from the article - Mind the computational time and memory consumption
#Parameters for N=3
episodes = [0, 100,300,500,1000]

#Parameters for N=4
#episodes = [0, 1000,2000,3000,4000]

#Parameters for N=6
#episodes = [0, 30000,60000,100000,150000]

learn_times = 100
play_times=100

#Small parameters to run the code quickly
#episodes = [0, 1, 2, 5, 10]
#learn_times = 10
#play_times=10



print("\n\n-----------------------------------------------------------------------------------------------")
print("STEP 1 - QLEARNING ")
print("-----------------------------------------------------------------------------------------------")

print("\nSTEP 1.a - QLEARNING :  AA LEARNING, PLAYING and being recorded------------------------------- \n")

print("List of training episodes : ", episodes)
print(" - Learning times : ", learn_times)
print(" - Play times : ", play_times)

means_averaged, stds_averaged ,seq_states= Q_performance_average(R_values, episodes,learn_times, play_times, debug=False)
plot_Qlearning_average_number_of_moves(N, episodes,means_averaged,stds_averaged, result_dir_for_N_disks)



In [None]:
global_dict={}
dict_sequences_of_moves={}
nb_sequences=0

print("\nSTEP 1.b - QLEARNING :  extraction of sequences of moves to make the behavioral patterns explicit------------------------------- \n")

dict_sequences_of_moves_per_training_session={}
for j, N_episodes in enumerate(episodes):
    dict_sequences_of_moves_per_training_session[j]={'nb_training_episodes': N_episodes, 'sequences':[]}

result_dir_Q_learning_step = result_dir_for_N_disks + "/Step1-Q-learning/"
if not path.exists(result_dir_Q_learning_step):
    os.mkdir(result_dir_Q_learning_step)

for n in range(learn_times):
    list_sequences_per_learning_time=[]
    if debug : print("\nlearn_times  n :", n)
    sequences_of_learn_times = seq_states[n]
    dict_session={}
    for j, N_episodes in enumerate(episodes):
        list_sequences_per_learning_time_per_episodes=[]
        if debug : print("\n ==> j :", j, " - N_episodes : ",N_episodes)
        list_sequences_of_episode = sequences_of_learn_times[j]
        if debug : print("==> sequences_of_episode :", list_sequences_of_episode)
        dict_play_time={}

        for i in range(play_times):

            sequence_of_one_play_time= list_sequences_of_episode[i]
            if debug :
                print("\n        play_times i :", i)
                print("        sequence_of_one_play_time : ", len(sequence_of_one_play_time), " - ", sequence_of_one_play_time)
            sequence_of_states = []
            sequence_of_moves=[]

            for z in range(len(sequence_of_one_play_time) - 1):
                a_state=sequence_of_one_play_time[z]
                sequence_of_states.append(states[a_state])
                if debug : print("type(states[a_state]) : ",type(states[a_state]))

                current_state = states[a_state]
                next_state = states[sequence_of_one_play_time[z + 1]]

                diff =""
                for t in range(len(current_state)):
                    if current_state[t]!=next_state[t]:
                        diff=str(current_state[t])+"-"+str(next_state[t])

                if debug : print("current_state: ",current_state, " - next_state : ", next_state, " - diff : ", diff)
                if diff !="" :
                    sequence_of_moves.append(diff)

                if (z+1)==(len(sequence_of_one_play_time) - 1) :
                    sequence_of_states.append(next_state)

            if debug :
                print("        sequence_of_states : ", len(sequence_of_states) , " - ",  sequence_of_states)
                print("        sequence_of_moves : ", len(sequence_of_moves), " - ", sequence_of_moves)

            dict_play_time[i] = {'id_play_time': i, 'sequence_of_move':sequence_of_moves, 'sequence_of_states':sequence_of_states}
            list_sequences_per_learning_time_per_episodes.append(sequence_of_moves)
            nb_sequences+=1

        dict_session[j]={'id_session': j, 'session_data':dict_play_time}



        list_sequences_per_learning_time = list_sequences_per_learning_time_per_episodes + list_sequences_per_learning_time

        #Update dict_sequences_of_moves_per_training_session with these sequences of this id_learn_time
        previous_seqs = dict_sequences_of_moves_per_training_session[j]['sequences']
        previous_seqs= previous_seqs+ list_sequences_per_learning_time_per_episodes
        dict_sequences_of_moves_per_training_session[j]['sequences'] = previous_seqs

    dict_sequences_of_moves[n] = {'id_learn_time': n, 'N_episodes': N_episodes, 'learn_times_data': dict_session}
    if debug : print(" j : ", j, " - N_episodes : ", N_episodes, " - len dict_sequences_of_moves_per_training_session[j]['sequences'] : ", len(dict_sequences_of_moves_per_training_session[j]['sequences']))

print("Saving of a total of ", nb_sequences, " sequences into ",result_dir_Q_learning_step,"dict_sequences_of_moves_per_training_session.json...")
tools.save_dict_in_json(dict_sequences_of_moves_per_training_session, result_dir_Q_learning_step+"dict_sequences_of_moves_per_training_session.json")

In [None]:
for key in sorted(dict_sequences_of_moves_per_training_session.keys()):
    file_name = result_dir_Q_learning_step+"/id_training_sessions_" + str(
        key) + "_sequences_of_moves_"+str(learn_times)+"_learn_times_"+str(play_times)+"_play_times"
    print("==>key ", key," : for id_training_session:", str(key), ", with Nb training episodes :  ",dict_sequences_of_moves_per_training_session[key]["nb_training_episodes"],
          " :  ", len(dict_sequences_of_moves_per_training_session[key]["sequences"]), " sequences are saved in : ",
          file_name)
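    # Sequences have different lengths, so store them as an object array (load with allow_pickle=True)
    np.save(file_name, np.asarray(dict_sequences_of_moves_per_training_session[key]["sequences"], dtype=object))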

global_dict={"learn_times":learn_times, "play_times": play_times, "N_disk":N, "episodes":episodes, "dict_sequences_of_moves_per_training_session":dict_sequences_of_moves_per_training_session}
tools.save_dict_in_json(global_dict,result_dir_Q_learning_step+"/2022-04-21-global_dict_with_dict_sequences_of_moves_per_training_session.json")
print("RESULTS: Global_dict with ",str(N), "disks saved in ",result_dir_Q_learning_step,"/2022-04-21-global_dict_with_dict_sequences_of_moves_per_training_session.json")

global_dict={"learn_times":learn_times, "play_times": play_times, "N_disk":N, "episodes":episodes, "dict_sequences_of_moves":dict_sequences_of_moves}
tools.save_dict_in_json(global_dict,result_dir_Q_learning_step+"/2022-04-21-global_dict.json")
print("RESULTS: Global_dict with ",str(N), "disks saved in ",result_dir_Q_learning_step,"2022-04-21-global_dict.json")

## STEP 2 - Train the LSTM RNN with TOH sequences

The Moves Sequence Learning Phase consists of training an RNN with LSTM units (hereafter the LSTM model) on the recorded sequences of moves of the AA. The model learns to predict the AA’s next move at time t based on the current and past ones. <br/>
This step returns a dataset of recorded hidden patterns (i.e., the activity vectors of the hidden layer generated by the network at each input). <br/>
Through the learned sequences, the trained LSTM model has encoded an implicit representation of the TOH rules. 
Note that in this notebook the LSTM model is trained on sequences generated from the TOH grammar. <br/>
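
Next-move prediction can be framed as pairs of (move at step t, move at step t + 1). The sketch below builds such pairs with one-hot vectors. It is only an illustration under assumptions: the notebook's `DataSetOfSequences` class may encode sequences differently (for example with start or end markers, or padding), and the names `one_hot_pairs` and `vocab` are hypothetical.

```python
import numpy as np

def one_hot_pairs(sequence, symbol_to_index):
    """Return (inputs, targets): the move at step t and the move at step t + 1, one-hot encoded."""
    eye = np.eye(len(symbol_to_index))
    idx = [symbol_to_index[move] for move in sequence]
    return eye[idx[:-1]], eye[idx[1:]]

moves = ["1-3", "1-2", "3-2", "1-3", "2-1", "2-3", "1-3"]
vocab = {sym: i for i, sym in enumerate(sorted(set(moves)))}  # 5 distinct symbols here
x, y = one_hot_pairs(moves, vocab)
print(x.shape, y.shape)
```

Each target row is the input row of the following time step, which is what lets a recurrent model learn the transition structure of the sequences.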

In [None]:
from tohgrammar import TohGrammar


print("\n\n-----------------------------------------------------------------------------------------------")
print("STEP 2 - RNN-LSTM : Encoding implicit TOH rules representation through learning ")
print("-----------------------------------------------------------------------------------------------")


# NEXT STEP :
# use toh_grammar to generate sequences to train the LSTM network
result_dir_RNN_LSTM_step = result_dir_for_N_disks + "/Step2-RNN_LSTM/"
if not path.exists(result_dir_RNN_LSTM_step):
    os.mkdir(result_dir_RNN_LSTM_step)
    
debug= False
transitions=[]

if debug : print("toh_dict_grammar.keys() : ", toh_dict_grammar.keys())
for k in sorted(toh_dict_grammar.keys(), key=int):
    state = toh_dict_grammar[k]["label"]
    transitions_tuples= toh_dict_grammar[k]["transitions"]
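    # Named "transition" rather than "tuple" so the built-in tuple() stays usable in later cells
    for transition in transitions_tuples :
        if transition[0] not in transitions :
            transitions.append(transition[0])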

print ("Number of transitions in hanoi grammar : ", len(transitions))
print (transitions)


if debug : print( "toh_dict_grammar.keys() : ", toh_dict_grammar.keys())
hanoi_grammar = TohGrammar(transitions, toh_dict_grammar)
TOH_dataset = hanoi_grammar.create_sequences_hanoi(10)

file_name = result_dir_RNN_LSTM_step+"TOH_dataset_" + str(len(TOH_dataset)) + "_sequences_"+ str(N) + "_disks"
print("TOH dataset of :", str(len(TOH_dataset)),  " sequences with TOH of", str(N),", disks saved in : ", file_name, ".npy")
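# dtype=object keeps this working when sequences have different lengths (load with allow_pickle=True)
np.save(file_name, np.asarray(TOH_dataset, dtype=object))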

transition_dict = hanoi_grammar.get_symbols_dict()

print ("Number of transitions in TOH grammar : ", len(transition_dict))
for k, v in transition_dict.items():
    print(k, " : ", v)

print("\n STEP 2.a - RNN-LSTM : Loading TOH grammar and generation of sequences --------------------------\n")

parameters_json_file = "rnn_lstm_model_parameters.json"
print("Loading parameters for the RNN-LSTM step from the following file : ", parameters_json_file)

dict_of_rnn_lstm_model_parameters = tools.get_dict_from_json(parameters_json_file)

nb_sequences = dict_of_rnn_lstm_model_parameters["nb_sequences"]
keras_model_rnn_backup_file = dict_of_rnn_lstm_model_parameters["keras_model_rnn_backup_file"]

nb_hidden_layer = dict_of_rnn_lstm_model_parameters["nb_hidden_layer"]
nb_LSTM_cell_per_hidden_layer = dict_of_rnn_lstm_model_parameters["nb_LSTM_cell_per_hidden_layer"]
epochs = dict_of_rnn_lstm_model_parameters["epochs"]
debug = dict_of_rnn_lstm_model_parameters["debug"]

# files --------------------------------------------
history_losses_png_file = dict_of_rnn_lstm_model_parameters["history_losses_png_file"]
losses_by_epoch_png_file = dict_of_rnn_lstm_model_parameters["losses_by_epoch_png_file"]
accuracy_by_epoch_png_file = dict_of_rnn_lstm_model_parameters["accuracy_by_epoch_png_file"]

rnn_lstm_model_backup_file = dict_of_rnn_lstm_model_parameters["rnn_lstm_model_backup_file"]

#test_sequences_json_file = dict_of_rnn_lstm_model_parameters["test_sequences_json_file"]
#test_predictions_and_HS_json_file = dict_of_rnn_lstm_model_parameters["test_predictions_and_HS_json_file"]
#rnn_weights_file = dict_of_rnn_lstm_model_parameters["rnn_weights_file"]
#rnn_for_hidden_states_LSTM_layer_weights_file = dict_of_rnn_lstm_model_parameters["rnn_for_hidden_states_LSTM_layer_weights_file"]

#analysis_of_artificial_agent_behavior = dict_of_rnn_lstm_model_parameters["analysis_of_artificial_agent_behavior"],
#artificial_agent_behavior_data_set = dict_of_rnn_lstm_model_parameters["artificial_agent_behavior_data_set"]



In [None]:
from rnnlstmmodel import RnnLstmModel
from datasetofsequences import DataSetOfSequences

print("\n STEP 2.b  - RNN-LSTM :Training, evaluation and test of the model --------------------------\n")
rnn_lstm_model = RnnLstmModel(transition_dict,nb_LSTM_cell_per_hidden_layer,debug)

TOH_dataset_of_sequences = DataSetOfSequences(TOH_dataset,transition_dict)

#TRAINING of the LSTM model --------------------------
input_train_reshape= TOH_dataset_of_sequences.get_input_train_reshape()
expected_output_train_reshape= TOH_dataset_of_sequences.get_expected_output_train_reshape()
input_train_val_reshape=TOH_dataset_of_sequences.get_input_train_val_reshape()
expected_output_train_val_reshape=TOH_dataset_of_sequences.get_expected_output_train_val_reshape()

if debug : 
    print ("epochs : ", epochs," - input_train_reshape.shape : ",input_train_reshape.shape ," - expected_output_train_reshape.shape : ", expected_output_train_reshape.shape ," - input_train_val_reshape.shape : ", input_train_val_reshape.shape ," - expected_output_train_val_reshape.shape : ", expected_output_train_val_reshape.shape ," - keras_model_rnn_backup_file : ",keras_model_rnn_backup_file)


In [None]:
rnn_lstm_model.train_model(epochs,input_train_reshape,expected_output_train_reshape,input_train_val_reshape,expected_output_train_val_reshape,
                           rnn_lstm_model_backup_file, history_losses_png_file, losses_by_epoch_png_file, accuracy_by_epoch_png_file, 
                           result_dir_RNN_LSTM_step)


In [None]:
input_test_reshape= TOH_dataset_of_sequences.get_input_test_reshape()
expected_output_test_reshape= TOH_dataset_of_sequences.get_expected_output_test_reshape()
rnn_lstm_model.evaluate_model (input_test_reshape, expected_output_test_reshape,input_train_reshape ,expected_output_train_reshape)


In [None]:
#TESTING the LSTM model ------------------------------
RNN_LSTM_dict_of_extracted_patterns= TOH_dataset_of_sequences.get_dict_of_extracted_patterns()
rnn_lstm_model.test_model(result_dir_RNN_LSTM_step,input_test_reshape,transition_dict,RNN_LSTM_dict_of_extracted_patterns)


## STEP 3 - eXplainable process to explain the knowledge construction and evolution of the Q-learning agent

The XAI Phase consists of applying a post-hoc implicit rule extraction algorithm and a graph visualization technique to the dataset of recorded hidden patterns, in order to extract graphs of AA behavior at different stages of training.
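
The general idea of this kind of rule extraction can be sketched as follows: group similar hidden patterns into clusters, treat each cluster as a graph node, and link consecutive clusters with the input symbol that triggered the jump. The example below is a simplified illustration, not the algorithm used by this notebook. It assumes k-means as the clustering method, uses made-up two-dimensional patterns, and the names `kmeans_labels` and `extract_transitions` are hypothetical.

```python
import numpy as np

def kmeans_labels(X, k, n_iter=20):
    """Plain Lloyd's k-means, initialised from the first k rows for reproducibility."""
    centers = np.array(X[:k], dtype=float)
    for _ in range(n_iter):
        # Distance of every pattern to every center, then nearest-center assignment
        dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        for c in range(k):
            if np.any(labels == c):
                centers[c] = X[labels == c].mean(axis=0)
    dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
    return dists.argmin(axis=1)

def extract_transitions(labels, symbols, start=-1):
    """Map (previous cluster, next cluster) to the set of input symbols causing that jump."""
    graph, prev = {}, start
    for lab, sym in zip(labels, symbols):
        graph.setdefault((prev, int(lab)), set()).add(sym)
        prev = int(lab)
    return graph

# Made-up hidden patterns (assumption): one row per input symbol of a single sequence
hidden = np.array([[0.0, 0.1], [1.0, 0.9], [0.1, 0.0], [0.9, 1.0], [0.0, 0.0]])
symbols = ["a", "b", "a", "b", "a"]
labels = kmeans_labels(hidden, k=2)
graph = extract_transitions(labels, symbols)
print(labels, graph)
```

The `clustering_range_min` and `clustering_range_max` parameters loaded in the next cell suggest that several cluster counts are explored. In this sketch that would amount to calling `kmeans_labels` once per value of `k`.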

In [None]:
import time
import tensorflow as tf
from datetime import timedelta


print("\n\n-----------------------------------------------------------------------------------------------")
print("STEP 3 - XAI : Loading XAI parameters and extraction of THE HIDDEN REPRESENTATIONS from RNN LSTM USING sequences from AA playing")
print("---------------------------------------------------------------------------------------------------")
result_dir_xai_step=result_dir_for_N_disks+"/Step3-XAI/"
if not path.exists(result_dir_xai_step):
    os.mkdir(result_dir_xai_step)

xai_parameters_file = "xai_hanoi.json"
print("Loading parameters for the XAI step from the following file : ", xai_parameters_file)
parameters_dict = tools.get_dict_from_json(xai_parameters_file)

clustering_range_min = parameters_dict["clustering_range_min"]
clustering_range_max = parameters_dict["clustering_range_max"]
debug = parameters_dict["debug"]
nb_patterns = parameters_dict["nb_patterns"] #number of patterns you want to extract and explain. If equals to 0, the code will analyze all the recorded patterns


assert clustering_range_max > clustering_range_min, "clustering_range_max must be greater than clustering_range_min"

print("Parameters for rule extraction algorithms are : ")
print("clustering_range_min : ", clustering_range_min)
print("clustering_range_max : ", clustering_range_max)
print("nb_patterns : ", nb_patterns)

start_time = time.monotonic()
print("\n\n dict_sequences_of_moves_per_training_session---------")
for id_training_session,v in dict_sequences_of_moves_per_training_session.items():
    nb_training_episodes= dict_sequences_of_moves_per_training_session[id_training_session]["nb_training_episodes"]
    print("\n Analysis of training session id : ", id_training_session)

    result_dir_id_session = result_dir_xai_step+"N_episode_"+str(id_training_session)+"/"
    if not path.exists(result_dir_id_session):
        os.mkdir(result_dir_id_session)

    print("\n STEP 3.a - XAI : Transform sequences of AA behavior into samples --------------------------\n")

    AA_sequences = dict_sequences_of_moves_per_training_session[id_training_session]["sequences"]

    [AA_input_test, AA_expected_output_test, AA_dict_for_test] = tools.transform_sequences_into_data_for_rnn(transition_dict,
                                                                                                    AA_sequences,
                                                                                                    True, debug)
    print("\nTotal AA_input_test samples           : ", len(AA_input_test))
    print("Total AA_expected_output_test samples : ", len(AA_expected_output_test))

    dict_of_extracted_patterns = tools.initialize_dict_of_extracted_patterns(AA_sequences, debug)
    if debug:
        # Number of entries, then the first entry as a sanity check
        print("dict_of_extracted_patterns (number of entries) : ", len(dict_of_extracted_patterns))
        print(dict_of_extracted_patterns[0])

    print("\n STEP 3.b - XAI : Reshape data for Keras Model--------------------------\n")

    AA_input_test = np.asarray(AA_input_test)
    AA_expected_output_test = np.asarray(AA_expected_output_test)

    AA_input_test_reshape = np.reshape(AA_input_test, (AA_input_test.shape[0], 1, AA_input_test.shape[1]))
    AA_expected_output_test_reshape = np.reshape(AA_expected_output_test,
                                              (AA_expected_output_test.shape[0], 1, AA_expected_output_test.shape[1]))

    print("\nReshape test data ...")

    if debug:
        print("\nAA_input_test_reshape.shape: ", AA_input_test_reshape.shape)
        print("AA_input_test_reshape: ", AA_input_test_reshape)
        print("\n---------------")
        print("\nAA_expected_output_test_reshape.shape: ", AA_expected_output_test_reshape.shape)
        print("AA_expected_output_test_reshape: ", AA_expected_output_test_reshape)

    print("\n STEP 3.c - XAI : Evaluate training for RNN Keras Model for test dataset--------------------------\n")


    rnn_model = rnn_lstm_model.get_rnn_model()
    results = rnn_model.evaluate(AA_input_test_reshape, AA_expected_output_test_reshape, batch_size=1, verbose=1)
    print("     RNN evaluation (test seq)      - Average [loss, accuracy] : ", results)

    print("\n STEP 3.d - XAI : RNN Keras Model predictions for test dataset--------------------------\n")

    AA_input_test_reshape = tf.cast(AA_input_test_reshape, tf.float32)

    model_predictions, hidden_pattern_h, states_c = rnn_model.predict(AA_input_test_reshape)

    print("\n     Nb predictions for test dataset: ", len(model_predictions))
    
    if debug: print(len(dict_of_extracted_patterns), " - ",  len(model_predictions), " - ", len(hidden_pattern_h))
    
    dict_of_extracted_patterns = tools.update_dict_of_extracted_patterns(dict_of_extracted_patterns, model_predictions, hidden_pattern_h,
                                                                          result_dir_RNN_LSTM_step + "/dict_of_extracted_patterns.json",
                                                                         transition_dict, debug)
    
    # Number of entries after the update, then the first entry as a sanity check
    print("After update, dict_of_extracted_patterns (number of entries) : ", len(dict_of_extracted_patterns))
    print(dict_of_extracted_patterns[0])
    tools.save_dict_in_json(dict_of_extracted_patterns,result_dir_id_session+"dict_of_extracted_patterns.json" )
    print("\n STEP 3.e - XAI : Knowledge Extraction PROCESS through FSA--------------------------\n")

    tools.knowledge_extraction_process(dict_of_extracted_patterns, clustering_range_min, clustering_range_max,
                                       nb_patterns,result_dir_id_session )


end_time = time.monotonic()
print("Execution time for N : ", N, " disks --> ", timedelta(seconds=end_time - start_time), " (h:mm:ss)")
dict_computational_time.update({N:{"execution_time":(end_time - start_time), "optimum_moves":(2 ** N - 1)}})


