In [2]:
# imports 

import numpy as np
import heapq
import math
import scipy as sp
from matplotlib import pyplot
from typing import NamedTuple

In [2]:
#################### TODO ####################

# We can copy and paste the main code and change policies and metrics

# Simple model is done so test different metrics
# Work on more "realisitc" model

In [3]:
# Elevator structure

# clients    : list where index j is list of arrival times that want to go to floor j
# path       : lists of pairs (floor number, time of arrival to floor)
# floor      : the current floor
# direction  : 1 for up, 0 for stationary, -1 for down
# size       : amount of people inside
class Elevator(NamedTuple):
    clients: list
    path: list
    floor: int
    direction: int
    size: int
    
# Floor structure

# clients : list where index j is amount of people want to go to floor j
# button  : list of pairs (floor, time of press, direction)
# times   : pair of last times that elevator came to service down/up
# number  : the floor number
class Floor(NamedTuple):
    clients: list
    times: list
    number: int

In [None]:
# Scheduling policy example

# fastest elevator policy
# the quickest elevator 

# Either
# an elvator that has same direction as button
# Or
# a dormant elevator closest

# if tie current "winner" is kept

# what if we alrady have an eelvator going to that floor?
# and it happens to be the earliest?
# then we shouldn't change anything
# otherwise we can proceed as planned

def fastest_elevator(elevators, time, floor, direction, e_clocks)
    # The algorithm below "streams" in the elevators and keeps a running "best" one
    c_e_arrival_t = -1    # the time of arrival to rung floor by fastest elevator
    e_ix = -1             # index of elevator to have path modified
    path_ix = -1          # the index to insert into the path
    for i, elevator in enumerate(elevators):

        if elevator.direction == direction: # moving
            # Search through its path to see where to augment
            for j, [e_floor, e_finish_t] in enumerate(elevator.path):
                if (direction == 1 and 0 <= floor - e_floor)
                # uninitalized or faster to set values
                if e_ix == -1 or e_finish_t + np.abs(e_floor - floor) < c_e_arrival_t:
                    c_e_arrival_t = e_finish_t + np.abs(e_floor - floor)
                    e_ix = i
                    path_ix = j

        elif elevator.direction == 0: # dormant
            # unitialized or closer to set values
            if e_ix == -1 or np.abs(elevator.floor - floor) < c_e_arrival_t:
                c_e_arrival_t = np.abs(elevator.floor - floor)
                e_ix = i
                path_ix = 0

    # variables now store "closest" path information so we augment the relevant path
    elevators[i].path.insert(j + 1, (floor, c_e_arrival_t))
    if elevators[i].direction == 0:
        elevators[i].direction = 1 if floor - elevators[i].floor >= 0 else -1

In [None]:
# Loading policy example
def simple_load(elevator, lambdas, floor, interval):
    # Unload people
    elevator.capcity -= elevator.clients[floor.number]
    elevator.clients[floor.number] = 0
    # Load past arrivals then new arrivals
    if elevator.direction == 1:
        # Past arrivals
        for j, clients in enumerate(elevators[i].clients[floor:]):
            if elevators[i].size >= capacity:
                break
            elevators[i].clients[j + floor] += clients
            elevators[i].size += clients
                # New arrivals 
                for arrival in range((time - floors[floor].times[0]) * (total[floor] - down[floor])):

            # Symmetric
            else:
                for j, clients in enumerate(elevators[i].clients[:floor]):
                    if elevators[i].size >= capacity:
                        break
                    elevators[i].clients[j] += clients
                    elevators[i].size += clients


In [3]:
#################### PARAMETERS ####################

# m         : the number of floors in the building
# n         : the number of elevators 
# lambdas   : m * m array; i,j is the rate of arrivals on floor i who want to go to floor j
# capacity  : max capacity of an elevator
# h         : length of simulation
# SCHEDULER : given list of elevator structures, button press, elevator clocks
#             augments an elevator structure and clocks
# LOADER    : given a floor and elevator stuctures
#             updates floor and elevator structures after unload and load

#################### SIMULATION DESIGN ####################

# Elevators have predetermined paths
# On button press, scheduler decides which paths change
# When elevator reaches a floor people get off 
# Arrivals are generated retrospectively with a queue to store excess

# Button timings are events on clock and so are elevator arrivals

#################### NOTES ####################

# Floors are labeled 0 ... m - 1
# Assume time to travel between floors is constant
# We'll assume loading happens in 0 time i.e. instantly

# All 2(m-1) buttons are initialized to begin 
# then buttons are pressed after elevator leaves the floor

#################### DATA STRUCTURES ####################

# down  : tuple where index j is rate of clients of floor j going down
# total : tuple where index j is sum of all rates

# elevators : list of elevator structures
# floors    : list of floor structures

#################### Dictionary ####################
# c  |-> closest
# e  |-> elevator
# ix |-> index
# t  |-> time
# b  |-> button

def simple_sim(m: int, n: int, lambdas: tuple[tuple[float]], capacity: int, h: int, SCHEDULER, LOADER):

    # Initialization 
    rng       = np.random.default_rng()
    t         = 0  # total elapsed time
    e_clocks  = [] # clocks for elevator arrivals
    b_clocks  = [] # clocks for the button presses
    floors    = list( Floor([], []) for _ in range(m) )
    elevators = list( Elevator(np.zeros(m), [], 0) for _ in range(n) )
    down      = tuple( sum(lambdas[j][:j]) for j in range(m) )
    total     = tuple( sum(lambdas[j]) for j in range(m) )
    # cumulative sum buckets for retrospective

    # initialize b_clocks with both up and down presses
    for i in range(1, m-1):
        first = [i, rng.exponential(1/total[i])]
        second = [i]
        if rng.uniform(0, 1) <= down[i] / total[i]:
            first.append(-1)
            second.append(first[0] + rng.exponential(1/(total[i] - down[i])))
            second.append(1)
            floors[i].times.append(first[1])
            floors[i].times.append(second[1])
        else:
            first.append(1)
            second.append(first[0] + rng.exponential(1/down[i]))
            second.append(-1)
            floors[i].times.append(second[1])
            floors[i].times.append(first[1])
        b_clocks.append(tuple(first))
        b_clocks.append(tuple(second))
    # add first and last floor
    b_clocks.append((0, rng.exponential(1/total[0]), 1))
    b_clocks.append((m - 1, rng.exponential(1/total[-1]), -1))
    heapq.heapify(e_clocks)

    # Finite horizon
    while(t <= h):

        if e_clocks == [] or b_clocks[0][1] <= e_clocks[0][1]:
        # SCHEDULER SECTION
            time, floor, direction = heapq.heappop(b_clocks)
            t = time
            SCHEDULER(elevators, time, floor, direction, e_clocks)
            # update the e_clocks


        else:
        # LOADING SECTION
            elevator = heapq.heapop(e_clocks) # figure this out
            LOADER(elevator, floor, down, total)
        # if people left waiting immediatly add clock
            heapq.heappush(b_clocks, (t, floor_numer, direction))
        # else generate the next time and add that 
            
    return # performance metrics

SyntaxError: invalid syntax (196298251.py, line 122)

In [None]:
# takes 2D matrix of lamda_i,j and returns a tuple containing lambda_i,down and lambda_i,up for floor i
def lambda_direction(i: int, lambdas):
    lambda_down = 0
    lambda_up = 0
    for dest in range (i):
        lambda_down += lambdas[i][dest]
    
    for dest in range (i+1, m):
        lambda_up += lambdas[i][dest]
    
    return lambda_down, lambda_up
    

In [None]:
#tracking the clocks of button change of every floor
# lambdas is m x m matrix of lambda_i,j where i is the floor of origin and j is the destination

def DES_engine(m: int, lambda_param, h: int, lambdas):
    rng = np.random.default_rng() # rv generator
    t = 0.0
    n = 0
    generate_tuples = lambda m: [(0, 0) for _ in range(m)]
    generate_clocks = lambda m: [rng.exponential(1/lambda_param) for _ in range(m)]
    floor_states = generate_tuples(m) #initial state of every floor is (0,0)
    button_arrival = generate_clocks(m) #generates arrival times for each button on each floor, maybe have to make it a tuple for up and down ?
    
    SOF = [button_arrival, floor_states, q_up, q_down]
    
    while (t<h):
        next = min(button_arrival)
        tau = next[0]
        floor = button_arrival.index(tau)
        for i in range (m):
            
            # queues for floor i
            q_down = 0
            q_up = 0

            # generate buckets for retrospective up/down on floor i
            lambda_i = lambda_direction(i, lambdas) # stores the tuple of lambda_down and lambda_up
            p_down = lambda_i[0]/sum(lambda_i) # probability of going down

            if rng.uniform(0,1) <= p_down:
                q_down += 1
            else:
                q_up += 1
            
            # generate buckets for retrospective destination for each person in queue
            # this part requires an elevator list where elevator[j] is number of people going to floor j
            j_down = []
            j_up = []
            for j in range (0, m):
                p_ij = lambdas[i][j]/lambda_i[0] # probability of going from floor i to floor j
                j_down.append(p_ij + sum(j_down))
            
            for j in range (0, m):
                p_ij = lambdas[i][j]/lambda_i[1] # probability of going from floor i to floor j
                j_up.append(p_ij + sum(j_up))

            for q in range (q_down):
                u = rng.uniform(0,1)
                for j in j_down:
                    if u <= j_down[j]:
                        elevator[j] += 1 # one person added to elevator going to floor j (j less than i)
                        break
            
            for q in range (q_up):
                u = rng.uniform(0,1)
                for j in j_up:
                    if u <= j_up[j]:
                        elevator[i+j] += 1 # one person added to elevator going to floor i+j (j less than i)
                        break

            
                