# 1) Risk & Supervised Learning

### 1.1 Loss & Risk Metrics

**What is this?**
Loss functions measure how wrong a prediction is compared to the true value. Risk is the average (expected) loss across all predictions.

**Key Concepts:**
- **Mean Squared Error (MSE)**: Used for regression. Penalizes large errors heavily. Formula: $(y_{true} - y_{pred})^2$
- **Mean Absolute Error (MAE)**: Used for regression. Less sensitive to outliers than MSE. Formula: $|y_{true} - y_{pred}|$
- **Zero-One Loss**: Used for classification. Returns 1 if prediction is wrong, 0 if correct.
- **Log Loss (Binary Cross-Entropy)**: Used for probabilistic binary classification. Penalizes confident wrong predictions heavily. Formula: $-(y \log(p) + (1-y)\log(1-p))$
- **Average Risk**: The mean of all losses - tells you overall model performance.

**When to use:**
- Use MSE when you want to penalize large errors more
- Use MAE when outliers shouldn't dominate your loss
- Use Zero-One for simple classification accuracy
- Use Log Loss when you have probability predictions and want to encourage confident correct predictions

In [40]:
import numpy as np

# Mean Squared Error
def loss_squared(y_true, y_pred):
    y_true = np.asarray(y_true, float)
    y_pred = np.asarray(y_pred, float)
    return (y_true - y_pred) ** 2

# Mean Absolute Error
def loss_absolute(y_true, y_pred):
    y_true = np.asarray(y_true, float)
    y_pred = np.asarray(y_pred, float)
    return np.abs(y_true - y_pred)

# Zero-One Loss
def loss_zero_one(y_true, y_pred_label):
    y_true = np.asarray(y_true, int)
    y_pred_label = np.asarray(y_pred_label, int)
    return (y_true != y_pred_label).astype(float)

# Log Loss for Binary Classification
def loss_logloss_binary(y_true, y_pred_prob, eps=1e-12):
    # y_true in {0,1}, y_pred_prob in [0,1]
    y_true = np.asarray(y_true, float)
    p = np.asarray(y_pred_prob, float)
    p = np.clip(p, eps, 1 - eps)
    return -(y_true*np.log(p) + (1-y_true)*np.log(1-p))

# Average Risk (Expected Loss)
def average_risk(y_true, y_pred, loss_fn):
    # returns the average loss
    losses = loss_fn(y_true, y_pred)
    return float(np.mean(losses))

In [41]:
# Sample calls for loss functions and average risk
y_true = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
y_pred = np.array([1.1, 2.3, 2.8, 4.2, 5.1])

# Mean Squared Error
mse_losses = loss_squared(y_true, y_pred)
print(f"MSE losses: {mse_losses}")
print(f"Average MSE risk: {average_risk(y_true, y_pred, loss_squared)}")

# Mean Absolute Error
mae_losses = loss_absolute(y_true, y_pred)
print(f"MAE losses: {mae_losses}")
print(f"Average MAE risk: {average_risk(y_true, y_pred, loss_absolute)}")

# Zero-One Loss (classification)
y_true_class = np.array([0, 1, 1, 0, 1])
y_pred_class = np.array([0, 1, 0, 0, 1])
zo_losses = loss_zero_one(y_true_class, y_pred_class)
print(f"Zero-One losses: {zo_losses}")
print(f"Average 0-1 risk (error rate): {average_risk(y_true_class, y_pred_class, loss_zero_one)}")

# Log Loss (binary classification)
y_true_binary = np.array([1, 0, 1, 0, 1])
y_pred_prob = np.array([0.9, 0.1, 0.8, 0.2, 0.85])
ll_losses = loss_logloss_binary(y_true_binary, y_pred_prob)
print(f"Log losses: {ll_losses}")
print(f"Average log loss: {average_risk(y_true_binary, y_pred_prob, loss_logloss_binary)}")

MSE losses: [0.01 0.09 0.04 0.04 0.01]
Average MSE risk: 0.03799999999999999
MAE losses: [0.1 0.3 0.2 0.2 0.1]
Average MAE risk: 0.18
Zero-One losses: [0. 0. 1. 0. 0.]
Average 0-1 risk (error rate): 0.2
Log losses: [0.10536052 0.10536052 0.22314355 0.22314355 0.16251893]
Average log loss: 0.1639054126883694


### 1.2 Train Test Validation Split

**What is this?**
Splitting your data into separate sets to train your model and evaluate its performance fairly.

**Why we need it:**
- **Training set**: Used to train/fit the model
- **Validation set**: Used during model development to tune hyperparameters and make decisions
- **Test set**: Used only once at the end to get an unbiased estimate of model performance

**Key points:**
- Default split is 70% train, 15% validation, 15% test
- Always shuffle data before splitting (unless time-series)
- Use `random_state` for reproducibility
- The validation set helps prevent overfitting during model selection
- Never touch the test set until final evaluation!

In [42]:
from sklearn.model_selection import train_test_split

def train_val_test_split(X, y=None, train_frac=0.7, val_frac=0.15, test_frac=0.15, shuffle=True, random_state=None):
    """
    Split X (and optional y) into train/val/test using scikit-learn.
    Returns (X_train, X_val, X_test) or (X_train, X_val, X_test, y_train, y_val, y_test)
    """
    fracs = float(train_frac) + float(val_frac) + float(test_frac)
    if not np.isclose(fracs, 1.0):
        raise ValueError("train_frac + val_frac + test_frac must sum to 1.0")

    # first split off the training set
    test_plus_val = val_frac + test_frac
    if y is None:
        X_train, X_temp = train_test_split(
            X, train_size=train_frac, test_size=test_plus_val,
            random_state=random_state, shuffle=shuffle
        )
    else:
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, train_size=train_frac, test_size=test_plus_val,
            random_state=random_state, shuffle=shuffle
        )

    # handle edge cases where val or test fraction is zero
    if np.isclose(test_plus_val, 0.0):
        # no val/test portion
        X_val, X_test = X_temp[:0], X_temp[:0]
        if y is not None:
            y_val, y_test = y_temp[:0], y_temp[:0]
    elif np.isclose(val_frac, 0.0):
        X_val, X_test = X_temp[:0], X_temp
        if y is not None:
            y_val, y_test = y_temp[:0], y_temp
    elif np.isclose(test_frac, 0.0):
        X_val, X_test = X_temp, X_temp[:0]
        if y is not None:
            y_val, y_test = y_temp, y_temp[:0]
    else:
        # split the temp set into val and test according to their relative proportions
        test_rel = test_frac / (val_frac + test_frac)
        if y is None:
            X_val, X_test = train_test_split(
                X_temp, test_size=test_rel, random_state=random_state, shuffle=shuffle
            )
        else:
            X_val, X_test, y_val, y_test = train_test_split(
                X_temp, y_temp, test_size=test_rel, random_state=random_state, shuffle=shuffle
            )

    if y is None:
        return X_train, X_val, X_test
    return X_train, X_val, X_test, y_train, y_val, y_test

In [43]:
# Sample call for train_val_test_split
from sklearn.datasets import make_classification

# Create sample data
X_sample, y_sample = make_classification(n_samples=100, n_features=5, n_informative=3, random_state=42)

# Split with default proportions (70/15/15)
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
    X_sample, y_sample, 
    train_frac=0.7, val_frac=0.15, test_frac=0.15, 
    random_state=42
)

print(f"Training set size: {X_train.shape[0]}")
print(f"Validation set size: {X_val.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")

Training set size: 70
Validation set size: 15
Test set size: 15


### 1.3 Grid Search

**What is this?**
A systematic way to find the best hyperparameters for your model by trying all combinations of parameter values.

**How it works:**
1. Define a grid of hyperparameter values to try
2. For each combination, train the model using k-fold cross-validation
3. Evaluate performance using a scoring metric
4. Return the best combination

**Key parameters:**
- `param_grid`: Dictionary mapping parameter names to lists of values to try
- `cv`: Number of cross-validation folds (default: 5)
- `scoring`: Metric to optimize (e.g., 'accuracy', 'f1', 'neg_mean_squared_error')
- `n_jobs=-1`: Use all CPU cores for parallel processing

**Example:**
```python
param_grid = {'C': [0.1, 1, 10], 'kernel': ['rbf', 'linear']}
# This tries 6 combinations: 3 C values × 2 kernels
```

In [44]:
from sklearn.model_selection import GridSearchCV

def grid_search(model, param_grid, X_train, y_train, cv=5, scoring=None, verbose=1, n_jobs=-1):
    """
    Perform grid search to find the best hyperparameters for a given model.
    
    Parameters:
    -----------
    model : estimator object
        The model to tune (e.g., sklearn model or any model with fit/predict methods)
    param_grid : dict
        Dictionary with parameter names (str) as keys and lists of parameter settings to try
    X_train : array-like
        Training features
    y_train : array-like
        Training labels
    cv : int, default=5
        Number of cross-validation folds
    scoring : str or callable, default=None
        Scoring metric (e.g., 'accuracy', 'neg_mean_squared_error', 'f1')
    verbose : int, default=1
        Controls the verbosity level
    n_jobs : int, default=-1
        Number of jobs to run in parallel (-1 uses all processors)
    
    Returns:
    --------
    best_model : estimator
        The model with the best hyperparameters fitted on the training data
    best_params : dict
        The best hyperparameters found
    best_score : float
        The best cross-validation score
    grid_search_results : GridSearchCV object
        The full GridSearchCV object with all results
    """
    grid = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=cv,
        scoring=scoring,
        verbose=verbose,
        n_jobs=n_jobs,
        return_train_score=True
    )
    
    grid.fit(X_train, y_train)
    
    return grid.best_estimator_, grid.best_params_, grid.best_score_, grid

In [45]:
# Sample call for grid_search
from sklearn.svm import SVC

# Create sample data
X_sample, y_sample = make_classification(n_samples=100, n_features=5, random_state=42)

# Define parameter grid to search
param_grid = {
    'C': [0.1, 1, 10],
    'kernel': ['rbf', 'linear']
}

# Perform grid search
best_model, best_params, best_score, grid_results = grid_search(
    model=SVC(),
    param_grid=param_grid,
    X_train=X_sample,
    y_train=y_sample,
    cv=3,
    scoring='accuracy',
    verbose=0
)

print(f"Best parameters: {best_params}")
print(f"Best cross-validation score: {best_score:.4f}")

Best parameters: {'C': 1, 'kernel': 'rbf'}
Best cross-validation score: 1.0000


### 1.4 Cross-Validation

**What is this?**
A resampling technique to evaluate model performance by splitting data into multiple train/test folds.

**K-Fold Cross-Validation:**
1. Split data into k equal-sized folds
2. For each fold i: Train on (k-1) folds, test on fold i
3. Average the k test scores

**Benefits:**
- More reliable than single train/test split
- Uses all data for both training and testing
- Reduces variance in performance estimates

**Common k values:**
- k=5: Standard choice (good bias-variance trade-off)
- k=10: More computation but less bias
- k=n (Leave-One-Out): Maximum data usage but high variance

**Stratified CV:** Maintains class proportions in each fold (important for imbalanced data)

In [46]:
from sklearn.model_selection import cross_val_score, KFold, StratifiedKFold

def cross_validate_model(model, X, y, cv=5, scoring='accuracy', stratified=True):
    """
    Perform k-fold cross-validation on a model.
    
    Parameters:
    -----------
    model : estimator object
        The model to evaluate
    X : array-like
        Features
    y : array-like
        Target labels
    cv : int, default=5
        Number of folds
    scoring : str, default='accuracy'
        Scoring metric
    stratified : bool, default=True
        Use stratified folds (maintains class proportions)
    
    Returns:
    --------
    scores : array
        Cross-validation scores for each fold
    mean_score : float
        Mean of CV scores
    std_score : float
        Standard deviation of CV scores
    """
    if stratified and hasattr(y, 'dtype') and y.dtype in [int, 'int64', 'int32']:
        cv_splitter = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    else:
        cv_splitter = KFold(n_splits=cv, shuffle=True, random_state=42)
    
    scores = cross_val_score(model, X, y, cv=cv_splitter, scoring=scoring)
    
    return scores, float(np.mean(scores)), float(np.std(scores))

In [47]:
# Sample call for cross_validate_model
from sklearn.linear_model import LogisticRegression

# Create sample data
X_sample, y_sample = make_classification(n_samples=100, n_features=5, random_state=42)

# Perform cross-validation
model = LogisticRegression(max_iter=1000)
scores, mean_score, std_score = cross_validate_model(
    model=model,
    X=X_sample,
    y=y_sample,
    cv=5,
    scoring='accuracy',
    stratified=True
)

print(f"Cross-validation scores: {scores}")
print(f"Mean score: {mean_score:.4f} (+/- {std_score:.4f})")

Cross-validation scores: [1.   0.95 1.   1.   1.  ]
Mean score: 0.9900 (+/- 0.0200)


# 2) Estimation, Bias-Variance

### 2.1 Monte-Carlo Estimate of Bias/Variance

**What is this?**
A simulation approach to understand how well an estimator (like sample mean) performs compared to the true value.

**Key Concepts:**
- **Bias**: How far off your estimator is on average from the true value
  - Formula: $\text{Bias} = E[\hat{\theta}] - \theta_{true}$
- **Variance**: How much your estimator varies across different samples
  - Formula: $\text{Var}(\hat{\theta}) = E[(\hat{\theta} - E[\hat{\theta}])^2]$
- **MSE**: Combines both errors: $\text{MSE} = \text{Bias}^2 + \text{Variance}$

**How it works:**
1. Generate many random samples (e.g., 2000)
2. Compute your estimate on each sample
3. Calculate bias (mean of estimates - true value)
4. Calculate variance (spread of estimates)

**Interpretation:**
- Low bias + Low variance = Good estimator ✓
- High bias = Systematically wrong (underfitting)
- High variance = Unstable predictions (overfitting)

In [48]:
def estimate_bias_variance(estimator_fn, sampler_fn, true_value, n_trials=2000, seed=0):
    """
    estimator_fn(sample) -> estimate (scalar)
    sampler_fn(rng) -> one sample dataset (any structure)
    true_value: the actual value we're trying to estimate
    """
    rng = np.random.default_rng(seed)
    ests = []
    for _ in range(n_trials):
        sample = sampler_fn(rng)
        ests.append(estimator_fn(sample))
    ests = np.asarray(ests, float)
    bias = float(np.mean(ests) - true_value)
    variance = float(np.var(ests, ddof=0))
    mse = float(np.mean((ests - true_value) ** 2))
    return {"bias": bias, "variance": variance, "mse": mse, "ests": ests}


In [49]:
# Sample call for estimate_bias_variance
# Example: estimating mean of normal distribution
true_mean = 5.0
estimator_fn = lambda sample: np.mean(sample)
sampler_fn = lambda rng: rng.normal(true_mean, 1.0, size=30)

result = estimate_bias_variance(estimator_fn, sampler_fn, true_mean, n_trials=1000, seed=42)
print(f"Bias: {result['bias']:.6f}")
print(f"Variance: {result['variance']:.6f}")
print(f"MSE: {result['mse']:.6f}")

Bias: 0.008174
Variance: 0.034397
MSE: 0.034464


### 2.2 Bootstrap Estimate

**What is this?**
A resampling technique to estimate the uncertainty of a statistic (like mean, median) without knowing the underlying distribution.

**How it works:**
1. Take your original dataset of size n
2. Create many "bootstrap samples" by randomly sampling with replacement
3. Compute your statistic on each bootstrap sample
4. Analyze the distribution of these statistics

**Bootstrap Confidence Interval:**
- Uses percentiles of bootstrap estimates
- For 95% CI with α=0.05: take 2.5th and 97.5th percentiles
- No assumptions about the data distribution needed!

**When to use:**
- When you don't know the theoretical distribution
- When sample size is too small for asymptotic theory
- To estimate standard errors of complex statistics

**Key insight:** Bootstrap simulates what would happen if you could collect many datasets

In [50]:
def bootstrap_estimates(estimator_fn, data, n_boot=2000, seed=0):
    rng = np.random.default_rng(seed)
    data = np.asarray(data)
    n = len(data)
    ests = np.empty(n_boot, float)
    for b in range(n_boot):
        idx = rng.integers(0, n, size=n)
        ests[b] = estimator_fn(data[idx])
    return ests

def bootstrap_ci(ests, alpha=0.05):
    ests = np.asarray(ests, float)
    lo = float(np.quantile(ests, alpha/2))
    hi = float(np.quantile(ests, 1 - alpha/2))
    return (lo, hi)


In [51]:
# Sample calls for bootstrap functions
data_sample = np.array([23, 25, 27, 29, 31, 33, 35, 37, 39, 41])

# Bootstrap estimates of the mean
ests = bootstrap_estimates(np.mean, data_sample, n_boot=1000, seed=42)
print(f"Bootstrap mean estimate: {np.mean(ests):.4f}")
print(f"Bootstrap std of mean: {np.std(ests):.4f}")

# Bootstrap confidence interval
ci = bootstrap_ci(ests, alpha=0.05)
print(f"95% Bootstrap CI for mean: ({ci[0]:.4f}, {ci[1]:.4f})")

Bootstrap mean estimate: 31.9324
Bootstrap std of mean: 1.7993
95% Bootstrap CI for mean: (28.4000, 35.6000)


### 2.3 Mean and Standard Error

**What is this?**
Measures of central tendency and uncertainty in your sample estimate.

**Formulas:**
- **Mean**: $\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i$
- **Standard Error**: $SE = \frac{s}{\sqrt{n}}$ where $s$ is sample standard deviation

**Standard Error vs Standard Deviation:**
- **Standard Deviation (s)**: Measures spread of data points
- **Standard Error (SE)**: Measures uncertainty in the mean estimate
- SE gets smaller as sample size increases: $SE \propto 1/\sqrt{n}$

**Interpretation:**
- Smaller SE = More confident in our mean estimate
- 95% CI is approximately: mean ± 2×SE (for large samples)

In [52]:
def mean_and_se(x):
    x = np.asarray(x, float)
    m = float(np.mean(x))
    se = float(np.std(x, ddof=1) / np.sqrt(len(x)))
    return m, se


In [53]:
# Sample call for mean_and_se
sample_data = np.array([12, 15, 18, 22, 25, 28, 30, 33, 36])
mean, se = mean_and_se(sample_data)
print(f"Sample mean: {mean:.4f}")
print(f"Standard error: {se:.4f}")

Sample mean: 24.3333
Standard error: 2.7437


### 2.4 Confidence Intervals (t-distribution)

**What is this?**
When sample size is small and population variance is unknown, use t-distribution for more accurate confidence intervals.

**Formula:** $\bar{x} \pm t_{\alpha/2, n-1} \cdot \frac{s}{\sqrt{n}}$

**Key differences from normal distribution:**
- Heavier tails (accounts for uncertainty in variance estimate)
- Converges to normal as sample size increases
- Degrees of freedom: $df = n - 1$

**When to use:**
- Sample size < 30 AND population variance unknown
- Normality assumption holds (or n ≥ 15 with symmetric data)

**Rule of thumb:**
- n < 30: Use t-distribution
- n ≥ 30: t ≈ normal (can use either)

In [54]:
from scipy import stats

def confidence_interval_t(x, confidence=0.95):
    """
    Compute confidence interval using t-distribution.
    
    Parameters:
    -----------
    x : array-like
        Sample data
    confidence : float, default=0.95
        Confidence level (e.g., 0.95 for 95% CI)
    
    Returns:
    --------
    (lower, upper) : tuple
        Confidence interval bounds
    mean : float
        Sample mean
    """
    x = np.asarray(x, float)
    n = len(x)
    mean = float(np.mean(x))
    se = float(np.std(x, ddof=1) / np.sqrt(n))
    
    # Critical value from t-distribution
    alpha = 1 - confidence
    t_crit = stats.t.ppf(1 - alpha/2, df=n-1)
    
    margin = t_crit * se
    return (mean - margin, mean + margin), mean

In [55]:
# Sample call for confidence_interval_t
small_sample = np.array([15, 18, 21, 24, 27, 30])
ci_t, mean_t = confidence_interval_t(small_sample, confidence=0.95)
print(f"Sample mean: {mean_t:.4f}")
print(f"95% t-distribution CI: ({ci_t[0]:.4f}, {ci_t[1]:.4f})")

Sample mean: 22.5000
95% t-distribution CI: (16.6101, 28.3899)


### 2.5 Central Limit Theorem (CLT)

**What is this?**
One of the most important theorems in statistics: the sum (or average) of many independent random variables approaches a normal distribution, regardless of the original distribution!

**Theorem:** If $X_1, X_2, \ldots, X_n$ are i.i.d. with mean $\mu$ and variance $\sigma^2$, then:
$$\frac{\bar{X} - \mu}{\sigma/\sqrt{n}} \xrightarrow{d} N(0, 1)$$

Or equivalently: $\bar{X} \approx N\left(\mu, \frac{\sigma^2}{n}\right)$ for large $n$

**Why it matters:**
- Explains why normal distribution appears everywhere in nature
- Justifies using normal-based confidence intervals for large samples
- Foundation for hypothesis testing and inference

**Key Requirements:**
1. Independence: Samples must be independent
2. Identical distribution: Same distribution for all $X_i$
3. Finite variance: $\sigma^2 < \infty$
4. Large sample size: Usually $n \geq 30$ is sufficient

**Rule of thumb:**
- Original distribution nearly symmetric: $n \geq 15$ may suffice
- Original distribution very skewed: Need $n \geq 50$ or more
- Bernoulli/binary data: Need $np \geq 5$ and $n(1-p) \geq 5$

**Practical implications:**
- Sample means are approximately normal for large $n$
- Can use z-tests and normal-based CIs even when data isn't normal
- Larger samples → Better normal approximation

**When NOT to rely on CLT:**
- Heavy-tailed distributions (infinite variance)
- Small sample sizes with highly skewed data
- Strong dependencies between observations

In [56]:
def clt_normal_approx(sample_mean, population_std, n):
    """
    Get normal approximation parameters for sample mean using CLT.
    
    Parameters:
    -----------
    sample_mean : float
        Observed sample mean (estimate of μ)
    population_std : float
        Population standard deviation σ
    n : int
        Sample size
    
    Returns:
    --------
    mean : float
        Mean of approximate normal distribution (= sample_mean)
    std : float
        Standard deviation of sample mean (= σ/√n)
    """
    std_of_mean = population_std / np.sqrt(n)
    return sample_mean, std_of_mean

def clt_confidence_interval(sample_mean, population_std, n, confidence=0.95):
    """
    Compute confidence interval for population mean using CLT.
    
    Parameters:
    -----------
    sample_mean : float
        Observed sample mean
    population_std : float
        Population standard deviation (known)
    n : int
        Sample size
    confidence : float
        Confidence level (e.g., 0.95 for 95% CI)
    
    Returns:
    --------
    (lower, upper) : tuple
        Confidence interval bounds
    """
    from scipy import stats
    
    std_of_mean = population_std / np.sqrt(n)
    alpha = 1 - confidence
    z_crit = stats.norm.ppf(1 - alpha/2)
    
    margin = z_crit * std_of_mean
    return (sample_mean - margin, sample_mean + margin)

In [57]:
# Sample calls for CLT functions
sample_mean_obs = 50.0
population_std_known = 10.0
sample_size = 100

# Get normal approximation parameters
mean_approx, std_approx = clt_normal_approx(sample_mean_obs, population_std_known, sample_size)
print(f"Approx. distribution: N({mean_approx:.2f}, {std_approx:.4f})")

# Compute confidence interval using CLT
ci_clt = clt_confidence_interval(sample_mean_obs, population_std_known, sample_size, confidence=0.95)
print(f"95% CLT-based CI: ({ci_clt[0]:.4f}, {ci_clt[1]:.4f})")

Approx. distribution: N(50.00, 1.0000)
95% CLT-based CI: (48.0400, 51.9600)


### 2.6 Log Likelihood

**What is this?**
The logarithm of the likelihood function, used to measure how well a statistical model fits observed data.

**Likelihood Function:** For data $X = \{x_1, x_2, \ldots, x_n\}$ and parameter $\theta$:
$$L(\theta | X) = \prod_{i=1}^{n} f(x_i | \theta)$$

**Log Likelihood:** 
$$\ell(\theta | X) = \log L(\theta | X) = \sum_{i=1}^{n} \log f(x_i | \theta)$$

**Why use log instead of plain likelihood?**
1. **Numerical stability**: Products of small probabilities → underflow
2. **Computational efficiency**: Sum is faster than product
3. **Mathematical convenience**: Derivatives are simpler
4. **Monotonic transformation**: Same maximizer as likelihood

**Properties:**
- Higher log-likelihood = Better fit to data
- Log-likelihood is typically negative (since probabilities < 1)
- Often reported as negative log-likelihood (NLL) to minimize

**Common distributions:**

**Gaussian/Normal:** $\ell = -\frac{n}{2}\log(2\pi\sigma^2) - \frac{1}{2\sigma^2}\sum_{i=1}^{n}(x_i - \mu)^2$

**Bernoulli/Binary:** $\ell = \sum_{i=1}^{n} [y_i \log(p) + (1-y_i)\log(1-p)]$

**Poisson:** $\ell = \sum_{i=1}^{n} [x_i \log(\lambda) - \lambda - \log(x_i!)]$

**When to use:**
- Parameter estimation (MLE)
- Model comparison (higher is better)
- Deep learning (minimize negative log-likelihood)

In [58]:
def log_likelihood_gaussian(data, mu, sigma):
    """
    Compute log-likelihood for Gaussian/Normal distribution.
    
    Parameters:
    -----------
    data : array-like
        Observed data points
    mu : float
        Mean parameter
    sigma : float
        Standard deviation parameter
    
    Returns:
    --------
    float : Log-likelihood value
    """
    data = np.asarray(data, float)
    n = len(data)
    
    ll = -0.5 * n * np.log(2 * np.pi * sigma**2)
    ll -= np.sum((data - mu)**2) / (2 * sigma**2)
    
    return float(ll)

def log_likelihood_bernoulli(labels, p):
    """
    Compute log-likelihood for Bernoulli distribution.
    
    Parameters:
    -----------
    labels : array-like
        Binary outcomes (0 or 1)
    p : float
        Success probability parameter (0 < p < 1)
    
    Returns:
    --------
    float : Log-likelihood value
    """
    labels = np.asarray(labels)
    eps = 1e-12  # Avoid log(0)
    p = np.clip(p, eps, 1 - eps)
    
    ll = np.sum(labels * np.log(p) + (1 - labels) * np.log(1 - p))
    return float(ll)

def negative_log_likelihood(log_likelihood):
    """
    Convert log-likelihood to negative log-likelihood (for minimization).
    """
    return -log_likelihood

In [59]:
# Sample calls for log likelihood functions
# Gaussian log-likelihood
gaussian_data = np.array([1.2, 2.3, 1.9, 2.5, 2.1])
ll_gauss = log_likelihood_gaussian(gaussian_data, mu=2.0, sigma=0.5)
print(f"Gaussian log-likelihood: {ll_gauss:.4f}")
print(f"Negative log-likelihood: {negative_log_likelihood(ll_gauss):.4f}")

# Bernoulli log-likelihood
binary_data = np.array([1, 1, 0, 1, 0, 1, 1, 0])
ll_bern = log_likelihood_bernoulli(binary_data, p=0.6)
print(f"Bernoulli log-likelihood: {ll_bern:.4f}")

Gaussian log-likelihood: -3.1290
Negative log-likelihood: 3.1290
Bernoulli log-likelihood: -5.3030


### 2.7 Maximum Likelihood Estimator (MLE)

**What is this?**
A method to estimate parameters by finding values that maximize the likelihood (or log-likelihood) of observing the data.

**Principle:** Choose $\hat{\theta}$ that makes observed data most probable:
$$\hat{\theta}_{MLE} = \arg\max_{\theta} \ell(\theta | X) = \arg\max_{\theta} \sum_{i=1}^{n} \log f(x_i | \theta)$$

**Algorithm:**
1. Write down the log-likelihood function $\ell(\theta | X)$
2. Take derivative with respect to $\theta$: $\frac{d\ell}{d\theta}$
3. Set equal to zero and solve: $\frac{d\ell}{d\theta} = 0$
4. Verify it's a maximum (check second derivative < 0)

**Common MLE Solutions:**

**Normal distribution:**
- $\hat{\mu}_{MLE} = \bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i$ (sample mean)
- $\hat{\sigma}^2_{MLE} = \frac{1}{n}\sum_{i=1}^{n}(x_i - \bar{x})^2$ (biased sample variance)

**Bernoulli distribution:**
- $\hat{p}_{MLE} = \bar{y} = \frac{1}{n}\sum_{i=1}^{n} y_i$ (proportion of successes)

**Poisson distribution:**
- $\hat{\lambda}_{MLE} = \bar{x}$ (sample mean)

**Properties of MLE:**
- **Consistent**: Converges to true parameter as $n \to \infty$
- **Asymptotically normal**: $\hat{\theta}_{MLE} \sim N(\theta, I(\theta)^{-1}/n)$ for large $n$
- **Asymptotically efficient**: Lowest possible variance among consistent estimators
- **Invariant**: If $\hat{\theta}$ is MLE, then $g(\hat{\theta})$ is MLE of $g(\theta)$

**Limitations:**
- May not exist or be unique
- Can be biased for small samples
- Requires knowing the parametric family
- May be computationally difficult (need numerical optimization)

**When to use:**
- Standard approach for parametric estimation
- When you have a probabilistic model of your data
- Foundation for logistic regression, neural networks, etc.

In [60]:
def mle_gaussian(data):
    """
    Compute MLE for Gaussian/Normal distribution parameters.
    
    Parameters:
    -----------
    data : array-like
        Observed data points
    
    Returns:
    --------
    mu_hat : float
        MLE estimate of mean
    sigma_hat : float
        MLE estimate of standard deviation (biased estimator)
    """
    data = np.asarray(data, float)
    mu_hat = float(np.mean(data))
    sigma_hat = float(np.std(data, ddof=0))  # MLE uses ddof=0 (biased)
    return mu_hat, sigma_hat

def mle_bernoulli(data):
    """
    Compute MLE for Bernoulli distribution parameter.
    
    Parameters:
    -----------
    data : array-like
        Binary outcomes (0 or 1)
    
    Returns:
    --------
    p_hat : float
        MLE estimate of success probability
    """
    data = np.asarray(data)
    p_hat = float(np.mean(data))
    return p_hat

def mle_poisson(data):
    """
    Compute MLE for Poisson distribution parameter.
    
    Parameters:
    -----------
    data : array-like
        Count data (non-negative integers)
    
    Returns:
    --------
    lambda_hat : float
        MLE estimate of rate parameter
    """
    data = np.asarray(data, float)
    lambda_hat = float(np.mean(data))
    return lambda_hat

In [61]:
# Sample calls for MLE functions
# MLE for Gaussian
gaussian_sample = np.array([1.5, 2.3, 1.8, 2.1, 1.9, 2.4, 2.0])
mu_mle, sigma_mle = mle_gaussian(gaussian_sample)
print(f"Gaussian MLE - mu: {mu_mle:.4f}, sigma: {sigma_mle:.4f}")

# MLE for Bernoulli
bernoulli_sample = np.array([1, 1, 0, 1, 1, 0, 1, 0, 1, 1])
p_mle = mle_bernoulli(bernoulli_sample)
print(f"Bernoulli MLE - p: {p_mle:.4f}")

# MLE for Poisson
poisson_sample = np.array([2, 3, 1, 2, 4, 2, 3, 2, 1, 3])
lambda_mle = mle_poisson(poisson_sample)
print(f"Poisson MLE - lambda: {lambda_mle:.4f}")

Gaussian MLE - mu: 2.0000, sigma: 0.2828
Bernoulli MLE - p: 0.7000
Poisson MLE - lambda: 2.3000


# 3) Sampling and Random Number Generator

### 3.1 Basic Random Number Generator

**What is this?**
Creates a random number generator (RNG) with a seed for reproducible randomness.

**Why use a seed?**
- Makes your random experiments reproducible
- Same seed = Same sequence of "random" numbers
- Essential for debugging and scientific reproducibility

**Usage:**
```python
rng = make_rng(seed=42)  # Fixed seed
samples = rng.normal(0, 1, size=100)  # Will always give same 100 numbers
```

In [62]:
import numpy as np

def make_rng(seed=0):
    return np.random.default_rng(seed)


In [63]:
# Sample call for make_rng
rng = make_rng(seed=42)
random_samples = rng.normal(0, 1, size=5)
print(f"Random samples: {random_samples}")

Random samples: [ 0.30471708 -1.03998411  0.7504512   0.94056472 -1.95103519]


### 3.2 LCG (Linear Congruential Generator)

**What is this?**
A simple algorithm for generating pseudo-random numbers using a linear recurrence relation.

**Formula:** $u_{n+1} = (a \cdot u_n + c) \mod m$

**How it works:**
1. Start with a seed value
2. Apply the formula repeatedly
3. Normalize by dividing by m to get values in [0, 1]

**Parameters:**
- `a = 1103515245`: Multiplier
- `c = 12345`: Increment
- `m = 2^31`: Modulus
- These specific values are used in many standard libraries

**Limitations:**
- Not cryptographically secure
- Can have patterns in high dimensions
- Modern RNGs (like numpy's) are better for serious work

In [64]:
def lcg(size, seed=1, a=1103515245, c=12345, m=2**31):
    u = seed
    out = []
    for _ in range(size):
        u = (a*u + c) % m
        out.append(u / m)
    return np.array(out, float)


In [65]:
# Sample call for LCG
lcg_samples = lcg(size=10, seed=12345)
print(f"LCG samples: {lcg_samples}")

LCG samples: [0.65515405 0.30481432 0.67496063 0.10676848 0.51657445 0.48966634
 0.6024722  0.36995476 0.25666706 0.37418221]


### 3.3 Inversion Sampling

**What is this?**
A method to generate random samples from any distribution by using its inverse CDF (cumulative distribution function).

**Algorithm:**
1. Generate uniform random variable $U \sim \text{Uniform}(0,1)$
2. Apply inverse CDF: $X = F^{-1}(U)$
3. Result: $X$ follows the desired distribution!

**Why it works:**
- Mathematical property: If $U$ is uniform, then $F^{-1}(U)$ follows distribution $F$
- This is called the probability integral transform

**When to use:**
- When you can compute or approximate the inverse CDF
- Examples: Exponential, Weibull, Cauchy distributions

**Example - Exponential distribution:**
- CDF: $F(x) = 1 - e^{-\lambda x}$
- Inverse: $F^{-1}(u) = -\frac{1}{\lambda}\log(1-u)$

In [66]:
def sample_by_inversion(n, inv_cdf_fn, rng=None):
    rng = np.random.default_rng() if rng is None else rng
    u = rng.uniform(0, 1, size=n)
    return inv_cdf_fn(u)

In [67]:
# Sample call for sample_by_inversion
# Sample from exponential distribution with rate=2.0
inv_cdf_exp = lambda u: -np.log(1 - u) / 2.0
exp_samples = sample_by_inversion(n=100, inv_cdf_fn=inv_cdf_exp, rng=make_rng(42))
print(f"Exponential samples (mean={np.mean(exp_samples):.4f}, expected=0.5)")

Exponential samples (mean=0.4434, expected=0.5)


### 3.4 Rejection Sampling

**What is this?**
A method to sample from a complex target distribution $f(x)$ by using a simpler proposal distribution $g(x)$.

**Algorithm:**
1. Sample $x$ from proposal distribution $g$
2. Sample $u \sim \text{Uniform}(0,1)$
3. Accept $x$ if $u \leq \frac{f(x)}{M \cdot g(x)}$, otherwise reject and try again
4. Repeat until you have enough accepted samples

**Key Requirements:**
- Need constant $M$ such that $f(x) \leq M \cdot g(x)$ for all $x$
- Smaller $M$ = Higher acceptance rate = More efficient
- $g(x)$ should be easy to sample from

**When to use:**
- Target distribution is hard to sample from directly
- But you can evaluate its PDF
- Common when inverse CDF is not available

**Efficiency:**
- Acceptance rate = $1/M$
- If $M$ is too large, you'll reject most samples (wasteful!)

In [68]:
def rejection_sample(n, sample_g, pdf_f, pdf_g, M, rng=None, max_tries=10_000_000):
    """
    sample_g(rng) -> one sample x
    pdf_f(x) -> target density value
    pdf_g(x) -> proposal density value
    M: constant such that pdf_f(x) <= M * pdf_g(x) for all x
    """
    rng = np.random.default_rng() if rng is None else rng
    out = []
    tries = 0
    while len(out) < n and tries < max_tries:
        x = sample_g(rng)
        u = rng.uniform(0, 1)
        accept_prob = pdf_f(x) / (M * pdf_g(x))
        if u <= accept_prob:
            out.append(x)
        tries += 1
    if len(out) < n:
        raise RuntimeError("Rejection sampling failed: increase max_tries or fix M/proposal.")
    return np.array(out)


In [69]:
# Sample call for rejection_sample
# Sample from Beta(2,5) using uniform proposal
from scipy.stats import beta as beta_dist

# Target: Beta(2,5), Proposal: Uniform(0,1)
pdf_f = lambda x: beta_dist.pdf(x, 2, 5)
pdf_g = lambda x: 1.0  # Uniform(0,1) density
sample_g = lambda rng: rng.uniform(0, 1)
M = 2.5  # Envelope constant

rejection_samples = rejection_sample(n=500, sample_g=sample_g, pdf_f=pdf_f, pdf_g=pdf_g, M=M, rng=make_rng(42))
print(f"Rejection sampling: generated {len(rejection_samples)} samples")
print(f"Sample mean: {np.mean(rejection_samples):.4f} (expected={2/(2+5):.4f})")

Rejection sampling: generated 500 samples
Sample mean: 0.2756 (expected=0.2857)


### 3.5 Importance Sampling

**What is this?**
A variance reduction technique for Monte Carlo estimation by sampling from a better-chosen distribution.

**Problem:** Standard MC can be inefficient when the important region (where $h(x)f(x)$ is large) is rare.

**Solution:** Sample from proposal $g(x)$ that concentrates on important regions.

**Formula:** $E_f[h(X)] = E_g\left[h(X)\frac{f(X)}{g(X)}\right]$

**Algorithm:**
1. Sample $x_1, \ldots, x_n$ from proposal $g$
2. Compute importance weights: $w_i = \frac{f(x_i)}{g(x_i)}$
3. Estimate: $\hat{E}[h(X)] = \frac{1}{n}\sum_{i=1}^{n} h(x_i)w_i$

**When to use:**
- Rare event simulation (e.g., estimating tail probabilities)
- When you can't sample directly from $f$
- To reduce variance compared to standard MC

**Key insight:** Good proposal $g$ puts more samples where $h(x)f(x)$ is large

In [70]:
def importance_sampling(n, sample_g, pdf_f, pdf_g, h_fn, rng=None):
    """
    Estimate E_f[h(X)] using importance sampling with proposal g.
    
    Parameters:
    -----------
    n : int
        Number of samples
    sample_g : callable
        Function to sample from proposal: sample_g(rng) -> x
    pdf_f : callable
        Target density: pdf_f(x) -> float
    pdf_g : callable
        Proposal density: pdf_g(x) -> float
    h_fn : callable
        Function to compute expectation of: h_fn(x) -> float
    rng : numpy RNG, optional
        Random number generator
    
    Returns:
    --------
    estimate : float
        Importance sampling estimate
    weights : array
        Importance weights for each sample
    samples : array
        Generated samples
    """
    rng = np.random.default_rng() if rng is None else rng
    
    samples = [sample_g(rng) for _ in range(n)]
    weights = np.array([pdf_f(x) / pdf_g(x) for x in samples])
    h_values = np.array([h_fn(x) for x in samples])
    
    estimate = float(np.mean(weights * h_values))
    
    return estimate, weights, np.array(samples)

In [71]:
# Sample call for importance_sampling
# Estimate E[X^2] for X ~ N(0,1) using proposal N(0,2)
from scipy.stats import norm

pdf_f = lambda x: norm.pdf(x, 0, 1)
pdf_g = lambda x: norm.pdf(x, 0, 2)
sample_g = lambda rng: rng.normal(0, 2)
h_fn = lambda x: x**2

estimate, weights, samples = importance_sampling(n=1000, sample_g=sample_g, pdf_f=pdf_f, pdf_g=pdf_g, h_fn=h_fn, rng=make_rng(42))
print(f"Importance sampling estimate of E[X^2]: {estimate:.4f} (expected=1.0)")

Importance sampling estimate of E[X^2]: 0.9653 (expected=1.0)


### 3.6 Monte-Carlo Integration

**What is this?**
A method to estimate expectations (integrals) using random sampling instead of analytical computation.

**Goal:** Estimate $E[h(X)] = \int h(x)f(x)dx$

**Algorithm:**
1. Draw samples $x_1, x_2, \ldots, x_n$ from distribution $f$
2. Compute $h(x_i)$ for each sample
3. Average: $\hat{E}[h(X)] = \frac{1}{n}\sum_{i=1}^{n} h(x_i)$

**Why it works:**
- Law of Large Numbers: Sample average converges to true expectation
- Accuracy improves with $\sqrt{n}$

**When to use:**
- High-dimensional integrals (curse of dimensionality)
- Complex functions where analytical integration is impossible
- Estimating probabilities, expected values, etc.

**Example:** Estimating $\pi$ by sampling points in a square and counting how many fall in a circle

In [72]:
def monte_carlo_expectation(samples, h_fn):
    samples = np.asarray(samples)
    vals = np.array([h_fn(x) for x in samples])
    return float(np.mean(vals)), np.asarray(vals, float)


In [73]:
# Sample call for monte_carlo_expectation
rng_mc = make_rng(42)
samples_mc = rng_mc.normal(5, 2, size=1000)
h_func = lambda x: x**2
estimate_mc, vals = monte_carlo_expectation(samples_mc, h_func)
print(f"Monte Carlo estimate of E[X^2]: {estimate_mc:.4f} (expected={5**2 + 2**2:.4f})")

Monte Carlo estimate of E[X^2]: 28.3358 (expected=29.0000)


### 3.7 Hoeffding's Inequality

**What is this?**
A probability inequality that gives confidence bounds for the mean of bounded random variables.

**Formula:** For $n$ i.i.d. samples $X_i \in [a,b]$ with sample mean $\bar{X}$:
$$P(|\bar{X} - \mu| \geq \epsilon) \leq 2e^{-2n\epsilon^2/(b-a)^2}$$

**Confidence Interval:**
With probability $1-\alpha$, the true mean $\mu$ is in:
$$\bar{X} \pm (b-a)\sqrt{\frac{\log(2/\alpha)}{2n}}$$

**Key Properties:**
- Works for ANY bounded distribution (no assumptions!)
- Confidence interval width decreases as $1/\sqrt{n}$
- Wider range $(b-a)$ = Wider confidence interval

**When to use:**
- When you don't know the distribution
- When you need distribution-free guarantees
- Trade-off: Conservative (wider than parametric CIs)

In [74]:
import math

def hoeffding_ci(values, a, b, alpha=0.05):
    values = np.asarray(values, float)
    n = len(values)
    mean = float(np.mean(values))
    radius = (b - a) * math.sqrt(math.log(2/alpha) / (2*n))
    return (mean - radius, mean + radius), mean, radius


In [75]:
# Sample call for hoeffding_ci
bounded_values = np.random.uniform(0, 10, size=100)
ci_hoef, mean_hoef, radius_hoef = hoeffding_ci(bounded_values, a=0, b=10, alpha=0.05)
print(f"Sample mean: {mean_hoef:.4f}")
print(f"95% Hoeffding CI: ({ci_hoef[0]:.4f}, {ci_hoef[1]:.4f})")
print(f"Radius: {radius_hoef:.4f}")

Sample mean: 5.0668
95% Hoeffding CI: (3.7087, 6.4249)
Radius: 1.3581


In [76]:
# Sample call for hoeffding_ci
bounded_values = np.array([0.2, 0.5, 0.3, 0.8, 0.6, 0.4, 0.7, 0.5])
ci_hoeff, mean_hoeff, radius_hoeff = hoeffding_ci(bounded_values, a=0.0, b=1.0, alpha=0.05)
print(f"Hoeffding CI: ({ci_hoeff[0]:.4f}, {ci_hoeff[1]:.4f}), radius={radius_hoeff:.4f}")

Hoeffding CI: (0.0198, 0.9802), radius=0.4802


### 3.8 Markov's Inequality

**What is this?**
A basic probability inequality for non-negative random variables that gives an upper bound on tail probabilities.

**Formula:** For non-negative random variable $X$ and $a > 0$:
$$P(X \geq a) \leq \frac{E[X]}{a}$$

**Interpretation:**
- Probability that $X$ is at least $a$ times bigger than its mean
- Very general: works for ANY non-negative distribution
- Trade-off: Often very loose (conservative bound)

**Example:**
- If average income is \$50K, probability someone earns ≥ \$500K is at most 50K/500K = 10%
- Actual probability might be much lower!

**Key Properties:**
- Only requires mean to exist
- Useful when distribution is unknown
- Foundation for other inequalities (Chebyshev's)

**When to use:**
- Quick rough bounds when you only know the mean
- Proving theoretical results
- When tighter bounds (like Hoeffding's) don't apply

In [77]:
def markov_bound(mean_value, threshold):
    """
    Compute Markov's inequality upper bound on P(X >= threshold).
    
    Parameters:
    -----------
    mean_value : float
        Expected value E[X], must be positive
    threshold : float
        Value a > 0 for which we want P(X >= a)
    
    Returns:
    --------
    float : Upper bound on P(X >= threshold)
    """
    if mean_value <= 0 or threshold <= 0:
        raise ValueError("Both mean_value and threshold must be positive")
    return float(mean_value / threshold)

In [78]:
# Sample call for markov_bound
mean_val = 10.0
threshold_val = 50.0
markov_prob_bound = markov_bound(mean_val, threshold_val)
print(f"Markov bound: P(X >= {threshold_val}) <= {markov_prob_bound:.4f}")

Markov bound: P(X >= 50.0) <= 0.2000


### 3.9 Chebyshev's Inequality

**What is this?**
A stronger inequality than Markov's that bounds how far a random variable deviates from its mean.

**Formula:** For any random variable $X$ with mean $\mu$ and variance $\sigma^2$:
$$P(|X - \mu| \geq k\sigma) \leq \frac{1}{k^2}$$

Or equivalently:
$$P(|X - \mu| \geq \epsilon) \leq \frac{\sigma^2}{\epsilon^2}$$

**Confidence Interval:**
With probability at least $1-\alpha$, $X$ is within:
$$\mu \pm \frac{\sigma}{\sqrt{\alpha}}$$

**Interpretation:**
- At least 75% of data is within 2 standard deviations ($k=2$)
- At least 89% is within 3 standard deviations ($k=3$)
- Works for ANY distribution!

**Comparison with other inequalities:**
- Markov's: Only needs mean, only for non-negative variables
- Chebyshev's: Needs mean & variance, works for all variables, tighter bound
- Hoeffding's: Needs bounded range, even tighter for sample means

**When to use:**
- Distribution unknown but you have mean and variance
- Quick check for outliers
- Proving convergence in probability

In [79]:
def chebyshev_bound(variance, epsilon):
    """
    Compute Chebyshev's inequality upper bound on P(|X - mu| >= epsilon).
    
    Parameters:
    -----------
    variance : float
        Variance of the random variable
    epsilon : float
        Deviation threshold (> 0)
    
    Returns:
    --------
    float : Upper bound on P(|X - mu| >= epsilon)
    """
    if variance <= 0 or epsilon <= 0:
        raise ValueError("Variance and epsilon must be positive")
    return float(variance / (epsilon ** 2))

def chebyshev_ci(mean, std, alpha=0.05):
    """
    Compute confidence interval using Chebyshev's inequality.
    
    Parameters:
    -----------
    mean : float
        Mean of the distribution
    std : float
        Standard deviation
    alpha : float, default=0.05
        Confidence level (e.g., 0.05 for 95% CI)
    
    Returns:
    --------
    (lower, upper) : tuple
        Confidence interval bounds
    k : float
        Number of standard deviations
    """
    k = 1.0 / np.sqrt(alpha)
    margin = k * std
    return (mean - margin, mean + margin), k

In [80]:
# Sample calls for Chebyshev inequality
variance_val = 4.0
epsilon_val = 3.0
cheb_bound = chebyshev_bound(variance_val, epsilon_val)
print(f"Chebyshev bound: P(|X - mu| >= {epsilon_val}) <= {cheb_bound:.4f}")

# Chebyshev CI
mean_cheb = 50.0
std_cheb = 10.0
ci_cheb, k_cheb = chebyshev_ci(mean_cheb, std_cheb, alpha=0.05)
print(f"Chebyshev CI: ({ci_cheb[0]:.4f}, {ci_cheb[1]:.4f}), k={k_cheb:.4f} std deviations")

Chebyshev bound: P(|X - mu| >= 3.0) <= 0.4444
Chebyshev CI: (5.2786, 94.7214), k=4.4721 std deviations


# 4) Markov Chain & Convergence

### 4.1 Validate Transition Matrix

**What is this?**
Checks if a matrix is a valid Markov chain transition matrix.

**Requirements for validity:**
1. **Non-negative entries**: All probabilities $P_{ij} \geq 0$
2. **Row stochastic**: Each row sums to 1 (sum of transition probabilities from any state = 1)

**Interpretation:**
- $P_{ij}$ = Probability of transitioning from state $i$ to state $j$
- Each row represents a probability distribution over next states

**Example:**
```python
P = [[0.7, 0.3],
     [0.4, 0.6]]  # Valid: rows sum to 1, all non-negative
```

In [81]:
def is_transition_matrix(P, tol=1e-10):
    P = np.asarray(P, float)
    if (P < -tol).any():
        return False
    return np.allclose(P.sum(axis=1), 1.0, atol=1e-8)


In [82]:
# Sample call for is_transition_matrix
P_valid = np.array([[0.7, 0.3], [0.4, 0.6]])
P_invalid = np.array([[0.7, 0.4], [0.4, 0.6]])
print(f"Valid transition matrix: {is_transition_matrix(P_valid)}")
print(f"Invalid transition matrix: {is_transition_matrix(P_invalid)}")

Valid transition matrix: True
Invalid transition matrix: False


### 4.2 Estimate Transition Matrix from Path

**What is this?**
Learn the transition probabilities of a Markov chain from observed state sequences.

**Algorithm:**
1. Count transitions: How many times did we go from state $i$ to state $j$?
2. Normalize each row: $\hat{P}_{ij} = \frac{\text{count}(i \to j)}{\sum_k \text{count}(i \to k)}$

**Example:**
```
Path: [0, 1, 1, 0, 1]
Transitions: 0→1 (2 times), 1→1 (1 time), 1→0 (1 time)
Matrix: P[0,1] = 2/2 = 1.0
        P[1,0] = 1/2 = 0.5
        P[1,1] = 1/2 = 0.5
```

**Use case:** Learning transition probabilities from real data (e.g., customer behavior, weather patterns)

In [83]:
def estimate_transition_matrix(states, n_states=None):
    s = np.asarray(states, dtype=int)
    if n_states is None:
        n_states = int(s.max()) + 1

    counts = np.zeros((n_states, n_states), dtype=float)
    for a, b in zip(s[:-1], s[1:]):
        counts[a, b] += 1

    row_sums = counts.sum(axis=1, keepdims=True)
    P_hat = np.divide(counts, row_sums, out=np.zeros_like(counts), where=row_sums > 0)
    return P_hat


In [84]:
# Sample call for estimate_transition_matrix
observed_states = np.array([0, 1, 1, 0, 1, 0, 0, 1, 1, 2, 2, 1])
P_estimated = estimate_transition_matrix(observed_states, n_states=3)
print(f"Estimated transition matrix:\n{P_estimated}")

Estimated transition matrix:
[[0.25 0.75 0.  ]
 [0.4  0.4  0.2 ]
 [0.   0.5  0.5 ]]


### 4.3 Simulate Chain

**What is this?**
Generate a random path/trajectory through a Markov chain.

**Algorithm:**
1. Start at initial state $x_0$
2. At each step, randomly choose next state according to $P[x_t, \cdot]$
3. Repeat for $T$ steps

**Parameters:**
- `P`: Transition matrix
- `x0`: Starting state
- `T`: Number of transitions to simulate

**Output:** Sequence of states $[x_0, x_1, x_2, \ldots, x_T]$

**Use cases:**
- Simulating random processes (stock prices, weather)
- Monte Carlo estimation of chain properties
- Testing chain convergence

In [85]:
def simulate_chain(P, x0, T, rng=None):
    rng = np.random.default_rng() if rng is None else rng
    P = np.asarray(P, float)
    x = int(x0)
    path = [x]
    for _ in range(T):
        x = rng.choice(P.shape[0], p=P[x])
        path.append(int(x))
    return np.array(path, dtype=int)


In [86]:
# Sample call for simulate_chain
P_sim = np.array([[0.8, 0.2], [0.3, 0.7]])
path_sim = simulate_chain(P_sim, x0=0, T=10, rng=make_rng(42))
print(f"Simulated path: {path_sim}")

Simulated path: [0 0 0 1 1 0 1 1 1 0 0]


### 4.4 N-Step Transition Probability

**What is this?**
Computes the probability of being in state $j$ after exactly $n$ steps, starting from state $i$.

**Formula:**
$$P^{(n)}_{ij} = (P^n)_{ij}$$

where $P^n$ is the transition matrix raised to the $n$-th power.

**Why it works:**
- $P^1 = P$: 1-step transition probabilities
- $P^2 = P \times P$: 2-step transition probabilities  
- $P^n$: n-step transition probabilities (Chapman-Kolmogorov equation)

**Example:**
If you're in state 0, what's the probability of being in state 1 after 5 steps?
```python
prob = n_step_probability(P, start_state=0, end_state=1, n=5)
```

**Use cases:**
- Long-term predictions without computing full stationary distribution
- Finite-horizon forecasting (e.g., "What's my customer state in 6 months?")
- Understanding transient behavior before convergence

In [87]:
def n_step_probability(P, start_state, end_state, n):
    """
    Calculate probability of being in end_state after n steps from start_state.
    
    Parameters:
    -----------
    P : array-like, shape (n_states, n_states)
        Transition probability matrix
    start_state : int
        Starting state index
    end_state : int
        Target state index
    n : int
        Number of steps
        
    Returns:
    --------
    float : Probability of transition from start_state to end_state in n steps
    """
    P = np.asarray(P, float)
    P_n = np.linalg.matrix_power(P, n)
    return P_n[start_state, end_state]


In [88]:
# Sample call for n_step_probability
P_nstep = np.array([[0.7, 0.3], [0.4, 0.6]])
prob_5step = n_step_probability(P_nstep, start_state=0, end_state=1, n=5)
print(f"Probability of being in state 1 after 5 steps from state 0: {prob_5step:.4f}")

Probability of being in state 1 after 5 steps from state 0: 0.4275


### 4.5 First Passage Time Probability

**What is this?**
Computes the probability of reaching state $j$ **for the first time** after exactly $n$ steps, starting from state $i$.

**Difference from n-step probability:**
- **N-step probability**: Probability of being in state $j$ after $n$ steps (may have visited before)
- **First passage time**: Probability of reaching state $j$ for the **first time** at step $n$

**Formula:**
$$f^{(n)}_{ij} = P^{(n)}_{ij} - \sum_{k=1}^{n-1} f^{(k)}_{ij} \cdot P^{(n-k)}_{jj}$$

where:
- $f^{(1)}_{ij} = P_{ij}$ (1-step is always first time if $i \neq j$)
- For $i = j$: first return time to the same state

**Example:**
If you start in state 0, what's the probability you reach state 2 for the first time at exactly step 5?
```python
prob = first_passage_probability(P, start_state=0, target_state=2, n=5)
```

**Use cases:**
- Time-to-event analysis (e.g., "When will customer first churn?")
- Expected hitting times
- Analyzing recurrence properties

In [89]:
def first_passage_probability(P, start_state, target_state, n):
    """
    Calculate probability of reaching target_state for the FIRST time 
    at exactly step n, starting from start_state.
    
    Parameters:
    -----------
    P : array-like, shape (n_states, n_states)
        Transition probability matrix
    start_state : int
        Starting state index
    target_state : int
        Target state index to reach for first time
    n : int
        Number of steps (must reach at exactly this step)
        
    Returns:
    --------
    float : First passage probability
    """
    P = np.asarray(P, float)
    
    if n == 1:
        # First step: just direct transition probability
        return P[start_state, target_state]
    
    # Compute first passage probabilities recursively
    f = {}  # f[(k, i, j)] = first passage prob from i to j at step k
    
    # Base case: 1-step first passage
    for i in range(P.shape[0]):
        for j in range(P.shape[0]):
            f[(1, i, j)] = P[i, j]
    
    # Recursive computation for steps 2 to n
    for k in range(2, n + 1):
        for i in range(P.shape[0]):
            for j in range(P.shape[0]):
                # P^(k)_{ij} - sum of (first reach j at step m) * (stay around j until step k)
                P_k_ij = n_step_probability(P, i, j, k)
                correction = sum(f[(m, i, j)] * n_step_probability(P, j, j, k - m) 
                               for m in range(1, k))
                f[(k, i, j)] = P_k_ij - correction
    
    return f[(n, start_state, target_state)]


In [90]:
# Sample call for first_passage_probability
P_fpp = np.array([[0.7, 0.3], [0.4, 0.6]])
fpp = first_passage_probability(P_fpp, start_state=0, target_state=1, n=3)
print(f"Probability of first reaching state 1 at exactly step 3: {fpp:.4f}")

Probability of first reaching state 1 at exactly step 3: 0.1470


### 4.6 K-th Passage Time Probability

**What is this?**
Computes the probability of reaching state $j$ **for the k-th time** after exactly $n$ steps, starting from state $i$.

**Generalizes first passage time:**
- $k=1$: First time reaching the state
- $k=2$: Second time reaching the state
- $k=3$: Third time reaching the state, etc.

**Formula (recursive):**
$$f^{(n)}_k(i \to j) = \sum_{m=1}^{n-1} f^{(m)}_{k-1}(i \to j) \cdot f^{(n-m)}_1(j \to j)$$

where:
- $f^{(n)}_1(i \to j)$ is the first passage probability
- $f^{(n)}_k(i \to j)$ builds on previous visits

**Intuition:**
To reach $j$ for the k-th time at step $n$:
1. Reach $j$ for the (k-1)-th time at some earlier step $m$
2. Then reach $j$ again (first return) in the remaining $n-m$ steps

**Example:**
What's the probability of visiting state 1 for the **second time** at exactly step 10?
```python
prob = kth_passage_probability(P, start_state=0, target_state=1, k=2, n=10)
```

**Use cases:**
- Repeated event analysis (e.g., "When will customer churn for the 2nd time?")
- Multi-visit patterns
- Understanding state recurrence frequency

In [91]:
def kth_passage_probability(P, start_state, target_state, k, n):
    """
    Calculate probability of reaching target_state for the K-TH time 
    at exactly step n, starting from start_state.
    
    Parameters:
    -----------
    P : array-like, shape (n_states, n_states)
        Transition probability matrix
    start_state : int
        Starting state index
    target_state : int
        Target state index
    k : int
        Which visit (1=first time, 2=second time, etc.)
    n : int
        Number of steps (must reach for k-th time at exactly this step)
        
    Returns:
    --------
    float : K-th passage probability
    """
    P = np.asarray(P, float)
    
    if k < 1 or n < k:
        return 0.0  # Impossible to have k visits in less than k steps
    
    # Memoization: f_cache[(visit, step, i, j)] = prob of visit-th passage from i to j at step
    f_cache = {}
    
    def f_passage(visit, step, i, j):
        """Compute visit-th passage probability from i to j at exactly step."""
        if (visit, step, i, j) in f_cache:
            return f_cache[(visit, step, i, j)]
        
        if visit == 1:
            # First passage: use the previous function logic
            result = first_passage_probability(P, i, j, step)
        else:
            # K-th passage: sum over all ways to reach (k-1)-th visit, then first return
            result = 0.0
            for m in range(visit - 1, step):  # Need at least (visit-1) steps for (k-1) visits
                # Reach j for (visit-1)-th time at step m
                prob_prev = f_passage(visit - 1, m, i, j)
                # Then first return from j to j in (step - m) steps
                if step - m > 0:
                    prob_return = first_passage_probability(P, j, j, step - m)
                    result += prob_prev * prob_return
        
        f_cache[(visit, step, i, j)] = result
        return result
    
    return f_passage(k, n, start_state, target_state)


In [92]:
# Sample call for kth_passage_probability
P_kpp = np.array([[0.7, 0.3], [0.4, 0.6]])
kpp_2nd = kth_passage_probability(P_kpp, start_state=0, target_state=1, k=2, n=6)
print(f"Probability of 2nd visit to state 1 at step 6: {kpp_2nd:.4f}")

Probability of 2nd visit to state 1 at step 6: 0.0926


### 4.7 Irreducibility Check

**What is this?**
A Markov chain is **irreducible** if you can reach any state from any other state (eventually).

**Why it matters:**
- Irreducible chains have a unique stationary distribution
- Non-irreducible chains can get "stuck" in subsets of states

**Algorithm:**
Uses graph reachability: For each state, check if all other states are reachable following the transition graph.

**Example:**
```
Irreducible: [0→1→2→0] (circular)
Not irreducible: [0⇄1] [2⇄3] (two separate groups)
```

**Testing:** The function performs BFS/DFS from each state to verify full connectivity

In [93]:
def is_irreducible(P):
    P = np.asarray(P, float)
    n = P.shape[0]
    adj = (P > 0)

    def reachable(start):
        seen = set([start])
        stack = [start]
        while stack:
            u = stack.pop()
            for v in np.where(adj[u])[0]:
                if v not in seen:
                    seen.add(int(v))
                    stack.append(int(v))
        return seen

    for s in range(n):
        if len(reachable(s)) != n:
            return False
    return True


In [94]:
# Sample call for is_irreducible
P_irr = np.array([[0.8, 0.2], [0.3, 0.7]])
P_not_irr = np.array([[1.0, 0.0], [0.5, 0.5]])
print(f"First matrix irreducible: {is_irreducible(P_irr)}")
print(f"Second matrix irreducible: {is_irreducible(P_not_irr)}")

First matrix irreducible: True
Second matrix irreducible: False


In [95]:
Mat_A = np.array([[0.8,0.2,0,0],
                  [0.6,0.2,0.2,0],
                  [0,0.4,0,0.6],
                  [0,0,0.8,0.2]])

is_irreducible(Mat_A)

True

### 4.8 Communicating Classes

**What is this?**
States $i$ and $j$ **communicate** if you can go from $i$ to $j$ and from $j$ to $i$ (not necessarily in one step).

**Communicating Class:**
A maximal set of states that all communicate with each other.

**Properties:**
- Communication is an equivalence relation (reflexive, symmetric, transitive)
- Every state belongs to exactly one communicating class
- Irreducible chain = Single communicating class (all states communicate)

**Why it matters:**
- Identifies independent parts of the chain
- States in different classes don't interact
- Each class can have its own stationary distribution

**Example:**
```
States: {0, 1, 2, 3}
Transitions: 0⇄1, 2⇄3 (but no path between {0,1} and {2,3})
Classes: {0, 1} and {2, 3}
```

In [96]:
def find_communicating_classes(P):
    """
    Find all communicating classes in a Markov chain.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    
    Returns:
    --------
    classes : list of sets
        List of communicating classes (each class is a set of state indices)
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    adj = (P > 0)
    
    def reachable_from(start):
        """Find all states reachable from start."""
        seen = set([start])
        stack = [start]
        while stack:
            u = stack.pop()
            for v in np.where(adj[u])[0]:
                if v not in seen:
                    seen.add(int(v))
                    stack.append(int(v))
        return seen
    
    # Find communicating classes using Union-Find approach
    classes = []
    visited = set()
    
    for i in range(n):
        if i in visited:
            continue
        
        # Find all states reachable from i
        reachable_i = reachable_from(i)
        
        # Find states that can reach i (i.e., i is reachable from them)
        can_reach_i = set()
        for j in range(n):
            if i in reachable_from(j):
                can_reach_i.add(j)
        
        # Communicating class = intersection
        comm_class = reachable_i & can_reach_i
        
        classes.append(comm_class)
        visited.update(comm_class)
    
    return classes

def classify_chain_structure(P):
    """
    Classify the structure of a Markov chain.
    
    Returns information about communicating classes and irreducibility.
    """
    P = np.asarray(P, float)
    classes = find_communicating_classes(P)
    
    is_irreducible_chain = (len(classes) == 1)
    
    return {
        "num_classes": len(classes),
        "classes": classes,
        "is_irreducible": is_irreducible_chain,
        "class_sizes": [len(c) for c in classes]
    }

In [97]:
# Sample call for find_communicating_classes
P_comm = np.array([[0.5, 0.5, 0, 0], 
                   [0.5, 0.5, 0, 0],
                   [0, 0, 0.6, 0.4],
                   [0, 0, 0.3, 0.7]])
classes = find_communicating_classes(P_comm)
print(f"Communicating classes: {classes}")

# Classify chain structure
structure = classify_chain_structure(P_comm)
print(f"Number of classes: {structure['num_classes']}")
print(f"Is irreducible: {structure['is_irreducible']}")

Communicating classes: [{0, 1}, {2, 3}]
Number of classes: 2
Is irreducible: False


### 4.9 Transient and Recurrent States

**What is this?**
Classification of states based on whether the chain will definitely return to them.

**Definitions:**
- **Recurrent state**: Starting from state $i$, the chain returns to $i$ with probability 1
  - Guaranteed to revisit infinitely often
- **Transient state**: Starting from state $i$, there's a chance of never returning
  - Visited only finitely many times (eventually leaves forever)

**Mathematical Test:**
State $i$ is recurrent if and only if:
$$\sum_{n=1}^{\infty} P^n_{ii} = \infty$$

**Key Properties:**
- In a finite irreducible chain, all states are recurrent
- Transient states lead to recurrent states
- If state $i$ is recurrent and communicates with $j$, then $j$ is also recurrent

**Example:**
```
Random walk on integers: All states are recurrent (1D)
Random walk on 2D grid: All states are recurrent
Random walk on 3D grid: All states are transient!
```

**Practical importance:**
- Recurrent states form the "core" of the chain
- Transient states are temporary (system eventually settles in recurrent states)

In [98]:
def classify_states_transient_recurrent(P, max_steps=1000, tol=1e-10):
    """
    Classify states as transient or recurrent using finite approximation.
    
    For finite chains: A state is recurrent if starting from it, you can return to it.
    More precisely, we check if the sum of return probabilities diverges.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    max_steps : int, default=1000
        Maximum number of steps to check
    tol : float, default=1e-10
        Tolerance for numerical comparisons
    
    Returns:
    --------
    classification : dict
        Dictionary with 'recurrent' and 'transient' state sets
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    recurrent = set()
    transient = set()
    
    # For each state, compute sum of P^n[i,i]
    P_power = P.copy()
    cumulative_return_prob = np.diag(P).copy()
    
    for step in range(2, max_steps + 1):
        P_power = P_power @ P
        cumulative_return_prob += np.diag(P_power)
    
    # In finite chains, we use a heuristic:
    # If sum is very large, state is recurrent
    # More precisely: check if state can reach itself
    for i in range(n):
        # Check if i can return to itself
        can_return = cumulative_return_prob[i] > tol
        
        # For finite chains: state is recurrent if it's in a closed communicating class
        # Simple heuristic: if cumulative return probability is high, it's recurrent
        if cumulative_return_prob[i] > max_steps * 0.01:  # Heuristic threshold
            recurrent.add(i)
        else:
            transient.add(i)
    
    return {
        "recurrent": recurrent,
        "transient": transient,
        "cumulative_return_probs": cumulative_return_prob
    }

def find_absorbing_states(P, tol=1e-10):
    """
    Find absorbing states in a Markov chain.
    
    An absorbing state is a state that, once entered, cannot be left (P[i,i] = 1).
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    tol : float, default=1e-10
        Tolerance for checking P[i,i] = 1
    
    Returns:
    --------
    absorbing_states : set
        Set of absorbing state indices
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    absorbing = set()
    for i in range(n):
        if abs(P[i, i] - 1.0) < tol:
            absorbing.add(i)
    
    return absorbing

In [99]:
# Sample call for classify_states_transient_recurrent
P_trans = np.array([[0.5, 0.5, 0],
                    [0, 0.6, 0.4],
                    [0, 0, 1.0]])  # State 2 is absorbing
classification = classify_states_transient_recurrent(P_trans, max_steps=100)
print(f"Recurrent states: {classification['recurrent']}")
print(f"Transient states: {classification['transient']}")

# Find absorbing states
absorbing = find_absorbing_states(P_trans)
print(f"Absorbing states: {absorbing}")

Recurrent states: {1, 2}
Transient states: {0}
Absorbing states: {2}


### 4.10 Absorbing States and Absorbing Chains

**What is this?**
An **absorbing state** is a state you can never leave once you enter it.

**Mathematical definition:** State $i$ is absorbing if $P_{ii} = 1$ (and $P_{ij} = 0$ for $j \neq i$)

**Absorbing Chain:**
A chain with:
1. At least one absorbing state
2. From every state, you can reach some absorbing state

**Standard Form:**
Absorbing chains can be written as:
$$P = \begin{pmatrix} Q & R \\ 0 & I \end{pmatrix}$$

Where:
- $Q$: Transitions between transient states
- $R$: Transitions from transient to absorbing
- $I$: Identity matrix (absorbing states stay put)

**Fundamental Matrix:** $N = (I - Q)^{-1}$
- $N_{ij}$ = Expected number of times in transient state $j$ starting from $i$

**Absorption Probabilities:** $B = NR$
- $B_{ij}$ = Probability of absorbing into state $j$ starting from transient state $i$

**Examples:**
- Gambler's ruin: Broke ($0) and Rich (target) are absorbing
- Random walk with barriers
- Customer churn models

In [100]:
def analyze_absorbing_chain(P, absorbing_states=None, tol=1e-10):
    """
    Analyze an absorbing Markov chain.
    
    Computes fundamental matrix and absorption probabilities.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    absorbing_states : set or list, optional
        Indices of absorbing states. If None, automatically detected.
    tol : float, default=1e-10
        Tolerance for numerical operations
    
    Returns:
    --------
    result : dict
        Dictionary containing:
        - 'absorbing_states': Set of absorbing state indices
        - 'transient_states': List of transient state indices
        - 'N': Fundamental matrix (expected time in each transient state)
        - 'B': Absorption probability matrix
        - 'expected_steps': Expected steps to absorption from each transient state
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    # Find absorbing states if not provided
    if absorbing_states is None:
        absorbing_states = find_absorbing_states(P, tol)
    
    absorbing_states = set(absorbing_states)
    transient_states = [i for i in range(n) if i not in absorbing_states]
    
    if len(absorbing_states) == 0:
        return {
            "absorbing_states": set(),
            "transient_states": list(range(n)),
            "N": None,
            "B": None,
            "expected_steps": None,
            "message": "No absorbing states found"
        }
    
    if len(transient_states) == 0:
        return {
            "absorbing_states": absorbing_states,
            "transient_states": [],
            "N": None,
            "B": None,
            "expected_steps": None,
            "message": "All states are absorbing"
        }
    
    # Reorder states: transient first, then absorbing
    absorbing_list = sorted(absorbing_states)
    state_order = transient_states + absorbing_list
    
    # Reorder transition matrix
    P_reordered = P[np.ix_(state_order, state_order)]
    
    # Extract Q (transient to transient) and R (transient to absorbing)
    n_transient = len(transient_states)
    Q = P_reordered[:n_transient, :n_transient]
    R = P_reordered[:n_transient, n_transient:]
    
    # Compute fundamental matrix N = (I - Q)^(-1)
    I = np.eye(n_transient)
    try:
        N = np.linalg.inv(I - Q)
    except np.linalg.LinAlgError:
        return {
            "absorbing_states": absorbing_states,
            "transient_states": transient_states,
            "N": None,
            "B": None,
            "expected_steps": None,
            "message": "Cannot compute fundamental matrix (I-Q is singular)"
        }
    
    # Compute absorption probabilities B = NR
    B = N @ R
    
    # Expected number of steps to absorption
    expected_steps = N @ np.ones(n_transient)
    
    return {
        "absorbing_states": absorbing_states,
        "transient_states": transient_states,
        "N": N,  # Fundamental matrix
        "B": B,  # Absorption probabilities
        "expected_steps": expected_steps,
        "Q": Q,
        "R": R
    }

In [101]:
# Sample call for analyze_absorbing_chain
# Gambler's ruin: States 0 and 4 are absorbing (broke and rich)
P_gambler = np.array([[1.0, 0.0, 0.0, 0.0, 0.0],  # State 0: broke (absorbing)
                      [0.4, 0.0, 0.6, 0.0, 0.0],  # State 1: $1
                      [0.0, 0.4, 0.0, 0.6, 0.0],  # State 2: $2
                      [0.0, 0.0, 0.4, 0.0, 0.6],  # State 3: $3
                      [0.0, 0.0, 0.0, 0.0, 1.0]]) # State 4: rich (absorbing)

result = analyze_absorbing_chain(P_gambler)
print(f"Absorbing states: {result['absorbing_states']}")
print(f"Transient states: {result['transient_states']}")
if result['expected_steps'] is not None:
    print(f"Expected steps to absorption from each transient state: {result['expected_steps']}")
    print(f"Absorption probabilities (to each absorbing state):\n{result['B']}")

Absorbing states: {0, 4}
Transient states: [1, 2, 3]
Expected steps to absorption from each transient state: [3.30769231 3.84615385 2.53846154]
Absorption probabilities (to each absorbing state):
[[0.58461538 0.41538462]
 [0.30769231 0.69230769]
 [0.12307692 0.87692308]]


### 4.11 Periodicity Check

**What is this?**
A chain is **aperiodic** if states don't have cyclical return patterns. Periodicity matters for convergence.

**Period of a state:** $d = \gcd\{n : P^n_{ii} > 0\}$
- Period 1 = Aperiodic (can return at any time)
- Period > 1 = Periodic (returns only at multiples of d)

**Why it matters:**
- Aperiodic + Irreducible → Convergence to unique stationary distribution
- Periodic chains oscillate (don't converge to stationary distribution)

**Making chain aperiodic:**
Add self-loops (e.g., set $P_{ii} > 0$ for some state)

**Example:**
- [0→1→0]: Period 2 (alternates)
- [0→0, 0→1, 1→0]: Period 1 (aperiodic, has self-loop)

In [102]:
import math

def gcd_list(numbers):
    """Compute GCD of a list of numbers."""
    result = numbers[0]
    for num in numbers[1:]:
        result = math.gcd(result, num)
    return result

def state_period(P, state):
    """
    Compute the period of a specific state in a Markov chain.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    state : int
        State to check period for
    
    Returns:
    --------
    period : int
        Period of the state (1 = aperiodic)
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    max_steps = 100  # Check up to this many steps
    
    # Find all n where P^n[state, state] > 0
    return_times = []
    P_power = P.copy()
    
    for step in range(1, max_steps + 1):
        if P_power[state, state] > 1e-10:
            return_times.append(step)
        P_power = P_power @ P
    
    if len(return_times) == 0:
        return float('inf')  # Never returns
    
    return gcd_list(return_times)

def is_aperiodic(P):
    """Check if chain is aperiodic (all states have period 1)."""
    P = np.asarray(P, float)
    n = P.shape[0]
    
    for state in range(n):
        if state_period(P, state) > 1:
            return False
    return True

def all_state_periods(P):
    """
    Compute the period for each state in a Markov chain.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    
    Returns:
    --------
    periods : array
        Array where periods[i] is the period of state i
        (period = 1 means aperiodic, period = inf means never returns)
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    periods = np.zeros(n)
    for state in range(n):
        periods[state] = state_period(P, state)
    
    return periods

In [103]:
# Sample calls for periodicity functions
P_periodic = np.array([[0, 1], [1, 0]])  # Alternating (period 2)
P_aperiodic = np.array([[0.5, 0.5], [0.5, 0.5]])  # Has self-loops (period 1)

period_0 = state_period(P_periodic, 0)
print(f"Period of state 0 in periodic chain: {period_0}")

is_aper = is_aperiodic(P_aperiodic)
print(f"Second chain is aperiodic: {is_aper}")

all_periods = all_state_periods(P_periodic)
print(f"All state periods in periodic chain: {all_periods}")

Period of state 0 in periodic chain: 2
Second chain is aperiodic: True
All state periods in periodic chain: [2. 2.]


### 4.12 Stationary Distribution

**What is this?**
The stationary (or invariant) distribution is a probability distribution over states that remains unchanged as the Markov chain evolves. Once the chain reaches this distribution, it stays in it forever. This represents the equilibrium behavior of the system.

**Mathematical Definition:** 

A probability distribution $\pi = [\pi_1, \pi_2, \ldots, \pi_n]$ is stationary if:

$$\pi^T P = \pi^T$$

or equivalently:

$$\sum_{j=1}^{n} \pi_j P_{ji} = \pi_i \quad \text{for all } i$$

where:
- $\pi$ is a row vector of probabilities
- $P$ is the transition matrix
- $\pi_i \geq 0$ for all $i$ and $\sum_{i=1}^{n} \pi_i = 1$

**Key Properties:**

1. **Eigenvector Interpretation:** $\pi$ is a left eigenvector of $P$ with eigenvalue 1
   - Equivalently, $\pi$ is a right eigenvector of $P^T$ with eigenvalue 1
   
2. **Uniqueness:** For irreducible, aperiodic chains, the stationary distribution is unique

3. **Convergence:** $\lim_{n \to \infty} P^n = \begin{bmatrix} \pi^T \\ \pi^T \\ \vdots \\ \pi^T \end{bmatrix}$ (each row converges to $\pi^T$)

**How to Find It:**

**Method 1: Eigenvalue Approach**
1. Find eigenvector $v$ of $P^T$ corresponding to eigenvalue $\lambda = 1$
2. Normalize: $\pi = \frac{v}{\sum_i v_i}$ so that $\sum_i \pi_i = 1$

**Method 2: System of Linear Equations**
1. Solve $\pi^T P = \pi^T$ which gives $(n-1)$ independent equations
2. Add constraint $\sum_{i=1}^{n} \pi_i = 1$
3. Solve the resulting system

**Method 3: Power Method**
- Compute $P^n$ for large $n$; any row gives $\pi^T$

**Interpretation:**
- $\pi_i$ = Long-run proportion of time the chain spends in state $i$
- $\pi_i$ = Limiting probability of being in state $i$ after many steps
- After running the chain long enough, state probabilities converge to $\pi$ regardless of initial state
- For irreducible, aperiodic chains, this convergence is guaranteed

**Detailed Example:** 

Weather Model with states $\{$Sunny, Rainy$\}$ and transition matrix:

$$P = \begin{bmatrix} 0.8 & 0.2 \\ 0.4 & 0.6 \end{bmatrix}$$

To find $\pi = [\pi_S, \pi_R]$, solve:
- $0.8\pi_S + 0.4\pi_R = \pi_S$ → $0.2\pi_S = 0.4\pi_R$ → $\pi_S = 2\pi_R$
- $\pi_S + \pi_R = 1$

Solution: $\pi = [0.667, 0.333]$ or $[\frac{2}{3}, \frac{1}{3}]$

**Interpretation:** In the long run, expect 66.7% sunny days and 33.3% rainy days.

In [104]:
from numpy.linalg import eig


def has_stationary_distribution(P):
    """
    Check if a Markov chain has a unique stationary distribution.
    
    A finite Markov chain has a unique stationary distribution if and only if
    it is irreducible and aperiodic (i.e., ergodic).
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    
    Returns:
    --------
    has_stationary : bool
        True if chain has a unique stationary distribution
    reason : str
        Explanation of why the chain does or doesn't have stationary distribution
    """
    P = np.asarray(P, float)
    
    # Check if it's a valid transition matrix
    if not is_transition_matrix(P):
        return False, "Not a valid transition matrix"
    
    # Check irreducibility
    if not is_irreducible(P):
        return False, "Chain is not irreducible (some states are not reachable from others)"
    
    # Check aperiodicity
    if not is_aperiodic(P):
        return False, "Chain is not aperiodic (has periodic states)"
    
    return True, "Chain is ergodic (irreducible and aperiodic) - has unique stationary distribution"

def stationary_distribution(P):
    P = np.asarray(P, dtype=float)
    w, v = eig(P.T)
    k = np.argmin(np.abs(w - 1))
    pi = np.real(v[:, k])
    pi = np.abs(pi)  # Take absolute value instead of maximum with 0
    pi = pi / pi.sum()
    return pi


In [105]:
# Sample calls for stationary distribution
P_stat = np.array([[0.7, 0.3], [0.4, 0.6]])

# Check if has stationary distribution
has_stat, reason = has_stationary_distribution(P_stat)
print(f"Has stationary distribution: {has_stat}")
print(f"Reason: {reason}")

# Compute stationary distribution
pi = stationary_distribution(P_stat)
print(f"Stationary distribution: {pi}")

# Verify it's stationary: π^T P = π^T
pi_after = pi @ P_stat
print(f"π after one step: {pi_after}")
print(f"Distribution unchanged: {np.allclose(pi, pi_after)}")

Has stationary distribution: True
Reason: Chain is ergodic (irreducible and aperiodic) - has unique stationary distribution
Stationary distribution: [0.57142857 0.42857143]
π after one step: [0.57142857 0.42857143]
Distribution unchanged: True


### Reversibility and Detailed Balance

**What is this?**
A Markov chain is **reversible** if the forward and backward processes are statistically identical. This is formalized through the **detailed balance condition**.

**Detailed Balance Condition:**
A chain with transition matrix $P$ and stationary distribution $\pi$ satisfies detailed balance if:
$$\pi_i P_{ij} = \pi_j P_{ji} \quad \text{for all states } i, j$$

**Interpretation:**
- $\pi_i P_{ij}$ = Long-run rate of transitions from $i$ to $j$
- $\pi_j P_{ji}$ = Long-run rate of transitions from $j$ to $i$
- Detailed balance means these rates are equal (equilibrium at individual transition level)

**Why it matters:**
- **Checking stationary distribution**: If detailed balance holds, $\pi$ is stationary
- **MCMC algorithms**: Many samplers (Metropolis-Hastings) rely on reversibility
- **Symmetric chains**: If $P = P^T$ (symmetric), the chain is reversible with $\pi = \text{uniform}$

**Key Properties:**
- Reversible + Irreducible → Unique stationary distribution
- Not all chains are reversible (e.g., deterministic cycles)
- Reversibility is sufficient but not necessary for stationarity

**Example:**
Random walk on undirected graph is reversible; directed cycle is not.

In [106]:
def check_detailed_balance(P, pi, tol=1e-8):
    """
    Check if a Markov chain satisfies detailed balance condition.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    pi : array-like
        Stationary distribution (or candidate stationary distribution)
    tol : float, default=1e-8
        Numerical tolerance for equality check
    
    Returns:
    --------
    is_reversible : bool
        True if detailed balance holds
    max_violation : float
        Maximum absolute difference |pi_i * P_ij - pi_j * P_ji|
    """
    P = np.asarray(P, float)
    pi = np.asarray(pi, float)
    n = P.shape[0]
    
    # Check detailed balance: pi[i] * P[i,j] = pi[j] * P[j,i] for all i,j
    max_violation = 0.0
    
    for i in range(n):
        for j in range(n):
            forward_rate = pi[i] * P[i, j]
            backward_rate = pi[j] * P[j, i]
            violation = abs(forward_rate - backward_rate)
            max_violation = max(max_violation, violation)
    
    is_reversible = (max_violation < tol)
    
    return is_reversible, float(max_violation)

def is_reversible_chain(P, pi=None, tol=1e-8):
    """
    Check if a Markov chain is reversible.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    pi : array-like, optional
        Stationary distribution. If None, computes it automatically
    tol : float, default=1e-8
        Numerical tolerance
    
    Returns:
    --------
    reversible : bool
        True if chain is reversible
    message : str
        Explanation
    """
    P = np.asarray(P, float)
    
    # Check if it's a valid transition matrix
    if not is_transition_matrix(P):
        return False, "Not a valid transition matrix"
    
    # Get stationary distribution if not provided
    if pi is None:
        # Check if chain has unique stationary distribution
        has_stat, reason = has_stationary_distribution(P)
        if not has_stat:
            return False, f"Cannot check reversibility: {reason}"
        pi = stationary_distribution(P)
    
    # Check detailed balance
    is_rev, max_viol = check_detailed_balance(P, pi, tol)
    
    if is_rev:
        return True, f"Chain is reversible (max violation: {max_viol:.2e})"
    else:
        return False, f"Chain is not reversible (max violation: {max_viol:.2e})"

def check_symmetry(P, tol=1e-8):
    """
    Check if transition matrix is symmetric (P = P^T).
    Symmetric chains are always reversible with uniform stationary distribution.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    tol : float, default=1e-8
        Numerical tolerance
    
    Returns:
    --------
    is_symmetric : bool
        True if P = P^T
    max_diff : float
        Maximum absolute difference |P_ij - P_ji|
    """
    P = np.asarray(P, float)
    diff = P - P.T
    max_diff = float(np.max(np.abs(diff)))
    is_symmetric = (max_diff < tol)
    
    return is_symmetric, max_diff

In [107]:
# Sample calls for reversibility and detailed balance
P_rev = np.array([[0.7, 0.3], [0.3, 0.7]])  # Symmetric (reversible)
pi_rev = stationary_distribution(P_rev)

# Check detailed balance
is_rev, max_viol = check_detailed_balance(P_rev, pi_rev)
print(f"Detailed balance holds: {is_rev}")
print(f"Max violation: {max_viol:.6f}")

# Check if reversible
reversible, msg = is_reversible_chain(P_rev)
print(f"Chain is reversible: {reversible}")
print(f"Message: {msg}")

# Check symmetry
is_sym, max_diff = check_symmetry(P_rev)
print(f"Matrix is symmetric: {is_sym}")

Detailed balance holds: True
Max violation: 0.000000
Chain is reversible: True
Message: Chain is reversible (max violation: 2.78e-17)
Matrix is symmetric: True


### Mean Return Time

**What is this?**
The expected number of steps to return to a state, starting from that state.

**Mathematical definition:** 
For state $i$: $m_i = E[\text{time to return to } i | X_0 = i]$

**Relationship to Stationary Distribution:**
For an irreducible, aperiodic chain:
$$\pi_i = \frac{1}{m_i}$$

**Interpretation:**
- $m_i = 10$ means on average, the chain returns to state $i$ every 10 steps
- $\pi_i = 0.1$ means state $i$ is visited 10% of the time
- States with higher stationary probability have shorter return times

**Why it matters:**
- Understanding how "frequently" states are visited
- Quality control: How often does the system return to a desired state?
- Queueing: Average time between customer arrivals

**Example:**
Weather with $\pi_{\text{sunny}} = 0.7$:
- Mean return time to sunny = $1/0.7 \approx 1.43$ days

In [108]:
def mean_return_times(P, pi=None):
    """
    Compute mean return time for each state.
    
    For an ergodic chain, mean return time m_i = 1 / pi_i.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    pi : array-like, optional
        Stationary distribution. If None, computes it automatically.
    
    Returns:
    --------
    return_times : array
        Mean return time for each state
    """
    P = np.asarray(P, float)
    
    # Get stationary distribution
    if pi is None:
        has_stat, reason = has_stationary_distribution(P)
        if not has_stat:
            raise ValueError(f"Cannot compute return times: {reason}")
        pi = stationary_distribution(P)
    
    pi = np.asarray(pi, float)
    
    # Mean return time = 1 / pi_i (for ergodic chains)
    return_times = np.zeros(len(pi))
    for i in range(len(pi)):
        if pi[i] > 1e-10:
            return_times[i] = 1.0 / pi[i]
        else:
            return_times[i] = np.inf  # State not visited in stationary distribution
    
    return return_times

In [109]:
# Sample call for mean_return_times
P_return = np.array([[0.8, 0.2], [0.3, 0.7]])
pi_return = stationary_distribution(P_return)
return_times = mean_return_times(P_return, pi_return)
print(f"Stationary distribution: {pi_return}")
print(f"Mean return times: {return_times}")
print(f"Verification: 1/π = {1.0/pi_return}")

Stationary distribution: [0.6 0.4]
Mean return times: [1.66666667 2.5       ]
Verification: 1/π = [1.66666667 2.5       ]


### Reducing and Reversing Markov Chains

**What is this?**
Techniques for simplifying Markov chains or constructing the time-reversed process.

**1. Canonical Form (Reducing to Standard Form)**

For chains with both transient and recurrent states, reorder the transition matrix into canonical form:
$$P = \begin{pmatrix} Q & R \\ 0 & S \end{pmatrix}$$

Where:
- $Q$: Transitions among transient states
- $R$: Transitions from transient to recurrent
- $S$: Transitions among recurrent states
- $0$: Recurrent states can't return to transient

**2. Lumping States (State Aggregation)**

Combine states into groups if they behave similarly:
- **Lumpable**: If for all states in a group, transition probabilities to other groups are identical
- Reduces state space complexity
- Preserves Markov property if done correctly

**3. Time-Reversed Chain**

The **reversed chain** has transition matrix $\tilde{P}$ where:
$$\tilde{P}_{ij} = \frac{\pi_j P_{ji}}{\pi_i}$$

**Properties:**
- If original chain has stationary distribution $\pi$, reversed chain has the same $\pi$
- Chain is reversible ⟺ Original and reversed chains are identical
- Used in MCMC diagnostics and theoretical analysis

**When to use:**
- **Canonical form**: Analyzing chains with absorbing states
- **Lumping**: Simplifying large state spaces
- **Reversing**: Testing reversibility, understanding detailed balance

In [110]:
def canonical_form(P, transient_states=None):
    """
    Convert transition matrix to canonical form.
    
    Reorders states so transient states come first, then recurrent states.
    Results in block matrix: [[Q, R], [0, S]]
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    transient_states : list, optional
        List of transient state indices. If None, automatically detected.
    
    Returns:
    --------
    P_canonical : array
        Reordered transition matrix in canonical form
    state_order : array
        New ordering of states
    partition : dict
        Dictionary with 'transient' and 'recurrent' state indices in new order
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    # Detect transient and recurrent states if not provided
    if transient_states is None:
        classification = classify_states_transient_recurrent(P)
        transient_states = list(classification['transient'])
    
    transient_states = list(transient_states)
    recurrent_states = [i for i in range(n) if i not in transient_states]
    
    # Create new state ordering: transient first, then recurrent
    state_order = transient_states + recurrent_states
    
    # Reorder transition matrix
    P_canonical = P[np.ix_(state_order, state_order)]
    
    n_transient = len(transient_states)
    
    return P_canonical, np.array(state_order), {
        'transient': list(range(n_transient)),
        'recurrent': list(range(n_transient, n)),
        'n_transient': n_transient,
        'n_recurrent': len(recurrent_states)
    }

def reverse_chain(P, pi=None):
    """
    Compute the time-reversed Markov chain.
    
    The reversed chain has transition matrix P_tilde where:
    P_tilde[i,j] = (pi[j] * P[j,i]) / pi[i]
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    pi : array-like, optional
        Stationary distribution. If None, computes it automatically.
    
    Returns:
    --------
    P_reversed : array
        Transition matrix of reversed chain
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    # Get stationary distribution
    if pi is None:
        has_stat, reason = has_stationary_distribution(P)
        if not has_stat:
            raise ValueError(f"Cannot compute reversed chain: {reason}")
        pi = stationary_distribution(P)
    
    pi = np.asarray(pi, float)
    
    # Compute reversed transition matrix
    P_reversed = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if pi[i] > 1e-10:
                P_reversed[i, j] = (pi[j] * P[j, i]) / pi[i]
            else:
                P_reversed[i, j] = 0.0
    
    return P_reversed

def test_lumpability(P, partition, tol=1e-8):
    """
    Test if a partition of states is lumpable.
    
    A partition is lumpable if for each group A in the partition,
    all states in A have the same transition probability to each other group B.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    partition : list of lists
        Each element is a list of states that form a group
    tol : float, default=1e-8
        Numerical tolerance
    
    Returns:
    --------
    is_lumpable : bool
        True if partition is lumpable
    lumped_P : array or None
        Transition matrix on lumped state space (if lumpable)
    """
    P = np.asarray(P, float)
    k = len(partition)  # Number of groups
    
    # Check lumpability condition
    lumped_P = np.zeros((k, k))
    
    for a, group_a in enumerate(partition):
        for b, group_b in enumerate(partition):
            # Compute transition probability from group_a to group_b
            # Should be same for all states in group_a
            probs = []
            for i in group_a:
                prob_i_to_b = sum(P[i, j] for j in group_b)
                probs.append(prob_i_to_b)
            
            # Check if all states in group_a have same transition prob to group_b
            if len(probs) > 0:
                avg_prob = np.mean(probs)
                if not all(abs(p - avg_prob) < tol for p in probs):
                    return False, None
                lumped_P[a, b] = avg_prob
    
    return True, lumped_P

In [111]:
# Sample calls for chain transformations
# Canonical form
P_canonical_test = np.array([[0.5, 0.5, 0],
                             [0, 0.6, 0.4],
                             [0, 0, 1.0]])
P_can, order, partition = canonical_form(P_canonical_test, transient_states=[0, 1])
print(f"Canonical form:\n{P_can}")
print(f"State ordering: {order}")
print(f"Partition: {partition}")

# Reverse chain
P_to_reverse = np.array([[0.7, 0.3], [0.4, 0.6]])
P_reversed = reverse_chain(P_to_reverse)
print(f"Original P:\n{P_to_reverse}")
print(f"Reversed P:\n{P_reversed}")

# Test lumpability
P_lump = np.array([[0.5, 0.5, 0, 0],
                   [0.5, 0.5, 0, 0],
                   [0, 0, 0.6, 0.4],
                   [0, 0, 0.3, 0.7]])
partition_test = [[0, 1], [2, 3]]
is_lump, lumped = test_lumpability(P_lump, partition_test)
print(f"Partition is lumpable: {is_lump}")
if is_lump:
    print(f"Lumped transition matrix:\n{lumped}")

Canonical form:
[[0.5 0.5 0. ]
 [0.  0.6 0.4]
 [0.  0.  1. ]]
State ordering: [0 1 2]
Partition: {'transient': [0, 1], 'recurrent': [2], 'n_transient': 2, 'n_recurrent': 1}
Original P:
[[0.7 0.3]
 [0.4 0.6]]
Reversed P:
[[0.7 0.3]
 [0.4 0.6]]
Partition is lumpable: True
Lumped transition matrix:
[[1. 0.]
 [0. 1.]]


### Expected Time to Hit a Target Set

**What is this?**
Calculates the expected number of steps to reach a target state (or set of states) from each starting state.

**Mathematical formulation:**
For state $i$ not in target: $h_i = 1 + \sum_j P_{ij} h_j$
For state $i$ in target: $h_i = 0$

**How it works:**
Solves a system of linear equations to find expected hitting times.

**Interpretation:**
- $h_i$ = Average number of steps to reach target starting from state $i$
- If target is never reachable from $i$, the value will be infinite

**Example use case:**
- Customer journey: Expected time until purchase
- Game: Expected moves until winning/losing
- Network: Expected hops to reach destination

In [112]:
def expected_hitting_times(P, target_states):
    P = np.asarray(P, float)
    n = P.shape[0]
    target = set(target_states)

    A = np.zeros((n, n), float)
    b = np.zeros(n, float)

    for i in range(n):
        if i in target:
            A[i, i] = 1.0
            b[i] = 0.0
        else:
            A[i, i] = 1.0
            A[i, :] -= P[i, :]
            b[i] = 1.0

    return np.linalg.solve(A, b)


In [113]:
# Sample call for expected_hitting_times (already has one after, but adding more complete example)
P_hit = np.array([[0.7, 0.2, 0.1],
                  [0.3, 0.5, 0.2],
                  [0.1, 0.3, 0.6]])
target_set = [2]
hitting_times = expected_hitting_times(P_hit, target_set)
print(f"Expected time to reach state 2 from each state: {hitting_times}")
print(f"From state 0: {hitting_times[0]:.4f} steps")
print(f"From state 1: {hitting_times[1]:.4f} steps")

Expected time to reach state 2 from each state: [7.77777778 6.66666667 0.        ]
From state 0: 7.7778 steps
From state 1: 6.6667 steps


In [114]:
expected_hitting_times(Mat_A, target_states=[2,3])

array([25., 20.,  0.,  0.])

### Convergence By Powering Matrix (K Steps)

**What is this?**
Computes the k-step transition probabilities by raising the transition matrix to the k-th power.

**Formula:** $P^{(k)} = P^k$

**Interpretation:**
- $P^k_{ij}$ = Probability of being in state $j$ after exactly $k$ steps starting from state $i$
- As $k \to \infty$, rows converge to the stationary distribution (for irreducible, aperiodic chains)

**Use cases:**
- Predict state distribution after k time steps
- Check convergence to stationary distribution
- Analyze mixing time

**Example:**
```python
P2 = n_step_transition(P, 2)  # 2-step transitions
P100 = n_step_transition(P, 100)  # Should be close to stationary
```

In [115]:
def n_step_transition(P, k):
    return np.linalg.matrix_power(np.asarray(P, float), k)

In [116]:
# Sample call for n_step_transition
P_nst = np.array([[0.8, 0.2], [0.3, 0.7]])
P_10 = n_step_transition(P_nst, k=10)
print(f"10-step transition matrix:\n{P_10}")
print(f"Compare to stationary: {stationary_distribution(P_nst)}")

10-step transition matrix:
[[0.60039063 0.39960938]
 [0.59941406 0.40058594]]
Compare to stationary: [0.6 0.4]


### Mixing Time

**What is this?**
The time it takes for a Markov chain to "forget" its initial state and get close to the stationary distribution.

**Mathematical Definition:**
The mixing time $\tau(\epsilon)$ is the smallest time $t$ such that:
$$\max_i \|P^t(i, \cdot) - \pi\|_{TV} \leq \epsilon$$

Where $\|\cdot\|_{TV}$ is the total variation distance.

**Total Variation Distance:**
$$\|p - q\|_{TV} = \frac{1}{2}\sum_j |p_j - q_j|$$

**Interpretation:**
- After $\tau(\epsilon)$ steps, distribution is within $\epsilon$ of stationary
- Common choice: $\epsilon = 0.25$ (within 25% of stationary)
- Smaller mixing time = Faster convergence

**Why it matters:**
- **MCMC**: How many samples to discard as "burn-in"
- **Randomized algorithms**: How long until output is "random enough"
- **Performance**: Fast mixing = Efficient sampling

**Factors affecting mixing time:**
- Chain connectivity (better connected = faster mixing)
- Second largest eigenvalue (closer to 1 = slower mixing)
- Bottlenecks in state space

**Example:**
Well-connected graph: $\tau \approx \log n$
Line graph: $\tau \approx n^2$ (very slow!)

In [117]:
def total_variation_distance(p, q):
    """
    Compute total variation distance between two probability distributions.
    
    TV(p, q) = 0.5 * sum_i |p_i - q_i|
    
    Parameters:
    -----------
    p, q : array-like
        Probability distributions (should sum to 1)
    
    Returns:
    --------
    tv_dist : float
        Total variation distance (between 0 and 1)
    """
    p = np.asarray(p, float)
    q = np.asarray(q, float)
    
    return 0.5 * np.sum(np.abs(p - q))

def estimate_mixing_time(P, epsilon=0.25, max_steps=10000, pi=None):
    """
    Estimate the mixing time of a Markov chain.
    
    Mixing time is the smallest t such that ||P^t(i,·) - π||_TV ≤ ε for all i.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    epsilon : float, default=0.25
        Tolerance for total variation distance
    max_steps : int, default=10000
        Maximum number of steps to check
    pi : array-like, optional
        Stationary distribution. If None, computes it automatically.
    
    Returns:
    --------
    mixing_time : int
        Estimated mixing time (number of steps)
    convergence_info : dict
        Information about convergence from each starting state
    """
    P = np.asarray(P, float)
    n = P.shape[0]
    
    # Get stationary distribution
    if pi is None:
        has_stat, reason = has_stationary_distribution(P)
        if not has_stat:
            return None, {"error": f"Cannot compute mixing time: {reason}"}
        pi = stationary_distribution(P)
    
    pi = np.asarray(pi, float)
    
    # For each starting state, find when it gets close to stationary
    convergence_times = np.zeros(n, dtype=int)
    
    for start_state in range(n):
        # Initial distribution (start from state i)
        current_dist = np.zeros(n)
        current_dist[start_state] = 1.0
        
        P_power = np.eye(n)
        
        for t in range(1, max_steps + 1):
            P_power = P_power @ P
            current_dist = P_power[start_state, :]
            
            tv_dist = total_variation_distance(current_dist, pi)
            
            if tv_dist <= epsilon:
                convergence_times[start_state] = t
                break
        else:
            # Didn't converge within max_steps
            convergence_times[start_state] = max_steps
    
    mixing_time = int(np.max(convergence_times))
    
    return mixing_time, {
        "convergence_times": convergence_times,
        "max_time": mixing_time,
        "epsilon": epsilon,
        "all_converged": (mixing_time < max_steps)
    }

def spectral_gap_mixing_bound(P, pi=None):
    """
    Estimate mixing time using spectral gap (second largest eigenvalue).
    
    The spectral gap λ = 1 - |λ_2| where λ_2 is the second largest eigenvalue.
    Mixing time is roughly O(1/λ).
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    pi : array-like, optional
        Stationary distribution
    
    Returns:
    --------
    spectral_gap : float
        Gap between largest and second largest eigenvalue
    mixing_estimate : float
        Rough estimate of mixing time based on spectral gap
    """
    P = np.asarray(P, float)
    
    # Compute eigenvalues
    eigenvalues = np.linalg.eigvals(P)
    eigenvalues_sorted = np.sort(np.abs(eigenvalues))[::-1]
    
    # Spectral gap = 1 - |second largest eigenvalue|
    if len(eigenvalues_sorted) > 1:
        second_largest = eigenvalues_sorted[1]
        spectral_gap = 1.0 - second_largest
    else:
        spectral_gap = 1.0
    
    # Mixing time estimate: O(log(n) / spectral_gap)
    n = P.shape[0]
    if spectral_gap > 1e-10:
        mixing_estimate = np.log(n) / spectral_gap
    else:
        mixing_estimate = np.inf
    
    return float(spectral_gap), float(mixing_estimate)

In [118]:
# Sample calls for mixing time
P_mix = np.array([[0.8, 0.2], [0.3, 0.7]])

# Total variation distance
pi_mix = stationary_distribution(P_mix)
p1 = np.array([1.0, 0.0])  # Start from state 0
tv_dist = total_variation_distance(p1, pi_mix)
print(f"TV distance from deterministic start to stationary: {tv_dist:.4f}")

# Estimate mixing time
mix_time, mix_info = estimate_mixing_time(P_mix, epsilon=0.1, max_steps=100)
print(f"Mixing time (ε=0.1): {mix_time} steps")
print(f"Convergence times from each state: {mix_info['convergence_times']}")

# Spectral gap estimate
gap, estimate = spectral_gap_mixing_bound(P_mix)
print(f"Spectral gap: {gap:.4f}")
print(f"Mixing time estimate (spectral): {estimate:.2f}")

TV distance from deterministic start to stationary: 0.4000
Mixing time (ε=0.1): 3 steps
Convergence times from each state: [3 3]
Spectral gap: 0.5000
Mixing time estimate (spectral): 1.39


### Theoretical Foundations of Markov Chains

This section covers the key mathematical theorems that underpin Markov chain theory.

---

#### Chapman-Kolmogorov Equation

**What is this?**
The fundamental equation describing how multi-step transition probabilities compose.

**Statement:**
$$P^{(n+m)}_{ij} = \sum_{k} P^{(n)}_{ik} P^{(m)}_{kj}$$

In matrix form: $P^{(n+m)} = P^{(n)} \cdot P^{(m)}$

**Interpretation:**
- To go from $i$ to $j$ in $n+m$ steps, you must pass through some intermediate state $k$
- Probability = sum over all possible intermediate states
- This is essentially the Markov property in action

**Consequence:**
- $P^{(n)} = P^n$ (n-fold matrix multiplication)
- Multi-step probabilities can be computed by matrix powers
- Foundational for analyzing long-term behavior

---

#### Limiting Behavior Theorem

**What is this?**
Conditions under which a Markov chain converges to a stationary distribution.

**Statement:**
For a finite, irreducible, and aperiodic Markov chain:
$$\lim_{n \to \infty} P^n_{ij} = \pi_j$$

Where $\pi$ is the unique stationary distribution.

**What this means:**
- No matter where you start (state $i$), after many steps the probability of being in state $j$ approaches $\pi_j$
- The chain "forgets" its initial state
- All rows of $P^n$ converge to the same vector $\pi$

**Requirements:**
1. **Finite**: Finite number of states
2. **Irreducible**: Can reach any state from any other state
3. **Aperiodic**: No cyclical return patterns

**Visual interpretation:**
```
P^1: Initial transitions
P^2: 2-step transitions  
...
P^100: All rows ≈ [π₀, π₁, π₂, ...]
P^∞: Exact stationary distribution
```

---

#### Ergodic Theorem

**What is this?**
Connects **time averages** (how often you visit states in one long run) with **space averages** (stationary distribution).

**Statement:**
For an irreducible, aperiodic Markov chain with stationary distribution $\pi$:
$$\lim_{n \to \infty} \frac{1}{n} \sum_{k=1}^{n} \mathbb{1}_{X_k = j} = \pi_j \quad \text{almost surely}$$

**Interpretation:**
- Left side: Fraction of time spent in state $j$ over a long run
- Right side: Stationary probability of state $j$
- They're equal! (with probability 1)

**Practical meaning:**
- Simulate one long chain → frequency of visits estimates $\pi$
- Don't need multiple independent runs
- Justifies using simulation to estimate stationary distribution

**Example:**
Weather chain with $\pi_{\text{sunny}} = 0.7$:
- Run chain for 1000 days
- Count sunny days ≈ 700
- As days → ∞, proportion → 0.7 exactly

**Why it matters:**
- **Monte Carlo methods**: Single long run is enough
- **MCMC sampling**: Frequencies give you the target distribution
- **Real-world validation**: Can estimate $\pi$ from observed data

In [158]:
def verify_chapman_kolmogorov(P, n, m, tol=1e-10):
    """
    Verify Chapman-Kolmogorov equation: P^(n+m) = P^n * P^m
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    n, m : int
        Step counts to verify
    tol : float, default=1e-10
        Numerical tolerance
    
    Returns:
    --------
    verified : bool
        True if equation holds within tolerance
    max_error : float
        Maximum absolute difference between left and right sides
    """
    P = np.asarray(P, float)
    
    # Compute P^(n+m) directly
    P_n_plus_m = np.linalg.matrix_power(P, n + m)
    
    # Compute P^n * P^m
    P_n = np.linalg.matrix_power(P, n)
    P_m = np.linalg.matrix_power(P, m)
    P_n_times_P_m = P_n @ P_m
    
    # Check if they're equal
    diff = np.abs(P_n_plus_m - P_n_times_P_m)
    max_error = float(np.max(diff))
    verified = (max_error < tol)
    
    return verified, max_error

def demonstrate_limiting_behavior(P, max_steps=100, start_state=0):
    """
    Demonstrate convergence to stationary distribution.
    
    Shows how P^n rows converge to π as n increases.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    max_steps : int, default=100
        Maximum number of steps to compute
    start_state : int, default=0
        Starting state for distribution evolution
    
    Returns:
    --------
    convergence_data : dict
        Contains:
        - 'steps': Array of step numbers
        - 'distributions': Distribution at each step
        - 'stationary': Stationary distribution
        - 'distances': Distance from stationary at each step
    """
    P = np.asarray(P, float)
    n_states = P.shape[0]
    
    # Get stationary distribution
    try:
        pi = stationary_distribution(P)
    except:
        pi = None
    
    # Track distribution starting from start_state
    steps = []
    distributions = []
    distances = []
    
    # Initial distribution (deterministic start)
    current_dist = np.zeros(n_states)
    current_dist[start_state] = 1.0
    
    P_power = np.eye(n_states)
    
    for step in range(max_steps + 1):
        steps.append(step)
        current_dist = P_power[start_state, :]
        distributions.append(current_dist.copy())
        
        if pi is not None:
            tv_dist = total_variation_distance(current_dist, pi)
            distances.append(tv_dist)
        
        if step < max_steps:
            P_power = P_power @ P
    
    return {
        'steps': np.array(steps),
        'distributions': np.array(distributions),
        'stationary': pi,
        'distances': np.array(distances) if pi is not None else None
    }

def demonstrate_ergodic_theorem(P, start_state=0, n_steps=10000, target_state=None):
    """
    Demonstrate the Ergodic Theorem by simulation.
    
    Shows that time average (frequency of visits) converges to stationary probability.
    
    Parameters:
    -----------
    P : array-like
        Transition matrix
    start_state : int, default=0
        Starting state
    n_steps : int, default=10000
        Number of simulation steps
    target_state : int, optional
        State to track. If None, tracks all states.
    
    Returns:
    --------
    result : dict
        Contains:
        - 'path': Simulated path
        - 'time_averages': Frequency of each state over time
        - 'stationary': Stationary distribution
        - 'final_frequencies': Final empirical frequencies
    """
    P = np.asarray(P, float)
    n_states = P.shape[0]
    
    # Simulate path
    path = simulate_chain(P, start_state, n_steps)
    
    # Compute cumulative frequencies
    visit_counts = np.zeros((n_steps + 1, n_states))
    
    for step in range(n_steps + 1):
        if step == 0:
            visit_counts[0, path[0]] = 1
        else:
            visit_counts[step] = visit_counts[step - 1]
            visit_counts[step, path[step]] += 1
    
    # Compute time averages (frequencies)
    time_averages = np.zeros((n_steps + 1, n_states))
    for step in range(1, n_steps + 1):
        time_averages[step] = visit_counts[step] / (step + 1)
    
    # Get stationary distribution
    try:
        pi = stationary_distribution(P)
    except:
        pi = None
    
    return {
        'path': path,
        'time_averages': time_averages,
        'stationary': pi,
        'final_frequencies': time_averages[-1],
        'steps': np.arange(n_steps + 1)
    }

In [159]:
# Sample calls for theoretical foundations
P_theory = np.array([[0.7, 0.3], [0.4, 0.6]])

# Verify Chapman-Kolmogorov equation
verified, max_err = verify_chapman_kolmogorov(P_theory, n=3, m=2)
print(f"Chapman-Kolmogorov verified: {verified}")
print(f"Maximum error: {max_err:.10f}")

# Demonstrate limiting behavior
limit_data = demonstrate_limiting_behavior(P_theory, max_steps=20, start_state=0)
print(f"Distribution at step 0: {limit_data['distributions'][0]}")
print(f"Distribution at step 20: {limit_data['distributions'][20]}")
print(f"Stationary distribution: {limit_data['stationary']}")
print(f"Distance from stationary at step 20: {limit_data['distances'][20]:.6f}")

# Demonstrate ergodic theorem
# Note: This might take a moment with 10000 steps
ergodic_data = demonstrate_ergodic_theorem(P_theory, start_state=0, n_steps=1000)
print(f"Final empirical frequencies: {ergodic_data['final_frequencies']}")
print(f"Stationary distribution: {ergodic_data['stationary']}")
print(f"Difference: {np.abs(ergodic_data['final_frequencies'] - ergodic_data['stationary'])}")

Chapman-Kolmogorov verified: True
Maximum error: 0.0000000000
Distribution at step 0: [1. 0.]
Distribution at step 20: [0.57142857 0.42857143]
Stationary distribution: [0.57142857 0.42857143]
Distance from stationary at step 20: 0.000000
Final empirical frequencies: [0.59340659 0.40659341]
Stationary distribution: [0.57142857 0.42857143]
Difference: [0.02197802 0.02197802]


# 5) Pattern Recognition & Classification

### 5.1 Confusion Matrix and Metrics

**What is this?**
A table showing the performance of a binary classifier by comparing predictions to true labels.

**Confusion Matrix:**
```
                Predicted
              0 (Neg)  1 (Pos)
Actual  0     TN       FP
        1     FN       TP
```

**Key Metrics:**
- **Precision** = $\frac{TP}{TP+FP}$ = Of all positive predictions, how many were correct?
  - High precision = Few false alarms
- **Recall (Sensitivity)** = $\frac{TP}{TP+FN}$ = Of all actual positives, how many did we catch?
  - High recall = Few missed positives
- **Accuracy** = $\frac{TP+TN}{TP+TN+FP+FN}$ = Overall correctness

![title](img/confusion_matrix.png)

**Trade-offs:**
- Precision ↑ Recall ↓ (and vice versa)
- Spam filter: High precision = Less good email marked as spam
- Disease screening: High recall = Catch more sick patients

In [121]:
def confusion_counts(y_true, y_pred):
    y_true = np.asarray(y_true, int)
    y_pred = np.asarray(y_pred, int)
    TP = int(np.sum((y_true==1) & (y_pred==1)))
    TN = int(np.sum((y_true==0) & (y_pred==0)))
    FP = int(np.sum((y_true==0) & (y_pred==1)))
    FN = int(np.sum((y_true==1) & (y_pred==0)))
    return {"TP":TP, "TN":TN, "FP":FP, "FN":FN}

def precision_recall_accuracy(counts):
    TP, TN, FP, FN = counts["TP"], counts["TN"], counts["FP"], counts["FN"]
    precision = TP/(TP+FP) if (TP+FP)>0 else 0.0
    recall = TP/(TP+FN) if (TP+FN)>0 else 0.0
    accuracy = (TP+TN)/(TP+TN+FP+FN) if (TP+TN+FP+FN)>0 else 0.0
    return precision, recall, accuracy


In [122]:
# Sample calls for confusion matrix and metrics
y_true_cf = np.array([1, 0, 1, 1, 0, 1, 0, 0])
y_pred_cf = np.array([1, 0, 1, 0, 0, 1, 1, 0])

counts = confusion_counts(y_true_cf, y_pred_cf)
print(f"Confusion matrix counts: {counts}")

prec, rec, acc = precision_recall_accuracy(counts)
print(f"Precision: {prec:.4f}, Recall: {rec:.4f}, Accuracy: {acc:.4f}")

Confusion matrix counts: {'TP': 3, 'TN': 3, 'FP': 1, 'FN': 1}
Precision: 0.7500, Recall: 0.7500, Accuracy: 0.7500


### 5.2 F1 Score

**What is this?**
The harmonic mean of precision and recall, providing a balanced measure of classifier performance.

**Formula:** $F_1 = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}$

**Alternative form:** $F_1 = \frac{2 \cdot TP}{2 \cdot TP + FP + FN}$

**Why harmonic mean?**
- Penalizes extreme values (unlike arithmetic mean)
- If either precision or recall is low, F1 is low
- Example: Precision=1.0, Recall=0.1 → F1=0.18 (not 0.55)

**When to use:**
- Imbalanced datasets
- Need balance between precision and recall
- Single metric for model comparison

**Variants:**
- **F-beta score**: $F_\beta = (1+\beta^2) \cdot \frac{\text{Precision} \cdot \text{Recall}}{\beta^2 \cdot \text{Precision} + \text{Recall}}$
- β > 1: Favor recall
- β < 1: Favor precision

In [123]:
def f1_score(y_true, y_pred):
    """
    Compute F1 score for binary classification.
    
    Parameters:
    -----------
    y_true : array-like
        True binary labels
    y_pred : array-like
        Predicted binary labels
    
    Returns:
    --------
    f1 : float
        F1 score
    """
    counts = confusion_counts(y_true, y_pred)
    precision, recall, _ = precision_recall_accuracy(counts)
    
    if precision + recall == 0:
        return 0.0
    
    f1 = 2 * precision * recall / (precision + recall)
    return float(f1)

def fbeta_score(y_true, y_pred, beta=1.0):
    """
    Compute F-beta score.
    
    Parameters:
    -----------
    y_true : array-like
        True binary labels
    y_pred : array-like
        Predicted binary labels
    beta : float, default=1.0
        Weight of recall vs precision
        beta > 1: favor recall
        beta < 1: favor precision
    
    Returns:
    --------
    fbeta : float
        F-beta score
    """
    counts = confusion_counts(y_true, y_pred)
    precision, recall, _ = precision_recall_accuracy(counts)
    
    if (beta**2 * precision + recall) == 0:
        return 0.0
    
    fbeta = (1 + beta**2) * precision * recall / (beta**2 * precision + recall)
    return float(fbeta)

In [124]:
# Sample calls for F1 score
y_true_f1 = np.array([1, 1, 0, 1, 0, 1, 0, 0, 1, 1])
y_pred_f1 = np.array([1, 0, 0, 1, 0, 1, 1, 0, 1, 0])

f1 = f1_score(y_true_f1, y_pred_f1)
f2 = fbeta_score(y_true_f1, y_pred_f1, beta=2.0)
print(f"F1 score: {f1:.4f}")
print(f"F2 score (favors recall): {f2:.4f}")

F1 score: 0.7273
F2 score (favors recall): 0.6897


### 5.3 ROC Curve and AUC

**What is this?**
ROC (Receiver Operating Characteristic) curve visualizes classifier performance across all threshold values.

**How it works:**
1. For each threshold, compute TPR and FPR:
   - **TPR (True Positive Rate / Recall)**: $\frac{TP}{TP+FN}$ (y-axis)
   - **FPR (False Positive Rate)**: $\frac{FP}{FP+TN}$ (x-axis)
2. Plot (FPR, TPR) points for all thresholds
3. Connect points to form ROC curve

**AUC (Area Under Curve):**
- AUC = 1.0: Perfect classifier
- AUC = 0.5: Random classifier (diagonal line)
- AUC < 0.5: Worse than random (flip predictions!)

**Interpretation:**
- Closer to top-left corner = Better classifier
- AUC summarizes performance in single number
- Threshold-independent metric

**When to use:** Comparing classifiers, handling imbalanced data, when costs unknown

In [125]:
def compute_roc_curve(y_true, y_scores):
    """
    Compute ROC curve points.
    
    Parameters:
    -----------
    y_true : array-like
        True binary labels (0 or 1)
    y_scores : array-like
        Predicted scores/probabilities
    
    Returns:
    --------
    fpr : array
        False positive rates
    tpr : array
        True positive rates
    thresholds : array
        Thresholds used
    """
    y_true = np.asarray(y_true, int)
    y_scores = np.asarray(y_scores, float)
    
    # Get unique thresholds (sorted descending)
    thresholds = np.unique(y_scores)[::-1]
    
    fpr_list = []
    tpr_list = []
    
    for threshold in thresholds:
        y_pred = (y_scores >= threshold).astype(int)
        counts = confusion_counts(y_true, y_pred)
        
        # TPR = TP / (TP + FN)
        tpr = counts["TP"] / (counts["TP"] + counts["FN"]) if (counts["TP"] + counts["FN"]) > 0 else 0
        # FPR = FP / (FP + TN)
        fpr = counts["FP"] / (counts["FP"] + counts["TN"]) if (counts["FP"] + counts["TN"]) > 0 else 0
        
        fpr_list.append(fpr)
        tpr_list.append(tpr)
    
    return np.array(fpr_list), np.array(tpr_list), thresholds

def compute_auc(fpr, tpr):
    """Compute Area Under ROC Curve using trapezoidal rule."""
    # Sort by fpr
    indices = np.argsort(fpr)
    fpr_sorted = fpr[indices]
    tpr_sorted = tpr[indices]
    
    # Trapezoidal integration
    auc = float(np.trapz(tpr_sorted, fpr_sorted))
    return auc

In [126]:
# Sample calls for ROC curve and AUC
y_true_roc = np.array([1, 1, 0, 1, 0, 1, 0, 0, 1, 1])
y_scores_roc = np.array([0.9, 0.8, 0.3, 0.7, 0.2, 0.85, 0.4, 0.1, 0.75, 0.95])

fpr, tpr, thresholds = compute_roc_curve(y_true_roc, y_scores_roc)
auc_score = compute_auc(fpr, tpr)
print(f"AUC score: {auc_score:.4f}")

AUC score: 1.0000


### 5.4 Thresholding Score into Labels

**What is this?**
Converts continuous scores (e.g., probabilities, confidence values) into binary class labels.

**Algorithm:** 
- If score ≥ threshold → Predict class 1 (positive)
- If score < threshold → Predict class 0 (negative)

**Threshold selection:**
- **threshold = 0.5**: Default for balanced classes
- **Lower threshold**: More positive predictions (↑ recall, ↓ precision)
- **Higher threshold**: Fewer positive predictions (↓ recall, ↑ precision)

**Use cases:**
- After logistic regression (threshold probability)
- After SVM (threshold decision function)
- Tune threshold based on business costs

In [127]:
def score_to_label(scores, threshold=0.0):
    scores = np.asarray(scores, float)
    return (scores >= threshold).astype(int)


In [128]:
# Sample call for score_to_label
scores_thresh = np.array([-1.5, 0.5, 2.1, -0.3, 1.2])
labels_thresh = score_to_label(scores_thresh, threshold=0.0)
print(f"Scores: {scores_thresh}")
print(f"Labels (threshold=0.0): {labels_thresh}")

Scores: [-1.5  0.5  2.1 -0.3  1.2]
Labels (threshold=0.0): [0 1 1 0 1]


### 5.5 Cost-Sensitive Decisions

**What is this?**
Making classification decisions when different types of errors have different costs.

**Cost Components:**
- `c_fp`: Cost of False Positive (e.g., unnecessary treatment)
- `c_fn`: Cost of False Negative (e.g., missed disease)
- `c_tp`: Cost of True Positive (usually 0)
- `c_tn`: Cost of True Negative (usually 0)

**Total Cost:** $\text{Cost} = c_{FP} \cdot FP + c_{FN} \cdot FN + c_{TP} \cdot TP + c_{TN} \cdot TN$

**Threshold Sweeping:**
Try many threshold values and choose the one that minimizes total cost.

**Example:**
- Medical test: FN (missed disease) might cost 100x more than FP (unnecessary follow-up)
- Lower threshold to catch more cases (↑ recall) even if more false alarms

In [129]:
def total_cost_from_counts(counts, c_fp=1.0, c_fn=1.0, c_tp=0.0, c_tn=0.0):
    return (counts["FP"]*c_fp + counts["FN"]*c_fn + counts["TP"]*c_tp + counts["TN"]*c_tn)

def sweep_thresholds(y_true, scores, thresholds, c_fp=1.0, c_fn=1.0):
    y_true = np.asarray(y_true, int)
    scores = np.asarray(scores, float)
    best = None
    for t in thresholds:
        y_pred = (scores >= t).astype(int)
        counts = confusion_counts(y_true, y_pred)
        cost = total_cost_from_counts(counts, c_fp=c_fp, c_fn=c_fn)
        row = {"threshold": float(t), "cost": float(cost), **counts}
        if best is None or row["cost"] < best["cost"]:
            best = row
    return best


In [130]:
# Sample call for cost-sensitive decisions
y_true_cost = np.array([1, 0, 1, 0, 1, 1, 0, 0])
scores_cost = np.array([0.8, 0.3, 0.9, 0.2, 0.7, 0.85, 0.4, 0.15])
thresholds_to_try = np.linspace(0, 1, 11)

# FN costs 10x more than FP
best_result = sweep_thresholds(y_true_cost, scores_cost, thresholds_to_try, c_fp=1.0, c_fn=10.0)
print(f"Best threshold: {best_result['threshold']:.4f}")
print(f"Minimum cost: {best_result['cost']:.4f}")

Best threshold: 0.5000
Minimum cost: 0.0000


### 5.6 Logistic Regression (from scratch)

**What is this?**
A linear model for binary classification that predicts probabilities using the sigmoid function.

**Model:** $P(y=1|x) = \sigma(w^T x + b) = \frac{1}{1 + e^{-(w^T x + b)}}$

**How it works:**
1. **Predict probability:** Apply sigmoid to linear combination of features
2. **Loss function:** Negative log-likelihood (cross-entropy)
   - $L = -\sum [y \log(p) + (1-y)\log(1-p)]$
3. **Training:** Minimize loss using optimization (e.g., gradient descent, CG)

**Output:**
- `w`: Feature weights (coefficients)
- `b`: Bias (intercept)
- Higher probability → More confident in class 1

**Advantages:**
- Outputs calibrated probabilities
- Interpretable coefficients
- Works well for linearly separable data

In [131]:
import numpy as np

def logistic_predict_proba(X, w, b):
    X = np.asarray(X, float)
    z = X @ w + b
    return 1.0 / (1.0 + np.exp(-z))

def logistic_neg_loglik(params, X, y, eps=1e-12):
    X = np.asarray(X, float)
    y = np.asarray(y, float)
    w = params[:-1]
    b = params[-1]
    p = logistic_predict_proba(X, w, b)
    p = np.clip(p, eps, 1-eps)
    return float(np.sum(-(y*np.log(p) + (1-y)*np.log(1-p))))

from scipy import optimize

def fit_logistic(X, y):
    X = np.asarray(X, float)
    y = np.asarray(y, float)
    init = np.zeros(X.shape[1] + 1)
    res = optimize.minimize(lambda p: logistic_neg_loglik(p, X, y), init, method="CG")
    w = res.x[:-1]
    b = res.x[-1]
    return w, b, res


In [132]:
# Sample call for logistic regression
X_log, y_log = make_classification(n_samples=50, n_features=3, n_informative=3, n_redundant=0, random_state=42)
w_log, b_log, res_log = fit_logistic(X_log, y_log)
print(f"Logistic regression weights: {w_log}")
print(f"Logistic regression bias: {b_log:.4f}")

# Predict probabilities
probs_log = logistic_predict_proba(X_log[:5], w_log, b_log)
print(f"Sample probabilities: {probs_log}")

Logistic regression weights: [2.22986989 2.28435107 0.41216203]
Logistic regression bias: -1.9951
Sample probabilities: [0.03942043 0.07558844 0.93060092 0.14749589 0.89923138]


### 5.7 Vapnik–Chervonenkis (VC) Dimension

**What is this?**
A measure of the complexity/capacity of a hypothesis class - how flexible or expressive it is.

**Definition:** The VC dimension is the largest number of points that can be **shattered** by the hypothesis class.

**Shattering:** A set of $n$ points is shattered if the hypothesis class can perfectly separate them for ALL possible binary labelings (all $2^n$ ways).

**VC Dimensions for Common Classifiers:**
- **Linear classifier in $d$ dimensions**: VC dimension = $d + 1$
  - In 2D: Can shatter 3 points, but not all configurations of 4 points
- **Perceptron in $\mathbb{R}^d$**: VC dimension = $d + 1$
- **Decision tree of depth $h$**: VC dimension ≈ $2^h$
- **Neural network with $W$ weights**: VC dimension ≈ $O(W \log W)$

**Example: Linear classifier in 2D**
- Can shatter 3 points (VC dimension = 3)
- Cannot shatter 4 points in general position (e.g., XOR problem)

**Intuition:**
- Higher VC dimension = More expressive = Can fit more complex patterns
- But also = Higher risk of overfitting
- VC dimension tells us how many "effective parameters" a model has

**Why it matters:**
- Foundation for statistical learning theory
- Used to derive generalization bounds
- Helps understand model complexity without training

**Trade-off:**
- Low VC dimension: Risk underfitting (too simple)
- High VC dimension: Risk overfitting (too complex)
- Need enough data relative to VC dimension for good generalization

In [133]:
def vc_dimension_linear(d):
    """
    VC dimension of linear classifier in d-dimensional space.
    
    Parameters:
    -----------
    d : int
        Number of dimensions/features
    
    Returns:
    --------
    int : VC dimension (d + 1)
    """
    return d + 1

def can_shatter_check(n_points, vc_dim):
    """
    Check if n points can potentially be shattered given VC dimension.
    
    Parameters:
    -----------
    n_points : int
        Number of points to check
    vc_dim : int
        VC dimension of hypothesis class
    
    Returns:
    --------
    bool : True if n_points <= vc_dim (can be shattered)
    """
    return n_points <= vc_dim

In [134]:
# Sample calls for VC dimension
d_features = 10
vc_dim = vc_dimension_linear(d_features)
print(f"VC dimension of linear classifier in {d_features}D: {vc_dim}")
print(f"Can shatter 5 points: {can_shatter_check(5, vc_dim)}")
print(f"Can shatter 15 points: {can_shatter_check(15, vc_dim)}")

VC dimension of linear classifier in 10D: 11
Can shatter 5 points: True
Can shatter 15 points: False


### 5.8 Generalization Bounds

**What is this?**
Theoretical guarantees on how well a model trained on finite data will perform on unseen data.

**Main Idea:** With high probability, test error is close to training error, but depends on:
- Sample size $n$
- Model complexity (VC dimension $h$)
- Confidence level $\delta$

**VC Generalization Bound:**
With probability at least $1 - \delta$:
$$R_{\text{test}} \leq R_{\text{train}} + \sqrt{\frac{h(\log(2n/h) + 1) - \log(\delta/4)}{n}}$$

where:
- $R_{\text{test}}$ = True risk (test error)
- $R_{\text{train}}$ = Empirical risk (training error)
- $h$ = VC dimension
- $n$ = Number of training samples
- $\delta$ = Failure probability

**Key Insights:**
1. **Gap decreases with more data**: Bound is $O(1/\sqrt{n})$
2. **Gap increases with model complexity**: Higher VC dimension → Worse bound
3. **Trade-off**: Complex models may have lower training error but larger gap

**Practical Implications:**
- **Sample size requirements**: Need $n \gg h$ for good generalization
- **Model selection**: Simpler models generalize better with limited data
- **Overfitting detection**: If training error ≪ test error, you're overfitting

**Rademacher Complexity Bound (Alternative):**
$$R_{\text{test}} \leq R_{\text{train}} + 2\mathfrak{R}_n(\mathcal{H}) + \sqrt{\frac{\log(1/\delta)}{2n}}$$

where $\mathfrak{R}_n(\mathcal{H})$ measures how well the hypothesis class fits random noise.

**PAC Learning (Probably Approximately Correct):**
For $\epsilon$ accuracy and $\delta$ confidence, need sample size:
$$n = O\left(\frac{h + \log(1/\delta)}{\epsilon^2}\right)$$

**When to use these bounds:**
- Theoretical analysis of learning algorithms
- Determining how much data you need
- Understanding model selection trade-offs
- Note: Bounds are often loose in practice (pessimistic)

In [135]:
def vc_generalization_bound(train_error, n, h, delta=0.05):
    """
    Compute VC generalization bound on test error.
    
    Parameters:
    -----------
    train_error : float
        Empirical risk (training error rate)
    n : int
        Number of training samples
    h : int
        VC dimension of hypothesis class
    delta : float, default=0.05
        Failure probability (e.g., 0.05 for 95% confidence)
    
    Returns:
    --------
    upper_bound : float
        Upper bound on test error with probability 1-delta
    gap : float
        Generalization gap (difference from training error)
    """
    import math
    
    # VC bound formula
    term1 = h * (math.log(2 * n / h) + 1)
    term2 = -math.log(delta / 4)
    gap = math.sqrt((term1 + term2) / n)
    
    upper_bound = train_error + gap
    return float(upper_bound), float(gap)

def pac_sample_size(h, epsilon, delta=0.05):
    """
    Compute required sample size for PAC learning.
    
    Parameters:
    -----------
    h : int
        VC dimension
    epsilon : float
        Desired accuracy (error tolerance)
    delta : float, default=0.05
        Failure probability
    
    Returns:
    --------
    int : Minimum number of samples needed
    """
    import math
    
    # PAC formula: n = O((h + log(1/delta)) / epsilon^2)
    numerator = h + math.log(1 / delta)
    n = numerator / (epsilon ** 2)
    
    return int(np.ceil(n))

In [136]:
# Sample calls for generalization bounds
train_err = 0.05
n_samples = 1000
vc_dim = 10

bound, gap = vc_generalization_bound(train_err, n_samples, vc_dim, delta=0.05)
print(f"Training error: {train_err:.4f}")
print(f"Generalization bound (test error <= ): {bound:.4f}")
print(f"Generalization gap: {gap:.4f}")

# PAC sample size
required_n = pac_sample_size(h=10, epsilon=0.1, delta=0.05)
print(f"Required samples for PAC learning: {required_n}")

Training error: 0.0500
Generalization bound (test error <= ): 0.3095
Generalization gap: 0.2595
Required samples for PAC learning: 1300


**Comparing Bounds Across Different VC Dimensions:**

**Question:** If we use a classifier with smaller VC dimension, do we get a tighter bound?

**Answer:** YES! Lower VC dimension → Smaller generalization gap → Tighter bound on accuracy

**Formula reminder:**
$$\text{Test Error} \leq \text{Train Error} + \underbrace{\sqrt{\frac{h(\log(2n/h) + 1) - \log(\delta/4)}{n}}}_{\text{Gap depends on } h}$$

**Key insight:** The gap grows with $\sqrt{h}$, so:
- Classifier with VC-dim 3 has smaller gap than VC-dim 10
- More data (larger $n$) reduces the gap (grows as $1/\sqrt{n}$)
- Simpler models → Better guarantees (but may underfit!)

**Example comparison:**
Given same training error and sample size, which classifier gives tighter bound?

In [160]:
def compare_vc_bounds(train_error, n, vc_dims, delta=0.05):
    """
    Compare generalization bounds for different VC dimensions.
    
    Parameters:
    -----------
    train_error : float
        Training error (same for all classifiers)
    n : int
        Sample size
    vc_dims : list of int
        List of VC dimensions to compare
    delta : float
        Confidence level
    
    Returns:
    --------
    dict : Results for each VC dimension
    """
    results = {}
    
    for h in vc_dims:
        bound, gap = vc_generalization_bound(train_error, n, h, delta)
        results[h] = {
            'bound': bound,
            'gap': gap,
            'interval_width': 2 * gap  # ±gap around point estimate
        }
    
    return results

def answer_vc_comparison_question(train_error, n, h_current, h_alternative, delta=0.05):
    """
    Answer: "Would using VC-dim h_alternative give smaller interval than h_current?"
    
    Parameters:
    -----------
    train_error : float
        Training error
    n : int
        Number of samples (using "all data")
    h_current : int
        Current classifier's VC dimension
    h_alternative : int
        Alternative classifier's VC dimension
    delta : float
        Confidence level
    
    Returns:
    --------
    dict : Comparison results with answer
    """
    bound_curr, gap_curr = vc_generalization_bound(train_error, n, h_current, delta)
    bound_alt, gap_alt = vc_generalization_bound(train_error, n, h_alternative, delta)
    
    # Interval width is 2*gap (±gap)
    interval_curr = 2 * gap_curr
    interval_alt = 2 * gap_alt
    
    answer = interval_alt < interval_curr
    
    return {
        'h_current': h_current,
        'h_alternative': h_alternative,
        'current_bound': bound_curr,
        'current_gap': gap_curr,
        'current_interval_width': interval_curr,
        'alternative_bound': bound_alt,
        'alternative_gap': gap_alt,
        'alternative_interval_width': interval_alt,
        'smaller_interval': answer,
        'improvement': interval_curr - interval_alt if answer else 0
    }

In [161]:
# Example: Compare classifiers with different VC dimensions
train_error = 0.05  # 5% training error
n_samples = 1000    # Using all 1000 data points
delta = 0.05        # 95% confidence

# Compare VC dimensions: 3 vs 10 vs 50
vc_dimensions = [3, 10, 50]
comparison = compare_vc_bounds(train_error, n_samples, vc_dimensions, delta)

print("=" * 70)
print(f"Comparison: Train Error = {train_error:.3f}, n = {n_samples}, δ = {delta}")
print("=" * 70)
for h, result in comparison.items():
    print(f"\nVC Dimension = {h}:")
    print(f"  Upper Bound:      {result['bound']:.4f}")
    print(f"  Generalization Gap: {result['gap']:.4f}")
    print(f"  Interval Width:   ±{result['gap']:.4f} (total: {result['interval_width']:.4f})")

# Answer specific question: Would VC-dim 3 give smaller interval than VC-dim 10?
print("\n" + "=" * 70)
print("QUESTION: Would VC-dim 3 give smaller interval than VC-dim 10?")
print("=" * 70)
answer = answer_vc_comparison_question(train_error, n_samples, h_current=10, h_alternative=3, delta=delta)

print(f"\nCurrent (VC-dim {answer['h_current']}):")
print(f"  Bound: {answer['current_bound']:.4f}")
print(f"  Interval width: {answer['current_interval_width']:.4f}")

print(f"\nAlternative (VC-dim {answer['h_alternative']}):")
print(f"  Bound: {answer['alternative_bound']:.4f}")
print(f"  Interval width: {answer['alternative_interval_width']:.4f}")

print(f"\nANSWER: {'YES' if answer['smaller_interval'] else 'NO'}")
if answer['smaller_interval']:
    print(f"  → VC-dim {answer['h_alternative']} gives SMALLER interval")
    print(f"  → Improvement: {answer['improvement']:.4f} narrower interval")
else:
    print(f"  → VC-dim {answer['h_alternative']} gives LARGER interval")

Comparison: Train Error = 0.050, n = 1000, δ = 0.05

VC Dimension = 3:
  Upper Bound:      0.2140
  Generalization Gap: 0.1640
  Interval Width:   ±0.1640 (total: 0.3280)

VC Dimension = 10:
  Upper Bound:      0.3095
  Generalization Gap: 0.2595
  Interval Width:   ±0.2595 (total: 0.5191)

VC Dimension = 50:
  Upper Bound:      0.5387
  Generalization Gap: 0.4887
  Interval Width:   ±0.4887 (total: 0.9774)

QUESTION: Would VC-dim 3 give smaller interval than VC-dim 10?

Current (VC-dim 10):
  Bound: 0.3095
  Interval width: 0.5191

Alternative (VC-dim 3):
  Bound: 0.2140
  Interval width: 0.3280

ANSWER: YES
  → VC-dim 3 gives SMALLER interval
  → Improvement: 0.1911 narrower interval


# 6) High Dimensional Phenomena

### 6.1 Distance Concentration

**What is this?**
A surprising phenomenon: In high dimensions, distances between random points become very similar!

**The Curse of Dimensionality:**
As dimension $d$ increases:
- All points appear equally far apart
- Ratio $\frac{\text{max distance}}{\text{min distance}} \to 1$
- Intuition: In high-D space, there's "so much room" that everything spreads out

**Why it matters:**
- **Nearest neighbor methods fail**: "Nearest" and "farthest" become meaningless
- **Clustering becomes hard**: All points seem equally distant
- **Similarity search breaks down**: Need dimension reduction!

**Example:** 
In 2D: Some points close, some far (ratio ~5-10)
In 1000D: All points ~same distance (ratio ~1.1)

**Takeaway:** High-dimensional data needs special treatment (PCA, manifold learning, etc.)

In [137]:
def distance_concentration_demo(n=2000, d=2, seed=0):
    rng = np.random.default_rng(seed)
    X = rng.normal(0, 1, size=(n, d))
    # distances from first point
    diffs = X - X[0]
    dist = np.linalg.norm(diffs, axis=1)[1:]
    return {
        "mean_dist": float(np.mean(dist)),
        "min_dist": float(np.min(dist)),
        "max_dist": float(np.max(dist)),
        "ratio_max_min": float(np.max(dist)/np.min(dist))
    }


In [138]:
# Sample call for distance_concentration_demo
# Compare 2D vs 100D
result_2d = distance_concentration_demo(n=1000, d=2, seed=42)
result_100d = distance_concentration_demo(n=1000, d=100, seed=42)
print(f"2D - ratio max/min: {result_2d['ratio_max_min']:.4f}")
print(f"100D - ratio max/min: {result_100d['ratio_max_min']:.4f}")

2D - ratio max/min: 104.3617
100D - ratio max/min: 1.4944


### 6.2 Geometry of Hyperspheres

**What is this?**
Counter-intuitive properties of spheres in high-dimensional spaces.

**Volume Concentration:** In high dimensions, almost all the volume of a hypersphere is concentrated near its surface!

**Key Facts:**
1. **Volume of $d$-dimensional unit sphere**: $V_d = \frac{\pi^{d/2}}{\Gamma(d/2 + 1)}$
   - Initially grows with dimension
   - Peaks around $d \approx 5$
   - Then decays exponentially to 0!

2. **Volume in a shell**: Consider shell from radius $r$ to $1$ (with $r$ close to 1, e.g., $r=0.99$)
   - Fraction of volume in shell: $1 - r^d$
   - For $r=0.99$, $d=100$: Shell contains $>63\%$ of total volume
   - As $d \to \infty$, nearly all volume is near the surface

3. **Distance from center**: Random point in unit sphere has distance from center ≈ 1 for large $d$

**Implications:**
- **Curse of dimensionality**: Hard to sample interior of high-D sphere
- **Distance metrics**: Most points are far from center
- **Data distribution**: Real data often lies near surface of hypersphere when normalized

**Formula for shell volume:** 
$$\frac{V_{\text{shell}}}{V_{\text{sphere}}} = 1 - r^d$$

**Example:** In 100D, 99% of volume is in outer 10% radius shell!

In [139]:
from scipy.special import gamma

def hypersphere_volume(d, r=1.0):
    """
    Compute volume of d-dimensional hypersphere of radius r.
    
    Parameters:
    -----------
    d : int
        Dimension
    r : float, default=1.0
        Radius
    
    Returns:
    --------
    float : Volume of the hypersphere
    """
    volume = (np.pi ** (d/2)) / gamma(d/2 + 1) * (r ** d)
    return float(volume)

def hypersphere_shell_fraction(d, r_inner):
    """
    Fraction of hypersphere volume in shell from r_inner to 1.
    
    Parameters:
    -----------
    d : int
        Dimension
    r_inner : float
        Inner radius (0 < r_inner < 1)
    
    Returns:
    --------
    float : Fraction of volume in shell
    """
    fraction = 1 - (r_inner ** d)
    return float(fraction)

def hypersphere_sample_radius_stats(n_samples, d, seed=0):
    """
    Statistics of distances from origin for random points in unit hypersphere.
    
    Returns mean, std, min, max of radii.
    """
    rng = np.random.default_rng(seed)
    
    # Sample from unit sphere using normal distribution + normalization
    X = rng.normal(0, 1, size=(n_samples, d))
    radii = np.linalg.norm(X, axis=1)
    
    # Scale to unit sphere
    X = X / radii[:, np.newaxis]
    
    # For uniform distribution in sphere, scale by r^(1/d)
    r = rng.uniform(0, 1, size=n_samples) ** (1/d)
    X = X * r[:, np.newaxis]
    
    radii = np.linalg.norm(X, axis=1)
    
    return {
        "mean": float(np.mean(radii)),
        "std": float(np.std(radii)),
        "min": float(np.min(radii)),
        "max": float(np.max(radii))
    }

In [140]:
# Sample calls for hypersphere geometry
vol_3d = hypersphere_volume(d=3, r=1.0)
vol_10d = hypersphere_volume(d=10, r=1.0)
print(f"Unit hypersphere volume in 3D: {vol_3d:.4f}")
print(f"Unit hypersphere volume in 10D: {vol_10d:.6f}")

# Shell fraction
shell_frac_100d = hypersphere_shell_fraction(d=100, r_inner=0.99)
print(f"Fraction of volume in outer 1% shell (100D): {shell_frac_100d:.4f}")

Unit hypersphere volume in 3D: 4.1888
Unit hypersphere volume in 10D: 2.550164
Fraction of volume in outer 1% shell (100D): 0.6340


### 6.3 Geometry of Hypercubes

**What is this?**
Surprising properties of cubes (unit hypercubes $[0,1]^d$) in high dimensions.

**Key Facts:**

1. **Volume stays constant**: Unit hypercube always has volume = 1
   - Unlike hypersphere which shrinks!

2. **Diagonal length grows**: $\sqrt{d}$
   - In 2D: diagonal = $\sqrt{2} \approx 1.41$
   - In 100D: diagonal = $10$
   - In 10,000D: diagonal = $100$
   - Points at opposite corners get arbitrarily far apart!

3. **Inscribed sphere shrinks relative to cube**:
   - Largest sphere fitting in unit cube has radius $1/2$
   - Volume ratio: $\frac{V_{\text{sphere}}}{V_{\text{cube}}} = \frac{\pi^{d/2}}{2^d \cdot \Gamma(d/2+1)} \to 0$ as $d \to \infty$
   - In high dimensions, cube is "mostly corners"!

4. **Most volume is in corners**:
   - Consider inscribed sphere of radius $r=1/2$ (largest that fits)
   - Volume outside sphere = $1 - \frac{\pi^{d/2}}{2^d \cdot \Gamma(d/2+1)}$
   - For $d=10$: ~99.8% of cube volume is outside inscribed sphere
   - Random point in cube is almost surely far from center!

5. **Distance from center**: Random point in $[-1,1]^d$ cube has expected squared distance from origin = $d/3$
   - Typical distance ≈ $\sqrt{d/3}$

**Implications:**
- **Sampling**: Uniform sampling gives points mostly near corners
- **Optimization**: Search space is dominated by corners
- **Data analysis**: Center of cube is "empty" - most data near boundaries

**Why it matters:**
- Explains why high-D optimization is hard
- Motivates dimension reduction
- Shows limits of grid-based methods in high dimensions

In [141]:
def hypercube_diagonal_length(d):
    """
    Compute diagonal length of unit hypercube [0,1]^d.
    
    Parameters:
    -----------
    d : int
        Dimension
    
    Returns:
    --------
    float : Diagonal length (sqrt(d))
    """
    return float(np.sqrt(d))

def hypercube_inscribed_sphere_ratio(d):
    """
    Ratio of inscribed sphere volume to hypercube volume.
    
    The largest sphere that fits in unit cube [0,1]^d has radius 1/2.
    This computes what fraction of the cube's volume it occupies.
    
    Parameters:
    -----------
    d : int
        Dimension
    
    Returns:
    --------
    float : Volume ratio (goes to 0 as d increases!)
    """
    from scipy.special import gamma
    
    # Sphere radius = 0.5 (to fit in unit cube)
    # Sphere volume = pi^(d/2) / Gamma(d/2 + 1) * r^d
    sphere_vol = (np.pi ** (d/2)) / gamma(d/2 + 1) * (0.5 ** d)
    cube_vol = 1.0
    
    return float(sphere_vol / cube_vol)

def hypercube_sample_distance_stats(n_samples, d, seed=0):
    """
    Statistics of distances from origin for random points in unit cube [-1,1]^d.
    
    Returns mean, std of squared distances.
    """
    rng = np.random.default_rng(seed)
    
    # Sample uniformly from [-1, 1]^d
    X = rng.uniform(-1, 1, size=(n_samples, d))
    
    # Distances from origin
    squared_distances = np.sum(X**2, axis=1)
    
    return {
        "mean_squared_dist": float(np.mean(squared_distances)),
        "expected_squared_dist": d / 3.0,  # Theoretical value
        "typical_dist": float(np.sqrt(np.mean(squared_distances)))
    }

In [142]:
# Sample calls for hypercube geometry
diag_10d = hypercube_diagonal_length(10)
print(f"Diagonal length of 10D unit cube: {diag_10d:.4f}")

sphere_ratio_10d = hypercube_inscribed_sphere_ratio(10)
print(f"Inscribed sphere / cube volume ratio (10D): {sphere_ratio_10d:.6f}")

Diagonal length of 10D unit cube: 3.1623
Inscribed sphere / cube volume ratio (10D): 0.002490


### 6.4 Random Projections (Johnson-Lindenstrauss Lemma)

**What is this?**
A powerful dimensionality reduction technique: projecting high-dimensional data onto random lower-dimensional subspace preserves distances approximately!

**Johnson-Lindenstrauss Lemma:**
For any set of $n$ points in $\mathbb{R}^d$ and any $0 < \epsilon < 1$, there exists a linear map $f: \mathbb{R}^d \to \mathbb{R}^k$ where:
$$k = O\left(\frac{\log n}{\epsilon^2}\right)$$

such that for all pairs of points $u, v$:
$$(1-\epsilon)\|u-v\|^2 \leq \|f(u)-f(v)\|^2 \leq (1+\epsilon)\|u-v\|^2$$

**Key Points:**
1. **Target dimension depends on $n$, not $d$**: Can reduce from millions of dimensions to hundreds!
2. **Random projection works**: Use random Gaussian matrix $R \in \mathbb{R}^{k \times d}$
3. **Distances preserved**: Pairwise distances change by at most factor $(1 \pm \epsilon)$

**Algorithm:**
1. Generate random matrix $R$ with entries $\sim N(0, 1/k)$
2. Project: $X_{\text{low}} = X \cdot R^T / \sqrt{k}$
3. Result: Dimension reduced from $d$ to $k$ with distances approximately preserved

**Minimum target dimension:** 
$$k \geq \frac{4 \log n}{\epsilon^2 - \epsilon^3}$$

**Example:**
- Original: 10,000 dimensions
- Data: 1,000 points
- Target: $\epsilon = 0.1$ (10% error)
- Required: $k \approx 920$ dimensions
- Reduction: $>10\times$ smaller!

**When to use:**
- **Approximate nearest neighbor search**: Speed up with minimal accuracy loss
- **Fast distances**: Computing distances in lower dimension
- **Preprocessing**: Before clustering, classification
- **Privacy**: Random projection can obscure original data
- **Streaming data**: Online dimension reduction

**Advantages:**
- Very fast (just matrix multiplication)
- No training required (purely random)
- Theory guarantees quality
- Works for any data distribution

**Limitations:**
- Only preserves distances (not other structure)
- Requires $k = O(\log n / \epsilon^2)$ which can still be large for huge $n$
- Approximate method (not exact)

In [143]:
def johnson_lindenstrauss_min_dim(n_samples, epsilon=0.1):
    """
    Compute minimum target dimension for Johnson-Lindenstrauss.
    
    Parameters:
    -----------
    n_samples : int
        Number of data points
    epsilon : float, default=0.1
        Maximum distortion (e.g., 0.1 for 10% error)
    
    Returns:
    --------
    int : Minimum dimension k
    """
    k = 4 * np.log(n_samples) / (epsilon**2 - epsilon**3)
    return int(np.ceil(k))

def random_projection_matrix(d_original, k_target, seed=0):
    """
    Generate random projection matrix for Johnson-Lindenstrauss.
    
    Parameters:
    -----------
    d_original : int
        Original dimension
    k_target : int
        Target dimension
    seed : int, default=0
        Random seed
    
    Returns:
    --------
    R : array, shape (k_target, d_original)
        Random projection matrix
    """
    rng = np.random.default_rng(seed)
    
    # Gaussian random matrix
    R = rng.normal(0, 1, size=(k_target, d_original))
    
    # Normalize by sqrt(k) for variance preservation
    R = R / np.sqrt(k_target)
    
    return R

def random_projection(X, k_target, seed=0):
    """
    Apply random projection to reduce dimensionality.
    
    Parameters:
    -----------
    X : array, shape (n_samples, d_original)
        Original high-dimensional data
    k_target : int
        Target dimension
    seed : int, default=0
        Random seed
    
    Returns:
    --------
    X_projected : array, shape (n_samples, k_target)
        Projected lower-dimensional data
    R : array, shape (k_target, d_original)
        Projection matrix (for applying to new data)
    """
    X = np.asarray(X, float)
    n_samples, d_original = X.shape
    
    # Generate random projection matrix
    R = random_projection_matrix(d_original, k_target, seed)
    
    # Project data: X_new = X @ R.T
    X_projected = X @ R.T
    
    return X_projected, R

def verify_distance_preservation(X, X_projected, epsilon=0.1, n_pairs=100, seed=0):
    """
    Verify that random projection preserves pairwise distances.
    
    Parameters:
    -----------
    X : array
        Original data
    X_projected : array
        Projected data
    epsilon : float
        Expected distortion tolerance
    n_pairs : int
        Number of random pairs to check
    seed : int
        Random seed
    
    Returns:
    --------
    dict : Statistics on distance preservation
    """
    rng = np.random.default_rng(seed)
    n_samples = X.shape[0]
    
    # Sample random pairs
    pairs = rng.choice(n_samples, size=(n_pairs, 2), replace=True)
    
    ratios = []
    for i, j in pairs:
        if i == j:
            continue
        
        # Original distance
        dist_orig = np.linalg.norm(X[i] - X[j])
        
        # Projected distance
        dist_proj = np.linalg.norm(X_projected[i] - X_projected[j])
        
        if dist_orig > 0:
            ratio = dist_proj / dist_orig
            ratios.append(ratio)
    
    ratios = np.array(ratios)
    
    # Check how many are within (1-epsilon, 1+epsilon)
    within_bound = np.sum((ratios >= 1-epsilon) & (ratios <= 1+epsilon))
    
    return {
        "mean_ratio": float(np.mean(ratios)),
        "std_ratio": float(np.std(ratios)),
        "min_ratio": float(np.min(ratios)),
        "max_ratio": float(np.max(ratios)),
        "fraction_within_bound": float(within_bound / len(ratios)),
        "expected_bound": (1-epsilon, 1+epsilon)
    }

In [144]:
# Sample calls for random projections (Johnson-Lindenstrauss)
n_points = 500
min_dim = johnson_lindenstrauss_min_dim(n_points, epsilon=0.1)
print(f"Minimum dimension for {n_points} points (ε=0.1): {min_dim}")

# Apply random projection
X_high = np.random.randn(100, 200)  # 100 samples, 200 dimensions
X_low, R = random_projection(X_high, k_target=50, seed=42)
print(f"Original shape: {X_high.shape}, Projected shape: {X_low.shape}")

Minimum dimension for 500 points (ε=0.1): 2763
Original shape: (100, 200), Projected shape: (100, 50)


# 7) Dimensionality Reduction & PCA

### 7.1 PCA using SVD

**What is this?**
Principal Component Analysis - finds the directions of maximum variance in your data.

**How it works:**
1. **Center the data**: Subtract mean from each feature
2. **Apply SVD**: $X_c = U \Sigma V^T$
3. **Principal components**: Columns of $V$ (or rows of $V^T$)
4. **Variance explained**: Proportional to squared singular values $\sigma_i^2$

**Key outputs:**
- **mu**: Mean of original data (for centering)
- **Vt**: Principal component directions (rows are PCs)
- **s**: Singular values (related to variance)
- **evr**: Explained variance ratio per component
- **cum_evr**: Cumulative explained variance

**Transform:** Project data onto first $k$ components: $Z = X_c V_k$
**Inverse:** Reconstruct: $\hat{X} = Z V_k^T + \mu$

**Use cases:** Dimensionality reduction, visualization, noise removal, feature extraction

In [145]:
def pca_fit(X, center=True):
    X = np.asarray(X, float)
    mu = X.mean(axis=0) if center else np.zeros(X.shape[1])
    Xc = X - mu
    U, s, Vt = np.linalg.svd(Xc, full_matrices=False)
    # explained variance ratio
    var = s**2
    evr = var / var.sum()
    return {"mu": mu, "U": U, "s": s, "Vt": Vt, "evr": evr, "cum_evr": np.cumsum(evr)}

def pca_transform(X, pca, k):
    X = np.asarray(X, float)
    Xc = X - pca["mu"]
    V = pca["Vt"][:k].T
    Z = Xc @ V
    return Z

def pca_inverse_transform(Z, pca, k):
    Z = np.asarray(Z, float)
    V = pca["Vt"][:k].T
    Xhat = Z @ V.T + pca["mu"]
    return Xhat


In [146]:
# Sample calls for PCA
X_pca = np.random.randn(100, 20)
pca_result = pca_fit(X_pca, center=True)
print(f"Explained variance ratio (first 5 components): {pca_result['evr'][:5]}")
print(f"Cumulative variance (first 5): {pca_result['cum_evr'][:5]}")

# Transform to 3 components
Z = pca_transform(X_pca, pca_result, k=3)
print(f"Transformed shape: {Z.shape}")

# Inverse transform (reconstruct)
X_reconstructed = pca_inverse_transform(Z, pca_result, k=3)
print(f"Reconstructed shape: {X_reconstructed.shape}")

Explained variance ratio (first 5 components): [0.10479217 0.08844768 0.08157502 0.07323637 0.06681447]
Cumulative variance (first 5): [0.10479217 0.19323985 0.27481487 0.34805124 0.41486571]
Transformed shape: (100, 3)
Reconstructed shape: (100, 20)


### 7.2 Choosing Number of Components (Keep X% Variance)

**What is this?**
Determines how many principal components to keep based on desired variance explained.

**Algorithm:**
Find the smallest $k$ such that cumulative variance ≥ target (e.g., 90%)

**Example:**
```
Components: [1,    2,    3,    4,    5]
Cum. Var:   [0.5, 0.7, 0.85, 0.92, 0.95]
Target = 0.90 → Choose k=4
```

**Common targets:**
- **90%**: Good balance (retains most information, removes noise)
- **95%**: More conservative (less information loss)
- **99%**: Very conservative (mostly for compression)

**Trade-off:** More components = More information but higher dimension

In [147]:
def choose_k(cum_evr, target=0.90):
    cum_evr = np.asarray(cum_evr, float)
    return int(np.searchsorted(cum_evr, target) + 1)


In [148]:
# Sample call for choose_k
cum_var = np.array([0.4, 0.65, 0.80, 0.88, 0.93, 0.96, 0.98])
k_90 = choose_k(cum_var, target=0.90)
print(f"Number of components to keep 90% variance: {k_90}")

Number of components to keep 90% variance: 5


### 7.3 Scree Plot for Explained Variance

**What is this?**
A visualization showing variance explained by each principal component, helping decide how many components to keep.

**How to read it:**
- X-axis: Component number
- Y-axis: Explained variance (or variance ratio)
- Look for "elbow" where curve flattens

**Elbow Method:**
- Sharp drop initially = Components capture real structure
- Flat tail = Components capture noise
- Choose k at the elbow (where diminishing returns start)

**Alternative: Cumulative plot:**
Shows running total of variance explained
- Find where curve reaches target (e.g., 90%, 95%)

**Example interpretation:**
```
Components 1-3: Steep drop (keep these)
Components 4+: Flat line (likely noise)
→ Choose k=3
```

In [149]:
def plot_scree(pca_result, max_components=None):
    """
    Create data for scree plot visualization.
    
    Parameters:
    -----------
    pca_result : dict
        Result from pca_fit function
    max_components : int, optional
        Maximum number of components to show
    
    Returns:
    --------
    component_nums : array
        Component numbers (1, 2, 3, ...)
    variance_ratios : array
        Explained variance ratio for each component
    cumulative_variance : array
        Cumulative explained variance
    """
    evr = pca_result["evr"]
    cum_evr = pca_result["cum_evr"]
    
    if max_components is not None:
        evr = evr[:max_components]
        cum_evr = cum_evr[:max_components]
    
    component_nums = np.arange(1, len(evr) + 1)
    
    return component_nums, evr, cum_evr

def find_elbow_point(variance_ratios, method='max_curvature'):
    """
    Automatically detect elbow point in scree plot.
    
    Parameters:
    -----------
    variance_ratios : array-like
        Explained variance ratios
    method : str, default='max_curvature'
        Method to find elbow ('max_curvature' or 'threshold')
    
    Returns:
    --------
    elbow_idx : int
        Index of elbow point (0-based)
    """
    var = np.asarray(variance_ratios, float)
    
    if method == 'max_curvature':
        # Find point with maximum curvature
        # Approximate second derivative
        if len(var) < 3:
            return 0
        
        curvature = np.abs(np.diff(var, n=2))
        elbow_idx = int(np.argmax(curvature))
        return elbow_idx
    
    elif method == 'threshold':
        # Find where variance drops below threshold
        threshold = 0.05  # 5% variance
        below_threshold = np.where(var < threshold)[0]
        if len(below_threshold) > 0:
            return int(below_threshold[0])
        return len(var) - 1
    
    return 0

In [150]:
# Sample calls for scree plot functions
X_scree = np.random.randn(100, 10)
pca_scree = pca_fit(X_scree)

# Get scree plot data
comp_nums, var_ratios, cum_var = plot_scree(pca_scree, max_components=10)
print(f"Component numbers: {comp_nums}")
print(f"Variance ratios: {var_ratios}")
print(f"Cumulative variance: {cum_var}")

# Find elbow point
elbow_idx = find_elbow_point(var_ratios, method='max_curvature')
print(f"Elbow point at component: {elbow_idx + 1}")

Component numbers: [ 1  2  3  4  5  6  7  8  9 10]
Variance ratios: [0.16576588 0.15556487 0.13372965 0.10286273 0.09680176 0.08615833
 0.07701844 0.07238031 0.06274974 0.0469683 ]
Cumulative variance: [0.16576588 0.32133075 0.4550604  0.55792313 0.65472488 0.74088321
 0.81790165 0.89028197 0.9530317  1.        ]
Elbow point at component: 3


### 7.4 Reconstruction Error / Anomaly Detection

**What is this?**
Uses PCA reconstruction error to detect outliers/anomalies in data.

**How it works:**
1. **Fit PCA** with $k$ components (keeping most variance)
2. **Project & reconstruct**: $\hat{X} = \text{PCA}^{-1}(\text{PCA}(X))$
3. **Compute error**: $\text{error}_i = \|X_i - \hat{X}_i\|$
4. **Find anomalies**: Largest errors are outliers

**Why it works:**
- Normal data follows principal patterns (low reconstruction error)
- Anomalies don't fit main patterns (high reconstruction error)
- PCA captures "normal" structure, anomalies deviate

**Use cases:**
- Fraud detection
- Manufacturing defect detection
- Network intrusion detection
- Data quality checks

**Tip:** Use fewer components (smaller k) for more sensitive anomaly detection

In [151]:
def reconstruction_error(X, Xhat):
    X = np.asarray(X, float)
    Xhat = np.asarray(Xhat, float)
    return np.linalg.norm(X - Xhat, axis=1)

def top_anomalies(errors, k=10):
    idx = np.argsort(errors)[::-1][:k]
    return idx, errors[idx]


In [152]:
# Sample calls for reconstruction error / anomaly detection
X_anom = np.random.randn(50, 10)
pca_anom = pca_fit(X_anom)
Z_anom = pca_transform(X_anom, pca_anom, k=3)
X_recon = pca_inverse_transform(Z_anom, pca_anom, k=3)

errors = reconstruction_error(X_anom, X_recon)
top_idx, top_errors = top_anomalies(errors, k=5)
print(f"Top 5 anomalies (indices): {top_idx}")
print(f"Their reconstruction errors: {top_errors}")

Top 5 anomalies (indices): [27  6 12  2 19]
Their reconstruction errors: [3.57732622 3.53665016 3.47471192 3.14327573 3.11653902]


### 7.5 PCA Standardization

**What is this?**
Scales features to have mean=0 and standard deviation=1 before PCA.

**Why standardize?**
- Features with large scales dominate PCA
- Example: Feature A in [0,1000], Feature B in [0,1]
  - Without standardization: PC1 ≈ Feature A direction
  - With standardization: Equal importance to both

**When to standardize:**
- ✓ Features have different units (meters, dollars, age)
- ✓ Features have very different scales
- ✗ Features already on same scale (e.g., all pixel values 0-255)
- ✗ Scale is meaningful for your problem

**Formula:** $X_{\text{std}} = \frac{X - \mu}{\sigma}$

**Best practice:** Fit standardization on training data, apply same transform to test data

In [153]:
def standardize_fit(X, eps=1e-12):
    X = np.asarray(X, float)
    mu = X.mean(axis=0)
    sd = X.std(axis=0, ddof=0)
    sd = np.maximum(sd, eps)
    return mu, sd

def standardize_apply(X, mu, sd):
    X = np.asarray(X, float)
    return (X - mu) / sd


In [154]:
# Sample calls for standardization
X_std = np.array([[1, 100], [2, 200], [3, 300], [4, 400]])
mu_std, sd_std = standardize_fit(X_std)
X_standardized = standardize_apply(X_std, mu_std, sd_std)
print(f"Original data:\n{X_std}")
print(f"Standardized data:\n{X_standardized}")
print(f"Standardized mean: {np.mean(X_standardized, axis=0)}")
print(f"Standardized std: {np.std(X_standardized, axis=0)}")

Original data:
[[  1 100]
 [  2 200]
 [  3 300]
 [  4 400]]
Standardized data:
[[-1.34164079 -1.34164079]
 [-0.4472136  -0.4472136 ]
 [ 0.4472136   0.4472136 ]
 [ 1.34164079  1.34164079]]
Standardized mean: [0. 0.]
Standardized std: [1. 1.]


### 7.6 Whitening Transformation

**What is this?**
Transforms data so features are uncorrelated AND have unit variance (stricter than standardization).

**Formula:** $X_{\text{whitened}} = X_c \Sigma^{-1/2} V^T$

Where:
- $X_c$ = Centered data
- $\Sigma$ = Diagonal matrix of singular values
- $V$ = PCA component directions

**Effect:**
- Removes correlation between features
- Makes all features have variance = 1
- Spheres the data distribution

**When to use:**
- Before ICA (Independent Component Analysis)
- Some neural networks benefit
- When you want truly independent features

**Difference from standardization:**
- Standardization: Per-feature scaling (doesn't remove correlation)
- Whitening: Decorrelates AND scales

In [155]:
def pca_whiten(X, pca=None, eps=1e-5):
    """
    Apply whitening transformation using PCA.
    
    Parameters:
    -----------
    X : array-like
        Data to whiten
    pca : dict, optional
        Pre-fitted PCA result. If None, fits PCA on X
    eps : float, default=1e-5
        Small constant to prevent division by zero
    
    Returns:
    --------
    X_whitened : array
        Whitened data
    pca_result : dict
        PCA parameters (for inverse transform)
    """
    X = np.asarray(X, float)
    
    # Fit PCA if not provided
    if pca is None:
        pca = pca_fit(X, center=True)
    
    # Center data
    X_centered = X - pca["mu"]
    
    # Whiten: X_white = X_c @ V @ Sigma^(-1)
    # This decorrelates and scales to unit variance
    V = pca["Vt"].T  # Principal components as columns
    s = pca["s"]
    
    # Prevent division by very small singular values
    s_inv = 1.0 / np.maximum(s, eps)
    
    X_whitened = X_centered @ V @ np.diag(s_inv)
    
    return X_whitened, pca

def pca_unwhiten(X_whitened, pca):
    """
    Inverse whitening transformation.
    
    Parameters:
    -----------
    X_whitened : array-like
        Whitened data
    pca : dict
        PCA parameters from pca_whiten
    
    Returns:
    --------
    X : array
        Original-scale data
    """
    X_whitened = np.asarray(X_whitened, float)
    
    V = pca["Vt"].T
    s = pca["s"]
    
    # Reverse transformation
    X_centered = X_whitened @ np.diag(s) @ V.T
    X = X_centered + pca["mu"]
    
    return X

In [156]:
# Sample calls for whitening
X_whiten = np.random.randn(100, 5) @ np.array([[2, 0.5], [0.5, 1], [0, 0], [0, 0], [0, 0]])
X_whitened, pca_white = pca_whiten(X_whiten)
print(f"Original shape: {X_whiten.shape}")
print(f"Whitened shape: {X_whitened.shape}")
print(f"Whitened covariance (should be ~identity):\n{np.cov(X_whitened.T)}")

# Unwhiten
X_unwhitened = pca_unwhiten(X_whitened, pca_white)
print(f"Reconstruction error: {np.max(np.abs(X_whiten - X_unwhitened)):.6f}")

Original shape: (100, 2)
Whitened shape: (100, 2)
Whitened covariance (should be ~identity):
[[1.01010101e-02 4.25892897e-19]
 [4.25892897e-19 1.01010101e-02]]
Reconstruction error: 0.000000
