In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression

PROBLEM 1: Data analysis using markov chians 

In this problem, you will empirically analyze a Markov chain 
with a finite state space. Transition probabilities are unknown.

The state space is:
    S = {0, 1, 2, 3}

You are given the data for the observed X_t for t  = 0..19

Tasks:
1. Estimate the transition matrix P from the observed transitions.
2. Verify that the estimated matrix is a probability transition matrix.
3. Compute the stationary distribution pi of the chain.
4. Simulate the chain using the estimated transition matrix
5. Compute the expected hitting times via

   (a) Simulation

   (b) Solving linear equations (analytical hitting times). 

Compare the estimates and interpret the results


In [2]:
import numpy as np

# state space
S = [0, 1, 2, 3]
N_states = len(S)

# Observed transitions: each row is (current_state, next_state)
X_t = np.array([
    [0, 1],
    [1, 2],
    [2, 3],
    [3, 0],
    [0, 1],
    [1, 1],
    [1, 2],
    [2, 2],
    [2, 3],
    [3, 3],
    [3, 0],
    [0, 2],
    [2, 1],
    [1, 3],
    [3, 1],
    [1, 0],
    [0, 0],
    [0, 1],
    [1, 2],
    [2, 0],
], dtype=int)




Below are methods that you need to complete

In [3]:
# 1.1
def comp_transition_matrix(transitions, n_states):
    """
    Estimate the transition matrix P from observed transitions.

    Args:
        transitions: array of shape (n_samples, 2)
        n_states: number of states

    Returns:
        P_hat: estimated transition matrix
    """
    P_hat = np.zeros((n_states, n_states))
    
    # implement P_hat
    for n in range(n_states):
        counter0 = 0
        counter1 = 0
        counter2 = 0
        counter3 = 0
        for j in transitions:
            if(j[0]==n):
                if(j[1]==0):
                    counter0+=1
                if(j[1]==1):
                    counter1+=1
                if(j[1]==2):
                    counter2+=1
                if(j[1]==3):
                    counter3+=1
        tot = counter0+counter1+counter2+counter3
        P_hat[n][0]=counter0/tot
        P_hat[n][1]=counter1/tot
        P_hat[n][2]=counter2/tot
        P_hat[n][3]=counter3/tot

    return P_hat


#  1.2
def is_transition_matrix(P):
    """
    Check if P is a transition matrix.
    """
    for row in P:
        sum=0
        for n in row:
            sum+=n
        if(sum<0.9999 or sum>1.0001):
            print(sum)
            return False
    
    return True


# 1.3
def stationary_distribution(P): 
    """
    Compute stationary distribution
    """

    P = P.transpose()
    eigvals, eigvecs = np.linalg.eig(P)

    pi = np.zeros(len(eigvals))
    for y in range(len(eigvals)):
        if np.isclose(eigvals[y], 1):
            pi = eigvecs[:, y].real
            break

    tot=0
    for i in pi:
        tot+=i
    norm = np.zeros(len(pi))
    for y in range(len(pi)):
        norm[y] = pi[y]/tot
    
    
    # Here you implement the method for computing pi. Remember that we did it during lessons - and there are at least 2 ways of computing pi. You can choose either of them
    print(pi)
    print(norm)
    return norm



def simulate_chain(P, start_state, n_steps):
    """
    Simulate a Markov chain trajectory with a fixed random seed.

    Returns: array of visited states of length n_steps + 1
    """
    seed = 1234 # don't change that
    
    rng = np.random.default_rng(seed)

    path = np.zeros(n_steps + 1, dtype=int)
    path[0] = start_state

    for n in range(n_steps):
        path[n+1] = rng.choice([0,1,2,3], p=P[path[n]])

    #  sample next states using rng.choice

    return path



def hitting_times_sim(P, start_state, n_sim=10_000):
    N_states = P.shape[0]
    est = np.zeros(N_states)

    rng = np.random.default_rng(1234)
    states = np.arange(N_states)

    for j in range(N_states):
        times = []

        for _ in range(n_sim):
            current = start_state
            t = 1  # convention: hitting time of start_state is 1

            while current != j:
                current = rng.choice(states, p=P[current])
                t += 1

            times.append(t)

        est[j] = np.mean(times)

    return est



def theoretical_hitting_times(P, start_state):
    N_states = P.shape[0]
    hit_theor = np.zeros(N_states)

    for target in range(N_states):
        if target == start_state:
            hit_theor[target] = 1 # Matches your simulation convention
            continue
            
        # Solve (I - P')h = 1, where P' is P with the target state row modified
        A = np.eye(N_states) - P
        b = np.ones(N_states)
        
        # Boundary condition: Expected time to hit target from target is 0
        # In your simulation, you use a convention where start is 1, 
        # so we solve for 0 and add 1 at the end if necessary.
        A[target, :] = 0
        A[target, target] = 1
        b[target] = 0
        
        h = np.linalg.solve(A, b)
        # Add 1 to match your simulation's 't = 1' starting convention
        hit_theor[target] = h[start_state] + 1 

    return hit_theor

When you are done, run the following cell (no need to implement anything else)

In [4]:
def problem1_main():
    print("\n=== Problem 1: Markov chain estimation + hitting times ===")

    # 1) Estimate P
    P_hat = comp_transition_matrix(X_t, N_states)
    print("Estimated P_hat:\n", np.round(P_hat, 3))

    # 2) Validate
    print("Is valid transition matrix?", is_transition_matrix(P_hat))

    stationary_distribution(P_hat)

    # 3) Expected steps from given start state to all states
    start_state = 0

    # simulation
    mc = hitting_times_sim(P_hat, start_state=start_state, n_sim=5000)

    # Theory (linear system)
    th = theoretical_hitting_times(P_hat, start_state=start_state)

    # 4) Compare
    df = pd.DataFrame({
        "target_state": np.arange(N_states),
        "MC_estimate": mc,
        "theoretical": th,
        "abs_diff": np.abs(mc - th),
    })
    print("\nComparison table:\n", df)

problem1_main()


=== Problem 1: Markov chain estimation + hitting times ===
Estimated P_hat:
 [[0.2   0.6   0.2   0.   ]
 [0.167 0.167 0.5   0.167]
 [0.2   0.2   0.2   0.4  ]
 [0.5   0.25  0.    0.25 ]]
Is valid transition matrix? True
[-0.49507377 -0.59408853 -0.49507377 -0.39605902]
[0.25 0.3  0.25 0.2 ]

Comparison table:
    target_state  MC_estimate  theoretical  abs_diff
0             0       1.0000     1.000000  0.000000
1             1       3.0154     3.024390  0.008990
2             2       4.2912     4.317073  0.025873
3             3       6.6374     6.682927  0.045527


PROBLEM 2: Cost-Sensitive Classification

You are given a binary classification problem for fraud detection.

Class labels:

    y = 1 => fraud

    y = 0 => ok



The costs of classification outcomes are:
    TP = 0, TN = 0, FP = 100, FN = 500

Tasks:
1. Train an SVM classifier.
2. Compute classification costs at a fixed threshold (0.5).
3. Evaluate total cost for multiple probability thresholds.
4. Find the threshold that minimizes total cost.

In [5]:
import numpy as np
import pandas as pd

costs = {"TP": 0, "TN": 0, "FP": 100, "FN": 500}


def generate_fraud_table(seed=0, n=3000, fraud_rate=0.05):
    """
    Generate a simple fraud dataset as a single table. The table contains:
        - numerical features: x1, x2, x3
        - binary target column: fraud (1 = fraud, 0 = legitimate)
    """
    rng = np.random.default_rng(seed)

    # Target variable
    fraud = (rng.random(n) < fraud_rate).astype(int)

    # Features
    x1 = rng.normal(0, 1, size=n)
    x2 = rng.normal(0, 1, size=n)
    x3 = rng.normal(0, 1, size=n)

    #  fraud cases are shifted
    x1[fraud == 1] += 2.0
    x2[fraud == 1] += 1.0

    df = pd.DataFrame({
        "x1": x1,
        "x2": x2,
        "x3": x3,
        "fraud": fraud,
    })

    return df


fraud_data = generate_fraud_table()

fraud_data.head()

Unnamed: 0,x1,x2,x3,fraud
0,-0.250243,-0.863902,-0.307019,0
1,-0.380736,0.018756,-0.559577,0
2,1.126431,2.055912,0.973126,1
3,0.806991,2.10416,-0.211368,1
4,0.059649,0.652374,-0.437259,0


Fill in the methods in the cell below:

In [6]:
import numpy as np
import pandas as pd
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split

costs = {"TP": 0, "TN": 0, "FP": 100, "FN": 500}

def train_test_split_table(df):
    # Fix: Use double brackets for list of column names
    X = df[["x1", "x2", "x3"]]
    y = df["fraud"]
    # Added random_state for consistency
    return train_test_split(X, y, test_size=0.2, random_state=0)

def fit_linear_svm(fraud_data):
    clf = LinearSVC(C=1.0, max_iter=10_000, random_state=0)
    X_train, X_test, y_train, y_test = train_test_split_table(fraud_data)
    
    # Fix: Fit first, then predict
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    
    return y_pred, y_test, clf, X_test

def confusion_counts(y_true, y_pred):
    # Fix: Reset to numpy arrays to ensure indexing works
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    
    TP, TN, FP, FN = 0, 0, 0, 0 
    
    for n in range(len(y_true)):
        if y_true[n] == 1 and y_pred[n] == 1:
            TP += 1
        elif y_true[n] == 1 and y_pred[n] == 0:
            FN += 1
        elif y_true[n] == 0 and y_pred[n] == 0:
            TN += 1
        elif y_true[n] == 0 and y_pred[n] == 1:
            FP += 1
    
    return {"TP": TP, "TN": TN, "FP": FP, "FN": FN}

def total_cost(counts):
    # Fix: Direct dictionary key multiplication
    total = (counts["TP"] * costs["TP"] + 
             counts["TN"] * costs["TN"] + 
             counts["FP"] * costs["FP"] + 
             counts["FN"] * costs["FN"])
    return total

def sweep_thresholds(y_true, thresholds, X, clf):
    results = []
    # decision_function returns the distance to the separating hyperplane
    y_scores = clf.decision_function(X)

    for t in thresholds:
        y_pred = (y_scores >= t).astype(int)
        counts = confusion_counts(y_true, y_pred)
        cost = total_cost(counts)

        results.append({
            "threshold": t,
            "TP": counts["TP"],
            "TN": counts["TN"],
            "FP": counts["FP"],
            "FN": counts["FN"],
            "total_cost": cost,
        })

    return pd.DataFrame(results)

When you are done, run the following cell (no need to implement anything else)

In [9]:
import numpy as np
import pandas as pd
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split

# Global cost configuration
costs = {"TP": 0, "TN": 0, "FP": 100, "FN": 500}

def generate_fraud_table(seed=0, n=3000, fraud_rate=0.05):
    rng = np.random.default_rng(seed)
    fraud = (rng.random(n) < fraud_rate).astype(int)
    x1 = rng.normal(0, 1, size=n)
    x2 = rng.normal(0, 1, size=n)
    x3 = rng.normal(0, 1, size=n)
    x1[fraud == 1] += 2.0
    x2[fraud == 1] += 1.0
    return pd.DataFrame({"x1": x1, "x2": x2, "x3": x3, "fraud": fraud})

def train_test_split_table(df):
    # Use double brackets for list of columns
    X = df[["x1", "x2", "x3"]]
    y = df["fraud"]
    return train_test_split(X, y, test_size=0.2, random_state=0)

def fit_linear_svm(df):
    X_train, X_test, y_train, y_test = train_test_split_table(df)
    clf = LinearSVC(C=1.0, max_iter=10_000, random_state=0)
    clf.fit(X_train, y_train)
    return clf  # Return the fitted model object

def confusion_counts(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    TP = np.sum((y_true == 1) & (y_pred == 1))
    TN = np.sum((y_true == 0) & (y_pred == 0))
    FP = np.sum((y_true == 0) & (y_pred == 1))
    FN = np.sum((y_true == 1) & (y_pred == 0))
    return {"TP": TP, "TN": TN, "FP": FP, "FN": FN}

def total_cost(counts):
    return (counts["TP"] * costs["TP"] + 
            counts["TN"] * costs["TN"] + 
            counts["FP"] * costs["FP"] + 
            counts["FN"] * costs["FN"])

def sweep_thresholds(y_true, thresholds, X, clf):
    results = []
    # decision_function returns raw distance from the hyperplane
    y_scores = clf.decision_function(X)

    for t in thresholds:
        y_pred = (y_scores >= t).astype(int)
        counts = confusion_counts(y_true, y_pred)
        cost = total_cost(counts)

        results.append({
            "threshold": t,
            "TP": counts["TP"],
            "TN": counts["TN"],
            "FP": counts["FP"],
            "FN": counts["FN"],
            "total_cost": cost,
        })
    return pd.DataFrame(results)

def main():
    df = generate_fraud_table()
    
    # 1) Get the test data for evaluation
    # We use a fixed random_state in split function to ensure X_test matches clf training
    _, X_test, _, y_test = train_test_split_table(df)
    
    # 2) Fit the model
    clf = fit_linear_svm(df)

    # 3) Define thresholds and sweep
    thresholds = np.linspace(-2.0, 2.0, 21)
    
    # FIX: Ensure arguments match the definition: (y_true, thresholds, X, clf)
    df_results = sweep_thresholds(
        y_test,
        thresholds,
        X_test,
        clf
    )

    print("Threshold sweep results (first 5 rows):")
    print(df_results)

    # 4) Identify optimal threshold
    best_row = df_results.loc[df_results["total_cost"].idxmin()]
    print("\n--- Optimal Result ---")
    print(best_row)

if __name__ == "__main__":
    main()

Threshold sweep results (first 5 rows):
    threshold  TP   TN   FP  FN  total_cost
0        -2.0  39  182  379   0       37900
1        -1.8  39  259  302   0       30200
2        -1.6  39  306  255   0       25500
3        -1.4  39  376  185   0       18500
4        -1.2  39  422  139   0       13900
5        -1.0  39  472   89   0        8900
6        -0.8  36  506   55   3        7000
7        -0.6  32  527   34   7        6900
8        -0.4  31  539   22   8        6200
9        -0.2  26  551   10  13        7500
10        0.0  23  557    4  16        8400
11        0.2  21  560    1  18        9100
12        0.4  14  560    1  25       12600
13        0.6   9  561    0  30       15000
14        0.8   6  561    0  33       16500
15        1.0   2  561    0  37       18500
16        1.2   1  561    0  38       19000
17        1.4   1  561    0  38       19000
18        1.6   0  561    0  39       19500
19        1.8   0  561    0  39       19500
20        2.0   0  561    0  39     

PROBLEM 3: Confidence estimation of the cost

In Problem 2, you trained a classifier, selected a decision threshold, evaluated its performance on a test set, and computed the cost

In this problem, you will quantify the uncertainty of this estimated cost. Each observation in the test set produces a cost depending on the
classification outcome:

    TN: 0
   
    FP: 100

    TP: 0

    FN: 500

Thus, the cost per observation is a bounded random variable taking
values in the interval [0, 500].

Tasks:
1. Compute the average cost per observation on the test set.
2. Use Hoeffdingâ€™s inequality to construct a 95% confidence interval
   for the true expected cost of the classifier.
3. Interpret the resulting interval:
   - What does it say about the reliability of your estimate?
   - Is the interval likely to be tight or conservative? Why?

You may assume that test observations are independent and identically
distributed.

In [None]:
def per_observation_cost(y_true, y_pred):
    """
    Compute a vector where each element is the cost of a single prediction.
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    costs_vector = np.zeros_like(y_true, dtype=float)

    # Assign costs based on outcomes
    # FN: True=1, Pred=0 -> 500
    costs_vector[(y_true == 1) & (y_pred == 0)] = 500
    # FP: True=0, Pred=1 -> 100
    costs_vector[(y_true == 0) & (y_pred == 1)] = 100
    # TP and TN are 0, so no need to explicitly set them
    
    return costs_vector

def hoeffding_ci(per_obs_costs, delta=0.05):
    """
    Computes the 1-delta confidence interval for the mean cost.
    """
    n = len(per_obs_costs)
    mean_cost = np.mean(per_obs_costs)
    
    # Range of the random variable [a, b]
    a = 0
    b = 500
    range_sq = (b - a)**2

    # Compute epsilon (the margin of error)
    epsilon = np.sqrt((range_sq * np.log(2 / delta)) / (2 * n))

    ci_lower = mean_cost - epsilon
    ci_upper = mean_cost + epsilon
    
    return mean_cost, (ci_lower, ci_upper)

def main():
    # 1) Setup data and model
    df = generate_fraud_table()
    _, X_test, _, y_test = train_test_split_table(df)
    clf = fit_linear_svm(df)

    # 2) Define a threshold and get predictions
    # Let's use 0.0 (default) or the best one from your sweep
    target_threshold = 0.0 
    y_scores = clf.decision_function(X_test)
    y_pred = (y_scores >= target_threshold).astype(int)

    # 3) Compute per-observation costs
    costs_vec = per_observation_cost(y_test, y_pred)

    # 4) Calculate Hoeffding Confidence Interval
    mean_val, interval = hoeffding_ci(costs_vec, delta=0.05)

    # This is correct but the cost cant be negative so clip it to 0

    print(f"--- Cost Analysis (Threshold: {target_threshold}) ---")
    print(f"Mean Cost per Observation: ${mean_val:.2f}")
    print(f"95% Hoeffding CI: [${interval[0]:.2f}, ${interval[1]:.2f}]")
    print(f"Sample Size (n): {len(y_test)}")

main()


# Discussion:

# Tightness (Conservative Nature):
# The interval is very conservative (wide).Reason 1:
# It only considers the range ($0$ to $500$). It doesn't care if $99\%$ of your costs are actually $\$0$.
# It assumes the worst-case variance.
# 
# Reason 2: It ignores the actual variance of your sample.
# If you used a Central Limit Theorem (CLT) based approach (like a T-distribution),
# your interval would likely be much tighter because your actual standard deviation is probably much smaller than the theoretical maximum of $250$.

--- Cost Analysis (Threshold: 0.0) ---
Mean Cost per Observation: $14.00
95% Hoeffding CI: [$-13.72, $41.72]
Sample Size (n): 600
