# 1. Create a custom gym environment

In [1]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
import math
from enum import Enum

In [2]:
class Action(Enum):
    UP = 0
    DOWN = 1
    LEFT = 2
    RIGHT = 3

In [3]:
class GameEnv(Env):
    def __init__(self, size, mode='static'):
        self.mode = mode
        self.size = size
        self.GAME_LENGTH = 0.5 * size**2
        self.action_space = Discrete(4)
        self.observation_space = Box(low=-1, high=1, shape=(self.size, self.size), dtype=np.int32)
        self.state, self.player = self.createBoard()
        self.visited = self.state
        self.time_remaining = self.GAME_LENGTH
        
    def step(self, action):
        self.time_remaining -= 1
        done = False
        
        #evaluate move, save value of the new space before move then update the state
        invalid_move = False
        back_track = False
        if Action(action) == Action.UP:
            new_pos = (self.player[0] -1, self.player[1])
            if new_pos[0] >= 0:
                new_space_val, back_track = self.evalMove(new_pos)
            else:
                invalid_move = True
        elif Action(action) == Action.DOWN:
            new_pos = (self.player[0] +1, self.player[1])
            if new_pos[0] < self.size:
                new_space_val, back_track = self.evalMove(new_pos)
            else:
                invalid_move = True
        elif Action(action) == Action.LEFT:
            new_pos = (self.player[0], self.player[1] -1)
            if new_pos[1] >= 0:
                new_space_val, back_track = self.evalMove(new_pos)
            else:
                invalid_move = True
        elif Action(action) == Action.RIGHT:
            new_pos = (self.player[0], self.player[1] +1)
            if new_pos[1] < self.size:
                new_space_val, back_track = self.evalMove(new_pos)
            else:
                invalid_move = True
        else:
            print("Invalid input to step function")
            
        #evaluate reward 
        reward = 0
        if invalid_move:
            reward = -0.8
            done = False
        elif back_track:
            reward = -0.25
            done = False
        else:
            if new_space_val == 0:
                reward = -0.04
                done = False
            elif new_space_val == 3:
                reward = 1
                done = True
        
        #evaluate if out of time
        if self.time_remaining == 0:
            done = True
            reward = -1
            
        #placeholder for required return value
        info = {}
        
        return self.state, reward, done, info
    
    def evalMove(self, new_pos):
        if self.visited[new_pos] == -1:
            back_track = True
        else:
            back_track = False
        self.visited[self.player] = -1
        new_space_val = self.state[new_pos]
        self.state[self.player] = 0
        self.state[new_pos] = 2
        self.player = new_pos
        return new_space_val, back_track
        
    def render(self, mode='human'):
        print(self.state)
    
    def reset(self):
        self.state, self.player =  self.createBoard()
        self.time_remaining = self.GAME_LENGTH
        return self.state
    
    def createBoard(self):
        board = np.zeros((self.size,self.size), dtype=np.int32)
        if self.mode == 'static':
            player_pos = (0,0) #(np.random.randint(self.size), np.random.randint(self.size))
            goal_pos = (7,7)
        elif self.mode == 'random':
            player_pos = (np.random.randint(self.size), np.random.randint(self.size))
            goal_pos = (np.random.randint(self.size), np.random.randint(self.size))
            player_goal_distance = math.sqrt((player_pos[0] - goal_pos[0])**2 + (player_pos[1] - goal_pos[1])**2)
            while player_goal_distance < self.size/2:
                goal_pos = (np.random.randint(self.size), np.random.randint(self.size))
                player_goal_distance = math.sqrt((player_pos[0] - goal_pos[0])**2 + (player_pos[1] - goal_pos[1])**2)
        
        board[player_pos] = 2
        board[goal_pos] = 3
        return board, player_pos

### Test

In [4]:
env = GameEnv(10)
episodes = 15
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0
    num_moves = 0
    num_backtrack = 0
    while not done:
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        if reward == -0.25:
            num_backtrack += 1
        score += reward
        num_moves += 1
        
    print(f'Episode:{episode} Score:{score} Moves:{num_moves} Backtracks:{num_backtrack}')

Episode:1 Score:-16.830000000000002 Moves:50 Backtracks:19
Episode:2 Score:-9.939999999999996 Moves:50 Backtracks:26
Episode:3 Score:-13.43 Moves:50 Backtracks:39
Episode:4 Score:-18.75 Moves:50 Backtracks:39
Episode:5 Score:-17.019999999999996 Moves:50 Backtracks:38
Episode:6 Score:-17.44 Moves:50 Backtracks:40
Episode:7 Score:-16.549999999999997 Moves:50 Backtracks:43
Episode:8 Score:-17.1 Moves:50 Backtracks:42
Episode:9 Score:-16.0 Moves:50 Backtracks:44
Episode:10 Score:-14.529999999999994 Moves:50 Backtracks:37
Episode:11 Score:-16.259999999999998 Moves:50 Backtracks:38
Episode:12 Score:-16.179999999999996 Moves:50 Backtracks:34
Episode:13 Score:-18.750000000000004 Moves:50 Backtracks:39
Episode:14 Score:-16.89 Moves:50 Backtracks:41
Episode:15 Score:-17.1 Moves:50 Backtracks:42


In [5]:
state = env.reset()
done = False
score = 0
while not done:
    action = env.action_space.sample()
    n_state, reward, done, info = env.step(action)
    score += reward
    env.render()

print(f'Episode:{episode} Score:{score}')

[[2 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 3 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]
[[2 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 3 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]
[[0 2 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 3 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]
[[2 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 3 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]
[[2 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0

# 2. Create Deep Learning Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [None]:
def build_model(states, actions):
    model = Sequential()
    model.add(Flatten(input_shape=(1,states[0],states[1])))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

In [None]:
del model
del env

In [None]:
env = GameEnv(10, mode='random')
states = env.observation_space.shape
actions = env.action_space.n
model = build_model(states,actions)

In [None]:
model.summary()

# 3. Build Agent

In [None]:
from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy, MaxBoltzmannQPolicy, LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory
#from tf_agents.environments import tf_py_environment

In [None]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=100000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy,
                   nb_actions=actions, nb_steps_warmup=100, target_model_update=1e-2)
    return dqn

In [None]:
#env =  tf_py_environment.TFPyEnvironment(env)

dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=100000, visualize=False, verbose=1)

In [None]:
def dqn_tester(dqn, num_eps):
    num_eps = num_eps
    scores = dqn.test(env, nb_episodes=num_eps, visualize=False)
    print("Mean Reward: " + str(np.mean(scores.history['episode_reward'])))
    num_win = 0
    for score in scores.history['episode_reward']:
        if score > 0:
            num_win += 1
    print(f'Win Rate: {round(num_win/num_eps, 2) * 100}%')
dqn_tester(dqn, 15)

# 4. Saving and Reloading Model

In [None]:
dqn.save_weights('saved_models/static_10x10_perfecto.h5f', overwrite=False)

In [None]:
del model
del dqn
del env

In [None]:
env = GameEnv(10, mode='random')
actions = env.action_space.n
states = env.observation_space.shape
model = build_model(states, actions)
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

In [None]:
dqn.load_weights('saved_models/static_10x10_perfecto.h5f')

In [None]:
dqn_tester(dqn, 15)