In [1]:
import os
import pickle
import sys
from collections import Counter

import lightgbm as lgb
import numpy as np
import pandas as pd
import seaborn as sns # useful library for the confusion matrix
import shap
# import plotly.express as px
from matplotlib import pyplot as plt
from sklearn import tree
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import RFE
from sklearn.inspection import PartialDependenceDisplay
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, fbeta_score, make_scorer, confusion_matrix, precision_recall_curve
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier

In [2]:
# Global figure style: large base font, STIX fonts for both regular text and mathtext.
# (The former bare `plt.rcParams["mathtext.fontset"]` lookup was dropped: its value was
# discarded and, not being the last line of the cell, it was never displayed.)
plt.rcParams.update({'font.size': 28, 'font.family': 'STIXGeneral', 'mathtext.fontset': 'stix'})

In [3]:
# Ordered list of the dataset columns used by this notebook:
#   - position 0:       'address'
#   - positions 1-23:   account / transaction-level features
#   - positions 24-99:  the 76 opcode columns ('PUSH' ... 'SDIV')
#   - position 100:     'target' (the label)
# Later cells slice by position (0:24 and 24:100) and use [1:-1] as the feature
# names, so the order of this list matters.
transaction_frequency_opcode = ['address', 'balance', 'lifetime', 'tx_in', 'tx_out', 'investment_in',
 'payment_out', 'investment_to_contract/tx_in', 'payment_from_contract/tx_out',
 '#addresses_paying_contract', '#addresses_paid_by_contract', 'mean_v1', 'sdev_v1', 'mean_v2', 'sdev_v2', 'paid_rate',
 'paid_one', 'percentage_some_tx_in', 'sdev_tx_in', 'percentage_some_tx_out', 'sdev_tx_out', 'owner_gets_eth_Wo_investing',
 'owner_gets_eth_investing', 'owner_no_eth', 'PUSH', 'DUP', 'JUMPDEST', 'STOP', 'MSTORE', 'JUMPI', 'REVERT', 'CALLVALUE',
 'ISZERO', 'CODECOPY', 'RETURN', 'LOG', 'SHA3', 'MSTORE8', 'SWAP', 'POP', 'ADD', 'MLOAD', 'AND', 'SUB', 'CALLDATALOAD', 'EXP',
 'MUL', 'SLOAD', 'EQ', 'JUMP', 'DIV', 'CALLER', 'CALLDATACOPY', 'SSTORE', 'NOT', 'CALL', 'LT', 'GT', 'OR', 'ADDRESS', 'TIMESTAMP',
 'GASLIMIT', 'GAS', 'ORIGIN', 'BALANCE', 'CALLDATASIZE', 'SAR', 'MSIZE', 'CODESIZE', 'COINBASE', 'CREATE2', 'EXTCODESIZE', 'CALLCODE', 'SHL',
 'BLOCKHASH', 'RETURNDATASIZE', 'SHR', 'GETPC', 'DELEGATECALL', 'MOD', 'ADDMOD', 'NUMBER', 'XOR', 'SLT', 'EXTCODECOPY', 'MULMOD', 'CREATE', 'SELFDESTRUCT',
 'STATICCALL', 'RETURNDATACOPY', 'SGT', 'DIFFICULTY', 'SMOD', 'BYTE', 'SIGNEXTEND', 'CHAINID', 'SELFBALANCE', 'GASPRICE', 'EXTCODEHASH', 'SDIV', 'target']

In [4]:
# Reduced feature set (8 account/transaction features followed by 39 opcode features)
# on which the model is refit and on which the SHAP / partial-dependence analysis runs.
# NOTE(review): the selection procedure that produced this list is not part of this
# notebook — confirm its provenance.
best_features_set = ['lifetime',
 'tx_in',
 'investment_to_contract/tx_in',
 'mean_v1',
 'sdev_v1',
 'sdev_v2',
 'paid_one',
 'sdev_tx_in',
 'PUSH',
 'DUP',
 'JUMPDEST',
 'STOP',
 'MSTORE',
 'JUMPI',
 'REVERT',
 'CALLVALUE',
 'CODECOPY',
 'RETURN',
 'LOG',
 'SHA3',
 'MSTORE8',
 'SWAP',
 'POP',
 'ADD',
 'MLOAD',
 'AND',
 'SUB',
 'CALLDATALOAD',
 'EXP',
 'MUL',
 'SLOAD',
 'EQ',
 'DIV',
 'CALLER',
 'CALLDATACOPY',
 'SSTORE',
 'NOT',
 'CALL',
 'LT',
 'ADDRESS',
 'GASLIMIT',
 'GAS',
 'ORIGIN',
 'BALANCE',
 'COINBASE',
 'MULMOD',
 'SGT']

In [None]:
# Location of the dataset CSV. Set the PONZI_DATASET_PATH environment variable to point
# at a local copy; when it is unset, the original hard-coded location is used as a
# fallback so existing setups keep working.
ds_full_path = os.environ.get(
    "PONZI_DATASET_PATH",
    r"C:\Users\lucap\OneDrive\Desktop\Scam on blockchain\Smart Ponzi\DS_deployed_bytecode.csv",
)
db = pd.read_csv(ds_full_path, header=0, delimiter=',')
# Missing values are replaced by 0 for the whole frame.
db = db.fillna(0)
db

In [None]:
# Select the modelling columns BY NAME, in the order of transaction_frequency_opcode,
# so the positional slices below always match that list regardless of the column
# order (or extra columns) in the CSV.
df_selected = db[transaction_frequency_opcode].copy()

# Labels as a one-column frame with a fresh 0..n-1 index.
y = pd.DataFrame(df_selected.loc[:, ['target']].values, columns=['target'])

# Positions 0-23: address + account/transaction features; positions 24-99: opcode columns.
dataset_account = df_selected.iloc[:, 0:24]
# .copy() gives an independent frame, so the normalisation below never writes into a slice.
dataset_opcode = df_selected.iloc[:, 24:100].copy()

# Normalise every opcode column by the row total, i.e. each contract's opcode values
# become fractions of that contract's total. A row whose total is 0 yields 0/0 = NaN,
# which the fillna(0) below turns into 0.
opcode_totals = dataset_opcode.sum(axis=1)
dataset_opcode = dataset_opcode.div(opcode_totals, axis=0)

df_transaction_weighted_opcode = pd.concat([dataset_account, dataset_opcode, y], axis=1)
df_transaction_weighted_opcode = df_transaction_weighted_opcode.fillna(0)

df_transaction_weighted_opcode

In [7]:
# Feature names only: everything except the first entry ('address') and the last ('target').
col_name = transaction_frequency_opcode[1:-1]

In [8]:
# First column (the addresses), kept aside as a one-column DataFrame.
addr = pd.DataFrame(df_transaction_weighted_opcode.iloc[:, 0])

# Feature matrix: drop the first column, which holds the addresses, and the last one
# (the target); rebuild it from the raw values and label it with col_name.
X = pd.DataFrame(
    df_transaction_weighted_opcode.iloc[:, 1:-1].values,
    columns=col_name,
)

# Labels as an (n, 1) array.
y = df_transaction_weighted_opcode.loc[:, ['target']].values

In [9]:
# Append the address as an extra (last) column of X so that it goes through the
# train/test split together with the features.
X['address'] = addr['address']

In [10]:
# Hold out 15% of the rows as the test set; the split is stratified on y and
# seeded (random_state=42) so it is reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state = 42, stratify=y)

In [None]:
X_train

In [12]:
# Name of the first column of the full frame (the address column), as a one-element Index.
address_columns = df_transaction_weighted_opcode.columns[0:1]

# One-column frames holding only that column for each split; row index is kept.
df_X_train_adr = pd.DataFrame(X_train, columns=address_columns)
df_X_test_adr = pd.DataFrame(X_test, columns=address_columns)

In [13]:
# Remove the helper 'address' column (appended to X before the split) so only features
# remain. Dropping by name instead of `iloc[:, :-1]` makes the cell idempotent:
# re-running it is a no-op rather than silently stripping the last real feature.
X_train = X_train.drop(columns=['address'], errors='ignore')
X_test = X_test.drop(columns=['address'], errors='ignore')

In [14]:
# LightGBM estimator that replaces the pipeline's 'classifier' step during the search.
lgbm_classifier = lgb.LGBMClassifier(
    boosting_type='gbdt',
    n_jobs=4,
    importance_type='split',
    random_state=42,
)

# Parameter grid in GridSearchCV format. Every entry is a one-element list, so the
# "grid" consists of exactly one configuration.
best_params = [
    {
        'classifier': [lgbm_classifier],
        'classifier__objective': ['binary'],
        'classifier__is_unbalance': [True],
        'classifier__learning_rate': [0.1],
        'classifier__n_estimators': [140],
        'classifier__max_depth': [15],
        'classifier__colsample_bytree': [0.5],
        'classifier__reg_alpha': [0.2],
        'classifier__reg_lambda': [1],
    },
]

In [None]:
# The RandomForest here is only a placeholder for the 'classifier' step: the
# 'classifier' entry of best_params substitutes its own estimator during the search.
# (The unused `data = {}` that used to open this cell was removed; the metrics cell
# creates its own dict.)
pipeline = Pipeline(steps=[('classifier', RandomForestClassifier(random_state=42))])

# 10-fold cross-validation scored by ROC AUC; train scores are recorded as well.
grid_search = GridSearchCV(
    pipeline,
    best_params,
    cv=10,
    scoring='roc_auc',
    return_train_score=True,
    n_jobs=4,
    verbose=2,
)
# y_train is (n, 1); ravel() passes it as a flat 1-D label vector.
grid_search.fit(X_train, y_train.ravel())

In [None]:
print(grid_search.best_params_)
# GridSearchCV.score() evaluates with the search's `scoring` ('roc_auc' here), so the
# number printed is the ROC AUC on the test split — not an accuracy.
print('Test ROC AUC: %.3f' % grid_search.score(X_test, y_test))

In [None]:
# Train/test metrics of the grid-search estimator, each stored as a one-element list.
#   precision = TP / (TP + FP)
#   recall    = TP / (TP + FN)
#   accuracy  = (TP + TN) / (TP + TN + FP + FN)
best_pipeline = grid_search.best_estimator_

y_pred_train = best_pipeline.predict(X_train)
y_pred = best_pipeline.predict(X_test)

data = {
    'best_precision_training': [precision_score(y_train, y_pred_train)],
    'best_recall_training': [recall_score(y_train, y_pred_train)],
    'best_accuracy_training': [accuracy_score(y_train, y_pred_train)],
    'best_f1_training': [f1_score(y_train, y_pred_train)],
    'best_precision_test': [precision_score(y_test, y_pred)],
    'best_recall_test': [recall_score(y_test, y_pred)],
    'best_accuracy_test': [accuracy_score(y_test, y_pred)],
    'best_f1_test': [f1_score(y_test, y_pred)],
}

In [None]:
data

In [19]:
def plot_confusion_matrix(y_test, y_pred, name):
    """Draw the confusion matrix of a set of predictions as an annotated heatmap.

    The figure is written to ``cm_<name>.png`` (relative to the current working
    directory) and then displayed.

    Parameters
    ----------
    y_test : array-like
        True labels.
    y_pred : array-like
        Predicted labels.
    name : str
        Suffix inserted into the output file name.
    """
    label_size = 38
    matrix = confusion_matrix(y_test, y_pred)

    plt.figure(figsize=(10, 10))
    # Integer cell annotations, green colour map, no colour bar.
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Greens',
        cbar=False,
        annot_kws={"fontsize": label_size},
    )
    plt.xticks(fontsize=label_size)
    plt.yticks(fontsize=label_size)

    # Save before show(), while the figure is still the current one.
    plt.savefig('cm_{}.png'.format(name))
    plt.show()

In [None]:
plot_confusion_matrix(y_test,y_pred,"Full features set")

In [21]:
# `model` is another name for the very same object as grid_search.best_estimator_
# (no copy is made): fitting `model` again also changes what
# grid_search.best_estimator_ refers to.
model = grid_search.best_estimator_

In [None]:
# Rebuild the splits with the full feature column names, then keep only the
# columns listed in best_features_set.
full_feature_columns = df_transaction_weighted_opcode.columns[1:-1]
df_X_train_full = pd.DataFrame(X_train, columns=full_feature_columns)[best_features_set].copy()
df_X_test_full = pd.DataFrame(X_test, columns=full_feature_columns)[best_features_set].copy()

# Refit the pipeline on the reduced feature set.
model.fit(df_X_train_full, y_train.ravel())

# Column 1 of predict_proba (the second class) and the hard predictions on the test split.
y_pred_proba_best = model.predict_proba(df_X_test_full)[:, 1]
y_pred_best = model.predict(df_X_test_full)

# ROC curve and area under it, from the predicted probabilities.
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba_best)
auc_score = roc_auc_score(y_test, y_pred_proba_best)

# precision = TP / (TP + FP), recall = TP / (TP + FN),
# accuracy = (TP + TN) / (TP + TN + FP + FN)
best_precision_test = precision_score(y_test, y_pred_best)
best_recall_test = recall_score(y_test, y_pred_best)
best_accuracy_test = accuracy_score(y_test, y_pred_best)
best_f1_test = f1_score(y_test, y_pred_best)

print('AUC: {:.3f}'.format(auc_score))
print('Precision: {:.3f}'.format(best_precision_test))
print('Recall: {:.3f}'.format(best_recall_test))
print('Accuracy: {:.3f}'.format(best_accuracy_test))
print('F1: {:.3f}'.format(best_f1_test))

In [25]:
np.set_printoptions(threshold=sys.maxsize)

In [None]:
plot_confusion_matrix(y_test,y_pred_best,"Best features set")

In [28]:
# Alias of the pipeline held by the grid search (same object, not a copy); the SHAP
# cells read its 'classifier' step via cls.named_steps['classifier'].
cls = grid_search.best_estimator_

In [None]:
# Tree-specific SHAP explainer built on the pipeline's 'classifier' step, evaluated
# on the reduced-feature test frame.
# NOTE(review): the container type returned by shap_values() (list per class vs. a
# single array) is assumed to depend on the installed SHAP version — confirm.
explainer_tree = shap.TreeExplainer(cls.named_steps['classifier'])
shap_values_tree = explainer_tree.shap_values(df_X_test_full)

In [30]:
def get_shaps(classifier, dataset):
    """Compute SHAP values for ``dataset`` with two different explainers.

    Prints ``dataset.shape`` before computing anything.

    Parameters
    ----------
    classifier
        Fitted model handed to both ``shap.Explainer`` and ``shap.TreeExplainer``.
    dataset
        Rows to explain; also passed to ``shap.Explainer`` as its second argument.

    Returns
    -------
    tuple
        ``(shap_values, shap_values_tree)``: the result of calling the
        ``shap.Explainer`` on ``dataset`` (additivity check disabled), and the
        result of ``shap.TreeExplainer(classifier).shap_values(dataset)``.
    """
    print(dataset.shape)

    generic_values = shap.Explainer(classifier, dataset)(dataset, check_additivity=False)
    tree_values = shap.TreeExplainer(classifier).shap_values(dataset)

    return generic_values, tree_values

In [31]:
def plot_beeswarn(df, shap_values, filename, class_index=1):
    """Draw a SHAP summary (beeswarm) plot for one class and save it as a PNG.

    Parameters
    ----------
    df : pandas.DataFrame
        The rows that were explained (one column per feature).
    shap_values : list of arrays or numpy.ndarray
        SHAP values as returned by ``TreeExplainer.shap_values``. Accepted layouts:
        a list/tuple with one ``(n_samples, n_features)`` array per class, an array
        of shape ``(n_classes, n_samples, n_features)``, an array of shape
        ``(n_samples, n_features, n_classes)``, or a single
        ``(n_samples, n_features)`` array (used as is).
    filename : str
        Output file name without extension; the plot is saved to ``<filename>.png``.
    class_index : int, default 1
        Which class to plot when ``shap_values`` holds several classes.
    """
    expected_shape = tuple(np.shape(df))

    # Pick the (n_samples, n_features) matrix for the requested class. Plain
    # `shap_values[class_index]` is only right for the per-class layouts; on a single
    # 2-D array it would select one sample row instead of a class.
    if isinstance(shap_values, np.ndarray):
        if shap_values.ndim == 3 and shap_values.shape[1:] == expected_shape:
            class_values = shap_values[class_index]
        elif shap_values.ndim == 3 and shap_values.shape[:2] == expected_shape:
            class_values = shap_values[:, :, class_index]
        elif shap_values.ndim == 2 and shap_values.shape == expected_shape:
            class_values = shap_values
        else:
            # Unrecognised layout: keep the original indexing behaviour.
            class_values = shap_values[class_index]
    else:
        # list / tuple (one array per class) or any other indexable container
        class_values = shap_values[class_index]

    shap.summary_plot(class_values, df, show=False, max_display=12, color='red', color_bar=True, class_names=True)
    fig = plt.gcf()  # the figure summary_plot just drew on

    # Second axes of the figure — presumably the colour bar added by summary_plot
    # (TODO confirm); enlarge its label.
    fig.axes[1].yaxis.get_label().set_fontsize(24)
    plt.xlabel('SHAP value', fontsize=20)
    plt.yticks(fontsize=20)
    plt.xticks(fontsize=20)
    plt.tight_layout()
    plt.savefig('{}.png'.format(filename))

In [32]:
# Generic SHAP explainer on the pipeline's 'classifier' step; df_X_test_full is passed
# both as the explainer's second argument and as the rows to explain. The additivity
# check is disabled.
# NOTE(review): get_shaps() performs these same two calls, so this cell looks
# redundant with the get_shaps(...) call on the same inputs — confirm before removing.
explainer = shap.Explainer(cls.named_steps['classifier'], df_X_test_full)
shap_values = explainer(df_X_test_full,check_additivity=False)

In [None]:
shap_values, shap_values_tree = get_shaps(cls.named_steps['classifier'], df_X_test_full)

In [None]:
plot_beeswarn(df_X_test_full,shap_values_tree,'total',1)

In [35]:
from sklearn.inspection import PartialDependenceDisplay

In [36]:
top_twelve = [
 'CALLDATALOAD',
 'POP',
 'DIV',
 'SUB',
 'GASLIMIT',
 'paid_one',
 'SGT',
 'CALLVALUE',
 'MULMOD',
 'SWAP',
 'tx_in',
 'ORIGIN'
]

In [None]:
# One partial-dependence figure per feature in top_twelve.
# from_estimator() already draws the plot, so the line style is passed to it directly
# instead of calling .plot() afterwards (which drew a second figure for every feature).
# Background and grid are applied to the axes the display actually created.
for feature in top_twelve:
    pdp_display = PartialDependenceDisplay.from_estimator(
        cls, df_X_test_full, [feature], line_kw={'linewidth': 2}
    )
    for pdp_ax in pdp_display.axes_.ravel():
        if pdp_ax is None:  # defensive: skip empty grid slots, if any
            continue
        pdp_ax.set_facecolor('white')
        # grid lines on both the x and y axes
        pdp_ax.grid(True, color='gray', linestyle='-', linewidth=0.5)
    plt.show()

In [None]:
y_pred_proba_best = model.predict_proba(df_X_test_full)[:, 1]

In [None]:
model.predict_proba

In [None]:
y_pred_proba_best

False negatives and false positives

In [41]:
# Per-row view of the test set for error analysis (reduced feature set).
df_check = df_X_test_full.copy()
# Use the predictions of the model refit on the reduced feature set, so 'prediction'
# and 'probability' come from the same model. (`y_pred` was produced earlier by the
# fit on the full feature set and could disagree with these probabilities.)
df_check['prediction'] = y_pred_best
# y_test is an (n, 1) array; flatten it to one label per row.
df_check['label'] = np.ravel(y_test)
df_check['probability'] = y_pred_proba_best
# 1 where the prediction matches the label, 0 otherwise.
df_check['correct'] = (df_check['prediction'] == df_check['label']).astype('int64')
# Addresses are aligned on the row index shared with the test split.
df_check['address'] = df_X_test_adr['address']

In [None]:
df_check

prediction	label	probability

In [None]:
df_check.sort_values(by=['probability'])

In [None]:
# Histogram of the predicted probability, one series per true label, each normalised
# on its own (common_norm=False), in ten bins of width 0.1.
fig, ax = plt.subplots(figsize=(10, 7))
sns.histplot(
    data=df_check,
    x='probability',
    hue='label',
    bins=np.arange(0, 1.1, 0.1),
    stat='probability',
    common_norm=False,
    ax=ax,
)
ax.set_xlabel('Estimated Probability being Ponzi', fontsize=18)
# NOTE(review): the axis is labelled 'Density' while stat='probability' is used — confirm the wording.
ax.set_ylabel('Density', fontsize=18)
ax.tick_params(axis='both', labelsize=14)

# NOTE(review): legend labels are assigned by position, not by hue value — confirm
# that the first entry really corresponds to the Ponzi class.
ax.legend(loc='best', labels=['Ponzi', 'Not Ponzi'], fontsize=16)
fig.savefig('distribution_probability.png')

In [45]:
# Flag the misclassified rows: a wrong prediction of 1 is a false positive (FP),
# a wrong prediction of 0 is a false negative (FN). Flags are stored as 0/1 integers.
is_wrong = df_check['correct'] == 0
df_check['FP'] = ((df_check['prediction'] == 1) & is_wrong).astype('int64')
df_check['FN'] = ((df_check['prediction'] == 0) & is_wrong).astype('int64')

In [None]:
df_check

In [47]:
# Independent copies of the false-positive and false-negative rows.
df_fp = df_check.loc[df_check['FP'] == 1].copy()
df_fn = df_check.loc[df_check['FN'] == 1].copy()

In [None]:
df_fp

In [None]:
df_fp["address"]

In [None]:
df_fn["address"]

In [79]:
pd.set_option("display.max_columns", 999)

In [None]:
df_fn.sort_index()

In [None]:
df_fp

In [53]:
df_shap_fn = df_fn.sort_values('probability',ascending=False)[df_X_test_full.columns].copy()

In [None]:
shap_fn, shap_fn_tree = get_shaps(cls.named_steps['classifier'],df_shap_fn)

In [None]:
# Beeswarm plot for the false negatives only. Saved under its own file name: reusing
# 'total' would overwrite the plot of the whole test set saved as total.png.
plot_beeswarn(df_shap_fn, shap_fn_tree, 'false_negatives', 1)

In [None]:
shap.plots.bar(shap_fn[1],show_data=True)


In [None]:
df_shap_fp = df_fp.sort_values('probability',ascending=False)[df_X_test_full.columns].copy()
shap_fp, shap_fp_tree = get_shaps(cls.named_steps['classifier'],df_shap_fp)

In [None]:
# Beeswarm plot for the false positives only. Saved under its own file name: reusing
# 'total' would overwrite the plot of the whole test set saved as total.png.
plot_beeswarn(df_shap_fp, shap_fp_tree, 'false_positives', 1)

In [None]:
shap.plots.bar(shap_fp[4],show_data=True)

In [None]:
shap.initjs()
shap.force_plot(shap_fp[1])


In [None]:

shap.force_plot(shap_fp[1])

SHAP dependence plots (the SHAP counterpart of the partial dependence plots above)

In [None]:
top_twelve

In [None]:
shap.dependence_plot("CALLDATALOAD", shap_values_tree[1], df_X_test_full)

In [None]:
shap.dependence_plot("tx_in", shap_values_tree[1], df_X_test_full)

In [None]:
shap.dependence_plot("tx_in", shap_values_tree[1], df_X_test_full,  interaction_index=None)