In [1]:
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors
import numpy as np
from scipy.spatial.distance import cdist
from kneed import KneeLocator
from sklearn.decomposition import PCA
from Style import Configure as Conf
import Measurements as M
from operator import itemgetter
import re

Dataset is a class created to organize the data and the results.
The class holds three tuples, each containing three lists:
1. the dataset paths.
2. the data frames.
3. the predictions.
One tuple is for the training datasets,
one is for the positive test set (contains normal datasets),
and one is for the negative test set (contains abnormal datasets).

In [2]:
import Dataset as DS

In [3]:
def get_mic_topics(exp):
    """Return the list of raw topic column names for an experiment.

    :param exp: experiment name; any name containing "panda" selects the
        joint-based topics, every other name selects the velocity topics.
    :return: list of column-name strings. For "panda" experiments this is
        each of effort/position/velocity combined with joints 1-7 followed by
        finger joints 1-2; otherwise three velocity column names.
    """
    if "panda" not in exp:
        return ['linear velocity x', 'linear velocity y', 'angular velocity z']

    # Joints first, then finger joints; this order is repeated per measure.
    joint_names = ['panda_joint' + str(idx) for idx in range(1, 8)]
    joint_names += ['panda_finger_joint' + str(idx) for idx in range(1, 3)]
    return [joint + ' ' + measure
            for measure in ['effort', 'position', 'velocity']
            for joint in joint_names]

In [4]:
def most_influence_feature_by_pca(datasets, n=25):
    """Rank features by their accumulated absolute PCA loadings.

    A PCA is fitted on every training frame separately; for each feature
    (column position) the absolute values of its loadings over all components
    and all training frames are summed, and the names of the highest-scoring
    features are returned.

    :param datasets: tuple ``(trainings, positives, negatives)``; only the
        training frames are used. Column names are taken from the first
        training frame.
    :param n: number of PCA components to fit and maximum number of feature
        names to return.
    :return: list of at most ``n`` column names, highest score first.
    """
    trainings, _positives, _negatives = datasets
    feature_names = trainings[0].columns
    scores = dict.fromkeys(range(trainings[0].shape[1]), 0)

    for training in trainings:
        # PCA cannot fit more components than min(n_samples, n_features), so
        # clamp the request instead of failing on small or narrow frames.
        n_components = min(n, *training.shape)
        pca = PCA(n_components=n_components).fit(training)
        for component in pca.components_:
            for idx, loading in enumerate(np.abs(component)):
                scores[idx] += loading

    # sorted() is stable, so features with equal scores keep column order.
    ranked = sorted(scores.items(), key=itemgetter(1), reverse=True)[:n]
    # Iterate over what was actually ranked: there may be fewer than n features.
    return [feature_names[idx] for idx, _ in ranked]

In [5]:
def filter_topics(datasets, topics=None):
    """Keep only the requested columns in every data frame of every group.

    :param datasets: tuple ``(trainings, positives, negatives)`` of iterables
        of data frames.
    :param topics: list of column names to keep; ``None`` is treated as an
        empty list (frames with no columns).
    :return: tuple of three lists holding the column-filtered frames, in the
        same group order as ``datasets``.
    """
    selected = [] if topics is None else topics
    trainings, positives, negatives = datasets
    return tuple([frame[selected] for frame in group]
                 for group in (trainings, positives, negatives))

# DBSCAN - Density-Based Spatial Clustering of Applications with Noise
The DBSCAN algorithm needs two parameters:

*EPS* - The maximum distance between two points for them to be considered neighbors. Points that are within eps distance of each other are considered part of the same cluster.

*min_group* - The minimum number of points required for a point to be considered a core point (passed to scikit-learn as `min_samples`). Points that have fewer than min_group neighbors and are not within EPS of any core point are labeled as noise.

Calculate EPS:

In [6]:
def find_eps(x_train, min_group):
    """Estimate a DBSCAN ``eps`` for the training data.

    The sorted neighbor-distance curve is smoothed and its knee is used as the
    initial ``eps``. The value is then raised in steps of 0.04 until DBSCAN
    labels at most 10% of the training samples as noise.

    :param x_train: training samples as a pandas DataFrame (``to_numpy`` is
        called on it).
    :param min_group: DBSCAN ``min_samples``; ``min_group - 1`` neighbors are
        queried, so it presumably has to be at least 3 for the distance
        column used below to exist.
    :return: the calibrated ``eps`` value.
    :raises ValueError: if no knee can be located on the distance curve.
    """
    from scipy.signal import savgol_filter

    nbrs = NearestNeighbors(n_neighbors=min_group - 1).fit(x_train)
    distances, _ = nbrs.kneighbors(x_train)
    # Each column is sorted independently; column 1 is the curve that is used.
    distances = np.sort(distances, axis=0)[:, 1]

    # Savitzky-Golay needs a window no longer than the data, and longer than
    # the polynomial order (2). Shrink the default window of 51 for small
    # training sets (kept odd) instead of failing, and skip smoothing when
    # even a window of 3 does not fit.
    window = min(51, len(distances))
    if window % 2 == 0:
        window -= 1
    if window > 2:
        sg_distances = savgol_filter(distances, window, 2)
    else:
        sg_distances = distances

    kneedle = KneeLocator(range(1, len(sg_distances) + 1),  # x values
                          sg_distances,  # y values
                          S=0,  # measure of how many "flat" points we expect to see in the unmodified data curve
                          curve="convex",  # parameter from figure concave/convex
                          online=True,
                          direction="increasing")  # parameter from figure
    eps = kneedle.knee_y
    if eps is None:
        # Without a knee there is no starting value to calibrate from.
        raise ValueError("could not locate a knee on the neighbor-distance curve; pass eps explicitly")

    Xtrain = x_train.to_numpy()
    while True:
        labels = DBSCAN(eps=eps, min_samples=min_group).fit_predict(Xtrain)
        # Negative labels are the samples DBSCAN could not assign to a cluster.
        noise_ratio = np.count_nonzero(labels < 0) / len(labels)
        if noise_ratio > 0.1:
            eps += 0.04
        else:
            return eps

Input: the datasets.
Output: all training sets in one dataframe, all test sets (positive and negative) in one dataframe, and all datasets (training and test) in one dataframe.

In [7]:
def concat_datasets(datasets):
    """Concatenate the per-run frames into train, test and combined frames.

    :param datasets: tuple ``(trainings, positives, negatives)`` of lists of
        data frames.
    :return: tuple ``(x_train, x_test, x_dfs)`` where ``x_train`` is all
        training frames, ``x_test`` is positives followed by negatives, and
        ``x_dfs`` is training, positives and negatives in that order. Every
        result has a fresh 0..n-1 index.
    """
    trainings, positives, negatives = datasets
    x_train, positive, negative = (
        pd.concat(group, ignore_index=True)
        for group in (trainings, positives, negatives)
    )
    x_test = pd.concat([positive, negative], ignore_index=True)
    x_dfs = pd.concat([x_train, positive, negative], ignore_index=True)
    return x_train, x_test, x_dfs

Input: the concatenated dataframe, the concatenated predictions, and the original datasets.
Output: the datasets and the predictions, split back per original dataset.

In [8]:
def convert_concat_to_datasets_n_prediction(Xdf, Ydf, datasets):
    """Split a concatenated frame and prediction sequence back into runs.

    The rows are consumed in order: training runs first, then positive runs,
    then negative runs. Each run takes as many rows as its original frame has.

    :param Xdf: concatenated data frame (sliced by position with ``iloc``).
    :param Ydf: concatenated predictions (sliced with ``[start:stop]``).
    :param datasets: tuple ``(trainings, positives, negatives)`` of the
        original frames; only their row counts are used.
    :return: ``(n_datasets, p_datasets)``: two 3-tuples of lists holding the
        per-run frame slices and the per-run prediction slices.
    """
    trainings, positives, negatives = datasets
    frames_by_group = []
    preds_by_group = []
    start = 0
    for group in (trainings, positives, negatives):
        group_frames = []
        group_preds = []
        for original in group:
            stop = start + original.shape[0]
            group_frames.append(Xdf.iloc[start:stop, :])
            group_preds.append(Ydf[start:stop])
            start = stop  # the next run begins where this one ended
        frames_by_group.append(group_frames)
        preds_by_group.append(group_preds)
    return tuple(frames_by_group), tuple(preds_by_group)

In [9]:
def predict_sample(class_trains, sample, max_dist):
    """Classify one sample by its distance to the nearest reference point.

    :param class_trains: 2-D collection of reference points.
    :param sample: a single point with the same number of features.
    :param max_dist: largest distance at which the sample still counts as
        belonging to the reference set.
    :return: 1 if the closest reference point is within ``max_dist``
        (inclusive), otherwise 0.
    """
    nearest = cdist(class_trains, [sample]).min()
    return 1 if nearest <= max_dist else 0

Run DBSCAN only on the training set; the test set is then examined "online" based on the training set results.

In [10]:
def run_dbscan(datasets, my_eps=0, min_group=0):
    """Cluster the training data with DBSCAN and label every sample.

    DBSCAN is fitted on the concatenated training frames only. A training
    sample is labeled 1 when DBSCAN assigns it to a cluster and 0 when it is
    noise. Each test sample is labeled 1 when it lies within ``my_eps`` of at
    least one clustered training sample, otherwise 0.

    :param datasets: tuple ``(trainings, positives, negatives)`` of lists of
        data frames.
    :param my_eps: DBSCAN ``eps``; 0 means "estimate it with ``find_eps``".
    :param min_group: DBSCAN ``min_samples``; 0 means "number of features + 1".
    :return: the two values produced by
        ``convert_concat_to_datasets_n_prediction``: the per-run data frames
        and the per-run predictions.
    """
    Xtrain, Xtest, Xdfs = concat_datasets(datasets)
    if min_group == 0:
        min_group = Xdfs.shape[1] + 1  # based on heuristic
    if my_eps == 0:
        my_eps = find_eps(x_train=Xtrain, min_group=min_group)
    Xtrain = Xtrain.to_numpy()
    Xtest = Xtest.to_numpy()

    labels = DBSCAN(eps=my_eps, min_samples=min_group).fit_predict(Xtrain)
    clustered = labels >= 0  # negative labels are DBSCAN noise
    predictions = [1 if in_cluster else 0 for in_cluster in clustered]
    print("----------------------- EPS =     " + str(my_eps) + "    min group =    " + str(min_group) +
          "  -----------------------")

    # Reference set for the test samples: training points that were clustered.
    pos_train = Xtrain[clustered]
    if len(pos_train) < 1:
        print("Error")
        # With no clustered training point there is nothing to be close to, so
        # every test sample is labeled 0 rather than crashing inside cdist.
        predictions.extend([0] * len(Xtest))
    else:
        # check for each sample in the test set whether it is anomalous or normal
        for sample in Xtest:
            predictions.append(predict_sample(pos_train, sample, my_eps))

    dfs, n_dfs = convert_concat_to_datasets_n_prediction(Xdfs, predictions, datasets)
    return dfs, n_dfs

# Predict

In [11]:
def find_max_longest_sequence(trainings_names, trainings_preds, alpha=0.95):
    """Return the ``alpha`` quantile of the per-run longest-sequence values.

    For every training run ``M.Measurements.longest_sequence`` is evaluated on
    its predictions with ``Conf.NEGATIVE_LABEL`` (presumably the length of the
    longest run of negative labels; confirm against Measurements). The values
    are sorted and the one at rank ``round(len * alpha)`` (1-based) is
    returned.

    :param trainings_names: names of the training runs; only used to pair up
        with ``trainings_preds``.
    :param trainings_preds: prediction sequence for each training run.
    :param alpha: fraction of the sorted values to cover.
    :return: the selected value from the sorted list.
    :raises IndexError: if there are no training runs.
    """
    longests = sorted(
        M.Measurements.longest_sequence(y_pred, Conf.NEGATIVE_LABEL)
        for _, y_pred in zip(trainings_names, trainings_preds)
    )
    rank = int(round(len(longests) * alpha)) - 1
    # Clamp to the valid range: a small alpha would otherwise produce -1 and
    # silently wrap around to the largest value; a large one would overrun.
    rank = min(max(rank, 0), len(longests) - 1)
    return longests[rank]

In [12]:
def dict_sum_preds(multi_paths_preds, max_longest, dict_preds=None):
    """Count, per run, how many prediction sets raised at least one warning.

    :param multi_paths_preds: iterable of iterables of ``(name, y_pred)``
        pairs.
    :param max_longest: threshold from the training runs; ``max_longest + 1``
        is passed to ``M.Measurements.count_warnings`` together with
        ``Conf.NEGATIVE_LABEL``.
    :param dict_preds: optional dictionary to accumulate into; it is updated
        in place and returned. A fresh dictionary is created when omitted.
    :return: dictionary mapping each run name to the number of times its
        warning count was greater than zero.
    """
    # A `{}` default would be shared between calls and leak counts across
    # experiments, so the default dictionary is created per call.
    if dict_preds is None:
        dict_preds = {}
    for paths_preds in multi_paths_preds:
        for f, y_pred in paths_preds:
            dict_preds.setdefault(f, 0)
            warning = M.Measurements.count_warnings(y_pred, Conf.NEGATIVE_LABEL, max_longest + 1)
            if warning > 0:
                dict_preds[f] += 1
    return dict_preds

In [13]:
def _report_group(group, anomaly_count, count):
    """Print the summary of one group of runs and return its anomaly ratio."""
    if 'type' in group:
        group = 'normal '
    print(group + ':')
    print("number of runs detected as anomaly: " + str(anomaly_count) + " from " + str(count))
    return anomaly_count / count


def sum_result(dict_results):
    """Print and return the share of runs flagged as anomalous per group.

    Keys are processed in dictionary order. A new group starts whenever the
    current group name is not a substring of the key; the group name is the
    second backslash-separated component of the key that starts the group.
    Groups whose name contains 'type' are printed as 'normal '.

    :param dict_results: mapping from run key to a count; a run is considered
        anomalous when its value is greater than zero.
    :return: list with one ratio (anomalous runs / runs) per group, in the
        order the groups were encountered; an empty list for empty input.
    :raises IndexError: if a key that starts a group contains no backslash.
    """
    lst = []
    if not dict_results:
        # Nothing to summarize; avoids dividing by a zero run count below.
        return lst

    group = "not exist file"  # sentinel that is not expected inside any key
    count = 0
    anomaly_count = 0
    for key, detections in dict_results.items():
        if group not in key:
            if count != 0:
                lst.append(_report_group(group, anomaly_count, count))
            count = 0
            anomaly_count = 0
            group = key.split('\\')[1]  # key == <normal/anomaly>\\<scenario>\\<run>
        count += 1
        if detections > 0:
            anomaly_count += 1
    # The last group is still open when the loop ends.
    lst.append(_report_group(group, anomaly_count, count))
    return lst

In [14]:
def predict(data, mic_or_mac):
    """Build the per-run anomaly counts for the test runs of a dataset.

    The threshold is derived from the training predictions with
    ``find_max_longest_sequence``; the positive and negative test runs are
    then counted with ``dict_sum_preds``.

    :param data: object providing ``get_predictions()`` and ``get_names()``,
        each returning a ``(trainings, positives, negatives)`` triple.
    :param mic_or_mac: label of the feature set being summarized (the run
        cell passes "micro" or "macro"); shown in the printed header.
    :return: dictionary mapping each test run name to its warning count as
        produced by ``dict_sum_preds``.
    """
    trainings_preds, positives_preds, negatives_preds = data.get_predictions()
    trainings_names, positives_names, negatives_names = data.get_names()
    multi_paths_preds = [zip(positives_names, positives_preds), zip(negatives_names, negatives_preds)]
    max_longest = find_max_longest_sequence(trainings_names, trainings_preds)
    # A fresh dictionary is passed so counts never carry over between calls.
    dict_preds = dict_sum_preds(multi_paths_preds, max_longest, {})
    # The header previously left out the feature-set label ("for  features").
    print("summarize Result for " + str(mic_or_mac) + " features:")
    return dict_preds

In [15]:
def union_results(mic_dict_preds, mac_dict_preds):
    """Add the micro and macro counts per run and summarize the totals.

    :param mic_dict_preds: mapping from run key to count; its keys drive the
        result.
    :param mac_dict_preds: mapping that must contain every key of
        ``mic_dict_preds``.
    :return: the list returned by ``sum_result`` for the summed counts.
    """
    combined = {name: hits + mac_dict_preds[name]
                for name, hits in mic_dict_preds.items()}
    return sum_result(combined)

# Run
The data is stored in CSV files. Each experiment has normal and abnormal data. Multiple runs residing in the same folder correspond to the same behavior (either normal or the same anomaly).

In [16]:
EXPERIMENTS = ["real_panda", "sim_panda", "real_turtlebot3", "sim_turtlebot3"]
# Folder the per-experiment result workbooks are written to. Every backslash is
# escaped explicitly: the previous literal relied on invalid escape sequences
# (\A, \P, \R), which newer Python versions warn about.
# NOTE(review): this is a machine-specific absolute path; point it at a local
# folder before running elsewhere.
RESULTS_DIR = 'C:\\Users\\Avior\\PycharmProjects\\ROS-DBSCAN\\'

for exp in EXPERIMENTS:
    # One column per feature set (micro, macro, union), one row per run group.
    df = pd.DataFrame()
    print("\n-------------------------------------------------\n ------------------" + exp + "------------------ \n-------------------------------------------------\n")

    exp_ds = DS.Datasets("data/"+exp+"/normal/", "data/"+exp+"/abnormal/", test_size=0.3)
    similar_columns = exp_ds.find_similar_columns_in_training()
    exp_ds.filter_by_columns(similar_columns)
    exp_ds.normalization()

    # creating 'micro' datasets: only the raw topic columns
    mic_or_mac = "micro"
    mic_datasets = exp_ds.get_copied_datasets()
    mic_datasets = filter_topics(mic_datasets, get_mic_topics(exp))

    # predict
    print("\n ------------ Prediction based on spesific topic's values ------------ \n ")
    mic_datasets, mic_predictions = run_dbscan(mic_datasets)
    exp_ds.set_predictions(mic_predictions[0], mic_predictions[1], mic_predictions[2])
    mic_dict_preds = predict(exp_ds, mic_or_mac)
    df[exp+"_"+mic_or_mac] = sum_result(mic_dict_preds)

    # creating 'macro' datasets
    mic_or_mac = "macro"
    mac_datasets = exp_ds.get_copied_datasets()
    mac_datasets = DS.drop_topics(mac_datasets, get_mic_topics(exp))  # drop topics values from database, only statistic topics left
    mac_datasets = filter_topics(mac_datasets, most_influence_feature_by_pca(mac_datasets, n=27))

    # predict
    print("\n ------------ Prediction based on statistics on topics ------------ \n ")
    mac_datasets, mac_predictions = run_dbscan(mac_datasets)
    exp_ds.set_predictions(mac_predictions[0], mac_predictions[1], mac_predictions[2])
    mac_dict_preds = predict(exp_ds, mic_or_mac)
    df[exp+"_"+mic_or_mac] = sum_result(mac_dict_preds)

    # union predictions
    print("\n ---------------------- Union Results --------------------- \n")
    df[exp+"_union"] = union_results(mic_dict_preds, mac_dict_preds)
    df.to_excel(RESULTS_DIR + exp + '-result.xlsx')


-------------------------------------------------
 ------------------real_panda------------------ 
-------------------------------------------------


 ------------ Prediction based on spesific topic's values ------------ 
 
----------------------- EPS =     0.4904604636324936    min group =    28  -----------------------
summarize Result for  features:
normal :
number of runs detected as anomaly: 0 from 10
change_obj_weight:
number of runs detected as anomaly: 18 from 18
gripper_attack:
number of runs detected as anomaly: 15 from 15
low_net_connection:
number of runs detected as anomaly: 4 from 17
miss_bubble:
number of runs detected as anomaly: 12 from 12

 ------------ Prediction based on statistics on topics ------------ 
 
----------------------- EPS =     1.0438122635752896    min group =    28  -----------------------
summarize Result for  features:
normal :
number of runs detected as anomaly: 0 from 10
change_obj_weight:
number of runs detected as anomaly: 0 from 18
gripper_at