In [None]:
import os
import time
import json
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# In this section, the main objective is to train the xgboost model and to compute the probabilities

# ...............................
# 0. Load and split the encoded data
# ...............................
current_dir = os.getcwd()
path_to_csv = os.path.join(current_dir, "data.csv")
df = pd.read_csv(path_to_csv, encoding='ISO-8859-1')

# Split data into Train+Cal (80%) and Test (20%)
X = df.drop(columns=['Main_mode'])
y = df['Main_mode']
X_tmp, X_test, y_tmp, y_test = train_test_split(
    X, y, test_size=0.20, stratify=y, random_state=42
)

# Split Train+Cal into Train (60%) and Calibration (20%)
X_train, X_cal, y_train, y_cal = train_test_split(
    X_tmp, y_tmp, test_size=0.25,  # 0.25 * 80% = 20% total
    stratify=y_tmp, random_state=42
)

# ...............................
# 1. Log-transforming continuous valued questions to reduce the effect of the outliers
# ...............................
log_cols = [
    'Work_home_distance_mode',
    'Work_home_distance_on_foot',
    'Work_home_travel_time_on_foot',
    'Number_of_employees'
]
for col in log_cols:
    if col in X_train:
        X_train[col] = np.log1p(X_train[col])
        X_cal[col]   = np.log1p(X_cal[col])
        X_test[col]  = np.log1p(X_test[col])

# ...............................
# 2. Identify numerical columns to normalize
# ...............................
# Load metadata
path_to_json = os.path.join(current_dir, "question_types.json")
with open(path_to_json, 'r', encoding='utf-8') as f:
    metadata = json.load(f)

numerical_cols = [
    col for col, meta in metadata.items()
    if meta.get('type') == 'numerical_value' and col in X_train.columns
]
# ...............................
# 3. Build preprocessing + model pipeline
# ...............................
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_cols)
    ],
    remainder='passthrough'
)

pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('clf', xgb.XGBClassifier(
        objective='multi:softprob',
        eval_metric='mlogloss',
        random_state=42
    ))
])

# ...............................
# 4. Hyperparameter tuning with CV on the Train set
# ...............................
param_grid = {
    'clf__max_depth': [4, 6, 8],
    'clf__learning_rate': [0.05, 0.1, 0.2],
    'clf__n_estimators': [100, 200],
    'clf__reg_alpha': [0, 0.1, 0.5, 1],
    'clf__reg_lambda': [0, 0.1, 1, 5]
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    scoring='accuracy',
    cv=cv,
    verbose=1,
    n_jobs=-1,
    refit=True
)

start_time = time.time()
grid_search.fit(X_train, y_train)
print(f"Grid search took {time.time() - start_time:.1f}s")
print("Best hyperparameters:", grid_search.best_params_)

best_pipeline = grid_search.best_estimator_

# ...............................
# 5. Final evaluation on Test set
# ...............................
y_pred = best_pipeline.predict(X_test)
print("Test Set Accuracy:", accuracy_score(y_test, y_pred))
print(classification_report(y_test, y_pred, digits=3))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))


In [None]:
# In this section, we build up the conformal prediction layer using the calibration set and then construct
# the prediction sets using the test set. We then calculate the average set size and the global coverage

# ...............................
# 6. Inductive Mondrian Conformal Prediction (Building the layer and collecting the non-conformity scores)
# ...............................
P_cal = best_pipeline.predict_proba(X_cal)
classes = best_pipeline.classes_
alpha_mis = 0.1 # Which corresponds to 90 % of coverage

# 6a. Compute non-conformity scores per class
scores_by_class = {ℓ: [] for ℓ in classes}
for i, ℓ_true in enumerate(y_cal):
    p_true = P_cal[i, np.where(classes == ℓ_true)[0][0]]
    scores_by_class[ℓ_true].append(1.0 - p_true)

# 6b. Compute τ_ℓ using classic conformal quantile rule
tau_by_class = {}
for ℓ, scores in scores_by_class.items():
    scores = np.asarray(scores)
    n = len(scores)
    q_level = np.ceil((n + 1) * (1 - alpha_mis)) / n
    q_level = min(q_level, 1.0)
    tau_by_class[ℓ] = np.quantile(scores, q_level, method="higher")

print("Class-conditional τ thresholds (linear interpolation):", tau_by_class)

# ...............................
# 7. Mondrian CP prediction sets on Test set (Using the test sets to get the prediction sets)
# -------------------------------
P_test = best_pipeline.predict_proba(X_test)
prediction_sets = []
for i in range(len(X_test)):
    current_set = []
    for j, ℓ in enumerate(classes):
        nonconf_score = 1.0 - P_test[i, j]
        if nonconf_score <= tau_by_class[ℓ]:
            current_set.append(ℓ)
    prediction_sets.append(current_set)

# 7a. Evaluate coverage and set size
coverage = np.mean([y_test.iloc[i] in s for i, s in enumerate(prediction_sets)])
avg_set_size = np.mean([len(s) for s in prediction_sets])
print(f"Mondrian CP Coverage: {coverage:.3f}, Avg set size: {avg_set_size:.2f}")

In [None]:
# Having created the prediction sets, we then obtain the class conditional coverages

# ...............................
# 8. Get class conditional coverages
# ...............................
# 8a. Getting the classes from the fitted pipeline
classes = best_pipeline.classes_

# 8b. map numeric codes → mode names
label_map = {
    0: "feet",
    1: "auto",
    2: "bike",
    3: "PT",
    4: "motorbike",
    5: "multimodal"
}

# 8c. Compute coverage per class
coverage_by_class = {}
for ℓ in classes:
    idxs = np.where(y_test.to_numpy() == ℓ)[0]
    cov = np.mean([ℓ in prediction_sets[i] for i in idxs])
    coverage_by_class[ℓ] = cov

# 8d. Display the results
df_cov = pd.DataFrame.from_dict(coverage_by_class, orient='index', columns=['coverage'])
df_cov.index.name = 'class_code'
df_cov.reset_index(inplace=True)
df_cov['mode_name'] = df_cov['class_code'].map(label_map)

print(df_cov[['mode_name','coverage']])

In [None]:
# Given the effect of finite sample variations, we further enhance our conformal prediction layer using
# the wilson intervals

# ...............................
# 9. Compute Wilson interval for each class
# ...............................

# 9a. Compute the intervals
z = 1.96  # This is the standard-normal interval half-width value corresponding to 95% confidence interval
intervals = []

for ℓ in classes:
    idxs = np.where(y_test.to_numpy() == ℓ)[0]
    n = len(idxs)
    p_hat = coverage_by_class[ℓ]

    denominator = 1 + (z**2) / n
    center = (p_hat + (z**2) / (2 * n)) / denominator
    margin = (z * np.sqrt((p_hat * (1 - p_hat) / n) + (z**2 / (4 * n**2)))) / denominator

    lower = max(0.0, center - margin)
    upper = min(1.0, center + margin)

    intervals.append((ℓ, label_map[ℓ], p_hat, lower, upper))

# 9b. Display the results
df_intervals = pd.DataFrame(intervals, columns=['class_code', 'mode_name', 'coverage', 'wilson_lower', 'wilson_upper'])
print(df_intervals)