In [None]:
import pandas as pd
import numpy as np
import json
from dateutil import rrule, parser
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from intervaltree import Interval, IntervalTree

# Definition of the different classes
@dataclass
class Operator:
    name: str
    skills: list
    holidays: IntervalTree
    availability: IntervalTree = field(default_factory=IntervalTree)
    shift: list = field(default_factory=list)   # Is it used ?

@dataclass
class Step:
    name: str
    previous_steps: list
    duration: timedelta
    required: timedelta
    capacity: int
    log: pd.DataFrame

    def __repr__(self):
        return f"Step {self.name}, duration {self.duration}, capacity {self.capacity}, log {self.log}"
    
# Load data from the JSON configuration file
with open("SimulatorInputs.json", "r") as file:
    data = json.load(file)

# Extract the simulation parameters
json_simulation_start: str = data['SimulationParameters']['simulation_start']
simulation_start: datetime = datetime.fromisoformat(json_simulation_start)

json_simeulation_end: str = data['SimulationParameters']['simulation_end']
simulation_end: datetime = datetime.fromisoformat(json_simeulation_end)

modules_planned_deliveries: str = data["ComponentArrivalTimes"]["bare modules"]

total_modules_number: int = 0
for delivery in modules_planned_deliveries:
    total_modules_number += modules_planned_deliveries[delivery]

# Initialize instances of the operator class
# All the operators are stored in the operators list
operators = []

# Initialize the CERN holidays to the good format
CERN_holidays_series = pd.Series(data["CERNHolidays"])
CERN_holidays_series[:] = CERN_holidays_series.apply(lambda x: [datetime.fromisoformat(date) for date in x])

for id, operator in enumerate(data["Operators"]):
    # Loads the operator's individual holidays
    individual_holidays = pd.Series(data["OperatorHolidays"][f"Operator{id + 1}"])
    individual_holidays[:] = individual_holidays.apply(lambda x: [datetime.fromisoformat(date) for date in x])

    # Merges the individual holidays with the CERN holidays
    operator_holidays = pd.concat([individual_holidays, CERN_holidays_series])
    operator_holidays[:] = operator_holidays.apply(lambda x: Interval(x[0], x[1], "holiday"))

    # Converts the pandas series to a list and then to an interval tree
    operator_holidays_list = operator_holidays.tolist()
    operator_holidays_tree = IntervalTree(operator_holidays_list)

    globals()[f"Operator{id + 1}"] = Operator(name=f"Operator{id + 1}", skills=data["Operators"][operator], holidays=operator_holidays_tree)
    operators.append(globals()[f"Operator{id + 1}"])


# Initialization of the Step class instances, they are stored in the "Chronologically_Ordered_Steps" dict, the member log is a dataframe that will contain 
# the entry and exit dates of the modules at each step

Chronologically_Ordered_Steps = {}
for step in data["StagesAndSteps"]:
    step_name = step["Step"].replace("-", "_").replace(" ", "_")    # Just converting the name to python valid name without blank spaces and

    if step["Previous"][0] == "None":
        globals()[step_name] = Step(name=step["Step"], previous_steps=[None], duration=timedelta(minutes = step["Duration"]),required=timedelta(minutes = step["Required"]), capacity= step["Capacity"],log=pd.DataFrame(columns=["Entry_Date", "Exit_Date"]))

    else:
        previous_steps = []
        for prev_step in step["Previous"]:
            previous_steps.append(Chronologically_Ordered_Steps[prev_step])
            globals()[step_name] = Step(name=step["Step"], previous_steps=previous_steps, duration=timedelta(minutes = step["Duration"]), required=timedelta(minutes = step["Required"]), capacity= step["Capacity"],log=pd.DataFrame(columns=["Entry_Date", "Exit_Date"]))
    Chronologically_Ordered_Steps |= {step["Step"]: globals()[step_name]}

time: datetime = simulation_start
finished_modules_count: int = sum(S___PDB_Shipment_of_modules_to_loading_sites.log["Exit_Date"].notna())

############################################################################################
# Extract the holidays, and daily shift from the data
dict_int_to_day = {"0": "Monday", "1": "Tuesday", "2": "Wednesday", "3": "Thursday", "4": "Friday"}

def generate_operators_availability(date) -> None:
    
    whole_day = Interval(date, date + timedelta(days=1))    #TODO: big issue with whole_day, need to bound it to the working hours of the laboratory
    for operator in operators:

        if operator.holidays.overlaps(whole_day) or date.weekday() in {5,6}:  # Check if the day is either in the holidays or weekend
            continue  

        else:
        
            today_shift = data["OperatorWorkHours"][operator.name][dict_int_to_day[str(date.weekday())]]
            list_today_shift = list(today_shift.keys())

            # TODO: check for even number of time constraints
            beginning_slot = list_today_shift[::2]  # Extract the beginning and ending time of the different operator's daily slots
            ending_slot = list_today_shift[1::2]    # Implicitly we assume that there is an even number of time constraints (namely the beginning and the ending of the time slot)

            for begin, end in zip(beginning_slot, ending_slot):
                start_time = str(date)[:10] + " " + today_shift[begin]
                start_time = datetime.fromisoformat(start_time)
                end_time = str(date)[:10] + " " + today_shift[end]
                end_time = datetime.fromisoformat(end_time)
                operator.availability.add(Interval(start_time, end_time, "idle"))
############################################################################################

############################################################################################
# Load the state of production csv data into each step
state_of_production = pd.read_csv("inventory.csv", sep=";", dtype={"Quantity":'Int64'}, index_col=0, parse_dates=["Launching Time"])
state_of_production.fillna({"Quantity": 0}, inplace=True)

# This piece of code fills by default the missing values of the Ready components launching time
# to simulation_start_time - duration of the task to simulate the fact that they are just ready to 
# be moved to the next step when the simulation is launched. 
for index, row in state_of_production.iterrows():   # I prefer to work with literal values of Index and Columns rather than the integer values for more reliability and legibility
    if row["Quantity"] > 0:     # We only care about the Steps where there are modules
        for task in iter(Chronologically_Ordered_Steps):
            if task in index:   # Looks for the task associated with the row TODO : suppress this loop by implementing yet another lookup table
                if pd.isna(state_of_production.loc[index, "Launching Time"]):
                    Time_ready_by_simulation_start = simulation_start - Chronologically_Ordered_Steps[task].duration  # In the last part we extract the duration associated with the task
                    state_of_production.loc[index, "Launching Time"] = Time_ready_by_simulation_start

# Now we fill all those initial data into the log attribute of each step
for Step in Chronologically_Ordered_Steps.values():
    row_Ready = state_of_production.loc[Step.name + " Ready"]
    row_WIP = state_of_production.loc[Step.name + " WIP"]
    df_Ready = pd.DataFrame([[row_Ready.loc["Launching Time"],pd.NaT] for _ in range(row_Ready.loc["Quantity"])],columns=["Entry_Date", "Exit_Date"])
    df_WIP = pd.DataFrame([[row_WIP.loc["Launching Time"],pd.NaT] for _ in range(row_WIP.loc["Quantity"])],columns=["Entry_Date", "Exit_Date"])
    Step.log = pd.concat([Step.log, df_Ready, df_WIP], ignore_index=True)   # TODO : maybe numpy arrays are better suited in this context, since it's a single type of datatype, in addition to that this line raises a warning

def tasks_by_priority(time: timedelta) -> list: # Returns the list of tasks that can be done at the current time

    steps_to_do = []

    for step in list(reversed(Chronologically_Ordered_Steps.values())): 
        # First step, check if the step is ready to process new modules
        condition = sum(pd.isna(step.log["Exit_Date"])) >= step.capacity # We check if the number of steps not exited yet is greater than the capacity
        if condition:  # If the step is not ready to process new modules
            continue    # we break the loop and go to the next step

        if step.previous_steps[0] is None:  # If previous_steps is None. That means that this the first step                        
            continue    # By default the first step is always ready, in fact that means that there are not enough components left

        else:
            # Second step, we compute the reception capacity of the step
            #reception_capacity = step.capacity - sum(pd.isna(step.log["Exit_Date"]))  # We compute the number of modules ready to be processed in the next step
            
            # Now we check if among the previous steps there are enough modules ready to be processed in the next step
            ready_to_be_processed: bool = True
            for previous_step in Step.previous_steps:   

                previous_time_array: np.ndarray = np.full_like(previous_step.log.loc[:,"Entry_Date"], time) 
                time_spent_in_previous_task: pd.DataFrame = previous_time_array - previous_step.log.loc[:,"Entry_Date"]   # Displays the time spent in the previous task for each module
                modules_ready: int = sum((time_spent_in_previous_task >= previous_step.duration) & (pd.isna(previous_step.log["Exit_Date"])))      # If there are not enough modules ready to be processed in the previous steps
                if modules_ready <= 0:                                                       
                    ready_to_be_processed = False
            
            if ready_to_be_processed:
                steps_to_do.append(step)


    return steps_to_do
############################################################################################

############################################################################################
# Once a task is assigned to operators, a module has to be withdrawn 
# from the previous step and added to the next step
def update_log(task, time):

    # First step, we remove the module from the previous step
    if task.previous_steps[0] is None:  # There is no need to update the log for the first step
        pass
    else:
        for previous_step in task.previous_steps:
            condition = (previous_step.log["Entry_Date"] + previous_step.duration <= time) & (pd.isna(previous_step.log["Exit_Date"]))    # Filter all the modules ready that have not been moved yet
                # Check if any rows match the condition
            if condition.any():
                # Get the first index that matches the condition
                first_matching_index = previous_step.log[condition].index[0]    # Take the first ready module and fill its exit date
                
                # Update the 'Exit_Date' for the first matching row
                previous_step.log.at[first_matching_index, "Exit_Date"] = time
            else:
                print("There is a big issue !!")

    # Second step, we add the module to the next step
    df = pd.DataFrame([[time, pd.NaT]], columns=["Entry_Date", "Exit_Date"])
    task.log = pd.concat([task.log, df], ignore_index=True)
############################################################################################


# Initialization of the list of the differents tasks and steps to be done
tasks_to_do: list = [task['Step'] for task in data["StagesAndSteps"]]

# Initialization of the dataframe that will contain the assignments of the operators
operators_assignments: pd.DataFrame = pd.DataFrame(columns=tasks_to_do, dtype=object)


while (time < simulation_end or finished_modules_count < total_modules_number):
    generate_operators_availability(time)    # TODO: Duplication of computation, we could do it once for the day
    to_do: list = tasks_by_priority(time)
    was_a_task_assigned: bool = False
    for task in to_do:
        # if two operators are available, and qualified for the task we assign them to the task
        task_duration: Interval = Interval(time, time + task.required, task.name)
        operators_available: list = [operator for operator in operators if operator.availability.overlaps(task_duration) and task.name in operator.skills]

        # For now we chose at random two of the available operators to perform the task, the law according to which we chose the operators my be reweighted according to their skills
        # Wire Bonding journée entière et 2 fois par semaine. 
        if len(operators_available) >= 2:
            chosen_operators: list = np.random.choice(operators_available, 2, replace=False)
            first_operator, second_operator = chosen_operators
            first_operator.availability.chop(time, time + task.required)
            second_operator.availability.chop(time, time + task.required)
            assigned_operators = (first_operator.name, second_operator.name)
            operators_assignments.loc[time, task.name] = assigned_operators
            print(f"A task was assigned at {time} to {assigned_operators} for the task {task.name}")
            update_log(task, time)
            print(f"task log : \n{task.log}\n\n")
            for previous_steps in task.previous_steps:
                print(f"previous step log : \n{previous_steps.log}\n\n")
            time += task.duration
            was_a_task_assigned = True
            break
    
    if was_a_task_assigned == False:
        time += timedelta(minutes=60)
    




In [2]:
time = datetime(2024,11,1,16)

In [3]:
WB___Wire_bonding.log

Unnamed: 0,Entry_Date,Exit_Date


In [4]:
WB___PDB_Upload_of_wire_bonding_pull_test_results.previous_steps[0].log

Unnamed: 0,Entry_Date,Exit_Date


In [5]:
time

datetime.datetime(2024, 11, 1, 16, 0)

In [6]:
previous_time_array = np.full_like(WB___PDB_Upload_of_wire_bonding_pull_test_results.previous_steps[0].log.loc[:,"Entry_Date"], time) 
time_spent_in_previous_task = previous_time_array - WB___PDB_Upload_of_wire_bonding_pull_test_results.previous_steps[0].log.loc[:,"Entry_Date"]
time_spent_in_previous_task

Series([], Name: Entry_Date, dtype: object)

In [7]:
condition = (time_spent_in_previous_task >= WB___PDB_Upload_of_wire_bonding_pull_test_results.previous_steps[0].duration) & (pd.isna(WB___PDB_Upload_of_wire_bonding_pull_test_results.previous_steps[0].log["Exit_Date"]))
condition

Series([], dtype: bool)

In [8]:
steps_to_do = []

for Step in list(reversed(Chronologically_Ordered_Steps.values())): # This is not the good way to do it, we need to start from the last step and branch out to the first steps

    if Step.previous_steps[0] is None:  # If previous_steps is None. That means that this the first step 
                                
        steps_to_do.append(Step)    # By default the first step is always ready, in fact that means that there are not enough components left

    else:
        for previous_step in Step.previous_steps:   
            previous_time_array = np.full_like(previous_step.log.loc[:,"Entry_Date"], time) 
            time_spent_in_previous_task = previous_time_array - previous_step.log.loc[:,"Entry_Date"]   # Displays the time spent in the previous task for each module
            if sum(time_spent_in_previous_task >= previous_step.duration) < Step.capacity:              # If there are not enough modules ready to be processed in the next step                                                        
                continue                                                                                # we break the loop and go to the next step
            else : 
                steps_to_do.append(Step)

In [10]:
tasks_by_priority(time)

AttributeError: 'NoneType' object has no attribute 'log'

In [None]:
WB___PDB_Upload_of_wire_bonding_pull_test_results.log