<a href="https://colab.research.google.com/github/JKniaaa/Roster-Scheduler/blob/main/SchedulerModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install pandas
!pip install ortools



In [7]:
import pandas as pd
from datetime import datetime

def load_nurse_profiles(path='nurse_profiles.xlsx'):
    df = pd.read_excel(path)
    df['Name'] = df['Name'].str.strip()
    return df

# def load_shift_preferences(path='nurse_preferences.xlsx'):
#     df = pd.read_excel(path, index_col=0)
#     df.columns = [pd.to_datetime(col).date() for col in df.columns]
#     return df

def load_shift_preferences(path='nurse_preferences.xlsx'):
    df = pd.read_excel(path)
    df.rename(columns={df.columns[0]: 'Name'}, inplace=True)
    df.set_index('Name', inplace=True)

    # Strip text like "Wed" or "Thurs" and parse dates
    cleaned_columns = [
        pd.to_datetime(col.strip().split()[-1], format="%Y-%m-%d").date()
        for col in df.columns
    ]
    df.columns = cleaned_columns

    df.index = df.index.str.strip()  # Ensure names are clean
    return df



In [8]:
def validate_nurse_data(profiles_df, preferences_df):
    profile_names = set(profiles_df['Name'].str.strip())
    preference_names = set(preferences_df.index.str.strip())

    missing_in_prefs = profile_names - preference_names
    extra_in_prefs = preference_names - profile_names

    if missing_in_prefs or extra_in_prefs:
        raise ValueError(
            f"Mismatch between nurse profiles and preferences:\n"
            f"❌ Missing in preferences: {missing_in_prefs}\n"
            f"❌ Extra in preferences: {extra_in_prefs}"
        )
    print("✅ Nurse profile and preference names match.")


In [9]:
from ortools.sat.python import cp_model
from datetime import timedelta
import pandas as pd

def build_schedule_model(profiles_df, preferences_df, start_date, num_days):
    """
    Builds a nurse schedule satisfying hard constraints and optimizing soft preferences.
    Returns a schedule DataFrame and a summary DataFrame.
    """

    # === Constants ===
    # Shift info
    SHIFT_LABELS = ['AM', 'PM', 'Night']
    SHIFT_HOURS = [7, 7, 10]  # AM, PM = 7 hrs, Night = 10 hrs
    DAYS_PER_WEEK = 7

    # Hard constraint parameters
    MIN_NURSES_PER_SHIFT = 4
    MIN_SENIORS_PER_SHIFT = 1
    MAX_WEEKLY_HOURS = 42
    MAX_MC_DAYS_PER_WEEK = 2

    # Soft constraint preferences
    PREFERRED_WEEKLY_HOURS = 40
    MIN_ACCEPTABLE_WEEKLY_HOURS = 30
    PREF_HOURS_PENALTY = 10
    MIN_HOURS_PENALTY = 100

    AM_COVERAGE_MIN_PERCENT = 60  # Adjust this as needed
    AM_COVERAGE_PENALTIES = [5, 50, 1000]  # Penalties for <60%, <50%, <40%

    FAIRNESS_GAP_PENALTY = 2

    # === Model setup ===
    print("📋 Building model...")
    model = cp_model.CpModel()
    nurses = profiles_df.to_dict(orient='records')
    nurse_names = [n['Name'] for n in nurses]
    senior_names = {n['Name'] for n in nurses if n['Title'] == 'Senior'}    # Assume senior nurses have ≥3 years experience
    shift_str_to_idx = {label.upper(): i for i, label in enumerate(SHIFT_LABELS)}

    # === Preferences and MC days ===
    shift_preferences = {}
    mc_days = {}

    for nurse, row in preferences_df.iterrows():
        shift_preferences[nurse] = {}
        mc_days[nurse] = set()
        for date, val in row.items():
            day_idx = (date - start_date).days
            if not pd.notna(val) or not (0 <= day_idx < num_days):
                continue
            val = val.strip().upper()
            if val == 'MC':
                mc_days[nurse].add(day_idx)
            elif val in shift_str_to_idx:
                shift_preferences[nurse][day_idx] = shift_str_to_idx[val]

    weekend_days = [
        (i, i + 1) for i in range(num_days - 1)
        if (start_date + timedelta(days=i)).weekday() == 5
    ]

    # === Variables ===
    work = {
        (n, d, s): model.NewBoolVar(f'work_{n}_{d}_{s}')
        for n in nurse_names for d in range(num_days) for s in range(3)
    }
    satisfied = {}
    total_satisfied = {}
    penalty_terms = []

    min_sat = model.NewIntVar(0, num_days, "min_satisfaction")
    max_sat = model.NewIntVar(0, num_days, "max_satisfaction")

    # === Hard Constraints ===

    # 1. Each nurse can work at most one shift per day
    for n in nurse_names:
        for d in range(num_days):
            model.AddAtMostOne(work[n, d, s] for s in range(3))

    # 2. Each nurse works <= 42 hours/week (hard), ideally 40 (soft), at least 30 (soft)
    for n in nurse_names:
        for w in range(2):
            days = range(w * DAYS_PER_WEEK, min((w + 1) * DAYS_PER_WEEK, num_days))
            weekly_hours = sum(work[n, d, s] * SHIFT_HOURS[s] for d in days for s in range(3))
            model.Add(weekly_hours <= MAX_WEEKLY_HOURS)

            # Soft preferences on hours
            min40 = model.NewBoolVar(f'min40_{n}_w{w}')
            model.Add(weekly_hours >= PREFERRED_WEEKLY_HOURS).OnlyEnforceIf(min40)
            model.Add(weekly_hours < PREFERRED_WEEKLY_HOURS).OnlyEnforceIf(min40.Not())

            min30 = model.NewBoolVar(f'min30_{n}_w{w}')
            model.Add(weekly_hours >= MIN_ACCEPTABLE_WEEKLY_HOURS).OnlyEnforceIf(min30)
            model.Add(weekly_hours < MIN_ACCEPTABLE_WEEKLY_HOURS).OnlyEnforceIf(min30.Not())

            penalty_terms.extend([min40.Not() * PREF_HOURS_PENALTY, min30.Not() * MIN_HOURS_PENALTY])

    # 3. Each shift must have at least 4 nurses and at least 1 senior
    for d in range(num_days):
        for s in range(3):
            model.Add(sum(work[n, d, s] for n in nurse_names) >= MIN_NURSES_PER_SHIFT)
            model.Add(sum(work[n, d, s] for n in senior_names) >= MIN_SENIORS_PER_SHIFT)

    # 4. Weekend work requires rest on the same day next weekend
    for n in nurse_names:
        for d1, d2 in weekend_days:
            for day in (d1, d2):
                if day + 7 < num_days:
                    model.Add(sum(work[n, day, s] for s in range(3)) <=
                              1 - sum(work[n, day + 7, s] for s in range(3)))

    # 5. MC days: cannot assign any shift
    for n in nurse_names:
        for d in mc_days.get(n, []):
            for s in range(3):
                model.Add(work[n, d, s] == 0)

    # 6. Max 2 MC days/week and no more than 2 consecutive MC days
    for n in nurse_names:
        mc_set = mc_days.get(n, set())

        for w in range(2):
            days = range(w * DAYS_PER_WEEK, min((w + 1) * DAYS_PER_WEEK, num_days))
            mc_in_week = sum(1 for d in days if d in mc_set)
            if mc_in_week > MAX_MC_DAYS_PER_WEEK:
                raise ValueError(f"❌ Nurse {n} has more than {MAX_MC_DAYS_PER_WEEK} MCs in week {w+1}.")

        sorted_mc = sorted(mc_set)
        for i in range(len(sorted_mc) - 2):
            if sorted_mc[i + 2] - sorted_mc[i] == 2:
                raise ValueError(f"❌ Nurse {n} has more than 2 consecutive MC days: {sorted_mc[i]}, {sorted_mc[i+1]}, {sorted_mc[i+2]}.")

    # === Soft Constraints ===

    # 1. AM coverage per day should be >=60%, ideally
    for d in range(num_days):
        total_shifts = sum(work[n, d, s] for n in nurse_names for s in range(3))
        am_shifts = sum(work[n, d, 0] for n in nurse_names)

        level1 = AM_COVERAGE_MIN_PERCENT
        level2 = max(AM_COVERAGE_MIN_PERCENT - 10, 0)
        level3 = max(AM_COVERAGE_MIN_PERCENT - 20, 0)

        level1_ok = model.NewBoolVar(f'day_{d}_am_level1')  # ≥60%
        level2_ok = model.NewBoolVar(f'day_{d}_am_level2')  # ≥50%
        level3_ok = model.NewBoolVar(f'day_{d}_am_level3')  # ≥40%

        model.Add(am_shifts * 100 >= level1 * total_shifts).OnlyEnforceIf(level1_ok)
        model.Add(am_shifts * 100 < level1 * total_shifts).OnlyEnforceIf(level1_ok.Not())

        model.Add(am_shifts * 100 >= level2 * total_shifts).OnlyEnforceIf(level2_ok)
        model.Add(am_shifts * 100 < level2 * total_shifts).OnlyEnforceIf(level2_ok.Not())

        model.Add(am_shifts * 100 >= level3 * total_shifts).OnlyEnforceIf(level3_ok)
        model.Add(am_shifts * 100 < level3 * total_shifts).OnlyEnforceIf(level3_ok.Not())

        penalty_terms.append(level1_ok.Not() * AM_COVERAGE_PENALTIES[0])
        penalty_terms.append(level2_ok.Not() * AM_COVERAGE_PENALTIES[1])
        penalty_terms.append(level3_ok.Not() * AM_COVERAGE_PENALTIES[2])

    # 2. Preference satisfaction
    for n in nurse_names:
        prefs = shift_preferences.get(n, {})
        satisfied_list = []

        for d in range(num_days):
            if d in prefs:
                s = prefs[d]
                sat = model.NewBoolVar(f'sat_{n}_{d}')
                model.Add(work[n, d, s] == 1).OnlyEnforceIf(sat)
                model.Add(work[n, d, s] == 0).OnlyEnforceIf(sat.Not())
                satisfied[(n, d)] = sat
                satisfied_list.append(sat)
            else:
                satisfied_const = model.NewConstant(0)
                satisfied[(n, d)] = satisfied_const
                satisfied_list.append(satisfied_const)

        total_satisfied[n] = model.NewIntVar(0, num_days, f'total_sat_{n}')
        model.Add(total_satisfied[n] == sum(satisfied_list))

    # 3. Fairness constraint on preference satisfaction gap
    model.AddMinEquality(min_sat, list(total_satisfied.values()))
    model.AddMaxEquality(max_sat, list(total_satisfied.values()))
    gap = model.NewIntVar(0, num_days, 'gap')
    model.Add(gap == max_sat - min_sat)
    penalty_terms.append(gap * FAIRNESS_GAP_PENALTY)

    # === Objective ===
    total_satisfaction = sum(total_satisfied.values())
    model.Maximize(total_satisfaction * 10 - sum(penalty_terms))

    # === Solve ===
    print("🚀 Solving the model...")
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 90.0
    status = solver.Solve(model)
    print(f"⏱ Solve time: {solver.WallTime():.2f} seconds")

    if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        raise RuntimeError("❌ No feasible solution found.")

    print("✅ Schedule found!")
    print(f"🔍 Total preferences met: {solver.ObjectiveValue()}")
    print(f"📊 Preference gap (max - min): {solver.Value(gap)}")

    # === Extract Results ===
    dates = [start_date + timedelta(days=i) for i in range(num_days)]
    headers = [d.strftime('%a %Y-%m-%d') for d in dates]
    schedule = {}
    summary = []
    violations = {"Low_AM_Days": [], "Low_Hours_Nurses": [], "Preference_Unmet": [], "Fairness_Gap": solver.Value(gap)}

    for n in nurse_names:
        row = []
        hours_w1 = hours_w2 = 0
        counts = [0, 0, 0]
        prefs_met = 0
        prefs_unmet = []

        for d in range(num_days):
            assigned = None
            if d in mc_days.get(n, set()):
                shift = "MC"
            else:
                assigned = next((s for s in range(3) if solver.Value(work[n, d, s])), None)
                shift = SHIFT_LABELS[assigned] if assigned is not None else "Rest"
            row.append(shift)

            if assigned is not None:
                hours = SHIFT_HOURS[assigned]
                if d < 7:
                    hours_w1 += hours
                else:
                    hours_w2 += hours
                counts[assigned] += 1

            pref = shift_preferences.get(n, {}).get(d)
            if pref is not None:
                if assigned == pref:
                    prefs_met += 1
                else:
                    prefs_unmet.append(f"{dates[d].strftime('%a %Y-%m-%d')} (wanted {SHIFT_LABELS[pref]})")

        if hours_w1 < 40:
            violations["Low_Hours_Nurses"].append(f"{n} Week 1: {hours_w1}h")
        if hours_w2 < 40:
            violations["Low_Hours_Nurses"].append(f"{n} Week 2: {hours_w2}h")
        if prefs_unmet:
            violations["Preference_Unmet"].append(f"{n}: {'; '.join(prefs_unmet)}")

        schedule[n] = row
        summary.append({
            'Nurse': n,
            'Hours_Week1': hours_w1,
            'Hours_Week2': hours_w2,
            'AM': counts[0],
            'PM': counts[1],
            'Night': counts[2],
            'Rest': row.count("Rest"),
            'MC_Days': len(mc_days.get(n, [])),
            'Prefs_Met': prefs_met,
            'Prefs_Unmet': len(prefs_unmet),
            'Unmet_Details': "; ".join(prefs_unmet)
        })

    for d in range(num_days):
        am = sum(solver.Value(work[n, d, 0]) for n in nurse_names)
        total = sum(solver.Value(work[n, d, s]) for n in nurse_names for s in range(3))
        if total and am / total < 0.6:
            violations["Low_AM_Days"].append(f"{dates[d].strftime('%a %Y-%m-%d')} ({am/total:.0%})")

    print("\n⚠️ Soft Constraint Violations Summary:")
    for key, items in violations.items():
        print(f"🔸 {key}: {len(items) if isinstance(items, list) else items} cases")
        if isinstance(items, list):
            for item in items:
                print(f"   - {item}")

    print("📁 Schedule and summary generated.")
    return pd.DataFrame.from_dict(schedule, orient='index', columns=headers), pd.DataFrame(summary)


In [10]:
profiles_df   = load_nurse_profiles("nurse_profiles.xlsx")
preferences_df = load_shift_preferences("nurse_preferences.xlsx")
validate_nurse_data(profiles_df, preferences_df)

start_date = preferences_df.columns[0]
num_days = len(preferences_df.columns)

df_schedule, df_summary = build_schedule_model(
    profiles_df, preferences_df, start_date, num_days
)

df_schedule.to_excel("nurse_schedule.xlsx", sheet_name="Schedule", index=True)
df_summary.to_excel("nurse_summary.xlsx",  sheet_name="Summary",  index=False)
df_schedule.to_csv("nurse_schedule.csv")
df_summary.to_csv("nurse_summary.csv")
print("✅ Files generated.")


✅ Nurse profile and preference names match.
📋 Building model...
🚀 Solving the model...
⏱ Solve time: 14.69 seconds
✅ Schedule found!
🔍 Total preferences met: 252.0
📊 Preference gap (max - min): 4

⚠️ Soft Constraint Violations Summary:
🔸 Low_AM_Days: 0 cases
🔸 Low_Hours_Nurses: 0 cases
🔸 Preference_Unmet: 0 cases
🔸 Fairness_Gap: 4 cases
📁 Schedule and summary generated.
✅ Files generated.
