In [1]:
import random
import pygame
import numpy as np
import pandas as pd
from sklearn.ensemble import ExtraTreesRegressor

import warnings
warnings.filterwarnings('ignore')
def warn(*args, **kwargs):
    return None
warnings.warn = warn

pygame 2.5.2 (SDL 2.28.3, Python 3.9.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# define constants
width, height = 400, 400
tile_size = 40
x_tiles = width // tile_size
y_tiles = height // tile_size

In [3]:
class Agent:
    def __init__(self, action_space):
        self.x = 0
        self.y = 0
        self.action_space = action_space

    def move(self, action):
        if action == self.action_space['LEFT']:
            self.x = max(self.x - 1, 0)
        elif action == self.action_space['RIGHT']:
            self.x = min(self.x + 1, x_tiles - 1)
        elif action == self.action_space['UP']:
            self.y = max(self.y - 1, 0)
        elif action == self.action_space['DOWN']:
            self.y = min(self.y + 1, y_tiles - 1)
            
    def sample(self):
        actions = list(self.action_space.keys())
        action = random.choice(actions)
        action_num = self.action_space[action]
        return action, action_num

class Game:
    def __init__(self):
        self.action_space = {'LEFT': 0, 'RIGHT': 1, 'UP': 2, 'DOWN': 3}
        self.agent = Agent(self.action_space)
        self.goal = {'x': x_tiles - 1, 'y': y_tiles - 1}
        self.holes = [(2, 2), (7, 2), (2, 7), (7, 7)]  # space holes evenly around grid
        self.reset()

    def reset(self):
        self.agent.x = 0
        self.agent.y = 0
        self.game_over = False
        return (self.agent.x, self.agent.y)

    def is_hole(self, x, y):
        return (x, y) in self.holes

    def step(self, action):
        self.agent.move(action)

        if self.is_hole(self.agent.x, self.agent.y):
            result = 'GAME OVER!'  # agent fell into hole
            reward = -10  # discourage negative behavior
            self.game_result(result, reward)
            done = True
            return (self.agent.x, self.agent.y), reward, True

        elif self.agent.x == self.goal['x'] and self.agent.y == self.goal['y']:
            result = 'GAME COMPLETE!'  # agent reached goal
            reward = 1  # reinforce positive behavior
            self.game_result(result, reward)
            done = True
            return (self.agent.x, self.agent.y), reward, True

        else:
            return (self.agent.x, self.agent.y), 0, False

    def game_result(self, result, reward):
        self.game_over = True
        
    def wait(self):
        waiting = True

In [4]:
game = Game()
current_position = game.reset()

print(current_position)

(0, 0)


In [5]:
print(f'Action space: {game.action_space}')

agent = Agent(game.action_space)
action = agent.sample()

print(f'Sample action: {action[0]}')

Action space: {'LEFT': 0, 'RIGHT': 1, 'UP': 2, 'DOWN': 3}
Sample action: UP


In [6]:
old_position = game.reset()
done = False
reward = 0
i = 0

while not done:
    if i == 0:
        position = old_position
    else:
        action = agent.sample()[1]
        position, reward, done = game.step(action)
        
    print(f'Turn: {i}\nPosition: {position}\nReward: {reward}\nDone: {done}\n----------------')  
    i += 1  # now have episode simulator - should run many simulations and store resulting data

Turn: 0
Position: (0, 0)
Reward: 0
Done: False
----------------
Turn: 1
Position: (0, 0)
Reward: 0
Done: False
----------------
Turn: 2
Position: (1, 0)
Reward: 0
Done: False
----------------
Turn: 3
Position: (0, 0)
Reward: 0
Done: False
----------------
Turn: 4
Position: (1, 0)
Reward: 0
Done: False
----------------
Turn: 5
Position: (0, 0)
Reward: 0
Done: False
----------------
Turn: 6
Position: (0, 1)
Reward: 0
Done: False
----------------
Turn: 7
Position: (0, 1)
Reward: 0
Done: False
----------------
Turn: 8
Position: (1, 1)
Reward: 0
Done: False
----------------
Turn: 9
Position: (1, 2)
Reward: 0
Done: False
----------------
Turn: 10
Position: (2, 2)
Reward: -10
Done: True
----------------


In [7]:
num_episodes = 40
life_memory = []  # entire game data

for i in range(num_episodes):
    old_position = game.reset()
    done = False
    tot_reward = 0
    ep_memory = []  # current episode data

    while not done:
        action = agent.sample()[1]
        position, reward, done = game.step(action)
        tot_reward += reward
        
        (x, y) = old_position
        old_tile = y * 10 + x  # tile values represent grid positions in single dimension
        (x, y) = position
        tile = y * 10 + x
        
        ep_memory.append({
            'episode': i,
            'position': old_position,
            'tile': old_tile,
            'action': action,
            'new_position': position,
            'new_tile': tile,
            'reward': reward
        })

        old_position = position
        old_tile = tile
        
    # incorporate total, decay reward
    num_steps = len(ep_memory)
    for i, ep in enumerate(ep_memory):
        ep['tot_reward'] = tot_reward
        ep['decay_reward'] = i * tot_reward / num_steps
        
    life_memory.extend(ep_memory)
    
memory_df = pd.DataFrame(life_memory)

In [8]:
# check how first episode plays out and second begins
ep_change = memory_df.index[(memory_df['episode'] == 1) & (memory_df['episode'].shift(1) == 0)].min()
memory_df.loc[:ep_change]

Unnamed: 0,episode,position,tile,action,new_position,new_tile,reward,tot_reward,decay_reward
0,0,"(0, 0)",0,2,"(0, 0)",0,0,-10,0.0
1,0,"(0, 0)",0,0,"(0, 0)",0,0,-10,-0.222222
2,0,"(0, 0)",0,2,"(0, 0)",0,0,-10,-0.444444
3,0,"(0, 0)",0,3,"(0, 1)",10,0,-10,-0.666667
4,0,"(0, 1)",10,2,"(0, 0)",0,0,-10,-0.888889
5,0,"(0, 0)",0,3,"(0, 1)",10,0,-10,-1.111111
6,0,"(0, 1)",10,3,"(0, 2)",20,0,-10,-1.333333
7,0,"(0, 2)",20,0,"(0, 2)",20,0,-10,-1.555556
8,0,"(0, 2)",20,3,"(0, 3)",30,0,-10,-1.777778
9,0,"(0, 3)",30,3,"(0, 4)",40,0,-10,-2.0


In [9]:
memory_df.groupby('episode').reward.sum().value_counts()  # agent reached goal in 0/40 games

-10    40
Name: reward, dtype: int64

In [10]:
x = memory_df[['tile', 'action']]
y = 0.5 * memory_df.reward + 0.1 * memory_df.decay_reward + memory_df.tot_reward

model = ExtraTreesRegressor(n_estimators=50)
model.fit(x, y)

ExtraTreesRegressor(n_estimators=50)

In [11]:
def predictions(num_episodes, explore_per):
    life_memory = []

    for i in range(num_episodes):
        old_position = game.reset()
        done = False
        tot_reward = 0
        ep_memory = []

        while not done:
            action = agent.sample()[1]  # guarantee exploration at least every other turn
            position, reward, done = game.step(action)
            tot_reward += reward
            
            (x, y) = old_position
            old_tile = y * 10 + x
            (x, y) = position
            tile = y * 10 + x

            # epsilon-greedy strategy for exploration vs. exploitation
            if np.random.rand() < explore_per:  # agent explores new information with probability explore_per
                action = agent.sample()[1]
            else:  # agent exploits current knowledge with probability (1 - explore_per)
                pred_in = [[tile, i] for i in range(4)]
                action = np.argmax(model.predict(pred_in))

            position, reward, done = game.step(action)
            tot_reward += reward

            ep_memory.append({
                'reward': reward,
                'episode': i,
            })

            old_position = position
            old_tile = tile

        for ep in ep_memory:
            ep['tot_reward'] = tot_reward

        life_memory.extend(ep_memory)
    
    memory_df = pd.DataFrame(life_memory)
    return memory_df.groupby('episode').reward.sum().value_counts()

In [12]:
predictions(40, .5)  # agent reached goal in 19/40 games - substantial improvement

-10    21
 1     19
Name: reward, dtype: int64