In [1]:
import warnings
from abc import ABC, abstractmethod

warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)

In [2]:
import math as m
import numpy as np
import pandas as pd

factorial_vectorized = np.vectorize(m.factorial)

In [14]:
class BaseAnalyzer(ABC):
    MO_DESCRIPTION = pd.DataFrame(
        data=[
            ('СМО', 'Система масового обслуговування'),
            ('МО', 'Мережа обслуговування (складається з n-ї кількості СМО)'),
            ('lambda', 'Інтенсивність надходження вимог до СМО'),
            ('u', 'Iнтенсивність обслуговування в каналі СМО'),
            ('r', 'Kількість каналів у СМО'),
            ('e', 'Коефіцієн передачі до СМО'),
            ('p(k)', 'Ймовірність, що в СМО знаходиться k вимог'),
            ('n', 'Сумарна кількість вимог в МО'),
            ('C', 'Нормуючий множник'),
            ('L', 'Середня кількість вимог у СМО'),
            ('R', 'Середня кількість зайнятих пристроїв у СМО'),
            ('M', 'Cередня кількість вимог, що перебувають у СМО'),
            ('Q', 'Cередній час очікування в СМО'),
            ('T', 'Середній час перебування вимог у СМО'),
            ('t', 'Cередній час перебування вимог у мережі МО'),
        ],
        columns=['Variable', 'Description']
    )

    def __init__(self, smo_e: list[float], smo_u: list[float], smo_r: list[int]):
        self.mo = pd.DataFrame(
            data=np.array([
                smo_e,
                smo_u,
                smo_r
            ]).T,
            index=[f'smo_{i}' for i in range(len(smo_e))],
            columns=['e', 'u', 'r']
        )
        self.mo['r'] = self.mo['r'].astype(int)

    @staticmethod
    @abstractmethod
    def calc_p(row, k_: int) -> float:
        pass

    @staticmethod
    @abstractmethod
    def calc_effective_value(self) -> None:
        pass

In [15]:
BaseAnalyzer.MO_DESCRIPTION

Unnamed: 0,Variable,Description
0,СМО,Система масового обслуговування
1,МО,Мережа обслуговування (складається з n-ї кільк...
2,lambda,Інтенсивність надходження вимог до СМО
3,u,Iнтенсивність обслуговування в каналі СМО
4,r,Kількість каналів у СМО
5,e,Коефіцієн передачі до СМО
6,p(k),"Ймовірність, що в СМО знаходиться k вимог"
7,n,Сумарна кількість вимог в МО
8,C,Нормуючий множник
9,L,Середня кількість вимог у СМО


In [4]:
class NotClosedAnalyzer(BaseAnalyzer):
    def __init__(self, smo_e: list[float], smo_u: list[float], smo_r: list[int], lambda_0: float):
        super().__init__(smo_e, smo_u, smo_r)
        if not self.is_consistent(smo_e, smo_u, smo_r, lambda_0):
            self.mo['r_min'] = self.mo.apply(lambda row: m.ceil(row['e'] * lambda_0 / row['u']), axis=1)
            raise ValueError(f'Queue is not consistent. Min number of channels: {self.mo["r_min"].tolist()}')

        self.lambda_0 = lambda_0
        self.mo['lambda'] = self.mo.apply(lambda row: row['e'] * lambda_0, axis=1)

    @staticmethod
    def is_consistent(smo_e: list[float], smo_u: list[float], smo_r: list[int], lambda_0: float) -> bool:
        for e_i, u_i, r_i in zip(smo_e, smo_u, smo_r):
            if u_i * r_i / e_i <= lambda_0:
                return False
        return True

    @staticmethod
    def calc_p(row, k_: int) -> float:
        return (
            (row['lambda'] / row['u']) ** k_ *
            row['C'] * 1 / m.factorial(k_) if k_ <= row['r'] else
            1 / (m.factorial(int(row['r'])) * row['r'] ** (k_ - 1))
        )

    @staticmethod
    def calc_c(row) -> float:
        lambda_ = row['lambda']
        u_i = row['u']
        r_i = int(row['r'])

        return (
                (lambda_ / u_i) ** r_i *
                1 / (factorial_vectorized(r_i) * (1 - lambda_ / (u_i * r_i))) +
                sum((lambda_ / u_i) ** k * 1 / factorial_vectorized(k) for k in range(r_i))
        ) ** -1

    def calc_effective_value(self):
        self.mo['C'] = self.mo.apply(self.calc_c, axis=1)
        self.mo['L'] = self.mo.apply(
            lambda row: sum((j - row['r']) * self.calc_p(row, j) for j in range(int(row['r']) + 1, 1000)),
            axis=1
        )
        self.mo['R'] = self.mo['lambda'] / self.mo['u']
        self.mo['M'] = self.mo['L'] + self.mo['R']
        self.mo['Q'] = self.mo['L'] / self.mo['lambda']
        self.mo['T'] = self.mo['M'] / self.mo['lambda']

    @property
    def t(self):
        return (self.mo['T'] * self.mo['e']).sum()

![title](imgs/task_1.png)

In [8]:
u1 = 0.2
u2 = 1 / 2
e1 = 1.25
e2 = (3 / 4) * e1 + (1 / 2)

analyzer = NotClosedAnalyzer(
    smo_e=[e1, e2],
    smo_u=[u1, u2],
    smo_r=[4, 2],
    lambda_0=0.6,
)

analyzer.calc_effective_value()
print(f"Середній час перебування в системі: {analyzer.t} c")
analyzer.mo

Середній час перебування в системі: 9.958815586419753 c


Unnamed: 0,e,u,r,lambda,C,L,R,M,Q,T
smo_0,1.25,0.2,4,0.75,0.006561,0.000289,3.75,3.750289,0.000386,5.000386
smo_1,1.4375,0.5,2,0.8625,0.073826,0.5,1.725,2.225,0.57971,2.57971


In [6]:
class ClosedAnalyzer(BaseAnalyzer):
    def __init__(self, smo_e: list[float], smo_u: list[float], smo_r: list[int], n: int):
        super().__init__(smo_e, smo_u, smo_r)
        self.n = n
        self.c_n = self.calc_c()

    @staticmethod
    def calc_p(row, k_: int) -> float:
        r_ = int(row[2])
        return (
            (row[0] / row[1]) ** k_ *
            (1 / m.factorial(k_)) if k_ <= r_ else 1 / (m.factorial(r_) * r_ ** (k_ - 1))
        )

    def calc_c(self) -> float:
        def _calc_sum_of_p_of_k(n: int, rows: list) -> float:
            if len(rows) == 1:
                return self.calc_p(rows[0], n)
            return sum(self.calc_p(rows[0], i) * _calc_sum_of_p_of_k(n - i, rows[1:]) for i in range(n + 1))

        return _calc_sum_of_p_of_k(self.n, self.mo.values.tolist()) ** -1

    def calc_p_smo_1(self, j: int) -> float:
        p1 = self.calc_p(self.mo.iloc[0], j)
        p2 = self.calc_p(self.mo.iloc[1], self.n - j)
        return self.c_n * p1 * p2

    def calc_p_smo_2(self, j: int) -> float:
        p1 = self.calc_p(self.mo.iloc[0], self.n - j)
        p2 = self.calc_p(self.mo.iloc[1], j)
        return self.c_n * p1 * p2

    def calc_effective_value(self):
        self.mo['L'] = self.mo.apply(
            lambda row: sum(
                (j - row['r']) * self.calc_p_smo_1(j) if row.name == 'smo_0' else self.calc_p_smo_2(j)
                for j in range(int(row['r']) + 1, self.n + 1)
            ), axis=1
        )
        self.mo['R'] = self.mo.apply(
            lambda row: row['r'] - sum(
                (row['r'] - j) * self.calc_p_smo_1(j) if row.name == 'smo_0' else self.calc_p_smo_2(j)
                for j in range(int(row['r']) - 1)
            ), axis=1
        )
        self.mo['M'] = self.mo['L'] + self.mo['R']
        self.mo['lambda'] = self.mo['R'] * self.mo['u']
        self.mo['T'] = self.mo['M'] / self.mo['lambda']
        self.mo['Q'] = self.mo['L'] / self.mo['lambda']

![title](imgs/task_2.png)

In [11]:
N = 20

u1 = 1 / 0.8
u2 = 1 / 0.3

e1 = 1
e2 = 1 / 0.8

analyzer = ClosedAnalyzer(
    smo_e=[e1, e2],
    smo_u=[u1, u2],
    smo_r=[1, 3],
    n=N
)
analyzer.calc_effective_value()

analyzer.mo

Unnamed: 0,e,u,r,L,R,M,lambda,T,Q
smo_0,1.0,1.25,1,18.601152,1.0,19.601152,1.25,15.680921,14.880921
smo_1,1.25,3.333333,3,0.006327,2.060382,2.06671,6.86794,0.300921,0.000921
