# SimPy

`simpy` uses process based model worldview.  Given its simplicity it is a highly flexible discrete-event simulation package.

> Detailed documentation can be found here: https://simpy.readthedocs.io/en/latest/

---

## Imports

It is recommended that you use the provided conda virtual environment `os-sim`.  

>If you are running this code in **Google Colab** then `simpy` can be installed by:

In [None]:
pip install simpy==4.0.1

In [2]:
import simpy
simpy.__version__

'4.0.1'

In [3]:
import numpy as np
import pandas as pd
import itertools
import math
import matplotlib.pyplot as plt

In [4]:
# need numpy > v1.18
np.__version__

'1.19.2'

# Example: A health clinic in the US.

This example is based on Nelson (2010) ex. 13 page 170.

# Constants

In [11]:
TRACE = True

DEFAULT_TRIAGE_MEAN = 3.0

DEFAULT_REG_MEAN = 5.0
DEFAULT_REG_VAR= 2.0

DEFAULT_EXAM_MEAN = 16.0
DEFAULT_EXAM_VAR = 3.0

DEFAULT_TRAUMA_MEAN = 90.0

DEFAULT_TRAUMA_TREAT_MEAN = 30.0
DEFAULT_TRAUMA_TREAT_VAR = 4.0

DEFAULT_NON_TRAUMA_TREAT_MEAN = 13.3
DEFAULT_NON_TRAUMA_TREAT_VAR = 2.0

DEFAULT_NON_TRAUMA_TREAT_P = 0.40

#default random number SET
DEFAULT_RNG_SET = None
N_STREAMS = 10

#default results collection period
DEFAULT_RESULTS_COLLECTION_PERIOD = 1440

In [6]:
def trace(msg):
    '''
    Utility function for printing simulation
    set the TRACE constant to FALSE to 
    turn tracing off.
    
    Params:
    -------
    msg: str
        string to print to screen.
    '''
    if TRACE:
        print(msg)

## Distribution classes

In [9]:
class Exponential:
    '''
    Convenience class for the exponential distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the exponential distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.mean = mean
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rng.exponential(self.mean, size=size)

    
class Bernoulli:
    '''
    Convenience class for the Bernoulli distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, p, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        p: float
            probability of drawing a 1
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.p = p
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rng.binomial(n=1, p=self.p, size=size)

class Lognormal:
    """
    Encapsulates a lognormal distirbution
    """
    def __init__(self, mean, stdev, random_seed=None):
        """
        Params:
        -------
        mean: float
            mean of the lognormal distribution
            
        stdev: float
            standard dev of the lognormal distribution
            
        random_seed: int, optional (default=None)
            Random seed to control sampling
        """
        self.rng = np.random.default_rng(seed=random_seed)
        mu, sigma = self.normal_moments_from_lognormal(mean, stdev**2)
        self.mu = mu
        self.sigma = sigma
        
    def normal_moments_from_lognormal(self, m, v):
        '''
        Returns mu and sigma of normal distribution
        underlying a lognormal with mean m and variance v
        source: https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal
        -data-with-specified-mean-and-variance.html

        Params:
        -------
        m: float
            mean of lognormal distribution
        v: float
            variance of lognormal distribution
                
        Returns:
        -------
        (float, float)
        '''
        phi = math.sqrt(v + m**2)
        mu = math.log(m**2/phi)
        sigma = math.sqrt(math.log(phi**2/m**2))
        return mu, sigma
        
    def sample(self):
        """
        Sample from the normal distribution
        """
        return self.rng.lognormal(self.mu, self.sigma)

In [10]:
class Normal:
    '''
    Convenience class for the normal distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, sigma, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the normal distribution
            
        sigma: float
            The stdev of the normal distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.mean = mean
        self.sigma = sigma
        
    def sample(self, size=None):
        '''
        Generate a sample from the normal distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rng.normal(self.mean, self.sigma, size=size)



In [None]:
class Scenario:
    '''
    Container class for scenario parameters/arguments
    
    Passed to a model and its process classes
    '''
    def __init__(self, random_number_set=DEFAULT_RNG_SET):
        '''
        The init method sets up our defaults.
        
        Parameters:
        -----------
        random_number_set: int, optional (default=DEFAULT_RNG_SET)
            Set to control the initial seeds of each stream of pseudo
            random numbers used in the model.
        '''
                #sampling
        self.random_number_set = random_number_set
        self.init_sampling()
    
    def set_random_no_set(self, random_number_set):
        '''
        Controls the random sampling 
        Parameters:
        ----------
        random_number_set: int
            Used to control the set of psuedo random numbers
            used by the distributions in the simulation.
        '''
        self.random_number_set = random_number_set
        self.init_sampling()

    def init_sampling(self):
        '''
        Create the distributions used by the model and initialise 
        the random seeds of each.
        '''
        # create random number streams
        rng_streams = np.random.default_rng(self.random_number_set)
        self.seeds = rng_streams.integers(0, 999999999, size=N_STREAMS)


        # create distributions
        
        # Triage duration
        self.triage_dist = Exponential(DEFAULT_TRIAGE_MEAN, 
                                       random_seed=self.seeds[0])
        
        # Registration duration (non-trauma only)
        self.reg_dist = Lognormal(DEFAULT_REG_MEAN, 
                                  np.sqrt(DEFAULT_REG_VAR),
                                  random_seed=self.seeds[1])
        
        # Evaluation (non-trauma only)
        self.exam_dist = Normal(DEFAULT_EXAM_MEAN,
                                np.sqrt(DEFAULT_EXAM_VAR),
                                random_seed=self.seeds[2])
        
        # Trauma/stablisation duration (trauma only)
        self.trauma_dist = Exponential(DEFAULT_TRAUMA_MEAN, 
                                       random_seed=self.seeds[3])
        
        # Non-trauma treatment
        self.nt_treat_dist = Lognormal(DEFAULT_NON_TRAUMA_TREAT_MEAN, 
                                       np.sqrt(DEFAULT_NON_TRAUMA_TREAT_VAR),
                                       random_seed=self.seeds[4])
        
        # treatment of trauma patients
        self.treat_dist = Lognormal(DEFAULT_TRAUMA_TREAT_MEAN, 
                                    np.sqrt(DEFAULT_TRAUMA_TREAT_VAR),
                                    random_seed=self.seeds[5])
        
        # probability of non-trauma patient requiring treatment
        self.nt_p_treat_dist = Bernoulli(DEFAULT_NON_TRAUMA_TREAT_P, 
                                         random_seed=self.seeds[6])
        
    

In [7]:
class TraumaPathway:
    '''
    Encapsulates the process a patient with severe injuries or illness.
    
    These patients are signed into the ED and triaged as having severe injuries
    or illness.
    
    Patients are stabilised in resus (trauma) and then sent to Treatment.  
    Following treatment they are discharged.
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            Container class for the simulation parameters
            
        '''
        self.identifier = identifier
        self.env = env
        
        # triage resource
        self.triage = args.triage
        
        # metrics
        self.arrival = -np.inf
        self.wait_triage = -np.inf
        self.wait_trauma = -np.inf
        self.wait_treat = -np.inf
        self.total_time = -np.inf
        
    def execute(self):
        '''
        simulates the major treatment process for a patient
        
        1. request and wait for sign-in/triage
        2. trauma
        3. treatment
        '''
        # record the time of arrival and entered the triage queue
        self.arrival = env.now

        # request sign-in/triage 
        with self.triage.request() as req:
            yield req
            
            # record the waiting time for triage
            self.wait_triage = self.env.now - self.arrival
            trace(f'patient {self.identifier} triaged to trauma '
                  f'{self.env.now:.3f}')
            
            # sample triage duration.
            triage_duration = args.triage_dist.sample()
            yield self.env.timeout(triage_duration)
            
            trace(f'triage {self.identifier} complete {self.env.now:.3f}; '
                  f'waiting time was {self.waiting_time:.3f}')
            
        # record the time that entered the trauma queue
        start_wait = env.now
    
        # request trauma room 
        with self.args.trauma.request() as req:
            yield req
            
            # record the waiting time for trauma
            self.wait_trauma = self.env.now - start_wait
            trace(f'stabilisation of patient {self.identifier} at '
                  f'{self.env.now:.3f}')
            
            # sample stablisation duration.
            trauma_duration = args.trauma_dist.sample()
            yield self.env.timeout(trauma_duration)
            
            trace(f'patient {self.identifier} stabilised {self.env.now:.3f}; '
                  f'waiting time was {self.wait_trauma:.3f}')
            
        # record the time that entered the treatment queue
        start_wait = env.now
    
        # request treatment cubicle 
        with self.args.cubicle.request() as req:
            yield req
            
            # record the waiting time for trauma
            self.wait_treat = self.env.now - start_wait
            trace(f'treatment of patient {self.identifier} at '
                  f'{self.env.now:.3f}')
            
            # sample treatment duration.
            treat_duration = args.trauma_dist.sample()
            yield self.env.timeout(treat_duration)
            
            trace(f'patient {self.identifier} treatment complete {self.env.now:.3f}; '
                  f'waiting time was {self.wait_treat:.3f}')
        
        # total time in system
        self.total_time = self.env.now - self.arrival     

In [13]:
class NonTraumaPathway(object):
    '''
    Encapsulates the process a patient with minor injuries and illness.
    
    These patients are signed into the ED and triaged as having minor 
    complaints and streamed to registration and then examination. 
    
    Post examination 40% are discharged while 60% proceed to treatment.  
    Following treatment they are discharged.
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            Container class for the simulation parameters
            
        '''
        self.identifier = identifier
        self.env = env
        
        # triage resource
        self.triage = args.triage
        
        # metrics
        self.arrival = -np.inf
        self.wait_triage = -np.inf
        self.wait_reg = -np.inf
        self.wait_eval = -np.inf
        self.wait_treat = -np.inf
        self.total_time = -np.inf
        
    def execute(self):
        '''
        simulates the minor treatment process for a patient
        
        1. request and wait for sign-in/triage
        2. patient registration
        3. evaluation
        4.1 40% discharged
        4.2 60% treatment then discharge
        '''
        # record the time of arrival and entered the triage queue
        self.arrival = env.now

        # request sign-in/triage 
        with self.triage.request() as req:
            yield req
            
            # record the waiting time for triage
            self.wait_triage = self.env.now - self.arrival
            trace(f'patient {self.identifier} triaged to minors '
                  f'{self.env.now:.3f}')
            
            # sample triage duration.
            triage_duration = args.triage_dist.sample()
            yield self.env.timeout(triage_duration)
            
            trace(f'triage {self.identifier} complete {self.env.now:.3f}; '
                  f'waiting time was {self.waiting_time:.3f}')
            
        # record the time that entered the registration queue
        start_wait = env.now
    
        # request registration clert 
        with self.args.registration.request() as req:
            yield req
            
            # record the waiting time for registration
            self.wait_reg = self.env.now - start_wait
            trace(f'registration of patient {self.identifier} at '
                  f'{self.env.now:.3f}')
            
            # sample registration duration.
            reg_duration = args.trauma_dist.sample()
            yield self.env.timeout(reg_duration)
            
            trace(f'patient {self.identifier} registered at'
                  f'{self.env.now:.3f}; '
                  f'waiting time was {self.wait_reg:.3f}')
            
        # record the time that entered the evaluation queue
        start_wait = env.now
    
        # request evaluation resource
        with self.args.evaluation.request() as req:
            yield req
            
            # record the waiting time for registration
            self.wait_eval = self.env.now - start_wait
            trace(f'evaluation of patient {self.identifier} begins '
                  f'{self.env.now:.3f}')
            
            # sample evaluation duration.
            eval_duration = args.trauma_dist.sample()
            yield self.env.timeout(eval_duration)
            
            trace(f'patient {self.identifier} evaluation complete ' 
                  f'at {self.env.now:.3f};'
                  f'waiting time was {self.wait_eval:.3f}')
            
        # sample if patient requires treatment?
        self.require_treat = self.args.minor_treat_dist.sample()
        
        if self.require_treat:
        
            # record the time that entered the treatment queue
            start_wait = env.now
    
            # request treatment cubicle
            with self.args.cubicle.request() as req:
                yield req

                # record the waiting time for treatment
                self.wait_treat = self.env.now - start_wait
                trace(f'treatment of patient {self.identifier} begins '
                      f'{self.env.now:.3f}')

                # sample treatment duration.
                treat_duration = args.trauma_dist.sample()
                yield self.env.timeout(treat_duration)

                trace(f'patient {self.identifier} treatment complete '
                      f'at {self.env.now:.3f};'
                      f'waiting time was {self.wait_treat:.3f}')
                
        # total time in system
        self.total_time = self.env.now - self.arrival         