## Estimating $p_1, p_2$ in queues with parallelizable jobs

### $\circ$ Arrival rate $\lambda$
### $\circ$ Job sizes $\sim \text{Exp}(\mu)$
### $\circ$ $c$ cores
### $\circ$ Each incoming job is of type 1 or type 2, with Amdahl speed-up $s_1(\cdot)$ or $s_2(\cdot)$ parameterized by $p_1$ or $p_2$ respectively

In [None]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from scipy.optimize import fsolve, minimize
from scipy.integrate import quad
# from scipy import linspace, meshgrid, arange, empty, concatenate, newaxis, shape
from numpy import linspace, meshgrid, arange, empty, concatenate, newaxis, shape
from collections import deque

In [None]:
def generate_service_times(mu, n):
    """Draw ``n`` i.i.d. job sizes from an exponential distribution with rate ``mu``.

    ``np.random.exponential`` is parameterised by the scale (mean), hence ``1 / mu``.
    """
    mean_job_size = 1 / mu
    return np.random.exponential(mean_job_size, n)

def generate_arrival_times(lam, n):
    """Return ``n`` Poisson-process arrival epochs with rate ``lam``.

    The first arrival is pinned at time 0; the remaining ``n - 1`` epochs are
    cumulative sums of exponential inter-arrival gaps with mean ``1 / lam``.
    """
    gaps = np.random.exponential(1/lam, n-1)
    return np.concatenate(([0], np.cumsum(gaps)))

def generate_job_categories(alpha, n):
    """Assign each of ``n`` jobs a type, returned as a float array of 1.0 / 2.0.

    A job is type 2 with probability ``alpha`` (one Bernoulli(alpha) draw per
    job, shifted by 1), and type 1 otherwise.
    """
    is_type_2 = np.random.binomial(1, alpha, n)
    return 1.0 + is_type_2

def allocate_cores(total_cores, n_jobs):
    """Split ``total_cores`` between type 1 and type 2 jobs.

    Placeholder policy (intended to be replaced by a learned one; for now it
    only needs to be simple so the estimation can be studied): cores are shared
    in proportion to the number of jobs of each type, and a type with no jobs
    gets no cores.

    Args:
        total_cores: number of cores available.
        n_jobs: indexable pair (number of type 1 jobs, number of type 2 jobs).

    Returns:
        Length-2 numpy array (type 1 cores, type 2 cores); entries may be fractional.
    """
    n_type_1, n_type_2 = n_jobs[0], n_jobs[1]
    if n_type_1 == 0:
        # Empty system -> nothing allocated; otherwise type 2 takes every core.
        return np.array([0, 0]) if n_type_2 == 0 else np.array([0, total_cores])
    if n_type_2 == 0:
        return np.array([total_cores, 0])
    share_type_1 = n_type_1/(n_type_1 + n_type_2)
    return np.array([share_type_1*total_cores, (1-share_type_1)*total_cores])

def speed_up(amdahl_par, n_cores):
    """Amdahl's-law speed-up of a job whose parallelizable fraction is ``amdahl_par``.

    speed-up = 1 / ((1 - p) + p / c) = 1 / (1 - p * (1 - 1/c)), where the core
    count ``c`` is floored at 1 so that zero or fractional allocations never
    produce a speed-up below 1.
    """
    effective_cores = max(1, n_cores)
    relative_runtime = 1 - amdahl_par*(1 - 1/effective_cores)
    return 1/relative_runtime

In [None]:
class Job:
    """Mutable record of one job's attributes and service history in the simulation."""

    def __init__(self):
        # All fields start at zero; queue_simulation fills them in per arrival.
        self.identity = 0                 # job index (position in the arrival sequence)
        self.category = 0                 # job type, 1 or 2 once assigned
        self.arrival_time = 0
        self.departure_time = 0           # stays 0 if the job never departs
        self.service_time = 0             # inherent job size
        self.residual_service_time = 0    # work still to be done
        self.history = []                 # [speed-up, time spent at that speed] pairs


def queue_simulation(amdahl_pars, model_pars, n, arrival_times, service_times, job_categories, initial_state):
    """Event-driven simulation of a two-type multicore queue with parallelizable jobs.

    After every event the cores are re-split by ``allocate_cores``. Each type's
    share is divided equally among its jobs, and every job is served at speed
    ``speed_up(p_k, cores_per_job)``. The simulation stops as soon as the last
    (n-th) arrival has been processed. Jobs still in the system at that point
    never receive a departure time.

    Args:
        amdahl_pars: (p_1, p_2), the Amdahl parallelizable fractions for type 1 / type 2.
        model_pars: [lam, mu, total_cores, alpha]; only total_cores is used here,
            because arrivals, sizes and types are passed in explicitly.
        n: number of arrivals to simulate.
        arrival_times, service_times, job_categories: per-job arrival epoch,
            inherent size and type (1 or 2). The first event is treated as an
            arrival at time 0, so this assumes arrival_times[0] == 0 (as
            produced by generate_arrival_times).
        initial_state: (type 1 count, type 2 count) present before the first
            arrival. It is copied, so the caller's object is NOT modified; the
            final counts are n_jobs_in_system_just_after_events[-1].
            NOTE(review): these initial jobs have no Job records, so they affect
            core allocation but never depart. Confirm this is intended.

    Returns:
        [jobs, job_ids_in_system, events, n_jobs_in_system_just_after_events,
        core_allocation_just_after_events], where
        - events is a list of [time, 'arrival' | 'departure', job id];
        - the last two lists hold the per-type job counts and core allocation
          just after each event;
        - job_ids_in_system lists the jobs still present when the simulation stops.
    """
    total_cores = model_pars[2]

    # One Job record per arrival.
    jobs = [Job() for _ in range(n)]
    for i, job in enumerate(jobs):
        job.identity = i
        job.category = job_categories[i]
        job.arrival_time = arrival_times[i]
        job.service_time = service_times[i]
        job.residual_service_time = service_times[i]

    # Outputs, one entry per event.
    events = []  # [time, 'arrival'/'departure', job id]
    n_jobs_in_system_just_after_events = []  # [type 1 count, type 2 count]
    core_allocation_just_after_events = []  # [type 1 cores, type 2 cores]

    # Simulation state.
    job_ids_in_system = []
    arrived_job_id = -1
    departed_job_id = -1
    event = 'arrival'
    # Copy: updating the caller's initial_state in place would corrupt it for later windows.
    n_jobs_in_system = initial_state.copy()
    current_time = 0
    speed_up_values = np.zeros(2)

    while arrived_job_id < n - 1:
        # Apply the event that happens at current_time.
        if event == 'arrival':
            arrived_job_id += 1
            events.append([current_time, 'arrival', arrived_job_id])
            job_ids_in_system.append(arrived_job_id)
            n_jobs_in_system[int(jobs[arrived_job_id].category) - 1] += 1
        else:
            events.append([current_time, 'departure', departed_job_id])
            job_ids_in_system.remove(departed_job_id)
            n_jobs_in_system[int(jobs[departed_job_id].category) - 1] -= 1

        n_jobs_in_system_just_after_events.append(n_jobs_in_system.copy())
        current_core_allocation = allocate_cores(total_cores, n_jobs_in_system)
        core_allocation_just_after_events.append(current_core_allocation)
        current_core_allocation_per_job = np.array([
            current_core_allocation[j] / n_jobs_in_system[j] if n_jobs_in_system[j] != 0 else 0
            for j in range(2)
        ])

        if arrived_job_id == n - 1:
            # Last arrival processed: stop without scheduling further departures.
            break

        time_until_next_arrival = arrival_times[arrived_job_id + 1] - current_time
        if not job_ids_in_system:
            time_until_next_departure = math.inf
        else:
            speed_up_values[0] = speed_up(amdahl_pars[0], current_core_allocation_per_job[0])
            speed_up_values[1] = speed_up(amdahl_pars[1], current_core_allocation_per_job[1])
            expected_times_until_departure = np.array([
                jobs[i].residual_service_time / speed_up_values[int(jobs[i].category) - 1]
                for i in job_ids_in_system
            ])
            time_until_next_departure = np.min(expected_times_until_departure)

        # Ties go to the arrival.
        if time_until_next_arrival > time_until_next_departure:
            event = 'departure'
            time_until_next_event = time_until_next_departure
            departed_job_id = job_ids_in_system[np.argmin(expected_times_until_departure)]
            jobs[departed_job_id].departure_time = current_time + time_until_next_event
        else:
            event = 'arrival'
            time_until_next_event = time_until_next_arrival

        # Serve every present job at its current speed until the next event.
        for job_id in job_ids_in_system:
            speed = speed_up_values[int(jobs[job_id].category) - 1]
            jobs[job_id].residual_service_time -= speed * time_until_next_event
            jobs[job_id].history.append([speed, time_until_next_event])

        current_time += time_until_next_event

    return [jobs, job_ids_in_system, events, n_jobs_in_system_just_after_events, core_allocation_just_after_events]

In [None]:
# Generates a simulated queueing dataset, starting from the given initial system state.
# Intended to be called once per window.

def generate_dataset(amdahl_pars, model_pars, n, initial_state):
    """Draw a random workload from ``model_pars`` and simulate the queue on it.

    Args:
        amdahl_pars: true (p_1, p_2) used by the simulation.
        model_pars: [lam, mu, total_cores, alpha]: arrival rate, service rate,
            number of cores, probability that a job is type 2.
        n: number of arrivals.
        initial_state: (type 1 count, type 2 count) at the start of the window.

    Returns:
        The 5-element list produced by ``queue_simulation``.
    """
    # Read the rates from model_pars. The previous version read globals of the
    # same name, which are undefined unless the (commented-out) driver is run.
    lam = model_pars[0]
    mu = model_pars[1]
    alpha = model_pars[3]

    # Keep the original draw order so a given seed yields the same dataset.
    arrival_times = generate_arrival_times(lam, n)
    service_times = generate_service_times(mu, n)
    job_categories = generate_job_categories(alpha, n)

    return queue_simulation(amdahl_pars, model_pars, n, arrival_times, service_times, job_categories, initial_state)

In [None]:
def _type_log_likelihood(amdahl_par, mu, job_type, jobs, events, n_jobs_in_system_just_after_events, core_allocation_just_after_events):
    """Log-likelihood (without the constant log(mu) terms) of the type-``job_type`` departure process."""
    k = job_type - 1  # column of this type in the per-event counts / allocations
    n_events = len(events)
    LL = 0
    end_event_index = 0

    while end_event_index < n_events:
        # Each pass handles one segment, basically one inter-departure time for
        # this type: from `start` (first later event with >= 1 job of this type)
        # up to `end` (the next departure of this type, or the last event).
        start_event_index = end_event_index
        while start_event_index < n_events - 1 and n_jobs_in_system_just_after_events[start_event_index][k] == 0:
            start_event_index += 1
        if start_event_index >= n_events - 1:
            break

        departure_found = False
        end_event_index = start_event_index
        while end_event_index + 1 < n_events:
            end_event_index += 1
            if events[end_event_index][1] == 'departure' and jobs[events[end_event_index][2]].category == job_type:
                departure_found = True
                break

        # Integrated intensity: the state is constant between consecutive events.
        core_allocation_per_job = 0
        for i in range(start_event_index, end_event_index):
            n_k = n_jobs_in_system_just_after_events[i][k]
            core_allocation_per_job = core_allocation_just_after_events[i][k] / n_k if n_k != 0 else 0
            LL += -mu * n_k * speed_up(amdahl_par, core_allocation_per_job) * (events[i + 1][0] - events[i][0])

        # Log-intensity just before the departure (state after event end-1).
        # Without a departure the segment is censored at the last event and only
        # the integral term above applies.
        if departure_found:
            LL += np.log(n_jobs_in_system_just_after_events[end_event_index - 1][k]) + np.log(speed_up(amdahl_par, core_allocation_per_job))

    return LL


def Log_Likelihood(amdahl_pars, model_pars, jobs, job_ids_in_system, events, n_jobs_in_system_just_after_events, core_allocation_just_after_events):
    """Return the NEGATIVE log-likelihood of (p_1, p_2) given one simulated dataset.

    While n_k type-k jobs are present, each is served at speed
    s_k = speed_up(p_k, c_k / n_k), where c_k is the type's core allocation.
    Job sizes are Exp(mu), so type-k departures occur with piecewise-constant
    intensity mu * n_k * s_k. For each type, the log-likelihood is the sum of
    log-intensities at its departures minus the intensity integrated over time.
    The log(mu) term at each departure does not depend on p_k and is dropped.

    Args:
        amdahl_pars: candidate (p_1, p_2).
        model_pars: [lam, mu, total_cores, alpha]; only mu is used.
        jobs, events, n_jobs_in_system_just_after_events,
        core_allocation_just_after_events: as returned by queue_simulation.
        job_ids_in_system: unused; kept so the argument list matches the
            queue_simulation output passed in by ``estimation``.

    Returns:
        -(log-likelihood), suitable for minimisation.
    """
    mu = model_pars[1]
    LL = 0
    for job_type in (1, 2):
        LL += _type_log_likelihood(amdahl_pars[job_type - 1], mu, job_type, jobs, events,
                                   n_jobs_in_system_just_after_events, core_allocation_just_after_events)
    return -LL

In [None]:
def estimation(model_pars, jobs, job_ids_in_system, events, n_jobs_in_system_just_after_events, core_allocation_just_after_events):
    """Estimate (p_1, p_2) by minimising ``Log_Likelihood`` with L-BFGS-B.

    The search starts at (0.5, 0.5), and each parameter is boxed to
    [0.001, 0.999]. Returns the full ``scipy.optimize.OptimizeResult``; the
    estimates are in ``.x``.
    """
    objective_args = (model_pars, jobs, job_ids_in_system, events,
                      n_jobs_in_system_just_after_events, core_allocation_just_after_events)
    box = [(0.001, 0.999)] * 2
    starting_point = np.array([0.5, 0.5])
    return minimize(Log_Likelihood, starting_point, args=objective_args, method="L-BFGS-B", bounds=box)

In [None]:
# Runs Algorithm 1 end to end: simulate one window of data, then estimate (p_1, p_2) from it.
# Intended to be called once per window.

def execute_algorithm_1(amdahl_pars, model_pars, n, initial_state):
    """Algorithm 1: estimate (p_1, p_2) from the departure process of one simulated window.

    Simulates ``n`` arrivals with ``generate_dataset`` and passes the result to
    ``estimation``. Returns the optimiser result; the estimates are in ``.x``.
    """
    (jobs, job_ids_in_system, events,
     n_jobs_in_system_just_after_events,
     core_allocation_just_after_events) = generate_dataset(amdahl_pars, model_pars, n, initial_state)
    return estimation(model_pars, jobs, job_ids_in_system, events,
                      n_jobs_in_system_just_after_events, core_allocation_just_after_events)

In [None]:
# # True parameters

# lam = 10 # Arrival rate
# mu = 1 # Service rate (mean job size is 1/mu)
# total_cores = 20 # Total number of cores
# alpha = 0.5 # Probability that an arriving job is type 2 (see generate_job_categories)
# model_pars = [lam, mu, total_cores, alpha]

# p_1 = 0.38 # Amdahl parallelizable fraction for type 1 jobs
# p_2 = 0.79 # Amdahl parallelizable fraction for type 2 jobs
# amdahl_pars = [p_1, p_2] 

# n = 50000 # Total number of arrivals

# initial_state = np.array([0, 0])


# estd_amdahl_pars = execute_algorithm_1(amdahl_pars, model_pars, n, initial_state)
# print("True parameters = ", amdahl_pars)
# print("Estimated parameters = ", estd_amdahl_pars.x)

## Sanity Checks

In [None]:
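# Check that each job's size reconstructed from its history (sum of speed x time) equals its true size.
# Requires `jobs` from a dataset, e.g. `jobs, *_ = generate_dataset(amdahl_pars, model_pars, n, initial_state)`.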

# for job in jobs:
#     calculated_job_size = 0
#     for state in job.history:
#         calculated_job_size += state[0]*state[1]
#     print("Calculated job size = ", calculated_job_size, " , True job size = ", job.service_time)
    
# NOTE:
# For some of the jobs at the end, the calculated and true job sizes will not match.
# This is because they are still in the system when the simulation stops at the last arrival,
# so only part of their work has been served.