In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression

PROBLEM 1: Data analysis using Markov chains

In this problem, you will empirically analyze a Markov chain 
with a finite state space. Transition probabilities are unknown.

The state space is:
    S = {0, 1, 2, 3}

You are given the observed transitions (X_t, X_{t+1}) for t = 0..19, stored as the rows of the array `X_t`.

Tasks:
1. Estimate the transition matrix P from the observed transitions.
2. Verify that the estimated matrix is a probability transition matrix.
3. Compute the stationary distribution pi of the chain.
4. Simulate the chain using the estimated transition matrix
5. Compute the expected hitting times via

   (a) Simulation

   (b) Solving linear equations (analytical hitting times). 

Compare the estimates and interpret the results


In [2]:
import numpy as np

# State space of the chain: four states labelled 0..3.
S = [0, 1, 2, 3]
N_states = len(S)

# Observed transitions: each row is (current_state, next_state).
# The 20 rows are consecutive: the next_state of one row is the current_state
# of the following row, so together they trace a single trajectory of 21 states.
X_t = np.array([
    [0, 1],
    [1, 2],
    [2, 3],
    [3, 0],
    [0, 1],
    [1, 1],
    [1, 2],
    [2, 2],
    [2, 3],
    [3, 3],
    [3, 0],
    [0, 2],
    [2, 1],
    [1, 3],
    [3, 1],
    [1, 0],
    [0, 0],
    [0, 1],
    [1, 2],
    [2, 0],
], dtype=int)




Below are methods that you need to complete

In [3]:
# 1.1
import numpy as np

def comp_transition_matrix(transitions, n_states):
    """
    Estimate the transition matrix P from observed transitions.

    The estimate is the row-normalized count matrix:
        P_hat[i, j] = (# observed transitions i -> j) / (# observed transitions out of i)

    Args:
        transitions : array-like of shape (n_samples, 2)
                      Each row is [current_state, next_state]
        n_states    : total number of states

    Returns:
        P_hat       : estimated transition matrix (n_states x n_states).
                      A row for a state with no observed outgoing transition
                      is left as all zeros (so it does not sum to 1).

    Raises:
        ValueError  : if transitions is non-empty and not of shape (n_samples, 2)
        IndexError  : if any state index lies outside [0, n_states)
    """
    pairs = np.asarray(transitions)
    counts = np.zeros((n_states, n_states))

    # Nothing observed: every row stays zero.
    if pairs.size == 0:
        return counts

    if pairs.ndim != 2 or pairs.shape[1] != 2:
        raise ValueError("transitions must have shape (n_samples, 2)")

    # Negative indices would silently wrap around to the last states and
    # corrupt the counts, so reject them together with too-large indices.
    if pairs.min() < 0 or pairs.max() >= n_states:
        raise IndexError("state indices must lie in [0, n_states)")

    # Step 1: count transitions. np.add.at accumulates correctly even when the
    # same (i, j) pair occurs many times (plain fancy-index += would not).
    np.add.at(counts, (pairs[:, 0], pairs[:, 1]), 1.0)

    # Step 2: turn counts into probabilities, row by row. Rows without any
    # outgoing transition are skipped by the `where` mask and remain zero.
    row_totals = counts.sum(axis=1, keepdims=True)
    return np.divide(
        counts,
        row_totals,
        out=np.zeros_like(counts),
        where=row_totals > 0,
    )



import numpy as np

def is_transition_matrix(P):
    """
    Check if P is a valid transition matrix.

    Conditions:
    1) P must be a square (2-D) matrix
    2) All entries must be >= 0
    3) Each row must sum to 1 (within numerical tolerance)

    Args:
        P : array-like; anything that is not a square 2-D array is
            reported as invalid rather than raising.

    Returns:
        True if all three conditions hold, otherwise False.
    """
    M = np.asarray(P)

    # Condition 1: square 2-D matrix (also rejects scalars, vectors, tensors)
    if M.ndim != 2 or M.shape[0] != M.shape[1]:
        return False

    # Condition 2: no negative probabilities
    if np.any(M < 0):
        return False

    # Condition 3: every row sums to 1, allowing small numerical errors.
    # NaN/inf entries make the row sum NaN/inf and therefore fail here.
    return bool(np.all(np.isclose(M.sum(axis=1), 1.0)))



import numpy as np

def stationary_distribution(P):
    """
    Stationary distribution of P obtained from an eigen-decomposition.

    A stationary distribution satisfies pi^T P = pi^T, i.e. pi is an
    eigenvector of P^T belonging to the eigenvalue 1.

    Returns:
        1D array: the real part of that eigenvector, rescaled to sum to 1.
    """
    # Eigen-decomposition of the transposed matrix (columns are eigenvectors)
    spectrum, vectors = np.linalg.eig(P.T)

    # Pick the column whose eigenvalue lies nearest to 1
    nearest_one = np.abs(spectrum - 1).argmin()
    candidate = vectors[:, nearest_one].real

    # Rescale so the entries add up to 1 (this also fixes the arbitrary sign)
    return candidate / candidate.sum()

def stationary_distribution2(P):
    """
    Stationary distribution of P obtained from a linear solve.

    The balance equations (P^T - I) pi = 0 are combined with the
    normalization sum(pi) = 1: the last balance equation is dropped and
    the normalization row takes its place, giving a square system.
    """
    size = P.shape[0]

    # Balance equations, with the final one swapped for a row of ones
    balance = P.T - np.eye(size)
    system = np.vstack([balance[:-1], np.ones((1, size))])

    # Right-hand side: zeros for the balance rows, 1 for the normalization row
    rhs = np.zeros(size)
    rhs[-1] = 1

    return np.linalg.solve(system, rhs)





import numpy as np

def simulate_chain(P, start_state, n_steps):
    """
    Generate one trajectory of the Markov chain, using a fixed random seed.

    Args:
        P           : transition matrix (n_states x n_states)
        start_state : state occupied at time 0
        n_steps     : how many transitions to simulate

    Returns:
        path        : integer array of length n_steps + 1 holding the
                      visited states (entry 0 is start_state)
    """
    # Fixed random seed (DO NOT change): every call replays the same draws
    rng = np.random.default_rng(1234)

    n_states = len(P)
    trajectory = np.empty(n_steps + 1, dtype=int)
    trajectory[0] = start_state

    # Each new state is drawn from the row of P belonging to the previous state
    for step in range(1, n_steps + 1):
        previous = trajectory[step - 1]
        trajectory[step] = rng.choice(n_states, p=P[previous])

    return trajectory




import numpy as np

def _hit_is_certain(P, start_state, target):
    """Return True if the chain started in start_state reaches target with probability 1."""
    possible = np.asarray(P) > 0
    start_state = int(start_state)

    # States the chain can visit from start_state before (or when) hitting target.
    visited = {start_state}
    frontier = [start_state]
    while frontier:
        i = frontier.pop()
        if i == target:
            continue  # the walk stops at the target, do not expand past it
        for k in np.flatnonzero(possible[i]):
            k = int(k)
            if k not in visited:
                visited.add(k)
                frontier.append(k)

    # States from which the target can be reached at all (backward search).
    can_reach = {target}
    frontier = [target]
    while frontier:
        k = frontier.pop()
        for i in np.flatnonzero(possible[:, k]):
            i = int(i)
            if i not in can_reach:
                can_reach.add(i)
                frontier.append(i)

    # For a finite chain the hit is almost sure exactly when no visitable
    # state is a dead end with respect to the target.
    return visited <= can_reach


def hitting_times_sim(P, start_state, n_sim=10_000):
    """
    Estimate expected hitting times E[T_{start -> j}] for ALL states j
    using Monte Carlo simulation.

    Args:
        P           : transition matrix (n_states x n_states)
        start_state : initial state
        n_sim       : number of simulated trajectories per target state

    Returns:
        est         : 1D array where est[j] is the estimated expected
                      number of steps to hit state j from start_state.
                      est[start_state] is 0. If state j is not hit with
                      probability 1 (so the expected hitting time is
                      infinite), est[j] is np.inf instead of simulating
                      a trajectory that would never terminate.
    """
    P = np.asarray(P, dtype=float)
    n_states = P.shape[0]
    est = np.zeros(n_states, dtype=float)

    # Fixed random seed (same simulations every run)
    rng = np.random.default_rng(1234)

    for j in range(n_states):

        # Guard against an endless simulation loop: if the target can be
        # missed forever, report the (infinite) expectation directly.
        if not _hit_is_certain(P, start_state, j):
            est[j] = np.inf
            continue

        hitting_times = []
        for _ in range(n_sim):
            state = start_state
            steps = 0

            # Walk until the target is hit; when start_state == j the loop
            # body never runs and the hitting time is 0.
            while state != j:
                state = rng.choice(n_states, p=P[state])
                steps += 1

            hitting_times.append(steps)

        # Average hitting time for state j
        est[j] = np.mean(hitting_times)

    return est




import numpy as np

def theoretical_hitting_times(P, start_state):
    """
    Analytical expected hitting times from start_state to every state j,
    obtained by solving one linear system per target.

    For a fixed target j, the expected hitting times h(i) satisfy
        h(j) = 0
        h(i) = 1 + sum_k P[i,k] h(k)      for i != j
    Since h(j) = 0, the terms involving j vanish and the remaining
    unknowns solve (I - Q) h = 1, where Q is P with row and column j removed.

    Returns:
        hit_theor: 1D array where hit_theor[j] = E[T_{start_state -> j}]
    """
    n = P.shape[0]
    expected = np.full(n, np.nan, dtype=float)

    for target in range(n):

        # Already at the target: zero steps needed
        if target == start_state:
            expected[target] = 0.0
            continue

        # All states except the target, in increasing order
        others = [s for s in range(n) if s != target]

        # Q = P restricted to the non-target states
        restricted = P[np.ix_(others, others)]

        # Solve (I - Q) h = 1 for the hitting times of the non-target states
        h = np.linalg.solve(np.eye(n - 1) - restricted, np.ones(n - 1))

        # Keep only the entry belonging to start_state
        expected[target] = h[others.index(start_state)]

    return expected


When you are done, run the following cell (no need to implement anything else)

In [4]:
def problem1_main():
    """
    Run Problem 1 end to end and print the results.

    Estimates the transition matrix from X_t, validates it, prints the
    stationary distribution, and compares simulated with analytical
    expected hitting times from state 0.
    """
    print("\n=== Problem 1: Markov chain estimation + hitting times ===")

    # Estimate the transition matrix from the observed transitions
    P_hat = comp_transition_matrix(X_t, N_states)
    print("Estimated P_hat:\n", np.round(P_hat, 3))

    # Sanity checks on the estimate
    print("Is valid transition matrix?", is_transition_matrix(P_hat))
    print("This is stationary distribution: ", stationary_distribution(P_hat))

    # Expected number of steps from the start state to every state,
    # once by Monte Carlo simulation and once from the linear systems
    start_state = 0
    simulated = hitting_times_sim(P_hat, start_state=start_state, n_sim=5000)
    analytical = theoretical_hitting_times(P_hat, start_state=start_state)

    # Side-by-side comparison of the two estimates
    comparison = {
        "target_state": np.arange(N_states),
        "MC_estimate": simulated,
        "theoretical": analytical,
        "abs_diff": np.abs(simulated - analytical),
    }
    df = pd.DataFrame(comparison)
    print("\nComparison table:\n", df)
    
problem1_main()




=== Problem 1: Markov chain estimation + hitting times ===
Estimated P_hat:
 [[0.2   0.6   0.2   0.   ]
 [0.167 0.167 0.5   0.167]
 [0.2   0.2   0.2   0.4  ]
 [0.5   0.25  0.    0.25 ]]
Is valid transition matrix? True
This is stationary distribution:  [0.25 0.3  0.25 0.2 ]

Comparison table:
    target_state  MC_estimate  theoretical  abs_diff
0             0       0.0000     0.000000  0.000000
1             1       2.0154     2.024390  0.008990
2             2       3.2912     3.317073  0.025873
3             3       5.6374     5.682927  0.045527


PROBLEM 2: Cost-Sensitive Classification

You are given a binary classification problem for fraud detection.

Class labels:

    y = 1 => fraud

    y = 0 => ok



The costs of classification outcomes are:
    TP = 0, TN = 0, FP = 100, FN = 500

Tasks:
1. Train an SVM classifier.
2. Compute classification costs at a fixed threshold (0.5).
3. Evaluate total cost for multiple probability thresholds.
4. Find the threshold that minimizes total cost.

In [5]:
import numpy as np
import pandas as pd

# Cost charged per classification outcome. Correct decisions (TP, TN) are free;
# a missed fraud (FN) costs five times as much as a false alarm (FP).
costs = {"TP": 0, "TN": 0, "FP": 100, "FN": 500}


def generate_fraud_table(seed=0, n=3000, fraud_rate=0.05):
    """
    Build a synthetic fraud dataset and return it as one table with
        - numerical feature columns: x1, x2, x3
        - a binary target column: fraud (1 = fraud, 0 = legitimate)

    Each row is labelled fraud independently with probability fraud_rate.
    All features start as standard normal draws; for fraud rows x1 is
    shifted by +2.0 and x2 by +1.0, while x3 carries no signal.
    """
    generator = np.random.default_rng(seed)

    # Labels are drawn first, then the three feature columns in order x1, x2, x3
    labels = (generator.random(n) < fraud_rate).astype(int)
    x1, x2, x3 = (generator.normal(0, 1, size=n) for _ in range(3))

    # Move the fraud rows away from the legitimate ones
    is_fraud = labels == 1
    x1[is_fraud] += 2.0
    x2[is_fraud] += 1.0

    return pd.DataFrame({"x1": x1, "x2": x2, "x3": x3, "fraud": labels})


# Build the dataset once, with the default seed and size.
fraud_data = generate_fraud_table()

# Show the first rows as the cell output.
fraud_data.head()

Unnamed: 0,x1,x2,x3,fraud
0,-0.250243,-0.863902,-0.307019,0
1,-0.380736,0.018756,-0.559577,0
2,1.126431,2.055912,0.973126,1
3,0.806991,2.10416,-0.211368,1
4,0.059649,0.652374,-0.437259,0


Fill in the methods in the cell below:

In [6]:
#from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC


def train_test_split_table(df, train_frac=0.8):
    """
    Split a data table into training and test sets.

    The split is positional and not shuffled: the first
    int(len(df) * train_frac) rows form the training set and the
    remaining rows form the test set.

    Args:
        df         : table with feature columns x1, x2, x3 and target column fraud
        train_frac : fraction of rows (between 0 and 1) used for training

    Returns:
        X_train, X_test, y_train, y_test

    Raises:
        ValueError : if train_frac is outside [0, 1]
    """
    if not 0.0 <= train_frac <= 1.0:
        raise ValueError("train_frac must be between 0 and 1")

    # Features and target
    X = df[["x1", "x2", "x3"]]
    y = df["fraud"]

    n_train = int(len(y) * train_frac)

    # .iloc makes the positional (row-number) slicing explicit, independent
    # of what the index labels of df happen to be
    X_train, X_test = X.iloc[:n_train], X.iloc[n_train:]
    y_train, y_test = y.iloc[:n_train], y.iloc[n_train:]

    return X_train, X_test, y_train, y_test

def fit_linear_svm(fraud_data):
    """
    Fit a linear SVM classifier on the training part of the data table.

    Args:
        fraud_data : data table accepted by train_test_split_table

    Returns:
        clf : the fitted LinearSVC. Callers obtain labels with
              clf.predict(X) or raw scores with clf.decision_function(X).
    """
    # Define the model
    clf = LinearSVC(
        C=1.0,
        max_iter=10_000,
        random_state=0
    )

    # Only the training part is needed here; the test part is left untouched
    # for the caller to evaluate on.
    X_train, _, y_train, _ = train_test_split_table(fraud_data)
    clf.fit(X_train, y_train)

    return clf


def confusion_counts(y_true, y_pred):
    """
    Computes TP, TN, FP, FN for binary labels (1 = positive, 0 = negative).

    Args:
        y_true : array-like of true labels
        y_pred : array-like of predicted labels, same shape as y_true

    Returns:
        dict with integer counts under the keys "TP", "TN", "FP", "FN"

    Raises:
        ValueError : if y_true and y_pred have different shapes
    """
    # Convert to plain arrays so that lists work and so that pandas objects
    # are compared by position rather than aligned by index label
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    if y_true.shape != y_pred.shape:
        raise ValueError("y_true and y_pred must have the same shape")

    actual_pos, actual_neg = y_true == 1, y_true == 0
    pred_pos, pred_neg = y_pred == 1, y_pred == 0

    return {
        "TP": int(np.count_nonzero(actual_pos & pred_pos)),
        "TN": int(np.count_nonzero(actual_neg & pred_neg)),
        "FP": int(np.count_nonzero(actual_neg & pred_pos)),
        "FN": int(np.count_nonzero(actual_pos & pred_neg)),
    }


def total_cost(counts, cost_table=None):
    """
    Compute total cost from confusion counts.

    Args:
        counts     : dict with the keys "TP", "TN", "FP", "FN" (number of
                     observations per outcome)
        cost_table : dict with the same keys giving the cost of one
                     observation of each outcome. Defaults to the
                     module-level `costs`; passing it explicitly keeps the
                     result independent of whatever the global name `costs`
                     is bound to when the function is called.

    Returns:
        Sum over the four outcomes of cost * count.
    """
    if cost_table is None:
        cost_table = costs

    # Multiply counts by costs and sum
    return sum(cost_table[key] * counts[key] for key in ("TP", "TN", "FP", "FN"))




# evaluate how the classification cost changes when you change the decision threshold.
def sweep_thresholds(y_true, thresholds, X, clf):
    """
    Evaluate the total classification cost for each threshold in a range.

    Args:
        y_true     : true labels for the rows of X
        thresholds : iterable of cut-off values applied to the scores
        X          : feature table passed to the classifier
        clf        : trained SVM classifier

    Returns:
        DataFrame with one row per threshold and the columns
        threshold, TP, TN, FP, FN, total_cost.
    """
    # The raw decision-function values are used as scores; they are not
    # calibrated probabilities. Think: does that matter for this problem?
    scores = clf.decision_function(X)

    rows = []
    for cutoff in thresholds:
        # Predict fraud (1) whenever the score reaches the cut-off
        predicted = (scores >= cutoff).astype(int)

        # Confusion counts and the resulting total cost at this cut-off
        tally = confusion_counts(y_true, predicted)
        cost = total_cost(tally)

        row = {"threshold": cutoff}
        row.update((key, tally[key]) for key in ("TP", "TN", "FP", "FN"))
        row["total_cost"] = cost
        rows.append(row)

    return pd.DataFrame(rows)



When you are done, run the following cell (no need to implement anything else)

In [7]:
def main():
    """
    Run Problem 2 end to end: fit the linear SVM, sweep decision thresholds
    on the test set, and print the row with the lowest total cost.
    """
    table = fraud_data

    print("Dataset head:")
    print(table.head(), "\n")

    # Only the test part of the split is needed for the evaluation
    split = train_test_split_table(table)
    X_test, y_test = split[1], split[3]

    # The classifier is trained on the training part inside fit_linear_svm
    clf = fit_linear_svm(table)

    # 21 evenly spaced cut-offs on the decision-function scale
    grid = np.linspace(-2.0, 2.0, 21)
    df_results = sweep_thresholds(y_test, grid, X_test, clf)

    print("Threshold sweep results:")
    print(df_results)

    # Row of the sweep with the smallest total cost
    cheapest = df_results["total_cost"].idxmin()
    best_row = df_results.loc[cheapest]
    print("Optimal threshold:", best_row)
    
main()

Dataset head:
         x1        x2        x3  fraud
0 -0.250243 -0.863902 -0.307019      0
1 -0.380736  0.018756 -0.559577      0
2  1.126431  2.055912  0.973126      1
3  0.806991  2.104160 -0.211368      1
4  0.059649  0.652374 -0.437259      0 

Threshold sweep results:
    threshold  TP   TN   FP  FN  total_cost
0        -2.0  29  185  386   0       38600
1        -1.8  29  241  330   0       33000
2        -1.6  29  306  265   0       26500
3        -1.4  29  387  184   0       18400
4        -1.2  29  438  133   0       13300
5        -1.0  29  482   89   0        8900
6        -0.8  27  518   53   2        6300
7        -0.6  26  537   34   3        4900
8        -0.4  24  553   18   5        4300
9        -0.2  19  564    7  10        5700
10        0.0  14  566    5  15        8000
11        0.2  12  567    4  17        8900
12        0.4   7  568    3  22       11300
13        0.6   3  569    2  26       13200
14        0.8   2  571    0  27       13500
15        1.0   1  57

PROBLEM 3: Confidence estimation of the cost

In Problem 2, you trained a classifier, selected a decision threshold, evaluated its performance on a test set, and computed the cost.

In this problem, you will quantify the uncertainty of this estimated cost. Each observation in the test set produces a cost depending on the
classification outcome:

    TN: 0
   
    FP: 100

    TP: 0

    FN: 500

Thus, the cost per observation is a bounded random variable taking
values in the interval [0, 500].

Tasks:
1. Compute the average cost per observation on the test set.
2. Use Hoeffding's inequality to construct a 95% confidence interval
   for the true expected cost of the classifier.
3. Interpret the resulting interval:
   - What does it say about the reliability of your estimate?
   - Is the interval likely to be tight or conservative? Why?

You may assume that test observations are independent and identically
distributed.

In [8]:
import numpy as np

def per_observation_cost(y_true, y_pred):
    """
    Cost incurred by each individual prediction:
      TN: 0, FP: 100, TP: 0, FN: 500
    Returns: float array of shape (n,)
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # The two kinds of error; every other outcome is free
    missed_fraud = np.logical_and(y_true == 1, y_pred == 0)   # FN
    false_alarm = np.logical_and(y_true == 0, y_pred == 1)    # FP

    # Start from zero cost everywhere, then charge the errors
    per_obs = np.zeros_like(y_true, dtype=float)
    per_obs[missed_fraud] = 500.0
    per_obs[false_alarm] = 100.0

    return per_obs


def hoeffding_ci(per_obs_costs, a, b, delta=0.05):
    """
    Two-sided Hoeffding confidence interval for E[C], where each C_i in [a, b].

    With n i.i.d. observations, Hoeffding's inequality gives
        P(|mean - E[C]| >= eps) <= 2 exp(-2 n eps^2 / (b - a)^2),
    so eps = (b - a) * sqrt(log(2 / delta) / (2 n)) yields coverage of at
    least 1 - delta. The interval is clipped to [a, b], since E[C] cannot
    lie outside the range of the observations.

    Args:
        per_obs_costs : array-like of observed costs
        a, b          : lower and upper bound on a single observation
        delta         : allowed failure probability (0.05 -> 95% interval)

    Returns:
        ((lower, upper), mean_cost) as plain Python floats

    Raises:
        ValueError : if there are no observations, if b < a, or if delta
                     is not strictly between 0 and 1
    """
    c = np.asarray(per_obs_costs, dtype=float)
    n = c.size
    if n == 0:
        raise ValueError("per_obs_costs must contain at least one observation")
    if b < a:
        raise ValueError("upper bound b must not be smaller than lower bound a")
    if not 0.0 < delta < 1.0:
        raise ValueError("delta must lie strictly between 0 and 1")

    mean_cost = float(np.mean(c))

    # Half-width of the interval from Hoeffding's inequality
    epsilon = float((b - a) * np.sqrt(np.log(2.0 / delta) / (2.0 * n)))

    # Clip to the range in which the expectation must lie
    lower = max(float(a), mean_cost - epsilon)
    upper = min(float(b), mean_cost + epsilon)

    return (lower, upper), mean_cost


# Usage: Hoeffding interval for the cost of the linear SVM on the test set.
# Bounds on the cost of a single observation (cheapest outcome 0, FN = 500).
a, b = 0.0, 500.0

X_train, X_test, y_train, y_test = train_test_split_table(fraud_data)

# Same model and training data as in Problem 2; fit_linear_svm builds the
# LinearSVC(C=1.0, max_iter=10_000, random_state=0) and fits it on the
# training part of fraud_data.
clf = fit_linear_svm(fraud_data)
y_pred = clf.predict(X_test)

# Deliberately NOT named `costs`: that global holds the cost table read by
# total_cost, and rebinding it to an array would break re-running Problem 2.
obs_costs = per_observation_cost(y_test, y_pred)
ci, avg_cost = hoeffding_ci(obs_costs, a, b, delta=0.05)

print("Average cost per observation:", avg_cost)
print("95% Hoeffding CI:", ci)


Average cost per observation: 13.333333333333334
95% Hoeffding CI: (0.0, np.float64(41.05546443720779))
