In [19]:
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Tuple
import copy
import numpy as np
import random, json, csv, os

In [20]:
train = True
model_name = "Double_Q_Learning_tuning_alp_"

In [21]:
ACTIONS = [ "none",
            "attack",
            "scolding",
            "intimidate",
            "grudge",
            "sharing_happiness",
            "happy_person",
            "satisfaction",
            "sharing_fear",
            "running_away",
            "sharing_sadness",
            "disappointment",
            "surprise",
            "disbelief",
            "astonishment"
            ]

            
            
EMOTIONS = ["none",
            "fear",
            "joy",
            "surprise",
            "sadness",
            "anger"
            ]

EMOTION_PER_ACTION = { "none": "none",
            "attack" : "anger",
            "scolding": "anger",
            "intimidate": "anger",
            "grudge": "anger",
            "sharing_happiness": "joy",
            "happy_person": "joy",
            "satisfaction": "joy",
            "sharing_fear": "fear",
            "running_away": "fear",
            "sharing_sadness": "sadness",
            "disappointment": "sadness",
            "surprise": "surprise",
            "disbelief": "surprise",
            "astonishment": "surprise"
}

In [22]:
SCENIC_ACTION_SIZE = len(ACTIONS)
EMOTIONAL_REACTION_SIZE = len(EMOTIONS)

MAXIMUM_SKETCH_LENGTH = 15

In [23]:
# returns the code number of an action given by name
def parse_action(action):
    return ACTIONS.index(action)

#returns the action associated to a code
def unparse_action(code):
    return ACTIONS[code]

# returns the code number of an action given by name
def parse_emotion(action):
    return EMOTIONS.index(action)

#returns the action associated to a code
def unparse_emotion(code):
    return EMOTIONS[code]

def convert_action_into_emo(action):
    return EMOTION_PER_ACTION[action]

In [24]:
class ImprobotEnv(Env):

    def __init__(self, actor_space, robot_space, sketch_lenght, env_matrix, P0):

        self.action_space = Discrete(len(robot_space))
        # environment conditions space
        self.observation_space = Tuple((Discrete(len(robot_space)), Discrete(len(actor_space)))) # (robot reaction, actor action)
        self.state = () # (last robot reaction, last actor action)
        self.initial_action_P = P0
        self.sketch_length = sketch_lenght
        self.counter = 0
        self.P = env_matrix # {action: [probability, actor_action, reward]}
        self.q_table_1 = np.zeros([self.observation_space[0].n, self.observation_space[1].n, self.action_space.n])
        self.q_table_2 = np.zeros([self.observation_space[0].n, self.observation_space[1].n, self.action_space.n])
    
    def render(self):
        print(f"Last actor scenic action: {self.state[1]}")
        print(f"Last robot reaction: {self.state[0]}")

    def step(self, action):
        possible_next_states = self.P[action]
        probabilities = []

        for elem in possible_next_states:
            probabilities.append(elem[0])

        #print(probabilities)
        #print(possible_next_states)
        new_observation_idx = np.random.choice(len(possible_next_states), p=probabilities)
        
        new_observation = possible_next_states[new_observation_idx]
        # update state
        new_reaction = action
        new_actor_action = new_observation[1]
        self.state = tuple([new_reaction, new_actor_action])
        reward = new_observation[2]

        #self.counter += 1

        if self.counter == self.sketch_length:
            done = True
        else:
            done = False

        return self.state, reward, done

    def reset(self):
        actor_action = np.random.choice(SCENIC_ACTION_SIZE,p=self.initial_action_P)
        #print(f"initial action: {actor_action}")
        self.state = (0, actor_action) # (none, none)
        self.counter = 0
        return self.state


In [25]:
# initial action probability distribution
equal_prob = 1/(SCENIC_ACTION_SIZE-1)
P0 = np.zeros(15)
P0[1:] = np.full(14, equal_prob)

# DOUBLE-Q-Learning

In [26]:
def double_q_learning_train(alpha, gamma, epsilon, TD_step, min_q_diff, env):
    # mask in order to non choose action none during the training
    mask = np.ones(env.observation_space[0].n, dtype=np.int8)
    mask[0] = 0

    epochs_number = 100000

    for i in range(1, epochs_number):
        q_old_1 = copy.deepcopy(env.q_table_1)
        q_old_2 = copy.deepcopy(env.q_table_2)
        state = env.reset()

        epochs, reward = 0, 0
        done = False
        
        epsilon = -0.9/50000 * i + 1 if i < 50000 else 0.1

        while not done:

            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample(mask=mask) # Explore action space
                #print(f"random action: {action}")
            else:
                # choose action using q1+q2 value
                action_values = env.q_table_1[state[0]][state[1]][1:] + env.q_table_2[state[0]][state[1]][1:]
                action = np.argmax(action_values) + 1 # Exploit learned values
                #print(f"argmax action: {action}")

            initial_action = action
            states = []
            G = []
            rewds = []
            gamma_arr = []
            
            if random.uniform(0,1) < 0.5:
                old_value = env.q_table_1[state[0]][state[1]][action]
                table_to_update = 1
            else:
                old_value = env.q_table_2[state[0]][state[1]][action]
                table_to_update = 2

            for j in range(TD_step+1):
                if env.counter + j < env.sketch_length:
                    next_state, reward, done = env.step(action)
                    states.append(next_state)
                    rewds.append(reward)
                    gamma_arr.append(gamma**(j))
                    # update q1 or q2 table with probbility 0.5
                    if random.uniform(0, 1) < epsilon:
                        action = env.action_space.sample(mask=mask) # Explore action space
                        #print(f"random action: {action}")
                    else:
                        action_values = env.q_table_1[next_state[0]][next_state[1]][1:] + env.q_table_2[next_state[0]][next_state[1]][1:]
                        action = np.argmax(action_values) + 1 # Exploit learned values
                        #print(f"argmax action: {action}")
                else:
                    break
            
            if env.counter + TD_step < env.sketch_length:
                # update q1 or q2 table with probbility 0.5
                if table_to_update == 1:
                    next_value = env.q_table_2[next_state[0]][next_state[1]][np.argmax(env.q_table_1[next_state[0]][next_state[1]][1:]) + 1]
                    rewds.append(next_value)
                    gamma_arr.append(gamma ** (TD_step+1))
                    #G = np.multiply(rewds,gamma)
            

                else:
                    next_value = env.q_table_1[next_state[0]][next_state[1]][np.argmax(env.q_table_2[next_state[0]][next_state[1]][1:]) + 1]
                    rewds.append(next_value)
                    gamma_arr.append(gamma ** (TD_step+1))
                    #G = np.multiply(rewds,gamma)
            
                    #new_value = (1 - alpha) * old_value + np.sum(alpha * np.array(G))
                    #env.q_table_2[state[0]][state[1]][initial_action] = new_value
            
            G = np.multiply(rewds,gamma)

            if table_to_update == 1:
                new_value = (1 - alpha) * old_value + np.sum(alpha * np.array(G))
                env.q_table_1[state[0]][state[1]][initial_action] = new_value
            else:
                new_value = (1 - alpha) * old_value + np.sum(alpha * np.array(G))
                env.q_table_2[state[0]][state[1]][initial_action] = new_value  
            
            state = states[0]
            
            #state = next_state
            epochs += 1

            env.counter += 1

            if env.counter == env.sketch_length:
                done = True
            else:
                done = False

            #print(f"Episode: {i}")
            #print(f"Epoch: {epochs}")
            #print(f"actual_state: {env.state}")
            #print(f"action choosen: {action}")
            #print(f"next_state: {next_state}")
            #print(f"q_value updated: {env.q_table[state[0]][state[1]][action]}")

        if i % 100 == 0:
            print(f"Episode: {i}")
            print(f"action choosen: {action}")
            print(f"next_state: {next_state}")
            print(f"q_value updated: {env.q_table_1[state[0]][state[1]][action]}")




In [27]:
def policy_eval(env):
    total_epochs = 0
    total_rewards = 0
    episodes = 100

    for _ in range(episodes):
        state = env.reset()
        epochs, reward = 0, 0
        
        done = False
        
        while not done:
            action_values = env.q_table_1[state[0]][state[1]][1:] + env.q_table_2[state[0]][state[1]][1:]
            action = np.argmax(action_values) + 1 # Exploit learned values
            state, reward, _ = env.step(action)

            total_rewards += reward

            env.counter += 1

            if env.counter == env.sketch_length:
                done = True
            else:
                done = False

            epochs += 1

        total_epochs += epochs

    print(f"Results after {episodes} episodes:")
    print(f"Average timesteps per episode: {total_epochs / episodes}")
    print(f"Average rewards per episode: {total_rewards / episodes}")
    return total_rewards / episodes

In [28]:
def save_q_table(env,table_name, dir_name):
    reshaped_Q_1 = env.q_table_1.reshape((env.q_table_1.shape[0]*env.q_table_1.shape[1]),env.q_table_1.shape[2])
    reshaped_Q_2 = env.q_table_2.reshape((env.q_table_2.shape[0]*env.q_table_2.shape[1]),env.q_table_2.shape[2])
    reshaped_Q = np.add(reshaped_Q_1,reshaped_Q_2)
    header = ["S(robot reaction, actor action)", "none", "attack", "scolding", "intimidate", "grudge", "sharing_happiness",
          "happy_person", "satisfaction", "sharing_fear", "running_away", "sharing_sadness", "disappointment",
          "surprise", "disbelief", "astonishment"] if env.observation_space[0].n == 15 else ["S(robot reaction, actor action)", "none", "fear", "joy", "surprise", "sadness", "anger"]
    reaction_unparser = unparse_action if env.observation_space[0].n == 15 else unparse_emotion
    
    data = []
    
    for i in range(reshaped_Q.shape[0]):
        if int(i/env.observation_space[1].n) == 0:
            reaction = reaction_unparser(0)
            row = [(reaction,unparse_action(i))]
        else:
            reaction = int(i/env.observation_space[1].n)
            #print(f"here is the reaction: {reaction}")
            #print(f"here is the i: {i}")
            reaction = reaction_unparser(reaction)
            row = [(reaction, unparse_action((i%env.observation_space[1].n)))]
        row.extend(reshaped_Q[i])
        data.append(row)

    # Scrittura del file CSV
    if not os.path.exists('trials/' + dir_name):
        os.makedirs('trials/' + dir_name)

    with open('trials/' + dir_name + '/' + table_name + '.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header)
        writer.writerows(data)

# Action-per-Action learning Test

### Load P action per action matrix

In [29]:
with open('P_action_per_action.json') as f:
    P_aa = json.load(f)

print(P_aa)

# convert keys into numbers
new_P = {}
for k in P_aa.keys():
    new_P[int(k)]=P_aa[k]

P_aa = new_P
print(P_aa)

{'1': [[0.125, 3, 2], [0.125, 5, 0], [0.125, 6, -4], [0.125, 7, 0], [0.125, 8, 4], [0.25, 10, -1], [0.125, 11, 3]], '2': [[0.14285714285714285, 1, 1], [0.14285714285714285, 2, 4], [0.14285714285714285, 8, 0], [0.2857142857142857, 10, 13], [0.14285714285714285, 11, 4], [0.14285714285714285, 14, 2]], '3': [[0.25, 3, 5], [0.08333333333333333, 4, 0], [0.08333333333333333, 5, 12], [0.16666666666666666, 6, 6], [0.08333333333333333, 7, 0], [0.08333333333333333, 8, 2], [0.25, 10, 9]], '4': [[0.125, 3, 0], [0.125, 8, 1], [0.5, 10, 2], [0.25, 11, 1]], '5': [[0.1, 2, 1], [0.1, 5, 1], [0.2, 6, 8], [0.2, 8, 11], [0.2, 10, 15], [0.1, 11, 2], [0.1, 13, 6]], '6': [[0.08333333333333333, 2, 1], [0.08333333333333333, 3, -2], [0.08333333333333333, 5, 0], [0.3333333333333333, 6, 26], [0.08333333333333333, 8, 0], [0.16666666666666666, 11, -2], [0.08333333333333333, 12, -1], [0.08333333333333333, 14, 3]], '7': [[0.25, 3, 6], [0.125, 7, 0], [0.375, 10, 4], [0.125, 11, 3], [0.125, 12, -1]], '8': [[0.2, 3, 6], 

In [30]:
def write_csv(directory_name, avg_reward_aa, avg_reward_ae):
    data = [directory_name, avg_reward_aa, avg_reward_ae]
    with open('trials/summary.csv', mode='a', newline='') as file:
        writer = csv.writer(file)
        #writer.writerow(header)
        writer.writerow(data)

### Create action-per-action environment

## Q-Learning off-policy TD Training

In [31]:
bst_alpha = 0.2
bst_gamma = 0.4 # or 0.6
bst_td = 2

history = { "aa_history" : [],
            "ae_history" : []}

gamma = bst_gamma
epsilon = 0.1
min_difference = 0.000001 # 10^-7
td_step = 14
alpha = bst_alpha

hyperparameters = []
#a = [i / 100.0 for i in range(0,21, 2)]
g = [i / 100.0 for i in range(0,61, 10)]
td = range(0, 16, 1)

for idx, elem in enumerate(g):
    if train:
        hp = {  "gamma" : elem,
                "alpha": alpha,
                "td_step": td_step,
                "epsilon_min": epsilon}
        hyperparameters.append(hp)
        env = ImprobotEnv(actor_space=ACTIONS,robot_space=ACTIONS,sketch_lenght=15,env_matrix=P_aa, P0=P0)
        double_q_learning_train(alpha=alpha,gamma=elem,epsilon=epsilon, TD_step=td_step,min_q_diff=min_difference,env=env)
        history["aa_history"].append(policy_eval(env))

max_idx = np.argmax(history["aa_history"])
avg_reward_aa = history["aa_history"][max_idx]
directory_name = model_name + "bst_aa" + "_hp_gm_" + str(hyperparameters[max_idx]["gamma"]) + "_alph_" + str(hyperparameters[max_idx]["alpha"]) + "_td_" + str(hyperparameters[max_idx]["td_step"]) + "_eps_" + str(hyperparameters[max_idx]["epsilon_min"])
write_csv(directory_name, avg_reward_aa, 0)

Episode: 100
action choosen: 14
next_state: (13, 10)
q_value updated: 0.0
Episode: 200
action choosen: 11
next_state: (12, 10)
q_value updated: 0.0
Episode: 300
action choosen: 10
next_state: (10, 2)
q_value updated: 0.0
Episode: 400
action choosen: 5
next_state: (14, 2)
q_value updated: 0.0
Episode: 500
action choosen: 3
next_state: (7, 3)
q_value updated: 0.0
Episode: 600
action choosen: 7
next_state: (12, 10)
q_value updated: 0.0
Episode: 700
action choosen: 7
next_state: (11, 8)
q_value updated: 0.0
Episode: 800
action choosen: 10
next_state: (6, 3)
q_value updated: 0.0
Episode: 900
action choosen: 12
next_state: (10, 12)
q_value updated: 0.0
Episode: 1000
action choosen: 8
next_state: (6, 14)
q_value updated: 0.0
Episode: 1100
action choosen: 4
next_state: (12, 10)
q_value updated: 0.0
Episode: 1200
action choosen: 6
next_state: (8, 10)
q_value updated: 0.0
Episode: 1300
action choosen: 6
next_state: (2, 2)
q_value updated: 0.0
Episode: 1400
action choosen: 12
next_state: (9, 6)
q

## Q_table visualization

In [32]:
#save_q_table(env_aa,table_name='q_table_aa', dir_name=directory_name)

# Action-per-Emotion Model

### Load P action per emotion matrix

In [33]:
with open('P_action_per_emotion.json') as f:
    P_ae = json.load(f)

print(P_ae)

# convert keys into numbers
new_P = {}
for k in P_ae.keys():
    new_P[int(k)]=P_ae[k]

P_ae = new_P
print(P_ae)

{'1': [[0.15384615384615385, 3, 6], [0.07692307692307693, 5, 4], [0.07692307692307693, 6, 4], [0.07692307692307693, 7, 0], [0.3076923076923077, 8, 7], [0.3076923076923077, 10, 2]], '2': [[0.06666666666666667, 2, 2], [0.1, 3, 4], [0.06666666666666667, 5, 1], [0.2, 6, 34], [0.03333333333333333, 7, 0], [0.1, 8, 11], [0.16666666666666666, 10, 19], [0.13333333333333333, 11, 3], [0.06666666666666667, 12, -2], [0.03333333333333333, 13, 6], [0.03333333333333333, 14, 3]], '3': [[0.019230769230769232, 1, -1], [0.07692307692307693, 2, 6], [0.07692307692307693, 3, 2], [0.038461538461538464, 4, 4], [0.07692307692307693, 5, 1], [0.17307692307692307, 6, 8], [0.07692307692307693, 7, 6], [0.038461538461538464, 8, 3], [0.23076923076923078, 10, 6], [0.11538461538461539, 11, 6], [0.019230769230769232, 12, 4], [0.057692307692307696, 14, 3]], '4': [[0.041666666666666664, 2, 3], [0.08333333333333333, 3, 1], [0.08333333333333333, 4, 0], [0.041666666666666664, 5, -1], [0.16666666666666666, 6, 0], [0.125, 8, -2

## Double-Q-Learning off-policy Training

In [34]:
'''
bst_alpha = 0.06
bst_gamma = 0.6
bst_td = 14

gamma = bst_gamma
epsilon = 0.1
min_difference = 0.000001 # 10^-7
td_step = bst_td
alpha = bst_alpha

hyperparameters = []


#a = [i / 100.0 for i in range(0, 31, 3)]
a = [i / 100.0 for i in range(0, 61, 10)]
td = range(0,16,1)

for idx,elem in enumerate(a):
    if train:
        hp = {  "gamma" : gamma,
                "alpha": elem,
                "td_step": td_step,
                "epsilon_min": epsilon}
        hyperparameters.append(hp)
        env = ImprobotEnv(actor_space=ACTIONS,robot_space=EMOTIONS,sketch_lenght=15,env_matrix=P_ae, P0=P0)
        double_q_learning_train(alpha=elem,gamma=gamma,epsilon=epsilon, TD_step=td_step,min_q_diff=min_difference,env=env)
        history["ae_history"].append(policy_eval(env))

max_idx = np.argmax(history["ae_history"])
avg_reward_ae = history["ae_history"][max_idx]
directory_name = model_name + "bst_ae" + "_hp_gm_" + str(hyperparameters[max_idx]["gamma"]) + "_alph_" + str(hyperparameters[max_idx]["alpha"]) + "_td_" + str(hyperparameters[max_idx]["td_step"]) + "_eps_" + str(hyperparameters[max_idx]["epsilon_min"])
write_csv(directory_name, 0, avg_reward_ae)
'''

'\nbst_alpha = 0.06\nbst_gamma = 0.6\nbst_td = 14\n\ngamma = bst_gamma\nepsilon = 0.1\nmin_difference = 0.000001 # 10^-7\ntd_step = bst_td\nalpha = bst_alpha\n\nhyperparameters = []\n\n\n#a = [i / 100.0 for i in range(0, 31, 3)]\na = [i / 100.0 for i in range(0, 61, 10)]\ntd = range(0,16,1)\n\nfor idx,elem in enumerate(a):\n    if train:\n        hp = {  "gamma" : gamma,\n                "alpha": elem,\n                "td_step": td_step,\n                "epsilon_min": epsilon}\n        hyperparameters.append(hp)\n        env = ImprobotEnv(actor_space=ACTIONS,robot_space=EMOTIONS,sketch_lenght=15,env_matrix=P_ae, P0=P0)\n        double_q_learning_train(alpha=elem,gamma=gamma,epsilon=epsilon, TD_step=td_step,min_q_diff=min_difference,env=env)\n        history["ae_history"].append(policy_eval(env))\n\nmax_idx = np.argmax(history["ae_history"])\navg_reward_ae = history["ae_history"][max_idx]\ndirectory_name = model_name + "bst_ae" + "_hp_gm_" + str(hyperparameters[max_idx]["gamma"]) + "_

## Q_table visualization

In [35]:
#save_q_table(env_ae,table_name='q_table_ae', dir_name=directory_name)

In [36]:
print(history)

{'aa_history': [4.16, 83.3, 88.31, 78.87, 77.33, 89.62, 87.83], 'ae_history': []}
