# Prepare

## Import

In [1]:
import gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from datetime import datetime


# Configuration parameters for the whole setup
seed = 42
gamma = 0.75  # Discount factor for past rewards
max_steps_per_episode = 100 #10000
# env = gym.make("CartPole-v0")  # Create the environment
# env.seed(seed)
eps = np.finfo(np.float32).eps.item()  # Smallest number such that 1.0 + eps != 1.0

2022-12-30 12:54:27.943127: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Two Step Task

In [2]:
# encoding of the higher stages
S_1 = 0
S_2 = 1
S_3 = 2
nb_states = 3

class two_step_task():
    def __init__(self):
        # start in S_1
        self.state = S_1
        
        # defines what is the stage with the highest expected reward. Initially random
        self.highest_reward_second_stage = np.random.choice([S_2,S_3])
        
        self.num_actions = 2
        self.reset()
        
        # initialization of plotting variables
        common_prob = 0.8
        self.transitions = np.array([
            [common_prob, 1-common_prob],
            [1-common_prob, common_prob]
        ])
        self.transition_count = np.zeros((2,2,2))
        
        self.last_action = None
        self.last_state = None
    
    def get_state(self):
        one_hot_array = [0.,0.,0.]
        one_hot_array[self.state] = 1.0
        return tf.convert_to_tensor(one_hot_array)

    def possible_switch(self):
        if (np.random.uniform() < 0.025):
            # switches which of S_2 or S_3 has expected reward of 0.9
            self.highest_reward_second_stage = S_2 if (self.highest_reward_second_stage == S_3) else S_3
            
    def get_rprobs(self):
        """
        probability of reward of states S_2 and S_3, in the form [[p, 1-p], [1-p, p]]
        """
        if (self.highest_reward_second_stage == S_2):
            r_prob = 0.9
        else:
            r_prob = 0.1
        
        rewards = np.array([
            [r_prob, 1-r_prob],
            [1-r_prob, r_prob]
        ])
        return rewards
            
    def isCommon(self,action,state):
        if self.transitions[action][state] >= 1/2:
            return True
        return False
        
    def updateStateProb(self,action):
        if self.last_is_rewarded: #R
            if self.last_is_common: #C
                if self.last_action == action: #Rep
                    self.transition_count[0,0,0] += 1
                else: #URep
                    self.transition_count[0,0,1] += 1
            else: #UC
                if self.last_action == action: #Rep
                    self.transition_count[0,1,0] += 1
                else: #URep
                    self.transition_count[0,1,1] += 1
        else: #UR
            if self.last_is_common:
                if self.last_action == action:
                    self.transition_count[1,0,0] += 1
                else:
                    self.transition_count[1,0,1] += 1
            else:
                if self.last_action == action:
                    self.transition_count[1,1,0] += 1
                else:
                    self.transition_count[1,1,1] += 1
                    
        
    def stayProb(self):
        print(self.transition_count)
        row_sums = self.transition_count.sum(axis=-1)
        stay_prob = self.transition_count / row_sums[:,:,np.newaxis] 
       
        return stay_prob

    def reset(self):
        self.timestep = 0
        
        # for the two-step task plots
        self.last_is_common = None
        self.last_is_rewarded = None
        self.last_action = None
        self.last_state = None
        
        # come back to S_1 at the end of an episode
        self.state = S_1
        
        return self.get_state()
        
    def step(self,action):
        self.timestep += 1
        self.last_state = self.state
        
        # get next stage
        if (self.state == S_1):
            # get reward
            reward = 0
            # update stage
            self.state = S_2 if (np.random.uniform() < self.transitions[action][0]) else S_3
            # keep track of stay probability after first action
            if (self.last_action != None):    
                self.updateStateProb(action)
            self.last_action = action
            # book-keeping for plotting
            self.last_is_common = self.isCommon(action,self.state-1)
            
        else:# case S_2 or S_3
            # get probability of reward in stage
            r_prob = 0.9 if (self.highest_reward_second_stage == self.state) else 0.1
            # get reward
            reward = 1 if np.random.uniform() < r_prob else 0
            # update stage
            self.state = S_1
            # book-keeping for plotting
            self.last_is_rewarded = reward

        # new state after the decision
        new_state = self.get_state()
        if self.timestep >= 200: 
            done = True
        else: 
            done = False
        return new_state,reward,done,self.timestep
    
    def trial(self,action):
        # do one action in S_1, and keep track of the perceptually distinguishable state you arive in
        observation,_,_,_ = self.step(action)
        # do the same action in the resulting state (S_2 or S_3). The action doesn't matter, the reward does
        _,reward,done,_ = self.step(action)
        return observation,reward,done,self.timestep
    
env = two_step_task()

## LOSS

In [3]:

def get_n_step_return(
    rewards: tf.Tensor,
    values: tf.Tensor,
    n: int,
    gamma: float):
    '''Fonction qui retourne R_t, le gamma utilisé est celui préconisé par 
    Wang et al. (2018), Methods/Simulation1
    Version AVEC bootstrap (utilisation de la valeur prédite au dernier step
    comme point de départ)
    '''
    returns = tf.TensorArray(dtype=tf.float32, size=n)
    # Start from the end of `rewards` and accumulate reward sums
    #into the `returns` array
    rewards = rewards[::-1]
    values =  values[::-1]
    
    # values is inverted
    discounted_sum = values[0]
    for i in tf.range(n):
        discounted_sum = rewards[i] + gamma * discounted_sum
        returns = returns.write(i, int(discounted_sum))
    
    return returns.stack()



In [4]:
def compute_loss(
        action_probs: tf.Tensor,
        values: tf.Tensor,
        rewards: tf.Tensor,
        entropy : tf.Tensor,  
        gamma: float = gamma,
        beta_v: float = 0.05,
        beta_e : float = 0.05 ) -> tf.Tensor:
    """Computes the combined actor-critic loss."""
    
    R_t = get_n_step_return(
            rewards=rewards,
            values=values, 
            n=rewards.shape[0], 
            gamma=gamma)
    delta = R_t - values
    delta_nogradient = tf.stop_gradient(delta)
          
    critic_loss = 0.5 * tf.reduce_sum(tf.square(delta))
    action_log_probs = tf.math.log(action_probs + 1e-7)
    actor_loss = tf.reduce_sum(action_log_probs * delta_nogradient)
    total_loss = actor_loss + beta_v * critic_loss + beta_e * entropy
    
    return total_loss

## LSTM

In [5]:
num_inputs = 6 #states + reward + action = 3 + 1 + 2
num_actions = 2
num_hidden = 48

inputs = layers.Input(shape=(None,num_inputs))
common = layers.LSTM(num_hidden)(inputs)
action = layers.Dense(num_actions, activation="softmax")(common)
critic = layers.Dense(1)(common)

model = keras.Model(inputs=inputs, outputs=[action, critic])

# Train

In [6]:

optimizer = keras.optimizers.Adam(learning_rate=0.01)
action_probs_history = []
critic_value_history = []
rewards_history = []
running_reward = 0
episode_count = 0
  

while True:  # Run until solved
    state = env.reset()
    episode_reward = 0
    reward = 0.0
    action_probs = [1,0]
    inputs = tf.zeros((1,1,6))
    
    episode_entropy = tf.zeros(())
    
    with tf.GradientTape() as tape:
        for timestep in range(1, max_steps_per_episode):
            
            input = tf.concat([state, action_probs],0)
            input = tf.concat([input, [reward]],0)
            input = tf.reshape(input, (1, 1, num_inputs))

            inputs = tf.concat([inputs, input],1)

            # Predict action probabilities and estimated future rewards from environment state
            action_probs, critic_value = model(inputs)
            
            if np.isnan(action_probs.numpy()).any():
                print(action_probs)
                break                
            
            critic_value_history.append(critic_value[0, 0])

            # Sample action from action probability distribution
            action = np.random.choice(num_actions, p=np.squeeze(action_probs))
            action_probs_history.append(tf.math.log(action_probs[0, action]))

            # Apply the sampled action in our environment
            state, reward, done, _ = env.trial(action)
            #state, reward, done, _ = env.step(np.random.randint(0,2))
            rewards_history.append(reward)
            episode_reward += reward
            
            if done:
                break

        # Update running reward to check condition for solving
        #running_reward = episode_reward + running_reward

        # Calculating loss values to update our network
        
        #entropy
        entropy = -tf.math.reduce_sum(tf.math.multiply(action_probs,tf.math.log(action_probs + 1e-7)))
        episode_entropy += entropy

        if len(rewards_history) >= max_steps_per_episode - 1:
            loss_value = compute_loss(
                tf.convert_to_tensor(np.float32(action_probs_history)), 
                tf.convert_to_tensor(np.float32(critic_value_history)), 
                tf.convert_to_tensor(np.float32(rewards_history)), 
                tf.convert_to_tensor(episode_entropy))
            
            # Backpropagation
            grads = tape.gradient(loss_value, model.trainable_variables[0:-2])
            optimizer.apply_gradients(zip(grads, model.trainable_variables[0:-2]))

        # Clear the loss and reward history
        action_probs_history.clear()
        critic_value_history.clear()
        rewards_history.clear()

    # Log details
    episode_count += 1
    if episode_count % 10 == 0:
        template = "reward: {:.2f} at episode {}"
        print(template.format(episode_reward, episode_count))
        #print(env.stayProb())

    if episode_reward > 90:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break

    
    # if episode_count > 100:
    #     break
    
path = "model_" + datetime.now().strftime("%m%d-%H%M%S") + ".h5"
model.save(path)

reward: 26.00 at episode 10
reward: 22.00 at episode 20
reward: 30.00 at episode 30
reward: 25.00 at episode 40
reward: 27.00 at episode 50
reward: 32.00 at episode 60
reward: 24.00 at episode 70
reward: 29.00 at episode 80
reward: 19.00 at episode 90
reward: 25.00 at episode 100
reward: 19.00 at episode 110
reward: 25.00 at episode 120
reward: 32.00 at episode 130
reward: 20.00 at episode 140
reward: 28.00 at episode 150
reward: 21.00 at episode 160
reward: 25.00 at episode 170
reward: 25.00 at episode 180
reward: 24.00 at episode 190
reward: 30.00 at episode 200
reward: 25.00 at episode 210
reward: 16.00 at episode 220
reward: 27.00 at episode 230
reward: 29.00 at episode 240
reward: 25.00 at episode 250
reward: 24.00 at episode 260
reward: 26.00 at episode 270
reward: 25.00 at episode 280
reward: 31.00 at episode 290
reward: 26.00 at episode 300
reward: 25.00 at episode 310
reward: 24.00 at episode 320
reward: 27.00 at episode 330
reward: 27.00 at episode 340
reward: 23.00 at episod

KeyboardInterrupt: 