# Environment for Queuing Systems

We consider a system of queues operating in discrete time. Let the system have $N$ processors (servers), each with a queue of maximum length $B$.

Let $\lambda$ denote the arrival rate of jobs and $\mu_i$ denote the processing rate of the $i$-th server.

Let $\sigma$ denote the rate at which the controller gets to decide whether to switch servers on or off.

At each time instant, the system can be in any one of the following 3 modes:

1. loadbalancing: a new job has arrived (or)
2. processing: a job is being processed by $i$-th server (or)
3. switching: the system is ready for switching

The MDP is defined as follows:

***States:*** The state consists of three components, $X$, $S$ and $m$. $X$ is an $N$-dimensional vector whose $i$-th element is the number of jobs in the $i$-th queue. $S$ is an $N$-dimensional binary vector denoting the readiness of the servers: a $0$ in the $i$-th coordinate of $S$ means that the $i$-th server is not accepting new jobs, and a $1$ means that it is. The component $m$ is the mode of the system, which takes one of the values loadbalancing, processing or switching described above.
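To get a feel for the size of this state space, the sketch below enumerates the $(X, S, m)$ triples. The helper name `count_states` is hypothetical, and the count assumes that a separate processing mode exists for server $i$ only when queue $i$ is non-empty.

```python
import itertools


def count_states(n_queues, max_len):
    """Count (X, S, m) triples for N = n_queues and B = max_len.

    Assumption: besides loadbalancing and switching, there is one
    processing mode per non-empty queue.
    """
    total = 0
    for X in itertools.product(range(max_len + 1), repeat=n_queues):
        busy = sum(1 for x in X if x > 0)   # queues that could be served
        modes = 2 + busy                    # loadbalancing, switching, processing_i
        total += modes * 2 ** n_queues      # every on/off vector S
    return total


print(count_states(3, 3))
```

Even a system with 3 servers and a maximum queue length of 3 has a state space in the thousands under this encoding, which is worth keeping in mind when choosing between tabular and approximate solution methods.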


***Actions:*** An action consists of two components, $U$ and $V$. $U$ is a one-hot vector indicating the queue to which the new job is assigned. $V$ is a binary vector giving the desired on/off status of each server. Note that $U$ must be $\vec{0}$ in switching and processing modes, and $V$ must equal the $S$ component of the state in loadbalancing and processing modes.

For example, consider a system of 3 servers with maximum queue length of 3.

The allowed actions in the state $X=[1,2,0], S=[1,1,0], m=loadbalancing$ are to assign the new job to either the 1st or the 2nd queue, i.e., $U=[1,0,0], V=[1,1,0]$ or $U=[0,1,0], V=[1,1,0]$ respectively. The job cannot be assigned to the 3rd queue because that queue is closed. Since load balancing and switching are decoupled, $V$ cannot differ from $S$ when the system is in loadbalancing mode.

Similarly, the allowed actions in the state $X=[1,2,0], S=[1,1,0], m=switching$ are $U=[0,0,0]$ together with any binary vector $V$ of size 3, which gives eight actions in total.

Note that the only allowed action in processing mode is $U=\vec{0}$ and $V=S$, i.e., during processing no new job is assigned and no server is switched.
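The action constraints above can be checked with a small standalone sketch. The function `allowed_actions` and the plain mode strings are illustrative names, not part of the environment class. The sketch also assumes that when no queue is both open and below capacity, the only loadbalancing action is $U=\vec{0}$.

```python
import itertools

import numpy as np


def allowed_actions(X, S, mode, max_len):
    """Return the list of (U, V) pairs permitted in state (X, S, mode)."""
    X, S = np.asarray(X), np.asarray(S)
    n = len(X)
    zero = np.zeros(n, dtype=int)
    if mode == "loadbalancing":
        # A job may only go to a queue that is open and not full.
        open_idx = [i for i in range(n) if S[i] == 1 and X[i] < max_len]
        if not open_idx:
            # Assumption: nothing can be assigned, so U = 0 and V = S.
            return [(zero, S.copy())]
        return [(np.eye(n, dtype=int)[i], S.copy()) for i in open_idx]
    if mode == "switching":
        # U = 0 and V ranges over all binary vectors of length n.
        return [(zero, np.array(V))
                for V in itertools.product([0, 1], repeat=n)]
    # processing: no assignment and no switching
    return [(zero, S.copy())]


for m in ("loadbalancing", "switching", "processing"):
    print(m, len(allowed_actions([1, 2, 0], [1, 1, 0], m, max_len=3)))
```

For the example state $X=[1,2,0], S=[1,1,0]$ this yields two actions in loadbalancing mode, eight in switching mode and one in processing mode, in line with the discussion above.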


There are four types of ***costs*** that can be incurred while interacting with the system.
1. An assignment cost is incurred when a job is added to a queue. Its value is the length of the queue after the job has been assigned, so it can be seen as analogous to waiting time.
2. A packet drop cost is incurred when a job arrives while all the queues are full.
3. A switching-on cost is incurred when a server's status is changed from off to on, and a switching-off cost when it is changed from on to off.
4. A running cost is the total cost incurred for running the servers that are turned on. (Note: every server whose queue holds at least one job is considered running.)
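As a rough illustration of items 2 and 4, the sketch below computes the running cost and the packet drop cost for a single time step. The function name and signature are hypothetical. It also makes two assumptions: a server counts as running if its queue is non-empty or its status is on, and a closed queue is treated like a full one when deciding whether an arriving job is dropped.

```python
import numpy as np


def running_and_drop_cost(X, S, max_len, running_cost,
                          packet_drop_cost, job_arrived):
    """Running cost plus packet drop cost for one step (sketch only)."""
    X, S = np.asarray(X), np.asarray(S)
    # Assumption: a server runs if it still has jobs or is switched on.
    running = running_cost * int(((X > 0) | (S == 1)).sum())
    # Assumption: a job is dropped if every queue is full or closed.
    no_room = bool(((X == max_len) | (S == 0)).all())
    drop = packet_drop_cost if (job_arrived and no_room) else 0
    return running + drop


# Three idle servers that are all switched on: 3 * 30 = 90
print(running_and_drop_cost([0, 0, 0], [1, 1, 1], 3, 30, 500, True))
```

The assignment and switching costs are left out here to keep the sketch short; they would be added to the same per-step total.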

The transition probabilities are defined as follows:

$P(X_{n+1} = X_{n} + U_{n} , S_{n+1} = V_{n} | X_n, S_n, U_n, V_n) = $

* $ \frac{\lambda I\{ U_n = e_i,\;S_n^i = 1\} }{\lambda + \sum_{j} \mu_j I\{X_n^j > 0\} + \sigma }$ where $U_n = e_i$ and $V_n = \vec{0}$ and $m=loadbalancing$

* $ \frac{\mu_i I\{X_n^i > 0\}}{\lambda + \sum_{j} \mu_j I\{X_n^j > 0\} + \sigma}$ where $U_n = -e_i$ and $V_n = \vec{0}$ and $m=processing$

* $ \frac{\sigma}{\lambda + \sum_{j} \mu_j I\{X_n^j > 0\} + \sigma}$ where $U_n = \vec{0}$, $S_{n+1}=V_n$ and $m=switching$ .
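The three fractions above share one denominator, so they can be computed together. The sketch below does this for a given queue vector $X$; the function name `mode_probabilities` and the numeric values are illustrative assumptions rather than part of the environment.

```python
import numpy as np


def mode_probabilities(X, arrival_rate, processing_rates, switching_rate):
    """Probability of each event, following the fractions above."""
    X = np.asarray(X)
    mus = np.asarray(processing_rates, dtype=float)
    busy = (X > 0).astype(float)          # indicator I{X^j > 0}
    denom = arrival_rate + np.sum(mus * busy) + switching_rate
    return {
        "processing": mus * busy / denom,      # one entry per server
        "loadbalancing": arrival_rate / denom,
        "switching": switching_rate / denom,
    }


p = mode_probabilities([1, 0, 0], arrival_rate=2.0,
                       processing_rates=[1.0, 1.0, 1.0],
                       switching_rate=1.0)
print(p)
# The probabilities of all events sum to one.
print(p["processing"].sum() + p["loadbalancing"] + p["switching"])
```

Servers with empty queues contribute nothing to the denominator, so the arrival and switching events become more likely as the system empties.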


GitHub link: https://github.com/EshwarSR/QueuesEnvironment

In [None]:
# Environment code

import numpy as np
import itertools
import copy


class Queues:
    def __init__(self, no_of_qs, len_of_q, arrival_rate, switching_rate, processing_rates, packet_drop_cost, switch_on_cost, switch_off_cost, switch_off_signal_cost, running_cost, incur_holding_cost=True):
        self.no_of_qs = no_of_qs  # N
        self.len_of_q = len_of_q  # B
        self.arrival_rate = arrival_rate  # lambda
        self.switching_rate = switching_rate  # sigma
        self.mus = processing_rates # processing rates of each queue
        self.incur_holding_cost = incur_holding_cost

        # Costs
        # cost incurred when all queues are full
        self.packet_drop_cost = packet_drop_cost
        # cost incurred for turning on a single server
        self.switch_on_cost = switch_on_cost
        # cost incurred for turning off a single server
        self.switch_off_cost = switch_off_cost
        # cost incurred for signalling a switch-off
        self.switch_off_signal_cost = switch_off_signal_cost
        # Initial experiments with switch_off_signal_cost = 0 produced policies that
        # signal a switch-off now and then keep the server on at the next switching
        # task, since no switch off/on cost is incurred in between.
        # Such policies are confusing to interpret.
        self.running_cost = running_cost  # cost incurred for running a single server

        # State = (X, S, mode)
        # X: vector of queue lengths
        # S: vector of server statuses (1 = accepting new jobs, 0 = not accepting)
        # mode: "load_balancing", "switching" or "processing_<i>" (i = index of the queue being served)
        # Initial state: all the queues are empty, all the servers are on
        self.init_state = (np.array([0.0] * no_of_qs),
                           np.array([1.0] * no_of_qs), "load_balancing")

        self.state = self.init_state

    def reset(self, s=None):
        if s is not None:
            self.state = s
        else:
            self.state = self.init_state

        return self.state

    def get_state(self):
        # always use get_state(), so that we don't modify self.state accidentally
        return copy.deepcopy(self.state)

    def _loadbalancing_step(self, X, S, mode, U, V, info):
        packet_drop_cost = 0
        # assignment_cost = 0

        # Check if the action is valid
        if np.sum(U) not in [0, 1]:
            raise Exception("Invalid U component of the action.")
        if not (V == S).all():
            raise Exception("Invalid V component of the action.")

        if ((X == self.len_of_q) | (S == 0)).all():
            # All the queues are full or off, so incurring the packet drop cost
            if np.sum(U) != 0:
                raise Exception("Assignment cannot be done. Hence, U cannot be {U} when state is {S}".format(
                    U=U, S=self.get_state()))
            next_X = X
            next_S = S
            packet_drop_cost = self.packet_drop_cost

        elif sum(U) != 1:
            # if in loadbalancing mode and yet the assignment is not done
            raise Exception("No assignment done! U cannot be {U} when state is {S}".format(
                U=U, S=self.get_state()))

        else:
            # load balancing as per U
            i = U.tolist().index(1)
            if S[i] == 1 and X[i] < self.len_of_q:
                next_X = X+U
                next_S = S
                info["message"] = "added the job to queue " + str(i)
                # assignment_cost = next_X[i]
            else:
                # ith server is either closed or full
                next_X = X
                next_S = S
                info["message"] = "Queue " + \
                    str(i) + " is either closed or full"
                raise Exception("Queue " + str(i) +
                                " is either closed or full")

        return next_X, next_S, packet_drop_cost

    def _switching_step(self, X, S, mode, U, V, info):
        switch_cost = 0
        wrong_switch_cost = 0
        if np.sum(U) != 0:
            raise Exception("Invalid U component of the action.")

        next_X = X
        next_S = V

        # on cost only for the queues who have len=0 and S=0. if len is greater than 0, they are already on.
        on_cost = (((V-S) == 1) & (X == 0)).sum() * self.switch_on_cost

        # cost for signalling off
        off_signal_cost = ((S-V) == 1).sum() * self.switch_off_signal_cost

        # off only the queue whose len=0. else mark it with S=0. In the processing mode, check if S=0 and X=0 then apply turn off cost
        off_cost = (((S-V) == 1) & (X == 0)).sum() * self.switch_off_cost
        switch_cost = on_cost + off_cost + off_signal_cost

        info["message"] = "switched to V = " + str(V.tolist())
        info["switch_on_cost"] = on_cost
        info["switch_off_cost"] = off_cost
        info["switch_off_signal_cost"] = off_signal_cost

        return next_X, next_S, switch_cost

    def _processing_step(self, X, S, mode, U, V, info):

        if np.sum(U) != 0:
            raise Exception("Invalid U component of the action.")
        if not (V == S).all():
            raise Exception("Invalid V component in action.")

        i = int(mode.split("_")[1])
        U = np.zeros(self.no_of_qs)
        U[i] = -1  # reduce the length by 1 in the i-th queue
        next_X = X+U
        next_S = S

        info["message"] = "processed the job in queue " + str(i)
        switch_cost = 0
        if next_X[i] == 0 and S[i] == 0:
            # this means the server needs to be turned off now
            switch_cost = self.switch_off_cost
            info["switch_off_cost"] = switch_cost

        return next_X, next_S, switch_cost

    def _get_next_mode(self, X, info):
        # z is the vector of probabilities of each outcome. There are n+2 outcomes
        # z[0 to n-1] is the departure from one of the queues
        # z[n] is the loadbalancing
        # z[n+1] is the switching

        n = self.no_of_qs
        z = np.zeros(n+2)
        denom = self.arrival_rate + \
            np.sum(self.mus * (X > 0).astype(int)) + self.switching_rate

        # processing
        for i in range(n):
            indicator = int(X[i] > 0)
            z[i] = self.mus[i] * indicator / denom
        # load-balancing
        z[n] = self.arrival_rate / denom
        # switching
        z[n+1] = self.switching_rate / denom

        info["probability_vector"] = z.tolist()

        sampled_z = np.random.multinomial(1, z, size=1)[0].tolist().index(1)
        info["sampled_z"] = sampled_z

        if sampled_z == n:  # loadbalancing
            next_mode = "load_balancing"

        elif sampled_z == n+1:  # switching task
            next_mode = "switching"

        else:  # processing
            next_mode = "processing_" + str(sampled_z)

        return next_mode

    def step(self, action):
        # Action = [U, V].
        # U: one-hot vector selecting the queue that receives the new job (all zeros outside load balancing).
        # V: vector giving each server's readiness to take new jobs
        state = self.get_state()
        X = state[0]
        S = state[1]
        mode = state[2]

        U = np.copy(action[0])
        V = np.copy(action[1])

        packet_drop_cost = 0
        switch_cost = 0

        done = False
        info = {}

        if mode == "load_balancing":
            next_X, next_S, packet_drop_cost = self._loadbalancing_step(
                X, S, mode, U, V, info)

        elif mode == "switching":
            next_X, next_S, switch_cost = self._switching_step(
                X, S, mode, U, V, info)

        else:
            # processing mode
            next_X, next_S, switch_cost = self._processing_step(
                X, S, mode, U, V, info)

        #########
        # costs #
        #########

        if self.incur_holding_cost:
            # total number of jobs in the system at this instant of time
            holding_cost = X.sum()
        else:
            holding_cost = 0

        # all servers which have a queue length greater than 0 are considered on (even if S=0; S=0 here only signifies that the queue is not taking new jobs)
        # servers with X=0 but S=1 are also considered on
        running_cost = self.running_cost * ((X > 0) | (S == 1)).sum()
        cost = holding_cost + packet_drop_cost + switch_cost + running_cost

        info["cost"] = {
            "holding_cost": holding_cost,
            "packet_drop_cost": packet_drop_cost,
            "switch_cost": switch_cost,
            "running_cost": running_cost
        }

        #############
        # Next mode #
        #############
        next_mode = self._get_next_mode(next_X, info)

        # Moving to the next state after simulation
        next_state = (next_X, next_S, next_mode)
        self.state = next_state

        return next_state, cost, done, info

    def get_all_states(self):
        states = []

        all_Xs = np.array(list(itertools.product(
            range(self.len_of_q+1), repeat=self.no_of_qs)))
        all_Ss = np.array(
            list(itertools.product([0, 1], repeat=self.no_of_qs)))
        all_combinations = list(itertools.product(all_Xs, all_Ss))

        # add all states with mode load_balancing
        for X, S in all_combinations:
            states.append((np.copy(X), np.copy(S), "load_balancing"))

        # add all states with mode switching
        for X, S in all_combinations:
            states.append((np.copy(X), np.copy(S), "switching"))

        # add all states with mode processing_*
        for X, S in all_combinations:
            for q_no, q_len in enumerate(X):
                if q_len > 0:
                    states.append((np.copy(X), np.copy(
                        S), "processing_{q_no}".format(q_no=q_no)))

        return states

    def get_all_possible_actions(self, state):
        actions = []

        X = np.copy(state[0])
        S = np.copy(state[1])
        mode = state[2]

        if mode == "load_balancing":
            if ((X == self.len_of_q) | (S == 0)).all():
                # all queues are either full or closed
                U = np.zeros(self.no_of_qs)
                V = np.copy(S)
                actions.append(np.array([U, V]))

            else:
                # add all actions possible under load_balancing
                for idx in range(self.no_of_qs):
                    U = np.zeros(self.no_of_qs)
                    if X[idx] < self.len_of_q and S[idx] == 1:
                        U[idx] = 1
                        V = np.copy(S)
                        actions.append(np.array([U, V]))

        elif mode == "switching":
            # add all actions possible under switching
            all_Vs = np.array(
                list(itertools.product([0, 1], repeat=self.no_of_qs)))
            for v in all_Vs:
                U = np.zeros(self.no_of_qs)
                V = np.copy(v)
                actions.append(np.array([U, V]))

        else:  # mode = processing_*
            # add only actions possible under processing_*
            U = np.zeros(self.no_of_qs)
            V = np.copy(S)
            actions.append(np.array([U, V]))

        return actions

    def sample_random_action(self, state):
        all_actions = self.get_all_possible_actions(state)
        idx = np.random.choice(np.arange(len(all_actions)))
        return all_actions[idx]


## Random run

In [None]:
np.random.seed(3)

no_of_qs = 3
len_of_q = 3
arrival_rate = 2
min_proc_rate = 0.5
max_proc_rate = 1.5
switching_rate = 0.7
processing_rates = np.random.uniform(low=min_proc_rate, high=max_proc_rate, size=no_of_qs)
packet_drop_cost = 500
switch_off_signal_cost = 5
switch_off_cost = 10
switch_on_cost = 20
running_cost = 30

env = Queues(no_of_qs, len_of_q, arrival_rate, switching_rate, processing_rates, packet_drop_cost, switch_on_cost, switch_off_cost, switch_off_signal_cost, running_cost, incur_holding_cost=True)

print("mus:", env.mus)
init_s = env.reset()
print("\n\nInit state:", init_s)

mus: [1.0507979  1.20814782 0.79090474]


Init state: (array([0., 0., 0.]), array([1., 1., 1.]), 'load_balancing')


In [None]:
trajectory = []
traj_return = 0
traj_len = 100
gamma = 0.99

for t in range(traj_len):
    s = env.get_state()
    a = env.sample_random_action(s)
    print("Time step:", t)
    print("State:", s)
    print("Action:", a.tolist())
    ns, c, d, i = env.step(a)
    r = -c
    traj_return += (gamma ** t) * r
    print("Next state:", ns)
    print("Cost:", c)
    print("Info:", i)
    print("=="* 75)
    trajectory.append((s,a,c,ns,i))
print("Trajectory return:", traj_return)

Time step: 0
State: (array([0., 0., 0.]), array([1., 1., 1.]), 'load_balancing')
Action: [[1.0, 0.0, 0.0], [1.0, 1.0, 1.0]]
Next state: (array([1., 0., 0.]), array([1., 1., 1.]), 'load_balancing')
Cost: 90.0
Info: {'message': 'added the job to queue 0', 'cost': {'holding_cost': 0.0, 'packet_drop_cost': 0, 'switch_cost': 0, 'running_cost': 90}, 'probability_vector': [0.2801531647048485, 0.0, 0.0, 0.5332198779964085, 0.18662695729874296], 'sampled_z': 3}
Time step: 1
State: (array([1., 0., 0.]), array([1., 1., 1.]), 'load_balancing')
Action: [[0.0, 1.0, 0.0], [1.0, 1.0, 1.0]]
Next state: (array([1., 1., 0.]), array([1., 1., 1.]), 'load_balancing')
Cost: 91.0
Info: {'message': 'added the job to queue 1', 'cost': {'holding_cost': 1.0, 'packet_drop_cost': 0, 'switch_cost': 0, 'running_cost': 90}, 'probability_vector': [0.2118994562163204, 0.2436299749118876, 0.0, 0.40331153249762375, 0.1411590363741683], 'sampled_z': 3}
Time step: 2
State: (array([1., 1., 0.]), array([1., 1., 1.]), 'load_ba