<font color='White'>***Libraries and Constants***</font>
---
--- 

In [23]:
# %pip install seaborn

In [24]:
import pprint
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix
ROUNDS = 40 
SPLITS = 1
CLASS_COUNT = 5
# Define a variable for the base path to use for all use cases
PATH = "FL:TrueDNN48-clients-11atk-40-rounds-3-epochs-0.0025-lr-500-groups"  # Change this to your desired path


<font color='Green'>***Metrics and Functions***</font>
---
--- 

In [25]:
# ============================================================
# AUTO-DETECT CLASSCOUNT METRIC FUNCTIONS (DROP-IN SAFE)
# ============================================================

def calculate_accuracy(actual_values, predicted_values):
    """Fraction of correct predictions."""
    if len(actual_values) == 0:
        return 0.0
    return sum(a == p for a, p in zip(actual_values, predicted_values)) / len(actual_values)


def _build_label_index(actual_values, predicted_values):
    """
    Auto-detect all class labels and map them to 0..K-1.
    This avoids IndexErrors and handles any label format.
    """
    unique_labels = sorted(set(actual_values) | set(predicted_values))
    label_to_idx = {lab: i for i, lab in enumerate(unique_labels)}
    return unique_labels, label_to_idx


def calculate_weighted_precision(actual_values, predicted_values):
    if len(actual_values) == 0:
        return 0.0

    labels, idx = _build_label_index(actual_values, predicted_values)
    K = len(labels)

    tp = [0] * K
    fp = [0] * K
    actual_count = [0] * K

    for a, p in zip(actual_values, predicted_values):
        ai = idx[a]
        pi = idx[p]
        actual_count[ai] += 1
        if a == p:
            tp[ai] += 1
        else:
            fp[pi] += 1

    total = len(actual_values)
    precision_sum = 0.0

    for i in range(K):
        denom = tp[i] + fp[i]
        precision_i = tp[i] / denom if denom > 0 else 0.0
        weight_i = actual_count[i] / total
        precision_sum += precision_i * weight_i

    return precision_sum


def calculate_weighted_recall(actual_values, predicted_values):
    if len(actual_values) == 0:
        return 0.0

    labels, idx = _build_label_index(actual_values, predicted_values)
    K = len(labels)

    tp = [0] * K
    fn = [0] * K
    actual_count = [0] * K

    for a, p in zip(actual_values, predicted_values):
        ai = idx[a]
        actual_count[ai] += 1
        if a == p:
            tp[ai] += 1
        else:
            fn[ai] += 1

    total = len(actual_values)
    recall_sum = 0.0

    for i in range(K):
        denom = tp[i] + fn[i]
        recall_i = tp[i] / denom if denom > 0 else 0.0
        weight_i = actual_count[i] / total
        recall_sum += recall_i * weight_i

    return recall_sum


def calculate_weighted_f1(actual_values, predicted_values):
    if len(actual_values) == 0:
        return 0.0

    labels, idx = _build_label_index(actual_values, predicted_values)
    K = len(labels)

    tp = [0] * K
    fp = [0] * K
    fn = [0] * K

    for a, p in zip(actual_values, predicted_values):
        ai = idx[a]
        pi = idx[p]
        if a == p:
            tp[ai] += 1
        else:
            fp[pi] += 1
            fn[ai] += 1

    total = len(actual_values)
    f1_sum = 0.0

    for i in range(K):
        prec_i = tp[i] / (tp[i] + fp[i]) if (tp[i] + fp[i]) > 0 else 0.0
        rec_i  = tp[i] / (tp[i] + fn[i]) if (tp[i] + fn[i]) > 0 else 0.0

        f1_i = (2 * prec_i * rec_i / (prec_i + rec_i)) if (prec_i + rec_i) > 0 else 0.0

        weight_i = (tp[i] + fn[i]) / total
        f1_sum += f1_i * weight_i

    return f1_sum


In [26]:
def update_values(values_list):
    return [value if value >= 0.5 else 0.5 for value in values_list]

In [27]:
def pad_list(values_list):
    while len(values_list) < ROUNDS:
        values_list.append(values_list[-1])
    return values_list

In [28]:
def print_classification_report(actual, predicted, class_labels=None):
    if class_labels is None:
        class_labels = sorted(set(actual) | set(predicted))
    report = classification_report(actual, predicted, target_names=[str(label) for label in class_labels])
    print("Classification Report:\n")
    print(report)

In [29]:
def save_classification_report(actual, predicted, file_path, class_labels=None):
    if class_labels is None:
        class_labels = sorted(set(actual) | set(predicted))
    report = classification_report(actual, predicted, target_names=[str(label) for label in class_labels])
    with open(file_path, 'w') as file:
        file.write("Classification Report:\n\n")
        file.write(report)
    print(f"Classification report saved to {file_path}")

<font color='Orange'>***Confusion Matrix***</font>
---
--- 

In [30]:
Label_names = ['Normal', 'DDoS_UDP', 'DDoS_ICMP', 'DDoS_TCP', 'DDoS_HTTP', 'Password', 'Vulnerability_scanner', 'SQL_injection']#, 'Uploading', 'Backdoor', 'Port_Scanning', 'XSS', 'Ransomware', 'MITM', 'OS_Fingerprinting']
Label_numbers = [0, 1, 2, 3, 4, 5, 6, 7]#, 8, 9, 10, 11, 12, 13, 14]

def plot_confusion_matrix_with_names(actual, predicted, label_numbers, label_names, title="Confusion Matrix"):
    """
    Plots a confusion matrix with label names instead of numbers.

    Args:
        actual (list): The list of actual class labels.
        predicted (list): The list of predicted class labels.
        label_numbers (list): The list of numeric label identifiers.
        label_names (list): The corresponding list of label names.
        title (str): The title of the confusion matrix plot.
    """
    # Map numeric labels to their names
    label_map = dict(zip(label_numbers, label_names))
    
    # Compute confusion matrix
    cm = confusion_matrix(actual, predicted, labels=label_numbers)
    
    # Replace numeric labels with names for the axes
    class_labels = [label_map[num] for num in label_numbers]
    
    # Create a heatmap
    plt.figure(figsize=(12, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='magma', xticklabels=class_labels, yticklabels=class_labels)
    
    # Add labels, title, and a color bar
    plt.xlabel("Predicted Labels")
    plt.ylabel("Actual Labels")
    plt.title(title)
    plt.tight_layout()
    plt.show()

In [31]:
# plot_confusion_matrix_with_names(Data["Perfect Post-Detection"]["Actual"][160], Data["Perfect Post-Detection"]["Predictions"][160], 
#                                  Label_numbers, Label_names, title="Confusion Matrix")

<font color='Red'>***Calculating and Saving Results***</font>
---
--- 

In [32]:
Data = {}
Data["Centralized"] = {}
Data["Centralized"]["Path"] = "DNNCent5atk_40_rounds_48_clients_3_epochs_64_batch_0.0025_lr_40_data_groups"  # Your centralized model output directory
Data["Centralized"]["Actual"] = {}
Data["Centralized"]["Predictions"] = {}
# Data["Perfect Post-Detection"] = {}
# Data["Confidence Thresholding-Round 4"] = {}
# Data["Outlier Removing-Round 4"] = {}
# Data["No Post-Detection-Round 4"] = {}
# Data["Confidence Thresholding-Round 10"] = {}
# Data["Outlier Removing-Round 10"] = {}
# Data["No Post-Detection-Round 10"] = {}
# Data["Perfect Post-Detection"]['Path'] = "Ideal_20250206"
# Data["Confidence Thresholding-Round 4"]['Path'] = "Confidence_20250214_4"
# Data["Outlier Removing-Round 4"]['Path'] = "Outlier_20250214_4"
# Data["No Post-Detection-Round 4"]['Path'] = "NoPD_20250214_4"
# Data["Confidence Thresholding-Round 10"]['Path'] = "Confidence_20250214_10"
# Data["Outlier Removing-Round 10"]['Path'] = "Outlier_20250214_10"
# Data["No Post-Detection-Round 10"]['Path'] = "NoPD_20250214_10"
for Usecase in Data:
    Data[Usecase]['Actual'] = {}
    Data[Usecase]['Predictions'] = {}
pprint.pprint(Data)

{'Centralized': {'Actual': {},
                 'Path': 'DNNCent5atk_40_rounds_48_clients_3_epochs_64_batch_0.0025_lr_40_data_groups',
                 'Predictions': {}}}


In [33]:
for Usecase in Data:
    for i in range(1, ROUNDS*SPLITS+1):
        try:
            filename = f"{Data[Usecase]['Path']}/Global_{i}_actual"
            with open(filename, 'rb') as file:
                Actual = pickle.load(file)
            Data[Usecase]['Actual'][i] = [item for sublist in Actual for item in sublist]
            filename = f"{Data[Usecase]['Path']}/Global_{i}_pred"
            with open(filename, 'rb') as file:
                Pred = pickle.load(file)
            Data[Usecase]['Predictions'][i] = [item for sublist in Pred for item in sublist]
        except FileNotFoundError:
            print(f"File not found: {filename}")
        except Exception as e:
            print(f"An error occurred while processing file {filename}: {e}")

In [34]:
# from collections import Counter
# value_counts = Counter(Data['Confidence Thresholding']['Predictions'][160])
# for value, count in value_counts.items():
#     print(f"Value: {value}, Count: {count}")

In [35]:
Results = {}
for Usecase in Data:
    Results[Usecase] = {}
for Usecase in Results:
    Results[Usecase]['Accuracy'] = []
    Results[Usecase]['Recall'] = []
    Results[Usecase]['Precision'] = [] 
    Results[Usecase]['F1_Score'] = []
pprint.pprint(Results)

{'Centralized': {'Accuracy': [], 'F1_Score': [], 'Precision': [], 'Recall': []}}


In [36]:

for Usecase in Results:
    for Round in range(1, ROUNDS*SPLITS+1):
        try:
            Results[Usecase]['Accuracy'].append(calculate_accuracy(Data[Usecase]['Actual'][Round], Data[Usecase]['Predictions'][Round]))
            Results[Usecase]['Precision'].append(calculate_weighted_precision(Data[Usecase]['Actual'][Round], Data[Usecase]['Predictions'][Round]))
            Results[Usecase]['Recall'].append(calculate_weighted_recall(Data[Usecase]['Actual'][Round], Data[Usecase]['Predictions'][Round]))
            Results[Usecase]['F1_Score'].append(calculate_weighted_f1(Data[Usecase]['Actual'][Round], Data[Usecase]['Predictions'][Round]))
        except KeyError:
            print('Usecase:', Usecase, 'Round:', Round)
            continue

In [37]:
# for Usecase in Results:
#     Results[Usecase]['Accuracy'] = update_values(Results[Usecase]['Accuracy'])
#     Results[Usecase]['Recall'] = update_values(Results[Usecase]['Recall'])
#     Results[Usecase]['Precision'] = update_values(Results[Usecase]['Precision'])    
#     Results[Usecase]['F1_Score'] = update_values(Results[Usecase]['F1_Score'])    

<font color='Light Blue'>***Save/ Load Calculated Results***</font>
---
--- 

In [38]:
with open(f"{PATH}/PD_Results.pkl", "wb") as file:
    pickle.dump(Results, file)

In [39]:
# with open("PD_Results.pkl", "rb") as file:
#     Results = pickle.load(file)

<font color='Green'>***Plotting Results***</font>
---
--- 

In [40]:
plt.style.use('ggplot')
LineStyle = ['ro-', 'b*-', 'ks-', 'gh-', 'm<-', 'yp-', 'b*-', 'gh-', 'rH-', 'c+-', 'mx-', 'ro-', 'b*-', 'ks-', 'gh-', 'y<-']
fig = plt.figure(figsize=(17.2, 13), dpi=450)
axx = fig.add_subplot(1,1,1)
plt.figure(dpi=1000)
# plt.figure(figsize=(8, 6))

x_axis = np.array(np.arange(1, ROUNDS+1, 1).tolist())
x_axis = np.insert(x_axis, 0, 0)
y_points = {}
for Usecase in Data:
    y_points[Usecase] = np.array(Results[Usecase]['Accuracy'])
    y_points[Usecase] = np.insert(y_points[Usecase], 0, 0.5)
index = 0
for Usecase in Data:
    axx.plot(x_axis, y_points[Usecase],LineStyle[index], label = Usecase, linewidth=2.5,  markersize=10)
    index += 1
    
axx.set_xlabel(f'{PATH}', fontdict={'fontsize': 36})
axx.set_ylabel('Detection Accuracy', fontdict={'fontsize': 36})
axx.set_xticks(np.arange(0, ROUNDS+1, 5).tolist()) 
axx.set_yticks(np.arange(0.30, 1.05, 0.05).tolist())
axx.legend(loc = 'lower right', prop={'size': 16})
axx.tick_params(axis='x', which='both', bottom=True, top=True, labelbottom=True, labeltop=True, labelsize=20, colors='black')
axx.xaxis.set_ticks_position('both')
axx.tick_params(axis='y', which='both', left=True, right=True, labelleft=True, labelright=True, labelsize=20, colors='black')
axx.yaxis.set_ticks_position('both')
axx.xaxis.label.set_color('black')
axx.yaxis.label.set_color('black')
fig.savefig(f"{PATH}/1_Accuracy.pdf", format="pdf", bbox_inches="tight")
plt.close(fig) 

<Figure size 6400x4800 with 0 Axes>

In [41]:
plt.style.use('ggplot')
LineStyle = ['ro-', 'b*-', 'ks-', 'gh-', 'm<-', 'yp-', 'b*-', 'gh-', 'rH-', 'c+-', 'mx-', 'ro-', 'b*-', 'ks-', 'gh-', 'y<-']
fig = plt.figure(figsize=(17.2, 13), dpi=450)
axx = fig.add_subplot(1,1,1)
plt.figure(dpi=1000)
# plt.figure(figsize=(8, 6))

x_axis = np.array(np.arange(1, ROUNDS+1, 1).tolist())
x_axis = np.insert(x_axis, 0, 0)
y_points = {}
for Usecase in Data:
    y_points[Usecase] = np.array(Results[Usecase]['Recall'])
    y_points[Usecase] = np.insert(y_points[Usecase], 0, 0.5)

index = 0
for Usecase in Data:
    axx.plot(x_axis, y_points[Usecase],LineStyle[index], label = Usecase, linewidth=2.5,  markersize=10)
    index += 1
    
axx.set_xlabel(f'{PATH}', fontdict={'fontsize': 36})
axx.set_ylabel('Detection Recall', fontdict={'fontsize': 36})
axx.set_xticks(np.arange(0, ROUNDS+1, 5).tolist()) 
axx.set_yticks(np.arange(0.30, 1.05, 0.05).tolist())
axx.legend(loc = 'lower right', prop={'size': 16})
axx.tick_params(axis='x', which='both', bottom=True, top=True, labelbottom=True, labeltop=True, labelsize=20, colors='black')
axx.xaxis.set_ticks_position('both')
axx.tick_params(axis='y', which='both', left=True, right=True, labelleft=True, labelright=True, labelsize=20, colors='black')
axx.yaxis.set_ticks_position('both')
axx.xaxis.label.set_color('black')
axx.yaxis.label.set_color('black')
fig.savefig(f"{PATH}/1_Wieghted_Recall.pdf", format="pdf", bbox_inches="tight")
plt.close(fig) 

<Figure size 6400x4800 with 0 Axes>

In [42]:
plt.style.use('ggplot')
LineStyle = ['ro-', 'b*-', 'ks-', 'gh-', 'm<-', 'yp-', 'b*-', 'gh-', 'rH-', 'c+-', 'mx-', 'ro-', 'b*-', 'ks-', 'gh-', 'y<-']
fig = plt.figure(figsize=(17.2, 13), dpi=450)
axx = fig.add_subplot(1,1,1)
plt.figure(dpi=1000)
# plt.figure(figsize=(8, 6))

x_axis = np.array(np.arange(1, ROUNDS+1, 1).tolist())
x_axis = np.insert(x_axis, 0, 0)
y_points = {}
for Usecase in Data:
    y_points[Usecase] = np.array(Results[Usecase]['Precision'])
    y_points[Usecase] = np.insert(y_points[Usecase], 0, 0.5)

index = 0
for Usecase in Data:
    axx.plot(x_axis, y_points[Usecase],LineStyle[index], label = Usecase, linewidth=2.5,  markersize=10)
    index += 1
    
axx.set_xlabel(f'{PATH}', fontdict={'fontsize': 36})
axx.set_ylabel('Detection Precision', fontdict={'fontsize': 36})
axx.set_xticks(np.arange(0, ROUNDS+1, 5).tolist()) 
axx.set_yticks(np.arange(0.30, 1.05, 0.05).tolist())
axx.legend(loc = 'lower right', prop={'size': 16})
axx.tick_params(axis='x', which='both', bottom=True, top=True, labelbottom=True, labeltop=True, labelsize=20, colors='black')
axx.xaxis.set_ticks_position('both')
axx.tick_params(axis='y', which='both', left=True, right=True, labelleft=True, labelright=True, labelsize=20, colors='black')
axx.yaxis.set_ticks_position('both')
axx.xaxis.label.set_color('black')
axx.yaxis.label.set_color('black')
fig.savefig(f"{PATH}/1_Wieghted_Precision.pdf", format="pdf", bbox_inches="tight")
plt.close(fig) 

<Figure size 6400x4800 with 0 Axes>

In [43]:
plt.style.use('ggplot')
LineStyle = ['ro-', 'b*-', 'ks-', 'gh-', 'm<-', 'yp-', 'b*-', 'gh-', 'rH-', 'c+-', 'mx-', 'ro-', 'b*-', 'ks-', 'gh-', 'y<-']
fig = plt.figure(figsize=(17.2, 13), dpi=450)
axx = fig.add_subplot(1,1,1)
plt.figure(dpi=1000)
# plt.figure(figsize=(8, 6))

x_axis = np.array(np.arange(1, ROUNDS+1, 1).tolist())
x_axis = np.insert(x_axis, 0, 0)
y_points = {}
for Usecase in Data:
    y_points[Usecase] = np.array(Results[Usecase]['F1_Score'])
    y_points[Usecase] = np.insert(y_points[Usecase], 0, 0.5)

index = 0
for Usecase in Data:
    axx.plot(x_axis, y_points[Usecase],LineStyle[index], label = Usecase, linewidth=2.5,  markersize=10)
    index += 1
    
axx.set_xlabel(f'{PATH}', fontdict={'fontsize': 36})
axx.set_ylabel('Detection F1_Score', fontdict={'fontsize': 36})
axx.set_xticks(np.arange(0, ROUNDS+1, 5).tolist()) 
axx.set_yticks(np.arange(0.30, 1.05, 0.05).tolist())
axx.legend(loc = 'lower right', prop={'size': 16})
axx.tick_params(axis='x', which='both', bottom=True, top=True, labelbottom=True, labeltop=True, labelsize=20, colors='black')
axx.xaxis.set_ticks_position('both')
axx.tick_params(axis='y', which='both', left=True, right=True, labelleft=True, labelright=True, labelsize=20, colors='black')
axx.yaxis.set_ticks_position('both')
axx.xaxis.label.set_color('black')
axx.yaxis.label.set_color('black')
fig.savefig(f"{PATH}/1_Wieghted_F1_Score.pdf", format="pdf", bbox_inches="tight")
plt.close(fig) 

<Figure size 6400x4800 with 0 Axes>

In [44]:
# Multi plot merger
import pickle
import numpy as np
import matplotlib.pyplot as plt

def load_results_from_path(path):
    """Load Results dictionary from PATH/PD_Results.pkl"""
    with open(f"{path}/PD_Results.pkl", "rb") as f:
        results = pickle.load(f)
    return results


def plot_merged_metric(paths, metric_name, labels=None, output=None):
    """
    Merge metric graphs from multiple experiment folders into one graph.

    paths      : list of folder paths (each folder must contain PD_Results.pkl)
    metric_name: "Accuracy", "Recall", "Precision", "F1_Score"
    labels     : legend names (optional)
    output     : file output name (.pdf). If None â†’ auto-generate name.
    """

    plt.style.use('ggplot')
    fig = plt.figure(figsize=(17.2, 13), dpi=450)
    axx = fig.add_subplot(1,1,1)

    LineStyle = ['ro-', 'b*-', 'ks-', 'gh-', 'm<-', 'yp-', 'c+-', 'mx-'] * 4

    if labels is None:
        labels = [f"Run {i+1}" for i in range(len(paths))]

    for idx, path in enumerate(paths):
        results = load_results_from_path(path)
        usecase = next(iter(results.keys()))  

        y = np.array(results[usecase][metric_name])
        y = np.insert(y, 0, 0.5)            
        x = np.arange(len(y))

        axx.plot(
            x, y,
            LineStyle[idx],
            label=labels[idx],
            linewidth=2.5,
            markersize=10
        )

    axx.set_xlabel("Rounds", fontdict={'fontsize': 36})
    axx.set_ylabel(f"Detection {metric_name}", fontdict={'fontsize': 36})
    axx.set_xticks(np.arange(0, len(y), 5))
    axx.set_yticks(np.arange(0.30, 1.05, 0.05))
    axx.legend(loc='lower right', prop={'size': 16})

    axx.tick_params(axis='x', which='both', bottom=True, top=True,
                    labelbottom=True, labeltop=True, labelsize=20, colors='black')
    axx.xaxis.set_ticks_position('both')
    axx.tick_params(axis='y', which='both', left=True, right=True,
                    labelleft=True, labelright=True, labelsize=20, colors='black')
    axx.yaxis.set_ticks_position('both')
    axx.xaxis.label.set_color('black')
    axx.yaxis.label.set_color('black')

    if output is None:
        output = f"Merged_{metric_name}.pdf"

    fig.savefig(output, format="pdf", bbox_inches="tight")
    plt.close(fig)

    print(f"[Saved] {output}")


def merge_all_metrics(paths, labels=None, out_prefix="Merged"):
    """Generate Accuracy, Recall, Precision, F1 merged graphs for all paths."""
    plot_merged_metric(paths, "Accuracy",    labels, f"{out_prefix}_Accuracy.pdf")
    plot_merged_metric(paths, "Recall",      labels, f"{out_prefix}_Recall.pdf")
    plot_merged_metric(paths, "Precision",   labels, f"{out_prefix}_Precision.pdf")
    plot_merged_metric(paths, "F1_Score",    labels, f"{out_prefix}_F1.pdf")

################################################
################################################
# Change paths and out_prefix as needed.
# Paths can contain any number of paths
paths = [
    "DNNCent5atk-40-rounds-1-epochs-0.0015-lr-40-batch-2560-size",
    "DNNCent5atk-40-rounds-3-epochs-0.0015-lr-40-batch-2560-size"
]
labels = [
    "1 Epoch",
    "3 Epochs"
]
merge_all_metrics(paths, labels, out_prefix="1 vs 3 Epochs")


[Saved] 1 vs 3 Epochs_Accuracy.pdf
[Saved] 1 vs 3 Epochs_Recall.pdf
[Saved] 1 vs 3 Epochs_Precision.pdf
[Saved] 1 vs 3 Epochs_F1.pdf
