# simulating the stroke unit

In [1]:
import numpy as np
import pandas as pd
import random 
import itertools
import math
import matplotlib.pyplot as plt

In [2]:
import simpy
simpy.__version__

'4.1.1'

# distribution classes

distribution classes used
- exponential for IAT (inter arrival time)
- lognormal for length of stay

In [3]:
class Exponential:
    '''
    Convenience class for the exponential distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the exponential distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.mean = mean
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.exponential(self.mean, size=size)

class Lognormal:
    """
    Encapsulates a lognormal distirbution
    """
    def __init__(self, mean, stdev, random_seed=None):
        """
        Params:
        -------
        mean = mean of the lognormal distribution
        stdev = standard dev of the lognormal distribution
        """
        self.rand = np.random.default_rng(seed=random_seed)
        mu, sigma = self.normal_moments_from_lognormal(mean, stdev**2)
        self.mu = mu
        self.sigma = sigma
        
    def normal_moments_from_lognormal(self, m, v):
        '''
        Returns mu and sigma of normal distribution
        underlying a lognormal with mean m and variance v
        source: https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal
        -data-with-specified-mean-and-variance.html

        Params:
        -------
        m = mean of lognormal distribution
        v = variance of lognormal distribution
                
        Returns:
        -------
        (float, float)
        '''
        phi = math.sqrt(v + m**2)
        mu = math.log(m**2/phi)
        sigma = math.sqrt(math.log(phi**2/m**2))
        return mu, sigma
        
    def sample(self):
        """
        Sample from the normal distribution
        """
        return self.rand.lognormal(self.mu, self.sigma)

class Bernoulli:
    '''
    Convenience class for the Bernoulli distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, p, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        p: float
            probability of drawing a 1
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.p = p
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.binomial(n=1, p=self.p, size=size)


# utility function 

In [4]:
def trace(msg):
    '''
    Utility function for printing simulation
    set the TRACE constant to FALSE to 
    turn tracing off.
    
    Params:
    -------
    msg: str
        string to print to screen.
    '''
    if TRACE:
        print(msg)

# model perameters

In [5]:
# resource counts
N_BEDS = 10

# time between arrivals in days (exponential)
MEAN_IAT_STROKE = 1.2
MEAN_IAT_TIA = 9.3
MEAN_IAT_CN = 3.6
MEAN_IAT_OTHER = 3.2

# acute stroke unit length of stay in days (lognormal)
STAY_MEAN = 7.4
STAY_STD = 8.6

# transfer (bernoulli)
PROB_ESD = 0.13

# SEEDS to reproduce results of a single run
REPRODUCIBLE_RUN = True

if REPRODUCIBLE_RUN:
    SEEDS = [42, 101, 1066, 1966, 2013, 999, 1444, 2016]
else:
    SEEDS = [None, None, None, None, None, None, None, None]

In [6]:
class Scenario:
    '''
    Parameter container class for acute stroke unit model.
    '''
    def __init__(self, name=None):
        '''
        The init method sets up our defaults. 
        
        Params:
        -------
        
        name - str or None
            optional name for scenario
        '''
        # optional name
        self.name = name
        
        # beds
        self.unit_beds = N_BEDS
        
        # inter-arrival distribution for different patient types
        self.arrival_dist = {
            "Stroke": Exponential(MEAN_IAT_STROKE, random_seed=SEEDS[0]),
            "TIA": Exponential(MEAN_IAT_TIA, random_seed=SEEDS[1]),
            "ComplexNeuro": Exponential(MEAN_IAT_CN, random_seed=SEEDS[2]),
            "Other": Exponential(MEAN_IAT_OTHER, random_seed=SEEDS[3]),
        }

        # assessment distribution
        self.length_of_stay_dist = Lognormal(STAY_MEAN, STAY_STD, 
                                         random_seed=SEEDS[4])
        
        # ESD transfer: prob that patient goes to ESD.
        self.esd_transfer = Bernoulli(PROB_ESD, random_seed=SEEDS[5])
    

# build the model 

## model the unit

In [7]:
class AcuteStrokeUnit:  
    '''
    Model of an acute stroke unit
    '''
    def __init__(self, env, args):
        '''
        Contructor
        
        Params:
        -------
        env: simpy.Environment
        
        args: Scenario
            container class for simulation model inputs.
        '''
        self.env = env
        self.args = args 
        self.init_model_resources(args)
        self.patients = []
        
        
    def init_model_resources(self, args):
        '''
        Setup the simpy resource objects
        
        Params:
        ------
        args - Scenario
            Simulation Parameter Container
        '''
        args.unit_beds = simpy.Resource(self.env, 
                                          capacity=args.unit_beds)   
            
    def arrivals_generator(self):
        '''
        IAT is exponentially distributed

        Parameters:
        ------
        env: simpy.Environment

        args: Scenario
            Container class for model data inputs
        '''
        for patient_count in itertools.count(start=1):
            
            patient_type = random.choice(list(self.args.arrival_dist.keys()))
            
            # Get the Exponential distribution object for the selected patient type
            arrival_dist = self.args.arrival_dist[patient_type]

            inter_arrival_time = arrival_dist.sample()
            
            yield self.env.timeout(inter_arrival_time)

            trace(f'Patient {patient_count} ({patient_type}) arrives at: {self.env.now:.3f}')

            # create a new minor patient and pass in env and args
            new_patient = AcutePatient(patient_count, self.env, self.args, patient_type)
            
            # keep a record of the patient for results calculation
            self.patients.append(new_patient)
            
            # init the minor injury process for this patient
            self.env.process(new_patient.assessment())

## model the patient

In [11]:
class AcutePatient:
    '''
    Patient in the minor ED process
    '''
    def __init__(self, identifier, env, args, patient_type):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            The input data for the scenario
        '''
        # patient id and environment
        self.identifier = identifier
        self.env = env
        self.patient_type = patient_type  # Track patient category

        # stroke unit parameters
        self.unit_beds = args.unit_beds
        self.length_of_stay_dist = args.length_of_stay_dist
        
        # esd transfer: prob that patient is transfered to ESD.
        self.esd_transfer = args.esd_transfer
                
        # individual patient metrics
        self.time_to_bed = 0.0
        self.time_to_esd = 0.0
        self.time_in_system = 0.0
        self.four_hour_target = 0.0
    
    def assessment(self):
        '''
        simulates the process for acute stroke unit
        
        1.
        2. 
        3.
        4. 
        
        '''
        # record the time that patient entered the system
        arrival_time = self.env.now

        # request a bed 
        with self.unit_beds.request() as req:
            yield req
                        
            trace(f'Patient {self.identifier} ({self.patient_type}) gets a bed at {self.env.now:.3f}')
            
            # time to acute unit bed
            self.time_to_bed = self.env.now - arrival_time
            
            # Simulate length of stay
            length_of_stay = self.length_of_stay_dist.sample()
            
            yield self.env.timeout(length_of_stay)
            
            self.time_in_system = self.env.now - arrival_time  # Total time in system
           
            trace(f'Patient {self.identifier} departs at {self.env.now:.3f}; '
                  f'Time in system: {self.time_in_system:.3f} minutes')

            
            # Check if patient met the 4-hour target
            if self.time_in_system <= (4 * 60):  # 4 hours = 240 minutes
                self.four_hour_target = 1  # Patient met the target
            else:
                self.four_hour_target = 0  # Patient exceeded the target
            
            # Check for ESD transfer
            if self.esd_transfer.sample():
                self.time_to_esd = self.env.now
                trace(f'Patient {self.identifier} transferred to ESD at {self.env.now:.3f}')

            trace(f'Patient {self.identifier} discharged at {self.env.now:.3f}')

# script to run the model 

In [13]:
# run length in minutes
RUN_LENGTH = RUN_LENGTH = 1445

# Turn off tracing
TRACE = False

# create simpy environment
env = simpy.Environment()

# base case scenario with default parameters
default_args = Scenario()

# create the model
model = AcuteStrokeUnit(env, default_args)

# setup the process
env.process(model.arrivals_generator())

env.run(until=RUN_LENGTH)
print(f'end of run. simulation clock time = {env.now}')

# 1. Mean time in system
mean_time_in_system = np.array([patient.time_in_system 
                                for patient in model.patients]).mean()

# 2. Proportion of patients who met the 4-hour target
four_hours = np.array([patient.four_hour_target 
                       for patient in model.patients]).sum() / len(model.patients)

# 3. Mean time to bed (from arrival)
mean_time_to_bed = np.array([patient.time_to_bed 
                             for patient in model.patients]).mean()

# 4. Mean time to ESD transfer (if applicable)
# We'll calculate the average time from arrival to ESD transfer, if the patient was transferred
patients_with_esd = [patient for patient in model.patients if patient.time_to_esd > 0]
mean_time_to_esd = np.array([patient.time_to_esd - patient.time_to_bed
                             for patient in patients_with_esd]).mean() if patients_with_esd else 0

# Print the results
print('\nSingle run results\n------------------')
print(f'Mean Time in System (days): {mean_time_in_system:.2f}')
print(f'Mean Time to Bed (days): {mean_time_to_bed:.2f}')
print(f'Mean Time to ESD (days): {mean_time_to_esd:.2f}' if patients_with_esd else "No ESD transfers")
print(f'Proportion Discharged Before 4 Hours: {four_hours:.2f}')


end of run. simulation clock time = 1445

Single run results
------------------
Mean Time in System (minutes): 6.93
Mean Time to Bed (minutes): 0.00
Mean Time to ESD (minutes): 782.83
Proportion Discharged Before 4 Hours: 0.98


In [None]:
# check between minutes and days 