In [7]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
from sklearn.impute import SimpleImputer
from sklearn.svm import SVC
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import classification_report, accuracy_score
import matplotlib.pyplot as plt
import time
import os


from google.colab import drive
drive.mount('/content/drive')

# convert 'Credit_Mix' to num
credit_mix_mapping = {
   'Good': 1.0,
   'Standard': 0.5,
   'Bad': 0.0
}

# convert 'Credit_History_Age' item to float
def convert_to_float(age_str):
   if isinstance(age_str, str):
       years, months = map(int, age_str.replace('Years and', '').replace('Months', '').split())
       return years + months / 12.0
   return age_str

def compute_kernel(X1, X2, kernel='rbf', gamma=1.0, degree=3, coef0=1.0):
    """Compute kernel matrix"""
    if kernel == 'linear':
        return X1 @ X2.T
    elif kernel == 'rbf':
        sq_dists = np.sum(X1**2, axis=1, keepdims=True) + np.sum(X2**2, axis=1) - 2 * X1 @ X2.T
        return np.exp(-gamma * sq_dists)
    elif kernel == 'poly':
        return (gamma * X1 @ X2.T + coef0) ** degree
    elif kernel == 'sigmoid':
        return np.tanh(gamma * X1 @ X2.T + coef0)

def interior_point_svm_binary(X, y, C=1.0, kernel='rbf', gamma=1.0, degree=3, coef0=1.0, tol=1e-6, max_iter=100, collect_data=False):
    """Binary SVM using Interior Point Method with kernel support"""
    start_time = time.time()
    n, d = X.shape
    K = compute_kernel(X, X, kernel, gamma, degree, coef0)
    alpha = np.ones(n) * 0.1
    mu = 1.0

    residuals, objectives, mus = [], [], []

    for iteration in range(max_iter):
        barrier_hess = 1/(alpha**2) + 1/((C - alpha)**2)
        H = K + mu * np.diag(barrier_hess)
        barrier_grad = -1/alpha + 1/(C - alpha)
        grad = np.ones(n) - K @ alpha + mu * barrier_grad
        delta_alpha = np.linalg.solve(H, -grad)

        step_size = 1.0
        while np.any(alpha + step_size * delta_alpha <= 0) or \
            np.any(alpha + step_size * delta_alpha >= C):
            step_size *= 0.5
            if step_size < 1e-10:
                break

        alpha += step_size * delta_alpha
        alpha = np.clip(alpha, 1e-10, C - 1e-10)

        residual = np.linalg.norm(delta_alpha)
        objective = 0.5 * np.sum(alpha * (K @ alpha)) - np.sum(alpha)

        residuals.append(residual)
        objectives.append(objective)
        mus.append(mu)

        mu *= 0.9

        if residual < tol:
            print('stop iteration:', iteration)
            break

    # For non-linear kernels, store alpha and support vectors
    support_idx = (alpha > 1e-6) & (alpha < C - 1e-6)
    if kernel == 'linear':
        w = X.T @ (alpha * y)
        b = np.mean(y[support_idx] - X[support_idx] @ w) if np.any(support_idx) else 0
        training_time = time.time() - start_time

        return w, b, training_time, (residuals, objectives, mus) if collect_data else (w, b, training_time)
    else:
        # For non-linear kernels, compute bias using support vectors
        if np.any(support_idx):
            K_sv = compute_kernel(X[support_idx], X, kernel, gamma, degree, coef0)
            b = np.mean(y[support_idx] - np.sum((alpha * y) * K_sv, axis=1))
        else:
            b = 0
        # Return alpha, X, y for prediction
        model = {'alpha': alpha, 'X': X, 'y': y, 'b': b, 'kernel': kernel, 'gamma': gamma, 'degree': degree, 'coef0': coef0}

    training_time = time.time() - start_time

    if collect_data:
        return model, training_time, (residuals, objectives, mus)
    return model, training_time

def predict_nonlinear(X_test, model):
    """Predict using non-linear SVM model"""
    K_test = compute_kernel(X_test, model['X'], model['kernel'], model['gamma'], model['degree'], model['coef0'])
    return np.sign(np.sum((model['alpha'] * model['y']) * K_test, axis=1) + model['b'])

def multiclass_svm_ovo(X_train, y_train, X_test, classes, C=1.0, kernel='rbf', gamma=1.0, degree=3, coef0=1.0, datascale='', data_folder = ''):
    """One-vs-One multiclass SVM"""
    n_classes = len(classes)
    classifiers = {}
    all_residuals = []
    labels = []

    print(f"Training {n_classes*(n_classes-1)//2} binary classifiers...")

    # Train binary classifiers for each pair
    classifier_count = 0
    for i in range(n_classes):
        for j in range(i+1, n_classes):
            classifier_count += 1
            print(f"\nTraining classifier {classifier_count}: classes {classes[i]} vs {classes[j]}")

            # Get samples for classes i and j
            mask = (y_train == classes[i]) | (y_train == classes[j])
            X_pair = X_train[mask]
            y_pair = y_train[mask]
            y_binary = np.where(y_pair == classes[i], 1, -1)

            # Train binary classifier and collect convergence data
            if kernel == 'linear':
                w, b, train_time, (residuals, objectives, mus) = interior_point_svm_binary(X_pair, y_binary, C, kernel, gamma, degree, coef0, collect_data=True)
                classifiers[(i, j)] = (w, b)
            else:
                model, train_time, (residuals, objectives, mus) = interior_point_svm_binary(X_pair, y_binary, C, kernel, gamma, degree, coef0, collect_data=True)
                classifiers[(i, j)] = model
            print(f"Training time: {train_time:.4f} seconds")
            all_residuals.append((residuals, objectives, mus))
            labels.append(f"Class {classes[i]} vs {classes[j]}")

    # Plot all convergence curves in one figure
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 5))

    for (residuals, objectives, mus), label in zip(all_residuals, labels):
        ax1.semilogy(residuals, label=label)
        ax2.plot(objectives, label=label)
        ax3.semilogy(mus, label=label)

    ax1.set_title('Residual Convergence')
    ax1.set_xlabel('Iteration')
    ax1.set_ylabel('Residual (log scale)')
    ax1.legend()
    ax1.grid(True)

    ax2.set_title('Objective Function of the Dual Problem')
    ax2.set_xlabel('Iteration')
    ax2.set_ylabel('Objective Value')
    ax2.legend()
    ax2.grid(True)

    ax3.set_title('Barrier Parameter (Mu)')
    ax3.set_xlabel('Iteration')
    ax3.set_ylabel('Mu (log scale)')
    ax3.legend()
    ax3.grid(True)

    plt.tight_layout()
    plt.savefig(f'{data_folder}all_classifiers_convergence_{kernel}_{datascale}.png', dpi=150, bbox_inches='tight')
    plt.close()

    # Predict using voting
    predictions = []
    for x in X_test:
        votes = np.zeros(n_classes)
        for (i, j), classifier in classifiers.items():
            if kernel == 'linear':
                w, b = classifier
                pred = np.sign(x @ w + b)
            else:
                pred = predict_nonlinear(x.reshape(1, -1), classifier)[0]
            if pred > 0:
                votes[i] += 1
            else:
                votes[j] += 1
        predictions.append(classes[np.argmax(votes)])

    return np.array(predictions)


def processing_train_test(train_data, test_data):
    train_data['Credit_History_Age'] = train_data['Credit_History_Age'].apply(convert_to_float)
    test_data['Credit_History_Age'] = test_data['Credit_History_Age'].apply(convert_to_float)
    train_data['Credit_Mix'] = train_data['Credit_Mix'].map(credit_mix_mapping)
    test_data['Credit_Mix'] = test_data['Credit_Mix'].map(credit_mix_mapping)
    # convert 'Payment_Behaviour' to num
    unique_behaviors = train_data['Payment_Behaviour'].unique()
    behavior_mapping = {behavior: idx for idx, behavior in enumerate(unique_behaviors)}
    train_data['Payment_Behaviour'] = train_data['Payment_Behaviour'].map(behavior_mapping)
    test_data['Payment_Behaviour'] = test_data['Payment_Behaviour'].map(behavior_mapping)

    train_data = train_data.dropna()
    test_data = test_data.dropna()

    # feature selection
    features = ['Age', 'Annual_Income', 'Monthly_Inhand_Salary', 'Num_Bank_Accounts',
            'Num_Credit_Card', 'Interest_Rate', 'Num_of_Loan', 'Delay_from_due_date',
            'Num_of_Delayed_Payment', 'Changed_Credit_Limit', 'Num_Credit_Inquiries',
            'Credit_Mix', 'Outstanding_Debt', 'Credit_Utilization_Ratio', 'Credit_History_Age',
                'Amount_invested_monthly', 'Monthly_Balance', 'Payment_Behaviour']

    X_train = train_data[features]
    y_train = train_data['Credit_Score']
    X_test = test_data[features]
    y_test = test_data['Credit_Score']

    # convert label to num: good->2, standard->1, poor->0
    le = LabelEncoder()
    y_train = le.fit_transform(y_train)
    y_test = le.transform(y_test)

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, y_train, X_test, y_test

# Preprocess function
def preprocess_data(df):
    df = df.drop(['ID', 'Customer_ID'], axis=1)
    df['Name'].fillna('Unknown', inplace=True)
    df['Occupation'].fillna('Unknown', inplace=True)
    df['SSN'] = df['SSN'].replace('#F%$D@*&8', np.nan)
    df['SSN'].fillna('000-00-0000', inplace=True)
    df['Num_of_Loan'] = df['Num_of_Loan'].replace(-100, 0)
    df['Interest_Rate'] = df['Interest_Rate'].replace(-100, df['Interest_Rate'].median())
    return df


def run_svm_experiment(datascale='s', kernel='rbf', data_folder='/home/zitong/COMP6704/dataset/', C=1.0, gamma=0.1, degree=3, coef0=1.0):
    """Run SVM experiment with given parameters"""
    # Read data
    train_data_path = f'{data_folder}train_data_{datascale}.csv'
    test_data_path = f'{data_folder}test_data.csv'
    result_data_path = f'{data_folder}result_data.csv'

    train_data = pd.read_csv(train_data_path)
    test_data = pd.read_csv(test_data_path)

    print(f'\nExperiment: datascale={datascale}, kernel={kernel}')

    X_train, y_train, X_test, y_test = processing_train_test(train_data, test_data)
    classes = np.unique(y_train)

    # Train SVM
    print("Training SVM with Interior Point Method...")
    total_start_time = time.time()
    y_pred = multiclass_svm_ovo(X_train, y_train, X_test, classes, C, kernel, gamma, degree, coef0, datascale, data_folder)
    total_time = time.time() - total_start_time

    # Evaluate
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy:.4f}, Time: {total_time:.4f}s")

    # Save results
    results_df = pd.DataFrame({
        'datascale': [datascale],
        'kernel': [kernel],
        'timing': [total_time],
        'accuracy': [accuracy]
    })
    results_df.to_csv(result_data_path, mode='a', header=not os.path.exists(result_data_path), index=False)

    return accuracy, total_time

# Run experiments
data_folder = '/content/drive/MyDrive/COMP6704_dataset/'  # Change this path as needed
for datascale in ['s', 'm', 'l']:
    for kernel in ['linear', 'rbf', 'poly']:
        run_svm_experiment(datascale, kernel, data_folder)

print(f"\nAll experiments completed. Results saved to {data_folder}result_data.csv")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

Experiment: datascale=s, kernel=linear
Training SVM with Interior Point Method...
Training 3 binary classifiers...

Training classifier 1: classes 0 vs 1
Training time: 51.8767 seconds

Training classifier 2: classes 0 vs 2
Training time: 168.5532 seconds

Training classifier 3: classes 1 vs 2
Training time: 223.9470 seconds
Accuracy: 0.5793, Time: 446.3070s

Experiment: datascale=s, kernel=rbf
Training SVM with Interior Point Method...
Training 3 binary classifiers...

Training classifier 1: classes 0 vs 1
Training time: 53.4103 seconds

Training classifier 2: classes 0 vs 2
Training time: 166.6525 seconds

Training classifier 3: classes 1 vs 2
Training time: 219.6823 seconds
Accuracy: 0.5469, Time: 451.6788s

Experiment: datascale=s, kernel=poly
Training SVM with Interior Point Method...
Training 3 binary classifiers...

Training classifier 1: classes 0 vs