In [2]:
from maze_env import MazeEnv_v0
from env_utils.PettingZooEnv_new import PettingZooEnv_new
import supersuit
import numpy as np
from tianshou.env.utils import PettingZooEnv
import tianshou as ts
from tianshou.utils.net.common import Net
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
import gymnasium as gym

def preprocess_maze_env(render_mode=None, size=6):
    """Create a maze environment and apply the wrapper stack used by this notebook.

    Args:
        render_mode: forwarded unchanged to ``MazeEnv_v0.env``.
        size: forwarded unchanged to ``MazeEnv_v0.env``.

    Returns:
        The maze env wrapped first by supersuit's ``pad_observations_v0`` and
        then by ``PettingZooEnv_new``.
    """
    raw_maze = MazeEnv_v0.env(render_mode=render_mode, size=size)
    padded_maze = supersuit.multiagent_wrappers.pad_observations_v0(raw_maze)
    return PettingZooEnv_new(padded_maze)

# create a CNN for the observer
# NOTE(review): the policy cell further down builds the observer from
# tianshou's `Net`, not from this class — confirm which one is intended.
class CNN(nn.Module):
    """Small convolutional network mapping a square image observation to logits.

    With the default arguments the layer stack is the fixed architecture this
    class always had (3 input channels, 13x13 input, 4 outputs), and the
    ``nn.Sequential`` indices are unchanged, so ``state_dict`` keys stay the same.

    Args:
        in_channels: number of channels in the observation.
        input_size: side length of the square observation. The default 13
            corresponds to a 6x6 maze drawn with its walls.
        n_actions: number of output logits.

    Raises:
        ValueError: if ``input_size`` is too small to leave at least one
            spatial cell after both conv/pool stages.
    """

    def __init__(self, in_channels=3, input_size=13, n_actions=4):
        super().__init__()

        # Spatial side length after each stage (defaults: 13 -> 11 -> 6 -> 4 -> 2).
        after_conv1 = input_size - 2        # 3x3 conv, no padding
        after_pool1 = after_conv1 // 2 + 1  # 2x2 pool, stride 2, padding 1
        after_conv2 = after_pool1 - 2       # 3x3 conv, no padding
        after_pool2 = after_conv2 // 2      # 2x2 pool, stride 2, no padding
        if after_pool2 < 1:
            raise ValueError(
                f"input_size={input_size} is too small for the two conv/pool stages"
            )

        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 12, 3), nn.ReLU(inplace=True), nn.MaxPool2d(2, 2, 1),
            nn.Conv2d(12, 20, 3), nn.ReLU(inplace=True), nn.MaxPool2d(2, 2),
            nn.Flatten(), nn.Linear(20 * after_pool2 * after_pool2, 128), nn.ReLU(inplace=True),
            nn.Linear(128, 64), nn.ReLU(inplace=True),
            nn.Linear(64, n_actions)
        )

    def forward(self, obs, state=None, info=None):
        """Compute logits for a batch of observations.

        Args:
            obs: batch of observations shaped ``(batch, channels, H, W)``;
                either a tensor or anything ``torch.tensor`` accepts.
            state: passed through to the caller untouched.
            info: accepted and ignored. Defaults to ``None`` rather than a
                shared mutable ``{}``.

        Returns:
            ``(logits, state)`` where ``logits`` is the network output.
        """
        # Run on whatever device the weights live on, always in float32, so
        # integer observations and non-CPU models both work.
        device = next(self.parameters()).device
        if isinstance(obs, torch.Tensor):
            obs = obs.to(device=device, dtype=torch.float)
        else:
            obs = torch.tensor(obs, dtype=torch.float, device=device)
        # Retained for backward compatibility; nothing in this class reads it.
        self.batch = obs.shape[0]
        logits = self.model(obs)
        return logits, state

In [3]:
# Experiment configuration. Notes marked "presumably" are inferred from the
# variable names only — confirm against the training loop.

# Seed the global NumPy and PyTorch RNGs so network initialisation and any
# sampling done through them repeat on "Restart & Run All".
# Environment-level randomness is not seeded here.
seed = 0
np.random.seed(seed)
torch.manual_seed(seed)

# presumably epsilon-greedy exploration rates and their decay schedule
eps_train = 0.9
eps_test = 0.1
eps_decay = 0.9995
eps_min = 0.1

# optimisation
lr = 1e-4
epoch = 60
batch_size = 64

# DQN settings, passed positionally to the DQN policies
gamma = 0.9
n_step = 3
target_update_freq = 320

# number of parallel training / testing environments
train_num = 10
test_num = 10

# total replay buffer capacity handed to the vector replay buffer
buffer_size = 20000

# presumably environment steps per epoch and per collect call
step_per_epoch = 10000
step_per_collect = 150

# presumably the maze side length in cells
maze_width = 6

#logger = ts.utils.TensorboardLogger(SummaryWriter('log/dqn_no_abs_no_interleaving_cnn'))

In [5]:
# get the vectorized training/testing environments
def _make_maze_env():
    """Environment factory: one wrapped maze env sized by the `maze_width` config."""
    # Pass the configured size explicitly; relying on the factory's default
    # would leave `maze_width` with no effect.
    return preprocess_maze_env(size=maze_width)


train_envs = ts.env.DummyVectorEnv([_make_maze_env for _ in range(train_num)])
test_envs = ts.env.DummyVectorEnv([_make_maze_env for _ in range(test_num)])

# single (non-vectorized) env, used below to read spaces and agent names
env = _make_maze_env()

In [9]:
# get agent names
agents = env.agents

# observation/action sizes: the space's `shape` when it is non-empty,
# otherwise its `n`
state_shape = env.observation_space.shape or env.observation_space.n
action_shape = env.action_space.shape or env.action_space.n


def _build_dqn_agent(hidden_sizes):
    """Create one agent's Q-network, Adam optimizer and DQN policy.

    Reads the module-level ``state_shape``, ``action_shape``, ``lr``,
    ``gamma``, ``n_step`` and ``target_update_freq``.

    Returns:
        ``(net, optim, policy)`` for the given hidden layer sizes.
    """
    q_net = Net(state_shape, action_shape, hidden_sizes=hidden_sizes)
    adam = torch.optim.Adam(params=q_net.parameters(), lr=lr)
    dqn = ts.policy.DQNPolicy(
        q_net,
        adam,
        discount_factor=gamma,
        estimation_step=n_step,
        target_update_freq=target_update_freq,
    )
    return q_net, adam, dqn


# observer: three hidden layers of 512 units; explorer: one hidden layer of 4 units
net_obs, optim_obs, agent_observer = _build_dqn_agent([512, 512, 512])
net_exp, optim_exp, agent_explorer = _build_dqn_agent([4])

# one policy per agent, combined into a single multi-agent policy
# (for a random baseline, use ts.policy.RandomPolicy() for both entries instead)
agent_policies = [agent_observer, agent_explorer]
policy = ts.policy.MultiAgentPolicyManager(agent_policies, env)

# training collector: writes transitions into a vector replay buffer of
# `buffer_size` total capacity split over `train_num` envs
train_collector = ts.data.Collector(
    policy,
    train_envs,
    buffer=ts.data.VectorReplayBuffer(buffer_size, train_num),
    exploration_noise=True,
)

# testing collector (no replay buffer passed)
# NOTE(review): exploration noise is enabled here as well — confirm intended.
test_collector = ts.data.Collector(policy, test_envs, exploration_noise=True)

In [10]:
# set up human render environment
# The factory builds the env itself. Previously the lambda closed over the
# module-level name `env_human`, which the same statement then rebound to the
# vector env; that only worked because the factory is invoked during
# construction, and any later call would have returned the vector env instead.
env_human = ts.env.DummyVectorEnv(
    [lambda: preprocess_maze_env(render_mode="human", size=maze_width)]
)
human_collector = ts.data.Collector(policy, env_human, exploration_noise=True)

In [11]:
# Build an unwrapped maze env in human render mode, reset it with the
# `n_mazes` option (semantics defined by MazeEnv_v0 — not visible here),
# and draw the current state once.
maze = MazeEnv_v0.env(
    render_mode="human",
    size=6,
)
maze.reset(options={"n_mazes": 3})
maze.render()

In [12]:
# Reset the render env with the `n_mazes` option, then roll out three
# episodes with rendering enabled (`render` value of 1/60).
# NOTE(review): only this explicit reset receives the options; `collect()` is
# called without `gym_reset_kwargs`, so any resets it performs between
# episodes presumably do not get them — confirm that is intended.
human_collector.reset_env(
    gym_reset_kwargs={"options": {"n_mazes": 3}},
)
human_collector.collect(
    render=1 / 60,
    n_episode=3,
)

KeyboardInterrupt: 

In [14]:
# Close the human-render vector env once the demo is finished or interrupted.
# NOTE(review): `maze`, `train_envs` and `test_envs` are not closed in the
# cells visible here — confirm they are released elsewhere.
env_human.close()