In [None]:

# Import libraries

import simpy
import random
import statistics
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Set Simulation Global Variables
# NOTE(review): PASSENGER_INTERVAL is not referenced by any other visible cell — confirm whether it is still needed.
PASSENGER_INTERVAL = 5
# Floors are indexed 0 .. NUM_FLOORS - 1 (floor 0 is treated as the ground floor).
NUM_FLOORS = 10
# Number of Elevator instances created by Elevator_Controller.
NUM_ELEVATORS = 4
MAX_CAPACITY = 10   # maximum number of passengers on board a single elevator
MINUTES_IN_DAY = 24 * 60 # Number of minutes in one day
SIM_TIME = MINUTES_IN_DAY * 5 # Total simulation time in minutes for 5 days
# Default number of simulated minutes between passenger_generator arrival checks.
CHECK_INTERVAL = 1
# event_log is a chronological record of passenger creation: passenger_generator appends
# one {'timestamp', 'floor_number'} dict per generated passenger.
event_log = []

# Strategy name handed to every Elevator built by Elevator_Controller.
STRATEGY = None
    # low_ride_time, low_wait_time, sweep, or None
    

In [None]:
def get_arrival_rate(current_time_minutes, floor):
    """
    Look up the passenger arrival rate for one floor at a given simulation time.

    The simulation clock is folded onto a single day (minutes since midnight)
    and matched against these inclusive bands:

    Night   (22:00-05:00): 0.01 on every floor
    Morning (07:30-08:30): 0.5 on floor 0, 0.05 on every other floor
    Lunch   (12:00-13:00): 0.05 on floor 0, 0.5 on every other floor
    Evening (17:00-18:00): 0.05 on floor 0, 0.5 on every other floor
    Any other time       : 0.2 on every floor

    Args:
        current_time_minutes: simulation time in minutes (may span several days).
        floor: floor index; only "is it floor 0" matters.

    Returns:
        The arrival rate for that floor and time.
    """
    minute_of_day = current_time_minutes % 1440

    # The overnight band is checked first and does not depend on the floor.
    if minute_of_day >= 1320 or minute_of_day <= 300:
        return 0.01

    peak, moderate, quiet = 0.5, 0.2, 0.05
    at_ground = floor == 0

    # Morning rush: traffic enters at the ground floor.
    if 450 <= minute_of_day <= 510:
        return peak if at_ground else quiet

    # Lunch and evening share the same profile: traffic originates upstairs.
    if 720 <= minute_of_day <= 780 or 1020 <= minute_of_day <= 1080:
        return quiet if at_ground else peak

    return moderate
    

In [None]:
def get_time_period(current_time_minutes):
    """
    Name the time-of-day band that a simulation timestamp falls into.

    Args:
        current_time_minutes: simulation time in minutes (may span several days).

    Returns:
        "morning" (07:30-08:30), "lunch" (12:00-13:00), "evening" (17:00-18:00),
        or "off_peak" for any other time. Band limits are inclusive.
    """
    minute_of_day = current_time_minutes % 1440

    # (label, first minute, last minute) for each named band
    bands = (
        ("morning", 450, 510),
        ("lunch", 720, 780),
        ("evening", 1020, 1080),
    )
    for label, first, last in bands:
        if first <= minute_of_day <= last:
            return label
    return "off_peak"

In [None]:
def generate_destination(pickup_floor, time_period, num_floors):
    """
    Pick a destination floor based on the pickup floor and the time of day.

    Rules:
      * "morning" pickups at floor 0 go to a uniformly random upper floor.
      * "lunch"/"evening" pickups above floor 0 go to floor 0 with 80%
        probability, otherwise to a random floor other than the pickup floor.
      * Everything else goes to a random floor other than the pickup floor.

    Args:
        pickup_floor: floor index where the passenger is waiting.
        time_period: "morning", "lunch", "evening" or any other label.
        num_floors: number of floors; valid floors are 0 .. num_floors - 1.

    Returns:
        A floor index different from pickup_floor.

    Raises:
        ValueError: if num_floors < 2, because no destination other than the
            pickup floor exists (the rejection loop would otherwise spin forever).
    """
    if num_floors < 2:
        raise ValueError(
            f"num_floors must be at least 2 to pick a destination, got {num_floors}"
        )

    def _random_other_floor():
        # Rejection sampling: redraw until the floor differs from the pickup.
        # The draw sequence is kept as-is so seeded runs stay reproducible.
        destination = random.randint(0, num_floors - 1)
        while destination == pickup_floor:
            destination = random.randint(0, num_floors - 1)
        return destination

    if time_period == "morning" and pickup_floor == 0:
        # Morning arrivals at ground floor go up
        return random.randint(1, num_floors - 1)

    if time_period in ("lunch", "evening") and pickup_floor > 0:
        # Lunch/evening departures from upper floors mostly head to ground
        if random.random() < 0.8:
            return 0

    return _random_other_floor()


In [None]:
class Passenger:
    """A single rider: where they wait, where they are going, and when they appeared."""

    def __init__(self, pid, pickup, destination, start_time):
        self.id, self.pickup, self.destination = pid, pickup, destination
        # Simulation time at which the passenger entered the system
        self.start_time = start_time

    def __repr__(self, color = '\033[97m'):
        # Compact "P<id>(<pickup> --> <destination>)" form, prefixed with an
        # ANSI colour code so passengers are easy to spot in the log output.
        route = f"{self.pickup} --> {self.destination}"
        return f"{color}P{self.id}({route})"

In [None]:
class Dispatcher:
    """Routes each new passenger to the elevator with the lowest cost score."""

    def __init__(self, elevators):
        self.elevators = elevators

    def assign_passenger(self, passenger):
        """
        Hand `passenger` to the cheapest elevator.

        Cost = distance from the elevator to the pickup floor
               + 2 * (riders on board + passengers already queued for it).
        Ties go to the elevator that appears first in `self.elevators`.
        """
        def cost(car):
            queued = sum(len(waiting) for waiting in car.pickup_requests.values())
            return abs(car.floor - passenger.pickup) + 2 * (len(car.onboard) + queued)

        chosen = min(self.elevators, key=cost)
        chosen.request_pickup(passenger)

    # def reassign_passenger(self, passenger):
    #     def elevator_score(e):
    #         distance = abs(e.floor - passenger.pickup)
    #         load = len(e.onboard) + sum(len(v) for v in e.pickup_requests.values())
    #         return distance + 2*load
    #     candidates = [e for e in self.elevators if e.floor != passenger.pickup and len(e.onboard) < e.max_capacity]
    #     if candidates:
    #         best_elevator = min(candidates, key = elevator_score)
    #         best_elevator.request_pickup(passenger)
    #     else:
    #         best_elevator = min(self.elevators, key = elevator_score)
    #         best_elevator.request_pickup(passenger)

In [None]:
def time_to_string(minutes):
    """Format a minute count as a zero-padded 24-hour "HH:MM" clock string (hours wrap every 24)."""
    whole_hours, leftover_minutes = divmod(minutes, 60)
    return f"{int(whole_hours) % 24:02d}:{int(leftover_minutes):02d}"

In [None]:

def passenger_generator(env, dispatcher, num_floors=NUM_FLOORS, check_interval=CHECK_INTERVAL):
    """
    SimPy process that creates passengers with one Bernoulli trial per floor.

    On every tick (each `check_interval` simulated minutes):
      1. Look up the arrival rate of each floor for the current time.
      2. Treat rate * check_interval as the probability that one passenger
         shows up on that floor during the tick.
      3. On success, draw a destination, build a Passenger, hand it to the
         dispatcher, print a log line and append a record to `event_log`.

    Whenever the clock sits on a whole hour, the per-floor rates and
    probabilities are printed before the trials are run.

    Args:
        env: simulation environment providing `now` and `timeout()`.
        dispatcher: object whose `assign_passenger(passenger)` receives each new passenger.
        num_floors: number of floors checked per tick (floors 0 .. num_floors - 1).
        check_interval: simulated minutes between ticks.
    """
    next_pid = 1

    while True:
        # The clock does not advance inside a tick, so read it once.
        now = env.now
        clock = time_to_string(now)
        period = get_time_period(now)

        # Hourly report of the rates in force
        if now % 60 == 0:
            print(f"[{clock}] Time period: {period}")
            for floor in range(num_floors):
                rate = get_arrival_rate(now, floor)
                prob = rate * check_interval
                print(f"  Floor {floor}: rate={rate:.3f}/min, prob={prob:.3f}")

        for floor in range(num_floors):
            # For small intervals: P(arrival in interval) ~ rate * interval
            chance = get_arrival_rate(now, floor) * check_interval

            # Bernoulli trial: does a passenger arrive on this floor?
            if random.random() < chance:
                destination = generate_destination(floor, period, num_floors)
                rider = Passenger(next_pid, floor, destination, now)
                dispatcher.assign_passenger(rider)

                print(f"[{clock}] Passenger {next_pid} appears at floor {floor}, wants to go to {destination} (Period: {period})")
                event_log.append({'timestamp': now, 'floor_number': floor})

                next_pid += 1

        # Sleep until the next tick
        yield env.timeout(check_interval)


In [None]:
def low_wait_time_strategy(elevator):
    """
    Move the elevator one floor, favouring pickups over drop-offs.

    Target selection:
      1. While the car has spare capacity, head for the floor of the passenger
         who has been waiting longest.
      2. If there is no such pickup (or the car is full), head for the
         destination of the first passenger on board.
      3. If neither exists, log that the elevator is idle and do nothing.

    The pickup check relies on `find_longest_waiting_request()` returning None
    when nobody is waiting. Testing `elevator.pickup_requests` for truthiness
    does not work: it is a dict holding a key for every floor, so it is always
    truthy, and a car with riders but no pending pickups would never move.

    Args:
        elevator: object exposing `floor`, `direction`, `onboard`,
            `max_capacity`, `find_longest_waiting_request()` and `log()`.
    """
    target = None

    if len(elevator.onboard) < elevator.max_capacity:
        target = elevator.find_longest_waiting_request()

    # No pickup to chase (or no room for one): deliver the first rider instead.
    if target is None and elevator.onboard:
        target = elevator.onboard[0].destination

    if target is None:
        elevator.log("No pickup request, Elevator is idle")
        return

    if target > elevator.floor:
        elevator.direction = 'up'
        elevator.floor += 1
    elif target < elevator.floor:
        elevator.direction = 'down'
        elevator.floor -= 1

    if elevator.floor == target:
        elevator.log("At target floor")
    else:
        elevator.log(f"Moving {elevator.direction} toward floor {target}")

In [None]:
def low_ride_time_strategy(elevator):
    """
    Move the elevator one floor, favouring drop-offs over pickups.

    Target selection:
      1. If anyone is on board, head for the first rider's destination.
      2. Otherwise head for the floor of the longest-waiting passenger.
      3. If nobody is waiting either, log that the elevator is idle and
         return without moving. `find_longest_waiting_request()` returns None
         in that case, and comparing None with the floor number would raise
         TypeError.

    Args:
        elevator: object exposing `floor`, `direction`, `onboard`,
            `find_longest_waiting_request()` and `log()`.
    """
    if elevator.onboard:
        target = elevator.onboard[0].destination
    else:
        target = elevator.find_longest_waiting_request()

    if target is None:
        elevator.log("No pickup request, Elevator is idle")
        return

    if target > elevator.floor:
        elevator.direction = 'up'
        elevator.floor += 1
    elif target < elevator.floor:
        elevator.direction = 'down'
        elevator.floor -= 1

    if elevator.floor == target:
        elevator.log("At target floor")
    else:
        elevator.log(f"Moving {elevator.direction} toward floor {target}")

In [None]:
def sweep_strategy(elevator):
    """
    Move the elevator one floor using an up/down sweep.

    * When `elevator.strategy == "longest_wait"`, the car steps toward the
      floor of the longest-waiting passenger and logs that target. If there
      is no such floor, or the car is already on it, it falls back to the
      plain sweep.
    * For any other strategy value the car performs the plain sweep: keep
      going in the current direction and reverse at the top or bottom floor.

    A final "Moving <direction> to floor <floor>" line is logged in every case.

    Args:
        elevator: object exposing `floor`, `direction`, `num_floors`,
            `strategy`, `find_longest_waiting_request()` and `log()`.
    """
    if elevator.strategy == "longest_wait":
        target_floor = elevator.find_longest_waiting_request()

        if target_floor is not None and target_floor != elevator.floor:
            if target_floor > elevator.floor:
                elevator.direction = 'up'
                elevator.floor += 1
            else:
                elevator.direction = 'down'
                elevator.floor -= 1
            # Logged for both directions (it used to fire only when moving down).
            elevator.log(f"Moving {elevator.direction} toward floor {target_floor}")
        else:
            # No passengers waiting elsewhere -> continue in current direction
            elevator.log("No pickups pending, continuing current direction")
            _sweep_step(elevator)
    else:
        _sweep_step(elevator)

    elevator.log(f"Moving {elevator.direction} to floor {elevator.floor}")


def _sweep_step(elevator):
    """Advance one floor in the current direction, reversing at the top or bottom floor."""
    if elevator.num_floors <= 1:
        # Single-floor building: any step would leave the valid floor range.
        return

    if elevator.direction == 'up':
        if elevator.floor < elevator.num_floors - 1:
            elevator.floor += 1
        else:
            elevator.direction = 'down'
            elevator.floor -= 1
    else:
        if elevator.floor > 0:
            elevator.floor -= 1
        else:
            elevator.direction = 'up'
            elevator.floor += 1

In [None]:
class Elevator:
    """
    One elevator car driven as a SimPy process.

    The car keeps its own queue of pickup requests per floor, the list of
    riders currently on board, and running statistics about the passengers it
    has handled. Movement itself is delegated to the strategy function that
    matches `self.strategy`.
    """

    def __init__(self, env, num_floors, max_capacity, strategy, eid=0):
        self.env = env
        self.id = eid
        self.strategy = strategy
        self.num_floors = num_floors        # total number of floors
        self.max_capacity = max_capacity

        self.floor = 0                      # start at ground floor (0)
        self.direction = 'up'               # 'up' or 'down'
        self.onboard = []

        # One waiting list per floor; every floor has a key from the start.
        self.pickup_requests = {}
        for level in range(num_floors):
            self.pickup_requests[level] = []

        self.stats = {
            'total_passengers': 0,
            'total_wait_time': 0,
            'total_travel_time': 0,
            'passengers_turned_away': 0,
        }
        # Record holders; -1 means "nothing recorded yet".
        self.longest_wait_time = {'passenger_id': -1, 'longest_wait_time': -1}
        self.longest_in_system = {'passenger_id': -1, 'longest_travel_time': -1}

    def log(self, message):
        """Print `message` prefixed with the clock time, elevator id and current floor."""
        stamp = time_to_string(self.env.now)
        print(f"    [{stamp}] Elevator {self.id} at floor: {self.floor} | {message}")

    def request_pickup(self, passenger):
        """Queue `passenger` on their pickup floor and log the request."""
        queue = self.pickup_requests[passenger.pickup]
        queue.append(passenger)
        self.log(f"Passenger {passenger.id} requests a pickup at floor {passenger.pickup}")

    def find_longest_waiting_request(self):
        """
        Return the floor of the queued passenger who has waited longest.

        Returns None when no passenger is queued. On equal waits the
        passenger encountered first (floor order, then queue order) wins.
        """
        now = self.env.now
        best_floor, best_wait = None, -1
        for level, queue in self.pickup_requests.items():
            for rider in queue:
                waited = now - rider.start_time
                if waited > best_wait:
                    best_floor, best_wait = level, waited
        return best_floor

    def run_elevator(self):
        """
        SimPy process loop: unload, board, then move according to the strategy.

        Each pass either sleeps 1 time unit (nothing queued and nobody on
        board) or applies the configured strategy and sleeps 0.5 time units.
        """
        while True:
            # 1) Unload riders whose destination is this floor.
            #    'travel time' is measured from the moment the passenger appeared.
            arriving = [rider for rider in self.onboard if rider.destination == self.floor]
            for rider in arriving:
                self.onboard.remove(rider)
                elapsed = self.env.now - rider.start_time
                self.stats['total_travel_time'] += elapsed
                self.stats['total_passengers'] += 1
                self.log(f"Dropped off {rider} (travel time: {elapsed:.1f} min)")
                if elapsed > self.longest_in_system['longest_travel_time']:
                    self.longest_in_system['passenger_id'] = rider.id
                    self.longest_in_system['longest_travel_time'] = elapsed

            # 2) Board passengers waiting on this floor while there is room.
            #    A passenger who cannot board is counted as turned away on
            #    every visit that leaves them behind.
            still_waiting = []
            for rider in self.pickup_requests[self.floor]:
                if len(self.onboard) >= self.max_capacity:
                    still_waiting.append(rider)
                    self.stats['passengers_turned_away'] += 1
                    self.log(f"Elevator full, {rider} must wait")
                    continue

                self.onboard.append(rider)
                waited = self.env.now - rider.start_time
                self.stats['total_wait_time'] += waited
                self.log(f"Picked up {rider} (wait time: {waited:.1f} min)")
                if waited > self.longest_wait_time['longest_wait_time']:
                    self.longest_wait_time['passenger_id'] = rider.id
                    self.longest_wait_time['longest_wait_time'] = waited
            self.pickup_requests[self.floor] = still_waiting

            # Nothing queued anywhere and an empty car: rest for one time unit.
            anyone_waiting = any(self.pickup_requests.values())
            if not (anyone_waiting or self.onboard):
                yield self.env.timeout(1)
                continue

            # 3) Move one step using the configured strategy.
            #    Any other strategy value leaves the car where it is.
            if self.strategy == "low_ride_time":
                low_ride_time_strategy(self)
            elif self.strategy == "low_wait_time":
                low_wait_time_strategy(self)
            elif self.strategy == "sweep":
                sweep_strategy(self)

            # 4) Wait for the travel time of this step.
            yield self.env.timeout(0.5)

In [None]:
class Elevator_Controller:
    """Builds the bank of elevators plus their dispatcher and starts them as SimPy processes."""

    def __init__(self, env, NUM_ELEVATORS, NUM_FLOORS):
        self.env = env

        # Every car shares the module-level MAX_CAPACITY and the STRATEGY
        # value that is current at construction time; ids run 0 .. NUM_ELEVATORS - 1.
        cars = []
        for car_id in range(NUM_ELEVATORS):
            car = Elevator(env, NUM_FLOORS, max_capacity=MAX_CAPACITY, strategy=STRATEGY, eid=car_id)
            cars.append(car)
        self.elevators = cars

        self.dispatcher = Dispatcher(self.elevators)

    def start(self):
        """Register each elevator's run loop with the simulation environment."""
        for car in self.elevators:
            self.env.process(car.run_elevator())

In [None]:
def print_simulation_stats(controller):
    """
    Print per-elevator and overall statistics for a finished simulation.

    The strategy shown in the header is read from the controller's own
    elevators rather than the module-level STRATEGY, so the report matches the
    run being summarised and does not fail while STRATEGY is still None.
    Averages are printed as "n/a" for an elevator that served no passengers
    (instead of dividing by zero).

    Args:
        controller: object whose `elevators` each expose `stats`, `onboard`,
            `pickup_requests`, `longest_in_system` and `longest_wait_time`.
    """
    total_passengers = 0
    total_remaining = 0

    strategy = next(
        (getattr(elevator, "strategy", None) for elevator in controller.elevators),
        None,
    )
    strat = str(strategy).replace("_", " ")

    print("\n" + "="*50)
    print("SIMULATION STATISTICS")
    print(f"Elevator Strategy: {strat}")
    print("="*50)

    for i, elevator in enumerate(controller.elevators):
        passengers = elevator.stats['total_passengers']
        wait = elevator.stats['total_wait_time']
        travel = elevator.stats['total_travel_time']
        total_passengers += passengers

        # Passengers still riding or still queued for this elevator
        remaining_onboard = len(elevator.onboard)
        remaining_pickups = sum(len(v) for v in elevator.pickup_requests.values())
        total_remaining += (remaining_onboard + remaining_pickups)

        print(f"Elevator {i+1}:")
        print(f"Passengers Served: {passengers}")
        #print(f"Passengers Turned Away: {elevator.stats['passengers_turned_away']}")
        print(f"Longest Total Time: {elevator.longest_in_system}")
        print(f"Longest Wait Time: {elevator.longest_wait_time}")
        if passengers:
            print(f"Average wait time: {wait / passengers:.1f}")
            print(f"Average total time: {travel / passengers:.1f}")
        else:
            # Nobody was dropped off by this elevator, so no average exists.
            print("Average wait time: n/a")
            print("Average total time: n/a")
        print("-"*50)

    print(f"Total passengers served: {total_passengers}")
    print(f"Passengers still in system: {total_remaining}")



In [None]:
def collect_run_stats(controller):
    """
    Aggregate the statistics of every elevator in `controller` for one run.

    Args:
        controller: object whose `elevators` each expose a `stats` dict,
            an `onboard` list and a `pickup_requests` dict of lists.

    Returns:
        dict with keys:
          "served"         - passengers dropped off, summed over all elevators
          "turned_away"    - sum of the elevators' 'passengers_turned_away' counters
          "average_wait"   - total wait time / served
          "average_travel" - total travel time / served
          "remaining"      - passengers still on board or still queued
        Both averages are NaN when no passenger was served, so a run without
        any drop-off neither raises ZeroDivisionError nor reports a
        misleading 0.
    """
    served = 0
    turned_away = 0
    remaining = 0
    wait_sum = 0.0
    travel_sum = 0.0

    for e in controller.elevators:
        served += e.stats['total_passengers']
        wait_sum += e.stats['total_wait_time']
        travel_sum += e.stats['total_travel_time']
        turned_away += e.stats['passengers_turned_away']
        remaining += len(e.onboard) + sum(len(v) for v in e.pickup_requests.values())

    if served:
        average_wait = wait_sum / served
        average_travel = travel_sum / served
    else:
        average_wait = average_travel = float("nan")

    return {
        "served": served,
        "turned_away": turned_away,
        "average_wait": average_wait,
        "average_travel": average_travel,
        "remaining": remaining,
    }

In [None]:
# Run the simulation
if __name__ == "__main__":
    # Compare every strategy over the same set of seeded runs.
    NUM_RUNS = 10
    STRATS = ['sweep', 'low_ride_time', 'low_wait_time']

    # Announce the run before it starts; SIM_TIME covers several days, not 24 hours.
    print(f"Starting elevator simulation: {NUM_RUNS} runs for each of {len(STRATS)} strategies...")
    print(f"Each run simulates {SIM_TIME} minutes ({SIM_TIME/60:.1f} hours)")
    print("-" * 50)

    comparison_rows = []
    for strat in STRATS:
        # Elevator_Controller reads the module-level STRATEGY when it builds its elevators.
        STRATEGY = strat
        stats_list = []

        for run in range(NUM_RUNS):
            # Seeding with the run index gives every strategy the same random stream per run.
            random.seed(run)
            env = simpy.Environment()
            controller = Elevator_Controller(env, NUM_ELEVATORS, NUM_FLOORS)
            controller.start()
            env.process(passenger_generator(env, controller.dispatcher))
            env.run(until = SIM_TIME)

            stats_list.append(collect_run_stats(controller))

        comparison_rows.append({
            'strategy': strat,
            'avg_served': statistics.mean(s['served'] for s in stats_list),
            'avg_turned_away': statistics.mean(s['turned_away'] for s in stats_list),
            'avg_wait': statistics.mean(s['average_wait'] for s in stats_list),
            'avg_travel_time': statistics.mean(s['average_travel'] for s in stats_list),
            'avg_remaining': statistics.mean(s['remaining'] for s in stats_list),
        })

    # Build the comparison table once instead of growing it with pd.concat inside the loop.
    Compare = pd.DataFrame(comparison_rows)

    print("-" * 50)
    print(Compare)

    #print_simulation_stats(controller)

low_ride
    4.8
    7.5

sweep
    

In [None]:
events = pd.DataFrame(event_log)
if events.empty:
    # Without this guard the next line fails with an opaque KeyError on 'timestamp'.
    raise RuntimeError("event_log is empty - run the simulation cell before this analysis cell")
events['hour'] = (events['timestamp'] // 60).astype(int) % 24

# event_log is shared by every simulation run and each run restarts its clock
# at 0, so a drop in timestamp marks the start of another run. Rates must be
# normalised by the simulated time of ALL logged runs, not of a single one.
# (Heuristic: assumes each run logged at least one arrival.)
num_logged_runs = 1 + int((events['timestamp'].diff() < 0).sum())

arrival_counts = events.groupby(['floor_number','hour']).size().unstack(fill_value=0)
total_arrivals = events['floor_number'].value_counts().sort_index()
sim_hours = SIM_TIME / 60.0                      # simulated hours in one run
total_sim_hours = sim_hours * num_logged_runs    # simulated hours behind event_log
arrival_rate_per_hour = total_arrivals / total_sim_hours
print("Arrival counts by floor and hour:\n", arrival_counts)

# The index holds the floor numbers; older pandas versions leave it unnamed
# (column 'index' after reset_index), hence the column rename.
rate_table = (
    arrival_rate_per_hour.rename('arrivals_per_hour')
    .to_frame()
    .reset_index()
    .rename(columns={'index':'floor_number'})
)
print(rate_table)

# plot histogram of arrivals by hour of day (summed over all logged runs)
plt.figure()
plt.hist(events['hour'], bins=range(25), align='left')
plt.xticks(range(24))
plt.xlabel('Hour of Day')
plt.ylabel('Number of Arrivals')
plt.title('Histogram of Arrivals by Hour')
plt.show()

# plot arrivals per simulated hour by floor over the whole simulation
plt.figure(figsize=(8, 5))
plt.bar(rate_table['floor_number'].astype(str), rate_table['arrivals_per_hour'])
plt.xlabel('Floor Number')
plt.ylabel('Arrivals per Hour')
plt.title('Average Arrival Rate per Floor')
plt.tight_layout()
plt.show()

