In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression

PROBLEM 1: Data analysis using markov chians 

In this problem, you will empirically analyze a Markov chain 
with a finite state space. Transition probabilities are unknown.

The state space is:
    S = {0, 1, 2, 3}

You are given the data for the observed X_t for t  = 0..19

Tasks:
1. Estimate the transition matrix P from the observed transitions.
2. Verify that the estimated matrix is a probability transition matrix.
3. Compute the stationary distribution pi of the chain.
4. Simulate the chain using the estimated transition matrix
5. Compute the expected hitting times via

   (a) Simulation

   (b) Solving linear equations (analytical hitting times). 

Compare the estimates and interpret the results


In [3]:
import numpy as np

# state space
S = [0, 1, 2, 3]
N_states = len(S)

# Observed transitions: each row is (current_state, next_state)
X_t = np.array([
    [0, 1],
    [1, 2],
    [2, 3],
    [3, 0],
    [0, 1],
    [1, 1],
    [1, 2],
    [2, 2],
    [2, 3],
    [3, 3],
    [3, 0],
    [0, 2],
    [2, 1],
    [1, 3],
    [3, 1],
    [1, 0],
    [0, 0],
    [0, 1],
    [1, 2],
    [2, 0],
], dtype=int)




Below are methods that you need to complete

In [6]:
import numpy as np

# 1.1 Estimate transition matrix from observed transitions

def comp_transition_matrix(transitions, n_states):
    """
    Estimate the transition matrix P from observed transitions.

    transitions : array of shape (n_samples, 2)
        Each row contains (current_state, next_state)
    n_states : int
        Total number of states in the Markov chain

    Returns:
        P_hat : (n_states x n_states) numpy array
            Empirical estimate of transition probabilities
    """

    # Initialize matrix of transition counts
    P_hat = np.zeros((n_states, n_states), dtype=float)

    # Count how many times each transition i -> j occurs
    for i, j in transitions:
        P_hat[i, j] += 1.0

    # Compute total outgoing transitions from each state i
    row_sums = P_hat.sum(axis=1, keepdims=True)

    # Normalize rows to convert counts into probabilities
    # States with no outgoing transitions remain zero rows
    with np.errstate(divide='ignore', invalid='ignore'):
        P_hat = np.where(row_sums > 0, P_hat / row_sums, 0.0)

    return P_hat


# 1.2 Check if a matrix is a valid transition matrix

def is_transition_matrix(P, tol=1e-12):
    """
    Check whether P is a valid Markov transition matrix.

    Conditions:
    1. P is square
    2. All entries are non-negative
    3. Each row sums to 1 (within numerical tolerance)
    """

    P = np.asarray(P, dtype=float)

    # Check matrix is square
    if P.ndim != 2 or P.shape[0] != P.shape[1]:
        return False

    # Check non-negativity (allow tiny numerical errors)
    if np.any(P < -tol):
        return False

    # Each row must sum to 1
    row_sums = P.sum(axis=1)
    return np.all(np.abs(row_sums - 1.0) <= 1e-8)


# 1.3 Compute stationary distribution

def stationary_distribution(P):
    """
    Compute a stationary distribution pi such that:

        pi P = pi
        sum(pi) = 1
        pi_i >= 0

    Uses the eigenvector method with a linear-system fallback.
    """

    P = np.asarray(P, dtype=float)

    # Compute eigenvalues and eigenvectors of P^T
    # Stationary distribution is a left eigenvector with eigenvalue 1
    w, v = np.linalg.eig(P.T)

    # Find eigenvalue closest to 1
    idx = np.argmin(np.abs(w - 1.0))
    pi = np.real(v[:, idx])

    # Enforce non-negativity (eigenvectors are defined up to sign)
    pi = np.where(pi < 0, -pi, pi)

    # Normalize to make it a probability distribution
    s = pi.sum()

    if s == 0:
        # Fallback: solve (P^T - I) pi = 0 with sum(pi) = 1
        n = P.shape[0]
        A = np.vstack([P.T - np.eye(n), np.ones(n)])
        b = np.zeros(n + 1)
        b[-1] = 1

        pi, *_ = np.linalg.lstsq(A, b, rcond=None)
        pi = np.maximum(pi, 0)
        pi = pi / pi.sum()
    else:
        pi = pi / s

    return pi


# 1.4 Simulate a Markov chain trajectory

def simulate_chain(P, start_state, n_steps):
    """
    Simulate a Markov chain path with a fixed random seed.

    Returns:
        path : array of length n_steps + 1
            path[t] is the state at time t
    """

    seed = 1234  # fixed seed for reproducibility
    rng = np.random.default_rng(seed)

    P = np.asarray(P, dtype=float)
    n_states = P.shape[0]

    path = np.zeros(n_steps + 1, dtype=int)
    path[0] = start_state

    cur = start_state
    for t in range(1, n_steps + 1):
        # Sample next state using the transition probabilities
        cur = rng.choice(n_states, p=P[cur])
        path[t] = cur

    return path


# 1.5 Monte Carlo estimation of hitting times

def hitting_times_sim(P, start_state, n_sim=10_000):
    """
    Estimate expected hitting times E[T_{start -> j}] by simulation.

    Convention:
    - T_{j -> j} is the first return time (t >= 1)

    Returns:
        est[j] = estimated expected number of steps to hit state j
    """

    P = np.asarray(P, dtype=float)
    n_states = P.shape[0]

    seed = 1234
    rng = np.random.default_rng(seed)

    est = np.full(n_states, np.nan, dtype=float)

    for target in range(n_states):
        times = np.empty(n_sim, dtype=float)

        for s in range(n_sim):
            cur = start_state
            steps = 0

            # Simulate until target is reached
            while True:
                cur = rng.choice(n_states, p=P[cur])
                steps += 1
                if cur == target:
                    times[s] = steps
                    break

        est[target] = times.mean()

    return est


# 1.6 Analytical (theoretical) hitting times

def theoretical_hitting_times(P, start_state):
    """
    Compute expected hitting times analytically using linear equations.

    For each target j:
        m_i = E_i[T_j]
        m_i = 1 + sum_{k != j} P[i,k] m_k

    Returns:
        hit_theor[j] = E_{start_state}[T_j]
    """

    P = np.asarray(P, dtype=float)
    n_states = P.shape[0]

    hit_theor = np.full(n_states, np.nan, dtype=float)

    for target in range(n_states):

        # Initialize system A m = b
        A = np.eye(n_states)
        b = np.ones(n_states)

        # Build equations using first-step analysis
        for i in range(n_states):
            for k in range(n_states):
                if k == target:
                    continue
                A[i, k] -= P[i, k]

        # Solve linear system
        m = np.linalg.solve(A, b)

        # Extract hitting time from start_state
        hit_theor[target] = m[start_state]

    return hit_theor


When you are done, run the following cell (no need to implement anything else)

In [8]:
def problem1_main():
    print("\n=== Problem 1: Markov chain estimation + hitting times ===")

    # 1) Estimate P
    P_hat = comp_transition_matrix(X_t, N_states)
    print("Estimated P_hat:\n", np.round(P_hat, 3))

    # 2) Validate
    print("Is valid transition matrix?", is_transition_matrix(P_hat))

    # 3) Expected steps from given start state to all states
    start_state = 0

    # simulation
    mc = hitting_times_sim(P_hat, start_state=start_state, n_sim=5000)

    # Theory (linear system)
    th = theoretical_hitting_times(P_hat, start_state=start_state)

    # 4) Compare
    df = pd.DataFrame({
        "target_state": np.arange(N_states),
        "MC_estimate": mc,
        "theoretical": th,
        "abs_diff": np.abs(mc - th),
    })
    print("\nComparison table:\n", df)

PROBLEM 2: Cost-Sensitive Classification

You are given a binary classification problem for fraud detection.

Class labels:

    y = 1 => fraud

    y = 0 => ok



The costs of classification outcomes are:
    TP = 0, TN = 0, FP = 100, FN = 500

Tasks:
1. Train an SVM classifier.
2. Compute classification costs at a fixed threshold (0.5).
3. Evaluate total cost for multiple probability thresholds.
4. Find the threshold that minimizes total cost.

In [None]:
import numpy as np
import pandas as pd

costs = {"TP": 0, "TN": 0, "FP": 100, "FN": 500}


def generate_fraud_table(seed=0, n=3000, fraud_rate=0.05):
    """
    Generate a simple fraud dataset as a single table. The table contains:
        - numerical features: x1, x2, x3
        - binary target column: fraud (1 = fraud, 0 = legitimate)
    """
    rng = np.random.default_rng(seed)

    # Target variable
    fraud = (rng.random(n) < fraud_rate).astype(int)

    # Features
    x1 = rng.normal(0, 1, size=n)
    x2 = rng.normal(0, 1, size=n)
    x3 = rng.normal(0, 1, size=n)

    #  fraud cases are shifted
    x1[fraud == 1] += 2.0
    x2[fraud == 1] += 1.0

    df = pd.DataFrame({
        "x1": x1,
        "x2": x2,
        "x3": x3,
        "fraud": fraud,
    })

    return df


fraud_data = generate_fraud_table()

fraud_data.head()

Unnamed: 0,x1,x2,x3,fraud
0,-0.250243,-0.863902,-0.307019,0
1,-0.380736,0.018756,-0.559577,0
2,1.126431,2.055912,0.973126,1
3,0.806991,2.10416,-0.211368,1
4,0.059649,0.652374,-0.437259,0


Fill in the methods in the cell below:

In [12]:
#from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split


def train_test_split_table(df):
    """
    Split a data table into training and test sets.

    Returns:
        X_train, X_test, y_train, y_test
    """
    # implement splitting
    # first, decide what are features and what are target 
    # Features and target
    X = df[["x1", "x2", "x3"]]
    y = df["fraud"].astype(int)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=0.2,
        random_state=42,
    )

    return X_train, X_test, y_train.to_numpy(), y_test.to_numpy()


def fit_linear_svm(fraud_data):
    """
    Fit a linear SVM classifier.

    Args: data table

    Returns:
        predicted labels of length len(y_test) 
    """
    # define our model
    clf = LinearSVC(
        C=1.0,
        max_iter=10_000,
        random_state=0
    )

    # split the data into trian and test:
    X_train, X_test, y_train, y_test = train_test_split_table(fraud_data)
    #   Fit the SVM using X_train and y_train and predict the label using y_test. return y_pred
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)

    return y_pred


def confusion_counts(y_true, y_pred):
    
    """
    Computes TP, TN, FP, FN.
    """
    
    TP_est, TN_est, FP_est, FN_est = 0,0,0,0 
    
    # Here you Ccmpute TP, TN, FP, FN.
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)

    TP_est = int(np.sum((y_true == 1) & (y_pred == 1)))
    TN_est = int(np.sum((y_true == 0) & (y_pred == 0)))
    FP_est = int(np.sum((y_true == 0) & (y_pred == 1)))
    FN_est = int(np.sum((y_true == 1) & (y_pred == 0)))

    
    return {"TP": TP_est, "TN": TN_est, "FP": FP_est, "FN": FN_est}


def confusion_counts(y_true, y_pred):
    """
    Computes TP, TN, FP, FN.

    Inputs:
        y_true : array-like : True class labels (0 or 1)
        y_pred : array-like: Predicted class labels (0 or 1)

    Returns:
        Dictionary with counts: TP, TN, FP, FN 
    """

    # Initialize counters (not strictly necessary, but improves readability)
    TP_est, TN_est, FP_est, FN_est = 0, 0, 0, 0

    # Convert inputs to numpy arrays and ensure integer type
    # This allows vectorized logical comparisons
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)

    # True Positives (TP):
    # Model predicts 1 AND the true label is 1
    TP_est = int(np.sum((y_true == 1) & (y_pred == 1)))

    # True Negatives (TN):
    # Model predicts 0 AND the true label is 0
    TN_est = int(np.sum((y_true == 0) & (y_pred == 0)))

    # False Positives (FP):
    # Model predicts 1 BUT the true label is 0
    # (Type I error)
    FP_est = int(np.sum((y_true == 0) & (y_pred == 1)))

    # False Negatives (FN):
    # Model predicts 0 BUT the true label is 1
    # (Type II error)
    FN_est = int(np.sum((y_true == 1) & (y_pred == 0)))

    # Return results in a dictionary for convenient access
    return {"TP": TP_est, "TN": TN_est, "FP": FP_est, "FN": FN_est}


def total_cost(counts):
    """
    Compute total cost from confusion counts.

    """
    # Multiply counts by costs and sum
    total_cost = (counts["TP"] * costs["TP"]) 
    + (counts["TN"] * costs["TN"]) 
    + (counts["FP"] * costs["FP"]) 
    + (counts["FN"] * costs["FN"])
    
    return total_cost

# evaluate how the classification cost changes when you change the decision threshold.
def sweep_thresholds(y_true, thresholds, X, clf):
    """
    Evaluate total cost for a range of thresholds.
    
    Here, clf is your trained SVM classifier
    """

    results = []
    
    # note: here, I define y_probs to be just a decision function. Think: does it need to be calibrated to be used in this problem?
    y_probs = clf.decision_function(X)

    for t in thresholds:
        # 1) compute the prediction for a chosen theshold
        y_pred = (y_probs >= t).astype(int)

        # 2) Confusion matrix counts  (previoulsy implemented by you)
        counts = confusion_counts(y_true, y_pred)

        # 3) Total cost (previoulsly implemented by you)
        cost = total_cost(counts)

        # 4) Store results
        results.append({
            "threshold": t,     # "threshold": float(t),
            "TP": counts["TP"],
            "TN": counts["TN"],
            "FP": counts["FP"],
            "FN": counts["FN"],
            "total_cost": cost, # "total_cost": float(cost),
        })

    return pd.DataFrame(results)



When you are done, run the following cell (no need to implement anything else)

In [13]:
def main():

    df = fraud_data

    print("Dataset head:")
    print(df.head(), "\n")

    # split in train and test:
    _, X_test, _, y_test = train_test_split_table(df)
    # Fit linear SVM
    clf = fit_linear_svm(df)

    # thresholds
    thresholds = np.linspace(-2.0, 2.0, 21)
    df_results = sweep_thresholds(
        y_test,
        clf,
        X_test,
        thresholds,
    )

    print("Threshold sweep results:")
    print(df_results)

    # 6) Identify optimal threshold
    best_row = df_results.loc[df_results["total_cost"].idxmin()]
    print("Optimal threshold:", best_row)

PROBLEM 3: Confidence estimation of the cost

In Problem 2, you trained a classifier, selected a decision threshold, evaluated its performance on a test set, and computed the cost

In this problem, you will quantify the uncertainty of this estimated cost. Each observation in the test set produces a cost depending on the
classification outcome:

    TN: 0
   
    FP: 100

    TP: 0

    FN: 500

Thus, the cost per observation is a bounded random variable taking
values in the interval [0, 500].

Tasks:
1. Compute the average cost per observation on the test set.
2. Use Hoeffdingâ€™s inequality to construct a 95% confidence interval
   for the true expected cost of the classifier.
3. Interpret the resulting interval:
   - What does it say about the reliability of your estimate?
   - Is the interval likely to be tight or conservative? Why?

You may assume that test observations are independent and identically
distributed.

In [16]:
def per_observation_cost(y_true, y_pred):
    """
    Compute per-observation cost vector.
    """
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)

    c = np.zeros_like(y_true, dtype=float)

    # FP: y=0, pred=1 -> 100
    c[(y_true == 0) & (y_pred == 1)] = costs["FP"]

    # FN: y=1, pred=0 -> 500
    c[(y_true == 1) & (y_pred == 0)] = costs["FN"]

    # here, you will compute the average cost using the test set
    cost_avg = np.mean(c)


    # TP/TN -> 0 already
    return c




def hoeffding_ci(per_obs_costs, mean, n, a, b, delta=0.05):
    """
    Hoeffding confidence interval
    """
    # Step 1: deterministic costs per observation
    c = per_obs_costs

    # Step 2:   average cost
    mean_cost = np.mean(c)

    # Step 3: construct a Hoeffding intevral of the estimated cost

    n = int(c.size)

    # Hoeffding: P(|mean - E| >= eps) <= 2 exp(-2 n eps^2 / (b-a)^2)

    eps = (b - a) * np.sqrt(np.log(2.0 / delta) / (2.0 * n))

    lower = max(a, mean_cost - eps)
    upper = min(b, mean_cost + eps)

    ci = (lower,upper)
    
    
    return ci

## Discussion
Check Exam template tutoring for discussion
