# Grid2x2 using RLlib

In [1]:
import csv
import os

import ray
from ray.rllib.env.wrappers.multi_agent_env_compatibility import MultiAgentEnvCompatibility
from ray.tune.registry import register_env

from envs import MultiAgentSumoEnv
from observation import Grid2x2ObservationFunction
from reward_functions import combined_reward

In [2]:
import random
import numpy as np
import torch

TEST_NUM = 2
SEED = 23423  # default SUMO seed no.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

ENV_NAME = "grid2x2"

In [3]:
def train_env_creator(args):
    env_params = {
        "net_file": os.path.join("nets",ENV_NAME,f"{ENV_NAME}.net.xml"),
        "route_file": os.path.join("nets",ENV_NAME,f"{ENV_NAME}.rou.xml"),
        "num_seconds": 3600,
        "reward_fn": combined_reward,
        "sumo_seed": SEED,
        "observation_class": Grid2x2ObservationFunction,
        "add_system_info": False,
    }
    congestion_reward = combined_reward.__defaults__[0].__name__
    alpha = combined_reward.__defaults__[1]  # congestion component coefficient
    print(congestion_reward, alpha)

    env = MultiAgentSumoEnv(**env_params)
    return env

In [4]:
from ray.rllib.algorithms.ppo import PPOConfig

# From https://github.com/ray-project/ray/blob/master/rllib/tuned_examples/ppo/atari-ppo.yaml

train_env = MultiAgentEnvCompatibility(train_env_creator({}))

config: PPOConfig
config = (
    PPOConfig()
    .environment(env=ENV_NAME)
    .framework(framework="torch")
    .rollouts(
        rollout_fragment_length=100,
        num_rollout_workers=10,
    )
    .training(
        lambda_=0.95,
        kl_coeff=0.5,
        clip_param=0.1,
        vf_clip_param=10.0,
        entropy_coeff=0.01,
        train_batch_size=1000,
        sgd_minibatch_size=100,
        num_sgd_iter=10,
    )
    .evaluation(
        evaluation_duration=1,
        evaluation_num_workers=1,
        evaluation_sample_timeout_s=300,
    )
    .debugging(seed=SEED)
    .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "1")))
    .multi_agent(
        policies=set(train_env.env.ts_ids),
        policy_mapping_fn=(lambda agent_id, *args, **kwargs: agent_id),
    )
)

delta_wait_time_reward 0.875


In [5]:
csv_dir = os.path.join("outputs",ENV_NAME,f"test_{TEST_NUM}")
if not os.path.exists(csv_dir):
    os.makedirs(csv_dir)

## Play Untrained Agent

In [6]:
def env_creator(csv_path, tb_log_dir):
    env_params = {
        "net_file": os.path.join("nets",ENV_NAME,f"{ENV_NAME}.net.xml"),
        "route_file": os.path.join("nets",ENV_NAME,f"{ENV_NAME}.rou.xml"),
        "num_seconds": 3600,
        "reward_fn": combined_reward,
        "sumo_seed": SEED,
        "observation_class": Grid2x2ObservationFunction,
        "add_system_info": False,
    }
    congestion_reward = combined_reward.__defaults__[0].__name__
    alpha = combined_reward.__defaults__[1]  # congestion component coefficient
    print(congestion_reward, alpha)

    env = MultiAgentSumoEnv(eval=True, csv_path=csv_path, tb_log_dir=tb_log_dir, **env_params)
    return env

In [7]:
ray.init()

csv_path = os.path.join(csv_dir, "untrained.csv")
tb_log_dir = os.path.join("logs", ENV_NAME, f"PPO_{TEST_NUM}", "eval_untrained")

with open(csv_path, "a", newline="") as f:
    csv_writer = csv.writer(f)
    csv_writer.writerow(["sim_time", "arrived_num", "sys_tyre_pm", "sys_stopped",
                         "sys_total_wait", "sys_avg_wait", "sys_avg_speed",
                         "agents_tyre_pm", "agents_stopped", "agents_total_wait",
                         "agents_avg_speed", "agents_total_pressure",])

register_env(ENV_NAME, lambda config: MultiAgentEnvCompatibility(env_creator(csv_path, tb_log_dir)))

2023-06-01 18:01:27,925	INFO worker.py:1625 -- Started a local Ray instance.


In [8]:
algo = config.build()

2023-06-01 18:01:31,613	INFO algorithm.py:527 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.


[2m[36m(RolloutWorker pid=24656)[0m delta_wait_time_reward 0.875
[2m[36m(RolloutWorker pid=24656)[0m Step #0.00 (0ms ?*RT. ?UPS, TraCI: 9ms, vehicles TOT 0 ACT 0 BUF 0)                      




[2m[36m(RolloutWorker pid=21976)[0m delta_wait_time_reward 0.875[32m [repeated 10x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(RolloutWorker pid=19200)[0m Step #0.00 (0ms ?*RT. ?UPS, TraCI: 27ms, vehicles TOT 0 ACT 0 BUF 0)                     [32m [repeated 9x across cluster][0m


2023-06-01 18:01:46,812	INFO trainable.py:172 -- Trainable.setup took 15.202 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


In [9]:
algo.evaluate()

Step #3600.00 (1ms ~= 1000.00*RT, ~231000.00UPS, TraCI: 263ms, vehicles TOT 1715 ACT 231 B32ms, vehicles TOT 3 ACT 3 BUF 0)      
Step #3600.00 (1ms ~= 1000.00*RT, ~231000.00UPS, TraCI: 263ms, vehicles TOT 1715 ACT 231 B32ms, vehicles TOT 3 ACT 3 BUF 0)      


{'evaluation': {'episode_reward_max': -320852.8744288877,
  'episode_reward_min': -320852.8744288877,
  'episode_reward_mean': -320852.8744288877,
  'episode_len_mean': 720.0,
  'episode_media': {},
  'episodes_this_iter': 1,
  'policy_reward_min': {'1': -76832.12950596321,
   '2': -83696.77696430053,
   '5': -79422.03550824955,
   '6': -80901.93245037406},
  'policy_reward_max': {'1': -76832.12950596321,
   '2': -83696.77696430053,
   '5': -79422.03550824955,
   '6': -80901.93245037406},
  'policy_reward_mean': {'1': -76832.12950596321,
   '2': -83696.77696430053,
   '5': -79422.03550824955,
   '6': -80901.93245037406},
  'custom_metrics': {},
  'hist_stats': {'episode_reward': [-320852.8744288877],
   'episode_lengths': [720],
   'policy_1_reward': [-76832.12950596321],
   'policy_2_reward': [-83696.77696430053],
   'policy_5_reward': [-79422.03550824955],
   'policy_6_reward': [-80901.93245037406]},
  'sampler_perf': {'mean_raw_obs_processing_ms': 1.467065903746966,
   'mean_inferen

In [12]:
ray.shutdown()

[2m[36m(RolloutWorker pid=25400)[0m delta_wait_time_reward 0.875[32m [repeated 8x across cluster][0m
[2m[36m(RolloutWorker pid=25400)[0m Step #0.00 (0ms ?*RT. ?UPS, TraCI: 15ms, vehicles TOT 0 ACT 0 BUF 0)                     [32m [repeated 8x across cluster][0m


## Train RL Agent

In [13]:
ray.init()

register_env(ENV_NAME, lambda config: MultiAgentEnvCompatibility(train_env_creator(config)))

2023-06-01 18:25:42,196	INFO worker.py:1625 -- Started a local Ray instance.


In [14]:
algo = config.build()



[2m[36m(RolloutWorker pid=12764)[0m delta_wait_time_reward 0.875




[2m[36m(RolloutWorker pid=14920)[0m Step #0.00 (0ms ?*RT. ?UPS, TraCI: 8ms, vehicles TOT 0 ACT 0 BUF 0)                      


2023-06-01 18:25:57,062	INFO trainable.py:172 -- Trainable.setup took 11.311 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


[2m[36m(RolloutWorker pid=8648)[0m delta_wait_time_reward 0.875[32m [repeated 10x across cluster][0m


In [None]:
for i in range(1400):  # 720 * 1400 == 1_008_000 total timesteps
    results = algo.train()

    if (i+1) % 200 == 0:
        algo.save(os.path.join("ray_checkpoints","grid2x2",f"test_{TEST_NUM}"))

## Play Trained Agent

In [None]:
# csv_dir = os.path.join("outputs",ENV_NAME,f"test_{TEST_NUM}")
# if not os.path.exists(csv_dir):
#     os.makedirs(csv_dir)

# csv_path = os.path.join(csv_dir, "untrained.csv")
# tb_log_dir = os.path.join("logs", ENV_NAME, "eval_untrained")

# with open(csv_path, "a", newline="") as f:
#     csv_writer = csv.writer(f)
#     csv_writer.writerow(["sim_time", "arrived_num", "sys_tyre_pm", "sys_stopped",
#                          "sys_total_wait", "sys_avg_wait", "sys_avg_speed",
#                          "agents_tyre_pm", "agents_stopped", "agents_total_wait",
#                          "agents_avg_speed", "agents_total_pressure",])