# Assignment 2

In [None]:
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

In [None]:
data = pd.read_csv('forestCover.csv')

In [None]:
data.columns.shape

In [None]:
data.head()

## Data Quality Issue Investigation

### Encoding of textual entries

In [None]:
cols = data.columns.tolist()
idx1, idx2 = cols.index('Water_Level'), cols.index('Observation_ID')
cols[idx1], cols[idx2] = cols[idx2], cols[idx1]
data.columns = cols

In [None]:
data['Soil_Type1'].value_counts()

In [None]:
data['Soil_Type1'] = data['Soil_Type1'].map(lambda x : 1 if x=='positive' else 0)

### Missing value analysis and correction

In [None]:
100*298/len(data)

In [None]:
data['Slope'][data['Slope']!= '?'].astype(int).median()

In [None]:
data['Slope'][data['Slope']!= '?'].astype(int).mean()

In [None]:
sns.histplot(data['Slope'][data['Slope']!= '?'].astype(int))

In [None]:
data[data['Slope']== '?'] # before imputation

In [None]:
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=5, weights="uniform")
df = data.replace('?', np.nan)
df = df.apply(pd.to_numeric, errors='ignore')
imputer = KNNImputer(n_neighbors=5, weights="uniform")
df_imputed = pd.DataFrame(
    imputer.fit_transform(df),
    columns=df.columns
)

In [None]:
# Slope feature imputation results in non-rounded values which is not an issue for modelling but might conflict with domain

In [None]:
df_imputed[data['Slope']== '?'] # after imputation

### Correlation analysis

In [None]:
data[['Facet', 'Aspect']].corr()

In [None]:
from scipy import stats
def correlation_ratio(categories, measurements):
    """
    (eta) correlation ratio between continuous and categorical features.
    Source: https://www.analyticsvidhya.com/blog/2022/08/statistical-effect-size-and-python-implementation
    """
    df = pd.DataFrame({"cat": categories, "meas": measurements}).dropna()

    fcat, _ = pd.factorize(df["cat"])
    cat_num = np.max(fcat) + 1
    y_avg_array = np.zeros(cat_num)
    n_array = np.zeros(cat_num)

    for i in range(cat_num):
        cat_measures = df["meas"].values[np.argwhere(fcat == i).flatten()]
        n_array[i] = len(cat_measures)
        y_avg_array[i] = np.mean(cat_measures) if len(cat_measures) > 0 else 0

    y_total_avg = np.sum(y_avg_array * n_array) / np.sum(n_array)
    numerator = np.sum(n_array * (y_avg_array - y_total_avg) ** 2)
    denominator = np.sum((df["meas"].values - y_total_avg) ** 2)

    return np.sqrt(numerator / denominator) if denominator != 0 else 0.0

In [None]:
correlation_ratio(data['Cover_Type'], data['Facet'])

In [None]:
correlation_ratio(data['Cover_Type'], data['Aspect'])

In [None]:
# At very least, drop at one. At most, drop both due to low target correlation. Likely drop only one to be cautious

In [None]:
sns.scatterplot(x=data['Facet'], y=data['Aspect'])

### Noisy feature analysis

In [None]:
correlation_ratio(data['Cover_Type'], data['Inclination'])

In [None]:
# Inclination has stochastic noise

In [None]:
sns.histplot(data['Inclination'])

In [None]:
# Likely remove since the whole feature is noisy and it has basically no correlation to target. Noise removal like through NN or CEWS or iterative partitioning won't help it enough
# further, there are already so many features so for the sake of model complexity, it should be dropped

In [None]:
sns.scatterplot(x=data['Cover_Type'], y=data['Inclination'])

### Features to drop

In [None]:
# drop water_level and observation_id

## Model Training and Evaluation

### Metric-related functions

In [None]:
from sklearn.model_selection import StratifiedKFold

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score, matthews_corrcoef, roc_curve, roc_auc_score, log_loss
def get_results(model, X_test, y_test, y_pred=None, y_prob=None):
    """
    Performs various classification performance metrics on either new or given predictions.
    """
    y_pred = model.predict(X_test) if y_pred is None else y_pred
    y_prob = model.predict_proba(X_test) if y_prob is None else y_prob
    results = {
        "Accuracy" : accuracy_score(y_test, y_pred),
        "Precision" : precision_score(y_test, y_pred, average='weighted', zero_division=0.0),
        "Recall" : recall_score(y_test, y_pred, average='weighted'),
        "F1-score" : f1_score(y_test, y_pred, average='weighted'),
        "Cohen's kappa" : cohen_kappa_score(y_test, y_pred),
        "MCC" : matthews_corrcoef(y_test, y_pred),
        "AUC" : roc_auc_score(y_test, y_prob, average='weighted', multi_class='ovo'),
        "Cross-Entropy" : log_loss(y_test, y_prob)
    }
    return results

In [None]:
def collapse_dict(data):
    """
    Collapses seperate fold runs into a cohesive dictionary.
    """
    from collections import defaultdict
    collapsed_data = []

    for element in data:
        combined = defaultdict(list)
        
        # Collect all values for each metric
        for run in element.values():
            for metric, value in run.items():
                combined[metric].append(value)
        
        # Calculate mean and std for each metric
        mean_dict = {metric: float(np.mean(values)) for metric, values in combined.items()}
        std_dict = {metric: float(np.std(values, ddof=1)) for metric, values in combined.items()}  # sample std
        
        collapsed_data.append({'mean': mean_dict, 'std': std_dict})
    return collapsed_data

In [None]:
def plot_metrics(list_of_dicts, hyperparams, x_label, fname):
    """
    Plots metrics as lines against hyperparameter choices.
    """
    all_dicts = collapse_dict(list_of_dicts)
    list_of_dicts_mean = [x['mean'] for x in all_dicts]
    list_of_dicts_std = [x['std'] for x in all_dicts]
    
    metrics = list(list_of_dicts_mean[0].keys())
    
    plt.figure(figsize=(10, 6))
    
    markers = ['o', 's', '^', 'd', 'v', 'x', '*']
    linestyles = ['-', '--', ':', '-.', '-', '--', ':']
    num_metrics = len(metrics)
    colors = plt.get_cmap("viridis", num_metrics)
    
    for i, metric in enumerate(metrics):
        if metric in ["Cross-Entropy", "Precision", "Recall"]: print(metric, np.round(y_values,3)); continue
        
        y_values = [d[metric] for d in list_of_dicts_mean]
        print(metric, np.round(y_values,3))
        
        y_stds = [d[metric] for d in list_of_dicts_std] 

        plt.errorbar(hyperparams, y_values, yerr=y_stds, 
             marker=markers[i % len(markers)], markersize=4,
             linestyle=linestyles[i % len(linestyles)], 
             label=metric, alpha=0.8, color=colors(i),
             capsize=3)
    
    plt.xlabel(x_label)
    plt.ylabel("Score")
    plt.ylim(top=1)
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"plots/{fname}.pdf", bbox_inches='tight', dpi=150)
    plt.show()

### Preprocessing and division for CT

In [None]:
from sklearn.preprocessing import StandardScaler # z-normalisation
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier

mod_data = data.replace('?', np.nan).drop(columns=["Observation_ID", "Water_Level", "Cover_Type"])
target = data["Cover_Type"]

tree_X_train, tree_X_test, tree_y_train, tree_y_test = train_test_split(
    mod_data, target, test_size=0.3, random_state=0, stratify=target
)

### Tree running

In [None]:
def run_tree(X, y, criterion='entropy'):
    ccp_alphas = np.linspace(0, 0.4, 50)
    results = []
    for i, ccp_alpha in enumerate(ccp_alphas):
        print(f"{100.0*i/50}%")
        skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=0)
        folds = {}
        for fold, (train_index, test_index) in enumerate(skf.split(X, y), 1):
            X_train, y_train, X_val, y_val = X.iloc[train_index], y.iloc[train_index], X.iloc[test_index], y.iloc[test_index]
            clf = DecisionTreeClassifier(criterion=criterion, class_weight="balanced", random_state=0, ccp_alpha=ccp_alpha)
            clf.fit(X_train, y_train)
            folds[fold] = get_results(clf, X_val, y_val)
        results.append(folds)
    return results, ccp_alphas

In [None]:
%%time
tree_results, ccp_alphas = run_tree(tree_X_train, tree_y_train, criterion='entropy')
plot_metrics(tree_results, ccp_alphas, "Cost-Complexity", "tree_entropy")

In [None]:
%%time
tree_results, ccp_alphas = run_tree(tree_X_train, tree_y_train, criterion='gini')
plot_metrics(tree_results, ccp_alphas, "Cost-Complexity", "tree_gini")

### Preprocessing and division for KNN

In [None]:
mod_data = df_imputed.drop(columns=["Aspect", "Observation_ID", "Water_Level", "Inclination", "Cover_Type"])
target = data["Cover_Type"]

scaled_data = mod_data.copy()
scaled_data.iloc[:,:10] = StandardScaler().fit_transform(mod_data.iloc[:,:10]) # normalise continuous features only

knn_X_train, knn_X_test, knn_y_train, knn_y_test = train_test_split(
    scaled_data, target, test_size=0.3, random_state=0, stratify=target
)

### Functions for KNN running

In [None]:
from numba import jit
from pynndescent import NNDescent
from scipy.stats import mode
from numba import typed
# Numba-optimized Gower distance for two vectors
@jit(nopython=True, fastmath=True)
def gower_distance(x, y, num_indices, cat_indices, ranges, num_features):
    dist = 0.0
    # Numerical features: normalized Manhattan
    for idx, i in enumerate(num_indices):
        dist += np.abs(x[i] - y[i]) / ranges[idx]
    # Categorical features: Hamming distance
    for i in cat_indices:
        dist += 1.0 if x[i] != y[i] else 0.0
    return dist / num_features
    
def compute_ranges(X, num_indices):
    ranges = np.array([np.ptp(X[:, i]) for i in num_indices], dtype=np.float64)
    ranges[ranges == 0] = 1.0  # Avoid division by zero
    return ranges

class GowerKNNClassifier:
    """
    Special class wrapping NNDescent for this dataset.
    """
    def __init__(self, n_neighbors=5, n_jobs=-1):
        self.n_neighbors = n_neighbors
        self.n_jobs = n_jobs
        self.index = None
        self.y_train = None
        self.num_indices = None
        self.cat_indices = None
        self.ranges = None
        self.num_features = None

    def fit(self, X, y, num_indices, cat_indices, num_features):
        self.num_indices = np.array(num_indices, dtype=np.int64)
        self.cat_indices = np.array(cat_indices, dtype=np.int64)
        self.num_features = num_features
        self.y_train = np.array(y)
        
        self.ranges = compute_ranges(X, num_indices)
        self.ranges = np.array(self.ranges, dtype=np.float64)
        
        self.index = NNDescent(X, metric=gower_distance, metric_kwds={
                                "num_indices": self.num_indices,
                                "cat_indices": self.cat_indices,
                                "ranges": self.ranges,
                                "num_features": self.num_features},
                               n_neighbors=self.n_neighbors, n_jobs=self.n_jobs, random_state=0)
        return self

    def query(self, X):
        # Query nearest neighbors
        indices, distances = self.index.query(X, k=self.n_neighbors)
        
        return distances, indices

In [None]:
def predict_for_k(distances, indices, y_train, classes, k):
    """
    Slices the given KNN distances to quickly calculate KNN output for different k.
    """
    y_pred = []
    y_proba = []
    for i in range(len(distances)):
        # Slice to top k for this test point
        k_dist = distances[i][:k]
        k_idx = indices[i][:k]
        k_labels = y_train[k_idx]
        
        # Apply weights
        w = 1.0 / (k_dist + 1e-5)  # Avoid division by zero
        
        # Weighted voting with all classes initialised
        class_votes = {c: 0.0 for c in classes}
        for label, weight in zip(k_labels, w):
            class_votes[label] += weight
        
        # Compute probabilities
        total_weight = sum(class_votes.values())
        if total_weight == 0:
            probs = [1.0 / len(classes)] * len(classes)  # Uniform if no weights (edge case)
        else:
            probs = [class_votes[c] / total_weight for c in classes]
        
        # Predict class (argmax)
        pred = classes[np.argmax(probs)]
        
        y_pred.append(pred)
        y_proba.append(probs)
    
    return np.array(y_pred), np.array(y_proba)

In [None]:
def start_knn(X_train, X_val, y_train, y_val, metric='minkowski', p=2):
    use_gower = metric == 'gower'
    MAX_K = 100
    if use_gower:
        X_train = X_train.values
        y_train = y_train.values
        # Create a partial function with categorical indices fixed
        cat_indices = list(range(10, X_train.shape[1]))
        num_indices = [i for i in range(X_train.shape[1]) if i not in cat_indices]
        
        num_features = X_train.shape[1]  # Total features (for averaging)
        knn = GowerKNNClassifier(n_neighbors=MAX_K, n_jobs=-1)
        knn.fit(X_train, y_train, num_indices, cat_indices, num_features)
        return *knn.query(X_val), np.unique(y_train)
    
    clf = KNeighborsClassifier(n_neighbors=MAX_K, weights='distance', metric=metric, p=p, n_jobs=-1)
    clf.fit(X_train, y_train)
    
    # Get distances and indices for MAX_K
    distances, indices = clf.kneighbors(X_val, return_distance=True)
    return distances, indices, clf.classes_# Get the sorted unique classes from the classifier

def run_knn(X_train, X_val, y_train, y_val, distances, indices, classes):
    results = []
    # Test multiple k without recomputing distances
    ks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 18, 20, 25, 30, 50, 70, 90, 100]
    for i, k in enumerate(ks):
        print(f"{100.0*i/len(ks)}%")
        y_pred, y_prob = predict_for_k(distances, indices, y_train.values, classes, k)
        results.append(get_results(None, X_val, y_val, y_pred, y_prob))
    return results, ks

In [None]:
folds = {}
ks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 18, 20, 25, 30, 50, 70, 90, 100]
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=0)
for fold, (train_index, test_index) in enumerate(skf.split(knn_X_train, knn_y_train), 1):
    X_train, y_train, X_val, y_val = knn_X_train.iloc[train_index], knn_y_train.iloc[train_index], knn_X_train.iloc[test_index], knn_y_train.iloc[test_index]
    distances, indices, classes = start_knn(X_train, X_val, y_train, y_val, metric='minkowski', p=2)
    knn_results, _ = run_knn(X_train, X_val, y_train, y_val, distances, indices, classes)
    folds[fold] = knn_results
knn_results = [{key: folds[key][i] for key in folds} for i in range(len(next(iter(folds.values()))))]
plot_metrics(knn_results, ks, "$k$", "knn_euclid")

In [None]:
folds = {}
ks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 18, 20, 25, 30, 50, 70, 90, 100]
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=0)
for fold, (train_index, test_index) in enumerate(skf.split(knn_X_train, knn_y_train), 1):
    X_train, y_train, X_val, y_val = knn_X_train.iloc[train_index], knn_y_train.iloc[train_index], knn_X_train.iloc[test_index], knn_y_train.iloc[test_index]
    distances, indices, classes = start_knn(X_train, X_val, y_train, y_val, metric='gower')
    knn_results, _ = run_knn(X_train, X_val, y_train, y_val, distances, indices, classes)
    folds[fold] = knn_results
knn_results = [{key: folds[key][i] for key in folds} for i in range(len(next(iter(folds.values()))))]
plot_metrics(knn_results, ks, "$k$", "knn_gower")

### Test set evaluation

In [None]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
def plot_roc(y_val, y_scores, fname):
    """
    Plots per-class ROC curves.
    """
    classes = np.unique(y_val)
    y_test_bin = label_binarize(y_val, classes=classes)
    n_classes = y_test_bin.shape[1]
    
    fpr_dict = {}
    tpr_dict = {}
    roc_auc_dict = {}
    for i in range(n_classes):
        fpr_dict[i], tpr_dict[i], _ = roc_curve(y_test_bin[:, i], y_scores[:, i])
        roc_auc_dict[i] = auc(fpr_dict[i], tpr_dict[i])
    
    colors = plt.get_cmap("viridis", n_classes)
    plt.figure()
    for i in range(n_classes):
        plt.plot(fpr_dict[i], tpr_dict[i], label=f'Class {classes[i]} (AUC = {roc_auc_dict[i]:.2f})', color=colors(i))
    plt.plot([0, 1], [0, 1], 'k--', alpha=0.7)  # Reference line
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='lower right')
    
    plt.savefig(f"plots/roc_{fname}.pdf", bbox_inches='tight', dpi=150)
    plt.show()

#### CT evaluation

In [None]:
tree = DecisionTreeClassifier(criterion='entropy', random_state=0, class_weight="balanced")
tree.fit(tree_X_train, tree_y_train)
tree_y_scores = tree.predict_proba(tree_X_test)
tree_y_pred = tree.predict_proba(tree_X_test)

In [None]:
get_results(tree, tree_X_test, tree_y_test)

In [None]:
plot_roc(tree_y_test, tree_y_scores, "tree")

In [None]:
# num 0-1 probability predictions
print(f"{100.0*np.sum(np.all((tree_y_scores == 0) | (tree_y_scores == 1), axis=1))/len(tree_X_test):.3f}%")

#### KNN evaluation

In [None]:
%%time
distances, indices, classes = start_knn(knn_X_train, knn_X_test, knn_y_train, knn_y_test, metric='gower')
y_pred, y_scores = predict_for_k(distances, indices, knn_y_train.values, classes, 8)
get_results(None, knn_X_test, knn_y_test, y_pred, y_scores)

In [None]:
plot_roc(knn_y_test, y_scores, "knn")

In [None]:
print(f"{100.0*np.sum(np.all((y_scores == 0) | (y_scores == 1), axis=1))/len(knn_X_test):.3f}%")

### Training set evaluation

In [None]:
%%time
tree = DecisionTreeClassifier(criterion='entropy', random_state=0, class_weight="balanced")
tree.fit(tree_X_train, tree_y_train)
tree_y_scores = tree.predict_proba(tree_X_train)
tree_y_pred = tree.predict_proba(tree_X_train)
get_results(tree, tree_X_train, tree_y_train)

In [None]:
%%time
distances, indices, classes = start_knn(knn_X_train, knn_X_train, knn_y_train, knn_y_train, metric='gower')
y_pred, y_scores = predict_for_k(distances, indices, knn_y_train.values, classes, 8)
get_results(None, knn_X_train, knn_y_train, y_pred, y_scores)

### Statistical Tests

In [None]:
def mcnemar_test(y_true, y_pred_a, y_pred_b):
    """
    Runs McNemar test and finds p-value.
    """
    y_true = np.asarray(y_true)
    classes = np.unique(knn_y_test)
    if len(y_true.shape) == 1: y_true = label_binarize(y_true, classes=classes)
    if len(y_pred_a.shape) == 1: y_pred_a = label_binarize(y_pred_a, classes=classes)
    if len(y_pred_b.shape) == 1: y_pred_b = label_binarize(y_pred_b, classes=classes)
    a_correct = (np.asarray(y_pred_a) == y_true)
    b_correct = (np.asarray(y_pred_b) == y_true)

    b = int(np.sum(a_correct & ~b_correct))  # A correct, B wrong
    c = int(np.sum(~a_correct & b_correct))  # A wrong, B correct

    n = b + c
    if n == 0:
        # classifiers agree on every sample (both correct or both wrong)
        return {'b': b, 'c': c, 'statistic': 0.0, 'pvalue': 1.0}

    # continuity corrected chi-square
    chi2 = (abs(b - c) - 1)**2 / (b + c)
    p = 1 - stats.chi2.cdf(chi2, df=1)
    return {'b': b, 'c': c, 'statistic': chi2, 'pvalue': float(p)}

def wilcoxon(y_true, prob_a, prob_b, alternative='two-sided'):
    """
    Runs Wilcoxon signed-rank test and finds p-value.
    """
    y_true = np.asarray(y_true) - 1 # correct to indices from 1-based labels
    prob_a = np.asarray(prob_a)
    prob_b = np.asarray(prob_b)

    # If 2D arrays, extract predicted probability for the true class
    if prob_a.ndim == 2:
        p_true_a = prob_a[np.arange(len(y_true)), y_true]
    elif prob_a.ndim == 1:
        p_true_a = prob_a

    if prob_b.ndim == 2:
        p_true_b = prob_b[np.arange(len(y_true)), y_true]
    elif prob_b.ndim == 1:
        p_true_b = prob_b

    diffs = p_true_a - p_true_b
    print(f"Median difference: {np.median(diffs)}")
    print(f"Mean difference: {np.mean(diffs)}")
    nonzero = diffs != 0
    n_nonzero = int(np.sum(nonzero))
    if n_nonzero == 0:
        return {'statistic': 0.0, 'pvalue': 1.0, 'n_nonzero': 0}

    stat, p = stats.wilcoxon(diffs[nonzero], alternative=alternative)
    return {'statistic': float(stat), 'pvalue': float(p), 'n_nonzero': n_nonzero}

In [None]:
m_res = mcnemar_test(knn_y_test, tree_y_pred, y_pred)

w_res = wilcoxon(knn_y_test, tree_y_scores, y_scores, alternative='two-sided')

In [None]:
m_res

In [None]:
w_res