<a href="https://colab.research.google.com/github/eric-pding/RSM8413_Group_Assignments/blob/main/RSM8414.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import numpy as np


class EDSimulator:

    def __init__(self, n_server):
        # static
        self.service_time = [4, 3, 2]
        self.n_type = len(self.service_time)
        self.n_server = n_server
        self.n_total_open_room = np.sum(n_server)
        self.unit_cost_staff = np.array((3900, 3000, 1600))
        self.cost_staff = self.n_server * self.unit_cost_staff
        if np.sum(self.cost_staff) > 42000:
            print('Staffing cost violates budget constraint!')
        if self.n_total_open_room > 16:
            print('Number of open rooms violates capacity constraint!')

        # room_type: each entry denotes the type of corresponding room
        self.room_type = np.zeros(self.n_total_open_room)
        idx = 0
        for k in range(self.n_type):
            # Room type conventions:
            # denote numbers of each type of rooms are n_1, n_2, n_3
            # rooms 0 to n_1 - 1 are type 0.
            # rooms n_1 to n_2 + n_1 - 1 are type 1.
            # rooms n_2 + n_1 - 1 to n_3 + n_2 + n_1 - 1 are type 2.
            self.room_type[idx: idx + self.n_server[k]] = k
            idx += self.n_server[k]

        # dynamic
        # queue: each entry denotes number of waiting patients (with type 0, 1, 2) in the beginning of every hour.
        self.queue = np.zeros(self.n_type)
        # residual_service_time: each entry denotes remaining service time of a room.
        self.residual_service_time = np.zeros(self.n_total_open_room)
        # in_service_patient: each entry denotes type of in-service patient of a room; -1 means idle (or room is not
        # open).
        self.in_service_patient = - np.ones(self.n_total_open_room)
        # n_arrival: each row denotes number of new arrivals from three types of an hour.
        self.n_arrival = np.zeros((24, self.n_type))

        # track performance metrics
        # n_lwbs: number of patients who Leave Without Being Seen (LBWS)
        self.n_lwbs = np.zeros(self.n_type)
        self.neglect_harm = np.zeros(self.n_type)
        # wait_count: count the number of hours for each type of customers waiting in queue.
        self.wait_count = np.zeros(self.n_type)
        # service_complete_count: count the number of customers that have finished service.
        self.service_complete_count = np.zeros(self.n_type)
        # server_busy_count: count the number of busy hours for each type of rooms.
        self.server_busy_count = np.zeros(self.n_total_open_room)

        self.unit_revenue_service = [1000, 600, 250]
        self.unit_cost_wait = [250, 100, 25]
        self.unit_cost_lwbs = 200
        self.unit_cost_neglect = 10000

        self.arrival_rate = np.array([
            np.repeat([2, 0.5, 1, 3, 4, 4, 3.5, 3], 3) / 3,
            np.repeat([4.5, 2, 2, 4, 6, 7, 7, 5.5], 3) / 3,
            np.repeat([1, 1, 5, 9, 10, 7, 6, 2], 3) / 3,
        ])
        self.arrival_rate = self.arrival_rate.T

    def generate_arrival(self):
        matrix = np.zeros((24, self.n_type))
        for t in range(24):
            for k in range(self.n_type):
                matrix[t, k] = np.random.poisson(self.arrival_rate[t, k])
        return matrix

    def assignment_patient_to_server(self, t, policy):
        """
        assign every patient in the queue according to policy 1, 2:
        either (1) allocate them to certain room or (2) keep them waiting in queue.
        :param t:
        :return:
        """
        queue_copy = self.queue.copy()
        for k in range(self.n_type):
            for i in range(int(queue_copy[k])):
              if policy == 1 :
                # policy 1: only primary assignments: A to A, B to B, C to C; otherwise waiting
                available_servers = (self.residual_service_time == 0) & ((self.room_type == k))
                # print(available_servers, np.where(available_servers == True)[0])
              elif policy == 2:
                # policy 2: assign patients to rooms whenever applicable
                available_servers = (self.residual_service_time == 0) & ((self.room_type >= 0) & (self.room_type <= k))

                if np.sum(available_servers) > 0:
                    # take the minimum for tie breaking among the available rooms with same type
                    server_to_assign = np.min(np.where(available_servers == True)[0])
                    self.residual_service_time[server_to_assign] = self.service_time[k]
                    self.in_service_patient[server_to_assign] = k
                    self.queue[k] -= 1

                    # if we can finish the service before the last hour, count one patient as completed.
                    if t + self.service_time[k] <= 23:
                        self.service_complete_count[k] += 1

                    print(
                        'A type {} customer is assigned to a room {}'.format(k, int(self.room_type[server_to_assign])))

    def simulate_hour(self, t, policy):
        """
        simulate the dynamics in hour t.
        :param t:
        :return:
        """
        self.assignment_patient_to_server(t,policy)
        # patient arrives to the queue
        print('Pre-arrival queue length', self.queue)
        self.queue += self.n_arrival[t]
        print('After arrival, pre-assignment queue length', self.queue)
        # assign patient to available servers
        self.assignment_patient_to_server(t,policy)
        print('Post-assignment queue length', self.queue)
        self.wait_count += self.queue
        # if residual service time is positive, then server is indicated as busy in hour t.
        self.server_busy_count += (self.residual_service_time > 0)

        # provide care and advance patients
        print('Rooms that are going to be released', np.where(self.residual_service_time == 1)[0])
        print('Number of to-be released rooms by each type', [
            np.sum((self.residual_service_time == 1) & (self.room_type == k)) for k in range(self.n_type)
        ])
        self.residual_service_time = np.maximum(self.residual_service_time - 1, 0)
        self.in_service_patient[self.residual_service_time == 0] = -1

        # crowding-related events
        for k in range(self.n_type):
            if k == 0:
                # patient harmed from neglect
                n_harm = np.random.binomial(self.queue[k], 1 / 20)
                self.neglect_harm[k] += n_harm
            elif k in [1, 2]:
                # LWBS
                n_lwbs = np.random.binomial(self.queue[k], 1 / 20)
                self.n_lwbs[k] += n_lwbs
                self.queue[k] -= n_lwbs
                if n_lwbs > 0:
                    print('{} type {} patients left without being seen.'.format(n_lwbs, k))

    def initialize_day(self):
        self.queue = np.zeros(self.n_type)
        self.residual_service_time = np.zeros(self.n_total_open_room)
        self.in_service_patient = - np.ones(self.n_total_open_room)
        self.n_arrival = self.generate_arrival()
        self.n_lwbs = np.zeros(self.n_type)
        self.neglect_harm = np.zeros(self.n_type)
        self.wait_count = np.zeros(self.n_type)
        self.service_complete_count = np.zeros(self.n_type)
        self.server_busy_count = np.zeros(self.n_total_open_room)

    def collect_performance_metric(self):
        """
        track the performance metrics shown from Excel
        :return:
        """
        utilization_by_type = np.zeros(self.n_type)
        for k in range(self.n_type):
            utilization_by_type[k] = np.sum(self.server_busy_count[self.room_type == k]) / (
                    24 * np.maximum(np.sum(self.room_type == k), 1))
        utilization_overall = np.sum(self.server_busy_count) / (24 * self.n_total_open_room)
        revenue = np.dot(self.service_complete_count, self.unit_revenue_service)
        cost_wait = self.wait_count * self.unit_cost_wait
        cost_lwbs = np.sum(self.n_lwbs) * self.unit_cost_lwbs
        cost_neglect = np.sum(self.neglect_harm) * self.unit_cost_neglect
        cost_staff = self.cost_staff
        profit = np.sum(revenue) - (np.sum(cost_wait) + cost_lwbs + cost_neglect + np.sum(cost_staff))
        return utilization_by_type, utilization_overall, revenue, cost_wait, cost_lwbs, cost_neglect, cost_staff, profit

    def simulate_day(self,policy):
        # initialize the system at the beginning
        self.initialize_day()
        for t in range(24):
            self.simulate_hour(t,policy)
        performance = self.collect_performance_metric()
        return performance

In [5]:
def run_sample():
    # this is a test example.

    # we initialize the simulator with following staffing schedule:
    # - 4 type 0 rooms.
    # - 6 type 1 rooms.
    # - 5 type 2 rooms.
    simulator = EDSimulator([4, 6, 5])

    ########################## part 1: test under fixed arrival patterns ##########################
    # we fix the arrival pattern of a day.
    # - Hour 0: 0 arrival for type 0, 0 arrival for type 1, 2 arrivals for type 2.
    # - Hour 1: 0 arrival for type 0, 6 arrivals for type 1, 0 arrival for type 2.
    # - Hour 2: 1 arrival for type 0, 0 arrival for type 1, 0 arrival for type 2.
    # - all remaining hours do not have arrivals.
    # REMARK: this is only for testing purpose and you do NOT need to specify the arrival patterns by yourself
    # in the assignment. You can directly recall simulate_day for simulating daily performance as shown in part 2.
    simulator.n_arrival = np.zeros((24, 3))
    simulator.n_arrival[0] = [0, 0, 2]
    simulator.n_arrival[1] = [0, 6, 0]
    simulator.n_arrival[2] = [1, 0, 0]

    # simulate the dynamics within a day given the arrival defined above.
    for t in range(24):
        print('*' * 20 + ' Hour {} '.format(t) + '*' * 20)
        simulator.simulate_hour(t)
        # input()
    print("*************end of test*************")

    ########################## part 2: test under true arrival patterns ##########################
    # test utilization & waiting cost trade-off.
    # collect the utilization and waiting cost for 1 day of operations.
    np.random.seed(10)
    utilization_list, cost_wait_list, profit_list = [], [], []
    for d in range(1):
        # method of simulate_day simulates all performance metrics within a day.
        utilization_by_type, utilization_overall, revenue, cost_wait, cost_lwbs, cost_neglect, cost_staff, profit = \
            simulator.simulate_day()
        utilization_list.append(utilization_overall)
        cost_wait_list.append(cost_wait)
        profit_list.append(profit)
    # report the average utilization and waiting cost.
    print(np.mean(utilization_list), np.mean(cost_wait_list), np.mean(profit_list))


if __name__ == '__main__':
    run_sample()

******************** Hour 0 ********************


TypeError: ignored

# New Section