In [1]:
import numpy as np
from matplotlib import pyplot as plt
from scipy.stats import expon, poisson, gamma, chi2, chisquare

In [2]:
'''
Simulate one trajectory of the process N_t(ω) ∣ t ∈ (0,10].
Illustrate the trajectory graphically and describe the principle of generating this trajectory.
'''
lambda_ = 10
shape_ = 2
scale_ = 1 / 4

def plot_trajectory(process):
  plt.plot(process[:, 0], np.cumsum(process[:, 1]))
  plt.title("Random process M∣G∣∞: 10 sec")
  plt.xlabel('Time')
  plt.ylabel('Number of clients')

def create_trajectory(T):
  process = [[0, 0]]
  t = 0

  while t < T:
    t_delta = expon.rvs(scale=1/lambda_)
    if t + t_delta > T:
      break

    process.append([t + t_delta, 1])
    t_service = gamma.rvs(shape_, scale=scale_)
    if t + t_delta + t_service <= T:
      process.append([t + t_delta + t_service, -1])
    t += t_delta

  process = np.array(process)
  return process[np.argsort(process[:, 0])]

In [3]:
'''
Simulate n = 500 independent trajectories for t∈(0,100).
Based on these simulations, estimate the distribution of the random variable N_100.
'''

def plot_simulated_trajectories(res, N=500, T=100):
  plt.plot(np.arange(N), res)
  plt.title("Simulation of 500 independent trajectories M∣G∣∞: 100 sec")
  plt.xlabel('Attempt')
  plt.ylabel(f'Number of clients after {T} seconds in the system')

def simulate_trajectories(N=500, T=100):
  processes_results = np.zeros(N)
  for i in np.arange(N):
    trajectory = create_trajectory(T)
    processes_results[i] = np.sum(trajectory[:, 1])

  return processes_results

##############################################################

def plot_clients_distr(distr, T=100):
  plt.bar(np.arange(len(distr)), distr)
  plt.title(f"Distribution of the number of clients after {T} seconds")
  plt.xticks(np.arange(len(distr)))
  plt.xlabel('Number of clients')
  plt.ylabel('Probability')

def find_clients_distr(res):
  Ni = np.arange(int(np.max(res)+1))
  for i in Ni:
    Ni[i] = np.sum(res == i)

  return Ni / np.sum(Ni)

##############################################################

def plot_poisson_distr(len_, distr):
  plt.bar(np.arange(len_), distr)
  plt.title("Poisson's ditribution with intensity 5")
  plt.xticks(np.arange(len_))
  plt.xlabel('Number of clients')
  plt.ylabel('Probability')

def poisson_distr(intensity, len_poisson_probs, size_processes_results):
  poisson_probs = np.zeros(size_processes_results, dtype=float)
  for i in np.arange(len_poisson_probs):
    poisson_probs[i] = poisson.pmf(i, 5.0)
  poisson_probs[-1] = 1 - np.sum(poisson_probs[:-1])

  return poisson_probs

In [4]:
'''
Discuss the limiting distribution of this system for t→∞ (see lecture 23).
Using an appropriate test at the 5% significance level, check whether the simulation results for N_100 correspond to this distribution.
'''

# Pearson's chi-squared test
class PearsonsTest:
  def __init__(self, clients_distr, poisson_distr):
    self.clients_distr = clients_distr
    self.poisson_distr = poisson_distr
    self.sets_array = []

  def normalize_table(self):
    while any(poisson_prob < 5 for _, _, poisson_prob in self.sets_array):
      sorted_array = sorted(self.sets_array, key=lambda x: x[1])
      smallest = sorted_array[:2]

      combined_letter = smallest[0][0] + smallest[1][0]
      combined_expected = smallest[0][1] + smallest[1][1]
      combined_actual = smallest[0][2] + smallest[1][2]

      self.sets_array.append((combined_letter, combined_expected, combined_actual))

      self.sets_array.remove(smallest[0])
      self.sets_array.remove(smallest[1])

    print(self.sets_array)

  def test(self, alpha, normalize_flag):
    for num_client, (client_p, possion_p) in enumerate(zip(self.clients_distr, self.poisson_distr)):
      self.sets_array.append((num_client, client_p, possion_p))

    print(self.sets_array)
    if normalize_flag:
      self.normalize_table()

    test_val = 0.0
    for _, np, N in self.sets_array:
      test_val += (N - np)**2 / np

    ddf = len(self.sets_array) - 1
    critical_value= chi2.isf(alpha, ddf)
    p_value = chi2.sf(test_val, ddf)
    print(f'Manual values: ddf: {ddf}, p_value: {p_value}, test_val: {test_val}, critical_value: {critical_value}')

    print(chisquare([s[2] for s in self.sets_array], [s[1] for s in self.sets_array]))

In [5]:
N = 500
############# 1. task #############
process = create_trajectory(10.0)
#plot_trajectory(process)
###################################

############# 2. task #############
res = simulate_trajectories()
#plot_simulated_trajectories(res)

clients_distr = find_clients_distr(res)
#plot_clients_distr(clients_distr)

'''
How to calculate intensity
a = 4, p = 2, λ = 10

ES = 1/μ = p/a
μ = 1/ES = a/p = 4/2 = 2
λ/μ = 5
'''
p_distr = poisson_distr(5.0, len(clients_distr), int(np.max(res)+1))
#plot_poisson_distr(len(clients_distr), p_distr)
###################################

############# 3. task #############
pearson_test = PearsonsTest(clients_distr * N, p_distr * N)
pearson_test.test(0.05, True)
###################################

[(0, 4.0, 3.3689734995427334), (1, 21.0, 16.84486749771367), (2, 41.0, 42.11216874428416), (3, 61.0, 70.18694790714025), (4, 80.0, 87.73368488392532), (5, 91.0, 87.73368488392533), (6, 69.0, 73.1114040699377), (7, 60.0, 52.22243147852697), (8, 41.0, 32.63901967407933), (9, 16.0, 18.132788707821856), (10, 10.0, 9.066394353910926), (11, 4.0, 4.1210883426867815), (12, 2.0, 2.7265459565049555)]
[(1, 21.0, 16.84486749771367), (2, 41.0, 42.11216874428416), (3, 61.0, 70.18694790714025), (4, 80.0, 87.73368488392532), (5, 91.0, 87.73368488392533), (6, 69.0, 73.1114040699377), (7, 60.0, 52.22243147852697), (8, 41.0, 32.63901967407933), (9, 16.0, 18.132788707821856), (10, 10.0, 9.066394353910926), (23, 10.0, 10.21660779873447)]
Manual values: ddf: 10, p_value: 0.7774782620904389, test_val: 6.435121430770229, critical_value: 18.30703805327515
Power_divergenceResult(statistic=6.435121430770229, pvalue=0.7774782620904389)
