In [31]:
import gym
from gym import spaces
import numpy as np

class MazeEnv(gym.Env):
    def __init__(self, verbose = True):
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(low=0,
                                            high=4,
                                            shape=(4, 4),
                                            dtype=np.int16)
        self.reward_range = (-200, 200)

        self.current_episode = 0
        self.success_episode = []
        self.verbose = verbose

    def reset(self):
        self.current_player = 1
        # P means the game is playable, W means somenone wins, L someone lose
        self.state = 'P'
        self.current_step = 0
        self.max_step = 30
        self.world = np.array([[1, 0, 0, 2],
                              [0, 0, 0, 0],
                              [0, 3, 4, 3],
                              [0, 4, 0, 0]])

        return self._next_observation()

    def _next_observation(self):
        obs = self.world

        obs = np.append(obs, [[self.current_player, 0, 0, 0]], axis=0)

        return obs

    def _take_action(self, action):
        current_pos = np.where(self.world == self.current_player)

        if action == 0:
            next_pos = (current_pos[0] - 1, current_pos[1])

            if next_pos[0] >= 0 and int(self.world[next_pos]) == 0:
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0

            elif next_pos[0] >= 0 and int(self.world[next_pos]) in (1, 2):
                pass

            elif next_pos[0] >= 0 and (int(self.world[next_pos]) == 3):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'L'

            elif next_pos[0] >= 0 and (int(self.world[next_pos]) == 4):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'W'

        elif action == 1:
            next_pos = (current_pos[0], current_pos[1] + 1)

            if next_pos[1] < 3 and int(self.world[next_pos]) == 0:
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0

            elif next_pos[1] < 3 and int(self.world[next_pos]) in (1, 2):
                pass

            elif next_pos[1] < 3 and (int(self.world[next_pos]) == 3):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'L'

            elif next_pos[1] < 3 and (int(self.world[next_pos]) == 4):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'W'

        elif action == 2:
            next_pos = (current_pos[0] + 1, current_pos[1])

            if next_pos[0] <= 3 and int(self.world[next_pos]) == 0:
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0

            elif next_pos[0] <= 3 and int(self.world[next_pos]) in (1, 2):
                pass

            elif next_pos[0] <= 3 and (int(self.world[next_pos]) == 3):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'L'

            elif next_pos[0] <= 3 and (int(self.world[next_pos]) == 4):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'W'

        elif action == 3:
            next_pos = (current_pos[0], current_pos[1] - 1)

            if next_pos[1] >= 0 and int(self.world[next_pos]) == 0:
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0

            elif next_pos[1] >= 0 and int(self.world[next_pos]) in (1, 2):
                pass

            elif next_pos[1] >= 0 and (int(self.world[next_pos]) == 3):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'L'

            elif next_pos[1] >= 0 and (int(self.world[next_pos]) == 4):
                self.world[next_pos] = self.current_player
                self.world[current_pos] = 0
                self.state = 'W'

    def step(self, action):
        self._take_action(action)
        self.current_step += 1
        if (self.verbose): print(self.world)

        if self.state == "W":
            if (self.verbose): print(f'Player {self.current_player} won')
            reward = 200
            done = True
        elif self.state == 'L':
            if (self.verbose): print(f'Player {self.current_player} lost')
            reward = -200
            done = True
        elif self.state == 'P':
            reward = -1
            done = False

        if self.current_step >= self.max_step:
            done = True

        if self.current_player == 1:
            self.current_player = 2
        else:
            self.current_player = 1

        if done:
            self.render_episode(self.state)
            self.current_episode += 1

        obs = self._next_observation()

        return obs, reward, done, {}

    def render_episode(self, win_or_lose):
        self.success_episode.append(
            'Success' if win_or_lose == 'W' else 'Failure')

        file = open('render/render.txt', 'a')
        file.write('-------------------------------------------\n')
        file.write(f'Episode number {self.current_episode}\n')
        file.write(f'{self.success_episode[-1]} in {self.current_step} steps\n')
        file.close()

In [33]:
from rl.agents import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import BoltzmannQPolicy
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

from stable_baselines3.common.policies import ActorCriticPolicy  # MlpPolicy
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3 import PPO  # PPO2

from env import MazeEnv

# env = DummyVecEnv([lambda: MazeEnv(verbose = False)])
# model = PPO(ActorCriticPolicy, env, learning_rate=0.001)
# model.learn(10000)

def build_model(states, actions):
    model = Sequential()
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy,
                   nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

env = MazeEnv()
actions = env.action_space.n
model = build_model(env.observation_space.shape, actions)
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=50000, visualize=False, verbose=1)

ValueError: Model output "Tensor("dense_23/BiasAdd:0", shape=(None, 5, 4), dtype=float32)" has invalid shape. DQN expects a model that has one dimension for each action, in this case 4.