<a href="https://colab.research.google.com/github/Heather306/test-repo/blob/cursor%2Ftrain-system-app-scheduling-with-reinforcement-learning-095a/notebooks/clinic_scheduling_rl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Clinic Scheduling via Reinforcement Learning (Gymnasium + Stable-Baselines3)

This Colab-ready notebook builds and trains a PPO agent to schedule patients under clinic constraints:
- Monday–Saturday only (Sunday closed)
- Operating hours: 08:00–12:00 and 13:00–16:00
- Lunch break 12:00–13:00 (no scheduling)
- Max 60 scheduled patient slots per day
- Walk-ins accepted until cutoff; excess wait in walk-in queue
- If a scheduled patient is not on-site at their time, move to late list and serve next
- If a late patient arrives later, admin can restore to original position; they get priority next after current patient

We'll define a custom Gymnasium environment, train a PPO policy with Stable-Baselines3, and evaluate/visualize outcomes.

In [2]:
# If running in Colab, uncomment the next line to install packages
# !pip -q install gymnasium==0.29.1 stable-baselines3==2.3.2 sb3-contrib==2.3.2 shimmy==1.3.0 plotly==5.24.1 numpy pandas

import sys, os
print(sys.version)
print("Working dir:", os.getcwd())

# Ensure proper imports in Colab kernels
import warnings
warnings.filterwarnings("ignore")

3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
Working dir: /content


In [3]:
from dataclasses import dataclass
from typing import Tuple, Dict, Any, Optional
import numpy as np
import gymnasium as gym
from gymnasium import spaces

# Domain constants
MINUTES_OPEN_AM = 8 * 60
MINUTES_LUNCH_START = 12 * 60
MINUTES_LUNCH_END = 13 * 60
MINUTES_CLOSE_PM = 16 * 60
WORK_MINUTES = (MINUTES_LUNCH_START - MINUTES_OPEN_AM) + (MINUTES_CLOSE_PM - MINUTES_LUNCH_END)
MAX_SCHEDULED_PER_DAY = 60
MAX_WALKIN_QUEUE = 200
DAYS_OPEN = set(range(6))  # 0=Mon ... 5=Sat, 6=Sun closed


def is_open_minute(minute_of_day: int) -> bool:
    return (MINUTES_OPEN_AM <= minute_of_day < MINUTES_LUNCH_START) or (MINUTES_LUNCH_END <= minute_of_day < MINUTES_CLOSE_PM)


def minute_to_slot(minute_of_day: int, slot_minutes: int) -> int:
    # Map minute to contiguous slot index excluding lunch
    if minute_of_day < MINUTES_OPEN_AM:
        return 0
    if MINUTES_OPEN_AM <= minute_of_day < MINUTES_LUNCH_START:
        return (minute_of_day - MINUTES_OPEN_AM) // slot_minutes
    if MINUTES_LUNCH_START <= minute_of_day < MINUTES_LUNCH_END:
        return (MINUTES_LUNCH_START - MINUTES_OPEN_AM) // slot_minutes
    if MINUTES_LUNCH_END <= minute_of_day < MINUTES_CLOSE_PM:
        am_slots = (MINUTES_LUNCH_START - MINUTES_OPEN_AM) // slot_minutes
        return am_slots + (minute_of_day - MINUTES_LUNCH_END) // slot_minutes
    return ((MINUTES_LUNCH_START - MINUTES_OPEN_AM) + (MINUTES_CLOSE_PM - MINUTES_LUNCH_END)) // slot_minutes


@dataclass
class Patient:
    id: int
    scheduled_slot: Optional[int]  # None for walk-in
    arrival_time_min: Optional[int]  # None means not arrived yet
    is_late: bool = False


class ClinicSchedulingEnv(gym.Env):
    metadata = {"render.modes": ["human"]}

    def __init__(self,
                 slot_minutes: int = 10,
                 max_scheduled: int = MAX_SCHEDULED_PER_DAY,
                 max_walkin_queue: int = MAX_WALKIN_QUEUE,
                 no_show_prob: float = 0.05,
                 late_prob: float = 0.1,
                 walkin_rate_per_hour: float = 8.0,
                 walkin_cutoff_minute: Optional[int] = None,
                 day_of_week: Optional[int] = None,
                 seed: Optional[int] = None):
        super().__init__()
        self.slot_minutes = slot_minutes
        self.slots_per_day = WORK_MINUTES // slot_minutes
        self.max_scheduled = min(max_scheduled, MAX_SCHEDULED_PER_DAY)
        self.max_walkin_queue = max_walkin_queue
        self.no_show_prob = no_show_prob
        self.late_prob = late_prob
        self.walkin_rate_per_hour = walkin_rate_per_hour
        self.walkin_cutoff_minute = walkin_cutoff_minute or MINUTES_CLOSE_PM
        self._configured_day_of_week = day_of_week
        self.rng = np.random.default_rng(seed)

        # Action space: choose next source to serve
        # 0 = next scheduled on-time, 1 = next walk-in, 2 = recall-priority late (if any)
        self.action_space = spaces.Discrete(3)

        # Observation space (compact):
        # [current_slot_index, scheduled_remaining, walkin_queue_len, late_list_len, next_scheduled_on_site(0/1), time_to_next_arrival_minutes]
        high = np.array([
            self.slots_per_day,
            MAX_SCHEDULED_PER_DAY,
            self.max_walkin_queue,
            MAX_SCHEDULED_PER_DAY,
            1,
            60
        ], dtype=np.float32)
        self.observation_space = spaces.Box(low=0.0, high=high, dtype=np.float32)

        self.reset_state()

    def reset_state(self):
        self.minute = MINUTES_OPEN_AM
        self.current_slot = 0
        # self.day_of_week is set in reset(); don't override here
        self.scheduled: Dict[int, Patient] = {}
        self.walkin_queue: list[Patient] = []
        self.late_list: list[Patient] = []
        self.served_ids: list[int] = []
        self.served_log: list[Dict[str, Any]] = []
        self.generated_patients: Dict[int, Patient] = {}
        self._generate_day_schedule()

    def _generate_day_schedule(self):
        # Pre-generate scheduled patients across slots (max 60)
        max_slots = self.slots_per_day
        chosen_slots = self.rng.choice(max_slots, size=min(self.max_scheduled, max_slots), replace=False)
        pid = 1
        for slot in sorted(chosen_slots.tolist()):
            # arrival: on time or late or no-show
            ontime = self.rng.random() > self.late_prob
            if self.rng.random() < self.no_show_prob:
                arrival = None
            else:
                if ontime:
                    # arrive within slot's first 5 minutes
                    slot_minute = self._slot_to_minute(slot)
                    jitter = int(self.rng.integers(0, min(5, self.slot_minutes)))
                    arrival = slot_minute + jitter
                else:
                    # late: arrive between +5 and +60 minutes later
                    base = self._slot_to_minute(slot) + 5
                    arrival = min(base + int(self.rng.integers(0, 60)), MINUTES_CLOSE_PM - 1)
            p = Patient(id=pid, scheduled_slot=slot, arrival_time_min=arrival)
            self.scheduled[slot] = p
            self.generated_patients[pid] = p
            pid += 1
        self.next_walkin_id = pid

    def _slot_to_minute(self, slot_index: int) -> int:
        am_slots = (MINUTES_LUNCH_START - MINUTES_OPEN_AM) // self.slot_minutes
        if slot_index < am_slots:
            return MINUTES_OPEN_AM + slot_index * self.slot_minutes
        else:
            return MINUTES_LUNCH_END + (slot_index - am_slots) * self.slot_minutes

    def _poisson(self, lam):
        # simple Poisson sampler via numpy
        return self.rng.poisson(lam)

    def _maybe_generate_walkins(self):
        if not is_open_minute(self.minute):
            return
        if self.minute >= self.walkin_cutoff_minute:
            return
        # per-minute rate
        lam = self.walkin_rate_per_hour / 60.0
        arrivals = self._poisson(lam)
        for _ in range(arrivals):
            p = Patient(id=self.next_walkin_id, scheduled_slot=None, arrival_time_min=self.minute)
            self.next_walkin_id += 1
            if len(self.walkin_queue) < self.max_walkin_queue:
                self.walkin_queue.append(p)
                self.generated_patients[p.id] = p

    def _update_late_status(self):
        # mark scheduled as late if slot passed and not on-site
        for slot, p in list(self.scheduled.items()):
            slot_minute = self._slot_to_minute(slot)
            if p.arrival_time_min is None:
                continue  # no-show remains
            if p.arrival_time_min > self.minute and self.minute >= slot_minute:
                p.is_late = True
                # move to late list if not already served and past slot
                if p not in self.late_list and p.id not in self.served_ids and self.minute >= slot_minute:
                    self.late_list.append(p)

        # move any arrived late patients out of late_list priority bucket if they just arrived now
        for p in self.late_list:
            if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                p.is_late = True  # keep flag

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        # set day-of-week (exclude Sunday by default)
        if options and "day_of_week" in options:
            self.day_of_week = int(options["day_of_week"])
        else:
            if self._configured_day_of_week is not None:
                self.day_of_week = int(self._configured_day_of_week)
            else:
                self.day_of_week = int(self.rng.integers(0, 6))
                if self.day_of_week not in DAYS_OPEN:
                    # force into open set
                    self.day_of_week = int(self.rng.integers(0, 6))
        # If Sunday (closed), start and immediately end episode by returning a terminal state
        self.reset_state()
        obs = self._get_obs()
        info = {}
        return obs, info

    def _get_obs(self):
        # next scheduled slot that is not served yet
        remaining_slots = [s for s, p in self.scheduled.items() if p.id not in self.served_ids]
        scheduled_remaining = len(remaining_slots)

        # check if the next scheduled patient is on-site now
        next_slot = min(remaining_slots) if remaining_slots else None
        on_site = 0
        if next_slot is not None:
            p = self.scheduled[next_slot]
            if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                on_site = 1

        # estimate time to next arrival (scheduled or walk-in), clipped
        next_arrival = 999
        # next scheduled arrival
        sched_arrivals = [p.arrival_time_min for p in self.scheduled.values() if p.arrival_time_min is not None and p.arrival_time_min > self.minute]
        if sched_arrivals:
            next_arrival = min(next_arrival, min(sched_arrivals) - self.minute)
        # approximate next walk-in arrival as inverse rate
        if is_open_minute(self.minute) and self.walkin_rate_per_hour > 0 and self.minute < self.walkin_cutoff_minute:
            expected_walkin_gap = max(1, int(60.0 / self.walkin_rate_per_hour))
            next_arrival = min(next_arrival, expected_walkin_gap)
        next_arrival = int(min(next_arrival, 60))

        obs = np.array([
            minute_to_slot(self.minute, self.slot_minutes),
            scheduled_remaining,
            len(self.walkin_queue),
            len(self.late_list),
            on_site,
            next_arrival
        ], dtype=np.float32)
        return obs

    def step(self, action: int):
        # simulate minute-by-minute until either we serve someone or day ends
        reward = 0.0
        info: Dict[str, Any] = {}

        # At each step, first update arrivals
        self._maybe_generate_walkins()
        self._update_late_status()

        # determine candidate queues based on action
        # 2: late priority (admin recalls) if available
        # 0: scheduled on-time if on-site
        # 1: walk-in otherwise
        served_patient: Optional[Patient] = None
        served_source = None
        served_via_recall = False

        if action == 2 and self.late_list:
            served_patient = self.late_list.pop(0)
            served_source = "late_recall"
            served_via_recall = True
        else:
            # next scheduled
            remaining_slots = sorted([s for s, p in self.scheduled.items() if p.id not in self.served_ids])
            if action == 0 and remaining_slots:
                next_slot = remaining_slots[0]
                p = self.scheduled[next_slot]
                if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                    served_patient = p
                    served_source = "scheduled"
            if served_patient is None and self.walkin_queue:
                served_patient = self.walkin_queue.pop(0)
                served_source = "walkin"

        # apply serving and time advance
        if served_patient is not None:
            self.served_ids.append(served_patient.id)
            # reward per patient served; small bonus for serving scheduled/late via recall
            reward += 1.0
            if served_source in ("scheduled", "late_recall"):
                reward += 0.05
            # compute wait
            served_start_minute = self.minute
            arrival = served_patient.arrival_time_min
            wait = None
            if arrival is not None:
                wait = max(0, served_start_minute - arrival)
            # log
            self.served_log.append({
                "id": served_patient.id,
                "served_start_minute": served_start_minute,
                "scheduled_slot": served_patient.scheduled_slot,
                "arrival_time": arrival,
                "is_walkin": served_patient.scheduled_slot is None,
                "is_late": bool(served_patient.is_late),
                "served_via_recall": served_via_recall,
                "wait_minutes": wait,
                "source": served_source,
            })
            advance = self.slot_minutes
        else:
            # idle minute penalty for waiting while queues exist
            advance = 1
            reward -= 0.01

        # advance time respecting lunch/closed periods
        self.minute += advance
        if MINUTES_LUNCH_START <= self.minute < MINUTES_LUNCH_END:
            self.minute = MINUTES_LUNCH_END
        done = self.minute >= MINUTES_CLOSE_PM

        obs = self._get_obs()

        # small penalty for leaving late list unserved to encourage recalls
        reward -= 0.001 * len(self.late_list)

        # Encourage finishing scheduled by end of day
        if done:
            remaining_sched = len([p for p in self.scheduled.values() if p.id not in self.served_ids])
            if remaining_sched > 0:
                reward -= 0.1 * remaining_sched

        return obs, reward, done, False, info

    def render(self):
        print({
            "time": self.minute,
            "served": len(self.served_ids),
            "walkin_q": len(self.walkin_queue),
            "late": len(self.late_list)
        })

In [7]:
!pip -q install stable-baselines3==2.3.2 gymnasium==0.29.1 shimmy==1.3.0 plotly==5.24.1 ipywidgets

In [8]:
# Quick smoke test of env dynamics (7-min slots)
env = ClinicSchedulingEnv(slot_minutes=7, seed=42)
obs, info = env.reset()
print("obs shape:", obs.shape, "obs:", obs)
for _ in range(5):
    a = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(a)
    print({"a": int(a), "r": float(reward), "done": bool(terminated or truncated), "obs": obs.tolist()})

obs shape: (6,) obs: [ 0. 60.  0.  0.  0.  1.]
{'a': 2, 'r': 1.05, 'done': False, 'obs': [1.0, 59.0, 0.0, 0.0, 0.0, 7.0]}
{'a': 1, 'r': -0.011, 'done': False, 'obs': [1.0, 59.0, 0.0, 1.0, 0.0, 7.0]}
{'a': 1, 'r': -0.011, 'done': False, 'obs': [1.0, 59.0, 0.0, 1.0, 0.0, 7.0]}
{'a': 0, 'r': 0.999, 'done': False, 'obs': [2.0, 59.0, 0.0, 1.0, 0.0, 7.0]}
{'a': 2, 'r': 1.0490000000000002, 'done': False, 'obs': [3.0, 58.0, 0.0, 1.0, 0.0, 2.0]}


In [9]:
# Training: PPO on the scheduling environment (7-min slots)
import os
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

log_dir = "/content/logs" if os.path.exists("/content") else "/workspace/logs"
os.makedirs(log_dir, exist_ok=True)

# make_vec_env handles Monitor and Gymnasium compatibility wrappers
vec_env = make_vec_env(lambda: ClinicSchedulingEnv(slot_minutes=7), n_envs=1, monitor_dir=log_dir)

model = PPO(
    "MlpPolicy",
    vec_env,
    verbose=1,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=256,
    n_epochs=10,
    gamma=0.995,
    gae_lambda=0.95,
)

# Train
timesteps = 200_000
model.learn(total_timesteps=timesteps)

# Save
model_path = os.path.join(log_dir, "ppo_clinic_scheduling")
model.save(model_path)
print("Saved model to", model_path)

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


Using cpu device


  return datetime.utcnow().replace(tzinfo=utc)


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 116      |
|    ep_rew_mean     | 49.8     |
| time/              |          |
|    fps             | 899      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------


  return datetime.utcnow().replace(tzinfo=utc)


-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 116         |
|    ep_rew_mean          | 49.9        |
| time/                   |             |
|    fps                  | 843         |
|    iterations           | 2           |
|    time_elapsed         | 4           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.013261056 |
|    clip_fraction        | 0.119       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.09       |
|    explained_variance   | 0.045379877 |
|    learning_rate        | 0.0003      |
|    loss                 | 5.71        |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.00547    |
|    value_loss           | 27.9        |
-----------------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 108   

In [11]:
# Evaluation and visualization
import numpy as np
import pandas as pd
import plotly.express as px
from stable_baselines3 import PPO

# load
loaded = PPO.load(model_path)

def run_episode(env_seed=None):
    env = ClinicSchedulingEnv(slot_minutes=10, seed=env_seed)
    obs, info = env.reset()
    done = False
    events = []
    t = 0
    while not done:
        action, _ = loaded.predict(obs, deterministic=True)
        prev_min = env.minute
        prev_walk = len(env.walkin_queue)
        prev_late = len(env.late_list)
        obs, reward, terminated, truncated, info = env.step(int(action))
        done = terminated or truncated
        events.append({
            "t": t,
            "minute": prev_min,
            "action": int(action),
            "reward": float(reward),
            "served": len(env.served_ids),
            "walkin_q": prev_walk,
            "late": prev_late,
        })
        t += 1
    return pd.DataFrame(events)

 df = run_episode(env_seed=123)
fig = px.line(df, x="minute", y=["served", "walkin_q", "late"], title="Clinic metrics over time")
fig.show()

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 35)

In [None]:
# Improved evaluation: metrics and charts (7-min slots)
import numpy as np
import pandas as pd
import plotly.express as px
from stable_baselines3 import PPO

# Load trained model
loaded = PPO.load(model_path)


def run_episode(env_seed=None, walkin_cutoff_minute=None):
    env = ClinicSchedulingEnv(slot_minutes=7, seed=env_seed, walkin_cutoff_minute=walkin_cutoff_minute)
    obs, info = env.reset()
    done = False
    timeline = []
    step_idx = 0
    while not done:
        action, _ = loaded.predict(obs, deterministic=True)
        prev_state = {
            "step": step_idx,
            "minute": env.minute,
            "walkin_q": len(env.walkin_queue),
            "late": len(env.late_list),
            "served": len(env.served_ids),
            "action": int(action),
        }
        obs, reward, terminated, truncated, _ = env.step(int(action))
        done = bool(terminated or truncated)
        prev_state["reward"] = float(reward)
        timeline.append(prev_state)
        step_idx += 1
    # Build DataFrames
    timeline_df = pd.DataFrame(timeline)
    served_df = pd.DataFrame(env.served_log)
    return env, timeline_df, served_df


# Run one evaluation episode
env, timeline_df, served_df = run_episode(env_seed=123)

# Summary metrics
scheduled_total = len(env.scheduled)
served_total = len(served_df)
served_scheduled = int((~served_df["is_walkin"]).sum()) if not served_df.empty else 0
served_walkin = int((served_df["is_walkin"]).sum()) if not served_df.empty else 0
served_via_recall = int((served_df["served_via_recall"]).sum()) if not served_df.empty else 0
avg_wait_scheduled = float(served_df.loc[~served_df["is_walkin"], "wait_minutes"].dropna().mean()) if served_scheduled > 0 else np.nan
avg_wait_walkin = float(served_df.loc[served_df["is_walkin"], "wait_minutes"].dropna().mean()) if served_walkin > 0 else np.nan

print("Scheduled total:", scheduled_total)
print("Served total:", served_total, "(scheduled:", served_scheduled, ", walk-in:", served_walkin, ")")
print("Late recalls served:", served_via_recall)
print("Avg wait (scheduled):", avg_wait_scheduled)
print("Avg wait (walk-in):", avg_wait_walkin)
print("Remaining late list:", len(env.late_list))
print("Remaining walk-in queue:", len(env.walkin_queue))

# Charts
fig1 = px.line(timeline_df, x="minute", y=["served", "walkin_q", "late"], title="Queues and served over time")
fig1.show()

if not served_df.empty and served_df["wait_minutes"].notna().any():
    fig2 = px.histogram(served_df.dropna(subset=["wait_minutes"]), x="wait_minutes", nbins=30, title="Wait time distribution (minutes)")
    fig2.show()
else:
    print("No served patients to plot wait distribution.")

## 7-minute availability booking (user selects an hour)

This section adds a simple booking planner so users can pick an hour label based on availability. Each hour has 7-minute sub-slots:
- Hours: 08, 09, 10, 11, 13, 14, 15 (12:00–13:00 lunch, closed; 16:00 close)
- Capacity rules per hour (7-min consult):
  - Normal hours (08, 09, 10, 13, 14): 9 bookings (last may bleed a few minutes into next hour)
  - Boundary hours (11 before lunch, 15 before day close): 8 bookings (must finish by 12:00 and 16:00 respectively)
- Example behavior: After 9 bookings at 08:00, availability shows 09–12 and 13–16.

## Env V2: Multi‑provider, variable service times, action masking

Enhancements:
- Multiple providers can serve patients in parallel (concurrent servers)
- Variable service durations (lognormal around 7 minutes, configurable)
- Action masking for invalid choices (late recall only if late patient has arrived; etc.)
- Optional seeding of scheduled appointments from the 7‑min `BookingPlanner`
- Richer logs and provider utilization metrics

In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from typing import Optional, Dict, Any, List, Tuple

class ClinicSchedulingEnvV2(gym.Env):
    """Multi-provider scheduling with variable service times and action masking."""
    metadata = {"render.modes": ["human"]}

    def __init__(self,
                 slot_minutes: int = 7,
                 num_providers: int = 2,
                 service_mean_min: float = 7.0,
                 service_sigma_min: float = 2.0,
                 walkin_rate_per_hour: float = 8.0,
                 no_show_prob: float = 0.05,
                 late_prob: float = 0.1,
                 walkin_cutoff_minute: Optional[int] = None,
                 seed: Optional[int] = None,
                 seeded_schedule: Optional[List[Tuple[int, int]]] = None  # list of (patient_id, start_minute)
                 ):
        super().__init__()
        self.slot_minutes = slot_minutes
        self.num_providers = num_providers
        self.service_mean_min = service_mean_min
        self.service_sigma_min = service_sigma_min
        self.walkin_rate_per_hour = walkin_rate_per_hour
        self.no_show_prob = no_show_prob
        self.late_prob = late_prob
        self.walkin_cutoff_minute = walkin_cutoff_minute or MINUTES_CLOSE_PM
        self.rng = np.random.default_rng(seed)

        # Action space: 0 scheduled, 1 walk-in, 2 late recall
        self.action_space = spaces.Discrete(3)
        # Observation: [minute_slot, walkin_q, late_len, scheduled_remaining, free_providers, mask0, mask1, mask2]
        high = np.array([
            WORK_MINUTES // self.slot_minutes,
            500,
            MAX_SCHEDULED_PER_DAY,
            MAX_SCHEDULED_PER_DAY,
            self.num_providers,
            1, 1, 1
        ], dtype=np.float32)
        self.observation_space = spaces.Box(low=0.0, high=high, dtype=np.float32)

        # Providers state: remaining service time if busy else 0
        self.provider_busy_remaining: List[int] = [0 for _ in range(self.num_providers)]

        # Queues
        self.walkin_queue: List[Patient] = []
        self.late_list: List[Patient] = []
        self.scheduled_slots: Dict[int, Patient] = {}  # by slot index
        self.served_ids: List[int] = []
        self.served_log: List[Dict[str, Any]] = []
        self.generated_patients: Dict[int, Patient] = {}
        self.minute = MINUTES_OPEN_AM
        self.next_walkin_id = 1
        self.seeded_schedule = seeded_schedule
        self._build_scheduled_from_seed()

    def _build_scheduled_from_seed(self):
        self.scheduled_slots.clear()
        pid = 1
        if self.seeded_schedule:
            for pid_seed, start_min in self.seeded_schedule:
                slot = minute_to_slot(start_min, self.slot_minutes)
                arrival = start_min  # assume on-time unless randomized below
                if self.rng.random() < self.no_show_prob:
                    arrival = None
                else:
                    # lateness
                    if self.rng.random() < self.late_prob:
                        arrival = min(start_min + int(self.rng.integers(5, 30)), MINUTES_CLOSE_PM - 1)
                p = Patient(id=pid_seed, scheduled_slot=slot, arrival_time_min=arrival)
                self.scheduled_slots[slot] = p
                self.generated_patients[pid_seed] = p
                pid = max(pid, pid_seed + 1)
        self.next_walkin_id = pid

    def _service_duration(self) -> int:
        # lognormal-ish clamp around mean
        val = max(1.0, self.rng.lognormal(mean=np.log(max(1e-6, self.service_mean_min)), sigma=self.service_sigma_min / max(1.0, self.service_mean_min)))
        return int(max(1, round(val)))

    def _maybe_generate_walkins(self):
        if not is_open_minute(self.minute) or self.minute >= self.walkin_cutoff_minute:
            return
        lam = self.walkin_rate_per_hour / 60.0
        arrivals = self.rng.poisson(lam)
        for _ in range(arrivals):
            p = Patient(id=self.next_walkin_id, scheduled_slot=None, arrival_time_min=self.minute)
            self.walkin_queue.append(p)
            self.generated_patients[p.id] = p
            self.next_walkin_id += 1

    def _update_late_status(self):
        for slot, p in list(self.scheduled_slots.items()):
            slot_minute = self._slot_to_minute(slot)
            if p.arrival_time_min is None:
                continue
            if p.arrival_time_min > self.minute and self.minute >= slot_minute:
                p.is_late = True
                if p not in self.late_list and p.id not in self.served_ids and self.minute >= slot_minute:
                    self.late_list.append(p)

    def _slot_to_minute(self, slot_index: int) -> int:
        am_slots = (MINUTES_LUNCH_START - MINUTES_OPEN_AM) // self.slot_minutes
        if slot_index < am_slots:
            return MINUTES_OPEN_AM + slot_index * self.slot_minutes
        else:
            return MINUTES_LUNCH_END + (slot_index - am_slots) * self.slot_minutes

    def _mask(self) -> np.ndarray:
        mask = np.array([0, 0, 0], dtype=np.int8)
        # scheduled on-site available?
        remaining_slots = sorted([s for s, p in self.scheduled_slots.items() if p.id not in self.served_ids])
        if remaining_slots:
            p = self.scheduled_slots[remaining_slots[0]]
            if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                mask[0] = 1
        # walk-in available?
        if len(self.walkin_queue) > 0:
            mask[1] = 1
        # late recall available (arrived late and in list)?
        if any(p.arrival_time_min is not None and p.arrival_time_min <= self.minute for p in self.late_list):
            mask[2] = 1
        # if no providers free, no actions available
        if self.free_providers() == 0:
            mask[:] = 0
        return mask

    def free_providers(self) -> int:
        return sum(1 for t in self.provider_busy_remaining if t <= 0)

    def _get_obs(self):
        remaining_slots = [s for s, p in self.scheduled_slots.items() if p.id not in self.served_ids]
        obs = np.array([
            minute_to_slot(self.minute, self.slot_minutes),
            len(self.walkin_queue),
            len(self.late_list),
            len(remaining_slots),
            self.free_providers(),
            *self._mask().tolist()
        ], dtype=np.float32)
        return obs

    def step(self, action: int):
        reward = 0.0
        info: Dict[str, Any] = {}

        # process arrivals and late updates
        self._maybe_generate_walkins()
        self._update_late_status()

        served_patients: List[Patient] = []
        mask = self._mask()
        # serve up to number of free providers, consistent with intended action preference
        to_serve = self.free_providers()
        for _ in range(to_serve):
            candidate: Optional[Patient] = None
            if action == 2 and mask[2]:
                # serve arrived late first
                for i, p in enumerate(self.late_list):
                    if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                        candidate = self.late_list.pop(i)
                        break
            if candidate is None and action == 0 and mask[0]:
                remaining_slots = sorted([s for s, p in self.scheduled_slots.items() if p.id not in self.served_ids])
                if remaining_slots:
                    p = self.scheduled_slots[remaining_slots[0]]
                    if p.arrival_time_min is not None and p.arrival_time_min <= self.minute:
                        candidate = p
            if candidate is None and action in (1, 0, 2) and mask[1] and self.walkin_queue:
                candidate = self.walkin_queue.pop(0)
            if candidate is not None:
                self.served_ids.append(candidate.id)
                served_patients.append(candidate)
                # assign to a provider
                for i in range(self.num_providers):
                    if self.provider_busy_remaining[i] <= 0:
                        self.provider_busy_remaining[i] = self._service_duration()
                        break

        # reward and logs
        reward += 1.0 * len(served_patients)
        for p in served_patients:
            wait = None
            if p.arrival_time_min is not None:
                wait = max(0, self.minute - p.arrival_time_min)
            self.served_log.append({
                "id": p.id,
                "served_minute": self.minute,
                "arrival_time": p.arrival_time_min,
                "is_walkin": p.scheduled_slot is None,
                "is_late": bool(p.is_late),
                "wait_minutes": wait,
            })
            if p.scheduled_slot is not None:
                reward += 0.05

        # idle penalty if queues but no serve due to mask or no providers free
        if len(served_patients) == 0 and (self.walkin_queue or self.late_list or any(p.id not in self.served_ids for p in self.scheduled_slots.values())):
            reward -= 0.01

        # advance time by 1 minute; decrement providers
        self.minute += 1
        for i in range(self.num_providers):
            if self.provider_busy_remaining[i] > 0:
                self.provider_busy_remaining[i] -= 1
        if MINUTES_LUNCH_START <= self.minute < MINUTES_LUNCH_END:
            self.minute = MINUTES_LUNCH_END
        done = self.minute >= MINUTES_CLOSE_PM

        # terminal penalty for unserved scheduled
        if done:
            remaining_sched = len([p for p in self.scheduled_slots.values() if p.id not in self.served_ids])
            reward -= 0.1 * remaining_sched

        obs = self._get_obs()
        return obs, reward, done, False, info

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        self.minute = MINUTES_OPEN_AM
        self.provider_busy_remaining = [0 for _ in range(self.num_providers)]
        self.walkin_queue = []
        self.late_list = []
        self.served_ids = []
        self.served_log = []
        self.generated_patients = {}
        self._build_scheduled_from_seed()
        return self._get_obs(), {}

    def render(self):
        print({
            "minute": self.minute,
            "free_providers": self.free_providers(),
            "walkin_q": len(self.walkin_queue),
            "late": len(self.late_list)
        })

In [None]:
# Calibration/config cell for Env V2
CALIB = {
    "slot_minutes": 7,
    "num_providers": 2,
    "service_mean_min": 7.0,
    "service_sigma_min": 2.0,
    "walkin_rate_per_hour": 10.0,
    "no_show_prob": 0.07,
    "late_prob": 0.12,
    "walkin_cutoff_minute": MINUTES_CLOSE_PM,
}

# Example: seed scheduled patients from BookingPlanner (first N bookings at 8:00 then 9:00)
seed_schedule = []  # list of (patient_id, start_minute)
planner = BookingPlanner()
patient_id = 1
for hour in [8, 9]:
    for _ in range(5):  # first 5 bookings per hour for demo
        t = planner.book(hour)
        if t is None:
            break
        # parse t like '8:07am' to minute-of-day
        # simpler: compute directly from planner's counters instead of parsing
    
# Instead, generate fixed seeds at minute marks
seed_schedule = [(i + 1, 8 * 60 + i * CALIB["slot_minutes"]) for i in range(9)]  # 8:00 to ~8:56
seed_schedule += [(10 + i, 9 * 60 + i * CALIB["slot_minutes"]) for i in range(5)]

print("Seeds (first 3):", seed_schedule[:3])

In [None]:
# Training with MaskablePPO if available, else PPO
from stable_baselines3 import PPO
try:
    from sb3_contrib import MaskablePPO
    MASKABLE = True
except Exception:
    MASKABLE = False

from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback
import os

log_dir_v2 = "/content/logs_v2" if os.path.exists("/content") else "/workspace/logs_v2"
os.makedirs(log_dir_v2, exist_ok=True)

# build env factory with seeded schedule

def make_env_v2():
    return ClinicSchedulingEnvV2(
        slot_minutes=CALIB["slot_minutes"],
        num_providers=CALIB["num_providers"],
        service_mean_min=CALIB["service_mean_min"],
        service_sigma_min=CALIB["service_sigma_min"],
        walkin_rate_per_hour=CALIB["walkin_rate_per_hour"],
        no_show_prob=CALIB["no_show_prob"],
        late_prob=CALIB["late_prob"],
        walkin_cutoff_minute=CALIB["walkin_cutoff_minute"],
        seeded_schedule=seed_schedule,
    )

vec_env_v2 = make_vec_env(make_env_v2, n_envs=1, monitor_dir=log_dir_v2)

if MASKABLE:
    model_v2 = MaskablePPO(
        "MlpPolicy",
        vec_env_v2,
        verbose=1,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=256,
        n_epochs=10,
        gamma=0.995,
        gae_lambda=0.95,
    )
else:
    model_v2 = PPO(
        "MlpPolicy",
        vec_env_v2,
        verbose=1,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=256,
        n_epochs=10,
        gamma=0.995,
        gae_lambda=0.95,
    )

# Optional evaluation callback
# eval_env_v2 = make_vec_env(make_env_v2, n_envs=1)
# eval_cb = EvalCallback(eval_env_v2, best_model_save_path=log_dir_v2, log_path=log_dir_v2, eval_freq=10_000)

timesteps_v2 = 200_000
model_v2.learn(total_timesteps=timesteps_v2)  # , callback=eval_cb)

model_v2_path = os.path.join(log_dir_v2, "model_v2")
model_v2.save(model_v2_path)
print("Saved V2 model to", model_v2_path)

In [None]:
# Evaluation for Env V2
import pandas as pd
from statistics import mean

loaded_v2 = None
try:
    from sb3_contrib import MaskablePPO
    loaded_v2 = MaskablePPO.load(model_v2_path)
except Exception:
    from stable_baselines3 import PPO
    loaded_v2 = PPO.load(model_v2_path)


def evaluate_v2(episodes=3):
    metrics = []
    for ep in range(episodes):
        env = make_env_v2()
        obs, info = env.reset()
        done = False
        while not done:
            action, _ = loaded_v2.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = env.step(int(action))
            done = bool(terminated or truncated)
        # compute metrics
        served_df = pd.DataFrame(env.served_log)
        scheduled_served = int((served_df["is_walkin"] == False).sum()) if not served_df.empty else 0
        walkin_served = int((served_df["is_walkin"] == True).sum()) if not served_df.empty else 0
        avg_wait = float(served_df["wait_minutes"].dropna().mean()) if not served_df.empty else float("nan")
        util = 1.0 - (sum(1 for t in env.provider_busy_remaining if t <= 0) / env.num_providers)
        metrics.append({
            "scheduled_served": scheduled_served,
            "walkin_served": walkin_served,
            "avg_wait": avg_wait,
            "late_remaining": len(env.late_list),
            "walkins_remaining": len(env.walkin_queue),
            "providers": env.num_providers,
        })
    return pd.DataFrame(metrics)

m = evaluate_v2(episodes=3)
print(m.describe(include="all"))

In [None]:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple

SLOT_MINUTES_BOOKING = 7
OPEN_HOURS = [8, 9, 10, 11, 13, 14, 15]


def _capacity_for_hour(hour: int, slot_minutes: int = SLOT_MINUTES_BOOKING) -> int:
    # Hours just before a closed boundary (12:00 lunch, 16:00 close) cannot bleed over
    if hour == 11:
        return (60) // slot_minutes  # end by 12:00
    if hour == 15:
        return (60) // slot_minutes  # end by 16:00
    # Other hours may bleed into next hour a bit
    from math import ceil
    return int(ceil(60.0 / slot_minutes))


def _minute_to_str(minute_of_day: int) -> str:
    h = minute_of_day // 60
    m = minute_of_day % 60
    suffix = "am" if h < 12 else "pm"
    h12 = h if 1 <= h <= 12 else (h - 12 if h > 12 else 12)
    return f"{h12}:{m:02d}{suffix}"


def _format_hour_ranges(hours: List[int]) -> str:
    if not hours:
        return "(none)"
    hours = sorted(hours)
    ranges: List[Tuple[int, int]] = []  # [start, end_exclusive]
    start = hours[0]
    prev = hours[0]
    for h in hours[1:]:
        if h == prev + 1 or (prev == 11 and h == 13):
            # treat lunch gap as break; so 11->13 is not consecutive
            if prev == 11 and h == 13:
                ranges.append((start, prev + 1))
                start = h
            # else keep extending
        else:
            ranges.append((start, prev + 1))
            start = h
        prev = h
    ranges.append((start, prev + 1))

    def to_12h(h):
        suffix = "am" if h < 12 else "pm"
        h12 = h if 1 <= h <= 12 else (h - 12 if h > 12 else 12)
        return f"{h12}{suffix}"

    return ", ".join([f"{to_12h(s)}–{to_12h(e)}" for s, e in ranges])


@dataclass
class BookingPlanner:
    slot_minutes: int = SLOT_MINUTES_BOOKING
    open_hours: List[int] = field(default_factory=lambda: OPEN_HOURS.copy())
    capacity_by_hour: Dict[int, int] = field(init=False)
    booked_count_by_hour: Dict[int, int] = field(init=False)

    def __post_init__(self):
        self.capacity_by_hour = {h: _capacity_for_hour(h, self.slot_minutes) for h in self.open_hours}
        self.booked_count_by_hour = {h: 0 for h in self.open_hours}

    def available_hours(self) -> List[int]:
        return [h for h in self.open_hours if self.booked_count_by_hour[h] < self.capacity_by_hour[h]]

    def book(self, hour: int) -> Optional[str]:
        if hour not in self.open_hours:
            return None
        cap = self.capacity_by_hour[hour]
        used = self.booked_count_by_hour[hour]
        if used >= cap:
            return None
        # Assign next 7-min slot within the hour
        start_minute = hour * 60 + used * self.slot_minutes
        self.booked_count_by_hour[hour] += 1
        return _minute_to_str(start_minute)

    def availability_label(self) -> str:
        return _format_hour_ranges(self.available_hours())


# Demo: fill 8:00 hour and show remaining availability
planner = BookingPlanner()
print("Initial availability:", planner.availability_label())
assigned = []
for i in range(9):
    assigned.append(planner.book(8))
print("Assigned times at 8am:", assigned)
print("Availability after filling 8am:", planner.availability_label())

In [None]:
# Optional: Interactive booking widget (Colab)
try:
    import ipywidgets as widgets
    from IPython.display import display, clear_output

    planner_widget = BookingPlanner()

    def hour_options():
        return [(f"{h}:00 ({planner_widget.capacity_by_hour[h] - planner_widget.booked_count_by_hour[h]} left)", h)
                for h in planner_widget.available_hours()]

    hour_dd = widgets.Dropdown(options=hour_options(), description='Hour:')
    out = widgets.Output()

    def on_book(_):
        with out:
            clear_output()
            if not hour_dd.options:
                print("No hours available.")
                return
            hour = hour_dd.value
            assigned_time = planner_widget.book(hour)
            if assigned_time is None:
                print("Selected hour is full. Choose another.")
            else:
                print(f"Booked at {assigned_time}")
                print("Remaining availability:", planner_widget.availability_label())
            # refresh dropdown
            hour_dd.options = hour_options()

    btn = widgets.Button(description='Book')
    btn.on_click(on_book)

    display(widgets.VBox([hour_dd, btn, out]))
except Exception as e:
    print("Widgets unavailable:", e)