In [None]:
import datetime
import os
import random
from collections import deque

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

import maze_library

In [None]:
# Print the documentation of init_environment to check which arguments it accepts.
help(maze_library.init_environment)

In [None]:
# Create the maze environment that every later cell interacts with.
# NOTE(review): the two positional 7s are presumably the maze dimensions, and the
# keyword arguments are interpreted by maze_library — confirm against the help() output.
env = maze_library.init_environment(7,7,allowed_revisits=100, mini_exploit_runs_per_episode=1, mini_explore_runs_per_episode=0)


In [None]:


class DQN(nn.Module):
    """Fully connected Q-network: three ELU hidden layers and a linear output layer.

    Args:
        input_size: number of features in one state vector.
        output_size: number of outputs; the policy takes the argmax over them,
            so there is one output per action.
        hidden_size: width of each of the three hidden layers. Defaults to 64,
            the value that used to be hard-coded.
    """

    def __init__(self, input_size, output_size, hidden_size=64):
        super().__init__()
        # Layer attribute names are part of the state_dict keys, so they stay
        # fc1/fc2/fc3/out to remain loadable from previously saved weights.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the raw (un-activated) outputs for a batch of states ``x``."""
        x = F.elu(self.fc1(x))
        x = F.elu(self.fc2(x))
        x = F.elu(self.fc3(x))
        return self.out(x)
    
    
# Network dimensions are taken from the environment.
# NOTE(review): both calls are assumed to return plain ints, since they are passed
# straight to nn.Linear — confirm against maze_library.
input_size = env.input_shape()
output_size = env.output_shape()

model = DQN(input_size, output_size)
optimizer = torch.optim.NAdam(model.parameters(), lr=1e-4)
loss_fn = nn.HuberLoss()
# The replay buffer is created in its own cell further down; it is not defined
# here as well, so there is a single buffer size to reason about.


In [None]:
def epsilon_greedy_policy(state, epsilon):
    """Choose an action index for ``state`` using an epsilon-greedy rule.

    With probability ``epsilon`` a uniformly random index in
    ``[0, output_size - 1]`` is returned; otherwise the index of the largest
    output of ``model`` for this state.

    Args:
        state: a single observation, convertible to a float32 tensor.
        epsilon: exploration probability.

    Returns:
        The chosen action index as a Python int.
    """
    explore = random.random() < epsilon
    if explore:
        return random.randint(0, output_size - 1)

    # The model expects a batch, so add a leading batch dimension of one.
    observation = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        best_action = torch.argmax(model(observation))
    return int(best_action.item())


In [None]:
# Experience replay memory: play_one_step() appends transitions to it and
# sample_experiences() draws minibatches from it. `deque` comes from the import
# cell at the top of the notebook. Once 8000 transitions are stored, appending a
# new one discards the oldest.
replay_buffer = deque(maxlen=8000)

In [None]:


def sample_experiences(batch_size):
    """Draw a random minibatch of stored transitions without replacement.

    At most ``batch_size`` transitions are drawn (fewer when the buffer holds
    fewer). The result is transposed: instead of a sequence of transitions, an
    iterator over one tuple per transition field is returned.

    Args:
        batch_size: maximum number of transitions to sample.

    Returns:
        A ``zip`` iterator over the per-field tuples of the sampled transitions.
    """
    buffer_len = len(replay_buffer)
    n_samples = min(batch_size, buffer_len)
    picked = np.random.choice(buffer_len, n_samples, replace=False)
    return zip(*(replay_buffer[idx] for idx in picked))



In [None]:
# Every reward returned by the environment, in the order it was received.
reward_log = []


def play_one_step(env, state, epsilon):
    """Take one epsilon-greedy step in ``env`` and record the resulting transition.

    The transition ``(state, action, reward, next_state, done, truncated)`` is
    appended to the module-level ``replay_buffer`` and the reward to ``reward_log``.

    Args:
        env: environment exposing ``take_action``.
        state: current observation.
        epsilon: exploration probability forwarded to ``epsilon_greedy_policy``.

    Returns:
        ``(next_state, reward, done, truncated)`` as produced by ``env.take_action``.
    """
    action = epsilon_greedy_policy(state, epsilon)
    # NOTE(review): the meaning of the second create_action argument (1) is
    # defined by maze_library — confirm.
    outcome = env.take_action(maze_library.create_action(action, 1))
    next_state, reward, done, truncated = outcome

    reward_log.append(reward)
    replay_buffer.append((state, action, reward, next_state, done, truncated))
    return next_state, reward, done, truncated

In [None]:
# Seed every RNG the notebook draws from: `random` drives the epsilon-greedy
# exploration, NumPy drives replay sampling, and torch covers any torch-side
# randomness from here on. The model was already constructed in an earlier cell,
# so its initial weights are not affected by this torch seed.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Cumulative reward of each finished episode, appended by the training loop.
rewards = []
# Best cumulative reward seen so far; starts far below any reachable value.
highest_reward = -1000000

In [None]:
# NOTE(review): training_step() below declares its own default of 0.94, and the
# training loop in this notebook calls it without a discount argument, so this
# 0.96 only takes effect if a caller passes it explicitly — confirm which value
# is intended.
discount_factor = 0.96
# Q-values of the chosen actions, appended by every training_step() call.
q_value_log = []


def training_step(batch_size, discount_factor=0.94):
    """Perform one Q-learning gradient step on a minibatch from the replay buffer.

    Samples up to ``batch_size`` transitions through ``sample_experiences``, builds
    the one-step targets
    ``reward + (1 - done) * discount_factor * max_a Q(next_state, a)`` using the
    same ``model`` that is being trained, and takes one ``optimizer`` step on
    ``loss_fn`` between those targets and the Q-values of the actions actually
    taken. The selected Q-values are appended to ``q_value_log``.

    Args:
        batch_size: maximum number of transitions to sample.
        discount_factor: discount applied to the bootstrapped next-state value.

    Returns:
        None. Returns immediately, without touching the model, when no
        experience is available.
    """
    batch = list(sample_experiences(batch_size))
    if not batch:
        # An empty replay buffer yields no fields at all; skip the step instead
        # of failing on the unpack below.
        return
    states, actions, rewards, next_states, dones, _truncateds = batch

    states = torch.tensor(states, dtype=torch.float32)
    actions = torch.tensor(actions, dtype=torch.int64)
    rewards = torch.tensor(rewards, dtype=torch.float32)
    next_states = torch.tensor(next_states, dtype=torch.float32)
    dones = torch.tensor(dones, dtype=torch.float32)

    with torch.no_grad():
        max_next_q = model(next_states).max(dim=1)[0]
        # Only `done` stops bootstrapping; truncated transitions still use the
        # estimated value of next_state.
        target_q = rewards + (1 - dones) * discount_factor * max_next_q

    q_values = model(states)
    # squeeze(1), not squeeze(): a batch of one must stay shape (1,) so it matches
    # target_q and can still be iterated when logged below.
    selected_q = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
    loss = loss_fn(selected_q, target_q)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    q_value_log.extend(selected_q.detach().numpy())

In [None]:
# Diagnostic plot of the Q-values logged during training
def plot_q_values():
    """Plot every value in ``q_value_log`` in the order it was logged.

    The x position is the index into ``q_value_log``; each training step appends
    one entry per sampled transition.
    """
    _, ax = plt.subplots(figsize=(10, 5))
    ax.plot(q_value_log)
    ax.set_title("Q-Values Over Time")
    ax.set_xlabel("Training Steps")
    ax.set_ylabel("Q-Value")
    ax.grid(True)
    plt.show()


In [None]:
# Run configuration. `datetime` and `os` are imported in the import cell at the top.
timestamp = datetime.datetime.now().strftime("%d-%m_%H-%M")

steps = 300       # maximum environment steps per episode
episodes = 300    # number of episodes to play
batch_size = 64   # transitions sampled per training step
# Episodes of pure experience collection before training starts. This must stay
# below `episodes`, otherwise training_step() is never called and the model never
# learns.
warmup_episodes = 50
# Show the Q-value diagnostic plot every `plot_every` episodes.
plot_every = 100
folder_name = f"../mazeLogs/{timestamp}DQNMaze/Run{0}"
os.makedirs(folder_name, exist_ok=True)

for episode in range(episodes):
    obs = env.reset()
    cumilative_reward = 0
    # Epsilon decays linearly from 1 to 0.01 over the first 80% of the episodes;
    # it depends only on the episode index, so compute it once per episode.
    epsilon = max(1 - episode / (episodes * 0.8), 0.01)
    for step in range(steps):
        obs, reward, done, truncated = play_one_step(env, obs, epsilon)
        cumilative_reward += reward
        if done or truncated:
            break

    # Extra debug information
    if episode % 100 == 0 or cumilative_reward > 0:
        print(f"\rEpisode: {episode}, Steps: {step}, eps: {epsilon:.3f}, reward = {cumilative_reward}")
    rewards.append(cumilative_reward)

    if cumilative_reward > highest_reward:
        # state_dict() hands back references to the live parameters, so clone the
        # tensors to keep a real snapshot rather than an alias of the current weights.
        best_weights = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        highest_reward = cumilative_reward

    # One gradient step per episode once the warm-up phase is over.
    if episode >= warmup_episodes:
        training_step(batch_size)
    if (episode + 1) % plot_every == 0:
        plot_q_values()

    # Persist the environment's JSON export for this episode.
    with open(f'{folder_name}/dqn{episode}.json', 'w') as file:
        file.write(env.to_json_python())

    


In [None]:
# Learning curve: cumulative reward of each episode, in the order they were played.
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(rewards)
ax.set_xlabel("Episode", fontsize=14)
ax.set_ylabel("Sum of rewards", fontsize=14)
ax.grid(True)
plt.show()

In [None]:
# Distribution of the individual step rewards recorded in reward_log.
fig, ax = plt.subplots(figsize=(10, 5))
# Let NumPy choose the bin edges: a single bin would lump every reward into one
# bar and show only the total count, not how the rewards are distributed.
ax.hist(reward_log, bins="auto", edgecolor="black")
ax.set_title("Reward Distribution")
ax.set_xlabel("Reward")
ax.set_ylabel("Frequency")
ax.grid(True)
plt.show()