|        |         |                                @ |
|:-------|:--------|---------------------------------:|
| Luca   | Mosetti | luca.mosetti-1@studenti.unitn.it |
| Shandy | Darma   |   shandy.darma@studenti.unitn.it |

In [None]:
from dataclasses import dataclass
from typing import Iterator, Callable, Iterable
from matplotlib_inline.backend_inline import set_matplotlib_formats
from numpy.typing import NDArray

import doctest
import math
import heapq as hq
import itertools as it
import statistics as st
import matplotlib.pyplot as plt
import more_itertools as mit
import numpy as np
import scipy as sp

In [None]:
set_matplotlib_formats('svg')

# Exercise 2

1. Repeat exercise 1 for an $\text M/\text M/c/K$ system where

    - there are $c$ servers (start with $c = 2$, then test for larger values if time allows);
    - the queue of each server can hold up to $K$ packets (start with some easy number such as $K = 10$): this means that a server having $K$ packets in queue would discard any other packets assigned to it.

2. Experiment with simple packet assignment policies (round-robin, least-loaded servers first, ...) as well as with policies that consider the occupancy of the queue of the servers (e.g., avoid sending packets to servers that have a full queue, or to a queue that is more than $x\%$ full, for some $x$).

3. For each policy, plot a histogram showing the number of packets served by each of the $c$ servers, as well as the average distribution of the queuing delay experienced by each packet. You can show other metrics as needed.

In [None]:
@dataclass(order=True, frozen=True, slots=True)
class START:
    pass


@dataclass(order=True, frozen=True, slots=True)
class ARRIVAL:
    pass


@dataclass(order=True, frozen=True, slots=True)
class DEPARTURE:
    by: int


@dataclass(order=True, frozen=True, slots=True)
class ASSIGNED:
    by: int


@dataclass(order=True, frozen=True, slots=True)
class SERVING:
    by: int


@dataclass(order=True, frozen=True, slots=True)
class DISCARDED:
    by: int


ET = START | ARRIVAL | DEPARTURE
LT = ASSIGNED | SERVING | DEPARTURE | DISCARDED


@dataclass(order=True, frozen=True, slots=True)
class Event:
    timestamp: float
    event: ET


@dataclass(order=True, frozen=True, slots=True)
class Log:
    timestamp: float
    log: LT

$$
U \sim \text{Uniform}(0, 1) \qquad X = - \frac {\log U} \lambda \sim \text{Exp}(\lambda)
$$

In [None]:
def uni_to_exp(lmbd: float, u: float) -> float:
    return -math.log(u) / lmbd


def round_robin(last: int, servers: NDArray[np.uint]) -> int:
    return (last + 1) % len(servers)


def least_loaded(_: int, servers: NDArray[np.uint]) -> int:
    return np.argmin(servers).item()


def least_loaded_round_robin(last: int, servers: NDArray[np.uint]) -> int:
    """

    >>> least_loaded_round_robin(1, [0, 0, 1, 0])
    3

    >>> least_loaded_round_robin(0, [0, 1, 1, 1])
    0

    >>> zs = np.zeros(10)
    >>> all([ least_loaded_round_robin(a, zs) == round_robin(a, zs) for a in range(10) ])
    True

    >>> zs = np.zeros(10)
    >>> all([ least_loaded_round_robin(a, zs) != least_loaded(a, zs) for a in range(9) ])
    True
    """
    shift: int = last + 1
    return (np.argmin(np.roll(servers, -shift)).item() + shift) % len(servers)


def mmck_simulation(
        seed_arr: int,
        seeds_dep: list[int],
        lmbd: float,
        mu: float,
        k: int,
        policy: Callable[[int, NDArray[np.uint]], int]
) -> Iterator[Log]:
    """
    Reproducible simulation of M/M/c/K queue-server system

    >>> mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded)) == mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
    True

    >>> l1m1c1k5 = mmck_simulation(3, [7], 1, 1, 5, least_loaded)
    >>> all(a.timestamp <= b.timestamp for a, b in mit.take(100, it.pairwise(l1m1c1k5)))
    True

    >>> l1m1c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
    >>> mit.quantify(l.log == DISCARDED(0) for l in l1m1c1k5) <= mit.quantify(l.log == ASSIGNED(0) for l in l1m1c1k5)
    True

    >>> l1m1c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
    >>> mit.quantify(l.log == SERVING(0) for l in l1m1c1k5) <= mit.quantify(l.log == ASSIGNED(0) for l in l1m1c1k5)
    True

    >>> l1m1c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
    >>> mit.quantify(l.log == DEPARTURE(0) for l in l1m1c1k5) <= mit.quantify(l.log == SERVING(0) for l in l1m1c1k5)
    True

    >>> l1m1c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
    >>> l1m2c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 2, 5, least_loaded))
    >>> l1m1_dep_over_arr = mit.quantify(l.log == DEPARTURE(0) for l in l1m1c1k5) / mit.quantify(l.log == ASSIGNED(0) for l in l1m1c1k5)
    >>> l1m2_dep_over_arr = mit.quantify(l.log == DEPARTURE(0) for l in l1m2c1k5) / mit.quantify(l.log == ASSIGNED(0) for l in l1m2c1k5)
    >>> l1m1_dep_over_arr < l1m2_dep_over_arr
    True

   >>> l1m1c1k3 = mit.take(100, mmck_simulation(3, [7], 1, 1, 3, least_loaded))
   >>> l1m1c1k5 = mit.take(100, mmck_simulation(3, [7], 1, 1, 5, least_loaded))
   >>> mit.quantify(l.log == DISCARDED(0) for l in l1m1c1k3) > mit.quantify(l.log == DISCARDED(0) for l in l1m1c1k5)
   True
   """

    c: int = len(seeds_dep)

    assert c > 0
    assert k > 0

    rng_arr: np.random.Generator = np.random.default_rng(seed_arr)

    def next_arr(timestamp: float) -> Event:
        return Event(
            timestamp + uni_to_exp(lmbd, rng_arr.random()),
            ARRIVAL()
        )

    rngs_dep: list[np.random.Generator] = [np.random.default_rng(seed_dep) for seed_dep in seeds_dep]

    def next_dep(by: int, timestamp: float) -> Event:
        return Event(
            timestamp + uni_to_exp(mu, rngs_dep[by].random()),
            DEPARTURE(by=by)
        )

    last: int = -1
    servers: NDArray[np.uint] = np.zeros(c, np.uint)

    timeline: list[Event] = [
        Event(timestamp=0, event=START()),
    ]

    while True:
        e: Event = hq.heappop(timeline)
        match e.event:

            case START():
                hq.heappush(timeline, next_arr(e.timestamp))

            case ARRIVAL():
                hq.heappush(timeline, next_arr(e.timestamp))

                by: int = policy(last, servers.copy())
                last = by
                yield Log(e.timestamp, ASSIGNED(by=by))
                match servers[by]:
                    case 0:
                        yield Log(e.timestamp, SERVING(by=by))
                        servers[by] += 1
                        hq.heappush(timeline, next_dep(by, e.timestamp))

                    case x if x == k + 1:
                        yield Log(e.timestamp, DISCARDED(by=by))

                    case _:
                        servers[by] += 1

            case DEPARTURE(by):
                yield Log(e.timestamp, DEPARTURE(by))

                servers[by] -= 1
                if servers[by] > 0:
                    yield Log(e.timestamp, SERVING(by=by))
                    hq.heappush(timeline, next_dep(by, e.timestamp))


def waiting(c: int, xs: Iterator[Log]) -> Iterable[tuple[float, int, float]]:
    """
    From sequence of logs to sequence of (arrival time, served by, waiting time)

    >>> logs_0 = [Log(10, ASSIGNED(0)), Log(20, ASSIGNED(0)), Log(30, ASSIGNED(0)), Log(40, SERVING(0))]
    >>> logs_1 = [Log(15, ASSIGNED(1)), Log(18, SERVING(1)), Log(19, ASSIGNED(1)), Log(41, SERVING(1))]
    >>> logs_2 = [Log(11, ASSIGNED(2)), Log(30, SERVING(2)), Log(31, ASSIGNED(2)), Log(42, DISCARDED(2))]
    >>> logs = sorted(it.chain(logs_0, logs_1, logs_2), key=lambda l: l.timestamp)
    >>> list(waiting(3, logs))
    [(10, 0, 30), (11, 2, 19), (15, 1, 3), (19, 1, 22)]
    """

    def single_waiting(by: int, xs: Iterator[Log]) -> Iterator[tuple[float, int, float]]:
        xs1, xs2 = it.tee(xs, 2)
        las = (l.timestamp for l in xs1 if l.log == ASSIGNED(by))
        lzs = ((l.log, l.timestamp) for l in xs2 if l.log in [DISCARDED(by), SERVING(by)])
        return ((a, by, e - a) for a, (l, e) in zip(las, lzs) if l != DISCARDED(by))

    xss: tuple[Iterator[Log], ...] = it.tee(xs, c)

    ws: list[Iterator[tuple[float, int, float]]] = [single_waiting(by, xss[by]) for by in range(c)]

    return hq.merge(*ws, key=lambda t: t[0])


def timestamp_packets(c: int, xs: Iterator[Log]) -> Iterator[tuple[float, NDArray[np.uint]]]:
    """
    From sequence of logs to sequence of (time, (packets served by 0, ...))

    >>> logs_0 = [Log(10, ASSIGNED(0)), Log(15, ASSIGNED(0)), Log(15, DISCARDED(0)), Log(25, DEPARTURE(0))]
    >>> logs_1 = [Log(11, ASSIGNED(1)), Log(20, DEPARTURE(1))]
    >>> logs_2 = [Log(12, ASSIGNED(2)), Log(30, SERVING(2)), Log(31, ASSIGNED(2)), Log(42, DISCARDED(2))]
    >>> logs = sorted(it.chain(logs_0, logs_1, logs_2), key=lambda l: l.timestamp)
    >>> list(timestamp_packets(3, logs))
    [(0, array([0, 0, 0], dtype=uint64)), (10, array([1, 0, 0], dtype=uint64)), (11, array([1, 1, 0], dtype=uint64)), (12, array([1, 1, 1], dtype=uint64)), (15, array([2, 1, 1], dtype=uint64)), (15, array([1, 1, 1], dtype=uint64)), (20, array([1, 0, 1], dtype=uint64)), (25, array([0, 0, 1], dtype=uint64)), (31, array([0, 0, 2], dtype=uint64)), (42, array([0, 0, 1], dtype=uint64))]
    """

    def timestamp_packets(acc: tuple[float, NDArray[np.uint]], l: Log) -> tuple[float, NDArray[np.uint]]:
        pckts = acc[1].copy()
        match l.log:
            case ASSIGNED(by):
                pckts[by] += 1
                return l.timestamp, pckts
            case DISCARDED(by) | DEPARTURE(by):
                pckts[by] -= 1
                return l.timestamp, pckts

    return it.accumulate(
        (l for l in xs if isinstance(l.log, ASSIGNED | DISCARDED | DEPARTURE)),
        timestamp_packets,
        initial=(0, np.zeros(c, dtype=np.uint))
    )


def timestamp_tot_packets(c: int, xs: Iterator[Log]) -> Iterator[tuple[float, NDArray[np.uint]]]:
    """
    From sequence of logs to sequence of (time, (tot packets served by 0, ...))

    >>> logs_0 = [Log(10, ASSIGNED(0)), Log(15, ASSIGNED(0)), Log(15, DISCARDED(0)), Log(25, DEPARTURE(0))]
    >>> logs_1 = [Log(11, ASSIGNED(1)), Log(20, DEPARTURE(1))]
    >>> logs_2 = [Log(12, ASSIGNED(2)), Log(30, SERVING(2)), Log(31, ASSIGNED(2)), Log(42, DISCARDED(2))]
    >>> logs = sorted(it.chain(logs_0, logs_1, logs_2), key=lambda l: l.timestamp)
    >>> list(timestamp_tot_packets(3, logs))
    [(0, array([0, 0, 0], dtype=uint64)), (10, array([1, 0, 0], dtype=uint64)), (11, array([1, 1, 0], dtype=uint64)), (12, array([1, 1, 1], dtype=uint64)), (15, array([2, 1, 1], dtype=uint64)), (15, array([1, 1, 1], dtype=uint64)), (31, array([1, 1, 2], dtype=uint64)), (42, array([1, 1, 1], dtype=uint64))]
    """

    def timestamp_tot_packets(acc: tuple[float, NDArray[np.uint]], l: Log) -> tuple[float, NDArray[np.uint]]:
        pckts = acc[1].copy()
        match l.log:
            case ASSIGNED(by):
                pckts[by] += 1
                return l.timestamp, pckts
            case DISCARDED(by):
                pckts[by] -= 1
                return l.timestamp, pckts

    return it.accumulate(
        (l for l in xs if isinstance(l.log, ASSIGNED | DISCARDED)),
        timestamp_tot_packets,
        initial=(0, np.zeros(c, dtype=np.uint))
    )

In [None]:
doctest.testmod()