In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam
import random
from collections import deque

class MazeEnvironment:
    """Square grid maze with randomly placed walls.

    Cell encoding in ``self.maze``: ``0`` is free, ``-1`` is a wall and ``2``
    is the goal. The player starts in the top-left corner ``(0, 0)`` and has
    to reach the bottom-right corner. Observations are the flattened grid
    with the player's cell overwritten by ``1``.

    Actions: ``0`` up, ``1`` down, ``2`` left, ``3`` right. Any other value
    leaves the player where it is.
    """

    # Row/column offsets for each action id.
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

    # Random layouts tried before a corridor is carved to force solvability.
    _MAX_LAYOUT_ATTEMPTS = 100

    def __init__(self, size=5, walls_percentage=0.2):
        """Create the maze and generate its first layout.

        Args:
            size: Side length of the square grid.
            walls_percentage: Fraction of the ``size * size`` cells that
                should be walls (the start and goal cells never are).
        """
        self.size = size
        self.walls_percentage = walls_percentage
        self.reset()

    def reset(self):
        """Generate a fresh random layout and put the player at the start.

        Returns:
            The flattened observation of the new maze.
        """
        self.maze = np.zeros((self.size, self.size))
        self.player_position = (0, 0)
        self.goal_position = (self.size - 1, self.size - 1)
        self._add_walls()
        self.maze[self.goal_position] = 2
        self.state = self._get_state()
        return self.state

    def _add_walls(self):
        """Place walls so that the goal stays reachable from the start.

        Wall cells are sampled without replacement, so the layout contains
        exactly the requested number of walls (capped at the number of
        cells that are neither start nor goal). Layouts that disconnect the
        goal are discarded and re-drawn; if no solvable layout is found
        after ``_MAX_LAYOUT_ATTEMPTS`` tries, the top row and right column
        are cleared to guarantee a path.
        """
        start = (0, 0)
        goal = (self.size - 1, self.size - 1)
        candidates = [
            (row, col)
            for row in range(self.size)
            for col in range(self.size)
            if (row, col) not in (start, goal)
        ]
        requested = int(self.size * self.size * self.walls_percentage)
        num_walls = max(0, min(requested, len(candidates)))

        for _ in range(self._MAX_LAYOUT_ATTEMPTS):
            self.maze.fill(0)
            for cell in random.sample(candidates, num_walls):
                self.maze[cell] = -1
            if self._is_solvable(start, goal):
                return

        # Wall density too high for a random layout to be solvable: open an
        # L-shaped corridor from the start to the goal.
        self.maze[0, :] = 0
        self.maze[:, self.size - 1] = 0

    def _is_solvable(self, start, goal):
        """Return True if ``goal`` can be reached from ``start`` (BFS)."""
        frontier = deque([start])
        seen = {start}
        while frontier:
            row, col = frontier.popleft()
            if (row, col) == goal:
                return True
            for d_row, d_col in self._MOVES.values():
                nxt = (row + d_row, col + d_col)
                in_bounds = 0 <= nxt[0] < self.size and 0 <= nxt[1] < self.size
                if in_bounds and nxt not in seen and self.maze[nxt] != -1:
                    seen.add(nxt)
                    frontier.append(nxt)
        return False

    def _get_state(self):
        """Return the flattened grid with the player's cell set to 1."""
        state = self.maze.copy()
        state[self.player_position] = 1
        return state.flatten()

    def step(self, action):
        """Apply one action and return ``(state, reward, done)``.

        Moves are clamped to the grid, and a move into a wall leaves the
        player in place. Every step costs ``-1``; reaching the goal yields
        ``100`` and ends the episode.
        """
        row, col = self.player_position
        d_row, d_col = self._MOVES.get(action, (0, 0))
        row = min(max(row + d_row, 0), self.size - 1)
        col = min(max(col + d_col, 0), self.size - 1)

        if self.maze[row, col] != -1:
            self.player_position = (row, col)

        self.state = self._get_state()
        done = self.player_position == self.goal_position
        reward = 100 if done else -1
        return self.state, reward, done

    def render(self):
        """Print the grid with the player's cell marked as 1."""
        maze = self.maze.copy()
        maze[self.player_position] = 1
        print(maze)

class DQN:
    """Deep Q-learning agent with an epsilon-greedy policy and replay memory.

    States are expected as arrays of shape ``(1, state_size)``; that is the
    shape passed straight to the Keras model's ``predict`` and ``fit``.
    """

    def __init__(self, state_size, action_size):
        """Set the hyper-parameters and build the Q-network.

        Args:
            state_size: Number of input features of the network.
            action_size: Number of discrete actions (network outputs).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # replay buffer, oldest dropped first
        self.gamma = 0.95  # discount factor for future rewards
        self.epsilon = 1.0  # current exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995  # multiplicative decay applied per replay
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the two-hidden-layer MLP that estimates Q-values."""
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias
        # that recent Keras releases reject.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action: random with probability epsilon, else greedy.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 keeps Keras from printing a progress bar on every call.
        q_values = self.model.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train on a random minibatch of stored transitions.

        Does nothing when fewer than ``batch_size`` transitions are stored.
        After training, epsilon is decayed once (down to ``epsilon_min``).
        """
        if len(self.memory) < batch_size:
            # random.sample would raise ValueError on an under-filled buffer.
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bellman target: immediate reward plus discounted best
                # Q-value of the next state.
                next_q = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_q)
            # Only the taken action's Q-value is moved towards the target.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

if __name__ == "__main__":
    # Train a DQN agent on randomly generated 5x5 mazes.
    env = MazeEnvironment(size=5)
    state_size = env.size * env.size  # observation is the flattened grid
    action_size = 4  # up, down, left, right
    agent = DQN(state_size, action_size)
    episodes = 1000
    max_steps = 500  # step cap per episode
    batch_size = 32  # transitions sampled per replay

    for e in range(episodes):
        # The network expects a batch dimension: (1, state_size).
        state = np.reshape(env.reset(), [1, state_size])
        for step_count in range(max_steps):
            action = agent.act(state)
            next_state, reward, done = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if done:
                print(f"episode: {e}/{episodes}, score: {step_count}, epsilon: {agent.epsilon}")
                break
        else:
            # The step cap was reached without finding the goal; report it so
            # the episode does not silently vanish from the log.
            print(f"episode: {e}/{episodes}, score: timeout after {max_steps} steps, epsilon: {agent.epsilon}")

        # Learn once per episode, as soon as enough transitions are stored.
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)




episode: 0/1000, score: 180, epsilon: 1.0
episode: 1/1000, score: 87, epsilon: 0.995


episode: 3/1000, score: 118, epsilon: 0.985074875
episode: 4/1000, score: 469, epsilon: 0.9801495006250001


episode: 5/1000, score: 151, epsilon: 0.9752487531218751
episode: 6/1000, score: 27, epsilon: 0.9703725093562657


episode: 7/1000, score: 92, epsilon: 0.9655206468094844
episode: 8/1000, score: 265, epsilon: 0.960693043575437


episode: 10/1000, score: 84, epsilon: 0.9511101304657719
episode: 11/1000, score: 22, epsilon: 0.946354579813443


episode: 12/1000, score: 181, epsilon: 0.9416228069143757
episode: 13/1000, score: 151, epsilon: 0.9369146928798039


episode: 14/1000, score: 110, epsilon: 0.9322301194154049
episode: 15/1000, score: 41, epsilon: 0.9275689688183278


episode: 16/1000, score: 98, epsilon: 0.9229311239742362


episode: 18/1000, score: 41, epsilon: 0.9137248860125932
episode: 19/1000, score: 218, epsilon: 0.9091562615825302


episode: 20/1000, score: 32, epsilon: 0.9046104802746175


episode: 21/1000, score: 462, epsilon: 0.9000874278732445
episode: 22/1000, score: 173, epsilon: 0.8955869907338783


episode: 23/1000, score: 44, epsilon: 0.8911090557802088
episode: 24/1000, score: 474, epsilon: 0.8866535105013078


episode: 25/1000, score: 105, epsilon: 0.8822202429488013
episode: 26/1000, score: 96, epsilon: 0.8778091417340573


episode: 28/1000, score: 45, epsilon: 0.8690529955452602


episode: 29/1000, score: 92, epsilon: 0.8647077305675338
episode: 30/1000, score: 71, epsilon: 0.8603841919146962


episode: 31/1000, score: 18, epsilon: 0.8560822709551227
episode: 32/1000, score: 64, epsilon: 0.851801859600347


episode: 33/1000, score: 35, epsilon: 0.8475428503023453
episode: 34/1000, score: 56, epsilon: 0.8433051360508336


episode: 35/1000, score: 34, epsilon: 0.8390886103705794
episode: 36/1000, score: 199, epsilon: 0.8348931673187264


episode: 37/1000, score: 91, epsilon: 0.8307187014821328
episode: 38/1000, score: 84, epsilon: 0.8265651079747222




episode: 40/1000, score: 110, epsilon: 0.8183201210226743
episode: 41/1000, score: 85, epsilon: 0.8142285204175609


episode: 42/1000, score: 69, epsilon: 0.810157377815473


In [None]:
import gym
import numpy as np
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class MazeEnv(gym.Env):
    """Grid maze defined by a fixed character layout.

    Layout symbols: ``'S'`` start, ``'G'`` goal, ``'W'`` wall; any other
    character is a free cell. Observations are ``(row, col)`` tuples.
    Actions: ``0`` up, ``1`` down, ``2`` left, ``3`` right. Any other value
    leaves the agent where it is.

    ``reset`` and ``step`` follow the classic Gym signatures: ``reset()``
    returns the observation, ``step()`` returns
    ``(observation, reward, done, info)``.
    """

    # Row/column offsets for each action id.
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

    def __init__(self, maze_layout):
        """Store the layout and locate the start cell.

        Args:
            maze_layout: Rectangular list of rows of single-character cells.

        Raises:
            ValueError: If the layout contains no ``'S'`` cell.
        """
        super().__init__()
        self.maze = np.array(maze_layout)
        self.rows = len(maze_layout)
        self.cols = len(maze_layout[0])
        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(low=0, high=max(self.rows, self.cols), shape=(2,), dtype=int)

        start_cells = np.argwhere(self.maze == 'S')
        if len(start_cells) == 0:
            raise ValueError("maze_layout must contain a start cell marked 'S'")
        # Keep the start as an immutable tuple of plain ints: reset() hands
        # it out directly, and step() also returns tuples, so observations
        # have a consistent type and callers cannot mutate the start.
        self.start_state = tuple(int(index) for index in start_cells[0])
        self.current_state = self.start_state

    def reset(self):
        """Move the agent back to the start cell and return its position."""
        self.current_state = self.start_state
        return self.current_state

    def step(self, action):
        """Apply one action and return ``(state, reward, done, info)``.

        A move that would leave the grid or enter a wall is ignored. Every
        step costs ``-1``; standing on the goal yields ``100`` and ends the
        episode.
        """
        row, col = self.current_state
        d_row, d_col = self._MOVES.get(action, (0, 0))
        new_row, new_col = row + d_row, col + d_col

        # Only commit the move when the target is inside the grid and free.
        in_bounds = 0 <= new_row < self.rows and 0 <= new_col < self.cols
        if in_bounds and self.maze[new_row][new_col] != 'W':
            self.current_state = (new_row, new_col)

        at_goal = self.maze[self.current_state[0]][self.current_state[1]] == 'G'
        reward = 100 if at_goal else -1  # -1 is the per-step movement penalty
        done = bool(at_goal)

        return self.current_state, reward, done, {}



class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and replay memory.

    States are expected as arrays of shape ``(1, state_size)``; that is the
    shape passed straight to the Keras model's ``predict`` and ``fit``.
    """

    def __init__(self, state_size, action_size):
        """Set the hyper-parameters and build the Q-network.

        Args:
            state_size: Number of input features of the network.
            action_size: Number of discrete actions (network outputs).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # replay buffer, oldest dropped first
        self.epsilon = 1.0  # current exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995  # multiplicative decay applied per replay
        self.gamma = 0.95  # discount factor for future rewards
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the two-hidden-layer MLP that estimates Q-values."""
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action: random with probability epsilon, else greedy.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 keeps Keras from printing a progress bar on every call.
        q_values = self.model.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Train on a random minibatch of stored transitions.

        Does nothing when fewer than ``batch_size`` transitions are stored.
        After training, epsilon is decayed once (down to ``epsilon_min``).
        """
        if len(self.memory) < batch_size:
            # random.sample would raise ValueError on an under-filled buffer.
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bellman target: immediate reward plus discounted best
                # Q-value of the next state.
                next_q = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_q)
            # Only the taken action's Q-value is moved towards the target.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

if __name__ == "__main__":
    # Train a DQN agent on a fixed maze: S = start, G = goal, W = wall.
    maze_layout = [
        ['S', ' ', ' ', ' ', 'W'],
        [' ', 'W', ' ', 'W', ' '],
        [' ', ' ', 'W', ' ', ' '],
        ['W', ' ', ' ', 'W', ' '],
        [' ', 'W', ' ', ' ', 'G'],
    ]

    env = MazeEnv(maze_layout)
    state_size = 2  # Because the state is represented as (row, col)
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    episodes = 1000
    max_steps = 500  # step cap per episode
    batch_size = 32  # transitions sampled per replay

    for e in range(episodes):
        # The network expects a batch dimension: (1, state_size).
        state = np.reshape(env.reset(), [1, state_size])
        for step_count in range(max_steps):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if done:
                print(f"Episode: {e+1}/{episodes}, Score: {step_count}, Epsilon: {agent.epsilon:.2}")
                break
        else:
            # The step cap was reached without finding the goal; report it so
            # the episode does not silently vanish from the log.
            print(f"Episode: {e+1}/{episodes}, Score: timeout after {max_steps} steps, Epsilon: {agent.epsilon:.2}")

        # Learn once per episode, as soon as enough transitions are stored.
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)


Episode: 1/1000, Score: 261, Epsilon: 1.0
Episode: 2/1000, Score: 413, Epsilon: 0.99
Episode: 3/1000, Score: 362, Epsilon: 0.99


Episode: 4/1000, Score: 76, Epsilon: 0.99
Episode: 5/1000, Score: 89, Epsilon: 0.98


Episode: 6/1000, Score: 318, Epsilon: 0.98
Episode: 7/1000, Score: 100, Epsilon: 0.97




Episode: 10/1000, Score: 68, Epsilon: 0.96
Episode: 11/1000, Score: 254, Epsilon: 0.95
Episode: 12/1000, Score: 22, Epsilon: 0.95


Episode: 13/1000, Score: 55, Epsilon: 0.94
Episode: 14/1000, Score: 23, Epsilon: 0.94


Episode: 15/1000, Score: 148, Epsilon: 0.93
Episode: 16/1000, Score: 212, Epsilon: 0.93


Episode: 17/1000, Score: 218, Epsilon: 0.92
Episode: 18/1000, Score: 71, Epsilon: 0.92


Episode: 19/1000, Score: 449, Epsilon: 0.91


Episode: 21/1000, Score: 41, Epsilon: 0.9


Episode: 22/1000, Score: 373, Epsilon: 0.9


Episode: 23/1000, Score: 348, Epsilon: 0.9
Episode: 24/1000, Score: 291, Epsilon: 0.89


Episode: 25/1000, Score: 29, Epsilon: 0.89
Episode: 26/1000, Score: 210, Epsilon: 0.88
