In [None]:
# Simpy Exercises 2:

## Complex pathways

In [1]:
import simpy
simpy.__version__

'3.0.12'

In [2]:
import numpy as np
import pandas as pd
import itertools
import math
import matplotlib.pyplot as plt

# Distribution classes

The notebook includes some pre-written distribution classes to help with sampling.  You are free to use these, but you can choose not too.

In [10]:
class Exponential():
    '''
    Convenience class for the exponential distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the exponential distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.mean = mean
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.exponential(self.mean, size=size)

class Triangular():
    '''
    Convenience class for the triangular distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, low, mode, high, random_seed=None):
        self.rand = np.random.default_rng(seed=random_seed)
        self.low = low
        self.high = high
        self.mode = mode
        
    def sample(self, size=None):
        return self.rand.triangular(self.low, self.mode, self.high, size=size)
    
class Bernoulli():
    '''
    Convenience class for the Bernoulli distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, p, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        p: float
            probability of drawing a 1
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.p = p
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.binomial(n=1, p=self.p, size=size)

class Uniform():
    '''
    Convenience class for the Bernoulli distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, low, high, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        low: float
            lower range of the uniform
            
        high: float
            upper range of the uniform
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.low = low
        self.high = high
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.uniform(low=self.low, high=self.high, size=size)

In [11]:
class Lognormal(object):
    """
    Encapsulates a lognormal distirbution
    """
    def __init__(self, mean, stdev, random_seed=None):
        """
        Params:
        -------
        mean = mean of the lognormal distribution
        stdev = standard dev of the lognormal distribution
        """
        self.rand = np.random.default_rng(seed=random_seed)
        mu, sigma = self.normal_moments_from_lognormal(mean, stdev**2)
        self.mu = mu
        self.sigma = sigma
        
    def normal_moments_from_lognormal(self, m, v):
        '''
        Returns mu and sigma of normal distribution
        underlying a lognormal with mean m and variance v
        source: https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal
        -data-with-specified-mean-and-variance.html

        Params:
        -------
        m = mean of lognormal distribution
        v = variance of lognormal distribution
                
        Returns:
        -------
        (float, float)
        '''
        phi = math.sqrt(v + m**2)
        mu = math.log(m**2/phi)
        sigma = math.sqrt(math.log(phi**2/m**2))
        return mu, sigma
        
    def sample(self):
        """
        Sample from the normal distribution
        """
        return np.random.lognormal(self.mu, self.sigma)

In [3]:
def trace(msg):
    '''
    Turing printing of events on and off.
    
    Params:
    -------
    msg: str
        string to print to screen.
    '''
    if TRACE:
        print(msg)

# Model parameters and scenario class

In [38]:
# These are the parameters for a base case model run.

#resources
N_CUBICLES = 15
N_BAYS = 3

#time between arrivals in minutes (exponential)
MEAN_IAT = 5

#triage (triangular)
TRIAGE_LOW = 2
TRIAGE_MODE = 5
TRIAGE_HIGH = 10

#assessment (lognormal)
ASSESS_MEAN = 10
ASSESS_STD = 3

#diagnostics (bernoulli)
PROB_DIAG = 0.45 

#diagnostics waiting time (exp)
DIAG_WT_MEAN = 30

#diagnostics process time (triangular)
DIAG_PT_LOW = 10
DIAG_PT_MODE = 15
DIAG_PT_HIGH = 20

#% in parallel (bernoulli)
P_PARALLEL = 0.75

#SEEDS to reproduce results of a single run
REPRODUCIBLE_RUN = True
    
if REPRODUCIBLE_RUN:
    SEEDS = [42, 101, 1066, 1966, 2013, 999, 1444]
else:
    SEEDS = [None, None, None, None, None, None]

In [40]:
class Scenario(object):
    '''
    Parameter class for 111 simulation model
    '''
    def __init__(self):
        '''
        The init method sets up our defaults. 
        '''
        
        #triage bays
        self.triage_bays = simpy.Resource(env, capacity=N_BAYS)
        
        #assessment and treatment cubicles
        self.minor_cubicles = simpy.Resource(env, capacity=N_CUBICLES)
        
        #inter-arrival distribution
        self.arrival_dist = Exponential(MEAN_IAT, random_seed=SEEDS[0])
        
        #triage distribution
        self.triage_dist = Triangular(TRIAGE_LOW, TRIAGE_MODE, TRIAGE_HIGH, 
                                      random_seed=SEEDS[1])
        
        #assessment distribution
        self.assessment_dist = Lognormal(ASSESS_MEAN, ASSESS_STD, 
                                         random_seed=SEEDS[2])
        
        #diagnostics: prob that patient needs imaging etc.
        self.n_diagnostics_minor = Bernoulli(PROB_DIAG, random_seed=SEEDS[3])
        
        #waiting and process time dists for diagnostics
        self.diag_wait_dist = Exponential(DIAG_WT_MEAN, random_seed=SEEDS[4])
        self.diag_dist = Triangular(DIAG_PT_LOW, DIAG_PT_MODE, DIAG_PT_HIGH,
                                    random_seed=SEEDS[5])
        
        #prob diagnostics done in parallel to nurse assessment
        self.p_diag_parallel = Bernoulli(P_PARALLEL, random_seed=SEEDS[6])


# Model logic

In [44]:
class MinorPatient(object):
    '''
    Patient in the minor ED process
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            The input data for the scenario
        '''
        #patient id and environment
        self.identifier = identifier
        self.env = env

        #triage parameters
        self.triage_bays = args.triage_bays
        self.triage_dist = args.triage_dist
    
        #minor assessment parameters
        self.minor_cubicles = args.minor_cubicles
        self.assessment_dist = args.assessment_dist
        
        #diagnostics: prob that patient needs imaging etc.
        self.n_diagnostics_dist = args.n_diagnostics_minor
        
        #prob diagnostics done in parallel to nurse assessment
        self.p_diag_parallel = args.p_diag_parallel
        
        #waiting and process time dists for diagnostics
        self.diag_wait_dist = args.diag_wait_dist 
        self.diag_dist = args.diag_dist
                
        # individual patient metrics
        self.time_to_nurse = 0.0
        self.time_in_system = 0.0
        
    
    def assessment(self):
        '''
        simulates the process for minor emergencies
        
        1. request and wait for triage
        2. request and wait for a minor cubicle
        3. minor assessment
        4. exit system.
        
        '''
        #record the time that patient entered the system
        arrival_time = env.now

        #request an operator 
        with self.triage_bays.request() as req:
            yield req
            
            trace(f'minor triage {self.identifier} at {self.env.now:.3f}')
            
            #sample triage duration.
            triage_duration = self.triage_dist.sample()
            yield self.env.timeout(triage_duration)
            
            trace(f'triage ended {self.identifier} ended {self.env.now:.3f}; '
                      + f' waiting time was {self.env.now - arrival_time:.3f}')
            
        
        #get cubicle
        with self.minor_cubicles.request() as req:
            yield req
            
            #record time to first being seen by a nurse
            self.time_to_nurse = self.env.now - arrival_time
            
            trace(f'minor assessment {self.identifier} at {self.env.now:.3f}; '
                      + f' time to nurse was {self.time_to_nurse:.3f}')
            
            #sample for patient pathway
            assess_time, n_diagnostics, \
                parallel, diag_wt, diag_pt = self.sample_all()
            
            #patient undergoing diagnostics?
            if n_diagnostics > 0:

                #in parallel to minor assessment?
                if parallel:
                    
                    trace(f'parallel diagnostics patient {self.identifier}')
                    
                    #use the longest delay
                    activity_duration = max(assess_time, diag_wt + diag_pt)
                    
                else:
                    
                    trace(f'diagnostics following assessment for '
                          + f'patient {self.identifier}')
                    
                    #use the total delay
                    activity_duration = assess_time + diag_wt + diag_pt
            else:
                activity_duration = assess_time
                
            yield self.env.timeout(activity_duration)
            
            trace(f'completed {self.identifier} at {self.env.now:.3f}; '
                      + f' time in system{self.time_to_nurse:.3f}')
            
                        
    def sample_all(self):
        '''
        Sample assessment time, if the patient requires diagnostics,
        if the diags are done in parallel to assessment and 
        '''

        #sample assessment time
        assess_time = self.assessment_dist.sample()

        #will the patient need diagnostics?
        n_diagnostics = self.n_diagnostics_dist.sample()

        if n_diagnostics > 0:
            #will the diagnostics be in parallel with assessment
            parallel = self.p_diag_parallel.sample()

            #diagnostic waiting time
            diag_wt = self.diag_wait_dist.sample()

            #diagnostic process time
            diag_pt = self.diag_dist.sample()

            return assess_time, n_diagnostics, parallel, diag_wt, diag_pt
        else:
            return assess_time, n_diagnostics, _, _, _
            
            
            
            
            

In [35]:
class MinorInjuryUnit(object):  
    '''
    Model of an minor injury unit
    '''
    def __init__(self, env, args):
        '''
        Contructor
        
        Params:
        -------
        env: simpy.Environment
        
        args: Scenario
            container class for simulation model inputs.
        '''
        self.env = env
        self.args = args 
        
        self.patients = []
            
    def arrivals_generator(self):
        '''
        IAT is exponentially distributed

        Parameters:
        ------
        env: simpy.Environment

        args: Scenario
            Container class for model data inputs
        '''
        for patient_count in itertools.count(start=1):
            inter_arrival_time = self.args.arrival_dist.sample()
            yield env.timeout(inter_arrival_time)

            trace(f'patient {patient_count} arrives at: {env.now:.3f}')

            new_patient = MinorPatient(patient_count, self.env, self.args)
            self.patients.append(new_patient)
            
            env.process(new_patient.assessment())

In [None]:
# Script to run the model

In [46]:
#run length in minutes
RUN_LENGTH = 1440
MEAN_IAT = 5
TRACE = False

#create simpy environment
env = simpy.Environment()
#base case scenario with default parameters
default_args = Scenario()

#create the model
model = MinorInjuryUnit(env, default_args)

#setup the process
env.process(model.arrivals_generator())

env.run(until=RUN_LENGTH)
print(f'end of run. simulation clock time = {env.now}')

end of run. simulation clock time = 1440.0


In [None]:
TRACE = True