### Install Libraries

In [3]:
from pathlib import Path
import pandas as pd
from ortools.sat.python import cp_model
import pandas as pd
import numpy as np
import random
import os
from ortools.sat.python.cp_model import LinearExpr

# os.chdir("/home/ashaar/medical-rotation-scheduling")
os.chdir("/Users/ashaar/medical-rotation-scheduling")


In [4]:
#####
# GRAD_REQ = {
#     "R1": {
#         ("Medical Teams",): (5, 6, 7, 8),
#         ("AMAU",): (1, 2),
#         ("Cardiology",): (2,),
#         ("Infectious Disease",): (1, 2),
#         ("Endocrine",): (1, 2),
#     },
#     "R2": {
#         ("Senior Rotation",): (1,),
#         ("CCU",): (2,),
#         ("MICU",): (2,),
#         ("Nephrology",): (1, 2),
#         ("Neurology",): (1,),
#         ("Cardiology",): (1,),
#         ("Geriatrics",): (1,),
#         ("AMAU",): (1, 2),
#         ("Al Khor",): (1,),
#         ("MOP",): (1,),
#     },
#     "R3": {
#         ("Senior Rotation",): (1, 2),
#         ("Oncology",): (1,),
#         ("Hematology",): (1,),
#         ("Al Wakra",): (1, 2),
#         ("GI",): (2,),
#         ("Pulmonology",): (2,),
#         ("Rheumatology",): (1,),
#         ("MOP",): (1,),
#         ("AMAU",): (1,),
#         ("Cardiology", "ED", "Medical Consultation"): (1,),
#     },
#     "R4": {
#         ("Registrar Rotation",): (4, 5, 6),
#         ("Medical Consultation",): (4, 5, 6),
#         ("Al Wakra",): (0, 1, 2),
#         ("Al Khor",): (0, 1),
#         ("Hematology", "Oncology"): (0, 1),
#     },
#     }

### Global Variables

In [6]:
# -------------------------------
# Global Constants and Parameters
# -------------------------------
INPUT_FILE = "real_example_input.xlsx"
OUTPUT_XLSX = "output_file.xlsx"
BLOCKS = 13

# -------------------------------
# Rotation Definitions
# -------------------------------
ROTATIONS = [
    "Cardiology", "Endocrine", "Infectious Disease", "AMAU", "Nephrology",
    "Neurology", "CCU", "MICU", "Al Khor", "MOP", "Geriatrics", "Hematology",
    "Oncology", "Al Wakra", "GI", "Pulmonology", "Rheumatology", "ED",
    "Medical Consultation", "Medical Teams", "Senior Rotation",
    "Registrar Rotation",
]

LEAVE_ROTATION = "LEAVE"
ROTATIONS.append(LEAVE_ROTATION)

TRANSFER_ROTATION = "TRANSFER"
ROTATIONS.append(TRANSFER_ROTATION)

# -------------------------------
# Leave-Eligible Rotations by PGY
# -------------------------------
LEAVE_ALLOWED = {
    "R1": {"Endocrine", "AMAU", "Infectious Disease"},
    "R2": {"MOP", "Nephrology", "AMAU"},
    "R3": {"GI", "Pulmonology", "AMAU"},
    "R4": {"Medical Consultation"},
    "R_NEURO": {"AMAU",}
}

# -------------------------------
# Graduation Requirements by PGY
# -------------------------------
#TODO: Calculate the average and then get negative score for larger variance. (Medical Teams)
GRAD_REQ = {
    "R1": {
        ("Medical Teams",): (4, 5, 6, 7), #TODO: ADD LOCKED.
        ("AMAU",): (1, 2),
        ("Cardiology",): (2,),
        ("Infectious Disease",): (1, 2),
        ("Endocrine",): (1, 2),
    },
    "R2": {
        ("Senior Rotation",): (1,2),
        ("CCU",): (2,),
        ("MICU",): (2,),
        ("Nephrology",): (1, 2),
        ("Neurology",): (1,),
        ("Cardiology",): (1,),
        ("Geriatrics",): (1,),
        ("AMAU",): (1, 2),
        ("Al Khor",): (0, 1),
        ("MOP",): (1,),
    },
    "R3": {
        ("Senior Rotation",): (2,),
        ("Oncology",): (1,),
        ("Hematology",): (1,),
        ("Al Wakra",): (1,),
        ("GI",): (2,),
        ("Pulmonology",): (2,),
        ("Rheumatology",): (1,),
        ("MOP",): (1,),
        ("AMAU",): (1,),
        ("Cardiology", "ED", "Medical Consultation"): (1,),
    },
    "R4": {
        ("Registrar Rotation",): (5, 6),
        ("Medical Consultation",): (3, 4, 5,),
        ("Al Wakra",): (1, 2),
        ("Al Khor",): (1, 2),
        ("Hematology", "Oncology"): (1, 2),
    },
    "R4_Chiefs": {
        ("Registrar Rotation", ): (5, 6, 7),
        ("Medical Consultation",): (6, 7, 8),
    },
    "R_NEURO": {
        ("Medical Teams", ): (3, ),
        ("AMAU",): (3,4,),
        ("MICU",): (1,),
        ("Rheumatology",): (1,),
        ("ED",): (2,),
        ("TRANSFER",): (2,)
    },
}

# -------------------------------
# Per-block Minimum Requirements
# -------------------------------
PER_BLOCK_MIN = {
    "Cardiology": 11,
    "AMAU": 11,
    "CCU": 7,
    "MICU": 7,
    "Al Khor": 6,
    "MOP": 6,
    "Hematology": 5,
    "Oncology": 5,
    "Al Wakra": 9,
    "Medical Consultation": 10,
    "Medical Teams": 20,
    # "Registrar Rotation": 20,
    # "Senior Rotation": 10,
}


# -------------------------------
# Eligibility Matrix
# -------------------------------
ELIGIBILITY = {
    pgy: {rot for group in GRAD_REQ[pgy] for rot in group}
    for pgy in GRAD_REQ
}

for pgy in ELIGIBILITY:
    ELIGIBILITY[pgy].add(LEAVE_ROTATION)

# -------------------------------
# Rotation Group Definitions
# -------------------------------
group_defs = {
    "Floater": {"Nephrology", "Endocrine"},
    "2ndOnCall": {"GI", "Rheumatology", "Pulmonology"},
}


In [7]:
ELIGIBILITY

{'R1': {'AMAU',
  'Cardiology',
  'Endocrine',
  'Infectious Disease',
  'LEAVE',
  'Medical Teams'},
 'R2': {'AMAU',
  'Al Khor',
  'CCU',
  'Cardiology',
  'Geriatrics',
  'LEAVE',
  'MICU',
  'MOP',
  'Nephrology',
  'Neurology',
  'Senior Rotation'},
 'R3': {'AMAU',
  'Al Wakra',
  'Cardiology',
  'ED',
  'GI',
  'Hematology',
  'LEAVE',
  'MOP',
  'Medical Consultation',
  'Oncology',
  'Pulmonology',
  'Rheumatology',
  'Senior Rotation'},
 'R4': {'Al Khor',
  'Al Wakra',
  'Hematology',
  'LEAVE',
  'Medical Consultation',
  'Oncology',
  'Registrar Rotation'},
 'R4_Chiefs': {'LEAVE', 'Medical Consultation', 'Registrar Rotation'},
 'R_NEURO': {'AMAU',
  'ED',
  'LEAVE',
  'MICU',
  'Medical Teams',
  'Rheumatology',
  'TRANSFER'}}

### Parser

In [8]:
# -------------------------------
# Excel Input Parser and Index Maps
# -------------------------------
def parse_input(path: str):
    """Read Excel, return residents, PGYs, leave dict, forced/forbidden maps."""
    df = pd.read_excel(path).fillna({
        "Leave1Block": 0,
        "Leave2Block": 0,
        "Leave1Half": "",
        "Leave2Half": ""
    })

    residents = df["ID"].tolist()
    pgys = df["PGY"].tolist()
    leave_dict = {}
    forced_assignments = {}
    forbidden_assignments = {}

    for row in df.itertuples(index=False):
        b1, b2 = int(row.Leave1Block), int(row.Leave2Block)
        full, half = set(), set()

        if b1 and b1 == b2:
            full.add(b1)
        else:
            if b1:
                half.add(b1)
            if b2:
                half.add(b2)

        leave_dict[row.ID] = {
            "pgy": row.PGY,
            "full": full,
            "half": half
        }

        for b in range(1, 14):
            val = getattr(row, f"Block_{b}", "")
            val = str(val).strip()
            if not val or val.lower() in {"nan", "none"}:
                continue
            if val.startswith("!"):
                forbidden_assignments[(row.ID, b - 1)] = val[1:]
            else:
                forced_assignments[(row.ID, b - 1)] = val

    return residents, pgys, leave_dict, forced_assignments, forbidden_assignments

residents, pgys, leave_dict, forced_assignments, forbidden_assignments = (
    parse_input(os.path.join("sample_data", INPUT_FILE))
)

n = len(residents)

rotation_to_idx = {rot: i for i, rot in enumerate(ROTATIONS)}
idx_to_rotation = {i: rot for rot, i in rotation_to_idx.items()}
LEAVE_IDX = rotation_to_idx[LEAVE_ROTATION]

rotation_to_idx

forced_assignments

{}

In [9]:
set(rotation_to_idx.keys()) == set(ROTATIONS)


True

### Decision Variable Definition and Assignment Domain


In [18]:
# -------------------------
# CP-SAT Model Setup
# -------------------------
model = cp_model.CpModel()

# x[r, b]: Integer variable representing the rotation index assigned to
# resident r in block b (0-based: 0..12).
# - If block b is a full leave block, x[r, b] is fixed to LEAVE_IDX.
# - Else, x[r, b] is constrained to eligible rotations based on PGY and
#   further restricted to LEAVE_ALLOWED if it's a half-leave block.
x = {}
for r in range(n):
	res_id = residents[r]
	pgy = pgys[r]
	for b in range(BLOCKS):
		if (b + 1) in leave_dict[res_id]["full"]:
			x[r, b] = model.NewIntVarFromDomain(
				cp_model.Domain.FromValues([LEAVE_IDX]), f"x_{r}_{b}"
			)
		else:
			eligible = [
				rotation_to_idx[rot]
				for rot in ELIGIBILITY[pgy]
				if rot in rotation_to_idx and
				rotation_to_idx[rot] != LEAVE_IDX
			]
			if (b + 1) in leave_dict[res_id]["half"]:
				eligible = [
					rotation_to_idx[rot]
					for rot in ELIGIBILITY[pgy]
					if rot in rotation_to_idx and
					rot in LEAVE_ALLOWED.get(pgy, set()) and
					rotation_to_idx[rot] != LEAVE_IDX
				]
			x[r, b] = model.NewIntVarFromDomain(
				cp_model.Domain.FromValues(eligible), f"x_{r}_{b}"
			)


# -------------------------
# Indicator Variables y[r, b, rot]
# -------------------------
# For each rotation (including LEAVE), y[r, b, rot] = 1 iff resident r
# is assigned to rotation `rot` in block b. These are boolean indicators
# tied to the integer decision variable x[r, b].
y = {}
for r in range(n):
	for b in range(BLOCKS):  # BLOCKS = 13, blocks 0 to 12
		for rot in ROTATIONS:
			rot_id = rotation_to_idx[rot]
			var = model.NewBoolVar(f"y_{r}_{b}_{rot}")
			model.Add(x[r, b] == rot_id).OnlyEnforceIf(var)
			model.Add(x[r, b] != rot_id).OnlyEnforceIf(var.Not())
			y[r, b, rot] = var

# Enforce that each block has exactly one assigned rotation
for r in range(n):
	for b in range(BLOCKS):
		model.AddExactlyOne([y[r, b, rot] for rot in ROTATIONS])


# -------------------------
# Leave Weights
# -------------------------
# Weight each resident-block pair by:
#   - 1 if it's a half-leave block (less time available),
#   - 2 otherwise.
# These weights are used to scale contributions to minimum coverage constraints.
half_leave_weights = {
	(r, b): 1 if (b + 1) in leave_dict[residents[r]]["half"] else 2
	for r in range(n)
	for b in range(BLOCKS)
}

# -------------------------
# R_NEURO Hard Constraints
# -------------------------
for r in range(n):
    if pgys[r] == "R_NEURO":
        # First block must be Medical Teams
        model.Add(x[r, 0] == rotation_to_idx["Medical Teams"])
        model.Add(x[r, 1] == rotation_to_idx["Medical Teams"])
        model.Add(x[r, 2] == rotation_to_idx["Medical Teams"])
        
        
        # Last two blocks must be TRANSFER
        model.Add(x[r, 11] == rotation_to_idx["TRANSFER"])
        model.Add(x[r, 12] == rotation_to_idx["TRANSFER"])

# -------------------------
# Per-Block Minimum Rotation Coverage
# -------------------------
# For each rotation `rot` that has a minimum coverage requirement:
# In every block, the sum of weighted assignments to `rot`
# must be ≥ 2 × PER_BLOCK_MIN[rot].
# for b in range(BLOCKS):
# 	for rot in PER_BLOCK_MIN:
# 		model.Add(
# 			sum(half_leave_weights[r, b] * y[r, b, rot] for r in range(n))
# 			>= 2 * PER_BLOCK_MIN[rot]
# 		)

# -------------------------
# Graduation Requirements
# -------------------------
# Each resident must fulfill graduation requirements for specific rotation
# groups, unless exempt (R3s with full leave can skip one group).

for r in range(n):
	res_id = residents[r]
	pgy = pgys[r]
	leave_blocks = leave_dict[res_id]["full"]

	for rotation_group, required_counts in GRAD_REQ[pgy].items():
		if (
			pgy == "R3"
			and set(rotation_group) == {"Cardiology", "ED", "Medical Consultation"}
			and leave_blocks
		):
			continue

		count_vars = []
		for b in range(BLOCKS):
			if (b+1) not in leave_blocks:
				for rot in rotation_group:
					if rot in rotation_to_idx:
						count_vars.append(y[r, b, rot])

		if not count_vars:
			print(f"[GRAD EMPTY] {res_id} ({pgy}) → {rotation_group} → SKIPPED")
		elif len(count_vars) < min(required_counts):
			print(f"[GRAD TOO FEW BLOCKS] {res_id} ({pgy}) needs ≥{min(required_counts)} "
				f"but only has {len(count_vars)} possible")

		model.Add(sum(count_vars) >= min(required_counts))
		model.Add(sum(count_vars) <= max(required_counts))



# -------------------------
# On-call Rules and Coverage
# -------------------------
# 1. Every block must have exactly 10 Senior Rotations.
# 2. Every block must have exactly 20 Registrar Rotations.
# 3. For 2nd-on-call rotations (GI, Rheumatology, Pulmonology), the total
#    weighted count (3 if half leave, 6 otherwise) must be ≥ 60.
# 4. For Floater rotations (Nephrology, Endocrine), ensure at least 10
#    residents assigned in each block.

second_wt = {
	(r, b): 3 if (b + 1) in leave_dict[residents[r]]["half"] else 6
	for r in range(n)
	for b in range(BLOCKS)
}

print(f"Residents: {n}")
for b in range(BLOCKS):
	n_leave = sum(1 for r in range(n) if (b + 1) in leave_dict[residents[r]]["full"])
	print(f"Block {b+1}: {n_leave} on full leave")

for b in range(BLOCKS):
	# Enforce fixed counts
	model.Add(sum(y[r, b, "Senior Rotation"] for r in range(n)) == 10)
	# model.Add(sum(y[r, b, "Senior Rotation"] for r in range(n)) <= 13)
 
	model.Add(sum(y[r, b, "Registrar Rotation"] for r in range(n)) == 20)

	# PER_BLOCK_MIN: simple unweighted counts
	for rot in PER_BLOCK_MIN:
		model.Add(
			sum(y[r, b, rot] for r in range(n)) >= PER_BLOCK_MIN[rot]
		)
  
	# Weighted 2nd on-call minimum
	# TODO: Add soft 2x +ve constraint where the sum is at least 64.
	model.Add(
		sum(
			second_wt[r, b] * y[r, b, rot]
			for r in range(n)
			if pgys[r] != "R_NEURO"
			for rot in group_defs["2ndOnCall"]
		) >= 60
	)

	# Floater (Nephrology + Endocrine) ≥ 10 residents
	model.Add(
		sum(
			y[r, b, rot]
			for r in range(n)
			if pgys[r] != "R_NEURO"
			for rot in group_defs["Floater"]
		) >= 10
	)
	
		# Extra Nephrology + Extra Endocrine = total - 5 each
	total_endo = sum(y[r, b, "Endocrine"] for r in range(n))
	total_neph = sum(y[r, b, "Nephrology"] for r in range(n))

	# Define new IntVar for extra_endo and extra_neph
	extra_endo = model.NewIntVar(0, n, f"extra_endo_b{b}")
	extra_neph = model.NewIntVar(0, n, f"extra_neph_b{b}")

	# Symbolic max(0, total - 5)
	model.AddMaxEquality(extra_endo, [0, total_endo - 5])
	model.AddMaxEquality(extra_neph, [0, total_neph - 5])
	
 	# R2s in MOP
	r2_in_mop = sum(
		y[r, b, "MOP"]
		for r in range(n)
		if pgys[r] == "R2"
	)

	# Neurology
	neuro = sum(y[r, b, "Neurology"] for r in range(n))

	# Combined condition
	# model.Add(neuro + r2_in_mop + extra_endo + extra_neph >= 4)

# -------------------------
# Post-Block-3 Medical Teams Coverage
# -------------------------
# From Block 4 onwards (index 3), enforce exactly 20 residents on
# Medical Teams per block.

model.Add(sum(y[r, 1, "Medical Teams"] for r in range(n)) >= 25)

for b in range(3, BLOCKS):
	model.Add(sum(y[r, b, "Medical Teams"] for r in range(n)) == 20)

# -------------------------
# Block-1 PGY Restrictions
# -------------------------
# PGY-1 residents must start in Medical Teams in Block 1.
# PGY-2 residents cannot be assigned to Senior Rotation in Block 1.

med_teams_idx = rotation_to_idx["Medical Teams"]
senior_idx = rotation_to_idx["Senior Rotation"]

for r in range(n):
	if pgys[r] == "R1":
		model.Add(x[r, 0] == med_teams_idx)
	elif pgys[r] == "R2":
		model.Add(x[r, 0] != senior_idx)

# -------------------------
# R1: No 6 Consecutive Medical Teams
# -------------------------
# For each PGY-1 resident, in any 6-block window, they cannot have all
# 6 blocks assigned to Medical Teams.

for r in range(n):
	if pgys[r] == "R1":
		for start in range(BLOCKS - 5):
			model.Add(
				sum(y[r, b, "Medical Teams"] for b in range(start, start + 6))
				<= 5
			)
	if pgys[r] == "R3" or pgys[r] == "R2":
		for b in range(BLOCKS - 1):
			model.AddBoolOr([
				y[r, b, "Senior Rotation"].Not(),
				y[r, b + 1, "Senior Rotation"].Not()
			])


# -------------------------
# Soft Objective Components
# -------------------------

# R2/R3 mix in Cardiology and AMAU
rotation_mix_score = []
for b in range(BLOCKS):
	for rot in ["Cardiology", "AMAU"]:
		eligible_vars = [
			y[r, b, rot]
			for r in range(n)
			if pgys[r] in ("R2", "R3")
		]
		if eligible_vars:
			mix_ok = model.NewBoolVar(f"mix_r2r3_b{b}_{rot}")
			model.Add(sum(eligible_vars) >= 2).OnlyEnforceIf(mix_ok)
			model.Add(sum(eligible_vars) < 2).OnlyEnforceIf(mix_ok.Not())
			rotation_mix_score.append(mix_ok)

# R1 penalties
penalty_r1_long_medical = []
penalty_r1_consec_cardiology = []

for r in range(n):
	if pgys[r] == "R1":
		for start in range(BLOCKS - 3):
			window = [y[r, b, "Medical Teams"] for b in range(start, start + 4)]
			all_four = model.NewBoolVar(f"r1_medical_4consec_{r}_{start}")
			model.Add(sum(window) == 4).OnlyEnforceIf(all_four)
			model.Add(sum(window) != 4).OnlyEnforceIf(all_four.Not())
			penalty_r1_long_medical.append(all_four)

		for b in range(BLOCKS - 1):
			both = model.NewBoolVar(f"r1_cardio_consec_{r}_{b}")
			model.AddBoolAnd([
				y[r, b, "Cardiology"],
				y[r, b + 1, "Cardiology"]
			]).OnlyEnforceIf(both)
			model.AddBoolOr([
				y[r, b, "Cardiology"].Not(),
				y[r, b + 1, "Cardiology"].Not()
			]).OnlyEnforceIf(both.Not())
			penalty_r1_consec_cardiology.append(both)

# R2 rewards
reward_r2_consec_micu = []
reward_r2_consec_ccu = []

for r in range(n):
	if pgys[r] == "R2":
		for b in range(BLOCKS - 1):
			both_micu = model.NewBoolVar(f"r2_micu_consec_{r}_{b}")
			model.AddBoolAnd([
				y[r, b, "MICU"],
				y[r, b + 1, "MICU"]
			]).OnlyEnforceIf(both_micu)
			model.AddBoolOr([
				y[r, b, "MICU"].Not(),
				y[r, b + 1, "MICU"].Not()
			]).OnlyEnforceIf(both_micu.Not())
			reward_r2_consec_micu.append(both_micu)


			both_ccu = model.NewBoolVar(f"r2_ccu_consec_{r}_{b}")
			model.AddBoolAnd([
				y[r, b, "CCU"],
				y[r, b + 1, "CCU"]
			]).OnlyEnforceIf(both_ccu)
			model.AddBoolOr([
				y[r, b, "CCU"].Not(),
				y[r, b + 1, "CCU"].Not()
			]).OnlyEnforceIf(both_ccu.Not())
			reward_r2_consec_ccu.append(both_ccu)
   
# R3 penalties: insufficient spacing between Senior blocks
penalty_r3_senior_spacing = []

for r in range(n):
	if pgys[r] == "R3":
		for b in range(BLOCKS - 1):
			p = model.NewBoolVar(f"r3_senior_consec_{r}_{b}")
			model.AddBoolAnd([
				y[r, b, "Senior Rotation"],
				y[r, b + 1, "Senior Rotation"]
			]).OnlyEnforceIf(p)
			model.AddBoolOr([
				y[r, b, "Senior Rotation"].Not(),
				y[r, b + 1, "Senior Rotation"].Not()
			]).OnlyEnforceIf(p.Not())
			penalty_r3_senior_spacing.append(p)

		for b in range(BLOCKS - 2):
			p = model.NewBoolVar(f"r3_senior_gap1_{r}_{b}")
			model.AddBoolAnd([
				y[r, b, "Senior Rotation"],
				y[r, b + 2, "Senior Rotation"]
			]).OnlyEnforceIf(p)
			model.AddBoolOr([
				y[r, b, "Senior Rotation"].Not(),
				y[r, b + 2, "Senior Rotation"].Not()
			]).OnlyEnforceIf(p.Not())
			penalty_r3_senior_spacing.append(p)

# R4 penalties: >5 consecutive Registrar rotations
penalty_r4_long_registrar = []

for r in range(n):
	if pgys[r] == "R4":
		for start in range(BLOCKS - 5):
			window = [y[r, b, "Registrar Rotation"] for b in range(start, start + 6)]
			too_long = model.NewBoolVar(f"r4_registrar_6consec_{r}_{start}")
			model.Add(sum(window) == 6).OnlyEnforceIf(too_long)
			model.Add(sum(window) != 6).OnlyEnforceIf(too_long.Not())
			penalty_r4_long_registrar.append(too_long)

# AMAU & CCU cannot span across batch boundaries
for r in range(n):
	for b in range(1, 10):  # check transitions up to block 9
		if b % 2 == 1:  # 1,3,5,7,9 are ends of batches
			model.AddBoolOr([
				y[r, b, "AMAU"].Not(),
				y[r, b + 1, "AMAU"].Not()
			])
			model.AddBoolOr([
				y[r, b, "CCU"].Not(),
				y[r, b + 1, "CCU"].Not()
			])

# -------------------------
# Final Objective
# -------------------------
#TODO: Get preferences score (normalized to 1). Provide summary with the score.
model.Maximize(
	sum(rotation_mix_score)
	- sum(penalty_r1_long_medical)
	- sum(penalty_r1_long_medical)
	- sum(penalty_r1_consec_cardiology)
 
	+ sum(reward_r2_consec_micu)
	+ sum(reward_r2_consec_micu)
	+ sum(reward_r2_consec_ccu)
	+ sum(reward_r2_consec_ccu)
 
	- sum(penalty_r3_senior_spacing)
	- sum(penalty_r3_senior_spacing)
 
	- sum(penalty_r4_long_registrar)
	- sum(penalty_r4_long_registrar)
 
)


Residents: 183
Block 1: 0 on full leave
Block 2: 0 on full leave
Block 3: 0 on full leave
Block 4: 1 on full leave
Block 5: 1 on full leave
Block 6: 0 on full leave
Block 7: 0 on full leave
Block 8: 2 on full leave
Block 9: 1 on full leave
Block 10: 0 on full leave
Block 11: 0 on full leave
Block 12: 0 on full leave
Block 13: 0 on full leave


### Solving the Model and Writing the Schedule Output

In [19]:
# -------------------------
# Solve and Output Schedule
# -------------------------
solver = cp_model.CpSolver()
# solver.parameters.log_search_progress = True
# solver.parameters.cp_model_probing_level = 0
# solver.parameters.enumerate_all_solutions = False

status = solver.Solve(model)


if status not in (cp_model.FEASIBLE, cp_model.OPTIMAL):
    raise RuntimeError("No feasible solution found.")

output = []
for r in range(n):
    row = [residents[r]]
    for b in range(BLOCKS):
        rot = idx_to_rotation[solver.Value(x[r, b])]
        row.append(rot)
    output.append(row)


columns = ["Resident"] + [f"Block_{i+1}" for i in range(BLOCKS)]
output_df = pd.DataFrame(output, columns=columns)
output_df.to_excel(OUTPUT_XLSX, index=False)
output_df.head()


Unnamed: 0,Resident,Block_1,Block_2,Block_3,Block_4,Block_5,Block_6,Block_7,Block_8,Block_9,Block_10,Block_11,Block_12,Block_13
0,R1_001,Medical Teams,Medical Teams,Endocrine,Infectious Disease,Medical Teams,Infectious Disease,Cardiology,Endocrine,Medical Teams,Medical Teams,Cardiology,Medical Teams,AMAU
1,R1_002,Medical Teams,Endocrine,AMAU,Cardiology,AMAU,Medical Teams,Medical Teams,Medical Teams,Infectious Disease,Medical Teams,Medical Teams,Endocrine,Cardiology
2,R1_003,Medical Teams,Medical Teams,Infectious Disease,Medical Teams,Medical Teams,AMAU,Cardiology,Endocrine,Endocrine,Medical Teams,Medical Teams,Cardiology,AMAU
3,R1_004,Medical Teams,Medical Teams,Endocrine,Medical Teams,Cardiology,AMAU,Infectious Disease,Medical Teams,AMAU,Medical Teams,Infectious Disease,Medical Teams,Cardiology
4,R1_005,Medical Teams,Medical Teams,Medical Teams,AMAU,Cardiology,Medical Teams,AMAU,Infectious Disease,Medical Teams,Medical Teams,Endocrine,Infectious Disease,Cardiology


In [1]:
from collections import Counter
print("Forced assignments per block:")
for (rid, b), rot in forced_assignments.items():
    print(f"{rid} → Block {b+1}: {rot}")


Forced assignments per block:


NameError: name 'forced_assignments' is not defined

### Rotation Distribution Summary by Block

In [None]:
# -------------------------
# Rotation Distribution Summary by Block
# -------------------------
blocks = [f"Block_{i}" for i in range(1, 14)]

melted = output_df.melt(
    id_vars=["Resident"],
    value_vars=blocks,
    var_name="Block",
    value_name="Rotation"
)

count_df = (
    melted.groupby(["Rotation", "Block"])
    .size()
    .unstack(fill_value=0)
    .sort_index()
)

count_df.index.name = None
count_df.columns.name = None
count_df = count_df[blocks]
count_df.to_excel("rotation_distribution.xlsx")
count_df

### Draft (Extra):

In [None]:
# TODO count in extra nephrology and endocrine.
model.Add(sum(y[r, b, "Neurology"] + (y[r, b, "Nephrology"] + y[r, b, "Endocrine "] - 10) for r in range(b)

In [None]:
import pandas as pd

# ---------- helper ----------
def weight(r, b, leave_dict):
	"""2 full, 1 half"""
	return 1 if (b + 1) in leave_dict[r]["half"] else 2

# ---------- graduation range ----------
def check_grad(output_df, pgys, leave_dict):
	"""return {(resident,group):count} out-of-range"""
	viol = {}
	for r, row in output_df.iterrows():
		res = row["Resident"]
		pgy = pgys[r]
		full_leave = leave_dict[res]["full"]
		for grp, rng in GRAD_REQ[pgy].items():
			if pgy == "R3" and grp == (
				"Cardiology", "ED", "Medical Consultation"
			) and full_leave:
				continue
			ct = sum(row[f"Block_{b}"] in grp for b in range(1, BLOCKS + 1))
			if ct < min(rng) or ct > max(rng):
				viol[(res, grp)] = ct
	return viol

# ---------- staffing minima ----------
def check_block_min(output_df, leave_dict):
	"""return {(block,rot):count} below minimum"""
	viol = {}
	for b in range(1, BLOCKS + 1):
		for rot, req in PER_BLOCK_MIN.items():
			ct = 0
			for r, row in output_df.iterrows():
				if row[f"Block_{b}"] == rot:
					ct += weight(row["Resident"], b - 1, leave_dict)
			if ct < 2 * req:
				viol[(b, rot)] = ct
	return viol

# ---------- exact tallies ----------
def check_exact(output_df, leave_dict):
	"""return {(block,tag):count} wrong"""
	viol = {}
	for b in range(1, BLOCKS + 1):
		def tall(rot):
			return sum(
				weight(row["Resident"], b - 1, leave_dict)
				for _, row in output_df.iterrows()
				if row[f"Block_{b}"] == rot
			)
		if tall("Senior Rotation") != 20:
			viol[(b, "Senior")] = tall("Senior Rotation")
		if tall("Registrar Rotation") != 40:
			viol[(b, "Registrar")] = tall("Registrar Rotation")
		if b >= 4 and tall("Medical Teams") != 40:
			viol[(b, "MedTeams")] = tall("Medical Teams")
	return viol

# ---------- on-call groups ----------
def check_groups(output_df, leave_dict):
	"""return {(block,group):count} below target"""
	targets = {"FLOATER": 8, "NEURO": 4, "2NDONCALL": 20}
	viol = {}
	for b in range(1, BLOCKS + 1):
		for g, members in group_defs.items():
			ct = 0
			for _, row in output_df.iterrows():
				if row[f"Block_{b}"] in members:
					w = weight(row["Resident"], b - 1, leave_dict)
					ct += w if g == "2NDONCALL" else 1
			if ct < targets[g] * (2 if g == "2NDONCALL" else 1):
				viol[(b, g)] = ct
	return viol

# ---------- PGY first block ----------
def check_first(output_df, pgys):
	"""return list of resident ids violating first-block rule"""
	bad = []
	for r, row in output_df.iterrows():
		if pgys[r] == "R1" and row["Block_1"] != "Medical Teams":
			bad.append(row["Resident"])
		if pgys[r] == "R2" and row["Block_1"] == "Senior Rotation":
			bad.append(row["Resident"])
	return bad

# ---------- consecutive Med-Teams ----------
def check_consecutive(output_df, pgys):
	"""return list of resident ids with >5 consecutive MedTeams"""
	bad = []
	for r, row in output_df.iterrows():
		if pgys[r] != "R1":
			continue
		seq = 0
		for b in range(4, BLOCKS + 1):
			if row[f"Block_{b}"] == "Medical Teams":
				seq += 1
				if seq > 5:
					bad.append(row["Resident"])
					break
			else:
				seq = 0
	return bad

# ---------- master validator ----------
def validate(output_df, residents, pgys, leave_dict):
	"""run all checks, return dict of violations"""
	return {
		"grad": check_grad(output_df, pgys, leave_dict),
		"block_min": check_block_min(output_df, leave_dict),
		"exact": check_exact(output_df, leave_dict),
		"groups": check_groups(output_df, leave_dict),
		"first_block": check_first(output_df, pgys),
		"consecutive": check_consecutive(output_df, pgys),
	}

# example:
viol = validate(output_df, residents, pgys, leave_dict)
print(viol)


In [None]:
# -------------------------
# CP-SAT Model
# -------------------------
model = cp_model.CpModel()

x = {}
for r in range(n):
	res_id = residents[r]
	pgy = pgys[r]
	for b in range(BLOCKS):
		block = b + 1
		if block in leave_dict[res_id]["full"]:
			x[r, b] = model.NewIntVarFromDomain(
				cp_model.Domain.FromValues([LEAVE_IDX]), f'x_{r}_{b}'
			)
		else:
			eligible = [
				rotation_to_idx[rot]
				for rot in ELIGIBILITY[pgy]
				if rot in rotation_to_idx
			]
			if block in leave_dict[res_id]["half"]:
				eligible = [
					rotation_to_idx[rot]
					for rot in ELIGIBILITY[pgy]
					if rot in rotation_to_idx and
					rot in LEAVE_ALLOWED.get(pgy, set())
				]

			x[r, b] = model.NewIntVarFromDomain(
				cp_model.Domain.FromValues(eligible), f'x_{r}_{b}'
			)


# -------------------------
# Constraints
# -------------------------

# Create indicator variables y[r, b, rot]
y = {}
for r in range(n):
	for b in range(BLOCKS):
		for rot in PER_BLOCK_MIN:
			y[r, b, rot] = model.NewBoolVar(f'y_{r}_{b}_{rot}')
			rot_idx = rotation_to_idx[rot]
			model.Add(x[r, b] == rot_idx).OnlyEnforceIf(y[r, b, rot])
			model.Add(x[r, b] != rot_idx).OnlyEnforceIf(
				y[r, b, rot].Not()
			)

# Use integer weights: 2 (full), 1 (half)
half_leave_weights = {
	(r, b): 1 if (b + 1) in leave_dict[residents[r]]["half"] else 2
	for r in range(n)
	for b in range(BLOCKS)
}

# Enforce scaled minimum coverage
for b in range(BLOCKS):
	for rot in PER_BLOCK_MIN:
		model.Add(
			sum(
				half_leave_weights[(r, b)] * y[r, b, rot]
				for r in range(n)
			) >= 2 * PER_BLOCK_MIN[rot]
		)


# Graduation Requirements
for r in range(n):
	res_id = residents[r]
	pgy = pgys[r]
	leave_blocks = leave_dict[res_id]["full"]

	for rotation_group, required_counts in GRAD_REQ[pgy].items():
		# Skip this specific R3 exemption if full leave taken
		if pgy == "R3" and rotation_group == (
			"Cardiology", "ED", "Medical Consultation"
		) and leave_blocks:
			continue

		match_vars = []
		for b in range(BLOCKS):
			match = model.NewBoolVar(f'grad_{r}_{b}_{rotation_group}')
			model.AddAllowedAssignments(
				[x[r, b]],
				[[rotation_to_idx[rot]]                        # allowed
				 for rot in rotation_group if rot in rotation_to_idx]
			).OnlyEnforceIf(match)
			model.AddForbiddenAssignments(
				[x[r, b]],
				[[rotation_to_idx[rot]]                        # forbidden
				 for rot in ROTATIONS
				 if rot not in rotation_group and rot in rotation_to_idx]
			).OnlyEnforceIf(match)
			match_vars.append(match)

		model.Add(sum(match_vars) >= min(required_counts))
		model.Add(sum(match_vars) <= max(required_counts))


group_defs = {
	"FLOATER": {"Nephrology", "Endocrine"},
	"NEURO": {"Neurology"},
	"2NDONCALL": {"GI", "Rheumatology", "Pulmonology"}
}

z = {}
for r in range(n):
	for b in range(BLOCKS):
		for group_name, rotations in group_defs.items():
			z[r, b, group_name] = model.NewBoolVar(f'z_{r}_{b}_{group_name}')

			rot_ids = [
				rotation_to_idx[rot]
				for rot in rotations if rot in rotation_to_idx
			]
			if not rot_ids:
				model.Add(z[r, b, group_name] == 0)
				continue

			group_matches = []
			for rot_id in rot_ids:
				match = model.NewBoolVar(f'is_{residents[r]}_{b}_r{rot_id}')
				model.Add(x[r, b] == rot_id).OnlyEnforceIf(match)
				model.Add(x[r, b] != rot_id).OnlyEnforceIf(match.Not())
				group_matches.append(match)

			model.AddBoolOr(group_matches).OnlyEnforceIf(z[r, b, group_name])
			model.AddBoolAnd([m.Not() for m in group_matches])\
				.OnlyEnforceIf(z[r, b, group_name].Not())

for b in range(BLOCKS):
	# model.Add(
	# 	sum(y[r, b, "Senior Rotation"] for r in range(n)) == 10
	# )

	# model.Add(
	# 	sum(y[r, b, "Registrar Rotation"] for r in range(n)) == 20
	# )

	model.Add(
		sum(half_leave_weights[(r, b)] *
			y[r, b, "Senior Rotation"] for r in range(n)) == 2 * 10
	)
	model.Add(
		sum(half_leave_weights[(r, b)] *
			y[r, b, "Registrar Rotation"] for r in range(n)) == 2 * 20
	)

	model.Add(
		sum(z[r, b, "FLOATER"] for r in range(n)) >= 8
	)

	model.Add(
		sum(z[r, b, "NEURO"] for r in range(n)) >= 4
	)

	model.Add(
		sum(half_leave_weights[(r, b)] * z[r, b, "2NDONCALL"]
			for r in range(n)) >= 20
	)

for b in range(3, BLOCKS):
	# model.Add(
	# 	sum(y[r, b, "Senior Rotation"] for r in range(n)) == 10
	# )

	# model.Add(
	# 	sum(y[r, b, "Registrar Rotation"] for r in range(n)) == 20
	# )

	model.Add(
		sum(half_leave_weights[(r, b)] *
			y[r, b, "Medical Teams"] for r in range(n)) == 2 * 20
	)


# -------------------------
# Block-1 PGY rules
# -------------------------
med_teams_idx = rotation_to_idx["Medical Teams"]
senior_idx = rotation_to_idx["Senior Rotation"]

for r in range(n):
	if pgys[r] == "R1":
		model.Add(x[r, 0] == med_teams_idx)
	elif pgys[r] == "R2":
		model.Add(x[r, 0] != senior_idx)

# -------------------------
# Consecutive-Medical-Teams guard (R1)
# -------------------------
for r in range(n):
	if pgys[r] == "R1":
		for start in range(BLOCKS - 5):       # window size = 6
			model.Add(
				sum(y[r, b, "Medical Teams"] for b in range(start, start + 6))
				<= 5
			)


In [None]:

# TODO: First block for R1 must be medical teams.
# 1. R2 cannot be senior in the first block.
# 2. R1 first block must be Medical Teams.
# 3. There should be a minimum number of residents available per half (add half block constraint).
# 4. Add maximum number of residents for certain rotations, but for a specific number of blocks this constraint do not need to be satisified.
# 5. Add preferences (e.g. 1 Senior in Cardiology)
# 6. Add summary per block (mention floaters/ 2nd on-call etc.).
# 7. R1 cannot do more than 5 Medical Teams Rotations in a row.
# Color Code + Predetrimned blocks as options in the input file.


In [20]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
A CP-SAT model for solving the medical residency rotation scheduling problem.

This script reads resident data, leave requests, and pre-assignments from an
Excel file, then uses Google's OR-Tools to generate an optimal rotation
schedule that satisfies a comprehensive set of hard constraints and maximizes a
weighted objective of soft constraints.

The script is structured into three main classes:
1.  RotationData: Parses and stores all input data.
2.  ScheduleModelBuilder: Constructs the CP-SAT model, including all variables,
    constraints, and the objective function.
3.  RotationScheduler: Orchestrates the entire process from data loading to
    solving and writing the final schedule.
"""

import os
import pandas as pd
from ortools.sat.python import cp_model
from typing import Any, Dict, List, Set, Tuple, cast

# ============================================================================
# Configuration Constants
# ============================================================================

# --- File Paths ---
INPUT_FILE = "real_example_input.xlsx"
OUTPUT_XLSX = "output_schedule.xlsx"
DISTRIBUTION_XLSX = "rotation_distribution.xlsx"
SAMPLE_DATA_DIR = "sample_data"

# --- Model Parameters ---
BLOCKS = 13

# --- Rotation Definitions ---
ROTATIONS = [
	"Cardiology", "Endocrine", "Infectious Disease", "AMAU", "Nephrology",
	"Neurology", "CCU", "MICU", "Al Khor", "MOP", "Geriatrics", "Hematology",
	"Oncology", "Al Wakra", "GI", "Pulmonology", "Rheumatology", "ED",
	"Medical Consultation", "Medical Teams", "Senior Rotation",
	"Registrar Rotation",
]
LEAVE_ROTATION = "LEAVE"
TRANSFER_ROTATION = "TRANSFER"
ALL_ROTATIONS = ROTATIONS + [LEAVE_ROTATION, TRANSFER_ROTATION]

# --- Leave-Eligible Rotations by PGY ---
LEAVE_ALLOWED = {
	"R1": {"Endocrine", "AMAU", "Infectious Disease"},
	"R2": {"MOP", "Nephrology", "AMAU"},
	"R3": {"GI", "Pulmonology", "AMAU"},
	"R4": {"Medical Consultation"},
	"R_NEURO": {"AMAU",},
}

# --- Graduation Requirements (Min, Max) blocks per rotation group ---
GRAD_REQ = {
	"R1": {
		("Medical Teams",): (4, 7),
		("AMAU",): (1, 2),
		("Cardiology",): (2, 2),
		("Infectious Disease",): (1, 2),
		("Endocrine",): (1, 2),
	},
	"R2": {
		("Senior Rotation",): (1, 2),
		("CCU",): (2, 2),
		("MICU",): (2, 2),
		("Nephrology",): (1, 2),
		("Neurology",): (1, 1),
		("Cardiology",): (1, 1),
		("Geriatrics",): (1, 1),
		("AMAU",): (1, 2),
		("Al Khor",): (0, 1),
		("MOP",): (1, 1),
	},
	"R3": {
		("Senior Rotation",): (2, 2),
		("Oncology",): (1, 1),
		("Hematology",): (1, 1),
		("Al Wakra",): (1, 1),
		("GI",): (2, 2),
		("Pulmonology",): (2, 2),
		("Rheumatology",): (1, 1),
		("MOP",): (1, 1),
		("AMAU",): (1, 1),
		("Cardiology", "ED", "Medical Consultation"): (1, 1),
	},
	"R4": {
		("Registrar Rotation",): (5, 6),
		("Medical Consultation",): (3, 5),
		("Al Wakra",): (1, 2),
		("Al Khor",): (1, 2),
		("Hematology", "Oncology"): (1, 2),
	},
	"R4_Chiefs": {
		("Registrar Rotation", ): (5, 7),
		("Medical Consultation",): (6, 8),
	},
	"R_NEURO": {
		("Medical Teams", ): (3, 3),
		("AMAU",): (3, 4),
		("MICU",): (1, 1),
		("Rheumatology",): (1, 1),
		("ED",): (2, 2),
		("TRANSFER",): (2, 2),
	},
}

# --- Per-Block Minimum Staffing Requirements ---
PER_BLOCK_MIN = {
	"Cardiology": 11, "AMAU": 11, "CCU": 7, "MICU": 7, "Al Khor": 6,
	"MOP": 6, "Hematology": 5, "Oncology": 5, "Al Wakra": 9,
	"Medical Consultation": 10, "Medical Teams": 20,
}

# --- Rotation Group Definitions for Coverage Rules ---
GROUP_DEFS = {
	"Floater": {"Nephrology", "Endocrine"},
	"2ndOnCall": {"GI", "Rheumatology", "Pulmonology"},
}

# --- Objective Function Weights ---
REWARD_WEIGHT = 1
PENALTY_WEIGHT = -1


# ============================================================================
# Data Handling Class
# ============================================================================

class RotationData:
	"""Parses and holds all input data for the scheduling problem."""
	def __init__(self, input_path: str):
		"""
		Initializes the data object by parsing the input Excel file.

		Args:
			input_path: The file path to the input Excel data.
		"""
		self.residents: List[str] = []
		self.pgys: List[str] = []
		self.leave_dict: Dict[str, Dict[str, Any]] = {}
		self.forced: Dict[Tuple[int, int], str] = {}
		self.forbidden: Dict[Tuple[int, int], str] = {}
		self.eligibility: Dict[str, Set[str]] = {}

		self._parse_input(input_path)
		self._build_eligibility_map()

		self.num_residents: int = len(self.residents)
		self.resident_to_idx: Dict[str, int] = {
			res: i for i, res in enumerate(self.residents)
		}
		self.rotation_to_idx: Dict[str, int] = {
			rot: i for i, rot in enumerate(ALL_ROTATIONS)
		}
		self.idx_to_rotation: Dict[int, str] = {
			i: rot for rot, i in self.rotation_to_idx.items()
		}
		self.leave_idx: int = self.rotation_to_idx[LEAVE_ROTATION]

	def _parse_input(self, path: str) -> None:
		"""
		Reads the Excel file and populates resident data structures.
		"""
		df = pd.read_excel(path).fillna({
			"Leave1Block": 0, "Leave2Block": 0,
			"Leave1Half": "", "Leave2Half": ""
		})

		self.residents = df["ID"].tolist()
		self.pgys = df["PGY"].tolist()

		for r_idx, row in enumerate(df.itertuples(index=False)):
			res_id = row.ID
			b1, b2 = int(row.Leave1Block), int(row.Leave2Block)
			full, half = set(), set()

			if b1 and b1 == b2:
				full.add(b1)
			else:
				if b1:
					half.add(b1)
				if b2:
					half.add(b2)

			self.leave_dict[res_id] = {"pgy": row.PGY, "full": full, "half": half}

			for b in range(1, BLOCKS + 1):
				val = str(getattr(row, f"Block_{b}", "")).strip()
				if not val or val.lower() in {"nan", "none"}:
					continue
				
				# Note: Blocks in input are 1-based, model is 0-based.
				key = (r_idx, b - 1)
				if val.startswith("!"):
					self.forbidden[key] = val[1:]
				else:
					self.forced[key] = val

	def _build_eligibility_map(self) -> None:
		"""Builds a map of PGY levels to their eligible rotations."""
		self.eligibility = {
			pgy: {rot for group in GRAD_REQ[pgy] for rot in group}
			for pgy in GRAD_REQ
		}
		for pgy in self.eligibility:
			self.eligibility[pgy].add(LEAVE_ROTATION)
			if pgy == "R_NEURO":
				self.eligibility[pgy].add(TRANSFER_ROTATION)


# ============================================================================
# Model Builder Class
# ============================================================================

class ScheduleModelBuilder:
	"""Builds the CP-SAT model for the rotation scheduling problem."""
	def __init__(self, data: RotationData):
		"""
		Initializes the model builder.

		Args:
			data: An instance of RotationData containing all input.
		"""
		self.data = data
		self.model = cp_model.CpModel()
		self.x: Dict[Tuple[int, int], cp_model.IntVar] = {}
		self.y: Dict[Tuple[int, int, str], cp_model.BoolVar] = {}
		self.objective_terms: List[cp_model.LinearExpr] = []

	def build(self) -> cp_model.CpModel:
		"""
		Constructs and returns the complete CP-SAT model.
		"""
		self._create_decision_variables()
		self._apply_hard_constraints()
		self._set_objective_function()
		return self.model

	def _create_decision_variables(self) -> None:
		"""Creates the main decision and indicator variables."""
		# x[r, b] is the rotation_idx for resident r in block b.
		for r in range(self.data.num_residents):
			res_id = self.data.residents[r]
			pgy = self.data.pgys[r]
			for b in range(BLOCKS):
				domain = self._get_assignment_domain(r, b, res_id, pgy)
				self.x[r, b] = self.model.NewIntVarFromDomain(
					domain, f"x_{r}_{b}"
				)

		# y[r, b, rot] is a boolean indicator variable.
		for r in range(self.data.num_residents):
			for b in range(BLOCKS):
				for rot in ALL_ROTATIONS:
					rot_id = self.data.rotation_to_idx[rot]
					var = self.model.NewBoolVar(f"y_{r}_{b}_{rot}")
					self.model.Add(self.x[r, b] == rot_id).OnlyEnforceIf(var)
					self.model.Add(self.x[r, b] != rot_id).OnlyEnforceIf(var.Not())
					self.y[r, b, rot] = var

		# Each resident has exactly one rotation per block.
		for r in range(self.data.num_residents):
			for b in range(BLOCKS):
				self.model.AddExactlyOne(
					[self.y[r, b, rot] for rot in ALL_ROTATIONS]
				)

	def _get_assignment_domain(
		self, r: int, b: int, res_id: str, pgy: str
	) -> cp_model.Domain:
		"""
		Determines the set of allowed rotation indices for a given
		resident-block slot, considering leaves and PGY eligibility.
		"""
		leave_info = self.data.leave_dict[res_id]
		block_num = b + 1

		if block_num in leave_info["full"]:
			return cp_model.Domain.FromValues([self.data.leave_idx])

		# Start with all rotations eligible for the PGY
		eligible_rots = self.data.eligibility.get(pgy, set())

		# If it's a half-leave block, restrict to leave-allowed rotations
		if block_num in leave_info["half"]:
			leave_allowed_rots = LEAVE_ALLOWED.get(pgy, set())
			eligible_rots = eligible_rots.intersection(leave_allowed_rots)

		# Convert rotation names to indices, excluding LEAVE
		eligible_indices = [
			self.data.rotation_to_idx[rot]
			for rot in eligible_rots
			if rot != LEAVE_ROTATION and rot in self.data.rotation_to_idx
		]
		return cp_model.Domain.FromValues(eligible_indices)

	def _apply_hard_constraints(self) -> None:
		"""Applies all absolute, non-negotiable rules to the model."""
		self._add_hard_forced_and_forbidden_assignments()
		self._add_hard_graduation_requirements()
		self._add_hard_block_coverage_rules()
		self._add_hard_pgy_specific_rules()
		self._add_hard_consecutive_rotation_rules()
		self._add_hard_cross_batch_rules()
		self._add_hard_neuro_resident_rules()

	def _add_hard_forced_and_forbidden_assignments(self) -> None:
		"""Applies pre-assignments from the input file."""
		for (r, b), rot_name in self.data.forced.items():
			rot_idx = self.data.rotation_to_idx.get(rot_name)
			if rot_idx is not None:
				self.model.Add(self.x[r, b] == rot_idx)

		for (r, b), rot_name in self.data.forbidden.items():
			rot_idx = self.data.rotation_to_idx.get(rot_name)
			if rot_idx is not None:
				self.model.Add(self.x[r, b] != rot_idx)
	
	def _add_hard_graduation_requirements(self) -> None:
		"""Ensures each resident meets their PGY-specific block counts."""
		for r in range(self.data.num_residents):
			pgy = self.data.pgys[r]
			full_leave = self.data.leave_dict[self.data.residents[r]]["full"]

			for group, (min_c, max_c) in GRAD_REQ[pgy].items():
				# R3 exemption for a specific elective group if on full leave
				if (pgy == "R3" and
					set(group) == {"Cardiology", "ED", "Medical Consultation"} and
					full_leave):
					continue
				
				# Sum rotations for the resident within the current group
				total_rotations_in_group = sum(
					self.y[r, b, rot]
					for b in range(BLOCKS)
					for rot in group
					if rot in self.data.rotation_to_idx
				)
				self.model.Add(total_rotations_in_group >= min_c)
				self.model.Add(total_rotations_in_group <= max_c)

	def _add_hard_block_coverage_rules(self) -> None:
		"""Enforces minimum and exact staffing for rotations each block."""
		for b in range(BLOCKS):
			# Exact counts for on-call rotations
			self.model.Add(sum(self.y[r, b, "Senior Rotation"]
						   for r in range(self.data.num_residents)) == 10)
			self.model.Add(sum(self.y[r, b, "Registrar Rotation"]
						   for r in range(self.data.num_residents)) == 20)
			
			# Coverage for Medical Teams (post-block 3)
			if b >= 3:
				self.model.Add(sum(self.y[r, b, "Medical Teams"]
							   for r in range(self.data.num_residents)) == 20)
			
			# Minimum staffing for other core rotations
			for rot, min_val in PER_BLOCK_MIN.items():
				self.model.Add(sum(self.y[r, b, rot]
							   for r in range(self.data.num_residents)) >= min_val)
			
			# Weighted coverage for 2nd On-Call group
			self.model.Add(sum(
				(3 if (b + 1) in self.data.leave_dict[self.data.residents[r]]["half"] else 6) *
				self.y[r, b, rot]
				for r in range(self.data.num_residents) if self.data.pgys[r] != "R_NEURO"
				for rot in GROUP_DEFS["2ndOnCall"]
			) >= 60)

			# Coverage for Floater group
			self.model.Add(sum(
				self.y[r, b, rot]
				for r in range(self.data.num_residents) if self.data.pgys[r] != "R_NEURO"
				for rot in GROUP_DEFS["Floater"]
			) >= 10)

	def _add_hard_pgy_specific_rules(self) -> None:
		"""Adds rules specific to PGY levels, like first-block assignments."""
		med_teams_idx = self.data.rotation_to_idx["Medical Teams"]
		senior_idx = self.data.rotation_to_idx["Senior Rotation"]

		for r in range(self.data.num_residents):
			pgy = self.data.pgys[r]
			if pgy == "R1":
				# R1s must start in Medical Teams.
				self.model.Add(self.x[r, 0] == med_teams_idx)
			elif pgy == "R2":
				# R2s cannot start in Senior Rotation.
				self.model.Add(self.x[r, 0] != senior_idx)
		
		# Block 2 must have at least 25 on Medical Teams
		self.model.Add(sum(
			self.y[r, 1, "Medical Teams"] for r in range(self.data.num_residents)
		) >= 25)

	def _add_hard_consecutive_rotation_rules(self) -> None:
		"""Prevents residents from being in certain rotations for too long."""
		for r in range(self.data.num_residents):
			pgy = self.data.pgys[r]
			# R1s cannot do 6 consecutive Medical Teams blocks.
			if pgy == "R1":
				for start in range(BLOCKS - 5):
					self.model.Add(sum(
						self.y[r, b, "Medical Teams"]
						for b in range(start, start + 6)
					) <= 5)
			
			# R2/R3 cannot do consecutive Senior blocks.
			if pgy in ("R2", "R3"):
				for b in range(BLOCKS - 1):
					self.model.AddBoolOr([
						self.y[r, b, "Senior Rotation"].Not(),
						self.y[r, b + 1, "Senior Rotation"].Not()
					])
	
	def _add_hard_cross_batch_rules(self) -> None:
		"""Prevents certain rotations from spanning across two-block batches."""
		for r in range(self.data.num_residents):
			for b in [1, 3, 5, 7, 9]: # End of a two-block batch
				if b < BLOCKS -1:
					for rot in ["AMAU", "CCU"]:
						self.model.AddBoolOr([
							self.y[r, b, rot].Not(),
							self.y[r, b + 1, rot].Not()
						])

	def _add_hard_neuro_resident_rules(self) -> None:
		"""Applies the fixed schedule for R_NEURO residents."""
		med_teams_idx = self.data.rotation_to_idx["Medical Teams"]
		transfer_idx = self.data.rotation_to_idx["TRANSFER"]

		for r in range(self.data.num_residents):
			if self.data.pgys[r] == "R_NEURO":
				# First three blocks must be Medical Teams
				self.model.Add(self.x[r, 0] == med_teams_idx)
				self.model.Add(self.x[r, 1] == med_teams_idx)
				self.model.Add(self.x[r, 2] == med_teams_idx)
				
				# Last two blocks must be TRANSFER
				self.model.Add(self.x[r, 11] == transfer_idx)
				self.model.Add(self.x[r, 12] == transfer_idx)

	def _set_objective_function(self) -> None:
		"""Defines the soft constraints to be optimized."""
		self._add_soft_r1_penalties()
		self._add_soft_r2_rewards()
		self._add_soft_r3_penalties()
		self._add_soft_r4_penalties()
		
		# The final objective is to maximize the sum of all collected terms.
		self.model.Maximize(sum(self.objective_terms))

	def _add_soft_r1_penalties(self) -> None:
		"""Penalize undesirable schedules for R1 residents."""
		for r in range(self.data.num_residents):
			if self.data.pgys[r] == "R1":
				# Penalize 4 consecutive Medical Teams blocks.
				for start in range(BLOCKS - 3):
					window = [self.y[r, b, "Medical Teams"] for b in range(start, start + 4)]
					all_four = self.model.NewBoolVar(f"r1_med_4_{r}_{start}")
					self.model.Add(sum(window) == 4).OnlyEnforceIf(all_four)
					self.model.Add(sum(window) != 4).OnlyEnforceIf(all_four.Not())
					self.objective_terms.append(PENALTY_WEIGHT * all_four)
				
				# Penalize consecutive Cardiology blocks.
				for b in range(BLOCKS - 1):
					is_consecutive = self._create_consecutive_bool(r, b, "Cardiology")
					self.objective_terms.append(PENALTY_WEIGHT * is_consecutive)

	def _add_soft_r2_rewards(self) -> None:
		"""Reward desirable schedules for R2 residents."""
		for r in range(self.data.num_residents):
			if self.data.pgys[r] == "R2":
				# Reward consecutive MICU or CCU blocks (as they are 2-block rotations)
				for b in range(BLOCKS - 1):
					# Double reward for these as they are strongly preferred
					micu_consecutive = self._create_consecutive_bool(r, b, "MICU")
					self.objective_terms.append(2 * REWARD_WEIGHT * micu_consecutive)
					
					ccu_consecutive = self._create_consecutive_bool(r, b, "CCU")
					self.objective_terms.append(2 * REWARD_WEIGHT * ccu_consecutive)

	def _add_soft_r3_penalties(self) -> None:
		"""Penalize poor spacing of Senior rotations for R3s."""
		for r in range(self.data.num_residents):
			if self.data.pgys[r] == "R3":
				# Penalize Senior rotations that are too close together.
				for b in range(BLOCKS - 2):
					# Consecutive
					consecutive = self._create_consecutive_bool(r, b, "Senior Rotation")
					self.objective_terms.append(2 * PENALTY_WEIGHT * consecutive)

					# Spaced by only one block
					gap1_var = self.model.NewBoolVar(f"r3_senior_gap1_{r}_{b}")
					self.model.AddBoolAnd([
						self.y[r, b, "Senior Rotation"],
						self.y[r, b + 2, "Senior Rotation"]
					]).OnlyEnforceIf(gap1_var)
					self.objective_terms.append(PENALTY_WEIGHT * gap1_var)

	def _add_soft_r4_penalties(self) -> None:
		"""Penalize undesirable schedules for R4 residents."""
		for r in range(self.data.num_residents):
			if self.data.pgys[r] in ("R4", "R4_Chiefs"):
				# Penalize more than 5 consecutive Registrar rotations.
				for start in range(BLOCKS - 5):
					window = [self.y[r, b, "Registrar Rotation"] for b in range(start, start + 6)]
					too_long = self.model.NewBoolVar(f"r4_reg_6_{r}_{start}")
					self.model.Add(sum(window) == 6).OnlyEnforceIf(too_long)
					self.model.Add(sum(window) != 6).OnlyEnforceIf(too_long.Not())
					self.objective_terms.append(2 * PENALTY_WEIGHT * too_long)

	def _create_consecutive_bool(
		self, r: int, b: int, rot: str
	) -> cp_model.BoolVar:
		"""Helper to create a BoolVar if a resident is in a rotation for two
		consecutive blocks."""
		is_consecutive = self.model.NewBoolVar(f"consecutive_{r}_{b}_{rot}")
		self.model.AddBoolAnd([
			self.y[r, b, rot],
			self.y[r, b + 1, rot]
		]).OnlyEnforceIf(is_consecutive)
		self.model.AddBoolOr([
			self.y[r, b, rot].Not(),
			self.y[r, b + 1, rot].Not()
		]).OnlyEnforceIf(is_consecutive.Not())
		return is_consecutive


# ============================================================================
# Scheduler Orchestrator Class
# ============================================================================

class RotationScheduler:
	"""Orchestrates the rotation scheduling process."""
	def __init__(self, input_dir: str, input_file: str, output_file: str):
		"""
		Initializes the scheduler.

		Args:
			input_dir: The directory containing input data.
			input_file: The name of the input Excel file.
			output_file: The name for the output schedule Excel file.
		"""
		self.input_path = os.path.join(input_dir, input_file)
		self.output_path = output_file
		self.data = RotationData(self.input_path)
		self.model_builder = ScheduleModelBuilder(self.data)

	def run(self) -> None:
		"""
		Executes the full scheduling workflow: build, solve, and write output.
		"""
		print("Building the scheduling model...")
		model = self.model_builder.build()

		print("Solving the model... (this may take a few moments)")
		solver = cp_model.CpSolver()
		solver.parameters.log_search_progress = True
		status = solver.Solve(model)

		if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
			print(f"Solution found! Objective value: {solver.ObjectiveValue()}")
			self._write_output(solver)
		else:
			print("Could not find a feasible solution.")
			self._troubleshoot(solver)

	def _write_output(self, solver: cp_model.CpSolver) -> None:
		"""
		Writes the solved schedule and distribution summary to Excel files.
		"""
		output_data = []
		for r in range(self.data.num_residents):
			row = {"Resident": self.data.residents[r]}
			for b in range(BLOCKS):
				rot_idx = cast(int, solver.Value(self.model_builder.x[r, b]))
				row[f"Block_{b+1}"] = self.data.idx_to_rotation[rot_idx]
			output_data.append(row)

		output_df = pd.DataFrame(output_data)
		output_df.to_excel(self.output_path, index=False)
		print(f"Schedule successfully saved to '{self.output_path}'")
		
		# Generate and save the rotation distribution summary
		melted = output_df.melt(
			id_vars=["Resident"],
			value_vars=[f"Block_{i}" for i in range(1, BLOCKS + 1)],
			var_name="Block",
			value_name="Rotation"
		)
		count_df = (
			melted.groupby(["Rotation", "Block"])
			.size()
			.unstack(fill_value=0)
			.sort_index()
		)
		count_df.to_excel(DISTRIBUTION_XLSX)
		print(f"Rotation distribution saved to '{DISTRIBUTION_XLSX}'")


	def _troubleshoot(self, solver: cp_model.CpSolver) -> None:
		"""Provides information if the model is infeasible."""
		print("\n--- INFEASIBILITY ANALYSIS ---")
		print("The model could not find a solution. This usually means the "
			  "combination of hard constraints is too restrictive.")
		print("Consider the following:")
		print("1.  Are there enough eligible residents to meet the minimum "
			  "coverage for every block (PER_BLOCK_MIN)?")
		print("2.  Do the 'forced' assignments in the input file conflict "
			  "with graduation requirements or other rules?")
		print("3.  Are the graduation requirements (GRAD_REQ) achievable "
			  "given the number of available blocks (minus leave)?")
		# The following line requires a newer version of OR-Tools
		# print(solver.SufficientAssumptionsForInfeasibility())


# ============================================================================
# Main Execution Block
# ============================================================================


scheduler = RotationScheduler(
    input_dir=SAMPLE_DATA_DIR,
    input_file=INPUT_FILE,
    output_file=OUTPUT_XLSX
)


AttributeError: module 'ortools.sat.python.cp_model' has no attribute 'BoolVar'