In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
import pickle
import os
import joblib
from tensorflow.keras.models import load_model
import tensorflow as tf
import matplotlib.pyplot as plt
tf.random.set_seed(42)

In [None]:
clf=joblib.load('iscx_final_reoptimised_gan_cw_dt.pkl')

In [None]:
def load_and_preprocess(filepath):
    df = pd.read_csv(filepath, index_col=[0])
    # df=df[['SrcWin','sHops','dHops','sTtl','dTtl','SynAck','SrcBytes','DstBytes','SAppBytes',\
    #                    'Dur','TotPkts','TotBytes','TotAppByte','Rate','SrcRate','Label']]
    #Le = LabelEncoder()
    #df['Label'] = le.fit_transform(df['Label'])
    df=df[['SrcWin', 'sHops', 'sTtl', 'dTtl', 'SrcBytes', 'DstBytes', 'Dur', 'TotBytes', 'Rate','Label']]
    print(df.shape)
    print("loading data")
    X = df.iloc[:,:-1]
    y = df.iloc[:,-1]
    return X, y,df


In [None]:
data_path='../data/'
train_file = os.path.join(data_path, 'ISCX_training.csv')
test_file = os.path.join(data_path, 'ISCX_Testing.csv')
X_train, y_train,train_df = load_and_preprocess(train_file)
X_test, y_test,test_df = load_and_preprocess(test_file)
# Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
X_train_base, X_val_base, y_train_base, y_val_base = train_test_split(X_train_scaled,y_train, test_size=0.01, random_state=2, shuffle=True)

In [None]:
predictions = clf.predict(X_test_scaled)
print("Accuracy: ", accuracy_score(y_test, predictions))
print("Precision: ", precision_score(y_test, predictions))
print("Recall: ", recall_score(y_test, predictions))
print("F1 score: ", f1_score(y_test, predictions))
print("Confusion Matrix: \n", confusion_matrix(y_test, predictions))

In [None]:
val_prob = clf.predict_proba(X_val_base)
test_prob=clf.predict_proba(X_test_scaled)

In [None]:
def get_q_hat(cal_smx,cal_labels,alpha):
    n=cal_smx.shape[0]
    cal_scores = 1-cal_smx[np.arange(n),cal_labels]
    # 2: get adjusted quantile
    q_level = np.ceil((n+1)*(1-alpha))/n
    qhat = np.quantile(cal_scores, q_level, interpolation='higher')
    return qhat

In [None]:
def calculate_detailed_metrics_optimized(probabilities, true_labels, threshold):
    predicted_classes = np.argmax(probabilities, axis=1)
    is_correct = predicted_classes == true_labels

    final_prediction_sets = probabilities>= (1 - threshold)
    decision_vectorized = np.where(np.all(final_prediction_sets, axis=1) | np.all(~final_prediction_sets, axis=1), 'No-Conformity', 'Conform')
    decisions = np.array(decision_vectorized) == 'Conform'
    # Using boolean masks for calculations
    correct_accepted = np.logical_and(decisions, is_correct)
    correct_rejected = np.logical_and(~decisions, is_correct)
    incorrect_accepted = np.logical_and(decisions, ~is_correct)
    incorrect_rejected = np.logical_and(~decisions, ~is_correct)

    # Summarize results
    metrics = {
        'correct_accepted': np.sum(correct_accepted),
        'correct_rejected': np.sum(correct_rejected),
        'incorrect_accepted': np.sum(incorrect_accepted),
        'incorrect_rejected': np.sum(incorrect_rejected),
    }

    metrics['correct_total_acceptance_percentage'] = (metrics['correct_accepted'] / np.sum(is_correct)) * 100
    metrics['incorrect_total_rejection_percentage'] = (metrics['incorrect_rejected'] / np.sum(~is_correct)) * 100

    return metrics

In [None]:
def calculate_detailed_metrics_extended_optimized(probabilities,true_labels, threshold):
    predicted_classes = np.argmax(probabilities, axis=1)
    is_correct = predicted_classes == true_labels

    predicted_classes = np.argmax(probabilities, axis=1)
    is_correct = predicted_classes == true_labels

    final_prediction_sets = probabilities>= (1 - threshold)
    decision_vectorized = np.where(np.all(final_prediction_sets, axis=1) | np.all(~final_prediction_sets, axis=1), 'No-Conformity', 'Conform')
    decisions = np.array(decision_vectorized) == 'Conform'
    
    # Initialize metrics structure
    metrics = {
        'B': {'correct_accepted': 0, 'correct_rejected': 0, 'incorrect_accepted': 0, 'incorrect_rejected': 0},
        'M': {'correct_accepted': 0, 'correct_rejected': 0, 'incorrect_accepted': 0, 'incorrect_rejected': 0},
    }
    
    # Vectorized operations for each label
    for label in [0, 1]:  # Assuming binary labels: 0 ('B') and 1 ('M')
        label_key = 'B' if label == 0 else 'M'
        label_mask = true_labels == label
        
        # Boolean masks for conditions
        correct_mask = is_correct & label_mask
        incorrect_mask = ~is_correct & label_mask
        accepted_mask = decisions & label_mask
        rejected_mask = ~decisions & label_mask
        
        # Apply masks to compute metrics
        metrics[label_key]['correct_accepted'] = np.sum(correct_mask & accepted_mask)
        metrics[label_key]['correct_rejected'] = np.sum(correct_mask & rejected_mask)
        metrics[label_key]['incorrect_accepted'] = np.sum(incorrect_mask & accepted_mask)
        metrics[label_key]['incorrect_rejected'] = np.sum(incorrect_mask & rejected_mask)
    
    # Aggregate and calculate percentages
    metrics = calculate_percentages(metrics)
    
    return metrics

def calculate_percentages(metrics):
    for label in ['B', 'M']:
        ca = metrics[label]['correct_accepted']
        cr = metrics[label]['correct_rejected']
        ia = metrics[label]['incorrect_accepted']
        ir = metrics[label]['incorrect_rejected']
        
        total_correct = ca + cr
        total_incorrect = ia + ir
        
        metrics[label].update({
            'correctly_predicted_total': total_correct,
            'incorrectly_predicted_total': total_incorrect,
            'accepted_correct_percentage': 100 * ca / total_correct if total_correct else 0,
            'rejected_correct_percentage': 100 * cr / total_correct if total_correct else 0,
            'accepted_incorrect_percentage': 100 * ia / total_incorrect if total_incorrect else 0,
            'rejected_incorrect_percentage': 100 * ir / total_incorrect if total_incorrect else 0,
        })
    
    # Aggregate 'T' metrics
    metrics['T'] = aggregate_totals(metrics)
    
    return metrics

def aggregate_totals(metrics):
    t_metrics = {}
    for metric_key in metrics['B']:
        t_metrics[metric_key] = metrics['B'][metric_key] + metrics['M'][metric_key]
    
    # Calculate 'T' percentages based on aggregated totals
    t_ca = t_metrics['correct_accepted']
    t_cr = t_metrics['correct_rejected']
    t_ia = t_metrics['incorrect_accepted']
    t_ir = t_metrics['incorrect_rejected']
    t_total_correct = t_ca + t_cr
    t_total_incorrect = t_ia + t_ir
    
    t_metrics.update({
        'correctly_predicted_total': t_total_correct,
        'incorrectly_predicted_total': t_total_incorrect,
        'accepted_correct_percentage': 100 * t_ca / t_total_correct if t_total_correct else 0,
        'rejected_correct_percentage': 100 * t_cr / t_total_correct if t_total_correct else 0,
        'accepted_incorrect_percentage': 100 * t_ia / t_total_incorrect if t_total_incorrect else 0,
        'rejected_incorrect_percentage': 100 * t_ir / t_total_incorrect if t_total_incorrect else 0,
    })
    
    return t_metrics


In [None]:
def find_best_threshold_balanced(probabilities_val, true_labels):
    best_threshold = 0.0
    best_correct_accept = 0
    best_incorrect_reject = 0
    best_harmonic_mean = 0  # Best harmonic mean of F1-Accept and F1-Reject
    
    for threshold in np.linspace(0.5, 0.001, 200):
        ucsb_q_hat_lgb_nn=get_q_hat(probabilities_val,y_val_base_ucsb,threshold)
        metrics = calculate_detailed_metrics_extended_optimized(probabilities_val, true_labels,ucsb_q_hat_lgb_nn)
        correct_accept=metrics['T']['accepted_correct_percentage']
        incorrect_reject=metrics['T']['rejected_incorrect_percentage']
        
        # Calculate the harmonic mean of F1-Accept and F1-Reject
        if correct_accept > 0 and incorrect_reject > 0:  # Ensure no division by zero
            harmonic_mean = 2 * (correct_accept * incorrect_reject) / (correct_accept + incorrect_reject)
        else:
            harmonic_mean = 0
        
        if harmonic_mean > best_harmonic_mean:
            best_threshold = threshold
            best_correct_accept = correct_accept
            best_incorrect_reject = incorrect_reject
            best_harmonic_mean = harmonic_mean
    
    return best_threshold, best_correct_accept, best_incorrect_reject, best_harmonic_mean

In [None]:
def calculate_accept_reject_per_optimized(probabilities_test,probabilities_val, true_labels):
    thresholds = np.linspace(0.5, 0.001, 300)
    accepts, rejects = [], []
    for threshold in thresholds:
        ucsb_q_hat_lgb_nn=get_q_hat(probabilities_val,y_val_base.astype(int),threshold)
        metrics = calculate_detailed_metrics_optimized(probabilities_test, true_labels,ucsb_q_hat_lgb_nn )
        accepts.append(metrics['correct_total_acceptance_percentage'])
        rejects.append(metrics['incorrect_total_rejection_percentage'])

    return accepts, rejects

In [None]:
accept, reject = calculate_accept_reject_per_optimized(test_prob,val_prob, y_test.astype(int))



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Assuming thresholds are defined as before
# Using your provided setup for thresholds
thresholds = np.linspace(0.5, 0.001, 300)
plt.figure(figsize=(10, 8))

# Define a marker style for each line for better visibility
marker_style = dict(linestyle='-', linewidth=3, marker='o', markersize=5, markerfacecolor='white', markevery=10)

# Plot for Uncorrected Entropy
plt.plot(thresholds, accept, label='$\mathbf{Accept \%}$', color='blue', marker='d', markersize=10, markerfacecolor='white', markevery=10)
plt.plot(thresholds, reject, label='$\mathbf{Reject \%}$', color='blue', linestyle='--', linewidth=3, marker='^', markersize=5, markerfacecolor='white', markevery=10)


plt.xticks(fontweight='bold')
plt.yticks(fontweight='bold')
plt.title('Acceptance and Rejection Percentages using Confromal Prediction', fontsize=16, fontweight='bold')
plt.xlabel('Threshold', fontsize=16, fontweight='bold')
plt.ylabel('Percentage', fontsize=16, fontweight='bold')
plt.legend()
plt.grid(True)
plt.savefig('iscx_threshold.png', format='png', dpi=300)
plt.show()


In [None]:
def find_best_threshold_balanced(probabilities_val, true_labels):
    best_threshold = 0.0
    best_correct_accept = 0
    best_incorrect_reject = 0
    best_harmonic_mean = 0  # Best harmonic mean of F1-Accept and F1-Reject
    
    for threshold in np.linspace(0.5, 0.001, 200):
        ucsb_q_hat_lgb_nn=get_q_hat(probabilities_val,y_val_base.astype(int),threshold)
        metrics = calculate_detailed_metrics_extended_optimized(probabilities_val, true_labels,ucsb_q_hat_lgb_nn)
        correct_accept=metrics['T']['accepted_correct_percentage']
        incorrect_reject=metrics['T']['rejected_incorrect_percentage']
        
        # Calculate the harmonic mean of F1-Accept and F1-Reject
        if correct_accept > 0 and incorrect_reject > 0:  # Ensure no division by zero
            harmonic_mean = 2 * (correct_accept * incorrect_reject) / (correct_accept + incorrect_reject)
        else:
            harmonic_mean = 0
        
        if harmonic_mean > best_harmonic_mean:
            best_threshold = threshold
            best_correct_accept = correct_accept
            best_incorrect_reject = incorrect_reject
            best_harmonic_mean = harmonic_mean
    
    return best_threshold, best_correct_accept, best_incorrect_reject, best_harmonic_mean

In [None]:
# Assuming 'probabilities' and 'true_labels' are defined as before
best_threshold, best_correct_accept, best_incorrect_reject, best_harmonic_mean_f1 = find_best_threshold_balanced(X_val_base,y_val_base.astype(int))

print(f"Optimal Threshold: {best_threshold}, Best F1-Accept: {best_correct_accept}, Best F1-Reject: {best_incorrect_reject}, Best Harmonic Mean F1: {best_harmonic_mean_f1}")

In [None]:
def get_q_hat(cal_smx,cal_labels,alpha):
    n=cal_smx.shape[0]
    cal_scores = 1-cal_smx[np.arange(n),cal_labels]
    # 2: get adjusted quantile
    q_level = np.ceil((n+1)*(1-alpha))/n
    qhat = np.quantile(cal_scores, q_level, interpolation='higher')
    return qhat

In [None]:
ucsb_q_hat=get_q_hat(test_prob,y_test.values.astype(int),best_threshold)
Metrics=calculate_detailed_metrics_extended_optimized(test_prob, y_test.values.astype(int),ucsb_q_hat)
Metrics