In [None]:
import itertools
import time
from typing import Tuple, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, balanced_accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.tree import DecisionTreeClassifier


## Declaring constants

In [None]:
# Locations of the BODMAS dataset files consumed by the loader functions below.
# NOTE(review): these are absolute, machine-specific paths - adjust them to your local copy of the dataset.
BODMAS_METADATA_CSV = "/opt/work/bd/BODMAS/bodmas_metadata.csv"
BODMAS_CATEGORY_CSV = "/opt/work/bd/BODMAS/bodmas_malware_category.csv"
# Note the parent path component is itself named "bodmas_feature_vectors.npz"; the archive is bodmas.npz inside it.
BODMAS_FEATURE_VECTORS_NPZ = "/opt/work/bd/BODMAS/bodmas_feature_vectors.npz/bodmas.npz"

## Defining utility functions

In [None]:
def read_bodmas_metadata() -> pd.DataFrame:
    """Load the BODMAS metadata CSV as a frame indexed by ``sha256``.

    The CSV's ``sha`` column is renamed to ``sha256`` before it becomes the index.
    """
    metadata = pd.read_csv(BODMAS_METADATA_CSV)
    return metadata.rename(columns={"sha": "sha256"}).set_index("sha256")


def read_bodmas_category() -> pd.DataFrame:
    """Load the BODMAS malware-category CSV as a frame indexed by ``sha256``."""
    category = pd.read_csv(BODMAS_CATEGORY_CSV)
    return category.set_index("sha256")


def read_bodmas_features() -> Tuple[np.ndarray, np.ndarray]:
    """Load the BODMAS feature archive.

    Returns:
        A pair ``(X, y)``: the archive's ``"X"`` array cast to ``float`` and its
        ``"y"`` array as stored.
    """
    # np.load on an .npz returns an NpzFile that holds the archive open; the context
    # manager closes it once both arrays have been read into memory.
    with np.load(BODMAS_FEATURE_VECTORS_NPZ) as npz:
        return npz["X"].astype(float), npz["y"]


def display_df(df: pd.DataFrame, title: str = None):
    """Print a one-line header with the frame's shape and return the frame unchanged.

    Args:
        df: Frame to describe.
        title: Optional label shown as ``<title>`` in the header; omitted when falsy.

    Returns:
        ``df`` itself, so the call can be the last expression of a notebook cell.
    """
    label = f"<{title}>" if title else ""
    print(f"=== DataFrame {label} shape: {df.shape}")
    return df


def display_np(np_array: np.ndarray, title: str = None):
    """Print a one-line header with the array's shape and return the array unchanged.

    Args:
        np_array: Array to describe.
        title: Optional label shown as ``<title>`` in the header; omitted when falsy.

    Returns:
        ``np_array`` itself, so the call can be the last expression of a notebook cell.
    """
    label = f"<{title}>" if title else ""
    print(f"=== np.ndarray {label} shape: {np_array.shape}")
    return np_array


def display_df_columns(df: pd.DataFrame, column_name: str, top_k: int = 10, log_scale: bool = False):
    """Plot and return the occurrence counts of the most frequent values in a column.

    Args:
        df: Frame holding the column.
        column_name: Column whose values are counted.
        top_k: Maximum number of most frequent values to keep.
        log_scale: When true, the y axis uses a logarithmic scale.

    Returns:
        A frame with the counts of the (at most) ``top_k`` most frequent values.
    """
    top_counts = df[column_name].value_counts().to_frame().head(top_k)

    plt.figure(figsize=(20, 4))
    axes = plt.gca()
    axes.plot(top_counts, marker='o', linestyle='dashed', linewidth=1, markersize=12)
    if log_scale:
        axes.set_yscale('log')
    plt.xticks(rotation=45)
    plt.show()
    return top_counts


def evaluate_predictions(labels_true, labels_pred):
    """Score predictions against the true labels.

    Returns:
        A 10-tuple of scores rounded to 4 decimals, in this order:
        f1_micro, f1_macro, accuracy, balanced accuracy, precision_micro,
        precision_macro, recall_micro, recall_macro, f1_micro, f1_macro.
        The two F1 scores appear both first and last; the results tables built
        from this tuple list ten metric columns, so the length is kept as is.
    """

    def micro_and_macro(score_fn):
        # One score per averaging mode, micro first.
        return [score_fn(labels_true, labels_pred, average=mode) for mode in ("micro", "macro")]

    f1_pair = micro_and_macro(f1_score)
    overall = [
        accuracy_score(labels_true, labels_pred),
        balanced_accuracy_score(labels_true, labels_pred),
    ]
    precision_pair = micro_and_macro(precision_score)
    recall_pair = micro_and_macro(recall_score)

    scores = f1_pair + overall + precision_pair + recall_pair + f1_pair
    return tuple(round(score, 4) for score in scores)


# Render frames with all columns visible and without wrapping wide frames onto several lines.
for _option, _value in (
    ('display.max_columns', None),
    ('display.width', None),
    ('display.expand_frame_repr', False),
):
    pd.set_option(_option, _value)

## Declaring globals

In [None]:
# Load everything once; the remaining cells work on these module-level objects.
# Both frames are indexed by sha256 (see the reader functions).
BODMAS_METADATA = read_bodmas_metadata()
BODMAS_CATEGORY = read_bodmas_category()
# X is the feature matrix cast to float, y the label array exactly as stored in the archive.
BODMAS_FEATURES_X, BODMAS_FEATURES_y = read_bodmas_features()

## BODMAS Category info

In [None]:
# Prints the category table's shape; the returned frame is the cell's displayed output.
display_df(BODMAS_CATEGORY, "Bodmas category info")

In [None]:
# Plot the counts of (at most) the 100 most frequent categories on a linear y axis.
display_df_columns(BODMAS_CATEGORY, "category", top_k=100, log_scale=False)

## BODMAS Metadata info

In [None]:
# Prints the metadata table's shape; the returned frame is the cell's displayed output.
display_df(BODMAS_METADATA, "Bodmas metadata")

In [None]:
# Two views of the family distribution: the top 100 on a log y axis, then the top 10 on a linear axis.
# Only the second call's return value is shown as the cell output; both plots are drawn.
display_df_columns(BODMAS_METADATA, "family", top_k=100, log_scale=True)
display_df_columns(BODMAS_METADATA, "family", top_k=10, log_scale=False)

## Joining metadata with category + filtering

* setting NaN family/category values to "benign"
* replacing families that occur only once with "other"

In [None]:
# Attach each sample's malware category to its metadata row; rows without a match in the
# category table get NaN (how="left"). The guard keeps this cell re-runnable: merging a second
# time would produce category_x / category_y columns and break the "category" lookup below.
if "category" not in BODMAS_METADATA.columns:
    metadata_with_category = BODMAS_METADATA.merge(BODMAS_CATEGORY, on="sha256", how="left")
    # Later cells select rows of BODMAS_FEATURES_X by row position in this frame, so the
    # merge must not add rows (a left merge only does so when the right side repeats a key).
    assert len(metadata_with_category) == len(BODMAS_METADATA), (
        "merge changed the row count: the category table contains duplicate sha256 entries"
    )
    BODMAS_METADATA = metadata_with_category

# Samples without a family / category are labelled "benign".
BODMAS_METADATA["family"] = BODMAS_METADATA["family"].fillna("benign")
BODMAS_METADATA["category"] = BODMAS_METADATA["category"].fillna("benign")

# Families represented by exactly one sample are folded into a shared "other" label.
family_value_counts = BODMAS_METADATA['family'].value_counts()
single_occurrence_families = family_value_counts[family_value_counts == 1].index
# Vectorised replacement (isin + mask) instead of a Python-level lambda per row.
BODMAS_METADATA['family'] = BODMAS_METADATA['family'].mask(
    BODMAS_METADATA['family'].isin(single_occurrence_families), 'other'
)

display_df(BODMAS_METADATA, "Bodmas metadata cobmined with categories")

## Label encoding on "family" and "category"

In [None]:
# Replace the string labels in both columns with integer codes. Each fitted encoder is kept
# in its own variable (e.g. for mapping codes back to names with inverse_transform).
label_encoder_family = LabelEncoder()
BODMAS_METADATA['family'] = label_encoder_family.fit_transform(BODMAS_METADATA['family'])

label_encoder_category = LabelEncoder()
BODMAS_METADATA['category'] = label_encoder_category.fit_transform(BODMAS_METADATA['category'])

In [None]:
# Show how many samples fall under each encoded label (value_counts lists the most frequent first).
print(f"Encoded category values: {BODMAS_METADATA['category'].value_counts()}")
print(f"Encoded family values: {BODMAS_METADATA['family'].value_counts()}")

## BODMAS npz data: **X** `(130k x 2k features / sample)` and **y** `(130k x 1 benign/malign)`

In [None]:
# Prints the feature matrix's shape; the returned array is the cell's displayed output.
display_np(BODMAS_FEATURES_X, "Bodmas features X")

In [None]:
# Prints the label array's shape; the returned array is the cell's displayed output.
display_np(BODMAS_FEATURES_y, "Bodmas features y")

In [None]:
def reduce_dataset(X: np.ndarray, y: np.ndarray, n_samples, random_state=None):
    """Draw a random subset of rows from ``X`` together with the matching labels from ``y``.

    Rows are selected through a random permutation of row indices, so the inputs are
    neither copied in full nor modified, and ``y`` keeps its own dtype (it is no longer
    stacked next to ``X`` and cast to X's dtype).

    Args:
        X: Feature matrix with one sample per row.
        y: Labels; must contain exactly one entry per row of ``X``.
        n_samples: Number of rows to keep. ``None`` or ``0`` returns ``X`` and ``y``
            untouched. A value larger than the number of rows keeps every row (shuffled).
        random_state: Optional seed for a reproducible draw. When ``None`` the draw uses
            NumPy's global random state, so ``np.random.seed`` still applies.

    Returns:
        ``(X_sampled, y_sampled)`` with ``y_sampled`` one-dimensional and row-aligned
        with ``X_sampled``.

    Raises:
        ValueError: If ``y`` does not hold exactly ``X.shape[0]`` entries.
    """
    if n_samples is None or n_samples == 0:
        return X, y

    n_rows = X.shape[0]
    # reshape doubles as the length check: it raises ValueError when y and X disagree.
    labels = np.asarray(y).reshape(n_rows)

    if random_state is None:
        order = np.random.permutation(n_rows)
    else:
        order = np.random.default_rng(random_state).permutation(n_rows)

    chosen = order[:n_samples]
    return X[chosen], labels[chosen]


def filter_dataset_having_min_class_size(X: np.ndarray, y: np.ndarray, df_series: pd.Series, n: int):
    """Keep only the samples whose class in ``df_series`` has at least ``n`` members.

    Args:
        X: Feature matrix, indexed by row position.
        y: Label array, indexed by the same row positions.
        df_series: Class of every sample, in the same row order as ``X`` and ``y``.
        n: Minimum number of samples a class needs in order to be kept.

    Returns:
        ``(X_filtered, y_filtered, series_filtered)``. Rows are grouped by class, with
        classes in ``value_counts`` order (most frequent first) and the original row
        order preserved inside each class.
    """
    class_sizes = df_series.value_counts()
    kept_classes = class_sizes[class_sizes >= n].index

    # Row positions of every kept class, one array per class.
    per_class_positions = [
        np.flatnonzero((df_series == label).to_numpy()) for label in kept_classes
    ]
    # The leading empty int64 array keeps the result a valid integer index when nothing qualifies.
    positions = np.concatenate([np.array([], dtype=np.int64)] + per_class_positions)

    return X[positions], y[positions], df_series.iloc[positions]


def train_model(m, scaler, X_train, y_train, X_test, y_test) -> Tuple[float, List[float]]:
    """Fit a model (optionally behind a scaler), time the fit and score it on the test split.

    Args:
        m: Estimator exposing ``fit`` and ``predict``.
        scaler: Optional transformer placed in front of ``m`` in a pipeline; ``None``
            trains ``m`` directly.
        X_train, y_train: Training data.
        X_test, y_test: Evaluation data.

    Returns:
        ``(fit_seconds, metrics)`` where ``fit_seconds`` is the duration of ``fit``
        rounded to one decimal and ``metrics`` is the output of
        ``evaluate_predictions``. If fitting, predicting or scoring raises, the
        failure is printed and ``(0, [0] * 10)`` is returned so a benchmark sweep
        can continue with the next combination.
    """
    print(
        f"Training on {len(y_train)} train samples, model: {m.__class__.__name__}, scaler: {scaler.__class__.__name__ if scaler else '-'}")
    model = m if scaler is None else make_pipeline(scaler, m)

    try:
        ts = time.perf_counter()
        model.fit(X_train, y_train)
        dt = time.perf_counter() - ts
        pred = model.predict(X_test)
        metrics = evaluate_predictions(y_test, pred)
    except Exception as exc:
        # Deliberately best-effort, but only for ordinary errors: KeyboardInterrupt and
        # SystemExit now propagate so a long sweep can be interrupted, and the reason for
        # the zeroed row is reported instead of being silently dropped.
        print(f"  -> failed with {exc.__class__.__name__}: {exc}")
        dt = 0
        metrics = [0] * 10
    return round(dt, 1), metrics

## Binary classification

```
# y = BODMAS_FEATURES_y # binary classification
# y = BODMAS_METADATA['family'].values # multi-class
# y = BODMAS_METADATA['category'].values # multi-class - on easy mode since categories are much fewer and more balanced
```

In [None]:
import warnings
# Suppresses every warning category from here on, for the rest of the kernel session.
# NOTE(review): presumably meant to quiet estimator warnings during the sweeps below - confirm
# that nothing important (e.g. convergence warnings) needs to stay visible.
warnings.filterwarnings('ignore')

In [None]:
# Benchmark every (model, scaler) combination on small random subsets of the binary labels.
scalers = [None, StandardScaler(), MinMaxScaler()]
models = [LogisticRegression(), KNeighborsClassifier(), DecisionTreeClassifier(), svm.SVC(), RandomForestClassifier(),
          MultinomialNB()]
results = []

# evaluate_predictions repeats the two F1 scores at the end of its tuple; only the eight
# distinct metrics are kept so the results frame has unique column labels.
metric_columns = ["f1_micro", "f1_macro", "acc", "bacc", "prec_micro", "prec_macro", "rec_micro", "rec_macro"]

# reduce_dataset draws from NumPy's global random state; seeding it makes the sampled
# subsets (and therefore the reported scores) reproducible across runs.
np.random.seed(42)

for n_samples in [100, 1000]:
    X_sampled, y_sampled = reduce_dataset(BODMAS_FEATURES_X, BODMAS_FEATURES_y, n_samples)
    X_train, X_test, y_train, y_test = train_test_split(X_sampled, y_sampled, test_size=0.25, stratify=y_sampled,
                                                        random_state=42)
    for m, scaler in itertools.product(models, scalers):
        dt, metrics = train_model(m, scaler, X_train, y_train, X_test, y_test)
        results.append([m.__class__.__name__, scaler.__class__.__name__ if scaler else '-', n_samples, dt,
                        *metrics[:len(metric_columns)]])

df_results = pd.DataFrame(results, columns=["model", "scaler", "n_samples", "dt (s)", *metric_columns])
display_df(df_results)

## Multiclass classification -- family

In [None]:
# Only one scaler/model pair is benchmarked here; extend the two lists (as in the binary
# classification cell) to compare more combinations.
scalers = [StandardScaler()]
models = [KNeighborsClassifier()]
results = []

# evaluate_predictions repeats the two F1 scores at the end of its tuple; only the eight
# distinct metrics are kept so the results frame has unique column labels.
metric_columns = ["f1_micro", "f1_macro", "acc", "bacc", "prec_micro", "prec_macro", "rec_micro", "rec_macro"]

for min_class_size in [2000]:
    # Keep only families with at least min_class_size samples; the family codes are the targets.
    X_sampled, _, df_series_families = filter_dataset_having_min_class_size(BODMAS_FEATURES_X, BODMAS_FEATURES_y,
                                                                            BODMAS_METADATA['family'], min_class_size)
    X_train, X_test, y_train, y_test = train_test_split(X_sampled, df_series_families, test_size=0.25,
                                                        stratify=df_series_families,
                                                        random_state=42)
    print(
        f"Training on {len(df_series_families)} samples with min class size {min_class_size} | unique classes: {len(np.unique(df_series_families))}")
    # Record the number of rows actually used. This cell used to read `n_samples`, a loop
    # variable left over from the binary-classification cell, which is unrelated to this data.
    n_family_samples = len(df_series_families)
    for m, scaler in itertools.product(models, scalers):
        dt, metrics = train_model(m, scaler, X_train, y_train, X_test, y_test)
        results.append([m.__class__.__name__, scaler.__class__.__name__ if scaler else '-', n_family_samples, dt,
                        *metrics[:len(metric_columns)]])

df_results = pd.DataFrame(results, columns=["model", "scaler", "n_samples", "dt (s)", *metric_columns])
display_df(df_results)

In [None]:
# TODO
# hb = HungaBungaClassifier(brain=True)
# hb.fit(x=X_train, y=y_train)
# automl = autosklearn.classification.AutoSklearnClassifier()
# automl.fit(X_train, y_train)