In [1]:
import utils
from ortools.sat.python import cp_model
from params import *

In [2]:
senior, junior = "Senior", "Junior"
am, am_s, pm, n, no, do, al = "Morning", "Morning stayback", "Evening", "Night", "Night off", "Day off", "Annual leave"
shifts = [am, am_s, pm, n]
leaves = [no, do, al]
date_range = utils.date_range(start=(2022,12,1), end=(2023,1,1))

In [6]:
utils.add_days(date_range[0], 0)

datetime.datetime(2022, 12, 1, 0, 0)

In [14]:
class Model:
    def __init__(self, _nurses, _roles, _shifts, _leaves, _date_range):
        self.model = cp_model.CpModel()
        self._nurses = _nurses
        self._roles = _roles
        self._shifts = _shifts
        self._leaves = _leaves
        self._date_range = _date_range
        self._works = {}
        self._slots = _shifts + _leaves
        self.obj_bool_vars_max = []
        self.obj_bool_coeffs_max = []
        self.obj_bool_vars_min = []
        self.obj_bool_coeffs_min = []
        

        for n in _nurses:
            for s in self._slots:
                for d in _date_range:
                    self._works[n, s, d] = self.model.NewBoolVar(f'work_{n}_{s}_{d}')
                    
        for n in _nurses:
            for d in _date_range:
                self.model.AddExactlyOne(self._works[n, s, d] for s in self._slots)

################################################################################################
    
    def handle_previous_roster_day(self, fixed_assignments):
        for n, s, d in fixed_assignments:
            self.model.Add(self._works[n, s, d] == 1)
            
################################################################################################
    
    def off_day_per_week(self, day=1):
        for w in self.workers_list:
            for a_week in utils.chunk(self._date_range, 7):
                self.model.Add(sum(self._works[(n, d, do)] for d in a_week) == day)

################################################################################################
    
    def number_workers_per_shift(self, shift_min_covers):
        am, am_s = "Morning", "Morning stayback"
        assert type(shift_min_covers) == dict, "min shift covers must be dictionary { am: 3 }"
        for d in self._date_range:
            for s, num in shift_min_covers.items():
                if type(num) == tuple:
                    min_, max_ = num
                    self.model.Add(sum(self._works[(n, d, s)] for n in self._nurses) >= min_)
                    self.model.Add(sum(self._works[(n, d, s)] for n in self._nurses) <= max_)
                if s == am:
                    self.model.Add(sum(self._works[(n, d, morning)] for n in self._nurses for morning in [am, am_s]) >= min_)
                else:
                    self.model.Add(sum(self._works[(n, d, s)] for n in self._nurses) >= num)

    def maximize_workers_per_shift(self):
        for d in self._date_range:
            for s in self._shifts:
                self.model.Maximize(
                    sum(self._works[(w, d, s)] for n in self._nurses)
                )
                    
################################################################################################

    def previous_roster(self, day_1_data: list):
        for n, d, s in day_1_data:
            self.model.Add(self._works[(n, d, s)] == 1)
            
################################################################################################
            
    def handle_shift_transitions(self, transitions, strategy, cost):
        for day, shift in transitions:
            for n in self._nurses:
                trans = []
                for d in self._date_range:
                    trans.apprend(n, utils.add_days(d, day), shift)
                    
    def implement_transition_for_each_nurse(self, nurse, transition, strategy):
        for d in date_range:
            transition = []
            for day, shift in transitions.items():
                transition.append((nurse, utils.add_days(d, day), shift))
            self.implement_transition_according_to_strat(transition, strategy)
    
    def implement_transition_according_to_strat(self, transitions, strat, cost):
        if strategy == "never":
            assert len(transition) == 2, "never transition should be only 2 days"
            transition = [self._works[t].Not() for t in transitions]
            try:
                self.model.AddBoolOr(transition)
            except Exception as e:
                print(e, 'has no transition')
            
        elif strategy == "always":
            transition = [self._works[t] for t in transitions]
            try:
                self.model.AddBoolAnd(transition)
            except Exception as e:
                print(e, 'has no transition')
            
        elif strategy == 'max':
            assert len(transition) == 2, "maximize transition should be only 2 days"
            prev_shift, next_shift = transitions 
            transition = [self._works[prev_shift], self._works[next_shift]]
            n, d, s = prev_shift

            trans_var = self.model.NewBoolVar(f'transition (n={n}, day={d})')
            transition.append(trans_var)

            self.obj_bool_vars_max.append(trans_var)
            self.obj_bool_coeffs_max.append(cost)
            try:
                self.model.AddImplication(self.work[prev_shift], self.work[next_shift])
            except Exception as e:
                print(e, 'has no transition')
                
###################################################################################
    
    def implement_slot_sequence_constraints(self, sequence_constraints=None):
        if !sequence_constraints:
            sequence_constraints = []
        for seq_constraint in sequence_constraints:
            slot, slot_type, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = seq_constraint
            for n in self._nurses:
                works = []
                for s in self._slots:
                    for d in self._date_range:
                        try:
                            works.append(self.work[(w, d, s)])
                        except Exception as e:
                            print(e)
                
                try:
                    variables, coeffs = utils.add_soft_sequence_constraint(
                        self.model,
                        works,
                        hard_min,
                        soft_min,
                        min_cost,
                        soft_max,
                        hard_max,
                        max_cost,
                        f'sequence_constraint({n}, {slot})')
                    self.obj_bool_vars_min.extend(variables)
                    self.obj_bool_coeffs_min.extend(coeffs)
                except Exception as e:
                    print(e)

###################################################################################

    def implement_sum_constraint(self, date_prior):
        all_dates = date_prior + self._date_range
        
        for sum_const in self.sum_constraints: # need to build data model
            slot, slot_type, hard_min, soft_min, min_cost, soft_max, hard_max, max_cost = seq_constraint
            for n in self._nurses:
                weeks = utils.chunk(all_dates, 7)
                for index, week in enumerate(weeks):
                    self.sum_constraint(
                        n, 
                        slot,
                        week,
                        index,
                        hard_min, 
                        soft_min, 
                        min_cost, 
                        soft_max,
                        hard_max, 
                        max_cost
                    )
                            
    def sum_constraint(
        self,
        n, 
        slot, 
        date_list, 
        date_list_index,
        hard_min, 
        soft_min, 
        min_cost, 
        soft_max,
        hard_max, 
        max_cost
    ):
        try:
            works = [self.work[n, slot, d] for d in date_list]
            prefix = f'weekly_sum_constraint({n}, {slot}, {date_list_index})'
            variables, coeffs = utils.add_soft_sum_constraint(
                self.model, 
                works, 
                hard_min, 
                soft_min, 
                min_cost, 
                soft_max,
                hard_max, 
                max_cost, 
                prefix
            )
            self.obj_bool_vars_min.extend(variables)
            self.obj_bool_coeffs_min.extend(coeffs)
        except Exception as e:
            print(e)

###################################################################################

    def fairness_allocation(self, difference=1):
        fairshift = {}
        sum_of_shifts = {}
        num_days = len(self._date_range) 
        for n in self._nurses:
            for s in self._shifts:
                sum_of_shifts[(n, s)] = self.model.NewIntVar(0, num_days, f'sum_of_shifts_{n}_{s}')
                shift_list = []
                for d in self._date_range:
                    try:
                        shift_list.append(self._works[(n, d, s)])
                    except Exception as e:
                        pass
                self.model.Add(sum_of_shifts[(n, s)] == sum(shift_list))
                                
        for s in self.duty_types:
            try:
                min_fair_shift = self.model.NewIntVar(0, num_days, f'min_fair_shift_{s}')
                max_fair_shift = self.model.NewIntVar(0, num_days, f'max_fair_shift_{s}')
                self.model.AddMinEquality(min_fair_shift, [sum_of_shifts[(n, s)] for n in self._nurses])
                self.model.AddMaxEquality(max_fair_shift, [sum_of_shifts[(n, s)] for n in self._nurses]) 

                self.model.Add(max_fair_shift - min_fair_shift <= difference)

            except Exception as e:
                pass
            
###################################################################################




In [11]:
m = Model(nurses, roles, shifts, leaves, date_range)