In [None]:
import os
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import linear_model, preprocessing
from sklearn.metrics import RocCurveDisplay, auc, roc_curve, confusion_matrix
from sklearn.model_selection import KFold
from prettytable import PrettyTable

In [None]:
# Project root = parent of the current working directory, kept as a plain
# string because later cells combine it with os.path.join.
PATH_ROOT = os.fspath(Path.cwd().parent)
print(PATH_ROOT)

In [None]:
# Load the complexity-measure table and order it by dataset name, then by
# poisoning rate, with a fresh 0..n-1 index.
df_cm = (
    pd.read_csv(os.path.join(PATH_ROOT, 'results', 'synth_cmeasures_nn.csv'))
    .sort_values(by=['Data', 'Rate'], ignore_index=True)
)
print(df_cm.shape)
print(df_cm.columns)
df_cm.head()

In [None]:
# Load the random-noise counterpart of the complexity-measure table, sorted
# the same way (dataset name, then rate) with a fresh 0..n-1 index.
df_cm_rand = (
    pd.read_csv(os.path.join(PATH_ROOT, 'results', 'synth_cmeasures_rand_nn.csv'))
    .sort_values(by=['Data', 'Rate'], ignore_index=True)
)
print(df_cm_rand.shape)
df_cm_rand.head()

In [None]:
# Bucket datasets into three difficulty levels using equal-width bins over the
# test accuracy of the clean (Rate == 0) rows. Bin 0 holds the lowest
# accuracies and is therefore labelled 'Hard'.
DIF_LEVELS = ['Hard', 'Normal', 'Easy']
N_BINS = len(DIF_LEVELS)
difficulty_mapping = dict(enumerate(DIF_LEVELS))

encoder = preprocessing.KBinsDiscretizer(n_bins=N_BINS, encode='ordinal', strategy='uniform')
acc_test_clean = df_cm.loc[df_cm['Rate'] == 0, 'Test'].to_numpy()
ordinal_clean = encoder.fit_transform(acc_test_clean.reshape(-1, 1)).ravel().astype(int)

print('Convert test set accuracy into ordinal data:')
bin_edges = encoder.bin_edges_[0]
for level_no, level_name in enumerate(DIF_LEVELS):
    lower = bin_edges[level_no]
    upper = bin_edges[level_no + 1]
    print('{} in [{:.3f}, {:.3f})'.format(level_name, lower, upper))

In [None]:
# Poisoning rates from 0.00 to 0.40 in steps of 0.05; rounding to two decimals
# removes the floating-point drift that arange accumulates.
POISON_LVLS = np.arange(0, 0.41, 0.05, dtype=float).round(2)
POISON_LVLS

In [None]:
# Give every row of a dataset the difficulty bin computed from that dataset's
# clean (Rate == 0) test accuracy, so all poisoning rates of one dataset share
# a single label.
datanames = df_cm[df_cm['Rate'] == 0]['Data'].to_numpy()
bin_by_dataset = dict(zip(datanames, ordinal_clean))
df_cm['Difficulty'] = (
    df_cm['Data']
    .map(bin_by_dataset)
    .fillna(0)  # datasets without a clean row fall back to bin 0, as before
    .astype(int)
    .map(difficulty_mapping)
)

# NOTE(review): this assignment aligns on the row index, so it assumes
# df_cm_rand lists the same datasets/rates in the same order as df_cm -- confirm.
df_cm_rand['Difficulty'] = df_cm['Difficulty']

In [None]:
# Input columns used as regression features, and the target column.
# NOTE(review): 'N3 ' carries a trailing space -- presumably it mirrors the CSV
# header exactly; confirm before "fixing" it.
col_X = [
    'F1', 'F1 SD', 'F1v', 'F2', 'F3', 'F4',
    'N1', 'N2', 'N2 SD', 'N3 ', 'N3 SD', 'N4', 'N4 SD',
    'T1', 'T1 SD',
    'LSC',
    'L1', 'L2', 'L3',
    'T2', 'T3', 'T4',
    'C1', 'C2',
    'Density', 'ClsCoef', 'Hubs', 'HubsSD',
]
col_y = 'Test'

In [None]:
# Out-of-fold prediction of test accuracy from the complexity measures.
# Folds are drawn over the clean (Rate == 0) rows, and every row of a dataset
# follows its clean row into the same fold, so a dataset is never split
# between the training and test sides.
alpha = 0.03
N_FOLD = 5
SEED = 1234  # fixed so the folds (and everything derived from them) are reproducible

# Start as float so the regressor's float predictions do not force a dtype
# change on assignment.
df_cm['prediction'] = 0.0
df_cm_rand['prediction'] = 0.0

# Split by clean data
indices_clean = df_cm[df_cm['Rate'] == 0].index
kf = KFold(n_splits=N_FOLD, shuffle=True, random_state=SEED)
for i_train, i_test in kf.split(indices_clean):
    idx_train_clean = indices_clean[i_train]
    idx_test_clean = indices_clean[i_test]

    data_train = df_cm.loc[idx_train_clean, 'Data']
    data_test = df_cm.loc[idx_test_clean, 'Data']

    # Expand the clean-row split to all rows of the selected datasets.
    idx_train = df_cm[df_cm['Data'].isin(data_train)].index
    idx_test = df_cm[df_cm['Data'].isin(data_test)].index
    # Every row must land on exactly one side of the split.
    assert len(idx_train) + len(idx_test) == df_cm.shape[0]

    X_train = df_cm.loc[idx_train, col_X]
    y_train = df_cm.loc[idx_train, col_y]
    regressor = linear_model.Ridge(alpha=alpha)
    regressor.fit(X_train, y_train)

    X_test = df_cm.loc[idx_test, col_X]
    df_cm.loc[idx_test, 'prediction'] = regressor.predict(X_test)

    # On Random noise
    # NOTE(review): df_cm's row labels are reused on df_cm_rand, which assumes
    # both frames hold the same datasets in the same order -- confirm.
    X_test_rand = df_cm_rand.loc[idx_test, col_X]
    df_cm_rand.loc[idx_test, 'prediction'] = regressor.predict(X_test_rand)

In [None]:
# ROC analysis: a row counts as positive when its poisoning rate exceeds
# THRESHOLD, and it is scored by |predicted test accuracy - train accuracy|.
THRESHOLD = 0.00

FONTSIZE = 14
FIGSIZE = (5, 5)

plt.rcParams["font.size"] = FONTSIZE
fig, ax = plt.subplots(figsize=FIGSIZE)
y_true_ = (df_cm['Rate'] > THRESHOLD).astype(int)
y_score_ = np.abs(df_cm['prediction'] - df_cm['Train'])
fpr, tpr, thresholds = roc_curve(y_true_, y_score_)
roc_auc_ = auc(fpr, tpr)
# Operating point: the ROC point farthest from the worst corner (FPR=1, TPR=0).
idx_t = np.argmax(np.sqrt((1 - fpr)**2 + tpr**2))
threshold = thresholds[idx_t]
roc_display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc_).plot(ax=ax)
# The x-axis is the false-positive rate, so mark the operating point at its
# FPR; the score threshold itself lives on a different scale.
ax.vlines(fpr[idx_t], 0, 1, linestyles='dotted', label='threshold', colors='green')
# Rebuild the legend so the marker added after the ROC curve is listed too.
ax.legend(loc='lower right')
plt.tight_layout()
plt.show()

In [None]:
# Report the score cut-off selected from the ROC curve in the previous cell.
print('C-Measure threshold:', threshold)

In [None]:
def get_table(df, y_true, results, dif_levels=None, poison_lvls=None):
    """Summarise how often rows are flagged, per difficulty level and poisoning rate.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns 'Difficulty' and 'Rate'.
    y_true : array-like
        Ground-truth labels. Accepted for interface compatibility; the
        returned table does not depend on it.
    results : pandas.Series or numpy.ndarray
        Per-row decisions (boolean or 0/1). A Series is aligned to ``df`` on
        the index; an array is taken positionally.
    dif_levels : sequence of str, optional
        Difficulty labels to report, in output order. Defaults to the
        notebook-level ``DIF_LEVELS``.
    poison_lvls : sequence of float, optional
        Poisoning rates to report. Defaults to the notebook-level
        ``POISON_LVLS``.

    Returns
    -------
    pandas.DataFrame
        One row per (difficulty, rate) pair, difficulty-major, with columns
        'Poisoning Rate', 'Dataset Difficulty' and 'Precision (PPV)'. The last
        column is the fraction of rows in the group whose decision is 1,
        rounded to two decimals, and NaN when the group is empty.
    """
    if dif_levels is None:
        dif_levels = DIF_LEVELS
    if poison_lvls is None:
        poison_lvls = POISON_LVLS
    dif_levels = list(dif_levels)
    poison_lvls = np.asarray(poison_lvls, dtype=float)

    df_ = df[['Difficulty', 'Rate']].copy()
    df_['Prediction'] = results.astype(int)
    rate_values = df_['Rate'].to_numpy(dtype=float)

    # For each (difficulty, rate) group count the rows predicted 0 and 1.
    counts = []
    for d in dif_levels:
        in_level = (df_['Difficulty'] == d).to_numpy()
        for r in poison_lvls:
            # Tolerant comparison: rates are floats that may have been through
            # a CSV round-trip or arange arithmetic.
            in_group = in_level & np.isclose(rate_values, r)
            pred = df_.loc[in_group, 'Prediction']
            counts.append([int((pred == 0).sum()), int((pred == 1).sum())])
    counts = np.array(counts, dtype=float).reshape(-1, 2)

    # Fraction flagged as 1; empty groups stay NaN instead of dividing by zero.
    group_sizes = counts.sum(axis=1)
    flagged_fraction = np.full(len(group_sizes), np.nan)
    np.divide(counts[:, 1], group_sizes, out=flagged_fraction, where=group_sizes > 0)
    flagged_fraction = np.round(flagged_fraction, 2)

    rates = np.tile(poison_lvls, reps=len(dif_levels))
    difficulties = np.repeat(dif_levels, repeats=len(poison_lvls))
    df_confusion_mat = pd.DataFrame({
        'Poisoning Rate': rates,
        'Dataset Difficulty': difficulties,
        'Precision (PPV)': flagged_fraction,
    })
    return df_confusion_mat

In [None]:
# Flag rows whose train accuracy exceeds the predicted test accuracy by more
# than the ROC-derived threshold, tabulate them and save the table.
# NOTE(review): the threshold was selected on |prediction - Train|, whereas the
# signed difference Train - prediction is used here -- confirm the one-sided
# test is intended.
results = df_cm['Train'].sub(df_cm['prediction']).gt(threshold)
y_true = df_cm['Rate'].gt(THRESHOLD).astype(int)
df_confusion_mat = get_table(df_cm, y_true, results)
path_confusionmat = os.path.join(PATH_ROOT, 'results', 'synth_heatmap.csv')
df_confusion_mat.to_csv(path_confusionmat, index=False)

In [None]:
# Heatmap of the per-group flagged fraction: rows are poisoning rates,
# columns are dataset difficulties.
FONTSIZE = 13
FIGSIZE = (5, 4)

plt.rcParams["font.size"] = FONTSIZE
fig, ax = plt.subplots(figsize=FIGSIZE)
# DataFrame.pivot takes keyword-only arguments from pandas 2.0 onward.
df_pivot = df_confusion_mat.pivot(
    index='Poisoning Rate',
    columns='Dataset Difficulty',
    values='Precision (PPV)',
)
# Show the columns from easiest to hardest.
df_pivot = df_pivot[['Easy', 'Normal', 'Hard']]
sns.heatmap(df_pivot, ax=ax, annot=True, fmt='.2f', cmap="RdYlGn")
plt.yticks(rotation=0)
plt.tight_layout()
plot_output = os.path.join(PATH_ROOT, 'results', 'synth_heatmap.svg')
plt.savefig(plot_output, dpi=600)

# This image requires manual editing in Inkscape!

In [None]:
# Apply the same threshold to the random-noise frame. All ground-truth labels
# are set to zero here.
results = df_cm_rand['Train'].sub(df_cm_rand['prediction']).gt(threshold)
y_true = np.zeros(len(df_cm_rand))
df_confusion_mat = get_table(df_cm_rand, y_true, results)

In [None]:
# Inspect the random-noise frame, which by now also carries the 'Difficulty'
# and 'prediction' columns added in earlier cells.
df_cm_rand

In [None]:
# Same heatmap for the random-noise results; shown inline only, not saved.
FONTSIZE = 13
FIGSIZE = (5, 4)

plt.rcParams["font.size"] = FONTSIZE
fig, ax = plt.subplots(figsize=FIGSIZE)
# DataFrame.pivot takes keyword-only arguments from pandas 2.0 onward.
df_pivot = df_confusion_mat.pivot(
    index='Poisoning Rate',
    columns='Dataset Difficulty',
    values='Precision (PPV)',
)
# Show the columns from easiest to hardest.
df_pivot = df_pivot[['Easy', 'Normal', 'Hard']]
sns.heatmap(df_pivot, ax=ax, annot=True, fmt='.2f', cmap="RdYlGn")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()