# Noise Detection Algorithm

The data presented are measurements of a Gaussian beam for varying beam frequencies and distances. Due to technical difficulties, our measuring device would sometimes crash and provide us with completely noisy data, or data that was only half complete. Our intent was to automate the measuring process by having the lab computer evaluate the data automatically. In the case of faulty data, it was supposed to restart the measurement.

# Importing packages

In [None]:
# general
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path # to extract data from registry
import pandas as pd # data frames
from mpl_toolkits.axes_grid1 import make_axes_locatable #adjust colorbars to axis

In [None]:
# machine learning
# split data
from sklearn.model_selection import train_test_split
# process data
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# classification algorithm
from sklearn.neighbors import KNeighborsClassifier
# pipeline
from sklearn.pipeline import Pipeline
# model evaluation
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix

# Importing and processing Data

## Importing samples

In [None]:
# Load every ".dat" file that sits directly inside the "Measurements"
# directory of the current working directory.
# Remember to change the name below if the directory is renamed.
p = Path('.')
measurement_dirs = [x for x in p.iterdir() if x.is_dir() and x.name == 'Measurements']
if not measurement_dirs:
    raise FileNotFoundError("no 'Measurements' directory found in {}".format(p.resolve()))
# use '**/*.dat' instead to also include sub-directories
paths = list(measurement_dirs[0].glob('./*.dat'))
if not paths:
    raise FileNotFoundError("no .dat files found in {}".format(measurement_dirs[0]))

# file names, in the same order as `paths`
path_names = [path.name for path in paths]
# per file: skip the first (header) line and keep only the third column
data_dict = {path.name: np.genfromtxt(path, skip_header=1)[:, 2] for path in paths}

# Side length used to reshape a flat column into a square map.
# The builtin int replaces np.int, which was removed in NumPy 1.24.
size = int(np.sqrt(len(data_dict[path_names[0]])))
# add the 2D images, in the same order as `path_names`
data_dict["images"] = [data_dict[name].reshape((size, size)) for name in path_names]

## Assigning Features

In [None]:
# Hand-assigned class label for every measurement file inspected so far.
# Label key (original wording: 0=noise, 1=okay data, 2=good data):
#   0 = full noise, 1 = half noise, 2 = no noise
three_targets = {
    'map00Ghz-Rauschmessung-Gleb.dat': 0,
    'map100GHz-2-Gleb.dat': 2,
    'map100GHz-Gleb.dat': 0,
    'map105GHz-2-Gleb.dat': 2,
    'map105GHz-Gleb.dat': 0,
    'map110GHz-19_06.-Gleb.dat': 0,
    'map110GHz-test1-Gleb.dat': 0,
    'map70GHz-test1-Gleb.dat': 2,
    'map70GHz-Tu16-Nadine.dat': 0,
    'map75GHz-Mo15-Nadine.dat': 2,
    'map80GHz-Mo15-Nadine.dat': 2,
    'map85GHz-2-Gleb.dat': 0,
    'map85GHz-3-Gleb.dat': 0,
    'map85GHz-4-Gleb.dat': 0,
    'map85GHz-5-Gleb.dat': 0,
    'map85GHz-80cm-19_06.-Gleb.dat': 2,
    'map85GHz-Gleb.dat': 1,
    'map85GHz-test6-Gleb.dat': 1,
    'map85GHz-test7-Gleb.dat': 2,
    'map90GHz-2-Gleb.dat': 2,
    'map90GHz-80cm-19_06.-Gleb.dat': 2,
    'map90GHz-Gleb.dat': 1,
    'map95GHz-19_06.-Gleb.dat': 0,
    'map95GHz-2-19_06.-Gleb.dat': 2,
    'map95GHz-2-Gleb.dat': 2,
    'map95GHz-80cm-19_06.-Gleb.dat': 2,
    'map95GHz-Gleb.dat': 2,
    'map95GHz-Mo15-Nadine.dat': 2,
    'map95GHz-nadine.dat': 1,
    'map95GHz-nadine2.dat': 2,
    'map95GHz-test2-Gleb.dat': 0,
    'map95GHz-test3-Gleb.dat': 0,
    'map95GHz-Tu16-Nadine.dat': 0,
    'map80GHz-1-120cm-23_06.-Gleb.dat': 2,
    'map85GHz-1-120cm-23_06.-Gleb.dat': 2,
    'map85GHz-2-120cm-23_06.-Gleb.dat': 2,
    'map85GHz-3-120cm-23_06.-Gleb.dat': 2,
    # NOTE(review): the next two keys start with "ma", not "map" - confirm
    # that this matches the actual file names on disk.
    'ma70GHz-1-120cm-24_06.-Nadine.dat': 2,
    'ma75GHz-1-120cm-24_06.-Nadine.dat': 2,
    'map100GHz-1-120cm-24_06.-Nadine.dat': 2,
    'map105GHz-2-120cm-24_06.-Nadine.dat': 2,
    'map110GHz-1-120cm-24_06.-Nadine.dat': 0,
    'map90GHz-1-120cm-23_06.-Gleb.dat': 2,
    'map95GHz-1-120cm-23_06.-Gleb.dat': 2,
    'Rauschdaten_120mm.dat': 0,
}

## Forming Dictionary out of Data

In [None]:
# Assemble the labelled data set from the loaded measurements.
#
# Only files that are both loaded (present in path_names) and labelled
# (present in three_targets) contribute a row. Sizing the arrays by
# len(three_targets) instead would leave all-zero rows with label 0 and an
# empty name for every labelled file that is missing on disk.
names = []
for name in path_names:
    if name in three_targets:
        names.append(name)
    else:
        print("{} not yet labeled".format(name))
# number of labelled samples (kept for compatibility with the old counter)
count = len(names)

# side length of the square maps; builtin int replaces the removed np.int
size = int(np.sqrt(len(data_dict[path_names[0]])))

# one row per labelled measurement, one column per pixel
y = np.zeros(count).astype(int)
X = np.zeros((count, len(data_dict[path_names[0]])))
for row, name in enumerate(names):
    y[row] = three_targets[name]
    X[row, :] = data_dict[name]

beam_data = {"data_names": names, "data": X, "target": y,
             "target_names": ["full noise", "half_noise", "no noise"],
             "images": [x.reshape((size, size)) for x in X]}

## Plotting all data

In [None]:
# Plot a hand-picked selection of maps (positions 17, 18 and 37 of
# path_names) in a single row, together with their labels if available.
shown_names = [path_names[pos] for pos in (17, 18, 37)]
rows = 1  # the three selected maps fit into one row of three panels

# key order of data_dict; used to locate a file's entry in data_dict["images"]
key_order = list(data_dict)
labelled_names = list(beam_data["data_names"])

fig = plt.figure(figsize=(18, 5*rows))
for slot, name in enumerate(shown_names, start=1):
    ax = plt.subplot(rows, 3, slot)
    ax.set_axis_off()  # hide axis
    image = data_dict["images"][key_order.index(name)]
    im = ax.imshow(image, cmap='jet', interpolation='nearest')
    if name in labelled_names:
        label = beam_data["target"][labelled_names.index(name)]
        ax.set_title("name: {} \n labeled: {}".format(name, beam_data["target_names"][label]),
                     fontsize=14)
    else:
        # make unlabelled files stand out and echo their name for copy/paste
        ax.set_title("name: {}\n not labeled yet".format(name), color="red")
        print("'{}'".format(name))
    # attach a colorbar of matching height to the right of the panel
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.1)
    plt.colorbar(im, cax=cax)
plt.tight_layout()
#plt.savefig("questionable-labels.png")

# Feature Engineering

## rescaling

In [None]:
def rescale_local_max(X):
    """Divide every row of X by that row's maximum.

    Returns a list with one rescaled row per row of X.

    Note: a function with the same name is defined again in the following
    cell, so this definition is shadowed when the notebook runs in order.
    """
    row_peaks = np.amax(X, axis=1)
    scaled_rows = []
    for row, peak in zip(X, row_peaks):
        scaled_rows.append(row / peak)
    return scaled_rows

In [None]:
def rescale_local_max(X):
    """Divide every row of X by that row's value range (max - min).

    The row minimum is not subtracted, so this normalises the spread of each
    row but does not shift it to start at zero.

    Rows whose values are all equal have a range of zero; they are returned
    unchanged instead of being divided by zero, which would produce nan/inf
    entries.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)

    Returns
    -------
    list of 1D float arrays, one per row of X
    """
    X = np.asarray(X, dtype=float)
    spans = np.amax(X, axis=1) - np.amin(X, axis=1)
    # dividing by 1 leaves constant rows untouched
    safe_spans = np.where(spans == 0, 1.0, spans)
    return [row / span for row, span in zip(X, safe_spans)]

## rotating for symmetrical prediction (generalization)

In this dataset, the transition within the half_noise maps always runs from no_noise in the top region to full_noise in the bottom region. To generalize the algorithm to other problems, the half_noise data is additionally rotated by 90°, 180° and 270°.

For even further generalization, all data in 2D representation could be repositioned to have their peak in the center. This way, displacements of the beam location would not need to be learned by the model, which could improve not only generalization performance but also overall performance.

In [None]:
# Augment the half-noise class with rotated copies of its maps.
# 1) rescale every sample
X_total = rescale_local_max(beam_data["data"])
# 2) pick the samples labelled 1 (half noise)
X_half_noise = np.array(X_total)[beam_data["target"] == 1]
# 3) for each of them store the map rotated by one, two and three quarter
#    turns in three consecutive rows of X_added
n_half, n_pixels = X_half_noise.shape
X_added = np.zeros((3 * n_half, n_pixels))
for sample_idx, flat_map in enumerate(X_half_noise):
    square_map = flat_map.reshape((size, size))
    for quarter_turns in (1, 2, 3):
        rotated = np.rot90(square_map, k=quarter_turns)
        X_added[3 * sample_idx + quarter_turns - 1, :] = rotated.flatten()
# 4) append the rotated copies (all labelled 1) to the data set
X_total = np.vstack((X_total, X_added))
y_total = np.concatenate((beam_data["target"], [1] * (3 * n_half)))

In [None]:
# Example figure: the fourth half-noise map next to its three rotated copies
# (rows 9..11 of X_added).
fig = plt.figure(figsize=(10, 2.5))
panels = [(X_half_noise[3, :], "original")]
for turn in range(3):
    panels.append((X_added[9 + turn, :], "rotated by {}°".format(90 * (turn + 1))))
for slot, (flat_map, title) in enumerate(panels, start=1):
    ax = plt.subplot(1, 4, slot)
    ax.imshow(flat_map.reshape((size, size)), cmap="jet")
    ax.set_title(title)
    ax.set_axis_off()  # hide axis
plt.tight_layout()
plt.savefig("rotated-maps.png")

In [None]:
# Split into train and test sets; stratify keeps the class proportions of
# y_total in both parts, random_state fixes the shuffle.
# NOTE(review): X_total already contains the rotated copies of the
# half-noise maps, so a map and its rotations can land on both sides of the
# split.
X_train, X_test, y_train, y_test = train_test_split(
    X_total,
    y_total,
    stratify=y_total,
    random_state=0,
)

# Machine Learning (applying PCA + knn)

In [None]:
# display the shape of the training matrix: (number of samples, number of features)
X_train.shape

In this application we have few samples (40) and many features (441). To reduce the number of features, we apply Principal Component Analysis (PCA), which reduces the dimension of the feature space (here from 441 to 2) and thereby improves the performance of our algorithm. We use the k-nearest-neighbours classifier (knn), as it performs particularly well on small sample sizes.

## PCA and its data rescaling results

In [None]:
# Standardise every feature to mean=0, std=1; the scaling is fitted on the
# training set only.
scaler = StandardScaler().fit(X_train)
scaled_X_train = scaler.transform(X_train)
# Fit a PCA with two principal components to the scaled training maps
# (n_components=0.95 alternatively) and project the training set onto them.
pca = PCA(n_components=2).fit(scaled_X_train)
pca_X_train = pca.transform(scaled_X_train)

In [None]:
# Scatter plot of the train and test samples in the plane of the first two
# principal components, coloured by class and saved as "PCA_map.png".
colors = ("red", "orange", "blue")
groups = ("full_noise", "half_noise", "no_noise")
train_marker = "o"
test_marker = "^"

# project the test set with the scaler and PCA fitted on the training set
scaled_X_test = scaler.transform(X_test)
pca_X_test = pca.transform(scaled_X_test)

# split the projected samples by class label (0, 1, 2)
train_data = tuple(pca_X_train[y_train == label] for label in range(3))
test_data = tuple(pca_X_test[y_test == label] for label in range(3))
g0, g1, g2 = train_data
h0, h1, h2 = test_data

# Create plot
fig = plt.figure(figsize=(7, 7))
ax = fig.add_subplot(1, 1, 1)
# (legend prefix, per-class points, marker, marker size); test points are
# drawn larger than train points
plot_sets = (
    ("train:", train_data, train_marker, 50),
    ("test:", test_data, test_marker, 100),
)
for prefix, class_data, marker, marker_size in plot_sets:
    for points, color, group in zip(class_data, colors, groups):
        # Coordinates are passed directly rather than through names x/y, so
        # the notebook-level label vector `y` is not overwritten.
        ax.scatter(points[:, 0], points[:, 1], alpha=0.7, c=color,
                   edgecolors='black', s=marker_size, label=prefix + group,
                   marker=marker)
#plt.title('PCA-transformed plot')
plt.xlabel("principal component Nr.1", fontsize=15)
plt.ylabel("principal component Nr.2", fontsize=15)
plt.legend(loc="best")
plt.tight_layout()
plt.savefig("PCA_map.png")
plt.show()

In [None]:
# Show the weights of both principal components as images with the same
# layout as the beam maps.
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
for number, (ax, weights) in enumerate(zip(axes.ravel(), pca.components_), start=1):
    weight_map = weights.reshape((size, size))
    im = ax.imshow(weight_map, cmap='jet', interpolation="nearest")
    ax.set_title("component Nr.{}".format(number), fontsize=30)
    ax.set_xlabel("Pixel in x-direction", fontsize=25)
    ax.set_ylabel("Pixel in y-direction", fontsize=25)
    fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
#fig.suptitle('PCA component weights', fontsize=20)
plt.tight_layout()
plt.savefig("PCA-componenets.png")

## Creating and fitting pipeline

Because of the small sample size, no parameter optimization will be conducted
(splitting off another validation set would further reduce the already small test set).

In [None]:
# Pipeline: standardise -> project onto 2 principal components -> 1-nearest-neighbour
pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("component_analyzer", PCA(n_components=2)),
    ("classifier", KNeighborsClassifier(n_neighbors=1)),
])
# fit all steps on the training split
pipe.fit(X_train, y_train)


## Algorithm performance

### General evaluation via stratified KFold cross validation

Stratification preserves the class proportions in every fold, so all classes are represented in each training set. Shuffle makes sure that the data is shuffled before it is split; this is only necessary when the data is sorted.

In [None]:
# 5-fold stratified cross-validation of the whole pipeline on all samples
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
cv_scores = cross_val_score(pipe, X_total, y_total, cv=kfold)
print("Cross-validation scores:\n{}".format(cv_scores))

### Confusion matrix shows how test samples were classified

In [None]:
# Cross-validation confusion matrices, one per fold.
# Materialise the (train indices, test indices) pairs of the folds so that
# they can be looped over here and indexed in the following cells.
splits = list(kfold.split(X_total, y_total))
for i, (train_idx, test_idx) in enumerate(splits):
    # NOTE: this refits `pipe` in place; after the loop it stays fitted on
    # the training part of the last fold.
    pipe.fit(X_total[train_idx], y_total[train_idx])
    # argument order is (true labels, predicted labels), the same as in the
    # single-split evaluation further down
    fold_matrix = confusion_matrix(y_total[test_idx], pipe.predict(X_total[test_idx]))
    print("Split Nr.{}:\n{}\n".format(i, fold_matrix))

In [None]:
# predicted class of the first misclassified sample in the test part of the fifth fold
fold_predictions = pipe.predict(X_total[splits[4][1]])
fold_predictions[fold_predictions != y_total[splits[4][1]]][0]

In [None]:
# class name predicted for the first misclassified sample of the fifth fold
fold_predictions = pipe.predict(X_total[splits[4][1]])
first_wrong_label = fold_predictions[fold_predictions != y_total[splits[4][1]]][0]
beam_data["target_names"][first_wrong_label]

In [None]:
# Plot the first beam map in the test part of the fifth fold that was
# misclassified. NOTE(review): this presumably expects `pipe` to still be
# fitted on the training part of that fold - confirm the cell order.
test_idx = splits[4][1]  # rows of X_total that form the fold's test part
fold_predictions = pipe.predict(X_total[test_idx])
# positions *within the fold* where prediction and label disagree
wrong = np.flatnonzero(fold_predictions != y_total[test_idx])
# Translate fold-relative positions back to rows of X_total / y_total.
# Using the fold-relative positions on X_total directly would show an
# unrelated sample.
index = test_idx[wrong]
if index.size == 0:
    print("no misclassified sample in this split")
else:
    plt.figure(figsize=(5, 5.5))
    ax = plt.subplot(1, 1, 1)
    ax.set_axis_off()  # hide axis
    ax.set_title("target= {}\npredicted= {}".format(
        beam_data["target_names"][y_total[index[0]]],
        beam_data["target_names"][fold_predictions[wrong[0]]]),
        fontsize=25)
    # only the first misclassified map is shown, even if there are several
    ax.imshow(X_total[index[0]].reshape((size, size)),
              cmap="jet")
    plt.tight_layout()
    plt.savefig("false-prediction")

## Evaluation of one singular split 

### predict test set

In [None]:
# The cross-validation loop above refits `pipe` on folds of X_total, and
# those folds can contain samples of X_test. Refit on the training split so
# that the test predictions come from a model that has not seen the test
# samples.
pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)

In [None]:
# predicted class label (0, 1 or 2) for every sample of X_test
print(predictions)

In [None]:
# confusion matrix of the single train/test split, called as (true labels, predictions)
test_confusion = confusion_matrix(y_test, predictions)
print("confusion matrix:\n{}".format(test_confusion))

### falsely classified in test data

In [None]:
# Gallery of the test samples whose prediction disagrees with the label.
predictions = pipe.predict(X_test)
# positions in X_test of the wrong predictions; each pair (label, prediction) is printed
index = []
for position, (truth, guess) in enumerate(zip(y_test, predictions)):
    if truth == guess:
        continue
    print(truth, guess)
    index.append(position)

# three panels per row, rounded up
rows = len(index) // 3 + (1 if len(index) % 3 else 0)
fig = plt.figure(figsize=(20, 5*rows+1))
for slot, ind in enumerate(index, start=1):
    ax = plt.subplot(rows, 3, slot)
    wrong_map = X_test[ind].reshape((size, size))
    im = ax.imshow(wrong_map, cmap='jet', interpolation='nearest')
    predicted_name = beam_data["target_names"][predictions[ind]]
    correct_name = beam_data["target_names"][y_test[ind]]
    ax.set_title("pred: {},\n correct: {}".format(predicted_name, correct_name))
    # attach a colorbar of matching height to the right of the panel
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.1)
    plt.colorbar(im, cax=cax)

### falsely classified in train data (not meaningful here: with knn and n_neighbors=1, every training sample is its own nearest neighbour)

In [None]:
# Gallery of the training samples whose prediction disagrees with the label.
predictions = pipe.predict(X_train)
# positions in X_train of the wrong predictions; each (label, prediction) tuple is printed
index = []
for position, (truth, guess) in enumerate(zip(y_train, predictions)):
    if truth == guess:
        continue
    print((truth, guess))
    index.append(position)

# three panels per row, rounded up
rows = len(index) // 3 + (1 if len(index) % 3 else 0)
fig = plt.figure(figsize=(20, 5*rows+1))
for slot, ind in enumerate(index, start=1):
    ax = plt.subplot(rows, 3, slot)
    wrong_map = X_train[ind].reshape((size, size))
    im = ax.imshow(wrong_map, cmap='jet', interpolation='nearest')
    predicted_name = beam_data["target_names"][predictions[ind]]
    correct_name = beam_data["target_names"][y_train[ind]]
    ax.set_title("pred: {},\n correct: {}".format(predicted_name, correct_name))
    # attach a colorbar of matching height to the right of the panel
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.1)
    plt.colorbar(im, cax=cax)