In [None]:
import pandas as pd
import numpy as np
import math
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
from sklearn.utils import resample
from scipy.stats import norm
from sklearn.model_selection import train_test_split
import multiprocessing
import matplotlib.pyplot as plt
import time

In [None]:
# calculate r-separation distance of dataset
def get_nearest_oppo_dist(X, y, norm, n_jobs):
    """Distance from every sample to its nearest sample carrying a different label.

    Parameters
    ----------
    X : array-like, shape (n_samples, ...)
        Samples; inputs with more than two dimensions are flattened per sample.
    y : array-like, shape (n_samples,)
        Class label of every sample. Any label values that ``np.unique`` can
        sort are accepted (they do not have to be 0..n-1).
    norm : int or float
        Order ``p`` of the Minkowski metric passed to ``NearestNeighbors``.
    n_jobs : int
        Number of joblib workers used to fit the per-class neighbour models.

    Returns
    -------
    nns : list
        One fitted ``NearestNeighbors`` model per label, in ``np.unique(y)``
        order. Entry ``i`` is fitted on all samples whose label differs from
        the ``i``-th unique label.
    ret : numpy.ndarray, shape (n_samples,)
        For each sample, the distance to the closest sample of another class.

    Raises
    ------
    ValueError
        If ``y`` holds fewer than two distinct labels (there is no opposite
        class to measure against).
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if X.ndim > 2:
        X = X.reshape(len(X), -1)

    labels = np.unique(y)
    if len(labels) < 2:
        raise ValueError(
            "get_nearest_oppo_dist needs at least two distinct labels, got %d"
            % len(labels))

    def helper(yi):
        # Index every sample that does NOT belong to class yi.
        return NearestNeighbors(n_neighbors=1,
                                metric='minkowski', p=norm, n_jobs=-1).fit(X[y != yi])

    nns = Parallel(n_jobs=n_jobs)(delayed(helper)(yi) for yi in labels)

    ret = np.zeros(len(X))
    # Look the model up by its position in `labels`, not by the label value:
    # `nns[yi]` is only correct when the labels happen to be 0, 1, ..., n-1.
    for idx, yi in enumerate(labels):
        members = (y == yi)
        dist, _ = nns[idx].kneighbors(X[members], n_neighbors=1)
        ret[members] = dist[:, 0]

    return nns, ret

In [None]:
def sampling(position, label, distance, k_samples, seed):
    """Draw ``k_samples`` labelled points uniformly from a square around ``position``.

    The global NumPy random generator is re-seeded with ``seed`` on every
    call, so identical arguments always produce identical samples.

    Each coordinate is drawn independently from
    ``[position - distance, position + distance)``; two coordinates are
    generated per sample.

    Returns a list with ``k_samples`` entries of the form ``[x1, x2, label]``.
    """
    np.random.seed(seed)
    lower, upper = position - distance, position + distance
    draws = np.random.uniform(lower, upper, (k_samples, 2))
    return [[point[0], point[1], label] for point in draws]

In [None]:
# Rejection sampling inside a disc. Draws that fall outside the disc are
# replaced by further draws, so small k (e.g. k=1) can no longer come back empty.
def sampling_round(position, label, distance, k_samples, seed):
    """Draw ``k_samples`` labelled points uniformly from the disc around ``position``.

    Candidates are drawn uniformly from the square
    ``[position - distance, position + distance)`` and kept only if their
    Euclidean distance to ``position`` is strictly smaller than ``distance``.
    Rejected candidates are replaced by additional draws from the same random
    stream until ``k_samples`` points have been accepted. The first candidate
    batch has ``k_samples`` rows, so the leading accepted points are exactly
    the ones a single draw-and-filter pass would have produced.

    The global NumPy random generator is re-seeded with ``seed`` on every call.

    Parameters
    ----------
    position : array-like of two numbers
        Centre of the disc.
    label : object
        Value stored as third entry of every returned row.
    distance : float
        Radius of the disc. If it is not positive no point can be accepted
        and an empty list is returned.
    k_samples : int
        Number of points to return.
    seed : int
        Seed handed to ``np.random.seed``.

    Returns
    -------
    list
        ``k_samples`` rows of the form ``[x1, x2, label]``.

    Raises
    ------
    RuntimeError
        If the requested number of points could not be collected within the
        internal limit of redraw rounds.
    """
    max_rounds = 1000  # acceptance rate is about pi/4, so this is never reached in practice

    np.random.seed(seed)
    center = np.asarray(position, dtype=float)
    x_low = center - distance
    x_high = center + distance

    round_samples = []
    missing = k_samples
    for _ in range(max_rounds):
        candidates = np.random.uniform(x_low, x_high, (missing, 2))
        dx = candidates[:, 0] - center[0]
        dy = candidates[:, 1] - center[1]
        inside = np.sqrt(dx * dx + dy * dy) < distance
        round_samples.extend([x1, x2, label] for x1, x2 in candidates[inside].tolist())
        missing = k_samples - len(round_samples)
        # A non-positive radius can never accept a point: stop after one pass
        # instead of looping forever.
        if missing <= 0 or not distance > 0:
            break
    else:
        raise RuntimeError(
            "sampling_round could not collect %d samples within %d rounds"
            % (k_samples, max_rounds))

    return round_samples

# Example (kept commented out so the cell has no output):
#s = sampling_round(np.array([1, 1]), 0, 0.1, 1000, 0)
#sdf = pd.DataFrame(s)
#print(sdf)
#plt.scatter(sdf.iloc[:,0], sdf.iloc[:,1], color='black')

In [None]:
# Load the 2-D data set and measure how far apart the classes are.
# Alternative source (CSV export), kept for reference:
#dataset = '../input/siemens-aida/trainingdata_a.csv'
#df = pd.read_csv(dataset, sep=';')
#x = df[['x_i1','x_i2']].to_numpy()
#y = df['l_i'].to_numpy()

with np.load('./data_2d/B_sep.npz') as dataset:
    x = dataset['x']
    y = dataset['y']
# %d, not %f: the number of points is an integer count.
print("Number of Datapoints in the set:%d" % len(x))

time0 = time.perf_counter()
dist = np.inf  # order p of the Minkowski norm; alternatives: 1, 2, np.inf
nns, ret = get_nearest_oppo_dist(x, y, dist, -1)
# `ret` holds, per point, the distance to the nearest point of another class.
print("2R-Separation Minimal: %f" % ret.min())
print("2R-Separation Mean: %f" % ret.mean())
# Half the smallest inter-class distance is used as the perturbation radius.
epsilon = ret.min()/2
print("Epsilon: %f" % epsilon)
time1 = time.perf_counter()
print(f"Separation Calculation took {time1 - time0:0.3f} seconds")

In [None]:
# Experiment: test accuracy of a random forest trained on k augmented copies
# of every training point, evaluated on k augmented copies of every test
# point, for several k. Mean and standard deviation over `runs` train/test
# splits are written to CSV.
timestart = time.perf_counter()

model_epsilon = epsilon          # perturbation radius used for the training data
eval_epsilons = [epsilon]        # perturbation radii used for the test data
ks = [1, 2, 3, 5, 8, 10, 15, 20, 30, 40, 50]
runs = 300                       # number of train/test splits per k
ks_str = ', '.join(map(str, ks))
eval_epsilons_str = ', '.join(map(str, eval_epsilons))
avg_test_acc = np.empty([len(eval_epsilons), len(ks)])
std_test_acc = np.empty([len(eval_epsilons), len(ks)])

#msu = "minimal separation unrobustness" = increase in error rate when adding random noise to the test data
#in a way that classes stay separated
#avg_msu = np.empty([len(model_epsilons)])
#std_msu = np.empty([len(model_epsilons)])

# The data set never changes, so read it once instead of once per run.
with np.load('./data_2d/B_sep.npz') as dataset:
    x = dataset['x']
    y = dataset['y']


def augment_points(points, labels, radius, k_samples, point_seeds):
    """Replace every point by ``k_samples`` perturbed copies drawn with ``sampling``.

    ``point_seeds`` supplies one seed per point. ``sampling`` re-seeds the
    generator on each call, so a shared seed would give every point the very
    same offsets.

    Returns the augmented features (first two columns) and labels (third
    column) as pandas objects.
    """
    rows = []
    for point, point_label, point_seed in zip(points, labels, point_seeds):
        rows.extend(sampling(point, point_label, radius, k_samples, int(point_seed)))
    df_aug = pd.DataFrame(rows)
    return df_aug.iloc[:, 0:2], df_aug.iloc[:, 2]


for idm, k in enumerate(ks):
    print("k =", k)

    clearresult = np.empty([runs])
    epsilonresult = np.empty([runs])
    results_acc = np.empty([len(eval_epsilons), runs])

    for seed in range(runs):
        x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=seed)

        # Reproducible but distinct seeds for every training and test point.
        # Reusing `seed` for all of them would shift every point (train and
        # test alike) by the identical offsets.
        seed_rng = np.random.RandomState(seed)
        train_seeds = seed_rng.randint(0, 2**31 - 1, size=len(x_train))
        test_seeds = seed_rng.randint(0, 2**31 - 1, size=len(x_test))

        clf = RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=seed)
        #clf = KNeighborsClassifier(n_neighbors = 2)

        if model_epsilon == 0:
            clf.fit(x_train, y_train)
        else:
            x_aug_train, y_aug_train = augment_points(
                x_train, y_train, model_epsilon, k, train_seeds)
            clf.fit(x_aug_train, y_aug_train)

        for ide, eval_epsilon in enumerate(eval_epsilons):
            if eval_epsilon == 0:
                acc = clf.score(x_test, y_test)
            else:
                x_aug_test, y_aug_test = augment_points(
                    x_test, y_test, eval_epsilon, k, test_seeds)
                acc = clf.score(x_aug_test, y_aug_test)
            results_acc[ide, seed] = acc

    # Aggregate over all runs for this k before moving on to the next k.
    for ide2, eval_epsilon2 in enumerate(eval_epsilons):
        avg_test_acc[ide2, idm] = results_acc[ide2,:].mean()*100
        std_test_acc[ide2, idm] = results_acc[ide2,:].std()*100
        if eval_epsilon2 == 0:
            clearresult = results_acc[ide2,:]
        if eval_epsilon2 == epsilon:
            epsilonresult = results_acc[ide2,:]

    #msu = "minimal separation unrobustness" = increase in error rate when adding random noise to the test data
    # in a way that classes stay separated
    #avg_msu[idm] = (epsilonresult.mean() - clearresult.mean())/clearresult.mean()*100
    #std_msu[idm] = np.subtract(epsilonresult, clearresult).std()/clearresult.mean()*100


print(avg_test_acc)
print(std_test_acc)
#print(avg_msu)
#print(std_msu)

# The scores above come from clf.score on the (augmented) test split, so the
# header says "test set".
csv_header = ('Classifiers trained with'
              ' different numbers of augmented points per data point (k = {}) along columns THEN evaluated on test set using (epsilon = {}) '
              ' along rows'.format(ks_str, eval_epsilons_str))

np.savetxt(
    './avg_testacc_over_ks.csv',
    avg_test_acc, fmt='%1.4f', delimiter=';', header=csv_header)

np.savetxt(
    './std_testacc_over_ks.csv',
    std_test_acc, fmt='%1.4f', delimiter=';', header=csv_header)

print(f"Experiment took {time.perf_counter() - timestart:0.3f} seconds")


In [None]:
# Plot mean test accuracy (top) and its standard deviation (bottom) over all ks.
# The result arrays have one row per evaluation epsilon; each row is drawn as
# its own series. The data set variables `x` and `y` are left untouched.
acc_rows = np.atleast_2d(avg_test_acc)
std_rows = np.atleast_2d(std_test_acc)

fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True)

for row in range(acc_rows.shape[0]):
    series_label = "epsilon = {:.4g}".format(eval_epsilons[row])
    ax1.scatter(ks, acc_rows[row], label=series_label)
    ax2.scatter(ks, std_rows[row], label=series_label)

ax1.set_ylabel("Test accuracy [%]", labelpad=None)
ax1.set_title("Effect of parameter k on test accuracy", pad=10)
ax1.set_yticks(np.arange(99, 101, 1))
ax1.tick_params(axis='x', labelbottom=False)

ax2.set_xlabel("Number of augmented points k", labelpad=6)
ax2.set_ylabel("Standard deviation [%]", labelpad=8)
ax2.set_yticks(np.arange(0, 0.35, 0.1))

# A legend is only useful when more than one epsilon is shown.
if acc_rows.shape[0] > 1:
    ax1.legend()

fig.savefig("effect_k.svg", bbox_inches='tight')
plt.show()