In [148]:
import random
from datetime import datetime, timedelta
import pandas as pd # hoping to not need
import numpy as np

class BedModel:
    """
    Generalised Inpatient Bed Model
    
    Example:
    
    warmup_n = 574

    source_prob = {"Emergency Department": 0.8, "Non-ED Admissions": 0.14, "Elective Waiting List": 0.06}

    category_prob = {"Emergency Department": {'Surgical Elective':0, 'Surgical Emergency':0.2, 'Medical Emergency':0.8},
                 "Non-ED Admission": {'Surgical Elective':0, 'Surgical Emergency':0.4, 'Medical Emergency':0.6}
                 }

    los_distributions = {
        'Emergency Department': {'Surgical Elective': (1, 0.5), 'Surgical Emergency': (2, 0.7), 'Medical Emergency': (3, 1)},
        'Non-ED Admission': {'Surgical Elective': (1.5, 0.6), 'Surgical Emergency': (2.5, 0.8), 'Medical Emergency': (3.5, 1.2)},
        'Elective': {'Elective': (2, 0.7)}
    }

    hospital = BedModel(n_surgical_elective_beds=40,
                        n_surgical_emergency_beds=150,
                        n_medical_emergency_beds=450,
                        n_escalation_beds=20, 
                        source_probability=source_prob, 
                        category_probability=category_prob, 
                        los_distributions=los_distributions)
    
    hospital.warm_up_model(warmup_number=warmup_n)
    
    """
    def __init__(self, n_surgical_elective_beds, n_surgical_emergency_beds, n_medical_emergency_beds, n_escalation_beds, source_probability, category_probability, los_distributions):
        """
        :param num_surgical_elective_beds:
        :param num_surgical_emergency_beds:
        :param num_medical_emergency_beds:
        :param num_escalation_beds:
        """
        # Total Beds
        self.n_surgical_elective_beds = n_surgical_elective_beds # ? can these four use a dictionary to reduce n parameters
        self.n_surgical_emergency_beds = n_surgical_emergency_beds
        self.n_medical_emergency_beds = n_medical_emergency_beds
        self.n_escalation_beds = n_escalation_beds
        
        # Global Variables
        self.los_distributions = los_distributions
        self.category_probability = category_probability
        self.source_probability = source_probability
        
        # Beds occupied
        self.occupied_surgical_elective_beds = []
        self.occupied_surgical_emergency_beds = []
        self.occupied_medical_emergency_beds = []
        self.occupied_escalation_beds = []
        
        # All Patients
        self.patient_master = [] # A master holding place for newly generated patients until they are assigned to the correct holder or bed
        
        #Queues / cancellations
        self.ed_queue = [] # If an emergency patient from ed cannot be admitted they wait here (trolley wait)
        self.non_ed_queue = [] # If an emergency patient from a non ed source cannot be admitted they wait here
        self.elective_cancellations = [] # If a patient from the elective waiting list cannot be admitted they are cancelled

        # Metrics
        self.record_available_beds = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': [], 'escalation': []}
        self.record_n_occupied_beds = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': [], 'escalation': []}
        self.record_n_outliers = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': []}
        self.record_n_escalation = []
        self.record_n_admissions_by_hour = []
        self.record_n_discharges_by_hour = []
        self.record_mean_length_of_stay = []
        self.record_mean_ed_queue = []
        self.record_mean_non_ed_queue = []
        self.record_n_trolley_waits = []
        self.record_n_cancellations = []


    # Tools
    
    # These functions are used as tools throughout the model

    def unique_id_generator(self):
        """
        Prevents Patient IDS being duplicated in self.patient when generating a new patient
        
        !! Change to yield id  
        ?? double check if this is checking the UID in the correct position
        :return: 
        """
        
        all_patients = self.patient_master + self.occupied_escalation_beds + self.occupied_surgical_elective_beds + self.n_surgical_emergency_beds + self.occupied_surgical_emergency_beds # eliminate the need for a master patient index to always exist
        
        while True:
            new_id = random.randint(100000, 999999) # IDs based on a 6-digit number 
            if new_id not in [patient[0] for patient in all_patients]:
                return new_id


    def patient_generator(self, n):
        """
        Create stochastic patients based on probability and distributions
        :param n: Number of patients to generate
        :param source_prob: Probability of being from 'Emergency Department', 'Non-ED Admission', 'Elective'
        :param category_prob: Probability of, factoring in the source 'Surgical Elective', 'Surgical Emergency', 'Medical Emergency'
        :param los_distributions: Factoring in source and category the mean and sigma of LOS
        :return: patients to patient_master
        """
    
        for i in range(0, n):
    
            patient_id, source, category, los = None, None, None, None
    
            patient_id = self.unique_id_generator()
    
            source = np.random.choice(['Emergency Department', 'Non-ED Admission', 'Elective'],
                                      1,
                                      p=[self.source_probability.get('Emergency Department'),
                                         self.source_probability.get('Non-ED Admissions'),
                                         self.source_probability.get('Elective Waiting List')]).item(0)
    
            # Get the corresponding probability that the Patient will be Surgical Elective, Surgical Emergency or Medical Emergency 
    
            if source == 'Emergency Department':
                category = np.random.choice(['Surgical Elective', 'Surgical Emergency', 'Medical Emergency'],
                                            1,
                                            p=[self.category_probability.get(source)['Surgical Elective'],
                                               self.category_probability.get(source)['Surgical Emergency'],
                                               self.category_probability.get(source)['Medical Emergency']]).item(0)
    
                # Pick from a los distribution using the mean and sigma !! Assuming that LOS is log normal just while building, need to check this
    
                if category == 'Surgical Elective':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Surgical Elective'][0],
                                                  self.los_distributions.get(source)['Surgical Elective'][1],
                                                  1).item(0))
    
                elif category == 'Surgical Emergency':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Surgical Emergency'][0],
                                                  self.los_distributions.get(source)['Surgical Emergency'][1],
                                                  1).item(0))
    
                elif category == 'Medical Emergency':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Medical Emergency'][0],
                                                  self.los_distributions.get(source)['Medical Emergency'][1],
                                                  1).item(0))
    
            elif source == 'Non-ED Admission':
                category = np.random.choice(['Surgical Elective', 'Surgical Emergency', 'Medical Emergency'],
                                            1,
                                            p=[self.category_probability.get(source)['Surgical Elective'],
                                               self.category_probability.get(source)['Surgical Emergency'],
                                               self.category_probability.get(source)['Medical Emergency']]).item(0)
    
                if category == 'Surgical Elective':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Surgical Elective'][0],
                                                  self.los_distributions.get(source)['Surgical Elective'][1],
                                                  1).item(0))
    
                elif category == 'Surgical Emergency':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Surgical Emergency'][0],
                                                  self.los_distributions.get(source)['Surgical Emergency'][1],
                                                  1).item(0))
    
                elif category == 'Medical Emergency':
                    los = int(np.random.lognormal(self.los_distributions.get(source)['Medical Emergency'][0],
                                                  self.los_distributions.get(source)['Medical Emergency'][1],
                                                  1).item(0))
    
            elif source == 'Elective':
                # Waiting list patients will always be admitted as a Surgical Elective
                category = 'Elective'
                los = int(np.random.lognormal(self.los_distributions.get(source)['Elective'][0],
                                              self.los_distributions.get(source)['Elective'][1],
                                              1).item(0))
    
            patient_data = [patient_id, source, category, los]
    
            self.patient_master.append(patient_data)
            
            
    # This function acts as a warm-up, setting the starting figures in the simulation, so it does not begin with an empty system
            
    def warm_up_model(self, warmup_number):
        """
        
        :param warmup_number: Number of patients to be generated at warm up
        :return: 
        """
        
        if warmup_number > self.n_surgical_emergency_beds + self.n_surgical_elective_beds + self.n_medical_emergency_beds + self.n_escalation_beds:
            raise ValueError("The number of patients at warm-up cannot exceed the beds available")
        
        else:
            self.patient_generator(n=warmup_number)
        
        # Admit patients              
        self.admit_patient(warm=True)
        
        # Empty the patient master now that all patients are assigned a bed
        self.patient_master.clear() 
                

    # Calculation Functions
    
    # These functions are to calculate the various metrics in the simulation
                
    def calculate_outliers(self):
        """
        Add the number of outliers to  self.record_n_outliers existing in the Elective, Medical Emergency and Surgical Emergency Beds
        !! Escalation usage is recorded in a different function 
        """

        elective_outliers = 0
        medical_outliers = 0
        surgical_outliers = 0

        for outlier in self.occupied_medical_emergency_beds:
            if outlier[2] == 'Elective':
                elective_outliers += 1
            elif outlier[2] == 'Surgical Emergency':
                surgical_outliers += 1
        
        for outlier in self.occupied_surgical_emergency_beds:
            if outlier[2] == 'Elective':
                elective_outliers += 1
            elif outlier[2] == 'Medical Emergency':
                medical_outliers += 1

        for outlier in self.occupied_surgical_elective_beds:
            if outlier[2] == 'Medical Emergency':
                medical_outliers += 1
            elif outlier[2] == 'Surgical Emergency':
                surgical_outliers += 1
        
        self.record_n_outliers['surgical elective'].append(elective_outliers)
        self.record_n_outliers['surgical emergency'].append(surgical_outliers)
        self.record_n_outliers['medical emergency'].append(medical_outliers)
        
    def calculate_escalation(self):
        """
        
        :return: 
        """
        escalation = 0 
        
        if self.occupied_escalation_beds:
            self.record_n_escalation.append(len(self.occupied_escalation_beds))
        else: 
            self.occupied_escalation_beds.append(escalation)
            
            



    # Core Functions
    
    # All these functions are used in the running of the simulation
          
    def discharge_patient(self):
        """
        This function should discharge the patient when their los counter records 0 
        :return: 
        """
        
    
    def cancel_patient(self, patient_being_cancelled):
        """
        This function should add any patients marked as cancelled to the cancellations list
        :param patient_being_cancelled: a list of record(s) of the patient(s) being cancelled
        :return: 
        """
        self.elective_cancellations.append(patient_being_cancelled)
        
       
    def admit_patient(self, warm):
        """
        This function should admit patients from the holding areas
        :return: 
        """
        
        if warm:
            for patient in self.patient_master:
                # Prioritise the placement of emergency patients

                if patient[2] == 'Medical Emergency' and len(self.occupied_medical_emergency_beds) < self.n_medical_emergency_beds:
                    self.occupied_medical_emergency_beds.append(patient)

                # If this is not possible place the patients in surgical beds
                elif patient[2] == 'Medical Emergency' and len(self.occupied_medical_emergency_beds) >= self.n_medical_emergency_beds and len(self.occupied_surgical_emergency_beds) < self.n_surgical_emergency_beds:
                    self.occupied_surgical_emergency_beds.append(patient)

                elif patient[2] == 'Surgical Emergency' and len(self.occupied_surgical_emergency_beds) < self.n_surgical_emergency_beds:
                    self.occupied_surgical_emergency_beds.append(patient)

                elif patient[2] == 'Surgical Emergency' and len(self.occupied_surgical_emergency_beds) >= self.n_surgical_emergency_beds and len(self.occupied_medical_emergency_beds) < self.n_medical_emergency_beds:
                    self.occupied_medical_emergency_beds.append(patient)

                elif patient[2] == 'Elective' and len(self.occupied_surgical_elective_beds) < self.n_surgical_elective_beds:
                    self.occupied_surgical_elective_beds.append(patient)

                elif patient[2] == 'Elective' and len(self.occupied_surgical_elective_beds) >= self.n_surgical_elective_beds and len(self.occupied_surgical_emergency_beds) < self.n_surgical_emergency_beds:
                    self.occupied_surgical_emergency_beds.append(patient)

                elif patient[2] == 'Elective' and len(self.occupied_surgical_elective_beds) >= self.n_surgical_elective_beds and len(self.occupied_medical_emergency_beds) < self.n_medical_emergency_beds:
                    self.occupied_medical_emergency_beds.append(patient)

                # If patient cannot fit within the core bed base then use escalation beds
                else:
                    self.occupied_escalation_beds.append(patient)
        
    def arrivals(self):
        """
        This function handles generating the new arrivals each hour and puts them in the holding area ready for admission
        :return: 
        """
    
    # End Results
    
    # These functions are used to record the end results of the model and graphically show them
        
    def collect_results(self):
        """
        This function should collect the results into a tabular format
        
        :return: 
        """
    
    def graph_results(self):
        """
        This function should graphically show the results of the simulation
        :return: 
        """

    # Main Function 
    
    # This is the core function called to run the simulation (after set up and warm up)

    def simulate_inpatient_system(self, start_time, end_time, runs=100):
        """
        
        :param start_time: datetime for when the simulation should start
        :param end_time: datetime for when the simulation should end
        :param runs: the number of runs that should be executed to collect results, default: 100 runs
        :return: 
        """
        
        for i in range(runs):
            current_time = start_time
            
            while current_time <= end_time:
                # Beds occupied
                #self.occupied_surgical_elective_beds = []
                #self.occupied_surgical_emergency_beds = []
                #self.occupied_medical_emergency_beds = []
                #self.occupied_escalation_beds = []
        
                # All Patients
                #self.patient_master = [] # A master holding place for newly generated patients until they are assigned to the correct holder or bed
        
                #Queues / cancellations
                #self.ed_queue = [] # If an emergency patient from ed cannot be admitted they wait here (trolley wait)
                #self.non_ed_queue = [] # If an emergency patient from a non ed source cannot be admitted they wait here
                #self.elective_cancellations = [] # If a patient from the elective waiting list cannot be admitted they are cancelled
        
                # Metrics
                #self.record_available_beds = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': [], 'escalation': []}
                #self.record_n_occupied_beds = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': [], 'escalation': []}
                #self.record_n_outliers = {'surgical elective': [], 'surgical emergency': [], 'medical emergency': []}
                #self.record_n_admissions_by_hour = []
                #self.record_n_discharges_by_hour = []
                #self.record_mean_length_of_stay = []
                #self.record_mean_ed_queue = []
                #self.record_mean_non_ed_queue = []
                
                #Finished
                self.record_n_escalation.append(len(self.occupied_escalation_beds))
                self.record_n_trolley_waits.append(sum(len(self.non_ed_queue), len(self.ed_queue)))
                
                # Count how many elective patients couldn't be admitted and were moved to the cancelled parameter
                self.record_n_cancellations.append(len(self.cancellations))
                # Clear the Cancellations after they have been recorded
                self.cancellations.clear()
                
                current_time += timedelta(hours=1)
        



In [149]:


warmup_n = 574

source_prob = {"Emergency Department": 0.8, "Non-ED Admissions": 0.14, "Elective Waiting List": 0.06}

category_prob = {"Emergency Department": {'Surgical Elective':0, 'Surgical Emergency':0.2, 'Medical Emergency':0.8},
                 "Non-ED Admission": {'Surgical Elective':0, 'Surgical Emergency':0.4, 'Medical Emergency':0.6}
                 }

los_distributions = {
    'Emergency Department': {'Surgical Elective': (1, 0.5), 'Surgical Emergency': (2, 0.7), 'Medical Emergency': (3, 1)},
    'Non-ED Admission': {'Surgical Elective': (1.5, 0.6), 'Surgical Emergency': (2.5, 0.8), 'Medical Emergency': (3.5, 1.2)},
    'Elective': {'Elective': (2, 0.7)}
}

hospital = BedModel(n_surgical_elective_beds=40,
                    n_surgical_emergency_beds=150,
                    n_medical_emergency_beds=450,
                    n_escalation_beds=20, 
                    source_probability=source_prob, 
                    category_probability=category_prob, 
                    los_distributions=los_distributions)

hospital.warm_up_model(warmup_number=warmup_n)


hospital.patients


[[737505, 'Non-ED Admission', 'Medical Emergency', 44],
 [522449, 'Emergency Department', 'Medical Emergency', 2],
 [754857, 'Emergency Department', 'Medical Emergency', 33],
 [383740, 'Emergency Department', 'Medical Emergency', 70],
 [619159, 'Emergency Department', 'Medical Emergency', 67],
 [351387, 'Emergency Department', 'Surgical Emergency', 3],
 [528005, 'Emergency Department', 'Medical Emergency', 10],
 [460134, 'Non-ED Admission', 'Medical Emergency', 136],
 [545634, 'Elective', 'Elective', 10],
 [319007, 'Elective', 'Elective', 10],
 [584878, 'Emergency Department', 'Medical Emergency', 16],
 [763339, 'Emergency Department', 'Medical Emergency', 13],
 [989971, 'Emergency Department', 'Medical Emergency', 81],
 [714033, 'Emergency Department', 'Medical Emergency', 53],
 [374517, 'Emergency Department', 'Surgical Emergency', 11],
 [743733, 'Emergency Department', 'Medical Emergency', 4],
 [720224, 'Emergency Department', 'Medical Emergency', 27],
 [720963, 'Emergency Departmen

In [143]:
m = hospital.occupied_medical_emergency_beds

In [2]:
def join_lists(lists):
    # Initialize an empty list to hold the result
    joined_list = []

    # Concatenate each list in the input list
    for lst in lists:
        joined_list += lst

    return joined_list

# Example usage:
list1 = [[1], [2], [3]]
list2 = [[], [5], [6]]
list3 = [[7], [8], [9]]

result = join_lists([list1, list2, list3])
print(result)  # Output: [1, 2, 3, 4, 5, 6, 7, 8, 9]

[[1], [2], [3], [], [5], [6], [7], [8], [9]]
