<a href="https://colab.research.google.com/github/ashleyak7/MSC151CW5_ashley/blob/main/MSCI151CW5_koesnadi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pulp as pl
from collections import defaultdict
import pandas as pd


In [5]:


       def __init__(self):
           """Populate the fixed parameters of the barista scheduling problem.

           Sets the roster, the day/block index sets, display names, hourly
           costs, employment types, weekly minimum hours and the per-day
           availability table as instance attributes.
           """
           part_time = ["Max", "Jiwa", "Fore"]
           full_time = ["Donna", "Paul"]

           # Barista names (part-timers first, then full-timers)
           self.baristas = part_time + full_time

           # Time parameters: days run Monday=1 .. Sunday=7, four blocks per day
           self.days_indices = [1, 2, 3, 4, 5, 6, 7]
           self.blocks = [1, 2, 3, 4]
           self.H = 4  # Hours per block

           # Day and block names for display, keyed by their 1-based index
           self.day_names = dict(enumerate(
               ["Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday"],
               start=1,
           ))
           self.block_names = dict(enumerate(
               ["07:00-11:00", "11:00-15:00", "15:00-19:00", "19:00-23:00"],
               start=1,
           ))

           # Hourly costs (IDR per hour)
           self.cost = {
               **dict.fromkeys(part_time, 50000),
               **dict.fromkeys(full_time, 150000),
           }

           # Employment type: P=Part-time, F=Full-time
           self.etype = {
               **dict.fromkeys(part_time, "P"),
               **dict.fromkeys(full_time, "F"),
           }

           # Weekly minimum hours
           self.minWeekly = {
               **dict.fromkeys(part_time, 12),
               **dict.fromkeys(full_time, 36),
           }

           # Daily maximum availability (hours), one row per barista with the
           # columns ordered Monday..Sunday.
           hours_by_day = {
               "Max": (8, 8, 8, 0, 8, 4, 4),
               "Jiwa": (4, 4, 8, 8, 0, 4, 0),
               "Fore": (8, 0, 8, 8, 8, 8, 0),
               "Donna": (12, 12, 12, 12, 12, 12, 8),
               "Paul": (12, 8, 12, 12, 8, 12, 12),
           }
           self.avail = {
               (name, day): hours
               for name, week in hours_by_day.items()
               for day, hours in zip(self.days_indices, week)
           }


In [8]:


    def __init__(self, data, blocks_override=None, min_weekly_override=None):
        """
        Set up a baseline scheduler around a data object.

        Args:
            data: SchedulingData object containing all parameters
            blocks_override: Optional list of blocks to include (for sensitivity analysis)
            min_weekly_override: Optional dict to override weekly minimums
        """
        self.data = data

        # A falsy override (None or empty) falls back to the value held by `data`.
        self.blocks_to_use = blocks_override or data.blocks
        self.min_weekly = min_weekly_override or data.minWeekly

        # Model and result slots; all start as None.
        for slot in ("prob", "y", "status", "total_cost", "weekly_hours", "schedule"):
            setattr(self, slot, None)

    def build_model(self):
        """Assemble the cost-minimising MILP and store it on ``self.prob``.

        Creates one binary variable per (barista, day, block) over
        ``self.blocks_to_use``, sets the weekly staffing cost as the objective
        and adds the coverage, daily-availability, contract block-limit and
        weekly-minimum constraints, in that order.
        """
        data = self.data
        staff = data.baristas
        days = data.days_indices
        slots = self.blocks_to_use
        hours_per_block = data.H

        self.prob = pl.LpProblem("Java_Co_Baseline_Scheduling", pl.LpMinimize)

        # Decision variables: y[(b, d, k)] is 1 when barista b works block k
        # on day d, else 0.
        keys = [(b, d, k) for b in staff for d in days for k in slots]
        self.y = pl.LpVariable.dicts("y", keys, cat=pl.LpBinary)
        y = self.y

        # Objective: total weekly staffing cost = sum of H * cost[b] * y[b, d, k].
        self.prob += (
            pl.lpSum(hours_per_block * data.cost[b] * y[(b, d, k)] for b, d, k in keys),
            "Total_Weekly_Staffing_Cost",
        )

        # Constraint 1: at least one barista on every block of every day.
        for d in days:
            for k in slots:
                on_duty = pl.lpSum(y[(b, d, k)] for b in staff)
                self.prob += (on_duty >= 1, f"Coverage_Day{d}_Block{k}")

        # Constraint 2: hours worked in a day cannot exceed that day's availability.
        for b in staff:
            for d in days:
                hours_worked = pl.lpSum(hours_per_block * y[(b, d, k)] for k in slots)
                self.prob += (
                    hours_worked <= data.avail[(b, d)],
                    f"Daily_Availability_{b}_Day{d}",
                )

        # Constraint 3: daily block cap by contract type
        # (part-time "P": 2 blocks, anyone else: 3 blocks).
        for b in staff:
            cap = 2 if data.etype[b] == "P" else 3
            for d in days:
                blocks_worked = pl.lpSum(y[(b, d, k)] for k in slots)
                self.prob += (blocks_worked <= cap, f"Contract_Block_Limit_{b}_Day{d}")

        # Constraint 4: weekly hours must reach each barista's minimum.
        for b in staff:
            weekly = pl.lpSum(
                hours_per_block * y[(b, d, k)] for d in days for k in slots
            )
            self.prob += (weekly >= self.min_weekly[b], f"Weekly_Minimum_Hours_{b}")

    def solve(self, verbose=True):
        """Solve the model, building it first if that has not happened yet.

        Args:
            verbose: When true, print the formatted results after solving.

        Returns:
            True when the solver status code equals 1 (optimal), else False.
        """
        if self.prob is None:
            self.build_model()

        # CBC is PuLP's default solver; msg=0 keeps its log quiet.
        solver = pl.PULP_CBC_CMD(msg=0)
        self.prob.solve(solver)
        self.status = pl.LpStatus[self.prob.status]

        # Only an optimal solve has variable values worth extracting.
        if self.prob.status == 1:
            self._extract_solution()
        if verbose:
            self._print_results()

        return self.prob.status == 1

    def _extract_solution(self):
        """Copy the solved model's cost, schedule and weekly hours onto self.

        Sets ``total_cost`` from the objective value, ``schedule`` as
        ``{barista: {day: [blocks]}}`` and ``weekly_hours`` as
        ``{barista: hours}``.
        """
        data = self.data
        self.total_cost = pl.value(self.prob.objective)

        roster = defaultdict(lambda: defaultdict(list))
        hours_by_barista = {}
        self.schedule = roster
        self.weekly_hours = hours_by_barista

        for b in data.baristas:
            # A binary variable counts as "on" when its value exceeds 0.5.
            worked = [
                (d, k)
                for d in data.days_indices
                for k in self.blocks_to_use
                if self.y[(b, d, k)].varValue > 0.5
            ]
            hours = 0
            for d, k in worked:
                roster[b][d].append(k)
                hours += data.H
            hours_by_barista[b] = hours

    def _print_results(self):
        """Print the solve status and, when optimal, the per-barista schedule.

        When ``self.prob.status`` is not 1 only the banner, the status line
        and a "no optimal solution" notice are printed.
        """
        rule = "=" * 80

        print(f"\n{rule}")
        print("BASELINE MODEL RESULTS")
        print(rule)
        print(f"Status: {self.status}")

        if self.prob.status != 1:
            print("No optimal solution found.")
            return

        print(f"\nTOTAL WEEKLY COST: IDR {self.total_cost:,.0f}")
        print(f"\n{rule}")
        print("WEEKLY SCHEDULE BY BARISTA")
        print(f"{rule}\n")

        data = self.data
        for b in data.baristas:
            contract = "Part-time" if data.etype[b] == "P" else "Full-time"
            print(f"{b} ({contract}, IDR {data.cost[b]:,}/hr)")
            print("-" * 80)

            for d in data.days_indices:
                label = data.day_names[d]
                worked = self.schedule[b][d]
                if not worked:
                    print(f"  {label:12s}: Off")
                    continue
                ordered = sorted(worked)
                slots_text = ", ".join(data.block_names[k] for k in ordered)
                hours_today = len(ordered) * data.H
                print(f"  {label:12s}: {slots_text:50s} ({hours_today} hrs)")

            hours_week = self.weekly_hours[b]
            pay_week = hours_week * data.cost[b]
            print(f"  {'TOTAL':12s}: {hours_week} hours/week | IDR {pay_week:,}/week")
            print()

In [24]:
class FairnessScheduler:
    """Fairness-aware barista scheduler built on a PuLP MILP.

    Minimises the gap between the largest and smallest weekly hours across
    baristas, subject to the baseline staffing constraints and a budget cap of
    ``budget_multiplier * baseline_cost``.
    """

    # Numeric ``prob.status`` code treated as "optimal solution found"
    # (the value PuLP maps to "Optimal" in ``pl.LpStatus``).
    _STATUS_OPTIMAL = 1

    def __init__(self, data, baseline_cost):
        """
        Initialize fairness scheduler.

        Args:
            data: SchedulingData object
            baseline_cost: Optimal cost from baseline model (for budget constraint)
        """
        self.data = data
        self.baseline_cost = baseline_cost
        self.budget_multiplier = 1.02  # Allow 2% cost increase

        # Model and result slots, filled in by build_model() / solve().
        self.prob = None
        self.y = None
        self.H_max = None
        self.H_min = None
        self.status = None
        self.total_cost = None
        self.weekly_hours = None
        self.schedule = None
        self.disparity = None

    def build_model(self):
        """Construct the fairness-aware MILP model and store it on ``self.prob``."""

        # Initialize problem
        self.prob = pl.LpProblem("Java_Co_Fairness_Scheduling", pl.LpMinimize)

        # Decision Variables: y[b,d,k] ∈ {0,1}
        # y[b,d,k] = 1 if barista b works block k on day d, else 0
        self.y = pl.LpVariable.dicts(
            "y",
            ((b, d, k) for b in self.data.baristas
             for d in self.data.days_indices
             for k in self.data.blocks),
            cat=pl.LpBinary
        )

        # Auxiliary variables bounding every barista's weekly hours from
        # above (H_max) and below (H_min).
        self.H_max = pl.LpVariable("H_max", lowBound=0)
        self.H_min = pl.LpVariable("H_min", lowBound=0)

        # Weekly hours for each barista (auxiliary expression)
        weekly_hours_expr = {}
        for b in self.data.baristas:
            weekly_hours_expr[b] = pl.lpSum(
                self.data.H * self.y[(b, d, k)]
                for d in self.data.days_indices
                for k in self.data.blocks
            )

        # OBJECTIVE: Minimize hour disparity (H_max - H_min)
        self.prob += (self.H_max - self.H_min), "Minimize_Hour_Disparity"

        # BUDGET CONSTRAINT: Total cost ≤ budget_multiplier × baseline_cost
        total_cost_expr = pl.lpSum(
            self.data.H * self.data.cost[b] * self.y[(b, d, k)]
            for b in self.data.baristas
            for d in self.data.days_indices
            for k in self.data.blocks
        )
        self.prob += (
            total_cost_expr <= self.budget_multiplier * self.baseline_cost,
            "Budget_Constraint"
        )

        # Define H_max and H_min: H_min ≤ weekly hours of b ≤ H_max for all b
        for b in self.data.baristas:
            self.prob += (
                weekly_hours_expr[b] <= self.H_max,
                f"Max_Hours_Definition_{b}"
            )
            self.prob += (
                weekly_hours_expr[b] >= self.H_min,
                f"Min_Hours_Definition_{b}"
            )

        # SAME CONSTRAINTS AS BASELINE:

        # Coverage: at least one barista per block per day
        for d in self.data.days_indices:
            for k in self.data.blocks:
                self.prob += (
                    pl.lpSum(self.y[(b, d, k)] for b in self.data.baristas) >= 1,
                    f"Coverage_Day{d}_Block{k}"
                )

        # Daily availability (hours)
        for b in self.data.baristas:
            for d in self.data.days_indices:
                self.prob += (
                    pl.lpSum(self.data.H * self.y[(b, d, k)]
                            for k in self.data.blocks) <= self.data.avail[(b, d)],
                    f"Daily_Availability_{b}_Day{d}"
                )

        # Block limits by contract (part-time "P": 2 per day, otherwise 3)
        for b in self.data.baristas:
            for d in self.data.days_indices:
                max_blocks = 2 if self.data.etype[b] == "P" else 3
                self.prob += (
                    pl.lpSum(self.y[(b, d, k)] for k in self.data.blocks) <= max_blocks,
                    f"Contract_Block_Limit_{b}_Day{d}"
                )

        # Weekly minimums
        for b in self.data.baristas:
            self.prob += (
                weekly_hours_expr[b] >= self.data.minWeekly[b],
                f"Weekly_Minimum_Hours_{b}"
            )

    def solve(self, verbose=True):
        """Solve the fairness model, building it first if needed.

        Args:
            verbose: When true, print the formatted results after solving.

        Returns:
            True when the solver reports an optimal solution, else False.
        """
        if self.prob is None:
            self.build_model()

        self.prob.solve(pl.PULP_CBC_CMD(msg=0))
        self.status = pl.LpStatus[self.prob.status]

        optimal = self.prob.status == self._STATUS_OPTIMAL
        if optimal:
            self._extract_solution()

        if verbose:
            self._print_results()

        return optimal

    def _extract_solution(self):
        """Copy disparity, schedule, weekly hours and total cost onto self.

        The total cost is derived from the thresholded schedule so that it
        always agrees with ``weekly_hours``, even if the solver returns
        near-binary values such as 0.9999999.
        """
        data = self.data

        # Extract disparity (the objective value H_max - H_min)
        self.disparity = pl.value(self.prob.objective)

        # Organize schedule as {barista: {day: [blocks]}}
        self.schedule = defaultdict(lambda: defaultdict(list))
        self.weekly_hours = {}

        for b in data.baristas:
            total_hours = 0
            for d in data.days_indices:
                for k in data.blocks:
                    # A binary variable counts as "on" when it exceeds 0.5.
                    if self.y[(b, d, k)].varValue > 0.5:
                        self.schedule[b][d].append(k)
                        total_hours += data.H
            self.weekly_hours[b] = total_hours

        # Actual total cost of the extracted schedule (kept as a float).
        self.total_cost = float(sum(
            self.weekly_hours[b] * data.cost[b] for b in data.baristas
        ))

    def _print_results(self):
        """Print the solve status and, when optimal, cost and disparity figures."""

        print(f"\n{'='*80}")
        print(f"FAIRNESS MODEL RESULTS")
        print(f"{'='*80}")
        print(f"Status: {self.status}")

        if self.prob.status != self._STATUS_OPTIMAL:
            print("No optimal solution found.")
            return

        print(f"\nTOTAL WEEKLY COST: IDR {self.total_cost:,.0f}")

        # Show the real sign of the difference instead of a fixed "+", and
        # avoid a "-IDR 0" when the difference rounds to zero.
        delta = self.total_cost - self.baseline_cost
        amount = f"{abs(delta):,.0f}"
        sign = "-" if delta < 0 and amount != "0" else "+"
        # A zero baseline has no meaningful percentage; don't divide by it.
        if self.baseline_cost:
            pct = f"{(self.total_cost/self.baseline_cost - 1)*100:.2f}%"
        else:
            pct = "n/a"
        print(f"Cost vs Baseline: {sign}IDR {amount} ({pct})")

        print(f"\nHOUR DISPARITY: {self.disparity:.1f} hours")
        print(f"  Maximum weekly hours: {self.H_max.varValue:.0f}")
        print(f"  Minimum weekly hours: {self.H_min.varValue:.0f}")

In [22]:
import pulp as pl
from collections import defaultdict
import pandas as pd

# Define SchedulingData class (reconstructed from cell 12cS7NNIGMiN)
class SchedulingData:
    """Static input data for the barista scheduling models."""

    def __init__(self):
        """Set every scheduling parameter to its fixed value."""
        part_time = ["Max", "Jiwa", "Fore"]
        full_time = ["Donna", "Paul"]

        self.baristas = part_time + full_time
        # Days are indexed 1..7 and daily blocks 1..4.
        self.days_indices = [1, 2, 3, 4, 5, 6, 7]
        self.blocks = [1, 2, 3, 4]
        self.H = 4

        # Display names keyed by the 1-based day / block index.
        self.day_names = dict(enumerate(
            ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"],
            start=1,
        ))
        self.block_names = dict(enumerate(
            ["07:00-11:00", "11:00-15:00", "15:00-19:00", "19:00-23:00"],
            start=1,
        ))

        # Per-barista cost, employment type ("P"/"F") and weekly minimum.
        self.cost = {
            **dict.fromkeys(part_time, 50000),
            **dict.fromkeys(full_time, 150000),
        }
        self.etype = {
            **dict.fromkeys(part_time, "P"),
            **dict.fromkeys(full_time, "F"),
        }
        self.minWeekly = {
            **dict.fromkeys(part_time, 12),
            **dict.fromkeys(full_time, 36),
        }

        # Availability per (barista, day); each row lists days 1..7 in order.
        hours_by_day = {
            "Max": (8, 8, 8, 0, 8, 4, 4),
            "Jiwa": (4, 4, 8, 8, 0, 4, 0),
            "Fore": (8, 0, 8, 8, 8, 8, 0),
            "Donna": (12, 12, 12, 12, 12, 12, 8),
            "Paul": (12, 8, 12, 12, 8, 12, 12),
        }
        self.avail = {
            (name, day): hours
            for name, week in hours_by_day.items()
            for day, hours in zip(self.days_indices, week)
        }

# Define BaselineScheduler class (reconstructed from cell EEYphilSHt2-)
class BaselineScheduler:
    """Cost-minimising weekly barista scheduler built on a PuLP MILP."""

    def __init__(self, data, blocks_override=None, min_weekly_override=None):
        """Store the data object and optional scenario overrides.

        Args:
            data: Object exposing the scheduling parameters.
            blocks_override: Optional blocks to use instead of ``data.blocks``.
            min_weekly_override: Optional dict used instead of ``data.minWeekly``.
        """
        self.data = data
        # A falsy override (None or empty) falls back to the value held by `data`.
        self.blocks_to_use = blocks_override or data.blocks
        self.min_weekly = min_weekly_override or data.minWeekly
        # Model and result slots; all start as None.
        for slot in ("prob", "y", "status", "total_cost", "weekly_hours", "schedule"):
            setattr(self, slot, None)

    def build_model(self):
        """Assemble the MILP: variables, cost objective, then four constraint families."""
        data = self.data
        staff = data.baristas
        days = data.days_indices
        slots = self.blocks_to_use
        hours_per_block = data.H

        self.prob = pl.LpProblem("Java_Co_Baseline_Scheduling", pl.LpMinimize)

        # y[(b, d, k)] is the binary "barista b works block k on day d" variable.
        keys = [(b, d, k) for b in staff for d in days for k in slots]
        self.y = pl.LpVariable.dicts("y", keys, cat=pl.LpBinary)
        y = self.y

        # Objective: total weekly staffing cost.
        self.prob += (
            pl.lpSum(hours_per_block * data.cost[b] * y[(b, d, k)] for b, d, k in keys),
            "Total_Weekly_Staffing_Cost",
        )

        # At least one barista on every block of every day.
        for d in days:
            for k in slots:
                on_duty = pl.lpSum(y[(b, d, k)] for b in staff)
                self.prob += (on_duty >= 1, f"Coverage_Day{d}_Block{k}")

        # Hours worked in a day cannot exceed that day's availability.
        for b in staff:
            for d in days:
                hours_worked = pl.lpSum(hours_per_block * y[(b, d, k)] for k in slots)
                self.prob += (
                    hours_worked <= data.avail[(b, d)],
                    f"Daily_Availability_{b}_Day{d}",
                )

        # Daily block cap by contract type ("P": 2 blocks, otherwise 3).
        for b in staff:
            cap = 2 if data.etype[b] == "P" else 3
            for d in days:
                blocks_worked = pl.lpSum(y[(b, d, k)] for k in slots)
                self.prob += (blocks_worked <= cap, f"Contract_Block_Limit_{b}_Day{d}")

        # Weekly hours must reach each barista's minimum.
        for b in staff:
            weekly = pl.lpSum(
                hours_per_block * y[(b, d, k)] for d in days for k in slots
            )
            self.prob += (weekly >= self.min_weekly[b], f"Weekly_Minimum_Hours_{b}")

    def solve(self, verbose=True):
        """Solve the model (building it if needed); return True when status is 1 (optimal)."""
        if self.prob is None:
            self.build_model()

        solver = pl.PULP_CBC_CMD(msg=0)  # msg=0 keeps the solver log quiet
        self.prob.solve(solver)
        self.status = pl.LpStatus[self.prob.status]

        # Only an optimal solve has variable values worth extracting.
        if self.prob.status == 1:
            self._extract_solution()
        if verbose:
            self._print_results()

        return self.prob.status == 1

    def _extract_solution(self):
        """Copy the solved model's cost, schedule and weekly hours onto self."""
        data = self.data
        self.total_cost = pl.value(self.prob.objective)

        roster = defaultdict(lambda: defaultdict(list))
        hours_by_barista = {}
        self.schedule = roster
        self.weekly_hours = hours_by_barista

        for b in data.baristas:
            # A binary variable counts as "on" when its value exceeds 0.5.
            worked = [
                (d, k)
                for d in data.days_indices
                for k in self.blocks_to_use
                if self.y[(b, d, k)].varValue > 0.5
            ]
            hours = 0
            for d, k in worked:
                roster[b][d].append(k)
                hours += data.H
            hours_by_barista[b] = hours

    def _print_results(self):
        """Print the solve status and, when optimal, the per-barista schedule."""
        rule = "=" * 80

        print(f"\n{rule}")
        print("BASELINE MODEL RESULTS")
        print(rule)
        print(f"Status: {self.status}")

        if self.prob.status != 1:
            print("No optimal solution found.")
            return

        print(f"\nTOTAL WEEKLY COST: IDR {self.total_cost:,.0f}")
        print(f"\n{rule}")
        print("WEEKLY SCHEDULE BY BARISTA")
        print(f"{rule}\n")

        data = self.data
        for b in data.baristas:
            contract = "Part-time" if data.etype[b] == "P" else "Full-time"
            print(f"  {b} ({contract}, IDR {data.cost[b]:,}/hr)")
            print("  " + "-" * 78)

            for d in data.days_indices:
                label = data.day_names[d]
                worked = self.schedule[b][d]
                if not worked:
                    print(f"    {label:12s}: Off")
                    continue
                ordered = sorted(worked)
                slots_text = ", ".join(data.block_names[k] for k in ordered)
                hours_today = len(ordered) * data.H
                print(f"    {label:12s}: {slots_text:50s} ({hours_today} hrs)")

            hours_week = self.weekly_hours[b]
            pay_week = hours_week * data.cost[b]
            print(f"    {'TOTAL':12s}: {hours_week} hours/week | IDR {pay_week:,}/week")
            print()


def main():
    """Run Part A: build, solve and print the baseline scheduling model.

    Prints the section banners, solves a ``BaselineScheduler`` on fresh
    ``SchedulingData`` (which prints its own results), and reports an error
    when the baseline does not reach an optimal solution. Returns None.
    """

    # Initialize data
    data = SchedulingData()

    print("\n" + "="*80)
    print("JAVA & CO. BARISTA SCHEDULING OPTIMIZATION")
    print("="*80)

    print("\n\n" + "="*80)
    print("PART A: BASELINE MODEL")
    print("="*80)

    baseline = BaselineScheduler(data)
    baseline.solve()

    if baseline.status != "Optimal":
        print("ERROR: Could not find optimal baseline solution.")

# Run the scheduling logic only when this cell/file is executed directly,
# so importing the definitions does not trigger a solve.
if __name__ == "__main__":
    main()


JAVA & CO. BARISTA SCHEDULING OPTIMIZATION


PART A: BASELINE MODEL

BASELINE MODEL RESULTS
Status: Optimal

TOTAL WEEKLY COST: IDR 12,800,000

WEEKLY SCHEDULE BY BARISTA

  Max (Part-time, IDR 50,000/hr)
  ------------------------------------------------------------------------------
    Monday      : 15:00-19:00                                        (4 hrs)
    Tuesday     : 19:00-23:00                                        (4 hrs)
    Wednesday   : Off
    Thursday    : Off
    Friday      : Off
    Saturday    : 19:00-23:00                                        (4 hrs)
    Sunday      : Off
    TOTAL       : 12 hours/week | IDR 600,000/week

  Jiwa (Part-time, IDR 50,000/hr)
  ------------------------------------------------------------------------------
    Monday      : 19:00-23:00                                        (4 hrs)
    Tuesday     : 15:00-19:00                                        (4 hrs)
    Wednesday   : Off
    Thursday    : Off
    Friday      : Off
    Sat

In [26]:
import pulp as pl
from collections import defaultdict
import pandas as pd

# Define SchedulingData class (reconstructed from cell 12cS7NNIGMiN and 8c5N0OKRMB5I)
class SchedulingData:
    """Static input data for the barista scheduling models."""

    def __init__(self):
        """Set every scheduling parameter to its fixed value."""
        part_time = ["Max", "Jiwa", "Fore"]
        full_time = ["Donna", "Paul"]

        self.baristas = part_time + full_time
        # Days are indexed 1..7 and daily blocks 1..4.
        self.days_indices = [1, 2, 3, 4, 5, 6, 7]
        self.blocks = [1, 2, 3, 4]
        self.H = 4

        # Display names keyed by the 1-based day / block index.
        self.day_names = dict(enumerate(
            ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"],
            start=1,
        ))
        self.block_names = dict(enumerate(
            ["07:00-11:00", "11:00-15:00", "15:00-19:00", "19:00-23:00"],
            start=1,
        ))

        # Per-barista cost, employment type ("P"/"F") and weekly minimum.
        self.cost = {
            **dict.fromkeys(part_time, 50000),
            **dict.fromkeys(full_time, 150000),
        }
        self.etype = {
            **dict.fromkeys(part_time, "P"),
            **dict.fromkeys(full_time, "F"),
        }
        self.minWeekly = {
            **dict.fromkeys(part_time, 12),
            **dict.fromkeys(full_time, 36),
        }

        # Availability per (barista, day); each row lists days 1..7 in order.
        hours_by_day = {
            "Max": (8, 8, 8, 0, 8, 4, 4),
            "Jiwa": (4, 4, 8, 8, 0, 4, 0),
            "Fore": (8, 0, 8, 8, 8, 8, 0),
            "Donna": (12, 12, 12, 12, 12, 12, 8),
            "Paul": (12, 8, 12, 12, 8, 12, 12),
        }
        self.avail = {
            (name, day): hours
            for name, week in hours_by_day.items()
            for day, hours in zip(self.days_indices, week)
        }

# Define BaselineScheduler class (reconstructed from cell EEYphilSHt2- and 8c5N0OKRMB5I)
class BaselineScheduler:
    """Cost-minimising weekly barista scheduler built on a PuLP MILP."""

    def __init__(self, data, blocks_override=None, min_weekly_override=None):
        """Store the data object and optional scenario overrides.

        Args:
            data: Object exposing the scheduling parameters.
            blocks_override: Optional blocks to use instead of ``data.blocks``.
            min_weekly_override: Optional dict used instead of ``data.minWeekly``.
        """
        self.data = data
        # A falsy override (None or empty) falls back to the value held by `data`.
        self.blocks_to_use = blocks_override or data.blocks
        self.min_weekly = min_weekly_override or data.minWeekly
        # Model and result slots; all start as None.
        for slot in ("prob", "y", "status", "total_cost", "weekly_hours", "schedule"):
            setattr(self, slot, None)

    def build_model(self):
        """Assemble the MILP: variables, cost objective, then four constraint families."""
        data = self.data
        staff = data.baristas
        days = data.days_indices
        slots = self.blocks_to_use
        hours_per_block = data.H

        self.prob = pl.LpProblem("Java_Co_Baseline_Scheduling", pl.LpMinimize)

        # y[(b, d, k)] is the binary "barista b works block k on day d" variable.
        keys = [(b, d, k) for b in staff for d in days for k in slots]
        self.y = pl.LpVariable.dicts("y", keys, cat=pl.LpBinary)
        y = self.y

        # Objective: total weekly staffing cost.
        self.prob += (
            pl.lpSum(hours_per_block * data.cost[b] * y[(b, d, k)] for b, d, k in keys),
            "Total_Weekly_Staffing_Cost",
        )

        # At least one barista on every block of every day.
        for d in days:
            for k in slots:
                on_duty = pl.lpSum(y[(b, d, k)] for b in staff)
                self.prob += (on_duty >= 1, f"Coverage_Day{d}_Block{k}")

        # Hours worked in a day cannot exceed that day's availability.
        for b in staff:
            for d in days:
                hours_worked = pl.lpSum(hours_per_block * y[(b, d, k)] for k in slots)
                self.prob += (
                    hours_worked <= data.avail[(b, d)],
                    f"Daily_Availability_{b}_Day{d}",
                )

        # Daily block cap by contract type ("P": 2 blocks, otherwise 3).
        for b in staff:
            cap = 2 if data.etype[b] == "P" else 3
            for d in days:
                blocks_worked = pl.lpSum(y[(b, d, k)] for k in slots)
                self.prob += (blocks_worked <= cap, f"Contract_Block_Limit_{b}_Day{d}")

        # Weekly hours must reach each barista's minimum.
        for b in staff:
            weekly = pl.lpSum(
                hours_per_block * y[(b, d, k)] for d in days for k in slots
            )
            self.prob += (weekly >= self.min_weekly[b], f"Weekly_Minimum_Hours_{b}")

    def solve(self, verbose=True):
        """Solve the model (building it if needed); return True when status is 1 (optimal)."""
        if self.prob is None:
            self.build_model()

        solver = pl.PULP_CBC_CMD(msg=0)  # msg=0 keeps the solver log quiet
        self.prob.solve(solver)
        self.status = pl.LpStatus[self.prob.status]

        # Only an optimal solve has variable values worth extracting.
        if self.prob.status == 1:
            self._extract_solution()
        if verbose:
            self._print_results()

        return self.prob.status == 1

    def _extract_solution(self):
        """Copy the solved model's cost, schedule and weekly hours onto self."""
        data = self.data
        self.total_cost = pl.value(self.prob.objective)

        roster = defaultdict(lambda: defaultdict(list))
        hours_by_barista = {}
        self.schedule = roster
        self.weekly_hours = hours_by_barista

        for b in data.baristas:
            # A binary variable counts as "on" when its value exceeds 0.5.
            worked = [
                (d, k)
                for d in data.days_indices
                for k in self.blocks_to_use
                if self.y[(b, d, k)].varValue > 0.5
            ]
            hours = 0
            for d, k in worked:
                roster[b][d].append(k)
                hours += data.H
            hours_by_barista[b] = hours

    def _print_results(self):
        """Print the solve status and, when optimal, the per-barista schedule."""
        rule = "=" * 80

        print(f"\n{rule}")
        print("BASELINE MODEL RESULTS")
        print(rule)
        print(f"Status: {self.status}")

        if self.prob.status != 1:
            print("No optimal solution found.")
            return

        print(f"\nTOTAL WEEKLY COST: IDR {self.total_cost:,.0f}")
        print(f"\n{rule}")
        print("WEEKLY SCHEDULE BY BARISTA")
        print(f"{rule}\n")

        data = self.data
        for b in data.baristas:
            contract = "Part-time" if data.etype[b] == "P" else "Full-time"
            print(f"  {b} ({contract}, IDR {data.cost[b]:,}/hr)")
            print("  " + "-" * 78)

            for d in data.days_indices:
                label = data.day_names[d]
                worked = self.schedule[b][d]
                if not worked:
                    print(f"    {label:12s}: Off")
                    continue
                ordered = sorted(worked)
                slots_text = ", ".join(data.block_names[k] for k in ordered)
                hours_today = len(ordered) * data.H
                print(f"    {label:12s}: {slots_text:50s} ({hours_today} hrs)")

            hours_week = self.weekly_hours[b]
            pay_week = hours_week * data.cost[b]
            print(f"    {'TOTAL':12s}: {hours_week} hours/week | IDR {pay_week:,}/week")
            print()

# Re-initialize data and run the baseline model silently (its output was
# already printed) so its cost and hours are available to the scenarios below.
data = SchedulingData()
baseline = BaselineScheduler(data)
if not baseline.solve(verbose=False):
    # Fail with a clear message instead of an AttributeError on None below.
    raise RuntimeError(
        f"Baseline model did not reach an optimal solution (status: {baseline.status})"
    )
baseline_cost = baseline.total_cost
baseline_hours = baseline.weekly_hours.copy()


# SENSITIVITY ANALYSIS 1: Close at 19:00 (Remove Block 4)

print("\n\n" + "="*80)
print("SENSITIVITY ANALYSIS 1: Close at 19:00 (3 blocks/day)")
print("="*80)

scenario1_blocks = [1, 2, 3]
scenario1 = BaselineScheduler(data, blocks_override=scenario1_blocks)
scenario1.solve()

if scenario1.status == "Optimal":
    cost_change = scenario1.total_cost - baseline_cost
    pct_change = (cost_change / baseline_cost) * 100
    # Weekly block counts are derived from the data rather than hard-coded.
    blocks_before = len(data.days_indices) * len(data.blocks)
    blocks_after = len(data.days_indices) * len(scenario1_blocks)
    blocks_cut_pct = (blocks_before - blocks_after) / blocks_before * 100
    print(f"\n{'='*80}")
    print(f"IMPACT ANALYSIS:")
    print(f"  Cost change: IDR {cost_change:,.0f} ({pct_change:+.2f}%)")
    print(f"  Total blocks reduced: {blocks_before} \u2192 {blocks_after} " +
          f"({blocks_cut_pct:.0f}% reduction)")
    print(f"{'='*80}")

# SENSITIVITY ANALYSIS 2: Increase Part-time Minimum to 16 hours

print("\n\n" + "="*80)
print("SENSITIVITY ANALYSIS 2: Part-time minimum 12\u219216 hours/week")
print("="*80)

# Part-timers come from the employment-type table, so the override and the
# report below always cover the same people.
part_timers = [b for b in data.baristas if data.etype[b] == "P"]

min_weekly_16 = data.minWeekly.copy()
for b in part_timers:
    min_weekly_16[b] = 16

scenario2 = BaselineScheduler(data, min_weekly_override=min_weekly_16)
scenario2.solve()

if scenario2.status == "Optimal":
    cost_change = scenario2.total_cost - baseline_cost
    pct_change = (cost_change / baseline_cost) * 100
    print(f"\n{'='*80}")
    print(f"IMPACT ANALYSIS:")
    print(f"  Cost change: IDR {cost_change:,.0f} ({pct_change:+.2f}%)")
    print(f"\nHours change for part-timers:")
    for b in part_timers:
        change = scenario2.weekly_hours[b] - baseline_hours[b]
        print(f"  {b}: {baseline_hours[b]} \u2192 {scenario2.weekly_hours[b]} hrs " +
              f"({change:+.0f})")
    print(f"{'='*80}")



SENSITIVITY ANALYSIS 1: Close at 19:00 (3 blocks/day)

BASELINE MODEL RESULTS
Status: Optimal

TOTAL WEEKLY COST: IDR 12,600,000

WEEKLY SCHEDULE BY BARISTA

  Max (Part-time, IDR 50,000/hr)
  ------------------------------------------------------------------------------
    Monday      : Off
    Tuesday     : Off
    Wednesday   : 11:00-15:00, 15:00-19:00                           (8 hrs)
    Thursday    : Off
    Friday      : Off
    Saturday    : 07:00-11:00                                        (4 hrs)
    Sunday      : Off
    TOTAL       : 12 hours/week | IDR 600,000/week

  Jiwa (Part-time, IDR 50,000/hr)
  ------------------------------------------------------------------------------
    Monday      : Off
    Tuesday     : 11:00-15:00                                        (4 hrs)
    Wednesday   : Off
    Thursday    : 11:00-15:00                                        (4 hrs)
    Friday      : Off
    Saturday    : 07:00-11:00                                        (4 hrs

In [None]:
#PART B: FAIRNESS MODEL

import pulp as pl
from collections import defaultdict

# Define FairnessScheduler class (reconstructed from cell 9Ge0bw_-JZUx, ensuring it's available in this cell)
class FairnessScheduler:
    """Fairness-aware weekly scheduling model.

    Builds and solves a MILP that minimizes the gap between the largest and
    the smallest weekly hours over all baristas, subject to coverage, daily
    availability, per-contract block limits and weekly minimum hours, while
    keeping total cost within ``budget_multiplier * baseline_cost``.

    The ``data`` object must expose ``baristas``, ``days_indices``, ``blocks``,
    ``H``, ``cost``, ``avail``, ``etype`` and ``minWeekly``.
    """

    def __init__(self, data, baseline_cost, budget_multiplier=1.02):
        """
        Initialize fairness scheduler.

        Args:
            data: SchedulingData object
            baseline_cost: Optimal cost from baseline model (for budget constraint)
            budget_multiplier: Factor applied to ``baseline_cost`` to obtain the
                cost ceiling. The default of 1.02 allows a 2% cost increase.
        """
        self.data = data
        self.baseline_cost = baseline_cost
        self.budget_multiplier = budget_multiplier

        # Model objects; populated by build_model().
        self.prob = None
        self.y = None
        self.H_max = None
        self.H_min = None

        # Results; populated by solve() / _extract_solution().
        self.status = None
        self.total_cost = None
        self.weekly_hours = None
        self.schedule = None
        self.disparity = None

    def build_model(self):
        """Construct the fairness-aware MILP model and store it in ``self.prob``."""
        data = self.data

        # Initialize problem
        self.prob = pl.LpProblem("Java_Co_Fairness_Scheduling", pl.LpMinimize)

        # Decision variables: y[b,d,k] in {0,1}, one per barista/day/block.
        self.y = pl.LpVariable.dicts(
            "y",
            ((b, d, k) for b in data.baristas
             for d in data.days_indices
             for k in data.blocks),
            cat=pl.LpBinary
        )

        # Auxiliary variables bounding every barista's weekly hours from
        # above (H_max) and from below (H_min).
        self.H_max = pl.LpVariable("H_max", lowBound=0)
        self.H_min = pl.LpVariable("H_min", lowBound=0)

        # Weekly hours for each barista (linear expression, not a variable).
        weekly_hours_expr = {
            b: pl.lpSum(
                data.H * self.y[(b, d, k)]
                for d in data.days_indices
                for k in data.blocks
            )
            for b in data.baristas
        }

        # OBJECTIVE: minimize hour disparity (H_max - H_min).
        self.prob += (self.H_max - self.H_min), "Minimize_Hour_Disparity"

        # BUDGET CONSTRAINT: total cost <= budget_multiplier * baseline_cost.
        total_cost_expr = pl.lpSum(
            data.H * data.cost[b] * self.y[(b, d, k)]
            for b in data.baristas
            for d in data.days_indices
            for k in data.blocks
        )
        self.prob += (
            total_cost_expr <= self.budget_multiplier * self.baseline_cost,
            "Budget_Constraint"
        )

        # Tie H_max / H_min to the per-barista weekly hours.
        for b in data.baristas:
            self.prob += (
                weekly_hours_expr[b] <= self.H_max,
                f"Max_Hours_Definition_{b}"
            )
            self.prob += (
                weekly_hours_expr[b] >= self.H_min,
                f"Min_Hours_Definition_{b}"
            )

        # SAME CONSTRAINTS AS BASELINE:

        # Coverage: at least one barista in every block of every day.
        for d in data.days_indices:
            for k in data.blocks:
                self.prob += (
                    pl.lpSum(self.y[(b, d, k)] for b in data.baristas) >= 1,
                    f"Coverage_Day{d}_Block{k}"
                )

        # Daily availability: hours worked on a day cannot exceed avail[(b, d)].
        for b in data.baristas:
            for d in data.days_indices:
                self.prob += (
                    pl.lpSum(data.H * self.y[(b, d, k)]
                             for k in data.blocks) <= data.avail[(b, d)],
                    f"Daily_Availability_{b}_Day{d}"
                )

        # Block limits by contract: 2 blocks/day for type "P", 3 otherwise.
        for b in data.baristas:
            max_blocks = 2 if data.etype[b] == "P" else 3
            for d in data.days_indices:
                self.prob += (
                    pl.lpSum(self.y[(b, d, k)] for k in data.blocks) <= max_blocks,
                    f"Contract_Block_Limit_{b}_Day{d}"
                )

        # Weekly minimum hours per barista.
        for b in data.baristas:
            self.prob += (
                weekly_hours_expr[b] >= data.minWeekly[b],
                f"Weekly_Minimum_Hours_{b}"
            )

    def solve(self, verbose=True):
        """Solve the fairness model, building it first if necessary.

        Args:
            verbose: When true, print the results summary after solving.

        Returns:
            True if the solver reported an optimal status, False otherwise.
        """
        if self.prob is None:
            self.build_model()

        self.prob.solve(pl.PULP_CBC_CMD(msg=0))
        self.status = pl.LpStatus[self.prob.status]

        is_optimal = self.prob.status == pl.LpStatusOptimal
        if is_optimal:
            self._extract_solution()

        if verbose:
            self._print_results()

        return is_optimal

    def _extract_solution(self):
        """Read the solved variables into ``schedule``, ``weekly_hours``,
        ``total_cost`` and ``disparity``."""
        data = self.data

        # Extract disparity (objective value H_max - H_min).
        self.disparity = pl.value(self.prob.objective)

        # Organize schedule: schedule[barista][day] -> list of assigned blocks.
        self.schedule = defaultdict(lambda: defaultdict(list))
        self.weekly_hours = {}

        for b in data.baristas:
            total_hours = 0
            for d in data.days_indices:
                for k in data.blocks:
                    # Binary values come back as floats; threshold at 0.5 to
                    # absorb solver tolerance.
                    if self.y[(b, d, k)].varValue > 0.5:
                        self.schedule[b][d].append(k)
                        total_hours += data.H
            self.weekly_hours[b] = total_hours

        # Derive cost from the rounded weekly hours so that cost and hours
        # always agree, even if the solver returns values such as 0.9999999.
        self.total_cost = sum(
            data.cost[b] * self.weekly_hours[b] for b in data.baristas
        )

    def _print_results(self):
        """Print the solver status and, when optimal, cost and disparity figures."""

        print(f"\n{'='*80}")
        print("FAIRNESS MODEL RESULTS")
        print(f"{'='*80}")
        print(f"Status: {self.status}")

        if self.prob.status != pl.LpStatusOptimal:
            print("No optimal solution found.")
            return

        print(f"\nTOTAL WEEKLY COST: IDR {self.total_cost:,.0f}")
        print(f"Cost vs Baseline: +IDR {self.total_cost - self.baseline_cost:,.0f} " +
              f"({(self.total_cost/self.baseline_cost - 1)*100:.2f}%)")
        print(f"\nHOUR DISPARITY: {self.disparity:.1f} hours")
        print(f"  Maximum weekly hours: {self.H_max.varValue:.0f}")
        print(f"  Minimum weekly hours: {self.H_min.varValue:.0f}")



print("\n\n" + "="*80)
print("PART B: FAIRNESS-AWARE MODEL")
print("="*80)

fairness = FairnessScheduler(data, baseline_cost)
fairness.solve()

# The comparison tables are only printed when the model reports an optimal status.
if fairness.status == "Optimal":
    print(f"\n{'='*80}")
    print("FAIRNESS vs BASELINE COMPARISON")
    print(f"{'='*80}")
    print(f"\n{'Barista':<10} {'Baseline':<12} {'Fairness':<12} {'Change':<12} {'Type'}")
    print("-" * 80)

    for b in data.baristas:
        change = fairness.weekly_hours[b] - baseline_hours[b]
        emp_type = "Part-time" if data.etype[b] == "P" else "Full-time"
        # Left-align the signed change so it lines up under its header like
        # the other columns.
        print(f"{b:<10} {baseline_hours[b]:<12.0f} {fairness.weekly_hours[b]:<12.0f} " +
              f"{change:<+12.0f} {emp_type}")

    print("\n" + "="*80)
    print("TRADEOFF ANALYSIS:")
    baseline_disparity = max(baseline_hours.values()) - min(baseline_hours.values())
    # A relative improvement is undefined when the baseline is already
    # perfectly balanced; avoid dividing by zero in that case.
    if baseline_disparity > 0:
        improvement = f"{(1-fairness.disparity/baseline_disparity)*100:.1f}% improvement"
    else:
        improvement = "n/a: baseline disparity is zero"
    print(f"  Disparity reduction: {baseline_disparity:.0f} \u2192 {fairness.disparity:.0f} hours " +
          f"({improvement})")
    print(f"  Cost increase: IDR {fairness.total_cost - baseline_cost:,.0f} " +
          f"({(fairness.total_cost/baseline_cost-1)*100:.2f}%)")
    print("="*80)

print("\n\n" + "="*80)
print("ANALYSIS COMPLETE")
print("="*80)
print("\nAll results have been generated successfully.")
print("Review the output above for your management report.")
print("="*80 + "\n")



PART B: FAIRNESS-AWARE MODEL
