# In [1]:
from datetime import date, timedelta
from optapy import problem_fact, planning_id, planning_entity, planning_variable, constraint_provider, planning_solution, problem_fact_collection_property, value_range_provider, planning_entity_collection_property, planning_score, solver_manager_create
from optapy.types import Joiners, HardSoftScore, SolverConfig, Duration
 
@problem_fact
class WorkDay:
    """Problem fact wrapping one date on which work can be planned."""

    def __init__(self, date):
        # `date` must provide toordinal() and isoformat(); both are called
        # by the methods below.
        self.date = date

    @planning_id
    def get_ordinal(self):
        """Return the planning id: the ordinal of the wrapped date."""
        wrapped = self.date
        return wrapped.toordinal()

    def __str__(self):
        iso_day = self.date.isoformat()
        return f"WorkDay(date={iso_day})"
    
@problem_fact
class TeamMember:
    """Problem fact describing a person that planning items can be assigned to.

    The member's name is the planning id, so it must be unique among the
    team members of a problem.
    """

    def __init__(
            self,
            name,
            profile,
            product):
        # Identifier of the member; returned by get_name() as planning id.
        self.name = name
        # Profile code of the member.
        self.profile = profile
        # Product the member works on.  Stored as `product` because
        # get_products() and __str__() read that attribute; storing it as
        # `products` made both of them raise AttributeError.
        self.product = product
        # Dates recorded through add_day_off().
        self.daysoff = []

    def add_day_off(
            self,
            year,
            month,
            day):
        """Record ``date(year, month, day)`` as a day off for this member.

        Raises:
            ValueError: if the arguments do not form a valid calendar date.
        """
        self.daysoff.append(date(year, month, day))

    # Only one member of a class may carry @planning_id, so the profile and
    # product getters below are plain accessors.
    @planning_id
    def get_name(self):
        """Return the member's name (the planning id)."""
        return self.name

    def get_profile(self):
        """Return the member's profile."""
        # No trailing comma here: it would wrap the value in a 1-tuple.
        return self.profile

    def get_products(self):
        """Return the product the member works on."""
        return self.product

    def __str__(self):
        return f"TeamMember(name={self.name}, profile={self.profile}, product={self.product}, daysoff={len(self.daysoff)})"


@planning_entity
class PlanningItem:
    """Planning entity: one unit of a task's workload.

    The solver assigns each item a work day and a team member through the
    two planning variables below.
    """

    def __init__(
            self,
            task,
            index=0,
            workday=None,
            team_member=None):
        """Create an item for `task`.

        Args:
            task: the Task this item belongs to; its `name`, `product` and
                `profile` attributes are read by the getters.
            index: position of this item within the task's workload, used
                to build a distinct id for each item of the same task.
            workday: initially assigned WorkDay, or None when unassigned.
            team_member: initially assigned TeamMember, or None when
                unassigned.
        """
        self.task = task
        # Distinct per item as long as task names are distinct.  The
        # conflict constraint orders pairs by this value, so two items of
        # the same task must not share it.
        self.id = f"{task.name}#{index}"
        # Initialised here so the getters work before the solver (or the
        # caller) has assigned anything.
        self.workday = workday
        self.team_member = team_member

    @planning_id
    def get_id(self):
        """Return the unique identifier of this item."""
        return self.id

    def get_name(self):
        """Return the name of the owning task (shared by all its items)."""
        return self.task.name

    def get_product(self):
        """Return the product of the owning task."""
        return self.task.product

    def get_profile(self):
        """Return the profile of the owning task."""
        return self.task.profile

    @planning_variable(WorkDay, ["workDays"])
    def get_workday(self):
        """Return the assigned work day, or None when unassigned."""
        return self.workday

    def set_workday(self, new_workday):
        self.workday = new_workday

    @planning_variable(TeamMember, ["TeamMembers"])
    def get_team_member(self):
        """Return the assigned team member, or None when unassigned."""
        return self.team_member

    def set_team_member(self, new_team_member):
        self.team_member = new_team_member


class Task:
    """A piece of work that is split into `workload` planning items."""

    def __init__(
            self,
            name,
            product,
            profile,
            workload):
        self.name = name
        self.product = product
        self.profile = profile
        # Number of planning items generated for this task.
        self.workload = workload

    def append_planning_items(self, list):
        """Append one PlanningItem per unit of workload to `list`.

        The parameter name shadows the builtin `list`; it is kept so that
        existing callers passing it by keyword keep working.
        """
        for index in range(self.workload):
            # The index gives every item of this task its own id.
            list.append(PlanningItem(self, index))


@constraint_provider
def planning_constraints(constraint_factory):
    """Return the list of constraints used to score a planning."""
    return [
        # Hard constraints
        team_member_conflict(constraint_factory)
        # Soft constraints
    ]


def team_member_conflict(constraint_factory):
    """Penalize each pair of items given to the same member on the same day."""
    # A team member can work on only one item at a time
    return constraint_factory \
        .for_each(PlanningItem) \
        .join(PlanningItem,
              Joiners.equal(lambda item: item.workday),
              Joiners.equal(lambda item: item.team_member),
              # Order by the unique id so each conflicting pair is matched
              # once and an item is never paired with itself.
              Joiners.less_than(lambda item: item.id)
              ) \
        .penalize("Team member conflict", HardSoftScore.ONE_HARD)


@planning_solution
class TeamPlanning:
    """Planning solution holding the facts, the entities and the score."""

    def __init__(self, work_days, team_members, planning_items, score=None):
        self.score = score
        self.planning_items = planning_items
        self.team_members = team_members
        self.work_days = work_days

    @problem_fact_collection_property(WorkDay)
    @value_range_provider("workDays")
    def get_workday_list(self):
        """Return the work days; also exposed as the "workDays" value range."""
        return self.work_days

    @problem_fact_collection_property(TeamMember)
    @value_range_provider("TeamMembers")
    def get_team_members(self):
        """Return the team members; also the "TeamMembers" value range."""
        return self.team_members

    @planning_entity_collection_property(PlanningItem)
    def get_planning_items(self):
        """Return the planning items."""
        return self.planning_items

    @planning_score(HardSoftScore)
    def get_score(self):
        """Return the current score (None until one has been set)."""
        return self.score

    def set_score(self, score):
        self.score = score


def generate_work_days(
        start_year,
        start_month,
        start_day,
        end_year,
        end_month,
        end_day):
    """Yield a WorkDay for every Monday-to-Friday date in a date range.

    The range starts at the start date (included) and stops before the end
    date (excluded).  Nothing is yielded when the start is not before the
    end.

    Raises:
        ValueError: on first iteration, if either triple is not a valid
            calendar date.
    """
    first_ordinal = date(start_year, start_month, start_day).toordinal()
    stop_ordinal = date(end_year, end_month, end_day).toordinal()

    for ordinal in range(first_ordinal, stop_ordinal):
        day = date.fromordinal(ordinal)
        # weekday() numbers Monday as 0, so 5 and 6 are the week-end days.
        if day.weekday() <= 4:
            yield WorkDay(day)
 

def generate_problem():
    """Build and return the sample TeamPlanning problem.

    The problem covers the work days from 2023-05-01 up to (not including)
    2023-06-30, a fixed team, and the planning items of three tasks.  The
    first planning item is pre-assigned to the first work day and the first
    team member.

    Returns:
        The unsolved TeamPlanning instance.
    """
    work_days = list(generate_work_days(2023, 5, 1, 2023, 6, 30))

    team_members = [
        TeamMember("AA", "FS", "nitro"),
        TeamMember("AB", "FS", "helios"),
        TeamMember("AC", "FS", "becredit"),
        # Was a second "AA": the name is the planning id and must be unique.
        TeamMember("AD", "FS", "nitro"),
        TeamMember("BA", "FS", "helios"),
        TeamMember("BB", "FS", "becredit"),
        TeamMember("BC", "DE", "helios"),
        TeamMember("AE", "DE", "nitro"),
        TeamMember("AF", "DE", "becredit"),
        TeamMember("AG", "DS", "becredit"),
        TeamMember("AH", "DS", "becredit"),
        TeamMember("AI", "DS", "becredit"),
        TeamMember("AJ", "FS", "helios"),
    ]

    planning_items = []
    Task("DevEnv", "nitro", "FS", 5).append_planning_items(planning_items)
    Task("CI & Infra", "nitro", "FS", 5).append_planning_items(planning_items)
    Task("Model Automation", "nitro", "DS", 20).append_planning_items(planning_items)

    # Give the first item an initial assignment.  The setter is named
    # set_workday (it pairs with get_workday); set_work_day does not exist.
    item = planning_items[0]
    item.set_workday(work_days[0])
    item.set_team_member(team_members[0])

    # Return the problem so the caller can hand it to a solver; it used to
    # be constructed and immediately discarded.
    return TeamPlanning(work_days, team_members, planning_items)


def create_solver_manager():
    """Create a solver manager for TeamPlanning limited to 30 seconds.

    This setup used to sit inside generate_problem(), where its result was
    discarded; it is separated so the caller can keep the manager.
    """
    solver_config = SolverConfig().withEntityClasses(PlanningItem) \
        .withSolutionClass(TeamPlanning) \
        .withConstraintProviderClass(planning_constraints) \
        .withTerminationSpentLimit(Duration.ofSeconds(30))

    return solver_manager_create(solver_config)