<a href="https://colab.research.google.com/github/ali-vosoughi/CausalClimate/blob/main/MyElevator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#from datetime import datetime
from time import sleep
from IPython.display import clear_output

UP = 'UP'
DOWN = 'DOWN'
IDLE = 'IDLE'
REACHED = 'OPEN DOOR'


class Request:
  def __init__(self, timestamp, direction = None, target_floor = None):
    self.time = timestamp
    self.target_floor = target_floor
    self.direction = direction

  def __lt__(self, obj):
        return ((self.target_floor) < (obj.target_floor))
  
  def __gt__(self, obj):
      return ((self.target_floor) > (obj.target_floor))

  def __le__(self, obj):
      return ((self.target_floor) <= (obj.target_floor))

  def __ge__(self, obj):
      return ((self.target_floor) >= (obj.target_floor))

  def __eq__(self, obj):
      return (self.target_floor == obj.btarget_floor)
  
  def __repr__(self):
      return str((self.target_floor, self.direction))


class Elevator:
  def __init__(self, num_floors = 10):
    self.num_floors = num_floors
    self.curr_floor = 1
    self.state = IDLE    # UP, DOWN, IDLE, REACHED
    self.curr_target = None
    self.serving_requests = []
    self.queued_requests = []
    self.time_stamp = 0
    self.quit = 0

  def get_floor(self):
    return self.curr_floor

  def move_up(self):        # move 1 floor up
    if self.curr_floor < self.num_floors:
      self.curr_floor += 1
      self.state = UP

  def move_down(self):      # move 1 floor down
    if self.curr_floor > 1:
      self.curr_floor -= 1
      self.state = DOWN

  def move_towards_target(self):
    if self.curr_floor == self.curr_target.target_floor:
      return True
    elif self.curr_floor < self.curr_target.target_floor:
      self.move_up()
      return False
    else:
      self.move_down()
      return False

  # Handle Requests
  def add_request_from_out(self, floor, travel_direction):
    if floor == self.curr_floor:
      pass
    request = Request(self.time_stamp, target_floor = floor, direction = travel_direction)
    self.time_stamp += 1
    self.process_new_request(request)

  def add_request_from_in(self, target_floor):
    if target_floor == self.curr_floor:
      pass
    else:
      request = Request(self.time_stamp, target_floor = target_floor)
      self.time_stamp += 1
      self.process_new_request(request)
  
  def process_new_request(self, request):
    
    moving_direction = self.check_travel_direction(self.curr_floor, request.target_floor)

    if self.state == IDLE:
      self.serving_requests.append(request)

    elif self.state == moving_direction:
      if self.check_on_route(request.target_floor):
        if request.direction != None and request.direction != self.state:
          self.queued_requests.append(request)
        else:
          self.serving_requests.append(request)
      else:
        self.queued_requests.append(request)

    else:
      self.queued_requests.append(request)
      
  def check_on_route(self,floor):
    if self.state == UP and floor >= self.curr_floor:
      return True
    elif self.state == DOWN and floor <= self.curr_floor:
      return True
    return False

  def check_travel_direction(self, from_floor, to_floor):
    if to_floor > from_floor:
      return UP
    else:
      return DOWN
  
  def run(self):
    while not self.quit:
      #self.print_state()
      self.draw_elevator()
      self.serve_requests()
      sleep(2)
      
  
  def stop(self):
    self.quit = 1

  def print_state(self):
    print('\r#Elevator at floor {}, state = {}#'.format(self.curr_floor, self.state),end='')

  def serve_requests(self):
    if len(self.serving_requests)>0:
      self.serving_requests = sorted(self.serving_requests, reverse = (self.state == DOWN))
      self.curr_target = self.serving_requests[0]
      reached_target_floor = self.move_towards_target()

      if reached_target_floor:
        self.state = REACHED
        #self.reached_target()
        self.serving_requests.pop(0)
        self.curr_target = None

    elif len(self.queued_requests)>0:
      self.renew_serving_requests()

    else:
      self.state = IDLE

  

  def reached_target(self):
    print('\rElevator arrived at floor {}, request {} served.'.format(self.curr_floor, self.curr_target),end='')

  def renew_serving_requests(self):
    r = self.queued_requests.pop(0)
    self.serving_requests.append(r)
    direction = self.check_travel_direction(self.curr_floor, r.target_floor)
    for i in range(len(self.queued_requests)):
      if direction == self.check_travel_direction(self.curr_floor,self.queued_requests[i].target_floor):
        self.serving_requests.append(self.queued_requests.pop(i))
  
  def draw_elevator(self):
    
    clear_output(wait=True)
    print(self.serving_requests)
    print(self.queued_requests)
    
    if self.state == UP:
      print('going up!')
    elif self.state == DOWN:
      print('going down!')
    elif self.state == IDLE:
      print('IDLE!')
    elif self.state == REACHED:
      print('DOOR OPEN!')

    for i in range(self.num_floors):
      if i != self.num_floors-self.curr_floor:
        print('{}'.format(self.num_floors-i))
        #print()
        print('   -----')
      else:
        if self.state != REACHED:
          print('{}  |XXX|'.format(self.num_floors-i))
        else:
          print('{}  |   |'.format(self.num_floors-i))
        print('   -----')

    

In [None]:
from multiprocessing import Process
from threading import Thread

num_floors = 10

def validate_floor(n, num_floors,loc_):
  if loc_ == 'out':
    if n < 1 or n > num_floors:
      return False
    return True
  elif loc_ == 'in':
    if n >= 1 and n<= num_floors:
      return True
    else:
      return False

def validate_direction_cmd(dir):
  if dir == "up" or dir == "down":
    return True
  else:
    return False

def validate_direction(n, num_floors, dir):
  if n == 1 and dir == "down":
    return False
  elif n == num_floors and dir == "up":
    return False
  else:
    return True

def parse_command(cmds, elevator):
  cmds_list = cmds.split(',')
  print(cmds_list)
  for cmd in cmds_list:
    command_list = cmd.strip().split(' ')
    if command_list[0] == 'out':
      if len(command_list) < 3:
        print('Invalid command!')
      else:
        floor = int(command_list[1])
        travel_direction = command_list[2]
        
        if validate_floor( floor, num_floors, 'out') and validate_direction_cmd(travel_direction) and validate_direction(floor, num_floors, travel_direction):
          elevator.add_request_from_out(floor=floor, travel_direction=travel_direction)
        else:
          print('Invalid command!')

    elif command_list[0] == 'in':
      if len(command_list) < 2:
        print('Invalid command!')
      else:
        floor = int(command_list[1])
        if validate_floor( floor, num_floors, 'in'):
          elevator.add_request_from_in(floor)
        else:
          print('Invalid command!')

    elif command_list[0] == 'quit':
      return 0
    else:
      print('Invalid command!')

def main():

  elevator = Elevator(num_floors)
  elevator_process = Thread(target=elevator.run, args=())
  elevator_process.start()


  while 1:
    command = input('New request: ').lower()   # out floor direction | in floor
    out = parse_command(command, elevator)
    if out == 0:
      break

main()

          





[]
[]
IDLE!
10
   -----
9
   -----
8
   -----
7 |XXX|
   -----
6
   -----
5
   -----
4
   -----
3
   -----
2
   -----
1
   -----
