In [2]:
import numpy as np
import scipy as sp
import scipy.stats as sps
import matplotlib.pyplot as plt

## 1

![](img/task1.png)

In [3]:
def exact(q: int, n: int) -> tuple[float, float]:
    cdf = sps.gamma(a=n, scale=1/(n*q)).cdf
    mean = 1/q
    sigma = 1/q
    std  = sigma / np.sqrt(n)
    
    return 1 - cdf(mean + 2 * std), cdf(mean - 2 * std)

In [4]:
exact(2, 10)

(0.03685411175968156, 0.004634706298986589)

## 2

![](img/task2.png)

In [5]:
import random
import math
from queue import deque

random.seed(2)

class Exp:
    start_time = 9 * 60
    end_time = 16 * 60
    
    lambd = 1/2
    a, b = 15, 45
    num_workers = 1
    
    def gen_work_time():
        return random.uniform(Exp.a, Exp.b)

def generate_customers(start_time, end_time, lambd):
    customers_arrival_times_deque = deque()
    time_arrival = start_time
    while time_arrival <= end_time:
        time_arrival += random.expovariate(lambd)
        customers_arrival_times_deque.append(time_arrival)
    customers_arrival_times_deque.pop()
    
    return customers_arrival_times_deque

def insert_to_sorted_deq(deq, value):
    idx = 0
    for elem in deq:
        if elem > value:
            break
        idx += 1
        
    deq.insert(idx, value)
    
def get_next_event(event_deq_list, max_time=10**9):
    min_idx = None
    min_time = max_time
    for idx, deq in enumerate(event_deq_list):
        if deq and deq[0] < min_time:
            min_time = deq[0]
            min_idx = idx
            
    return min_idx, min_time
    
def experiment():    
    time = Exp.start_time

    customers_arrival_times_deque = generate_customers(Exp.start_time, Exp.end_time, Exp.lambd)
        
    customers_queue_size = 0
    workers_finish_times_deque = deque()

    while time <= Exp.end_time:
        event_type, event_time = get_next_event((customers_arrival_times_deque, workers_finish_times_deque))
        
        if event_time > Exp.end_time:
            break
        
        if event_type == 0:
            customers_arrival_times_deque.popleft()
            
            if len(workers_finish_times_deque) < Exp.num_workers:
                insert_to_sorted_deq(workers_finish_times_deque, event_time + Exp.gen_work_time())
            else:
                customers_queue_size += 1
                
        elif event_type == 1:
            workers_finish_times_deque.popleft()
            if customers_queue_size > 0 and len(workers_finish_times_deque) < Exp.num_workers:
                customers_queue_size -= 1
                insert_to_sorted_deq(workers_finish_times_deque, event_time + Exp.gen_work_time())
                
        elif event_type is None:
            break
        
        time = event_time
    
    return customers_queue_size, workers_finish_times_deque
        

def closing_time(outcome):
    customers_queue_size, workers_finish_times_deque = outcome
    
    result_time = Exp.end_time
    while workers_finish_times_deque:
        result_time = workers_finish_times_deque.popleft()
        if customers_queue_size > 0:
            insert_to_sorted_deq(workers_finish_times_deque, result_time + Exp.gen_work_time())
            customers_queue_size -= 1
            
    return result_time - Exp.end_time
        
        

def monte_carlo(experiment,
                closing_time,
                number_of_trials):
    results = [0] * number_of_trials
    
    for idx in range(number_of_trials):
        results[idx] = closing_time(experiment())
        
    return math.fsum(results) / number_of_trials

In [6]:
monte_carlo(experiment, closing_time, 10000)  

5881.979028194062

## 3

![](img/task3.png)

In [8]:
import numpy as np
import numba

@numba.njit(cache=True, inline='always')
def bg_nbd(a: int, b: int, c: int, d: int, time_limit: int) -> int:
    p_u = np.random.beta(a, b)
    q_u = np.random.gamma(c, 1/d)
    
    time = 0
    act_num = 0
    
    time_since_last_act = np.random.exponential(1/q_u)
    prob_alive_at_0 = (1 - p_u) * np.exp(-q_u * time_since_last_act) / (p_u + (1 - p_u) * np.exp(-q_u * time_since_last_act))
    
    if prob_alive_at_0 < np.random.rand():
        return 0
    
    max_act_num = np.random.geometric(p_u)
    for _ in range(max_act_num):
        time += np.random.exponential(1/q_u)
        
        if time > time_limit:
            break
        
        act_num += 1
            
    return act_num
    
    
@numba.njit(cache=True, parallel=True)
def mean(a: int, b: int, c: int, d: int, number_of_trials: int = 1_000_000) -> float:
    act_num_sum = 0
    for _ in range(number_of_trials):
        act_num_sum += bg_nbd(a, b, c, d, time_limit=10)
        
    return act_num_sum / number_of_trials

In [9]:
mean(2, 20, 2, 2, 1_000_000)

4.962898

## 4

![](img/task4.png)

In [10]:
import numpy as np
import math

def pmf(x: float, p: float, q: float, k: int) -> float:
    result = (1 - p)**k * (q * x)**k * np.exp(-q * x) / math.factorial(k)
    if k > 0:
        result += p * (1 - p) ** (k - 1) * (1 - np.exp(-q * x) * sum((q * x)**m / math.factorial(m) for m in range(k)))
        
    return result


def p_alive_at_0(t: float, p: float, q: float) -> float:
    return (1 - p) * np.exp(-q * t) / (p + (1 - p) * np.exp(-q * t))

def corrected_pmf(t: float, p: float, q: float, k: int) -> float:
    pmf_k = pmf(10, p, q, k)
    p_alive = p_alive_at_0(t, p, q)
    
    if k == 0:
        return (1 - p_alive) 
    else:
        return p_alive * pmf_k
    
def activities(t: float, p: float, q: float, summands: int = 20) -> float:
    return sum(k * corrected_pmf(t, p, q, k) for k in range(summands))

In [11]:
print(activities(5, 0.15, 0.2))
print(activities(0.5, 0.1, 0.3))
print(activities(20, 0.5, 0.1))

1.1677247235122066
2.29548717086201
0.09380539017673797


## 5

![](img/task5a.png)
![](img/task5b.png)

In [None]:
import numpy as np


class HiddenMarkovModel:
    EPS = 1e-12
    
    def __init__(self, n_states, n_emissions, B=None):
        self.n_states = n_states
        self.n_emissions = n_emissions
        
        self.A = np.random.rand(n_states, n_states)
        self.A /= self.A.sum(axis=1, keepdims=True)
        
        if B is None:
            self.fit_B = True
            self.B = np.random.rand(n_states, n_emissions)
            self.B /= self.B.sum(axis=1, keepdims=True)
        else:
            self.fit_B = False
            self.B = B
        
        self.pi = np.random.rand(n_states)
        self.pi /= self.pi.sum()

    def forward(self, observations):
        T = len(observations)
        alpha = np.zeros((T, self.n_states))
        scaling = np.zeros(T)
        
        alpha[0] = self.pi * self.B[:, observations[0]]
        scaling[0] = alpha[0].sum()
        alpha[0] /= scaling[0] + self.EPS
        
        for t in range(1, T):
            alpha[t] = (alpha[t-1] @ self.A) * self.B[:, observations[t]]
            scaling[t] = alpha[t].sum()
            alpha[t] /= scaling[t] + self.EPS
        
        return alpha, scaling

    def backward(self, observations, scaling):
        T = len(observations)
        beta = np.zeros((T, self.n_states))
        beta[-1] = 1 / (scaling[-1] + self.EPS)
        
        for t in range(T-2, -1, -1):
            beta[t] = (self.A @ (self.B[:, observations[t+1]] * beta[t+1])) / (scaling[t] + self.EPS)
        
        return beta

    def compute_gamma_xi(self, observations, alpha, beta):
        T = len(observations)
        gamma = alpha * beta
        xi = np.zeros((T-1, self.n_states, self.n_states))
        
        for t in range(T-1):
            obs_next = observations[t+1]
            xi[t] = alpha[t, :, None] * self.A * self.B[:, obs_next] * beta[t+1]
            xi_sum = xi[t].sum()
            xi[t] /= xi_sum + self.EPS
        
        return gamma, xi

    def update_parameters(self, observations, gamma, xi):
        # Update transition matrix A
        num_A = np.sum(xi, axis=0)
        den_A = np.sum(gamma[:-1], axis=0, keepdims=True).T
        self.A = num_A / (den_A + self.EPS)
        self.A /= self.A.sum(axis=1, keepdims=True)
        
        # Update emission matrix B
        if self.fit_B:
            obs_matrix = np.eye(self.n_emissions)[observations]
            num_B = gamma.T @ obs_matrix
            den_B = gamma.sum(axis=0)[:, None]
            self.B = num_B / (den_B + self.EPS)
            self.B /= self.B.sum(axis=1, keepdims=True)
        
        # Update initial distribution
        self.pi = gamma[0] / (gamma[0].sum() + self.EPS)
        self.pi = np.nan_to_num(self.pi, nan=1/self.n_states)
        self.pi /= self.pi.sum()

    def fit(self, observations, max_iter=200, tol=1e-6):
        prev_log_likelihood = -np.inf
        
        for _ in range(max_iter):
            alpha, scaling = self.forward(observations)
            beta = self.backward(observations, scaling)
            
            log_likelihood = np.sum(np.log(scaling + self.EPS))
            if abs(log_likelihood - prev_log_likelihood) < tol:
                break
            prev_log_likelihood = log_likelihood
            
            gamma, xi = self.compute_gamma_xi(observations, alpha, beta)
            self.update_parameters(observations, gamma, xi)
            
        return self

def train_hmm(observations, n_states, n_emissions, B=None, restarts=1, eps=1e-12):
    best_model = None
    best_log_likelihood = -np.inf
    
    for _ in range(restarts):
        model = HiddenMarkovModel(n_states, n_emissions, B)
        model.fit(observations)
        
        _, scaling = model.forward(observations)
        log_likelihood = np.sum(np.log(scaling + eps))
        
        if log_likelihood > best_log_likelihood:
            best_model = model
            best_log_likelihood = log_likelihood
    
    return best_model

In [13]:
def generate(n: int) -> np.array: ...

In [None]:
observations = np.random.choice(3, (1000, ))

In [None]:
np.random.seed(0)
emit_prob = np.array([
    [0.1, 0.4, 0.5],
    [0.6, 0.3, 0.1]  
])
# observations = generate(10000)
hmm = train_hmm(observations, n_states=2, n_emissions=3, B=emit_prob, restarts=1)


# print("Transition matrix:\n", hmm.A.round(3))
# print("\nEmission matrix:\n", hmm.B.round(3))
# print("\nInitial distribution:\n", hmm.pi.round(3))

In [None]:
stat_prob = np.array([hmm.A[1, 0], hmm.A[0, 1]])
stat_prob /= stat_prob.sum()
print(stat_prob[0])