## Dyna : Integrated Learning, Planning and Reacting

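Dyna-Q repeats three steps on every time step: direct RL (a Q-learning update from the real transition), model learning (recording that transition in the model), and planning (n simulated Q-learning updates drawn from the model).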

In [3]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import os, jdc, shutil
from tqdm import tqdm

from rl_glue import RLGlue
from agent import BaseAgent
from maze_env import ShortcutMazeEnvironment

# Create the 'results' directory up front; exist_ok=True means an already
# existing directory is not an error, so this line can be re-run safely.
os.makedirs('results', exist_ok=True)

In [4]:
class DynaQAgent(BaseAgent):
    """Tabular Dyna-Q agent: action values plus a learned transition model."""

    def __init__(self, agent_info):
        """Set up hyper-parameters, random generators, Q-table and model.

        Args:
            agent_info (dict): agent configuration.
                Required keys:
                    "num_states"  (int): number of states.
                    "num_actions" (int): number of actions.
                Optional keys (default in parentheses):
                    "discount" (0.95), "step_size" (0.1), "epsilon" (0.1),
                    "planning_steps" (10), "random_seed" (42),
                    "planning_random_seed" (42).

        Raises:
            ValueError: if "num_states" or "num_actions" is missing.
        """
        super().__init__()

        # Fail immediately with a clear message instead of printing and then
        # crashing later with an unrelated AttributeError on self.num_states.
        try:
            self.num_states = agent_info["num_states"]
            self.num_actions = agent_info["num_actions"]
        except KeyError as err:
            raise ValueError(
                "You need to pass num_states and num_actions"
            ) from err

        self.gamma = agent_info.get("discount", 0.95)
        self.step_size = agent_info.get("step_size", 0.1)
        self.epsilon = agent_info.get("epsilon", 0.1)
        self.planning_steps = agent_info.get("planning_steps", 10)

        # Two independent generators: one for action selection, one for the
        # search control performed during planning.
        self.rand_generator = np.random.RandomState(
            agent_info.get('random_seed', 42))
        self.planning_rand_generator = np.random.RandomState(
            agent_info.get('planning_random_seed', 42))

        # Action-value table, indexed as q_values[state][action].
        self.q_values = np.zeros((self.num_states, self.num_actions))
        self.actions = list(range(self.num_actions))

        # -1 marks "no previous state/action yet".
        self.past_action = -1
        self.past_state = -1

        # World model: a dictionary of dictionaries,
        # {state: {action: (next_state, reward)}}.
        self.model = {}

In [5]:
def update_model(self, past_state, past_action, state, reward):
    """Record the transition (s, a) -> (s', r) in the model.

    Any earlier entry for the same (s, a) pair is overwritten.

    Args:
        past_state       (int): s
        past_action      (int): a
        state            (int): s'
        reward           (int): r
    Returns:
        Nothing
    """
    # Store an ordered tuple, not a set: a set loses the (s', r) ordering and
    # collapses to a single element whenever s' == r, which would break
    # unpacking the entry as `next_state, reward`.
    self.model.setdefault(past_state, {})[past_action] = (state, reward)

    

In [2]:
def planning_step(self):
    """Perform planning, i.e. indirect RL, from the learned model.

    Repeats `self.planning_steps` times:
      1. pick a previously seen state, then one of the actions recorded for
         it, uniformly at random using `self.planning_rand_generator`;
      2. look up that pair's stored entry, unpacked as (next_state, reward);
      3. apply a Q-learning update to `self.q_values` with that simulated
         transition.

    A stored next state of -1 is the dummy terminal state; in that case the
    target is the reward alone, with no bootstrapping.

    Args:
        None
    Returns:
        Nothing
    """
    # Nothing has been experienced yet, so there is nothing to sample from.
    if not self.model:
        return

    for _ in range(self.planning_steps):
        # RandomState.choice needs a 1-D sequence; a dict keys view is not
        # one on Python 3, so materialise the keys as lists first.
        chosen_state = self.planning_rand_generator.choice(
            list(self.model.keys()))
        chosen_action = self.planning_rand_generator.choice(
            list(self.model[chosen_state].keys()))
        next_st, rew = self.model[chosen_state][chosen_action]

        if next_st != -1:
            target = rew + self.gamma * np.max(self.q_values[next_st])
        else:
            # Terminal transition: no future value to bootstrap from.
            target = rew

        self.q_values[chosen_state][chosen_action] += self.step_size * (
            target - self.q_values[chosen_state][chosen_action])


UsageError: Cell magic `%%add_to` not found.


In [None]:
def agent_start(self, state):
    """Begin an episode: choose the first action and remember it.

    Args:
        state: the first state of the episode.
    Returns:
        The action chosen by `self.choose_action_egreedy` for `state`.
    """
    first_action = self.choose_action_egreedy(self.rand_generator, state)

    # Remember the pair so the next update knows which Q-value to adjust.
    self.past_state, self.past_action = state, first_action
    return first_action

def agent_step(self, reward, state):
    """Learn from the latest real transition, plan, and pick the next action.

    Order of operations:
      1. direct RL: Q-learning update for (past_state, past_action);
      2. model learning: record the transition via `self.update_model`;
      3. planning: run `self.planning_step`;
      4. choose the next action with `self.choose_action_egreedy`.

    Args:
        reward: the reward received for the last action taken.
        state: the state the environment moved to.
    Returns:
        The action chosen for `state`.
    """
    # Direct RL: one-step Q-learning update from the real transition.
    td_target = reward + self.gamma * np.max(self.q_values[state])
    self.q_values[self.past_state][self.past_action] += self.step_size * (
        td_target - self.q_values[self.past_state][self.past_action])

    self.update_model(self.past_state, self.past_action, state, reward)
    self.planning_step()

    action = self.choose_action_egreedy(self.rand_generator, state)
    self.past_state = state
    self.past_action = action

    # Return the chosen action, as agent_start does; without this the
    # caller receives None instead of an action.
    return self.past_action

def agent_end(self, reward):
    """Handle the final transition of an episode.

    Applies the terminal update (target is the reward alone) to the last
    state-action pair, records the transition in the model with -1 as the
    next state, and runs planning.

    Args:
        reward: the reward received for the last action taken.
    Returns:
        Nothing
    """
    last_state, last_action = self.past_state, self.past_action

    # Terminal update: there is no next state to bootstrap from.
    td_error = reward - self.q_values[last_state][last_action]
    self.q_values[last_state][last_action] += self.step_size * td_error

    # -1 is the dummy terminal state stored in the model.
    self.update_model(last_state, last_action, -1, reward)
    self.planning_step()

