In [1]:
using Revise
using DataStructures
using Distributions
using Quarton

  ** incremental compilation may be fatally broken for this module **

  ** incremental compilation may be fatally broken for this module **



In [2]:
function doubling_arrival_rate(arrival_rate, service_rate)
    model = QueueModel()
    source = add_queue!(model, InfiniteSourceQueue())
    fifo = add_queue!(model, FIFOQueue())
    sink = add_queue!(model, SinkQueue())
    s1 = add_server!(model, ArrivalServer(arrival_rate))
    s2 = add_server!(model, ModifyServer(service_rate))
    connect!(model, source, s1, :only)
    connect!(model, s1, fifo, :only)
    connect!(model, fifo, s2, :only)
    connect!(model, s2, sink, :only)
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    activate!(model, trajectory, s1, Work())
    when = 0.0
    while when < 10000.0
        when, which = next(trajectory)
        step!(model, trajectory, (when, which))
    end
    response = sink.retire_total_duration / sink.retire_cnt
    return response
end
doubling_arrival_rate(3.0, 5.0)

0.5167163561993535

In [35]:
"""
Simplistic stochastic search. Start with two values and use the slope to
determine how much to modify the current guess.
"""
function doubling_arrival_rate_find_closest(; slowly=0.1, recent=10)
    x = CircularBuffer{Float64}(recent)
    y = CircularBuffer{Float64}(recent)
    service_rate = 5.0
    response = doubling_arrival_rate(6.0, service_rate)
    push!(x, service_rate)
    push!(y, response)
    service_rate = 10.0
    response = doubling_arrival_rate(6.0, service_rate)
    push!(x, service_rate)
    push!(y, response)
    desired = 0.5167
    slowly = slowly
    iteration_cnt = 1
    while abs(response - desired) > 0.001 && iteration_cnt < 100
        # Do a small linear least squares by hand to get the slope.
        X = ones(length(x), 2)
        X[:, 2] .= x
        β = (X' * X) \ (X' * y)
        service_rate += slowly * (desired - response) / β[2]
        response = doubling_arrival_rate(6.0, service_rate)
        push!(x, service_rate)
        push!(y, response)
        iteration_cnt += 1
    end
    service_rate, iteration_cnt
end
doubling_arrival_rate_find_closest()

(8.002811729249586, 34)

### Design example 2 - Sometimes "Improvements" Do Nothing

Page 6.

In [64]:
function improvements_do_nothing(
    service_rate; base_rate=1/3, stopping_time=10000.0
    )
    model = QueueModel()
    fifo1 = add_queue!(model, FIFOQueue())
    fifo2 = add_queue!(model, FIFOQueue())
    s1 = ModifyServer(service_rate, disbursement=RandomAssignment())
    add_server!(model, s1)
    s2 = ModifyServer(base_rate, disbursement=RandomAssignment())
    add_server!(model, s2)
    connect!(model, fifo1, s1, :only)
    connect!(model, s1, fifo1, :low)
    connect!(model, s1, fifo2, :high)
    connect!(model, fifo2, s2, :only)
    connect!(model, s2, fifo1, :low)
    connect!(model, s2, fifo2, :high)
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    # Need to start some reaction to get it going.
    activate!(model, trajectory, s1, Work())
    activate!(model, trajectory, s2, Work())
    # Question wants 6 tokens, and 2 are already being processed.
    for token_idx in 1:2
        push!(fifo1, Work(), start_time)
        push!(fifo2, Work(), start_time)
    end
    when = 0.0
    process_cnt = 0
    while when < stopping_time
        when, which = next(trajectory)
        if which === nothing
            println("no event at time $when step $(process_cnt)")
        end
        step!(model, trajectory, (when, which))
        process_cnt += 1
    end
    return process_cnt / stopping_time
end

[improvements_do_nothing(rate) for rate in [1/3, 1/2, 2.0, 10.0]]


4-element Vector{Float64}:
 0.5755
 0.6535
 0.6669
 0.6675

Suppose the system had higher multiprogramming level N. Does the answer change? The multiprogramming level is the total number of parallel servers. We will make a bunch of servers and queues and connect the output of every server to the input of every queue.

In [69]:
function improvements_do_nothing_higher_multiprogramming(
    service_rate; base_rate=1/3, stopping_time=10000.0, N=10
    )
    model = QueueModel()
    queues = Vector{FIFOQueue}(undef, N)
    for create_queue in 1:N
        queues[create_queue] = add_queue!(model, FIFOQueue())
    end
    servers = Vector{ModifyServer}(undef, N)
    servers[1] = ModifyServer(service_rate, disbursement=RandomAssignment())
    add_server!(model, servers[1])
    for create_server in 2:N
        # These are at the base_rate, not the one we speed up.
        s2 = ModifyServer(base_rate, disbursement=RandomAssignment())
        add_server!(model, s2)
        servers[create_server] = s2
    end
    roles = [gensym() for _ in 1:N]
    for s_connect in 1:N
        connect!(model, queues[s_connect], servers[s_connect], :only)
        for q_connect in 1:N
            connect!(model, servers[s_connect], queues[q_connect], roles[q_connect])
        end
    end
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    # Need to start some reaction to get it going.
    for start_server in 1:N
        activate!(model, trajectory, servers[start_server], Work())
    end
    # Question wants 6 tokens, and 2 are already being processed.
    for fill_queue in 1:N
        for token_idx in 1:2
            push!(queues[fill_queue], Work(), start_time)
        end
    end
    when = 0.0
    process_cnt = 0
    while when < stopping_time
        when, which = next(trajectory)
        if which === nothing
            println("no event at time $when step $(process_cnt)")
        end
        step!(model, trajectory, (when, which))
        process_cnt += 1
    end
    return process_cnt / stopping_time
end
a = [improvements_do_nothing_higher_multiprogramming(6.0, N=n) for n in [2, 4, 8, 16]]
b = [improvements_do_nothing_higher_multiprogramming(10.0, N=n) for n in [2, 4, 8, 16]]
(a, b)

([0.6707, 1.1625, 2.103, 4.1144], [0.6675, 1.1357, 2.1129, 4.0858])

### Design Example 3 - One Machine or Many?

Page 7. You are given a choice between one fast CPU of speed $s$ or $n$ slow CPUs each of speed $s/n$. Your goal is to minimize mean response time. To start, assume that jobs are non-preemptible, which means they must run to completion without interruption.

In [3]:
# Convert from [0,1] to [20,1].
function gamma_k(job_variability)
    kmax = 20
    kmin = 1.001
    return 1.0 + (kmin - kmax) * (job_variability - 1.0)
end

function one_machine_or_many_single(arrival_rate, service_rate, job_variability)
    # Represent job variability by using a Gamma distribution for token workload.
    # Mean of a distribution is kθ. Generally want 1 < k < 20. Keep kθ=1.
    # Small k = more spread.
    k = gamma_k(job_variability)
    work_dist = (when, rng) -> Work(rand(rng, Gamma(k, 1/k)), when)
    model = QueueModel()
    source = add_queue!(model, InfiniteSourceQueue(work_dist))
    fifo = add_queue!(model, FIFOQueue())
    sink = add_queue!(model, SinkQueue())
    s1 = add_server!(model, ArrivalServer(arrival_rate))
    s2 = add_server!(model, ModifyServer(service_rate))
    connect!(model, source, s1, :only)
    connect!(model, s1, fifo, :only)
    connect!(model, fifo, s2, :only)
    connect!(model, s2, sink, :only)
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    activate!(model, trajectory, s1, Work())
    when = 0.0
    while when < 10000.0
        when, which = next(trajectory)
        step!(model, trajectory, (when, which))
    end
    response = sink.retire_total_duration / sink.retire_cnt
    return response
end
[one_machine_or_many_single(4.0, 4.0, variability)
 for variability in [0.0, 0.5, 0.7, 0.99]]

4-element Vector{Float64}:
 15.514343546710185
 23.980658038533136
 39.70529089955526
 84.09416620939764

Now let's take the single server with rate 4 and split it into 10 servers with rate 4/10. For workloads with low variability, it's better to have the faster server (15s single vs 45s multiple). For workloads with high variability, it's better to have many slower servers (84s single vs 27s multiple).

In [9]:
function one_machine_or_many_many(
    arrival_rate, service_rate, job_variability, N=10
    )
    k = gamma_k(job_variability)
    work_dist = (when, rng) -> Work(rand(rng, Gamma(k, 1/k)), when)
    model = QueueModel()
    source = add_queue!(model, InfiniteSourceQueue(work_dist))
    fifo = add_queue!(model, FIFOQueue())
    sink = add_queue!(model, SinkQueue())
    s1 = add_server!(model, ArrivalServer(arrival_rate))
    servers = Vector{ModifyServer}(undef, N)
    for s_create_idx in 1:N
        s2 = add_server!(model, ModifyServer(service_rate))
        servers[s_create_idx] = s2
    end
    connect!(model, source, s1, :only)
    connect!(model, s1, fifo, :only)
    for s_connect_idx in 1:N
        connect!(model, fifo, servers[s_connect_idx], gensym())
        connect!(model, servers[s_connect_idx], sink, :only)
    end
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    activate!(model, trajectory, s1, Work())
    when = 0.0
    while when < 10000.0
        when, which = next(trajectory)
        step!(model, trajectory, (when, which))
    end
    response = sink.retire_total_duration / sink.retire_cnt
    return response
end
N = 10
[one_machine_or_many_many(4.0, 4.0 / N, variability, N)
 for variability in [0.0, 0.5, 0.7, 0.99]]


4-element Vector{Float64}:
 45.98084383697046
 64.19296764223655
 96.82479025436999
 27.451872678339

### Design Example 4 - Task Assignment in a server farm

Page 9, we have arrivals going to a dispatcher, or load balancer, which directs jobs to N hosts. The question is which method of dispatch works best, among

 * Random
 * Round-robin
 * Shortest-queue - Check the number of jobs in each destination queue
 * Size-Interval-Task-Assignment - Designate short, medium, or long jobs to sets of servers by type.
 * Least-Work-Left - Each queue totals the workload in all of its jobs, and choose the one with the least total.
 * Central-queue - All servers pull from one central set of jobs and get the first one.

Note that there is some confusion here in the text, which is OK because it's at the beginning. There are two questions, not one, which are who gets a job and which job they get.

Quarton has Assignment classes that represent these choices above. We could try various ones. This isn't the right time to explore all of the options and analyze them, so let's just show one popular choice, like least-work-left.

In [6]:
function task_assignment_server_farm(
    arrival_rate, service_rate, job_variability, N=10
    )
    k = gamma_k(job_variability)
    work_dist = (when, rng) -> Work(rand(rng, Gamma(k, 1/k)), when)
    model = QueueModel()
    source = add_queue!(model, InfiniteSourceQueue(work_dist))
    fifo = add_queue!(model, FIFOQueue())
    sink = add_queue!(model, SinkQueue())
    assign_strategy = LeastWorkLeft()
    s1 = add_server!(model, ArrivalServer(arrival_rate, disbursement=assign_strategy))
    servers = Vector{ModifyServer}(undef, N)
    for s_create_idx in 1:N
        s2 = add_server!(model, ModifyServer(service_rate))
        servers[s_create_idx] = s2
    end
    connect!(model, source, s1, :only)
    connect!(model, s1, fifo, :only)
    for s_connect_idx in 1:N
        connect!(model, fifo, servers[s_connect_idx], gensym())
        connect!(model, servers[s_connect_idx], sink, :only)
    end
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    activate!(model, trajectory, s1, Work())
    when = 0.0
    while when < 10000.0
        when, which = next(trajectory)
        step!(model, trajectory, (when, which))
    end
    response = sink.retire_total_duration / sink.retire_cnt
    return response
end
task_assignment_server_farm(100.0, 1.0, 0.2, 10)

4503.310655770496

### Design Example 5 - Scheduling Policies

 * First-come-first-served (FCFS). I call this FIFO.
 * Non-preemptive last-come-first-served.
 * Random - Starts working on a random job from the queue.

For fun, let's run the single server with the random queue.

In [10]:
function scheduling_policies()
    arrival_rate = 3.0
    service_rate = 3.0
    model = QueueModel()
    source = add_queue!(model, InfiniteSourceQueue())
    fifo = add_queue!(model, RandomQueue())
    sink = add_queue!(model, SinkQueue())
    s1 = add_server!(model, ArrivalServer(arrival_rate))
    s2 = add_server!(model, ModifyServer(service_rate))
    connect!(model, source, s1, :only)
    connect!(model, s1, fifo, :only)
    connect!(model, fifo, s2, :only)
    connect!(model, s2, sink, :only)
    check_model(model)
    trajectory = Trajectory(2342334)
    start_time = zero(Float64)
    activate!(model, trajectory, s1, Work())
    when = 0.0
    while when < 10000.0
        when, which = next(trajectory)
        step!(model, trajectory, (when, which))
    end
    response = sink.retire_total_duration / sink.retire_cnt
    return response
end
scheduling_policies()

88.80172675871297