In [None]:
# consider a full treatment day to be 11 hours with 1 hour lunch break (600 minutes)

import random
import simpy
import statistics
from clinicdata import linac_data

linac_names = linac_data.keys()

verbose = False

class Patient:
    next_id = 1
    def __init__(self, env, scheduled_linac_name):
        self.patient_id = Patient.next_id
        Patient.next_id += 1
        self.scheduled_linac_name = scheduled_linac_name
        self.env = env
        self.num_slots = random.choices(list(linac_data[scheduled_linac_name]["ApptSlotDistribution"].keys()), linac_data[scheduled_linac_name]["ApptSlotDistribution"].values())[0]
        if verbose:
            print(f"{self.env.now}: Patient {self.patient_id} arrived for {self.num_slots} appointment slot(s) on linac {scheduled_linac_name}.")
        
class Clinic:
    def __init__(self, env, engineer, linac_names):
        self.env = env
        self.engineer = engineer
        self.linac_names = linac_names
        self.target_num_linacs = len(linac_names) # how many linacs we need
        self.current_num_linacs = len(linac_names) # how many linacs we have
        self.linacs = simpy.FilterStore(self.env) # pool of linacs in the clinic
        self.linacs.items = [Linac(self.env, self.engineer, name) for name in self.linac_names]

    def run(self, sim_time):
        for linac in self.linacs.items:
            # begin the treatment day with one patient per linac
            initial_patient = self.patient_arrived(linac)
            self.env.process(self.begin_treatment(initial_patient))
            # then new patients start arriving sporadically for each linac
            self.env.process(self.patients_arriving(linac))
            # machine issues start occurring at random intervals
            self.env.process(linac.await_malfunction(self.env))
        # engineer starts low-priority routine tasks
        self.env.process(routine_work(self.env, self.engineer))
        # start resolving processes
        if verbose:
            print(f"{self.env.now}: Commencing simulation.")
        self.env.run(until=sim_time)

    def patient_arrived(self, linac):
        new_patient = Patient(self.env, linac.name)
        linac.patients_arrived += 1
        return new_patient
    
    def report_treatment_progress(self):
        return {linac.name: {"PatientsScheduled": linac.scheduled_patients,
                             "PatientsCancelled": linac.cancelled_patients,
                             "PatientsArrived": linac.patients_arrived,
                             "PatientsTreated": linac.patients_treated,
                             "TotalDowntime": linac.total_downtime}
                for linac in self.linacs.items}
    
    def add_linac(self):
        # bring a linac online
        self.target_num_linacs += 1
        if self.current_num_linacs < self.target_num_linacs:
            new_linac = Linac(self.env, self.engineer, self.malfunction_rate)
            self.linacs.put(linac)
            self.current_num_linacs += 1

    def remove_linac(self):
        # take a linac offline
        self.target_num_linacs -= 1
        if self.current_num_linacs > self.target_num_linacs:
            if len(self.linacs.items) > 0:
                linac = yield self.linacs.get()
                self.current_num_linacs -= 1
            else:
                pass # wait for a linac to come online

    def get(self, linac_name):
        # request a linac by name (i.e., the name of the linac a patient is scheduled for)
        linac_request = self.linacs.get(lambda linac: linac.name == linac_name)
        return linac_request

    def put(self, linac):
        if self.current_num_linacs <= self.target_num_linacs:
            self.linacs.put(linac)
        else:
            # have more linacs than can fit in the clinic
            self.current_num_linacs -= 1

    def patients_arriving(self, linac):
        while linac.patients_treated < linac.scheduled_patients - linac.cancelled_patients:  # switch to while True: if this breaks
            # is the next patient a no-show?
            if not linac.sample_cancellation():
                # sample a time interval until next patient arrives for the given linac
                new_patient_arrival_time = linac.sample_arrival_time()
                yield self.env.timeout(new_patient_arrival_time)
                new_patient = self.patient_arrived(linac)
                self.env.process(self.begin_treatment(new_patient))
        if verbose:
            print(f"{self.env.now}: {linac.name} has completed treatments for the day.")

    def begin_treatment(self, patient):
        # request the appropriate linac for this patient
        linac = yield self.get(patient.scheduled_linac_name)
        # commence the treatment
        linac.process = self.env.process(linac.treat_patient(patient))
        if verbose:
            print(f"{self.env.now}: Beginning treatment for Patient {patient.patient_id} on linac {linac.name}.")
        # free up the linac for the next patient scheduled to be treated on it
        self.linacs.put(linac)

        
class Linac:
    def __init__(self, env, engineer, name):
        self.env = env
        self.engineer = engineer
        self.name = name
        self.machine_type = self.name[1:]
        # whiteboard (below) seems to be under-reporting the number of patients per day
        # self.scheduled_patients = random.choices(list(linac_data[self.name]["PatientDistribution"].keys()), linac_data[self.name]["PatientDistribution"].values())[0]
        self.scheduled_patients = int(random.normalvariate(linac_data[self.name]["MeanNumPatientsMosaiq"], linac_data[self.name]["StdNumPatientsMosaiq"]))
        self.cancellation_rate = linac_data[self.name]["MeanCancellationRate"]
        self.cancelled_patients = 0

        self.broken = False
        # parameter for sampling occurrences of interruptions
        self.malfunction_rate = linac_data[self.name]["MalfunctionRate"] # minutes^-1
        # parameter for sampling delays due to interruptions
        self.mean_downtime = linac_data[self.name]["MeanDowntime"] # minutes
        # downtime performance
        self.total_downtime = 0 # minutes
        
        # parameter for gating patient arrival
        #self.cancellation_rate = linac_data[self.name]["CancellationRate"] # dimensionless
        # parameters for sampling time between arrivals of this linac's patients to the clinic
        self.mean_appt_interval = linac_data[self.name]["MeanApptInterval"] # minutes
        self.std_appt_interval = linac_data[self.name]["StdApptInterval"] # minutes
        self.min_appt_interval = linac_data[self.name]["MinApptInterval"] # minutes
        # number of patients who showed up
        self.patients_arrived = 0
        
        # parameters for sampling the time it takes for a treatment to be completed on this linac
        self.mean_treat_time = linac_data[self.name]["MeanTreatTime"] # minutes
        self.std_treat_time = linac_data[self.name]["StdTreatTime"] # minutes
        self.min_treat_time = linac_data[self.name]["MinTreatTime"] # minutes
        # number of patients who completed treatment
        self.patients_treated = 0

        if verbose:
            print(f"{self.env.now}: Initialized new linac: {self.name}. Number of scheduled patients: {self.scheduled_patients}")

    # determine whether the next patient cancels
    def sample_cancellation(self):
        cancelled = random.random() < self.cancellation_rate # True if patient cancels, False if patient arrives
        self.cancelled_patients += int(cancelled)
        return cancelled
        
    # sample patient arrival interval from normal distribution with nonzero minimum cutoff
    def sample_arrival_time(self):
        while (new_patient_arrival_time := random.normalvariate(self.mean_appt_interval, self.std_appt_interval)) <= self.min_appt_interval:
                new_patient_arrival_time = random.normalvariate(self.mean_appt_interval, self.std_appt_interval)
        return new_patient_arrival_time

    # sample treatment duration from normal distribution with nonzero minimum cutoff
    def sample_treatment_time(self):
        while (treatment_time := random.normalvariate(self.mean_treat_time, self.std_treat_time)) <= self.min_treat_time:
            treatment_time = random.normalvariate(self.mean_treat_time, self.std_treat_time)
        return treatment_time

    # sample time to this linac's next downtime
    def sample_malfunction_interval(self):
        return random.expovariate(self.malfunction_rate)
    
    # sample duration of downtime
    def sample_downtime(self):
        # could be the whole day
        #if (whole_day := random.randint(0, 1)):
        #    return 660
        #else:
        return random.expovariate(1/self.mean_downtime)
    
    # regular clinical function
    def treat_patient(self, patient):
        completed_slots = 0
        for slot in range(patient.num_slots):
            treatment_time = self.sample_treatment_time()
            if verbose:
                print(f"{self.env.now}: Treatment during appointment slot {slot + 1} for Patient {patient.patient_id} will take {treatment_time} minutes.")
            while treatment_time:
                start = self.env.now
                try:
                    # treatment in progress
                    yield self.env.timeout(treatment_time)
                    # treatment complete for this slot!
                    treatment_time = 0 # done, exit the loop
                    completed_slots += 1
                except simpy.Interrupt:
                    # treatment interrupted by machine issue
                    self.broken = True
                    # subtract already elapsed time from treatment
                    treatment_time -= self.env.now - start
                    # request engineer with urgent priority
                    with self.engineer.request(priority=1) as engineer_request:
                        yield engineer_request
                        downtime = self.sample_downtime()
                        if verbose:
                            print(f"{self.env.now}: Linac {self.name} will take {downtime} minutes to repair.")
                        self.total_downtime += downtime
                        yield self.env.timeout(downtime)
                    # back online
                    self.broken = False
                    if verbose:
                        print(f"{self.env.now}: Linac {self.name} is back online.")
        # treatment complete
        if completed_slots == patient.num_slots:
            self.patients_treated += 1
        if verbose:
            print(f"{self.env.now}: Treatment completed for Patient {patient.patient_id}.")
        
    # linac malfunction
    def await_malfunction(self, env):
        # sample the time we wait until the next linac malfunction
        while self.patients_treated < self.scheduled_patients - self.cancelled_patients: # switch to while True: if this breaks
            yield self.env.timeout(self.sample_malfunction_interval())
            if not self.broken:
                # assume the engineer can't break the machine even further
                if verbose:
                    print(f"{self.env.now}: Linac {self.name} is experiencing downtime.")
                if self.process.is_alive:
                    self.process.interrupt()
                    

def routine_work(env, engineer):
    # engineer's duties which are less important than linac repair
    while True:
        # start a task every 30 minutes
        while (task_time := 30):
            with engineer.request(priority=2) as engineer_request:
                yield engineer_request
                start = env.now
                try:
                    yield env.timeout(task_time)
                    task_time = 0
                except simpy.Interrupt:
                    task_time -= env.now - start


simulation_results = {name: {"PatientsScheduled": [], "PatientsCancelled": [], "PatientsArrived": [], "PatientsTreated": [], "TotalDowntime": []} for name in linac_names}
clinic_hours = 10
for _ in range((N_histories := 5000)):
    env = simpy.Environment()
    engineer = simpy.PreemptiveResource(env, capacity=1)
    clinic = Clinic(env, engineer, linac_names)
    clinic.run(sim_time=60*clinic_hours)
    results = clinic.report_treatment_progress()
    for linac_name, linac_results in results.items():
        for k, v in linac_results.items():
            simulation_results[linac_name][k].append(v)

for linac, results in simulation_results.items():
    print(f"{linac} results:")
    for category, data in results.items():
        print(f"{category}: {sum(data)/len(data)} ± {statistics.stdev(data)}")
    print("\n")

# Data Requirements

Each linear accelerator in the clinic should be represented as a data dictionary with the following fields:

In [None]:
linac_data = {
    "LinacName": {
        # discrete probability distribution of daily patient volume (according to Whiteboard)
        "PatientDistribution": {
            1: ...,
            2: ...,
            3: ...
        },
        # statistics regarding the daily patient volume (according to Mosaiq)
        "MeanNumPatientsMosaiq": ...,
        "StdNumPatientsMosaiq": ...,
        # fraction of appointments that have X status instead of C in Mosaiq
        "MeanCancellationRate": ...,
        "StdCancellationRate": ...,
        # discrete probability distribution of isocenters per patient
        "IsocenterDistribution": {
            1: ...,
            2: ...,
            3: ...
        },
        # discrete probability distribution of number of appointment slots per patient
        "ApptSlotDistribution": {
            1: ...,
            2: ...,
        }
        # discrete probability distribution of time allotted per appointment slot
        "ApptIntervalDistribution": {
            5: ...,
            10: ...,
            15: ...,
            20: ...,
            30: ...,
            45: ...,
            60: ...
        },
        # statistics regarding the actual duration of treatments, assuming a normal distribution
        "MeanTreatTime": ...,
        "StdTreatTime": ...,
        "MinTreatTime": ...,
        # parameters for sampling downtime due to machine issues
        "MalfunctionRate": ...,
        "MeanDowntime": ...
    }
}