# Imports

Please use the provided `hds_stoch` environment for this work.  

In [1]:
import simpy
simpy.__version__

'4.0.1'

In [2]:
import numpy as np
import pandas as pd
import itertools
import math
import matplotlib.pyplot as plt

# Distribution classes

To help you build your model, the notebook includes some pre-written distribution classes that you may wish to use to setup sampling.  You are free to use these, but you can choose not too if you have an approach you prefer. 

In [3]:
class Exponential:
    '''
    Convenience class for the exponential distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the exponential distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.mean = mean
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.exponential(self.mean, size=size)

class Lognormal:
    """
    Encapsulates a lognormal distirbution
    """
    def __init__(self, mean, stdev, random_seed=None):
        """
        Params:
        -------
        mean = mean of the lognormal distribution
        stdev = standard dev of the lognormal distribution
        """
        self.rand = np.random.default_rng(seed=random_seed)
        mu, sigma = self.normal_moments_from_lognormal(mean, stdev**2)
        self.mu = mu
        self.sigma = sigma
        
    def normal_moments_from_lognormal(self, m, v):
        '''
        Returns mu and sigma of normal distribution
        underlying a lognormal with mean m and variance v
        source: https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal
        -data-with-specified-mean-and-variance.html

        Params:
        -------
        m = mean of lognormal distribution
        v = variance of lognormal distribution
                
        Returns:
        -------
        (float, float)
        '''
        phi = math.sqrt(v + m**2)
        mu = math.log(m**2/phi)
        sigma = math.sqrt(math.log(phi**2/m**2))
        return mu, sigma
        
    def sample(self):
        """
        Sample from the normal distribution
        """
        return self.rand.lognormal(self.mu, self.sigma)

# Utility functions

In [4]:
def trace(msg):
    '''
    Utility function for printing simulation
    set the TRACE constant to FALSE to 
    turn tracing off.
    
    Params:
    -------
    msg: str
        string to print to screen.
    '''
    if TRACE:
        print(msg)

In [5]:
def performance(ASU_model):
    '''
    Utility function for final metrics calculation.
    
    Returns numpy.float64 variable containining
    mean waiting time of the bottom 90% of the pts
    formatted to hours.
    
    Params:
    -------
    ASU_model: Class
        refer to the list of Patient objects appended 
        throughout a simulation run to the patients list
        inside of the ASU Class
    '''
    # np array of all queue times to treatment
    patients_queue_times = np.array([patient.queue_time 
                               for patient in model.patients])

    # transform values from days into hours
    patients_queue_times *= 24
    
    # calculate total number of patients
    arr_length = np.size(patients_queue_times)

    # determine the number of values to be dropped
    num_to_drop = int(arr_length * 0.1)

    # sort the array in descending order
    sorted_queue_times = np.sort(patients_queue_times)[::-1]

    # drop top 10%, estimate the mean
    queue_bottom90 = sorted_queue_times[num_to_drop:].mean()
    
    
    #bed utilisation = sum(call durations) / (run length X no. BEDS)
    util = np.array([patient.treat_time 
                 for patient in model.patients]).sum() / (RUN_LENGTH * N_BEDS)
    
    return queue_bottom90, util

# Model parameters

The constants below provides hard coded data representing the base case or 'as-is' state of the minor injury unit.   

In [6]:
# These are the parameters for a base case model run.

# run length in days
RUN_LENGTH = 365

# Turn off tracing
TRACE = False 

# resource counts
N_BEDS = 9

# time between arrivals in minutes (exponential)
# for acute stroke, TIA and neuro respectively
MEAN_IATs = [1.2, 9.5, 3.5]

# treatment (lognormal)
# for acute stroke, TIA and neuro respectively
TREAT_MEANs = [7.4, 1.8, 2.0]
TREAT_STDs = [8.5, 2.3, 2.5]

# SEEDS to reproduce results of a single run
REPRODUCIBLE_RUN = False
    
if REPRODUCIBLE_RUN:
    SEEDS = [42, 101, 1066, 1966, 2013, 999, 1444, 2016]
else:
    SEEDS = [None, None, None, None, None, None, None, None]

# Scenario class
In the cell below the parameters you will find a `Scenario` class.  This makes use of the default parameters to set up the base case scenario.  Remember that its good practice to pass all of your parameters to your simulation model in a **container**.  A class is a flexible way to achieve this aim.

In [7]:
class Scenario:
    '''
    Parameter container class for minor injury unit model.
    '''
    def __init__(self, name=None):
        '''
        The init method sets up our defaults. 
        
        Params:
        -------
        
        name - str or None
            optional name for scenario
        '''
        # optional name
        self.name = name
        
        # number of beds
        self.beds = N_BEDS
        
        # inter-arrival distributions
        self.arrival_dist_type1 = Exponential(MEAN_IATs[0], random_seed=SEEDS[0]) 
        self.arrival_dist_type2 = Exponential(MEAN_IATs[1], random_seed=SEEDS[1])
        self.arrival_dist_type3 = Exponential(MEAN_IATs[2], random_seed=SEEDS[2])
        
        
        # treatment distributions
        self.treatment_dist_type1 = Lognormal(TREAT_MEANs[0], TREAT_STDs[0], random_seed=SEEDS[3])
        self.treatment_dist_type2 = Lognormal(TREAT_MEANs[1], TREAT_STDs[1], random_seed=SEEDS[4])
        self.treatment_dist_type3 = Lognormal(TREAT_MEANs[2], TREAT_STDs[2], random_seed=SEEDS[5])

# Model building

In [32]:
class Patient:
    '''
    Patient in the minor ED process
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            The input data for the scenario
        '''
        # patient id and environment
        self.identifier = identifier
        self.env = env

        # treatment parameters
        self.beds = args.beds
        self.treatment_dist_type1 = args.treatment_dist_type1
        self.treatment_dist_type2 = args.treatment_dist_type2
        self.treatment_dist_type3 = args.treatment_dist_type3
                
        # individual patient metrics
        self.queue_time = 0.0
        self.treat_time = 0.0
        
    
    def treatment_type1(self):

        # record the time that patient entered the system
        arrival_time = self.env.now
     
        # get a bed
        with self.beds.request() as req:
            yield req
            
            # record time to first being seen by a doctor
            self.queue_time = self.env.now - arrival_time

            trace(f'Patient № {self.identifier} started treatment at {self.env.now:.3f};'
                      + f' queue time was {self.queue_time:.3f}')            

            # sample for patient pathway
            self.treat_time = self.treatment_dist_type1.sample()
            
            activity_duration = self.treat_time
          
            # treatment delay
            yield self.env.timeout(activity_duration)
            
            
            
    def treatment_type2(self):

        # record the time that patient entered the system
        arrival_time = self.env.now
     
        # get a bed
        with self.beds.request() as req:
            yield req
            
            # record time to first being seen by a doctor
            self.queue_time = self.env.now - arrival_time
            
            trace(f'Patient № {self.identifier} started treatment at {self.env.now:.3f};'
                      + f' queue time was {self.queue_time:.3f}')            

            # sample for patient pathway
            self.treat_time = self.treatment_dist_type2.sample()
            
            activity_duration = self.treat_time
          
            # treatment delay
            yield self.env.timeout(activity_duration)
                                        
            
    def treatment_type3(self):

        # record the time that patient entered the system
        arrival_time = self.env.now
     
        # get a bed
        with self.beds.request() as req:
            yield req
            
            # record time to first being seen by a doctor
            self.queue_time = self.env.now - arrival_time
            
            trace(f'Patient № {self.identifier} started treatment at {self.env.now:.3f};'
                      + f' queue time was {self.queue_time:.3f}')             

            # sample for patient pathway
            self.treat_time = self.treatment_dist_type3.sample()
            
            activity_duration = self.treat_time
          
            # treatment delay
            yield self.env.timeout(activity_duration)
            

In [33]:
class ASU:  
    '''
    Model of an ASU
    '''
    def __init__(self, args):
        '''
        Contructor
        
        Params:
        -------
        env: simpy.Environment
        
        args: Scenario
            container class for simulation model inputs.
        '''
        self.env = simpy.Environment()
        self.args = args 
        self.init_model_resources(args)
        self.patients = []
        self.patient_count = 0
        
        
    def init_model_resources(self, args):
        '''
        Setup the simpy resource objects
        
        Params:
        ------
        args - Scenario
            Simulation Parameter Container
        '''

        args.beds = simpy.Resource(self.env, 
                                   capacity=args.beds)
        
        
    def run(self, results_collection_period = RUN_LENGTH,
            warm_up = 0):
        '''
        Conduct a single run of the model in its current 
        configuration

        run length = results_collection_period + warm_up

        Parameters:
        ----------
        results_collection_period, float, optional
            default = DEFAULT_RESULTS_COLLECTION_PERIOD

        warm_up, float, optional (default=0)
            length of initial transient period to truncate
            from results.

        Returns:
        --------
            None

        '''
        # setup the arrival processes
        self.env.process(self.arrivals_generator_type1())
        self.env.process(self.arrivals_generator_type2())
        self.env.process(self.arrivals_generator_type3())
                
        # run
        self.env.run(until=results_collection_period+warm_up)
        
            
    def arrivals_generator_type1(self):
            
        while True:
            
            
                # PATIENT WITH ACUTE STROKE ARRIVES (TYPE 1)
                inter_arrival_time = self.args.arrival_dist_type1.sample()               
                
                yield self.env.timeout(inter_arrival_time)
                
                self.patient_count += 1
                
                trace(f'Patient № {self.patient_count} (Stroke) arrives at: {self.env.now:.3f}')
                
                # create a new patient and pass in env and args
                new_patient = Patient(self.patient_count, self.env, self.args)                

                # init the minor injury process for this patient
                self.env.process(new_patient.treatment_type1())                 
            
                # keep a record of the patient for results calculation
                self.patients.append(new_patient)                
                
                
    def arrivals_generator_type2(self):
            
        while True:
            
                            
                # PATIENT WITH TRANSITORY ISCHEMIC ATTACK ARRIVES (TYPE 2)
                inter_arrival_time = self.args.arrival_dist_type2.sample()
           
                yield self.env.timeout(inter_arrival_time)
            
                self.patient_count += 1            
                                
                trace(f'Patient № {self.patient_count} (TIA) arrives at: {self.env.now:.3f}')   
                
                # create a new patient and pass in env and args
                new_patient = Patient(self.patient_count, self.env, self.args)
            
                # init the minor injury process for this patient
                self.env.process(new_patient.treatment_type2())                
                
                # keep a record of the patient for results calculation
                self.patients.append(new_patient)                
                
    def arrivals_generator_type3(self):
            
        while True:
            
                
                # PATIENT WITH COMPLEX NEUROLOGICAL DIAGNOSIS ARRIVES (TYPE 3)
                inter_arrival_time = self.args.arrival_dist_type3.sample()
                
                yield self.env.timeout(inter_arrival_time)
                
                self.patient_count += 1
                
                trace(f'Patient № {self.patient_count} (Neuro) arrives at: {self.env.now:.3f}')
                
                # create a new patient and pass in env and args
                new_patient = Patient(self.patient_count, self.env, self.args)
            
                # init the minor injury process for this patient
                self.env.process(new_patient.treatment_type3())                
                
                # keep a record of the patient for results calculation
                self.patients.append(new_patient)
                
    def run_summary_frame(self):
        
        '''
        Utility function for final metrics calculation.
        
        Returns numpy.float64 variable containining
        mean waiting time of the bottom 90% of the pts
        formatted to hours.
    
        Params:
        -------
        ASU_model: Class
            refer to the list of Patient objects appended 
            throughout a simulation run to the patients list
            inside of the ASU Class
        '''
        # np array of all queue times to treatment
        patients_queue_times = np.array([patient.queue_time 
                                   for patient in self.patients])

        # transform values from days into hours
        patients_queue_times *= 24
    
        # calculate total number of patients
        arr_length = np.size(patients_queue_times)

        # determine the number of values to be dropped
        num_to_drop = int(arr_length * 0.1)

        # sort the array in descending order
        sorted_queue_times = np.sort(patients_queue_times)[::-1]

        # drop top 10%, estimate the mean
        queue_bottom90 = sorted_queue_times[num_to_drop:].mean()
    
    
        #bed utilisation = sum(call durations) / (run length X no. BEDS)
        util = np.array([patient.treat_time 
                     for patient in self.patients]).sum() / (RUN_LENGTH * N_BEDS)
        

        df = pd.DataFrame({'1':{'mean_queue_bottom90%': queue_bottom90, 
                                'bed utilisation': util}})
        df = df.T
        df.index.name = 'rep'
        return df
    

# Functions for single and multiple runs

In [34]:
def single_run(scenario, 
               rc_period = RUN_LENGTH, 
               warm_up = 0):
    '''
    Perform a single run of the model and return the results
    
    Parameters:
    -----------
    
    scenario: Scenario object
        The scenario/paramaters to run
        
    rc_period: int
        The length of the simulation run that collects results
        
    warm_up: int, optional (default=0)
        warm-up period in the model.  The model will not collect any results
        before the warm-up period is reached.  
        
    random_no_set: int or None, optional (default=1)
        Controls the set of random seeds used by the stochastic parts of the 
        model.  Set to different ints to get different results.  Set to None
        for a random set of seeds.
        
    Returns:
    --------
        pandas.DataFrame:
        results from single run.
    '''  
        
    # set random number set - this controls sampling for the run.
    #scenario.set_random_no_set(random_no_set)
    
    # create the model
    model = ASU(scenario)

    model.run(results_collection_period = rc_period, warm_up = warm_up)
    
    # run the model
    results_summary= model.run_summary_frame()
    
    return results_summary

# Script to run the model

In [35]:
# base case scenario with default parameters
default_args = Scenario()

print('Running simulation ...', end = ' => ')

results = single_run(default_args)

print('simulation complete.')

results

Running simulation ... => simulation complete.


Unnamed: 0_level_0,bed utilisation,mean_queue_bottom90%
rep,Unnamed: 1_level_1,Unnamed: 2_level_1
1,0.752069,6.631477
