In [None]:
import pandas as pd
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
import os

# Evaluated data from literature

**Note:** Run the *Apps Evaluation.ipynb* notebook first to generate the evaluation data.

**Note 2:** Alternatively, use the pre-generated data from the `raw/papers` folder of the `evaluation-data` repository.

In [None]:
# Folder containing the generated app evaluation CSV files; replace the placeholder with a real path before running.
directory = "**path to the generated app evaluation data**" 

In [None]:
def evaluate(y_true, y_pred_matrix):
    """Score every prediction column of ``y_pred_matrix`` against ``y_true``.

    Parameters
    ----------
    y_true : array-like
        Reference labels, encoded as 0 (not a duplicate) / 1 (duplicate).
    y_pred_matrix : pandas.DataFrame
        One column of 0/1 predictions per method being evaluated.

    Returns
    -------
    pandas.DataFrame
        One row per column of ``y_pred_matrix`` (indexed by the column name)
        with the columns TP, TN, FP, FN, Accuracy, Recall, Precision and F1.
        Empty when ``y_pred_matrix`` has no columns.
    """
    rows = []
    for metric in y_pred_matrix.columns:
        y_pred = y_pred_matrix[metric]
        f1 = f1_score(y_true, y_pred)
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred)
        recall = recall_score(y_true, y_pred)
        # labels=[0, 1] pins the matrix to 2x2; without it a column in which only
        # one class occurs yields a 1x1 matrix and the 4-way unpacking fails.
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
        rows.append(pd.Series({'TP':tp,'TN':tn,'FP':fp,'FN':fn,'Accuracy': accuracy, 'Recall':recall, 'Precision': precision, 'F1':f1},name=metric))
    # DataFrame.append was removed in pandas 2.0 (and copied the frame on every
    # call); build the table once from the collected rows instead.
    return pd.DataFrame(rows)

In [None]:
# Score every file found directly in `directory` and accumulate the metric
# tables so they can be averaged afterwards.
filenames = os.listdir(directory)
# Count only the files that are actually evaluated: sub-directories are skipped
# below, so len(filenames) would inflate the divisor used for the average.
filecount = 0
eval_results_sum = 0
for filename in filenames:
    filepath = os.path.join(directory, filename)
    if os.path.isdir(filepath):
        continue
    results = pd.read_csv(filepath).drop(['state1','state2'], axis=1).astype(int)
    y_true = results.iloc[:,-1] # Last column: the human classified labels
    y_pred_matrix = results.iloc[:, :-1] # All remaining columns: the classified labels
    eval_results = evaluate(y_true, y_pred_matrix)
    # One metrics table per input file, written to the working directory.
    eval_results.to_csv(f'processed_{filename}')
    # DataFrame addition aligns on row/column labels, so every file is expected
    # to contain the same prediction columns.
    eval_results_sum += eval_results
    filecount += 1

In [None]:
# Element-wise mean of the accumulated metric tables, saved to the working directory.
eval_results_average = eval_results_sum / filecount
eval_results_average.to_csv('all_apps_avereged.csv')

In [None]:
# Display the averaged metrics table.
eval_results_average

In [None]:
# Keep the three methods of interest (rows of the averaged table) and relabel
# them with their display names.
extracted_theirs = eval_results_average.loc[
    ['DOM_contentHash', 'DOM_Levenshtein', 'TLSH_hash2vec']
].rename(
    index={
        'DOM_contentHash': 'TLSH Score (Hash)',
        'DOM_Levenshtein': 'Levenshtein (DOM)',
        'TLSH_hash2vec': 'Euclidean (Hash)',
    }
)

# Extract data from our evaluation framework

**Note:** Alternatively, use the pre-generated data from the `raw/evaluation framework` folder of the `evaluation-data` repository.

In [None]:
# Folder that must contain endpoints.json and interactions.json; replace the placeholder before running.
# NOTE: this rebinds `directory`, which earlier pointed at the literature data.
directory = "**Path to the directory where the data from our evaluation is contained*"

In [None]:
import json

# Read both exports explicitly as UTF-8: without `encoding`, open() falls back
# to the platform's locale codec, which can fail or mis-decode non-ASCII text.
with open(directory + '/endpoints.json', encoding='utf-8') as f:
    endpoints = json.load(f)

with open(directory + '/interactions.json', encoding='utf-8') as f:
    interactions = json.load(f)

### Parsing interactions as states

In [None]:
from tqdm import tqdm

# Flatten every interaction into the record handed to the clustering step:
# its id (used as the state name), its hash and the response body (DOM).
clustering_input = [
    {
        'name': interaction['_id']['$oid'],
        'hash': interaction['hash'],
        'dom': interaction['response']['data'],
    }
    for interaction in tqdm(interactions)
]
# State names in the same order as the clustering records.
state_names = [record['name'] for record in clustering_input]

In [None]:
import sys
# Put the parent directory first on the import path so the `scanner` package below resolves.
sys.path.insert(0, '..') # Allow relative imports
from scanner.Detection.ClusteringBased.Clustering.DBSCANClustering import DBSCANClustering
# Single clusterer instance shared by the clustering runs that follow.
clustering = DBSCANClustering()

In [None]:
# Clustering run with distance_type='tlsh'.
# Presumably `labels_nativ` holds one label per entry of clustering_input (it is later indexed by state position) - confirm against DBSCANClustering.
n_clusters_nativ, labels_nativ = clustering.cluster(clustering_input, distance_type='tlsh', field_for_index='name')
print(f'DBSCAN TLSH Nativ found {n_clusters_nativ} clusters')

In [None]:
# Clustering run with distance_type='hash2vec'.
# Presumably `labels_hash2vec` holds one label per entry of clustering_input (it is later indexed by state position) - confirm against DBSCANClustering.
n_clusters_hash2vec, labels_hash2vec = clustering.cluster(clustering_input, distance_type='hash2vec', field_for_index='name')
print(f'DBSCAN TLSH Hash2Vec found {n_clusters_hash2vec} clusters')

In [None]:
# Clustering run with distance_type='levenshtein'.
# NOTE(review): field_for_index is 'dom' here while the other two runs pass 'name' -
# confirm against DBSCANClustering.cluster that this is intended.
n_clusters_levenshtein_dom, labels_levenshtein_dom = clustering.cluster(clustering_input, distance_type='levenshtein', field_for_index='dom')
# The message previously read "TLSH Levenshtain", but this run does not use TLSH.
print(f'DBSCAN Levenshtein DOM found {n_clusters_levenshtein_dom} clusters')

In [None]:
import itertools
# Every unordered pair of entries in state_names, in the order itertools.combinations yields them.
state_pairs = list(itertools.combinations(state_names,2))

In [None]:
def detect_duplicates(labels, state_pairs, state_names):
    """Return the state pairs whose two members carry the same cluster label.

    Parameters
    ----------
    labels : sequence
        Cluster label per state; ``labels[i]`` belongs to ``state_names[i]``.
    state_pairs : iterable of 2-tuples
        Pairs of state names to check.
    state_names : sequence
        State names, positionally aligned with ``labels``.

    Returns
    -------
    list
        The elements of ``state_pairs`` whose labels compare equal, in input order.

    Raises
    ------
    ValueError
        If a name in a pair does not occur in ``state_names``.
    """
    # A name -> position map replaces the two list.index() scans per pair.
    # setdefault keeps the first position of a repeated name, as list.index() does.
    index_by_name = {}
    for position, name in enumerate(state_names):
        index_by_name.setdefault(name, position)

    def position_of(name):
        try:
            return index_by_name[name]
        except KeyError:
            # Keep raising the exception type list.index() used for unknown names.
            raise ValueError(f"{name!r} is not in list") from None

    # NOTE(review): if the clusterer marks unclustered points with a shared label
    # (e.g. -1 for DBSCAN noise), all of them are reported as duplicates of each
    # other here - confirm this is intended.
    duplicate_states = []
    for state_pair in tqdm(state_pairs):
        first_state_label = labels[position_of(state_pair[0])]
        second_state_label = labels[position_of(state_pair[1])]

        if first_state_label == second_state_label:
            duplicate_states.append(state_pair)
    return duplicate_states

In [None]:
# State pairs that share a cluster label in the distance_type='tlsh' run.
our_duplicates_dbscan_tlsh_nativ = detect_duplicates(labels_nativ, state_pairs, state_names)

In [None]:
# Report how many pairs were flagged as duplicates.
print(f'Duplicates found with our TLSH nativ method: {len(our_duplicates_dbscan_tlsh_nativ)}')

In [None]:
# State pairs that share a cluster label in the distance_type='hash2vec' run.
our_duplicates_dbscan_tlsh_hash2vec = detect_duplicates(labels_hash2vec, state_pairs, state_names)

In [None]:
# Report how many pairs were flagged as duplicates.
print(f'Duplicates found with our TLSH Hash2Vec method: {len(our_duplicates_dbscan_tlsh_hash2vec)}')

In [None]:
# State pairs that share a cluster label in the distance_type='levenshtein' run.
our_duplicates_dbscan_levenshtain_dom = detect_duplicates(labels_levenshtein_dom, state_pairs, state_names)

In [None]:
# Report how many pairs were flagged as duplicates.
# The message previously read "TLSH Levenshtain", but these pairs come from the Levenshtein run.
print(f'Duplicates found with our Levenshtein DOM method: {len(our_duplicates_dbscan_levenshtain_dom)}')

In [None]:
def create_classified_bool_array(duplicate_state_pairs, all_state_pairs=None):
    """Flag, for every state pair, whether it was reported as a duplicate.

    Parameters
    ----------
    duplicate_state_pairs : iterable
        The pairs classified as duplicates.
    all_state_pairs : iterable, optional
        The pairs to produce flags for. Defaults to the notebook-level
        ``state_pairs`` so existing one-argument calls behave as before.

    Returns
    -------
    list of bool
        One entry per pair in ``all_state_pairs``, in the same order; ``True``
        when that pair occurs in ``duplicate_state_pairs``.
    """
    if all_state_pairs is None:
        all_state_pairs = state_pairs
    try:
        # Set membership is O(1); scanning the list for every pair was
        # O(len(pairs) * len(duplicates)).
        duplicate_lookup = set(duplicate_state_pairs)
    except TypeError:
        # Unhashable pairs (e.g. lists): fall back to the original linear lookup.
        duplicate_lookup = duplicate_state_pairs
    return [state_pair in duplicate_lookup for state_pair in all_state_pairs]

In [None]:
def find_state_interaction_path(state_name):
    """Return the request endpoint path of the interaction whose id is ``state_name``.

    Scans the notebook-level ``interactions`` list in order and returns the path
    of the first match, or ``None`` when no interaction has that id.
    """
    matching_paths = (
        candidate['request']['endpoint']['path']
        for candidate in interactions
        if candidate['_id']['$oid'] == state_name
    )
    return next(matching_paths, None)

In [None]:
# Ground truth: two states count as duplicates when their interactions were
# requested on the same endpoint path.
# Resolve each state's path once up front; looking it up twice per pair
# re-scanned the whole interactions list for every pair.
path_by_state = {
    state_name: find_state_interaction_path(state_name)
    for state_name in {name for state_pair in state_pairs for name in state_pair}
}
ground_truth_bool_array = [
    path_by_state[first_state_name] == path_by_state[second_state_name]
    for first_state_name, second_state_name in state_pairs
]

In [None]:
# One row per state pair, holding the names of its two states.
results = pd.DataFrame({
    'state1': [pair[0] for pair in state_pairs],
    'state2': [pair[1] for pair in state_pairs],
})

In [None]:
# Attach one boolean column per clustering run plus the ground truth; the
# ground truth goes last so it can be split off positionally.
results = results.assign(**{
    'TLSH_nativ': create_classified_bool_array(our_duplicates_dbscan_tlsh_nativ),
    'TLSH_hash2vec': create_classified_bool_array(our_duplicates_dbscan_tlsh_hash2vec),
    'Levenshtain_Dom': create_classified_bool_array(our_duplicates_dbscan_levenshtain_dom),
    'Ground Truth': ground_truth_bool_array,
})

In [None]:
# Display the per-pair classification table.
results

In [None]:
# Drop the state-name columns and turn the boolean flags into 0/1 integers.
evaluation = results.drop(['state1','state2'], axis=1).astype(int)

In [None]:
# The last column is 'Ground Truth', which here is derived from matching endpoint paths rather than from human labelling.
y_true = evaluation.iloc[:,-1] # Extract the ground-truth labels

In [None]:
# Every column except the last one holds the labels produced by a classifier.
y_pred_matrix = evaluation.iloc[:, :-1]

In [None]:
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score

# Score each prediction column against the ground truth: one row per column
# with the confusion-matrix counts and accuracy / recall / precision / F1.
eval_rows = []

for metric in y_pred_matrix.columns:
    y_pred = y_pred_matrix[metric]
    f1 = f1_score(y_true, y_pred)
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    # labels=[0, 1] pins the matrix to 2x2; without it a column in which only
    # one class occurs yields a 1x1 matrix and the 4-way unpacking fails.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    eval_rows.append(pd.Series({'TP':tp,'TN':tn,'FP':fp,'FN':fn,'Accuracy': accuracy, 'Recall':recall, 'Precision': precision, 'F1':f1},name=metric))

# DataFrame.append was removed in pandas 2.0; build the table once from the rows.
eval_results = pd.DataFrame(eval_rows)

In [None]:
# Display the metrics table for our evaluation.
eval_results

In [None]:
# Select the three clustering runs (rows of the metrics table) and relabel
# them with their display names.
extracte_ours = eval_results.loc[
    ['TLSH_nativ', 'Levenshtain_Dom', 'TLSH_hash2vec']
].rename(
    index={
        'TLSH_nativ': 'TLSH Score (Hash)',
        'Levenshtain_Dom': 'Levenshtein (DOM)',
        'TLSH_hash2vec': 'Euclidean (Hash)',
    }
)

# Plot data

In [None]:
# Display the metrics selected from the literature data.
extracted_theirs

In [None]:
# Display the metrics selected from our evaluation.
extracte_ours

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Single source of truth for which metrics are plotted, in x-axis order,
# together with the shorter labels shown under each bar pair.
metric_order = ['Euclidean (Hash)', 'TLSH Score (Hash)', 'Levenshtein (DOM)']
tick_labels = ['Euclidean', 'TLSH Score', 'Levenshtein']

# Number of bar pairs, derived from the metric list instead of being hard-coded
N = len(metric_order)

# Position of bars on x-axis
ind = np.arange(N)

# Width of a bar and extra gap between the two bars of a pair
width = 0.3
offset = 0

fig, ax = plt.subplots(figsize = (6,3))

# Accuracy per metric, rounded to two decimals so the bar labels stay short
acc_ours = extracte_ours['Accuracy'].round(2)[metric_order]
acc_theirs = extracted_theirs['Accuracy'].round(2)[metric_order]

# zorder: grid behind the bars
ax.grid(linestyle = '--', linewidth = 0.5, axis = 'y', zorder=0)

# Labels are attached to the bars themselves so the legend cannot drift out of
# sync with the drawing order.
ax.bar(ind, acc_theirs, width, zorder=3, label='Data by Yandrapally et al.')
ax.bar(ind + width + offset, acc_ours, width, zorder=3, label='Our Evaluation Target')

# Print the value on top of every bar
for container in ax.containers:
    ax.bar_label(container)

ax.set_title("Duplicate Detection Accuracy of Different Similarity Metrics")
ax.set_xlabel("Similarity Metric")
ax.set_ylabel("Accuracy")

# Centre each tick between the two bars of its pair
ax.set_xticks(ind + (width + offset) / 2, tick_labels)
# Headroom above 1.0 leaves space for the bar labels
ax.set_ylim(0,1.1)

# Legend below the axes, both entries on one row
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.2),
      fancybox=True, shadow=False, ncol=2)

plt.savefig('distance_metrics_accuracy_ours_vs_literature.svg', bbox_inches='tight', format='svg')
