## Markov Chain

### Irreducible

In [14]:
# check irreducible

import numpy as np
import networkx as nx

def is_irreducible(P, tol=1e-12):
    n = P.shape[0]
    
    # Accumulate reachability
    reachability = np.zeros_like(P)
    P_power = np.eye(n)
    
    for _ in range(1, n + 1):
        P_power = P_power @ P
        reachability += (P_power > tol)
    
    # Check if all state pairs are reachable
    return np.all(reachability > 0)

def is_irreducible(P):
    G = nx.DiGraph()
    n = P.shape[0]
    
    for i in range(n):
        for j in range(n):
            if P[i, j] > 0:
                G.add_edge(i, j)
    
    return nx.is_strongly_connected(G)

def is_irreducible_nx(matrix):
    # Convert numpy array to a NetworkX Directed Graph
    G = nx.from_numpy_array(matrix, create_using=nx.DiGraph)
    
    # Check if the graph is strongly connected
    return nx.is_strongly_connected(G)

P1 = np.array([
    [0.8, 0.2, 0, 0],
    [0.6, 0.2, 0.2, 0],
    [0, 0.4, 0, 0.6],
    [0, 0, 0.8, 0.2]
])

P2 = np.array([
    [0, 0.2, 0, 0.8],
    [0, 0, 1, 0],
    [0, 1, 0, 0],
    [0.5, 0, 0.5, 1]
])

print(is_irreducible(P1))  # True
print(is_irreducible(P2))  # False


True
False


### Stationary distribution

| Chain type                       | Stationary dist exists? | Unique? | Converges? |
| -------------------------------- | ----------------------- | ------- | ---------- |
| Finite, reducible                | ✅ Yes                   | ❌ No    | ❌ No       |
| Finite, irreducible, periodic    | ✅ Yes                   | ✅ Yes   | ❌ No       |
| Finite, irreducible, aperiodic   | ✅ Yes                   | ✅ Yes   | ✅ Yes      |
| Infinite (e.g. random walk on ℤ) | ❌ No                    | —       | —          |


In [15]:
import numpy as np

def stationary_distribution_linear(P):
    """
    Compute stationary distribution using linear equations.
    """
    n = P.shape[0]
    
    A = P.T - np.eye(n)
    A[-1] = np.ones(n)        # replace last equation
    b = np.zeros(n)
    b[-1] = 1
    
    pi = np.linalg.solve(A, b)
    
    return pi

def stationary_distribution_power(P, max_iter=10_000, tol=1e-10):
    """
    Compute stationary distribution using power iteration.
    
    Converges if the chain is irreducible + aperiodic.
    """
    n = P.shape[0]
    pi = np.ones(n) / n   # initial distribution
    
    for _ in range(max_iter):
        pi_next = pi @ P
        if np.linalg.norm(pi_next - pi, 1) < tol:
            break
        pi = pi_next
    
    return pi

print("Stationary distribution (linear):", stationary_distribution_linear(P1))
print(sum(stationary_distribution_linear(P1)))  # Should be 1
print("Stationary distribution (power):", stationary_distribution_power(P1))
print(sum(stationary_distribution_power(P1)))  # Should be 1

Stationary distribution (linear): [0.61538462 0.20512821 0.1025641  0.07692308]
1.0
Stationary distribution (power): [0.61538462 0.20512821 0.1025641  0.07692308]
1.000000000000003


### Aperiodic

In [16]:
import numpy as np
from math import gcd
from functools import reduce

def state_periods(P, max_power=50, tol=1e-12):
    """
    Compute the period of each state in a Markov chain.
    
    Returns:
        periods : list of periods for each state
    """
    n = P.shape[0]
    periods = []

    for i in range(n):
        return_times = []

        P_power = np.eye(n)
        for k in range(1, max_power + 1):
            P_power = P_power @ P
            if P_power[i, i] > tol:
                return_times.append(k)

        if return_times:
            periods.append(reduce(gcd, return_times))
        else:
            periods.append(None)  # transient / unreachable return

    return periods

def state_aperiodicity(P):
    """
    Determine whether each state is aperiodic.
    
    Returns:
        dict: {state_index: (period, is_aperiodic)}
    """
    periods = state_periods(P)
    result = {}

    for i, d in enumerate(periods):
        result[i] = (d, d == 1)

    return result

def chain_is_aperiodic(P):
    """
    Check if a Markov chain is aperiodic.
    
    Returns:
        True  -> aperiodic
        False -> periodic
    """
    periods = state_periods(P)
    
    # remove None (transient states)
    periods = [d for d in periods if d is not None]
    
    return all(d == 1 for d in periods)

print("State periods:", state_periods(P1))  # [1, 1, 1, 1]
print("State aperiodicity:", state_aperiodicity(P1))  # all aperiodic
print("Chain is aperiodic:", chain_is_aperiodic(P1))

print("State periods:", state_periods(P2))  # [1, 2, 2, 1]
print("State aperiodicity:", state_aperiodicity(P2))  # mixed
print("Chain is aperiodic:", chain_is_aperiodic(P2))

State periods: [1, 1, 1, 1]
State aperiodicity: {0: (1, True), 1: (1, True), 2: (1, True), 3: (1, True)}
Chain is aperiodic: True
State periods: [1, 2, 2, 1]
State aperiodicity: {0: (1, True), 1: (2, False), 2: (2, False), 3: (1, True)}
Chain is aperiodic: False


### Reversibility

In [17]:
# calculating if the MC is reversible
def is_reversible(P, tol=1e-8):

    if not (is_irreducible(P) and chain_is_aperiodic(P)): # if there is no stationary distribution
        return False
    
    P = np.array(P)
    n = P.shape[0]

    pi = stationary_distribution_linear(P)

    for i in range(n):
        for j in range(n):
            if not np.isclose(pi[i] * P[i, j], pi[j] * P[j, i], atol=tol):
                return False

    return True

print("Is P1 reversible?", is_reversible(P1))  # True
print("Is P2 reversible?", is_reversible(P2))  # False

Is P1 reversible? True
Is P2 reversible? False


### N-steps transition matrix

In [None]:
import numpy as np

def n_step_transition(P, n):
    '''
    Calculating n-step transition matrix.
    P^n --> transitions after n steps.
    With increasing n, each row will converge to stationary dist.
    '''
    return np.linalg.matrix_power(P, n)

In [None]:
def comp_transition_matrix(transitions, n_states):
    """
    Estimate the transition matrix P from observed transitions.

    Args:
        transitions: array of shape (n_samples, 2)
        n_states: number of states

    Returns:
        P_hat: estimated transition matrix
    """
    P_hat = np.zeros((n_states, n_states))
    
    n_transitions = transitions.shape[0]
    for t in transitions:
        
        P_hat[t[0],t[1]] += 1

    row_sums = P_hat.sum(axis=1)
    P_hat = P_hat / row_sums[:, np.newaxis]

    return P_hat

In [None]:
def is_transition_matrix(P):
    """
    Check if P is a transition matrix.
    """

    if P.ndim != 2 or P.shape[0] != P.shape[1]:
        print("Wrong shape")
        return False

    if not np.isfinite(P).all():
        print("Is not finite")
        return False

    if np.all(P < 0):
        print("Has negative elements")
        return False

    if not np.allclose(P.sum(axis=1), 1.0, atol=1e-8):
        print("Rows do not sum to 1")
        return False
    
    return True

In [None]:
def simulate_chain(P, start_state, n_steps):
    """
    Simulate a Markov chain trajectory with a fixed random seed.

    Returns: array of visited states of length n_steps + 1
    """
    # seed = 1234  # don't change that
    rng = np.random.default_rng()

    path = np.zeros(n_steps + 1, dtype=int)
    path[0] = start_state

    for t in range(n_steps):
        path[t + 1] = rng.choice(
            P.shape[0],
            p=P[path[t]]
        )

    return path

## Generating Synthetic Data

### LCG

This is a simple way to generate chains of pseudorandom numbers and uniform distributions

A good way to create an LCG is to follow the hull dobell theorem

Hull–Dobell Theorem.

The congruential generator (a,b,M) has period M iff

gcd(b,M)=1,

p divides a-1 for every prime p that divides M

4 divides a- 1 if 4 divides M.

some good numbers for LCG:

m = 2^31 - 1 = 2147483647

a=16807

c = 0

seed = {1, …, m-1}

Xn+1 = a*Xn mod (2^31 - 1) IF WE WANT UNIFORM NUMBERS Un = Xn/m

In [None]:
def linConGen(n, m, a, b, x0):
    '''A linear congruential sequence generator.
    
    Param m is the integer modulus to use in the generator.
    Param a is the integer multiplier.
    Param b is the integer increment.
    Param x0 is the integer seed.
    Param n is the integer number of desired pseudo-random numbers.
    
    Returns a list of n pseudo-random integer modulo m numbers.'''
    
    x = x0 # the seed
    retValue = [x % m]  # start the list with x=x0
    for i in range(2, n+1, 1):
        x = (a * x + b) % m # the generator, using modular arithmetic
        retValue.append(x) # append the new x to the list
    return retValue

def uniformFromLCG(n,m,a,b,x0):
  return linConGen(n,m,a,b,x0) / m

### Accept-Reject Sampler

In [None]:
def problem1_inversion(f, M, n_samples=1):

    if M is None: # finding max of a function
        from scipy.optimize import minimize_scalar
        res = minimize_scalar(lambda x: -f(x), bounds=(0, 1), method='bounded')
        M = f(res.x)

    result = []
    while (len(result) < n_samples):
        x1 = np.random.uniform(0,1)
        f_x = f(x1)
        x2 = np.random.uniform(0,1)
        if (x2 <= f_x/M):
            result.append(x1)
        else:
            continue
    return np.array(result)

### Inverse Transform Sampling

In [None]:
#exponential distribution with lambda = 1
pdf_x = lambda x: np.exp(-x)
cdf_x = lambda x: 1 - np.exp(-x)
cdf_inverse_x = lambda u: -np.log(1 - u)

Xs = np.linspace(0, 10, num = 1000)
Ys = pdf_x(Xs)

plt.plot(Xs,Ys, label = 'true distribution')



#Inverse sampling of the distribution
samples = []
for i in range(10000):
  u = np.random.uniform(0,1)
  sample_x = cdf_inverse_x(u)
  samples.append(sample_x)


plt.hist(samples, bins = 100, density= True, label = 'samples')
plt.legend()

### Box-Muller Sampling (Box-Muller Transform)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def box_muller_sample(n_samples, mu=0, sigma=1):
    """
    Generates normal samples using the Box-Muller transform.
    Returns two arrays of independent samples.
    """
    # Step 1: Generate uniform random numbers U1, U2 ~ Uniform(0, 1)
    u1 = np.random.rand(n_samples)
    u2 = np.random.rand(n_samples)

    # Step 2: Apply the transformation equations
    # Magnitude (Radius)
    mag = np.sqrt(-2 * np.log(u1))

    # Angle (0 to 2*pi)
    angle = 2 * np.pi * u2

    # Calculate Z0 and Z1
    z0 = mag * np.cos(angle)
    z1 = mag * np.sin(angle)

    # Step 3: Scale to target mean and standard deviation
    x0 = mu + z0 * sigma
    x1 = mu + z1 * sigma

    return x0, x1

# --- Execution and Visualization ---
n = 10000
samples_x, samples_y = box_muller_sample(n)

# Plotting the results
plt.figure(figsize=(12, 5))

# Plot Histogram to verify Normal Distribution
plt.subplot(1, 2, 1)
plt.hist(samples_x, bins=50, density=True, color='skyblue', edgecolor='black', alpha=0.7)
plt.title(f"Histogram of Z0 samples (n={n})")
plt.xlabel("Value")
plt.ylabel("Frequency")

# Plot Scatter to show independence/joint distribution
plt.subplot(1, 2, 2)
plt.scatter(samples_x, samples_y, s=1, alpha=0.2, color='purple')
plt.title("Joint Distribution (Z0 vs Z1)")
plt.xlabel("Z0")
plt.ylabel("Z1")

plt.tight_layout()
plt.show()

## EDA

In [None]:
# plotting histograms of each column in a df

n_rows, n_cols = 6, 5

fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 16))  # Adjust figure size
axes = axes.flatten()  # Flatten 2D array of axes to 1D for easy iteration

for i, col in enumerate(df.columns):
    axes[i].hist(df[col], bins=20, color='skyblue', edgecolor='black')
    axes[i].set_title(col)

# Hide any unused subplots if X has fewer than 16 columns
for j in range(i+1, n_rows*n_cols):
    axes[j].axis('off')

plt.tight_layout()
plt.show()

In [None]:
# correlation heatmap
import seaborn as sns

corr = df.corr()
sns.heatmap(corr, annot=True)

## Poisson Regression - evaluation

In [None]:
import numpy as np

def poisson_deviance(y_true, y_pred, eps=1e-10):
    """
    Oblicza dewiancję Poissona

    y_true : array-like (prawdziwe liczby wizyt)
    y_pred : array-like (przewidywane lambda > 0)
    eps    : zabezpieczenie numeryczne dla log(0)
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # zabezpieczenie: lambda musi być dodatnia
    y_pred = np.maximum(y_pred, eps)

    # specjalne traktowanie y = 0 (bo 0 * log(0) = 0)
    term = np.where(
        y_true == 0,
        -y_pred,
        y_true * np.log(y_true / y_pred) - (y_true - y_pred)
    )

    return 2 * np.sum(term)

## Confidence Interval

### CLT

In [None]:
# CLT
import numpy as np
from scipy import stats

def conf_interval(data, conf_level = 0.95):
    n = len(data)
    mean = np.mean(data)
    std = np.std(data, ddof=1)

    # CI using t-distribution
    alpha = 1 - conf_level
    t_crit = stats.t.ppf(1 - alpha/2, df=n-1)

    margin_error = t_crit * std / np.sqrt(n)
    ci = (mean - margin_error, mean + margin_error)
    print("Sample mean:", mean)
    print("95% CI:", ci)

### Markov Inequality

In [None]:
import numpy as np

def markov_conf_interval(data, conf_level=0.95):
    n = len(data)

    sample_mean = np.mean(data)
    alpha = 1 - conf_level
    upper_bound = sample_mean / alpha

    return upper_bound

### Chebychev Inequality

In [None]:
def chebychev_conf_interval(data, conf_level=0.95):
    n = len(data)
    alfa = 1 - conf_level
    epsilon = np.sqrt(np.var(data)/(n*alfa))
    mean = np.mean(data)

    return (mean - epsilon, mean + epsilon)

### Hoeffding Inequality

In [None]:
import numpy as np

def hoeffding_conf_interval(data, a, b, conf_level=0.95):
    alpha = 1 - conf_level
    n = len(data)
    epsilon = (b - a) / np.sqrt(2 * n) * np.sqrt(np.log(2 / alpha))
    mean = np.mean(data)
    return (mean - epsilon, mean + epsilon)

### Bennet Inequality

In [None]:
import numpy as np

def bennet_conf_interval(data, conf_level=0.95):
    variance = np.var(data)
    alfa = 1 - conf_level
    n = len(data)

    epsilon = np.sqrt( (2 * variance * np.log(2/alfa)) / n ) + (5 * np.log(2/alfa)) / (3 * n)
    mean = np.mean(data)

    interval = (mean - epsilon, mean + epsilon)
    return interval