# 02407 Stochastic Processes exam 2025

### Question 1

This is a model for the development of cases

\begin{equation}
X_{n+1} = \min\{6,X_n-D_n+A_n\}
\end{equation}

### Question 2

The following is the transition density matrix

In [1]:
import numpy as np
import math
from scipy.stats import binom, poisson

# -----------------------------
# Model parameters
# -----------------------------
CAP = 6
lam_A = 2.5       
p_complete = 1/3   

states = range(CAP + 1)

def build_transition_matrix():
    P = np.zeros((CAP + 1, CAP + 1))

    for i in states:
        for d in range(i + 1):
            p_d = binom.pmf(d, i, p_complete)
            r = i - d  #remaining after completions

            
            for j in range(CAP):
                a_needed = j - r
                if a_needed >= 0:
                    P[i, j] += p_d * poisson.pmf(a_needed, lam_A)

            # j = 6 (capacity)
            P[i, CAP] += p_d * poisson.sf(CAP - r - 1, lam_A)

    return P


Now we find the probability of having 5 cases under investigation in 2030 (ie. 5 years from initial point)

In [2]:

def prob_at_least_5_start_2030(x0=2):
    P = build_transition_matrix()

    # initial distribution (start of 2025)
    pi = np.zeros(CAP + 1)
    pi[x0] = 1.0

    # propagate 5 years
    for _ in range(5):
        pi = pi @ P

    return pi[5] + pi[6], pi, P

if __name__ == "__main__":
    prob, pi_2030, P = prob_at_least_5_start_2030()

    np.set_printoptions(precision=6, suppress=True)
    print("P(X_2030 >= 5 | X_2025 = 2) =", prob)
    print("Distribution at start of 2030:", pi_2030)


P(X_2030 >= 5 | X_2025 = 2) = 0.7400685690396941
Distribution at start of 2030: [0.001317 0.00935  0.033331 0.078881 0.137052 0.182394 0.557674]


### Question 3

Probality of hitting the absorbing state in 10 steps (ie. having one year with zero cases)

In [3]:
P_trans = build_transition_matrix()
def prob_hit_zero_within_T(P, x0=2, T=10):

    # Make 0 absorbing
    P_abs = P.copy()
    P_abs[0, :] = 0.0
    P_abs[0, 0] = 1.0

    # Initial distribution concentrated at x0
    pi0 = np.zeros(CAP + 1)
    pi0[x0] = 1.0

    # After T steps, probability mass in state 0 equals P(hit 0 by time T)
    
    piT = pi0 @ np.linalg.matrix_power(P_abs, T)
    return piT[0]

print(prob_hit_zero_within_T(P_trans,x0=2, T=10))


0.022642632237199575


This can also be done with PH distributions

In [4]:
P = P_trans

# Make 0 absorbing (same as your other method)
P_abs = P.copy()
P_abs[0, :] = 0.0
P_abs[0, 0] = 1.0

T = 10
x0 = 3

S = P_abs[1:, 1:]                 # transient-to-transient
alpha = np.zeros(S.shape[0])
alpha[x0 - 1] = 1.0

survive_10 = alpha @ np.linalg.matrix_power(S, T) @ np.ones(S.shape[0])
hit_by_10  = 1.0 - survive_10

# Your other method
pi0 = np.zeros(P.shape[0]); pi0[x0] = 1.0
piT = pi0 @ np.linalg.matrix_power(P_abs, T)
hit_by_10_other = piT[0]

survive_10, hit_by_10, hit_by_10_other


(0.9858050039366757, 0.014194996063324306, 0.014194996063325287)

### Question 4

In [8]:
import numpy as np

# -----------------------------
# Model parameters
# -----------------------------
CAP_4 = 7
lam_A = 2.5       
p_complete = 1/3   

states_q4 = range(CAP_4 + 1)

def build_transition_matrix_q4():
    P = np.zeros((CAP_4 + 1, CAP_4 + 1))

    for i in states_q4:
        for d in range(i + 1):
            p_d = binom.pmf(d, i, p_complete)
            r = i - d  #remaining after completions

            
            for j in range(CAP_4):
                a_needed = j - r
                if a_needed >= 0:
                    P[i, j] += p_d * poisson.pmf(a_needed, lam_A)

            # j = 6 (capacity)
            P[i, CAP_4] += p_d * poisson.sf(CAP_4 - r - 1, lam_A)

    return P

P_trans_q4 = build_transition_matrix_q4()



def mean_time_to_hit_state(P, target=7):
    """
    Expected number of steps to FIRST hit `target`, starting from each state.

    Solves the first-step equations:
        v[target] = 0
        v[i] = 1 + sum_j P[i,j] * v[j]  for i != target

    Parameters
    ----------
    P : (n,n) array_like
        Transition matrix.
    target : int
        Target (absorbing/hitting) state.

    Returns
    -------
    v : (n,) np.ndarray
        v[i] = E[T | X0 = i], where T = first hitting time of `target`.
    """
    P = np.asarray(P, dtype=float)
    n = P.shape[0]
    if P.shape != (n, n):
        raise ValueError("P must be a square matrix.")
    if not (0 <= target < n):
        raise ValueError("target must be a valid state index.")
    
    # Non-target states
    S = [i for i in range(n) if i != target]

    # Build linear system: (I - Q) v_S = 1
    # where Q is P restricted to transitions among non-target states
    Q = P[np.ix_(S, S)]
    A = np.eye(len(S)) - Q
    b = np.ones(len(S))

    try:
        v_S = np.linalg.solve(A, b)
    except np.linalg.LinAlgError as e:
        raise np.linalg.LinAlgError(
            "Linear system is singular. This typically means the expected hitting time "
            "is infinite for at least some starting states (target may not be reached w.p. 1)."
        ) from e

    v = np.zeros(n)
    v[target] = 0.0
    v[S] = v_S
    return v

# Convenience: mean time from a specific start state
def mean_time_from_state(P, start, target=6):
    v = mean_time_to_hit_state(P_trans_q4, target=target)
    return float(v[start])
# P is your 7x7 numpy array
v = mean_time_to_hit_state(P, target=6)

print("Mean time to hit state 6 from each start state:")
for i, vi in enumerate(v):
    print(f"start {i}: {vi:.6f} steps")

print("From state 2:", mean_time_from_state(P_trans_q4, start=3, target=7))


Mean time to hit state 6 from each start state:
start 0: 4.071062 steps
start 1: 3.738604 steps
start 2: 3.362915 steps
start 3: 2.948439 steps
start 4: 2.516543 steps
start 5: 2.104127 steps
start 6: 0.000000 steps
From state 2: 4.153650467453991


In [7]:
import numpy as np

def mean_and_var_to_hit_state(P, target=7):
    """
    Returns:
      m[i]   = E[T | X0=i]
      var[i] = Var(T | X0=i)
    where T is the first hitting time of `target`.
    """
    P = np.asarray(P, dtype=float)
    n = P.shape[0]
    if P.shape != (n, n):
        raise ValueError("P must be a square matrix.")
    if not (0 <= target < n):
        raise ValueError("target must be a valid state index.")

    # Non-target states
    S = [i for i in range(n) if i != target]
    print(S)
    Q = P[np.ix_(S, S)]               # transitions among non-target states
    print(Q)
    I = np.eye(len(S))

    # Solve for mean: (I - Q) m_S = 1
    A = I - Q
    ones = np.ones(len(S))
    try:
        m_S = np.linalg.solve(A, ones)
    except np.linalg.LinAlgError as e:
        raise np.linalg.LinAlgError(
            "Singular system for mean. Expected hitting time may be infinite "
            "(target not reached with probability 1 from some states)."
        ) from e

    # Solve for second moment: (I - Q) s_S = 1 + 2 Q m_S
    rhs_s = ones + 2.0 * (Q @ m_S)
    try:
        s_S = np.linalg.solve(A, rhs_s)
    except np.linalg.LinAlgError as e:
        raise np.linalg.LinAlgError(
            "Singular system for second moment. Variance may be infinite."
        ) from e

    # Assemble full vectors
    m = np.zeros(n)
    s = np.zeros(n)
    m[target] = 0.0
    s[target] = 0.0
    m[S] = m_S
    s[S] = s_S

    var = s - m**2
    return m, var

# Example usage:
m, var = mean_and_var_to_hit_state(P_trans_q4, target=7)
print("Mean from each state:", m)
print("Variance from each state:", var)
print("Std dev from state 2:", var[2])


[0, 1, 2, 3, 4, 5, 6]
[[0.082085 0.205212 0.256516 0.213763 0.133602 0.066801 0.027834]
 [0.027362 0.123127 0.222314 0.242265 0.187043 0.111335 0.053812]
 [0.009121 0.059284 0.15619  0.228964 0.223857 0.161807 0.092161]
 [0.00304  0.025842 0.091586 0.180448 0.227262 0.203174 0.138591]
 [0.001013 0.010641 0.047756 0.121206 0.196052 0.219232 0.181646]
 [0.000338 0.004222 0.023013 0.07224  0.146155 0.203779 0.206704]
 [0.000113 0.001633 0.010486 0.039422 0.096878 0.165363 0.204754]]
Mean from each state: [5.290231 4.960206 4.584017 4.15365  3.670172 3.152187 2.637321 0.      ]
Variance from each state: [9.561025 9.459976 9.304508 9.03581  8.549615 7.732841 6.554548 0.      ]
Std dev from state 2: 9.30450752162755


### Question 5

In [10]:
import numpy as np

def prob_hit_a_before_b(P, a=0, b=6, start=2):
    """
    u_i = P(hit a before b | X0=i) in a finite DTMC with transition matrix P.
    Returns u_start.
    """
    P = np.asarray(P, dtype=float)
    n = P.shape[0]
    if P.shape != (n, n):
        raise ValueError("P must be square.")
    if not (0 <= a < n and 0 <= b < n and 0 <= start < n):
        raise ValueError("Invalid state index.")
    if a == b:
        raise ValueError("a and b must be different.")

    # Boundary values:
    if start == a:
        return 1.0
    if start == b:
        return 0.0

    S = [i for i in range(n) if i not in (a, b)]  # unknown states
    idx = {s:i for i, s in enumerate(S)}

    Q = P[np.ix_(S, S)]
    r = P[S, a]

    A = np.eye(len(S)) - Q
    try:
        uS = np.linalg.solve(A, r)
    except np.linalg.LinAlgError as e:
        raise np.linalg.LinAlgError(
            "System is singular; this can happen if {a,b} is not reached with probability 1 "
            "from some states."
        ) from e


    return float(uS[idx[start]])

# Example:
u2 = prob_hit_a_before_b(P_trans_q4, a=0, b=7, start=2)
print(u2)


0.019099378956124548


### Question 6

In [None]:
import numpy as np

def stationary_distribution_linear(P, tol=1e-12):

    P = np.asarray(P, dtype=float)
    n = P.shape[0]
    if P.shape != (n, n):
        raise ValueError("P must be square.")

    # Solve (P^T - I) pi = 0 with constraint sum(pi)=1
    A = P.T - np.eye(n)
    A[-1, :] = 1.0               # replace one equation by normalization
    b = np.zeros(n)
    b[-1] = 1.0

    pi = np.linalg.solve(A, b)

    # numerical cleanup
    pi[np.abs(pi) < tol] = 0.0
    # If small negative values occur from rounding, clip and renormalize
    if np.any(pi < -1e-10):
        raise ValueError("Computed solution has significant negative entries; chain may be ill-conditioned.")
    pi = np.clip(pi, 0.0, None)
    s = pi.sum()
    if s == 0:
        raise ValueError("Failed to compute a valid stationary distribution.")
    pi /= s
    return pi


pi1 = stationary_distribution_linear(P)
print("Stationary distribution (linear solve):", pi1)
print("Long run mean:", np.average(pi1))
pi1 *= np.arange(pi1.shape[0])
print(f"So we by multiplying 1M with {np.sum(pi1)} with get that long run mean cost a year is: {np.sum(pi1)}")


Stationary distribution (linear solve): [0.001069 0.008032 0.030106 0.074286 0.133326 0.181687 0.571494]
Long run mean: 0.14285714285714285
So we by multiplying 1M with 5.1618061961962365 with get that mean cost a year is: 5.1618061961962365


In [17]:
pi1 = stationary_distribution_linear(P)
C_ev = np.sum(pi1*np.arange(pi1.shape[0]))
C_ev_sq = np.sum(pi1*np.arange(pi1.shape[0])**2)
C_ev_sq-C_ev**2

1.40196611383222

In [32]:
def build_transition_matrix_q7():
    P_q4 = build_transition_matrix_q4()
    P_q4[7,:] = 0.0
    P_q4[7,7] = 1.0
    return P_q4

In [33]:
P_trans_q7 = build_transition_matrix_q7()

## Q7

In [36]:
P_10 = np.linalg.matrix_power(P_trans_q7,10)
P_10[2,7]

0.9491380230185796

In [45]:
CAP_4 = 7
accept_rate = 0.022
incident_rate = 25
lam_A = incident_rate*accept_rate   
p_complete = 1/3   

states_q4 = range(CAP_4 + 1)

def build_transition_matrix_q4():
    P = np.zeros((CAP_4 + 1, CAP_4 + 1))

    for i in states_q4:
        for d in range(i + 1):
            p_d = binom.pmf(d, i, p_complete)
            r = i - d  #remaining after completions

            
            for j in range(CAP_4):
                a_needed = j - r
                if a_needed >= 0:
                    P[i, j] += p_d * poisson.pmf(a_needed, lam_A)

            # j = 6 (capacity)
            P[i, CAP_4] += p_d * poisson.sf(CAP_4 - r - 1, lam_A)

    return P
def build_transition_matrix_q7():
    P_q4 = build_transition_matrix_q4()
    P_q4[7,:] = 0.0
    P_q4[7,7] = 1.0
    return P_q4
P_trans_q7 = build_transition_matrix_q7()
P_10 = np.linalg.matrix_power(P_trans_q7,10)
(P_10[2,7],0.99)

(0.010430484091193622, 0.99)

In [None]:
import numpy as np
from scipy.stats import binom, poisson

CAP_4 = 7
incident_rate = 25
p_complete = 1/3

def build_transition_matrix_q4(CAP, lam_A, p_complete):
    P = np.zeros((CAP + 1, CAP + 1))
    states = range(CAP + 1)

    for i in states:
        # d = number completed among i (Binomial)
        for d in range(i + 1):
            p_d = binom.pmf(d, i, p_complete)
            r = i - d  # remaining after completions

            # j = 0..CAP-1: exact arrivals needed
            for j in range(CAP):
                a_needed = j - r
                if a_needed >= 0:
                    P[i, j] += p_d * poisson.pmf(a_needed, lam_A)

            # j = CAP: overflow (>= CAP - r arrivals)
            P[i, CAP] += p_d * poisson.sf(CAP - r - 1, lam_A)

    return P

def build_transition_matrix_q7(CAP, lam_A, p_complete):
    P = build_transition_matrix_q4(CAP, lam_A, p_complete)
    P[CAP, :] = 0.0
    P[CAP, CAP] = 1.0  # make CAP absorbing
    return P

def p_reach_cap_in_t(accept_rate, t=10, start=2, CAP=CAP_4, incident_rate=incident_rate, p_complete=p_complete):
    lam_A = incident_rate * accept_rate
    P = build_transition_matrix_q7(CAP, lam_A, p_complete)
    P_t = np.linalg.matrix_power(P, t)
    return float(P_t[start, CAP])

def find_accept_rate_for_target(target=0.99, t=10, start=2, lo=0.0, hi=1.0, tol=1e-6, max_iter=80):
    f = lambda a: p_reach_cap_in_t(a, t=t, start=start) - target

    flo, fhi = f(lo), f(hi)
    if flo >= 0:
        return lo, p_reach_cap_in_t(lo, t=t, start=start)
    if fhi <= 0:
        raise ValueError(f"Target not reachable on [{lo},{hi}]. "
                         f"At hi={hi}, prob={p_reach_cap_in_t(hi, t=t, start=start):.6f}")

    # Bisection (assumes prob increases with accept_rate)
    for _ in range(max_iter):
        mid = 0.5 * (lo + hi)
        fmid = f(mid)
        if abs(fmid) <= tol:
            return mid, p_reach_cap_in_t(mid, t=t, start=start)
        if fmid < 0:
            lo = mid
        else:
            hi = mid

    mid = 0.5 * (lo + hi)
    return mid, p_reach_cap_in_t(mid, t=t, start=start)

# Example:
a_star, prob_star = find_accept_rate_for_target(target=0.01, t=10, start=2)
print("accept_rate* =", a_star)
print("P_10[2,7]    =", prob_star)

## Appendix of current BS

In [119]:
def dph_factorial_moment(k, alpha, S):
    """
    Falling factorial moment: E[X(X-1)...(X-k+1)] for discrete PH(alpha, S).
    alpha: shape (m,) or (1,m)
    S:     shape (m,m)
    """
    if k < 0:
        raise ValueError("k must be >= 0")
    if k == 0:
        return 1.0

    S = np.asarray(S)
    m = S.shape[0]
    I = np.eye(m)

    alpha = np.asarray(alpha).reshape(1, m)     # row vector
    ones  = np.ones((m, 1))                     # column vector

    U = np.linalg.inv(I - S)                    # (I - S)^{-1}

    val = (math.factorial(k)
           * (alpha
              @ np.linalg.matrix_power(U, k)     # (I-S)^{-k}
              @ np.linalg.matrix_power(S, k-1)   # S^{k-1}
              @ ones))
    return float(val)

a = np.array([0,0,1,0,0,0,0])
dph_factorial_moment(1,a,P_trans)


  return float(val)


7880397966671143.0