scheduler_classes.py

In [2]:
from typing import List, Tuple
from datetime import time
import numpy as np

# Input parameters
ROLE_NAME = {
    0: "Actor",
    1: "Shooting Crew",
    2: "Makeup Crew",
    3: "Lighting Crew",
    4: "Equipment"
}

# Dependency types
AND_DEP = "and"  # All predecessors must complete before task can start
OR_DEP = "or"    # At least one predecessor must complete before task can start

ROLE_MEMBERS = {i:[] for i in ROLE_NAME.keys()}
LOCATIONS = ["Cathay City Loading Bay", "Exteriors Near Parking Lot", "Parking Lot Near Loading Bay", "Entrance", "The Street", "The Galleria", "Studio", "Recruitment Room"]
TRANSPORTATION_TIME = np.array([[0, 5, 10, 15, 20, 25],[5, 0, 5, 10, 15, 20],[10, 5, 0, 5, 10, 15],[15, 10, 5, 0, 5, 10],[20, 15, 10, 5, 0, 5],[25, 20, 15, 10, 5, 0]])

def time_to_minutes(t: time) -> int:
    """Convert time object to minutes since midnight."""
    return t.hour * 60 + t.minute
def minutes_to_time(minutes: int) -> time:
    """Convert minutes since midnight to time object."""
    # Handle overflow
    minutes = minutes % (24 * 60)  # Keep within 24 hours
    hours = minutes // 60
    mins = minutes % 60
    return time(hour=int(hours), minute=int(mins))
def get_time_difference(start: time, end: time) -> int:
    """Calculate minutes between two time objects.
    
    Args:
        start: Start time
        end: End time
        
    Returns:
        int: Minutes between times, handling overnight periods
    """
    start_minutes = start.hour * 60 + start.minute
    end_minutes = end.hour * 60 + end.minute
    
    # Handle overnight cases (when end time is earlier than start time)
    if end_minutes < start_minutes:
        end_minutes += 24 * 60  # Add 24 hours in minutes
        
    return end_minutes - start_minutes
class Task:
    def __init__(self, id: int, location: List[str], estimated_duration: int,
                #  estimated_cost: int, 
                 description: str = "",
                 dependencies: List[int] = [],
                 time_of_day: List[Tuple[time, time]] = [(time(0, 0), time(23, 59))], 
                 members: List[int] = None, roles: List[Tuple[int, int]] = None):
        self.id = id
        self.location = location
        self.estimated_duration = estimated_duration
        # self.estimated_cost = estimated_cost
        self.description = description
        self.dependencies = dependencies # List[Task ID, (Task ID, Task ID)] 
        self.time_of_day = time_of_day
        self.members = members or []
        self.roles = roles or []    # List[(role, number of members)]
        self.time_of_day_penalty = 1e9
        self.dependencies_violation = 1e9
        
class Member:
    def __init__(self, id: int, name: str, blocked_timeslots: List[Tuple[time, time]], 
                 rate: int, ot: int, role: int, transportation_speed: float):
        self.id = id
        self.name = name
        self.blocked_timeslots = blocked_timeslots
        self.rate = rate
        self.ot = ot
        self.role = role    # Role ID
        self.transportation_speed = transportation_speed    # scale from 0 to 1, lower is faster
        self.working_hours = []
        self.assigned_tasks = []
        self.schedule = []
        self._set_role()

    def _set_role(self):
        global ROLE_MEMBERS
        ROLE_MEMBERS[self.role].append(self.id)

    def get_role(self) -> str:
        global ROLE_NAME
        return ROLE_NAME[self.role]
    
    def is_available_on(self, start_time: time, end_time: time) -> bool:
        """Check if the actor is available on the given date."""
        return not any((start <= start_time < end) or (start <= end_time < end) for start, end in self.blocked_timeslots)
    
    def block_time(self, start_time: time, end_time: time):
        """Block the time slot for the actor."""
        if self.working_hours: 
            self.working_hours = [min(self.working_hours[0], start_time), max(self.working_hours[1], end_time)]
        else: 
            self.working_hours = [start_time, end_time]
        self.blocked_timeslots.append((start_time, end_time))


export_schedules.py

In [3]:
import pandas as pd
from matplotlib.patches import Rectangle
import numpy as np
import matplotlib.pyplot as plt

def export_schedules(res, model, location=''):
    for i, sol in enumerate(res):
        def visualize_schedule(solution_index, solution):
            # Create figure and axis
            fig, ax = plt.subplots(figsize=(15, 8))
            
            # Convert times to minutes for plotting
            tasks = []
            for member_id, member_tasks in solution[1].items():
                member_name = model.get_by_id(model.members, member_id).name
                for task in member_tasks:
                    start_mins = sum(int(x) * y for x, y in zip(task['start'].split(':'), [60, 1]))
                    end_mins = sum(int(x) * y for x, y in zip(task['end'].split(':'), [60, 1]))
                    tasks.append({
                        'Member': member_name,
                        'Task': f"Task {task['task']}",
                        'Start': start_mins,
                        'Duration': end_mins - start_mins
                    })
            
            # Convert to DataFrame for easier plotting
            df = pd.DataFrame(tasks)
            
            # Sort members by name
            members = sorted(df['Member'].unique())
            
            # Plot tasks as rectangles
            colors = plt.cm.Set3(np.linspace(0, 1, len(df['Task'].unique())))
            for idx, task in df.iterrows():
                y_pos = members.index(task['Member'])
                rect = Rectangle((task['Start'], y_pos - 0.25), task['Duration'], 0.5, 
                                facecolor=colors[int(task['Task'].split()[1]) % len(colors)])
                ax.add_patch(rect)
                ax.text(task['Start'] + task['Duration']/2, y_pos, 
                        task['Task'], ha='center', va='center')
            
            # Customize the plot
            ax.set_ylim(-0.5, len(members) - 0.5)
            ax.set_xlim(df['Start'].min(), (df['Start'] + df['Duration']).max())
            # ax.set_xlim(0*60, 24*60)  # 8:00 to 18:00
            ax.set_yticks(range(len(members)))
            ax.set_yticklabels(members)
            
            # Add gridlines
            ax.grid(True, axis='x', alpha=0.3)
            
            # Format x-axis as time
            # xticks = np.arange(0*60, 24*60, 60)
            xticks = np.arange(df['Start'].min(),(df['Start'] + df['Duration']).max() , 60)
            ax.set_xticks(xticks)
            ax.set_xticklabels([f'{int(x/60):02d}:00' for x in xticks])
            
            plt.title(f'Schedule Solution {solution_index + 1}')
            plt.tight_layout()
            
            # Save the plot
            plt.savefig(location+f'\\schedule_solution_{solution_index + 1}.png')
            plt.close()
            print(f'Solution {solution_index + 1} saved to {location}')

        visualize_schedule(i, sol)

scheduler.py

In [4]:
from typing import List, Tuple
from ortools.sat.python import cp_model
from collections import defaultdict

class EnhancedSolutionCollector(cp_model.CpSolverSolutionCallback):
    def __init__(self, tasks, members, time_unit=15, max_solutions=1000, timeout_seconds=300):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.tasks = tasks
        self.time_unit = time_unit
        self.max_solutions = max_solutions
        self.timeout_seconds = timeout_seconds
        self.solutions = []
        self.solution_count = 0
        self.members = members
        
    def OnSolutionCallback(self):

        # Convert to human-readable format
        solution = {} 
        for task_id, (_, start_var, end_var, _) in self.tasks.items():
            start = self.Value(start_var) * self.time_unit
            solution[task_id] = start
            
        sol_member_schedule = defaultdict(list)
        for member in self.members:
            for task_id, interval, presence in member.assigned_tasks:
                if self.Value(presence):  # Only include if actually assigned
                    start = self.Value(interval.StartExpr()) * self.time_unit
                    end = self.Value(interval.EndExpr()) * self.time_unit
                    sol_member_schedule[member.id].append({
                        'task': task_id,
                        'start': self._minutes_to_time(start),
                        'end': self._minutes_to_time(end)
                    })
        self.solutions.append((solution, sol_member_schedule))
        self.solution_count += 1
        
        # Stop if reached max solutions
        if self.solution_count >= self.max_solutions:
            print('Stopping search')
            self.StopSearch()
    
    def _minutes_to_time(self, minutes):
        """Convert minutes since midnight to HH:MM format"""
        return f"{minutes//60:02d}:{minutes%60:02d}"
    
    def get_solutions(self):
        return self.solutions
    


class Optimizer:
    def __init__(self, tasks: List[Task], members: List[Member], clash_penalty: int = 1e11, ot_hours: int = 8):
        self.tasks = tasks
        self.members = members
        self.clash_penalty = clash_penalty
        self.ot_hours = ot_hours
        self.optimized_cost = 0
        
    def get_by_id(self, type: List, id: int):
        """Get an object by its ID."""
        return next((obj for obj in type if obj.id == id), None)
    
    def dependencies_violation(self, task_id, schedule_vector) -> bool:
        """Detects dependencies violation."""
        task = self.tasks[task_id]
        dependencies = task.dependencies
        # Check if the dependencies are met
        for dep in dependencies:
            if isinstance(dep, Tuple):
                if not any(schedule_vector[_dep]+self.tasks[_dep].estimated_duration <= schedule_vector[task_id] for _dep in dep):
                    return True
            else:
                if not schedule_vector[dep]+self.tasks[dep].estimated_duration <= schedule_vector[task_id]:
                    return True
        return False
    
    def cost(self, schedule_vector: List[int]) -> int:      
        
        """Calculate the cost of the schedule."""
            
        def _calculate_salary(member: Member) -> int:
            """Calculate the salary of the member."""
            rate = member.rate
            ot = member.ot
            working_hours = get_time_difference(member.working_hours[1], member.working_hours[0]) / 60
            salary = 0
            # OT checking logic
            if working_hours > self.ot_hours:
                salary += ot * working_hours / 60 - self.ot_hours
            salary += rate * min(self.ot_hours,(get_time_difference(member.working_hours[1], member.working_hours[0]) / 60))
            return salary
        
        
        # List of involved members (Member objects)
        inovlved_members = []
        # Deep copy the members
        # _members = copy.deepcopy(self.members)
        _members = self.members.copy()
        total_cost = 0

        # Enumerate through the schedule vector (each task)
        for i, time in enumerate(schedule_vector):
            task = self.get_by_id(self.tasks, i)
            task_members = [self.get_by_id(_members, member) for member in task.members]
            start_time = minutes_to_time(time)
            end_time = minutes_to_time(time + task.estimated_duration)

            # End time cannot be greater than end of day
            if time + task.estimated_duration > 1439:
                total_cost += self.clash_penalty
                
            # Calculate task member clash penalty
            clashed_members = 0
            for member in task_members:
                # print('Member: ',member.name)
                # print(start_time, ' - ',end_time)
                if member.is_available_on(start_time, end_time):
                    member.block_time(start_time, end_time)
                    inovlved_members.append(member)
                else:
                    clashed_members += 1
                    # print('Clashed member: ',member.name)
            total_cost += self.clash_penalty * clashed_members
            
            # For each role find cost
            for role in task.roles:
                role_members = ROLE_MEMBERS[role[0]]
                member_objs = [self.get_by_id(_members, member) for member in role_members]
                member_objs = [member for member in member_objs if member.is_available_on(start_time, end_time)]
                member_objs = sorted(member_objs, key=lambda x: x.rate)
                member_objs = member_objs[:role[1]]
                for member in member_objs:
                    member.block_time(start_time, end_time)
                inovlved_members += member_objs

                # Check if the number of members is enough
                if len(member_objs) < role[1]:
                    total_cost += self.clash_penalty * (role[1] - len(member_objs))
                    continue

            # Violation of time of day
            time_of_day = task.time_of_day
            if not any((start_time <= end) and (end_time >= start) for start, end in time_of_day):
                total_cost += task.time_of_day_penalty

        inovlved_members = list(set(inovlved_members))
        
        # Calculate the salary of the members
        total_cost += sum(map(_calculate_salary, inovlved_members))
        
        # Calculate the dependencies penalty
        dep_pen = 0
        for i in range(len(schedule_vector)):
            if self.dependencies_violation(i, schedule_vector):
                dep_pen += self.clash_penalty
        total_cost += dep_pen
        return total_cost

    def generate_schedule(self, schedule_vector: List[int]) -> int:      
        
        """Generate formatted schedule."""
        
        # List of involved members (Member objects)
        ttb_output = []
        
        # Deep copy the members
        # _members = copy.deepcopy(self.members)
        _members = self.members.copy()

        # Enumerate through the schedule vector (each task)
        ttb = {key: val for val, key in enumerate(schedule_vector)}
        sorted_ttb = dict(sorted(ttb.items()))
        for time, i in sorted_ttb.items():
            task = self.get_by_id(self.tasks, i)
            task_members = [self.get_by_id(_members, member) for member in task.members]
            task_involved_members = []
            start_time = minutes_to_time(time)
            end_time = minutes_to_time(time + task.estimated_duration)
            row = [(start_time, end_time), i]
                
            # Calculate task member clash penalty
            for member in task_members:
                if member.is_available_on(start_time, end_time):
                    member.block_time(start_time, end_time)
                    task_involved_members.append(member.name)
            
            # For each role find cost
            for role in task.roles:
                role_members = ROLE_MEMBERS[role[0]]
                member_objs = [self.get_by_id(_members, member) for member in role_members]
                member_objs = [member for member in member_objs if member.is_available_on(start_time, end_time)]
                member_objs = sorted(member_objs, key=lambda x: x.rate)
                member_objs = member_objs[:role[1]]
                
                for member in member_objs:
                    member.block_time(start_time, end_time)
                    task_involved_members.append(member.name)
                    
            row.append(tuple(task_involved_members))
            ttb_output.append(row)
            # Sort entries by start time
            ttb_output.sort(key=lambda x: time_to_minutes(x[0][0]))

            # Format for readability
            formatted_output = []
            for (start, end), task_id, members in ttb_output:
                task = self.get_by_id(self.tasks, task_id)
                formatted_output.append({
                    'Time': f'{start} - {end}',
                    'Task': f'Task {task_id}',
                    'Duration': f'{task.estimated_duration} min',
                    'Location': ', '.join(task.location),
                    'Members': ', '.join(members)
                })
        return formatted_output
        
    def schedule_tasks(self, get_all_solutions=False, timeout = 30):
        """Schedule tasks using CP-SAT solver."""
        model = cp_model.CpModel()
        
        task_dict = {}
        member_intervals = defaultdict(list)

        # Convert task dependencies to required format
        dependencies = []

        for task in self.tasks:
            # Process dependencies
            if task.dependencies:
                for dep in task.dependencies:
                    if isinstance(dep, tuple):
                        dependencies.append({
                            "successor": task.id,
                            "predecessors": list(dep),
                            "type": OR_DEP
                        })
                    else:
                        dependencies.append({
                            "successor": task.id,
                            "predecessors": [dep],
                            "type": AND_DEP
                        })

            # Create task interval variables
            start_var = model.NewIntVar(0, int(1440/15), f'start_{task.id}')
            end_var = model.NewIntVar(0, int(1440/15), f'end_{task.id}')
            
            # Time window constraints
            if not task.time_of_day:
                model.Add(end_var == start_var + int(task.estimated_duration/15))
            else:
                task_windows = [(int(time_to_minutes(s)/15), int(time_to_minutes(e)/15)) for s,e in task.time_of_day]
                window_constraints = []
                for window_start, window_end in task_windows:
                    in_window = model.NewBoolVar(f'task{task.id}_in_window_{window_start}')
                    model.Add(start_var >= window_start).OnlyEnforceIf(in_window)
                    model.Add(end_var <= window_end).OnlyEnforceIf(in_window)
                    window_constraints.append(in_window)
                model.Add(sum(window_constraints) >= 1)
            
            interval_var = model.NewIntervalVar(start_var, int(task.estimated_duration/15), end_var, f'task{task.id}_interval')
            task_dict[task.id] = (task, start_var, end_var, interval_var)

        # Handle member assignments
        for task in self.tasks:
            task_data = task_dict[task.id]
            start_var, end_var = task_data[1], task_data[2]
            
            # Role-based assignments
            for role_id, req_count in task.roles:
                presence_vars = []
                for member_id in ROLE_MEMBERS[role_id]:
                    presence = model.NewBoolVar(f"role_{member_id}_task{task.id}")
                    presence_vars.append(presence)
                    
                    interval = model.NewOptionalIntervalVar(
                        start=start_var,
                        size=int(task.estimated_duration/15),
                        end=end_var,
                        is_present=presence,
                        name=f"role_{member_id}_task{task.id}_interval"
                    )
                    member_intervals[member_id].append((task.id, interval, presence))
                    
                    # Handle blocked timeslots
                    member = self.get_by_id(self.members, member_id)
                    self._add_blocked_time_constraints(model, member, start_var, end_var, presence)
                    
                model.Add(sum(presence_vars) == req_count)
            
            # Direct member assignments
            for member_id in task.members:
                assign_var = model.NewBoolVar(f'task{task.id}_mem{member_id}')
                model.Add(assign_var == 1)  # Must be assigned
                
                interval = model.NewOptionalIntervalVar(
                    start=start_var,
                    size=int(task.estimated_duration/15),
                    end=end_var,
                    is_present=assign_var,
                    name=f"direct_{member_id}_task{task.id}_interval"
                )
                member_intervals[member_id].append((task.id, interval, assign_var))
                
                # Handle blocked timeslots
                member = self.get_by_id(self.members, member_id)
                self._add_blocked_time_constraints(model, member, start_var, end_var, assign_var)

        # Add no-overlap constraints per member
        for member_id, intervals in member_intervals.items():
            model.AddNoOverlap([interval[1] for interval in intervals])
            member = self.get_by_id(self.members, member_id)
            member.assigned_tasks = intervals

        # Handle dependencies
        for dep in dependencies:
            successor_id = dep["successor"]
            predecessor_ids = dep["predecessors"]
            dep_type = dep["type"]
            
            succ_start = task_dict[successor_id][1]
            
            if dep_type == AND_DEP:
                for pred_id in predecessor_ids:
                    model.Add(succ_start >= task_dict[pred_id][2])
                    
            elif dep_type == OR_DEP:
                or_constraints = []
                for pred_id in predecessor_ids:
                    or_condition = model.NewBoolVar(f'pred_{pred_id}before_succ{successor_id}')
                    model.Add(succ_start >= task_dict[pred_id][2]).OnlyEnforceIf(or_condition)
                    or_constraints.append(or_condition)
                model.AddAtLeastOne(or_constraints)

        # Optimized cost calculation
        total_cost = self._calculate_costs(model, member_intervals)
        model.Minimize(total_cost)
        
        # Solve and process results
        return self._solve_and_process_results(model, task_dict, return_solutions_only=get_all_solutions, timeout=timeout)
    

    def _add_blocked_time_constraints(self, model, member, start_var, end_var, presence_var):
        """Helper method to add blocked time constraints for a member"""
        member_blocked_timeslot = [
            (int(time_to_minutes(s)/15), int(time_to_minutes(e)/15)) 
            for s, e in member.blocked_timeslots
        ]
        
        for block_start, block_end in member_blocked_timeslot:
            before = model.NewBoolVar(f"mem{member.id}_before_{block_start}")
            after = model.NewBoolVar(f"mem{member.id}_after_{block_end}")
                                
            model.Add(end_var <= block_start).OnlyEnforceIf(before)
            model.Add(start_var >= block_end).OnlyEnforceIf(after)
            
            # At least one must be true if member is assigned
            model.AddBoolOr([before, after]).OnlyEnforceIf(presence_var)

    def _calculate_costs(self, model, member_intervals):
        """Cost calculation"""
        total_cost = 0
        
        for member_id, intervals in member_intervals.items():
            if not intervals:
                continue

            member = self.get_by_id(self.members, member_id)
            
            # Calculate time span using adjusted dummy values
            starts = [interval[1].StartExpr() for interval in member.assigned_tasks]
            ends = [interval[1].EndExpr() for interval in member.assigned_tasks]
            
            earliest_start = model.NewIntVar(0, int(1440/15), f'earliest_start_{member_id}')
            latest_end = model.NewIntVar(0, int(1440/15), f'latest_end_{member_id}')
            
            # For unassigned tasks, starts will have 1440 and ends 0
            model.AddMinEquality(earliest_start, starts)
            model.AddMaxEquality(latest_end, ends)
            
            # Create conditional time span
            time_span = model.NewIntVar(0, int(1440/15), f'time_span_{member_id}')
            model.Add(time_span == latest_end - earliest_start)
            # model.Add(time_span == 0).OnlyEnforceIf(has_tasks.Not())
            
            # Improved overtime calculation
            regular_hours = int(self.ot_hours * 60/15)
            regular_time = model.NewIntVar(0, regular_hours, f'reg_time_{member_id}')
            overtime = model.NewIntVar(0, int(1440/15) - regular_hours, f'ot_{member_id}')
            
            # Use linear constraints instead of conditional
            model.AddMinEquality(regular_time, [time_span, regular_hours])
            model.Add(overtime == time_span - regular_time)
            
            # Calculate cost
            member_cost = model.NewIntVar(0, 999999999, f'cost_{member_id}')
            rate = int(member.rate * 15/60)
            ot_rate = int(member.ot * 15/60)
            model.Add(member_cost == regular_time* rate + overtime* ot_rate)
            total_cost += member_cost
        
        return total_cost

    def _solve_and_process_results(self, model, task_dict, return_solutions_only=False, timeout=30):
        """Process solving results and return schedule"""
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = timeout
        if return_solutions_only:

            collector = EnhancedSolutionCollector(
                tasks=task_dict,
                members=self.members,
                time_unit=15,
                max_solutions=100,  # Return first 100 solutions
                timeout_seconds=30  # Stop after 1 minute
            )
            
            status = solver.Solve(model, collector)
            
            if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
                print('Status')
                print(f'Found {collector.solution_count} solutions in {timeout:.2f} seconds')
                solutions = collector.get_solutions()
                # best_function = np.inf
                # best_schedule = None
                # for solution in solutions:
                #     sorted_sol = [solution[i] for i in range(len(solution))]

                #     cost = self.cost(sorted_sol)
                #     if cost < best_function:
                #         best_function = cost
                #         best_schedule = sorted_sol
                return solutions
            
        status = solver.Solve(model)
        
        print(status)
        if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
            # Build schedule
            solution = {}
            for task_id, (_, start_var, end_var, _) in task_dict.items():
                start = solver.Value(start_var) * 15
                solution[task_id] = start
            
            # Set member schedules
            sol_member_schedule = defaultdict(list)
            for member in self.members:
                for task_id, interval, presence in member.assigned_tasks:
                    if solver.Value(presence):  # Only include if actually assigned
                        start = solver.Value(interval.StartExpr()) * 15
                        end = solver.Value(interval.EndExpr()) * 15
                        sol_member_schedule[member.id].append({
                            'task': task_id,
                            'start': self._minutes_to_time(start),
                            'end': self._minutes_to_time(end)
                        })
            self.optimized_cost = solver.ObjectiveValue()
            return [(solution, sol_member_schedule)]
        else:
            return None, None
        
    def _minutes_to_time(self, minutes):
        """Convert minutes since midnight to HH:MM format"""
        return f"{minutes//60:02d}:{minutes%60:02d}"


Extract Members

In [5]:
from openpyxl import load_workbook

workbook = load_workbook(filename="Member&Task_Info.xlsx", data_only=True)
sheet = workbook['Members']
sheet2 = workbook['Tasks']

member_names = []
roles = []
member_hr_rates = []
member_ot_rates = []

for cell in sheet[3][1:]:
    if cell.value:
        member_names.append(cell.value)

        col_index = cell.col_idx
        roles.append(sheet.cell(row=4, column=col_index).value)
        member_hr_rates.append(sheet.cell(row=5, column=col_index).value)
        member_ot_rates.append(sheet.cell(row=6, column=col_index).value)
        
member_roles = [key for string in roles for key, value in ROLE_NAME.items() if value == string]

def format_time(row_index):
    hours = row_index // 4
    minutes = (row_index % 4) * 15
    if hours == 24 and minutes == 0:
        return 23, 59
    else:
        return hours, minutes

def process_unavailability(file_path, sheet_name="Members_availability"):
    df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
    times = df.iloc[:, 0]
    results = {}
    for col_idx in range(1, df.shape[1]): 
        availability = df.iloc[:, col_idx].values 
        intervals = []
        start = None
        for i, available in enumerate(availability):
            if available == 0 and start is None:
                start = i 
            elif available == 1 and start is not None:
                intervals.append((format_time(start-1), format_time(i-1)))
                start = None
        if start is not None:
            intervals.append((format_time(start-1), format_time(len(availability)-1)))
        results[f"Person_{col_idx}"] = intervals
    return results

file_path = "Member&Task_Info.xlsx"
sheet_name = "Members_availability"
unavailability_results = process_unavailability(file_path, sheet_name)
member_intervals = []
for person, intervals in unavailability_results.items():
    member_intervals.append(intervals)

for i, name in enumerate(member_names):
    converted_intervals = [
        f"(time({start[0]}, {start[1]}), time({end[0]}, {end[1]}))" for start, end in member_intervals[i]]
    converted_intervals_str = f"[{', '.join(converted_intervals)}]"
    exec(f"{name} = Member({i}, '{name}', {converted_intervals_str}, member_hr_rates[{i}], member_ot_rates[{i}], member_roles[{i}], 0.5)")

members = [globals()[name] for name in member_names]

Extract Tasks

In [6]:
task_names = []
task_locations = []
task_durations = []
task_dependencies = []
# task_timeofday = []
task_actors = []
task_roles = []

for row in range(3, sheet2.max_row+1):
    cell_value = sheet2.cell(row=row, column=3).value
    if cell_value:
        task_names.append(cell_value)
        task_locations.append(sheet2.cell(row=row, column = 9).value)
        task_durations.append(sheet2.cell(row=row, column = 10).value)
        task_dependencies.append(sheet2.cell(row=row, column = 12).value)
        task_actors.append(sheet2.cell(row=row, column = 15).value)
        task_roles.append(sheet2.cell(row=row, column = 24).value)

for i, name in enumerate(task_names):
    task_location = task_locations[i]
    task_duration = task_durations[i]
    task_actor = task_actors[i]
    task_role = task_roles[i]
    task_dependency = task_dependencies[i]
    exec(f"{name} = Task({i}, {task_location}, {task_duration}, members={task_actor}, roles={task_role}, dependencies={task_dependency})")

tasks = [globals()[name] for name in task_names]

test.py

In [7]:
model = Optimizer(tasks, members)
res = model.schedule_tasks(timeout=60)
print('The optimized cost is: ', model.optimized_cost)
export_schedules(res, model, location=r'C:\Users\nicho\OneDrive - HKUST Connect\Study\FYP\output')

2
The optimized cost is:  30338.0
Solution 1 saved to C:\Users\nicho\OneDrive - HKUST Connect\Study\FYP\output
