In [None]:
import os

def write_sumo_cfg(cfg_path, net_path, route_path, additional_files=None):
    """
    Write a basic SUMO .sumocfg file.

    Parameters
    ----------
    cfg_path : str or path-like
        Destination of the configuration file (overwritten if it exists).
    net_path : str or path-like
        Value written to the ``<net-file>`` element.
    route_path : str or path-like
        Value written to the ``<route-files>`` element.
    additional_files : str, path-like, iterable of those, or None
        Extra XML files (e.g., tls logic, detectors). They are written as one
        comma-separated ``<additional-files>`` value; the element is omitted
        when this is None or empty. A single string/path is treated as a
        one-element list rather than being split into characters.

    All values are XML-escaped, so paths containing ``&``, ``<``, ``>`` or
    ``"`` still produce a well-formed file. The simulation window is fixed to
    0..86400 s with a 1.0 s step length.
    """
    # Local import keeps this cell self-contained.
    from xml.sax.saxutils import escape

    def _attr(value):
        # Escape for use inside a double-quoted XML attribute.
        return escape(str(value), {'"': "&quot;"})

    # ",".join("abc") would yield "a,b,c"; wrap a lone path in a list instead.
    if isinstance(additional_files, (str, os.PathLike)):
        additional_files = [additional_files]

    additional_str = ""
    if additional_files:
        joined = ",".join(_attr(path) for path in additional_files)
        additional_str = f'\n        <additional-files value="{joined}"/>'

    cfg_xml = f"""<configuration>
    <input>
        <net-file value="{_attr(net_path)}"/>
        <route-files value="{_attr(route_path)}"/>{additional_str}
    </input>
    <time>
        <begin value="0"/>
        <end value="86400"/> <!-- one day, adjust as needed -->
        <step-length value="1.0"/>
    </time>
</configuration>
"""
    # No XML declaration is emitted, so the file must be UTF-8 (the XML default).
    with open(cfg_path, "w", encoding="utf-8") as f:
        f.write(cfg_xml)
    print("Wrote SUMO config to:", cfg_path)


In [None]:
# Generate the SUMO config next to the notebook; no additional files are
# listed, which is the function's default.
write_sumo_cfg("config.sumo.cfg", "network.net.xml", "routes.rou.xml")


In [None]:
import os
import numpy as np
import traci

class SUTrafficEnv:
    """
    SUMO traffic environment with emergency-vehicle priority.
    - Controls one or more traffic lights
    - Spawns emergency vehicles at times from EMS events

    The interface follows the usual RL loop: ``reset()`` returns an
    observation and ``step(action)`` returns ``(obs, reward, done, info)``.
    """

    # "Seconds until the next EV" reported once the schedule is exhausted.
    NO_EV_PENDING_SECONDS = 99999.0

    def __init__(self, sumo_cfg, ems_events, pems_day,
                 tls_ids, use_gui=False, step_length=1.0,
                 ev_route_id="ev_route_0", ev_type_id="emergency"):
        """
        sumo_cfg: path to .sumocfg
        ems_events: DataFrame from transform_ems_for_rl for a given day;
            must have a datetime "event_time" column (may be empty)
        pems_day: DataFrame from transform_pems_day_for_rl for that day;
            stored sorted by "Timestamp" but not otherwise used by this class
        tls_ids: list of traffic light IDs to control
        use_gui: True -> sumo-gui, False -> sumo
        step_length: simulated seconds per call to step()
        ev_route_id: route ID used for spawned emergency vehicles; must be
            defined in the route/additional files
        ev_type_id: vehicle type ID used for spawned emergency vehicles
        """
        self.sumo_cfg = sumo_cfg
        self.ems_events = ems_events.sort_values("event_time")
        self.pems_day = pems_day.sort_values("Timestamp")
        self.tls_ids = tls_ids
        self.use_gui = use_gui
        self.step_length = step_length
        self.ev_route_id = ev_route_id
        self.ev_type_id = ev_type_id

        # Simulated seconds elapsed since reset (advances by step_length).
        self.current_step = 0
        self.max_steps = 24 * 60 * 60  # simulate one day in seconds (adjust)

        # (tls_id, program_id) -> number of phases; filled lazily in step().
        self._phase_counts = {}

        # Precompute EV spawn schedule (in seconds from start)
        self.ev_schedule = self._build_ev_schedule(self.ems_events)
        self.next_ev_idx = 0

    def _build_ev_schedule(self, ems_events):
        """
        Convert event_time into simulation seconds from day start.
        Assumes all timestamps within one day.

        Returns a sorted integer array; rows with a missing event_time are
        skipped, and an empty array is returned when there are no events.
        """
        times = ems_events["event_time"].dropna().sort_values()
        if times.empty:
            # Nothing to schedule; iloc[0] below would raise on an empty Series.
            return np.array([], dtype=int)
        start_time = times.iloc[0].floor("D")  # midnight of that day
        sim_times = (times - start_time).dt.total_seconds().astype(int)
        return sim_times.values

    def _start_sumo(self):
        """Launch sumo (or sumo-gui) under TraCI with the configured step length."""
        sumo_binary = "sumo-gui" if self.use_gui else "sumo"
        traci.start([sumo_binary, "-c", self.sumo_cfg, "--step-length", str(self.step_length)])

    def reset(self):
        """Reset SUMO simulation and environment state; return the first observation."""
        if traci.isLoaded():
            traci.close()

        self._start_sumo()
        self.current_step = 0
        self.next_ev_idx = 0
        self._phase_counts = {}

        # Build initial observation
        return self._get_observation()

    def _spawn_emergency_vehicle(self):
        """
        Spawn one emergency vehicle (placeholder).
        You need a route ID and a vehicle type defined in your routes/additional files.
        """
        # Use the schedule index, not the current time: several events can
        # fall in the same second and SUMO requires unique vehicle IDs.
        # The "EV_" prefix is what _compute_reward() looks for.
        veh_id = f"EV_{self.next_ev_idx}"
        traci.vehicle.add(veh_id, routeID=self.ev_route_id, typeID=self.ev_type_id)
        # Optional: set color or speed
        traci.vehicle.setColor(veh_id, (255, 0, 0, 255))

    def _maybe_spawn_ev(self):
        """Check schedule and spawn EVs when their time comes."""
        while self.next_ev_idx < len(self.ev_schedule) and \
              self.ev_schedule[self.next_ev_idx] <= self.current_step:
            self._spawn_emergency_vehicle()
            self.next_ev_idx += 1

    def _get_num_phases(self, tls_id):
        """Return the number of phases in the active program of `tls_id`."""
        program_id = traci.trafficlight.getProgram(tls_id)
        cache_key = (tls_id, program_id)
        if cache_key not in self._phase_counts:
            logics = traci.trafficlight.getAllProgramLogics(tls_id)
            # Prefer the logic of the active program; fall back to the first.
            active = logics[0]
            for logic in logics:
                if logic.programID == program_id:
                    active = logic
                    break
            self._phase_counts[cache_key] = len(active.phases)
        return self._phase_counts[cache_key]

    def _get_observation(self):
        """
        Build state vector from:
        - normalized time of day
        - hours until the next scheduled emergency event
        - current phase index of each controlled traffic light

        Returns a 1-D float array of length 2 + len(tls_ids).
        """
        tls_phases = [traci.trafficlight.getPhase(tls_id) for tls_id in self.tls_ids]

        # Simple example: normalized time-of-day
        time_of_day = (self.current_step % 86400) / 86400.0

        # Distance to next EV event in seconds (if any left)
        if self.next_ev_idx < len(self.ev_schedule):
            dt_next_ev = (self.ev_schedule[self.next_ev_idx] - self.current_step)
        else:
            dt_next_ev = self.NO_EV_PENDING_SECONDS

        obs = np.array([
            time_of_day,
            dt_next_ev / 3600.0,  # hours until next EV
            *tls_phases           # very rough; you'd usually encode more features
        ], dtype=float)

        return obs

    def step(self, action):
        """
        One simulation step.
        action: either a sequence (list, tuple or ndarray) with one entry per
            tls_id, or a single scalar applied to every traffic light. Each
            value is taken modulo the number of phases of the active program.

        Returns (obs, reward, done, info); `done` is True once the simulated
        time reaches max_steps, and `info` is an empty dict.
        """

        # --- apply action ---
        is_sequence = isinstance(action, (list, tuple)) or (
            isinstance(action, np.ndarray) and action.ndim > 0
        )
        for idx, tls_id in enumerate(self.tls_ids):
            act = action[idx] if is_sequence else action
            # WARNING: in real setups you'd handle phase transitions carefully
            num_phases = self._get_num_phases(tls_id)
            phase_to_set = int(act) % num_phases
            traci.trafficlight.setPhase(tls_id, phase_to_set)

        # --- spawn EVs if needed ---
        self._maybe_spawn_ev()

        # --- advance SUMO ---
        traci.simulationStep()
        self.current_step += self.step_length

        # --- compute reward ---
        reward = self._compute_reward()

        # --- build next obs ---
        obs = self._get_observation()

        # --- done flag ---
        done = self.current_step >= self.max_steps

        info = {}
        return obs, reward, done, info

    def _compute_reward(self):
        """
        Reward example:
        - negative total waiting time
        - minus a larger penalty for EV waiting time

        EVs count once in the total and again with weight 5.0.
        """
        total_wait = 0.0
        ev_wait = 0.0
        # Single pass: one TraCI waiting-time query per vehicle.
        for vid in traci.vehicle.getIDList():
            wait = traci.vehicle.getWaitingTime(vid)
            total_wait += wait
            if vid.startswith("EV_"):
                ev_wait += wait

        # We want to minimize both, but EV more important
        return - (total_wait + 5.0 * ev_wait)

    def close(self):
        """Close the TraCI connection if a simulation is loaded."""
        if traci.isLoaded():
            traci.close()


In [None]:
# ems_rl_for_that_day: one day of EMS events (already filtered to that date)
# pems_day_rl: traffic for that date (currently not used in env, but you can feed it into obs)
# Both frames must be defined by earlier cells before this one runs.

env = SUTrafficEnv(
    sumo_cfg="config.sumo.cfg",
    ems_events=ems_rl_for_that_day,
    pems_day=pems_day_rl,
    tls_ids=["junction_0"],  # replace with your real TLS IDs
    use_gui=True
)

try:
    obs = env.reset()
    done = False
    while not done:
        action = [0]  # dummy action, always phase 0
        obs, reward, done, info = env.step(action)
finally:
    # Shut SUMO down even if a step raises (or the cell is interrupted), so
    # no orphaned simulator process or open TraCI connection is left behind.
    env.close()


In [4]:
# Install SUMO through Homebrew. This requires the `brew` CLI to be on the
# PATH of the shell Jupyter spawns; the recorded output of this cell
# ("command not found: brew") shows it was not available when this was run.
!brew update
!brew install sumo


zsh:1: command not found: brew
zsh:1: command not found: brew


In [3]:
!python3 -m pip install brew

Collecting brew
  Using cached brew-0.1.4.zip (48 kB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mGetting requirements to build wheel[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[23 lines of output][0m
  [31m   [0m Traceback (most recent call last):
  [31m   [0m   File [35m"/opt/anaconda3/envs/rlfinal/lib/python3.14/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py"[0m, line [35m389[0m, in [35m<module>[0m
  [31m   [0m     [31mmain[0m[1;31m()[0m
  [31m   [0m     [31m~~~~[0m[1;31m^^[0m
  [31m   [0m   File [35m"/opt/anaconda3/envs/rlfinal/lib/python3.14/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py"[0m, line [35m373[0m, in [35mmain[0m
  [31m   [0m     json_out["return_val"] = [31mhook[0m[1;31m(**hook_input["kwargs"])[0m
  [31m   