In [274]:
from IPython.display import clear_output
from random import random
from time import sleep

In [286]:
class Game:
    '''
    Square grid-world environment. The player walks one cell at a time, is
    blocked by walls and obstacles, collects bonuses, dies in holes and wins
    by reaching the goal in the top-right corner (row 0, last column).

    init params:
        grid_size (int) - side length of grid
        player_pos (sequence as in [row, col]) - starting player pos; when omitted
            (or empty) the player starts in the bottom-left cell
        obstacles (list of tuples as in [(o1r, o1c), (o2r, o2c)...]) - positions of all obstacles
        holes (list of tuples as in [(h1r, h1c), (h2r, h2c)...]) - positions of all holes
        bonuses (list of tuples as in [(b1r, b1c), (b2r, b2c)...]) - positions of all bonuses
    '''
    def __init__(self, grid_size = 5, player_pos = None, obstacles = [(2, 3)], holes = [(3, 1)], bonuses = [(0, 0)]):
        self.grid_size = grid_size
        # Every sequence argument is copied: the defaults in the signature are
        # shared between all calls, and update() removes collected bonuses, so
        # storing the caller's (or the default) list directly would leak state
        # from one game into the next.
        start = list(player_pos) if player_pos else [self.grid_size-1, 0]
        self.player_pos = start
        self.starting_player_pos = start.copy()
        # Cells are normalised to tuples because update() looks positions up
        # with `tuple(...) in ...`.
        self.obstacles = [tuple(cell) for cell in obstacles]
        self.holes = [tuple(cell) for cell in holes]
        self.start_bonuses = [tuple(cell) for cell in bonuses]
        self.bonuses = self.start_bonuses.copy()
        self.done = False

    def reset(self):
        '''
        resets the episode: player back on the starting cell, all bonuses
        restored, done flag cleared. grid size, obstacles and holes are kept
        exactly as they were configured at construction time.

        returns:
        state - list describing player position in form [row, col]; this is a
                snapshot, later moves do not change it
        '''
        self.player_pos = self.starting_player_pos.copy()
        self.bonuses = self.start_bonuses.copy()
        self.done = False
        # Hand out a copy: update() mutates self.player_pos in place.
        return self.player_pos.copy()

    def update(self, action):
        '''
        updates environment with action

        params:
        action - (dx, dy) tuple as produced by player_input: dx moves along the
                 columns, dy moves up the rows (dy = 1 decreases the row index)

        returns: (done, observation, reward)
            done - True once the player fell into a hole or reached the goal
            observation - player position after the move as a (row, col) tuple
            reward - -1 per step, 1 for picking up a bonus, -100 for a hole,
                     0 when called on an already finished episode
        '''
        if self.done:
            print('\033[1;32mEpisode is done, reset environment')
            return self.done, tuple(self.player_pos), 0

        reward = -1
        target = (self.player_pos[0] - action[1], self.player_pos[1] + action[0])
        inside = 0 <= target[0] < self.grid_size and 0 <= target[1] < self.grid_size
        # A move into a wall or an obstacle leaves the player where it was but
        # still costs the step penalty.
        if inside and target not in self.obstacles:
            # Assign element-wise so the position list object stays the same.
            self.player_pos[0], self.player_pos[1] = target
            if target in self.bonuses:
                reward = 1
                self.bonuses.remove(target)
            elif target in self.holes:
                reward = -100
                self.done = True
            elif target == (0, self.grid_size-1):
                self.done = True
        return self.done, tuple(self.player_pos), reward

    def _cell_symbol(self, cell):
        '''
        returns the (ANSI coloured) three character drawing of one grid cell
        '''
        if cell == tuple(self.player_pos):
            return '[\033[1;36mX\033[1;37m]'
        if cell in self.obstacles:
            return '[#]'
        if cell in self.holes:
            return '[\033[1;31mO\033[1;37m]'
        if cell in self.bonuses:
            return '[\033[1;33m*\033[1;37m]'
        if cell == (0, self.grid_size-1):
            return '[\033[1;32mG\033[1;37m]'
        return '\033[1;37m[ ]'

    def render(self):
        '''
        renders environment as string and prints it
        '''
        # The frame already ends with a newline after every row.
        print(self.state_to_frame(), end='')

    def state_to_frame(self):
        '''
        builds the drawing of the current state without printing it

        returns:
        frame - string with one line per grid row (each terminated by a newline);
                X = player, # = obstacle, O = hole, * = bonus, G = goal
        '''
        frame = ''
        for i in range(self.grid_size):
            frame += ''.join(self._cell_symbol((i, j)) for j in range(self.grid_size))
            frame += '\n'
        return frame

    def player_input(self):
        '''
        asks for and processes player input, re-asking until a valid word is typed
        returns: action as (dx, dy) tuple
        '''
        moves = {'up': (0, 1), 'down': (0, -1), 'left': (-1, 0), 'right': (1, 0)}
        player_in = input('Input: up, down, left, right')
        while player_in not in moves:
            player_in = input('Input not valid\nInput: up, down, left, right')
        return moves[player_in]

In [276]:
class Agent:
    '''
    Tabular Q-learning style agent for a square grid environment.

    Q_table[row][col] holds one Q-value per action word for that cell. Actions
    are sampled from a distribution in which an action's weight is
    base ** Q-value, so a larger base concentrates probability on the
    highest-valued actions.
    '''
    # Fixed action order shared by the probability distribution and predict().
    _ACTION_NAMES = ('up', 'down', 'left', 'right')
    _WORD_TO_ACTION = {'up': (0, 1), 'down': (0, -1), 'left': (-1, 0), 'right': (1, 0)}
    _ACTION_TO_WORD = {(0, 1): 'up', (0, -1): 'down', (-1, 0): 'left', (1, 0): 'right'}

    def __init__(self, env):
        '''
        param:
        env - environment object; only its grid_size attribute is read here
        '''
        self.Q_table = [[{'up': 0, 'down': 0, 'left': 0, 'right': 0} for col in range(env.grid_size)] for row in range(env.grid_size)]
        # base 1 makes every action equally likely (1 ** q == 1 for any q)
        self.base = 1

    def update_Q_value(self, reward, prev_pos, cur_pos, last_action, discount_factor=.9):
        '''
        params:
        reward - reward earned from taking last action
        prev_pos - position (row, col) the last action was taken from
        cur_pos - position (row, col) after taking last action
        last_action - last action taken, as (dx, dy) tuple
        discount_factor - factor that discounts rewards earned later, default is .9

        updates Q-value for state, action pair as mean of old and new Q-values,
        where the new value is reward + discount_factor * (expected Q-value of
        cur_pos under the current action distribution)
        '''
        next_q_values = self.Q_table[cur_pos[0]][cur_pos[1]]
        prob_dist = self.make_probability_distribution(cur_pos)
        next_action_sum = 0
        for next_action in next_q_values:
            next_action_sum += prob_dist[next_action]*next_q_values[next_action]
        new_q_val = reward + discount_factor*next_action_sum

        prev_q_values = self.Q_table[prev_pos[0]][prev_pos[1]]
        action_word = self._ACTION_TO_WORD[last_action]
        old_q_val = prev_q_values[action_word]
        prev_q_values[action_word] = (new_q_val+old_q_val)/2

    def make_probability_distribution(self, state):
        '''
        param:
        state - list describing player position in form [row, col]

        returns:
        probability distribution - dictionary in the form {'up': p_up, 'down': p_down, 'left': p_left, 'right': p_right}
        '''
        q_values = self.Q_table[state[0]][state[1]]
        # Shift all exponents by the extreme Q-value before exponentiating. The
        # common factor base**shift cancels in the normalisation, so the result
        # is unchanged, but the largest weight is now exactly 1: the weights can
        # no longer all underflow to 0 (division by zero) or overflow.
        # Non-positive bases are not handled here.
        if self.base >= 1:
            shift = max(q_values[action] for action in self._ACTION_NAMES)
        else:
            shift = min(q_values[action] for action in self._ACTION_NAMES)
        weights = {action: self.base**(q_values[action]-shift) for action in self._ACTION_NAMES}
        q_sum = sum(weights.values())
        return {action: weights[action]/q_sum for action in self._ACTION_NAMES}

    def predict(self, state):
        '''
        param:
        state - list describing player position in form [row, col]

        returns:
        action - (dx, dy) tuple sampled from the probability distribution of the state

        samples the next action; better-valued actions are more likely, and
        increasingly so as the base grows
        '''
        prob_dist = self.make_probability_distribution(state)
        rand_num = random()
        cur_sum = 0
        for action_word in self._ACTION_NAMES:
            cur_sum += prob_dist[action_word]
            if rand_num <= cur_sum:
                return self._WORD_TO_ACTION[action_word]
        # Rounding can leave the cumulative sum a hair below 1; fall back to the
        # last action instead of returning None.
        return self._WORD_TO_ACTION[self._ACTION_NAMES[-1]]

    def update_base(self, inc):
        '''
        increases the base (b) used for generating probability distributions

        given two actions, A and C, with Q-values a and c, respectively - action A will be b^(a-c) times as likely as action C

        param:
        inc - amount to increase the base by
        '''
        self.base += inc

# Default Environment

In [287]:
# Build the default 5x5 environment, put it into its starting state and draw it.
env = Game()
# reset() returns the starting player position; the return value is not needed here.
env.reset()
env.render()
# The agent sizes its Q-table from env.grid_size.
agent = Agent(env)

[[1;33m*[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][[1;32mG[1;37m]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][#][1;37m[ ]
[1;37m[ ][[1;31mO[1;37m][1;37m[ ][1;37m[ ][1;37m[ ]
[[1;36mX[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]


In [288]:
N_EPISODES = 10000     # number of training episodes
BASE_INCREMENT = .01   # growth of the agent's exponent base after every step

# frames maps the 1-based episode number to a list of (rendered frame, reward)
# pairs, starting with the initial frame (reward 0), for later replay.
frames = {}
for episode in range(N_EPISODES):
    done = False
    # Snapshot the start position as a tuple: env.reset() may hand back the
    # environment's own position list, which env.update() changes in place.
    # Without the copy, last_state would already show the *new* position when
    # the first Q-update of the episode runs.
    state = tuple(env.reset())
    last_state = state
    frames[episode+1] = [(env.state_to_frame(), 0)]
    score = 0
    while not done:
        action = agent.predict(state)
        done, state, reward = env.update(action)
        agent.update_Q_value(reward, last_state, state, action)
        last_state = state
        score += reward
        # A larger base puts more probability on the best-valued actions, so
        # action choice becomes gradually less random as training goes on.
        agent.update_base(BASE_INCREMENT)
        frames[episode+1].append((env.state_to_frame(), reward))
    print(f'episode {episode+1} complete')

episode 1 complete
episode 2 complete
episode 3 complete
...
episode 10000 complete


In [289]:
# Replay the last recorded episode frame by frame.
# The episode number is looked up from the recording instead of being typed in
# twice, so the printed label always matches the frames being shown.
last_episode = max(frames)
for frame, reward in frames[last_episode]:
    print(f'episode: {last_episode}\n{frame}reward: {reward}')
    sleep(.2)
    # wait=True keeps the current frame visible until the next one is printed
    clear_output(wait=True)

episode: 1000
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][[1;36mX[1;37m]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][#][1;37m[ ]
[1;37m[ ][[1;31mO[1;37m][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
reward: -1


In [200]:
# Replay every recorded episode in order, one frame every 0.1 s.
# This walks through all stored frames of all episodes, so expect to interrupt it.
for episode, recorded_steps in frames.items():
    for frame, reward in recorded_steps:
        caption = f'episode: {episode}\n{frame}reward: {reward}'
        print(caption)
        sleep(.1)
        clear_output(wait=True)

episode: 17
[[1;33m*[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][[1;32mG[1;37m]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][[1;36mX[1;37m][#][1;37m[ ]
[1;37m[ ][[1;31mO[1;37m][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
reward: -1


KeyboardInterrupt: 

In [290]:
# Inspect the learned table: Q_table[row][col] is a dict of Q-values keyed by action word.
agent.Q_table

[[{'up': -4.13126379206069,
   'down': -3.5484342912167746,
   'left': -4.136625519591362,
   'right': -3.447911610818853},
  {'up': -3.4481905274514353,
   'down': -4.1340777030045635,
   'left': -4.332170973552806,
   'right': -2.7146850782856173},
  {'up': -2.7148656953631867,
   'down': -3.4721017653885315,
   'left': -3.5384548184296216,
   'right': -1.9018366553460935},
  {'up': -1.9020797980458197,
   'down': -2.7075419435788293,
   'left': -2.744123520853549,
   'right': -1.0},
  {'up': 0, 'down': 0, 'left': 0, 'right': 0}],
 [{'up': -2.5127872516420426,
   'down': -4.6930189666653455,
   'left': -4.324917669937912,
   'right': -4.101659609754959},
  {'up': -3.448239363461256,
   'down': -4.856712726218718,
   'left': -4.684476720829288,
   'right': -3.443769376200086},
  {'up': -2.714879485937714,
   'down': -4.122216157607167,
   'left': -4.062915502052766,
   'right': -2.7146782906855904},
  {'up': -1.9020257901783266,
   'down': -2.7640648991257715,
   'left': -2.7085041650

# Large Environment

In [273]:
# Scratch check: random() is in [0, 1), so this yields a random integer from 1 to 10 inclusive.
int(random()*10)+1

4

In [282]:
# 10x10 grid; obstacles, holes and bonuses keep the constructor defaults.
large_env = Game(grid_size=10)

In [283]:
# Draw the large grid to check the layout.
large_env.render()

[[1;33m*[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][[1;32mG[1;37m]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][#][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][[1;31mO[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[[1;36mX[1;37m][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]
[1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ][1;37m[ ]