In [None]:
# Select the interactive `widget` (ipympl) matplotlib backend for this notebook.
%matplotlib widget

In [None]:
import sys
sys.path.append("..")
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.preprocessing import OrdinalEncoder, LabelEncoder
from data.loaders import load_capas_from_jsons, get_train_test_data, extract_capa_for_model
from catboost import CatBoost, CatBoostRegressor, Pool
from catboost import CatBoostClassifier
from catboost import CatBoostRegressor, EShapCalcType, EFeaturesSelectionAlgorithm

In [None]:
def _load_split(features_df, split_ver, split_dir=None):
    """Slice ``features_df`` into train / test / hard-test frames for a split version.

    The split definition is read with ``get_train_test_data``. Each returned
    split is matched to ``features_df`` by index, so rows that appear in the
    split but have no features (or the other way round) are dropped.

    Parameters
    ----------
    features_df : pandas.DataFrame
        Feature table whose index identifies the samples.
    split_ver : str
        Split version. ``'v3'`` yields three splits (train, test, hard test);
        any other version yields two (train, test).
    split_dir : str or path-like, optional
        Directory holding the split definition. Defaults to the notebook-level
        ``train_test_split_dir`` so existing two-argument calls keep working.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(features_train, features_test, hard_features_test)``. For versions
        other than ``'v3'`` the last frame is empty but keeps the columns of
        ``features_df``.
    """
    if split_dir is None:
        # Resolved at call time: the config cell defining it runs after this one.
        split_dir = train_test_split_dir

    splits = get_train_test_data(split_dir, ver=split_ver)
    if split_ver == 'v3':
        train_split, test_split, hard_test_split = splits
    else:
        train_split, test_split = splits
        hard_test_split = None

    def _restrict(split):
        # np.intersect1d returns the sorted, unique index values shared by both frames.
        shared_index = np.intersect1d(split.index, features_df.index)
        return features_df.loc[shared_index]

    features_train = _restrict(train_split)
    features_test = _restrict(test_split)
    if hard_test_split is None:
        hard_features_test = pd.DataFrame(columns=features_df.columns)
    else:
        hard_features_test = _restrict(hard_test_split)
    return features_train, features_test, hard_features_test

In [None]:
# Input locations used by the rest of the notebook.
# NOTE(review): both are machine-specific absolute Windows paths; adjust them
# before running on another machine.

# Directory handed to _load_split / get_train_test_data for the split definition.
train_test_split_dir = r'C:\Users\stav\data\whodis\train_test_split'
# Directory handed to load_capas_from_jsons.
base_dir = r"C:\Users\stav\data\whodis\parsed\CAPAs"

In [None]:
# Load the parsed tables; the loader's result is indexed by 'capas' and 'mbc'.
cat_dfs = load_capas_from_jsons(base_dir)
capas_df = cat_dfs['capas']
mbcs_df = cat_dfs['mbc']

# Fit ONE encoder on the union of CAPA rule names and MBC objectives so both
# tables share a single id space.
# pd.concat replaces Series.append, which was removed in pandas 2.0.
features_encoder = LabelEncoder()
features_encoder.fit(pd.concat([capas_df['rule'], mbcs_df['objective']]))

# Replace the names with their encoded ids, stored as strings.
capas_df['rule'] = features_encoder.transform(capas_df['rule']).astype(str)
mbcs_df['objective'] = features_encoder.transform(mbcs_df['objective']).astype(str)

In [None]:
# Prepare features
capas_features_df = extract_capa_for_model(capas_df, column='rule')
mbcs_features_df = extract_capa_for_model(mbcs_df,column='objective')

In [None]:
# Which feature families to model on.
features_opts = 'both' #['capas','mbc','both']

if features_opts == 'both':
    # Join the two feature tables on the sample index. A sample missing from
    # one table gets 0 for that table's features.
    feature_part = pd.concat(
        [capas_features_df.drop(columns=['label']),
         mbcs_features_df.drop(columns=['label'])],
        axis=1,
    ).fillna(0).astype(int)
    features_df = feature_part
    # Take the label from the CAPA table and fall back to the MBC table for
    # samples that only appear there. Previously those samples ended up with
    # the fill value 0 as their label because the MBC labels were dropped
    # before the outer join.
    features_df['label'] = capas_features_df['label'].combine_first(mbcs_features_df['label'])
elif features_opts == 'capas':
    # Copy so that later in-place label edits do not leak into the source frame.
    features_df = capas_features_df.copy()
elif features_opts == 'mbc':
    features_df = mbcs_features_df.copy()
else:
    raise ValueError(
        "features_opts must be one of 'capas', 'mbc', 'both'; got {!r}".format(features_opts)
    )

# Every column except the target.
feature_names = features_df.drop(columns=['label']).columns

In [None]:
# How to group the raw labels before encoding:
#   'flat'         - keep every raw label as its own class
#   'family_as_bg' - collapse the listed malware families into one 'family'
#                    background class and keep the APT groups separate
split_type = 'family_as_bg' #['flat','family_as_bg']

if split_type == 'family_as_bg':
    families = ['orcus', '7ev3n', 'Emotet', 'Conti', 'SugarRansomware', 'not_apt', 'missing']
    apts = ['cozy', 'veno']

    transform_dict = {f: 'family' for f in families}
    transform_dict.update({apt: apt for apt in apts})
    transform_dict['wipbot'] = 'veno'  # wipbot samples are counted as 'veno'

    # A label missing from the mapping used to be turned into None silently.
    # That also happens when this cell is re-run on labels that are already
    # integer-encoded, so fail loudly instead.
    unmapped = sorted(set(features_df['label'].unique()) - set(transform_dict), key=str)
    if unmapped:
        raise ValueError(
            "Labels without a 'family_as_bg' mapping: {}. If they are integers, "
            "rebuild features_df before re-running this cell.".format(unmapped)
        )
    features_df['label'] = features_df['label'].map(transform_dict)
elif split_type != 'flat':
    raise ValueError(
        "split_type must be 'flat' or 'family_as_bg'; got {!r}".format(split_type)
    )

# Encode the (possibly regrouped) string labels as integer class ids.
le = LabelEncoder()
features_df['label'] = le.fit_transform(features_df['label'])
# Class names ordered by encoded id, i.e. label_names[i] is the name of class i.
# (unique() on the raw column gave first-seen order, which does not match the ids.)
label_names = le.classes_

In [None]:
split_ver = 'v3' #['v1,'v2',v3']
features_train, features_test, hard_features_test = _load_split(features_df, split_ver)


def _make_pool(split_df):
    """Wrap a split frame in a CatBoost Pool: `label` is the target, all other columns are features."""
    # `columns=` replaces the positional axis argument of DataFrame.drop,
    # which pandas 2.0 no longer accepts.
    # cat_features is left unset, as in the original (it was commented out there).
    return Pool(split_df.drop(columns=['label']), label=split_df['label'])


train_pool = _make_pool(features_train)
test_pool = _make_pool(features_test)
hard_test_pool = _make_pool(hard_features_test)

In [None]:
# First feature-selection pass: recursive elimination by SHAP values over all
# feature columns, keeping 100 of them.
model_params_featur_select1 = dict(iterations=1500, learning_rate=0.01, auto_class_weights='Balanced', reg_lambda=20.0, grow_policy='Lossguide',
                    depth=6, max_leaves=16, colsample_bylevel=0.5, loss_function='MultiClassOneVsAll')

selection_steps = 5
model1 = CatBoostClassifier(**model_params_featur_select1)
# Fixed: the call used the undefined name `model`, and the result was read back
# from the undefined name `summary`. Both now refer to model1 / summary1.
summary1 = model1.select_features(
        Pool(features_train.drop(columns=['label']), label=features_train.label),
        eval_set=Pool(features_test.drop(columns=['label']), label=features_test.label),
        features_for_select=features_train.drop(columns=['label']).columns.to_list(),     # we will select from all features
        num_features_to_select=100,                      # number of features to keep
        steps=selection_steps,                           # more steps - more accurate selection
        algorithm=EFeaturesSelectionAlgorithm.RecursiveByShapValues,
        shap_calc_type=EShapCalcType.Regular,            # can be Approximate, Regular and Exact
        train_final_model=True,                          # to train model with selected features
        logging_level='Verbose',
        plot=True
    )
selected_features = summary1['selected_features_names']


In [None]:
# Feature matrix (only the columns kept by feature selection) and target
# vector for each of the three splits.
X_train, y_train = features_train[selected_features], features_train.label
X_test, y_test = features_test[selected_features], features_test.label
X_test_hard, y_test_hard = hard_features_test[selected_features], hard_features_test.label

In [None]:
def _accuracy(model, X, y):
    """Return the fraction of rows in ``X`` whose predicted class equals ``y``.

    Returns ``nan`` when ``y`` is empty (e.g. no hard test split for this split
    version) instead of predicting on no rows and dividing by zero.
    """
    if len(y) == 0:
        return float('nan')
    # Multiclass predict() may return an (n, 1) column; flatten it so the
    # comparison with the label vector is element-wise.
    predicted = np.ravel(model.predict(X))
    return (predicted == np.asarray(y)).sum() / len(y)


# Fixed: the first print had unbalanced parentheses and did not parse.
print('Test Accuracy (Easy) {}'.format(_accuracy(model1, X_test, y_test)))
print('Test Accuracy (hard) {}'.format(_accuracy(model1, X_test_hard, y_test_hard)))

In [2]:
# Ignore Features
# Re-split the data with some features removed. The names to ignore are passed
# through the fitted encoder and cast to str before being used as the column
# labels to drop.
split_ver = 'v3' #['v1,'v2',v3']
ignore_features = ['move file']
ignore_features_encoded = features_encoder.transform(ignore_features).astype(str)
features_without_ignored = features_df.drop(columns=ignore_features_encoded)
features_train, features_test, hard_features_test = _load_split(features_without_ignored, split_ver)

NameError: name 'features_encoder' is not defined

In [None]:
# Second feature-selection pass, run on the current features_train /
# features_test (i.e. after any ignored features have been dropped).
model_params_featur_select2 = dict(iterations=1500, learning_rate=0.01, auto_class_weights='Balanced', reg_lambda=20.0, grow_policy='Lossguide',
                    depth=6, max_leaves=16, colsample_bylevel=0.5, loss_function='MultiClassOneVsAll')

selection_steps = 5
# Fixed: model2 was built from model_params_featur_select1 (copy-paste slip).
model2 = CatBoostClassifier(**model_params_featur_select2)
# The pools are built here from the current split frames. The previously built
# train_pool / test_pool still contain any columns dropped since they were
# created, so those features would stay in the model and never be candidates
# for elimination.
summary = model2.select_features(
        Pool(features_train.drop(columns=['label']), label=features_train.label),
        eval_set=Pool(features_test.drop(columns=['label']), label=features_test.label),
        features_for_select=features_train.drop(columns=['label']).columns.to_list(),     # we will select from all features
        num_features_to_select=100,                      # number of features to keep
        steps=selection_steps,                           # more steps - more accurate selection
        algorithm=EFeaturesSelectionAlgorithm.RecursiveByShapValues,
        shap_calc_type=EShapCalcType.Regular,            # can be Approximate, Regular and Exact
        train_final_model=True,                          # to train model with selected features
        logging_level='Verbose',
        plot=True
    )