In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sklearn as sk
import seaborn as sns

In [4]:
REWARD_NONTERMINAL = -1
REWARD_TERMINAL = 10
REWARD_CLIFF = -100
ACTION_DIRECTIONS = [(0, 1), (1, 0), (0, -1), (-1, 0)]

class Environment:

    def __init__(self, nr_columns, nr_rows, nr_actions=4, init_qa_values=0):
        self.world = np.zeros((nr_rows, nr_columns))
        self.nr_columns = nr_columns
        self.nr_rows = nr_rows
        self.nr_actions = nr_actions
        
    def set_world_rewards(self):
        pass
    
    def is_out_of_bounds(self, state):
        if state[0] < 0 or state[0] > self.nr_rows - 1:
            return True
        
        if state[1] < 0 or state[1] > self.nr_columns -1:
            return True
        
        return False
    
    def next_state(self, state, action):
        pass
    
    def check_termination(self, state):
        pass

class CliffEnvironment(Environment):
    
    def __init__(self, nr_columns, nr_rows, nr_actions=4, init_qa_values=0):
        super().__init__(nr_columns, nr_rows)
        self.set_world_rewards()
        
    def set_world_rewards(self):
        self.world[:, :] = REWARD_NONTERMINAL
        self.world[self.nr_rows - 1:, 1:self.nr_columns - 1] = REWARD_CLIFF
        self.world[self.nr_rows - 1, self.nr_columns - 1] = REWARD_TERMINAL
        
    def next_state(self, state, action_index):
        action = ACTION_DIRECTIONS[action_index]
        next_state = state + action
        
        if self.is_out_of_bounds(next_state):
            next_state = state
        
        return next_state
    
    def check_termination(self, state):
        #TODO: Check if in cliff, or if in goal state, if so, return {True}
        return False

In [6]:
ALPHA = 0.1
GAMMA = 1
class Agent:
    def __init__(self, env, epsilon=0.2, init_position=(0,0)):
        self.curr_state = init_position
        self.env = env
        self.q_table = np.zeros((env.nr_columns, env.nr_rows + 1, env.nr_actions))
        
    def run(self):
        pass
    
    def get_next_action(self):
        """Returns the next index of the action according to the epsilon-greedy choice"""
        actions = self.q_table[self.curr_state]
        
        # If we choose randomly
        if np.random.random() < self.epsilon:
            return np.random.choice(4)
        
        return np.argmax(self.env.world[state])
    
    def get_next_state(self, action):
        """Return next theoretical state according to the environment."""
        return self.env.next_state(self.curr_state, action)
    
    def update_q_table(self, action, next_state, next_best_action):
        pass
    
    def get_reward_for_state(self, state):
        return self.env.world[state]
    
    def terminated(self):
        return self.env.check_termination(self.curr_state)
    
    def update_state(self, next_state):
        self.curr_state = next_state
        
class QLearner(Agent):
    def __init__(self, env, nr_episodes, epsilon=0.2 , init_position=(0,0)):
        super().__init__(env, epsilon, init_position)
        self.nr_episodes = nr_episodes
        
    def run(self):
        for i in range(self.nr_episodes):
            while not self.terminated():
                action_index = self.get_next_action()
                next_state = self.get_next_state(action_index)
                next_state_best_action_index = np.argmax(self.q_table[next_state])

                self.update_q_table(next_state, next_state_best_action_index)
                self.update_state(next_state)
                
    def update_q_table(self, action, next_state, next_best_action):
        curr_q = self.q_table[self.curr_state][action]
        update = (self.get_reward_for_state(next_state) + GAMMA * self.q_table[next_state][next_best_action] - curr_q)
        self.q_table[self.curr_state][action] = curr_q + ALPHA * update
        
    
class SarsaLearner(Agent):
    def __init__(self, env, nr_episodes, epsilon=0.2, init_position=(0,0)):
        super().__init__(env, init_position)
        self.nr_episodes = nr_episodes
        
    def run(self):
        for i in range(self.nr_episodes):
            pass
            

SyntaxError: invalid syntax (<ipython-input-6-5a7f4f10a3c4>, line 29)

In [164]:
###
# Main Run - Initialization
###
NR_COLUMNS = 5

# Including the cliff
NR_ROWS = 4

cliffWorld = CliffEnvironment(NR_COLUMNS, NR_ROWS)
agent = QLearner(cliffWorld, (0, NR_ROWS))

In [165]:
print(agent.q_table)

[[[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]

 [[0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]
  [0. 0. 0. 0.]]]
