In [9]:
# A job scheduling environment for allocate the jobs to machines
import numpy as np
import numbers
import random
import functools, operator
from collections import namedtuple
from collections import deque
from collections import Counter
class Job(object):
  # A job class that denotes the job processed by the machine/server
  def __init__(self, job_type, intensity):
    if job_type not in ['CPU', 'IO']:
      raise ValueError("Undefined type of job, "
                       "only 'CPU' or 'IO' are allowed")
    """
      Args:
        job_type: 'CPU' or 'IO', either it's CPU intensive or IO intensive;
        intensity: the computational requirement of the job, for example, 
            the average MIPS needed to accomplish the job.
    """
    self.__name__ = 'Job'
    self._type = job_type
    self._intensity = intensity
    
  def get_type(self):
    return self._type
  
  def get_intensity(self):
    return self._intensity
  
  def info(self):
    info = {'type': self._type, 'intensity': self._intensity}
    return info
  
  def server_time_estimate(self, servers):
    if type(servers) == Server:
      servers = [servers]
    server_times = []
    for server in servers:
      if self._type == 'CPU':
        exec_time = self._intensity / server.get_cpu_power()
      else:
        exec_time = self._intensity / server.get_io_power()
      server_times.append(exec_time)
    return server_times

class Server(object):
  # A server object that contains the info of a server used to 
  # process the give jobs
  def __init__(self, cpu_power, io_power):
    """
      Args:
        cpu_power: the computationa power of the server while it's working 
            on the cpu intensive tasks. In here, it's the MIPS per second
            that the server can excute.
        io_power: the computational power of the server when working with
            the io intensive tasks.
    """
    self.__name__ = 'Server'
    self._cpu_power = cpu_power
    self._io_power = io_power
    if cpu_power >= io_power:
      self._type = 'CPU'
    else:
      self._type = 'IO'
  
  def get_type(self):
    return self._type
  
  def get_cpu_power(self):
    return self._cpu_power
  
  def get_io_power(self):
    return self._io_power
  
  def info(self):
    info = {'type': self._type, 
            'cpu_power': self._cpu_power, 
            'io_power': self._io_power}
    return info
  
  def job_time_estimate(self, jobs):
    # The argument jobs should either be a single job or a list of jobs.
    # This will return the total excution time of the give jobs
    if type(jobs) == Job:
      jobs = [jobs]
    if type(jobs) == list:
      if type(jobs[0]) != Job:
        raise TypeError("The input jobs are not valid!")
    total_cpu_intenstity = 0
    total_io_intensity = 0
    for job in jobs:
      if job.get_type() == 'CPU':
        total_cpu_intenstity += job.get_intensity()
      else:
        total_io_intensity += job.get_intensity()
    total_exec_time = (total_cpu_intenstity / self._cpu_power + 
                       total_io_intensity / self._io_power)
    return total_exec_time


class Generator(object):
  # the Generator object is used to generate a given number of objects, the
  # object being generated must have a __name__ attribute.
  def __init__(self, object_prototype, init_params_range):
    """
      Args:
        object_prototype: a prototype of the object to be generated;
        number_of_objects: the number of objects to generate;
        init_params_range: a dict contains the range of every param used to initialize 
        the object, the key should be the name of the paramater, the value of each key 
        should either be a list (for non-numerical type) or a tuple (the lower and 
        upper bound of the numerical type)
        

    """
    self._ob_proto = object_prototype
    self._params_range = init_params_range
    
  def generate(self, num_to_generate, method='random'):
    # 'method' argument only works on the numerical objects
    objects = []
    for i in range(num_to_generate):
      init_params = self.build_params(method=method)
      objects.append( self._ob_proto(**init_params) )
    return objects
    
  def build_params(self, method='random'):
    init_params = {}
    for param_name, param_range in self._params_range.items():
      if type(param_range) == list:
        param = random.sample(param_range, 1)[0]
      elif type(param_range) == tuple:
        low, upp = param_range
        param = random.uniform(low, upp)
      else:
        raise TypeError("The param range: {} is neither list nor tuple!"
                        .format(param_range))
      init_params[param_name] = param
    return init_params

In [635]:
class SchedulingEnv(object):
  # This is the training environment of the job scheduling task
  def __init__(self, 
               num_jobs, 
               num_servers, 
               job_gen_params,
               server_gen_params,
               scheduing_speed,
               response_time_discount,
               init_template=None, 
               init_from_template=False):
    """
    Args:
      num_jobs: the number of initial jobs to generate;
      num_servers: the number of servers to generate;
      init_template: a dictionary type parameter that serve as the template
          of all jobs and servers;
      init_from_template: if True, the jobs and servers will initialize from
          init_tempalte.
    """
    self._clock = 0.
    self._num_jobs = num_jobs
    self._job_capacity = num_jobs
    self._num_servers = num_servers
    self._job_generator = Generator(Job, job_gen_params)
    self._server_generator = Generator(Server, server_gen_params)
    self._init_template = None
    self._schduling_speed = scheduing_speed 
    self._response_time_discount = response_time_discount
    
    if init_template:
      if (len(init_template["job"]) == num_jobs and 
          len(init_template["server"]) == num_servers ):
        if init_from_template:
          self._init_template = init_template
          self._jobs = init_template["job"]
          self._servers = init_template["server"]
    if self._init_template == None:
      print("The initialization template is either None or invalid"
            ", the servers and jobs will be generated randomly")
      self._jobs = self._job_generator.generate(num_jobs)
      self._servers = self._server_generator.generate(num_servers)
    self._num_finished_jobs = 0
    self._nfj_in_servers = [0] * num_servers
    self._clock = 0. # the unit ms indicator of the running time
    self.init_server_status() 
    self._template = {"job": self._jobs, "server": self._servers}
    # wrap jobs as deque object
    self._jobs = deque(self._jobs, num_jobs)

    
  def reset(self):
    self.__init__(self._num_jobs, self._num_servers, None, None, 
                  self._schduling_speed, self._response_time_discount,
                  init_template=self._template, init_from_template=True)
    
  def get_num_jobs(self):
    return len(self._jobs)

  def get_current_job(self):
    if self._jobs:
      return self._jobs[0]
    else:
      print("There is no unscheduled job lest")
      return None

  def pop_current_job(self):
    return self._jobs.popleft()

  def job_exec_times(self, job):
    return job.server_time_estimate(self._servers)

  def init_server_status(self):
    # for each server, generate a status namedtuple containing the as:
    #     dict("job_que", "expected_idle_time").
    # the server_id i is assgined as i in range(num_servers) 
    # the returned object is a orderdict of all the namedtuples.
    servers_status = []
    for server_id, server in enumerate(self._servers):
      servers_status.append({"job_info_que": deque([], self._job_capacity),
                             "expected_idle_time": self._clock}
                           )
      self._servers_status = servers_status
    return servers_status
  
  def _get_job_type_num_from_status(self, status):
    num_cpu_type = 0
    num_io_type = 0
    for job_info in status["job_info_que"]:
      if job_info["job"].get_type() == 'CPU':
        num_cpu_type += 1
      else:
        num_io_type +=1
    return (num_cpu_type, num_io_type)
  
  def get_server_status(self):
    status_reports = []
    for sid, status in enumerate(self._servers_status):
      (num_cpu_type, num_io_type) = self._get_job_type_num_from_status(status)
      status_reports.append(StatusReport(sid,
                                         self._servers[sid].get_type(),
                                         self._servers[sid].get_cpu_power(),
                                         self._servers[sid].get_io_power(),
                                         len(status["job_info_que"]),
                                         status["expected_idle_time"],
                                         num_cpu_type, num_io_type)
                            )
    return status_reports
  
  def get_overview(self, time_span):
    total_queuing_jobs = (self._num_jobs - len(self._jobs) - 
                          self._num_finished_jobs)
    ind_avg_res = []
    individual_avg_res_times = self.average_response_time(time_span)
    for i in range(self._num_servers):
      ind_avg_res.append({i: individual_avg_res_times[i]})
    total_response_time = sum(individual_avg_res_times)
    avg_response_time = total_response_time / self._num_servers
    return {"Total jobs in server queue": total_queuing_jobs,
            "Average response time of each server": ind_avg_res,
            "Average response time accross all servers": avg_response_time}
      
  def expected_wait_times(self):
    # also the expected response time
    wait_times = []
    for server_id, status in enumerate(self._servers_status):
      wait_times.append(max(status["expected_idle_time"] - self._clock,
                            0)
                       )
    return wait_times

  def expected_finish_times(self, job):
    finish_times = []
    exec_times = self.job_exec_times(job)
    wait_times = self.expected_wait_times()
    for server_id, (exec_time, wait_time) in enumerate(zip(exec_times, 
                                                           wait_times)):
      finish_times.append(exec_time + wait_time + self._clock)
    return finish_times

  def _num_queued_jobs(self, server_id):
    return len(self._servers_status[server_id]["job_info_que"])

  def allocale_job_to(self, server_id):
    status = self._servers_status[server_id]
    current_job = self._jobs.popleft()
    wait_time = self.expected_wait_times()[server_id]
    if self._num_queued_jobs(server_id) < 1:
      discounted_wait_time = wait_time
      cum_response_time = wait_time
    else:
      last_dwt = status["job_info_que"][-1]["cum_discounted_response_time"]
      last_crt = status["job_info_que"][-1]["cum_response_time"]
      discounted_wait_time = wait_time + (self._response_time_discount 
                                          * last_dwt)
      cum_response_time = last_crt + wait_time
    exec_time = self.job_exec_times(current_job)[server_id]
    finish_time = self.expected_finish_times(current_job)[server_id]
    self._servers_status[server_id]["job_info_que"].append(
        {"job": current_job, "response_time": wait_time,
         "cum_response_time": cum_response_time,
         "cum_discounted_response_time": discounted_wait_time,
         "finish_time": finish_time}
        )
    last_idle_time = self._servers_status[server_id]["expected_idle_time"]
    if last_idle_time < self._clock:
      last_idle_time = self._clock
    self._servers_status[server_id]["expected_idle_time"] = (last_idle_time + 
                                                             exec_time)
    if (finish_time - self._servers_status[server_id]["expected_idle_time"] 
            > 0.00000001):
      raise ValueError("Mismatching finish_time and expected_idle_time at "
                       "server: {}, expected_idle_time: {}, finish_time: {}"
                       .format(server_id, 
                               self._servers_status[server_id]["expected_idle_time"],
                               finish_time))
    reward = self._reward_fn(wait_time, exec_time, finish_time)
    return server_id, reward

  def simulate_time_past(self, time_span):
    self._clock += time_span
    self._server_status_updater()

  def _server_status_updater(self):
    for sid, status in enumerate(self._servers_status):
      # like ETA, etf means expected finish time.
      for job_info in list(status["job_info_que"])[self._nfj_in_servers[sid]:]:
        if job_info["finish_time"] < self._clock:
          # self._servers_status[sid]["job_info_que"].popleft()
          self._nfj_in_servers[sid] += 1
          self._num_finished_jobs += 1
        else:
          break

  def average_response_time(self, time_span):
    watching_jobs_init = int(time_span * self._schduling_speed)
    avg_response_t = []
    for sid, status in enumerate(self._servers_status):
      watching_jobs = watching_jobs_init
      num_queued_jobs = self._num_queued_jobs(sid)
      if num_queued_jobs < watching_jobs:
        watching_jobs = num_queued_jobs
      if watching_jobs == 0:
        avg_response_t.append(0)
      else:
        job_info_queue = list(status["job_info_que"])[-watching_jobs:]
        total_res_t = functools.reduce(
            operator.add, map(lambda x: x["response_time"], job_info_queue))
        avg_response_t.append(total_res_t / watching_jobs)
    return avg_response_t

  def average_discounted_response(self, time_span):
    watching_jobs_init = int(time_span * self._schduling_speed)
    avg_dc_t = []
    for sid, status in enumerate(self._servers_status):
      watching_jobs = watching_jobs_init
      num_queued_jobs = self._num_queued_jobs(sid)
      if num_queued_jobs < watching_jobs:
        watching_jobs = num_queued_jobs
      if watching_jobs == 0:
        avg_dc_t.append(0)
      else:
        avg_dc_t.append( list(status["job_info_que"])
                         [-1]["cum_discounted_response_time"]
                         / watching_jobs)
    return avg_dc_t

  def _reward_fn(self, wait_time, exec_time, finish_time):
    return exec_time / (wait_time + 0.00001)
  
  def _buildin_policy(self, observation_span):
    return BuildInPolicy(self, observation_span)
  
#def _buildin_policy(self, principle='random'):
class BuildInPolicy():
  def __init__(self, 
               scheduling_env, 
               observation_span, 
              ):
    self._scheduling_env = scheduling_env
    self._action_space = scheduling_env._num_servers
    self._action_counter = -1
    self._observation_span = observation_span


  def random_policy(self):
    return random.sample(range(self._action_space), 1)[0]

  def bestfit_policy(self):
    job = self._scheduling_env.get_current_job()
    job_type = job.get_type()
    wait_times = self._scheduling_env.expected_wait_times()
    servers = self._scheduling_env._servers
    num_servers = len(servers)
    bestfit = 0
    while bestfit < num_servers:
      if servers[bestfit].get_type() == job_type:
        break
      else:
        bestfit += 1
    lastest_fit = bestfit + 1
    while lastest_fit < num_servers:
      if servers[lastest_fit].get_type() == job_type:
        if wait_times[lastest_fit] < wait_times[bestfit]:
          bestfit = lastest_fit
      lastest_fit += 1
    if bestfit <= num_servers - 1:
      if servers[num_servers - 1 ].get_type() != job_type:
        bestfit = wait_times.index(min(wait_times))
    return bestfit

  def round_robin_policy(self):
    self._action_counter += 1
    self._action_counter %= self._action_space
    return self._action_counter

  def earlist_policy(self):
    job = self._scheduling_env.get_current_job()
    wait_times = self._scheduling_env.expected_wait_times()
    return wait_times.index(min(wait_times))

  def sensible_policy(self): 
    avg_response_time = ( 
        self._scheduling_env.average_discounted_response(self._observation_span)
        )
    offset = max(avg_response_time)
    logits = -np.array(avg_response_time) + offset + 10
    probs = logits / sum(logits)
    action = np.random.choice(a=range(self._action_space), p=probs)
    return action
    
class StatusReport():
  def __init__(self, sid, server_type, cpu_power, io_power, queue_len, e_i_t,
               num_cpu_job, num_io_job):
    self.status = {"Server id": sid, "Type": server_type, 
                   "CPU power": cpu_power, "IO power": io_power,
                   "Job queue length": queue_len, "Expected idle time": e_i_t,
                   "CPU intensive jobs": num_cpu_job,
                   "IO intensive jobs": num_io_job}

  def get_status(self):
    return self.status
                             

In [644]:
job_params = {"job_type":["IO", "CPU"], "intensity":(800, 1200)}
server_params = {"cpu_power":(800, 1200), "io_power":(500, 1500)}
env = SchedulingEnv(10000, 10, job_params, server_params, 35, 0.7)

The initialization template is either None or invalid, the servers and jobs will be generated randomly


In [645]:
env.reset()
watch_time = 200
job_in_speed = 35
watching_jobs = watch_time * job_in_speed
policy = env._buildin_policy(watch_time).bestfit_policy
actions = [0] * 10
for i in range(watching_jobs):
  action = policy()
  env.allocale_job_to(action)
  actions[action] += 1
  env.simulate_time_past(0.05)
env.get_overview(watch_time)

{'Total jobs in server queue': 3217,
 'Average response time of each server': [{0: 147.02397059100394},
  {1: 147.47827276232965},
  {2: 147.13006118762206},
  {3: 147.42725092382776},
  {4: 147.73036174843745},
  {5: 147.8286815379872},
  {6: 148.28295190680996},
  {7: 147.9136968864199},
  {8: 147.67932001895124},
  {9: 147.86721632608567}],
 'Average response time accross all servers': 147.6361783889475}

In [646]:
for i in range(10):
  print(env.get_server_status()[i].get_status())

{'Server id': 0, 'Type': 'CPU', 'CPU power': 981.092317378119, 'IO power': 945.0440766726667, 'Job queue length': 625, 'Expected idle time': 644.4374163502096, 'CPU intensive jobs': 543, 'IO intensive jobs': 82}
{'Server id': 1, 'Type': 'IO', 'CPU power': 875.901052395139, 'IO power': 1444.1650541110891, 'Job queue length': 935, 'Expected idle time': 644.3026747500157, 'CPU intensive jobs': 0, 'IO intensive jobs': 935}
{'Server id': 2, 'Type': 'CPU', 'CPU power': 1159.2339929769942, 'IO power': 790.7133156395689, 'Job queue length': 701, 'Expected idle time': 645.1315783915268, 'CPU intensive jobs': 598, 'IO intensive jobs': 103}
{'Server id': 3, 'Type': 'IO', 'CPU power': 833.6981194987777, 'IO power': 1309.4873020961465, 'Job queue length': 839, 'Expected idle time': 644.5067836152473, 'CPU intensive jobs': 0, 'IO intensive jobs': 839}
{'Server id': 4, 'Type': 'CPU', 'CPU power': 1044.1478907195803, 'IO power': 502.81038015227796, 'Job queue length': 592, 'Expected idle time': 645.06