In [None]:
#!/usr/bin/env python3

CONSIDER_FOURIER_VALUES = False
ONLY_LOAD_N_ROWS_FROM_DATASETS = None # one to load all rows
SHAPLEY_KERNEL_EXPLAINER_SAMPLES = 50 # Default is 200
SHAPLEY_BEESWARM_EXPLAINER_SAMPLES = 800 # Default is 1600
MAX_EVALS = 1000 # Default is 1200
BEESWARM_MAX_DISPLAY = 40

NN_PATIENCE = 5
NN_NUM_EPOCHS = 100

PARALLEL_N_JOBS = 8

import gc
import glob
import os
import random
import uuid
from typing import Dict, List, Tuple


import matplotlib.pyplot as plt
import copy

import numpy as np
import pandas as pd
import tqdm
from loguru import logger
from sklearn import pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import MinMaxScaler

import shap

import rename_columns.rename as rename

# import dotenv

# Way to dynamically change the number of jobs at run time
def get_num_jobs(default_jobs: int) -> int:
    """This function provides a way to override the number of jobs specified
    in the command line arguments dynamically.
    A file called num_jobs.txt can be created and the first line
    should contain the number of jobs.

    Args:
        default_jobs (int): default value if it is not overridden

    Returns:
        int: number of jobs to run
    """
    if not os.path.exists("num_jobs.txt"):
        return default_jobs
    with open("num_jobs.txt") as f:
        try:
            line = f.readlines()[0].strip()
            temp_jobs = int(line)
            if temp_jobs > 0 and temp_jobs < 20:
                logger.info(f"NUM_JOBS override: {temp_jobs}")
                return temp_jobs
        except:
            return default_jobs
    return default_jobs


def random_seed() -> None:
    np.random.seed(0)
    random.seed(0)


def get_save_filename() -> str:
    return f"{str(uuid.uuid4())}.csv.gz"


def gc_collect() -> None:
    for i in range(3):
        for j in range(3):
            gc.collect(j)


def get_columns_and_types(thisdf: pd.DataFrame) -> Dict[str, List[str]]:
    """For each feature set type, get the relevant columns.

    Args:
        thisdf (pd.DataFrame): Input dataframe.

    Returns:
        Dict[str, List[str]]: Dictionary that maps the feature type to the
            list of columns to the feature type.
    """
    columns = [c for c in thisdf.columns if not c.startswith("an_")]

    def get_columns(columns: List[str], start_string: str) -> List[str]:
        columns = [c for c in columns if c.startswith(start_string)]
        columns = [c for c in columns if "head" not in c and "tail" not in c]
        columns = [c for c in columns if "begin" not in c and "end" not in c]
        columns = [c for c in columns if "filesize" not in c]
        columns = [c for c in columns if "dit.shanon" not in c]
        
        # DO WE WANT RAW FOURIER VALUES
        if not CONSIDER_FOURIER_VALUES:
            columns = [c for c in columns if "fourier.value" not in c.lower()]
        
        return columns

    baseline_columns = get_columns(columns, "baseline")
    advanced_columns = get_columns(columns, "advanced")
    fourier_columns = get_columns(columns, "fourier")
    fourier_min_columns = [
        "fourier.stat.1byte.autocorr",
        "fourier.stat.1byte.mean",
        "fourier.stat.1byte.std",
        "fourier.stat.1byte.chisq",
        "fourier.stat.1byte.moment.2",
        "fourier.stat.1byte.moment.3",
        "fourier.stat.1byte.moment.4",
        "fourier.stat.1byte.moment.5",
    ]

    baseline_and_advanced = list(set(baseline_columns + advanced_columns))
    baseline_and_fourier = list(set(baseline_columns + fourier_columns))
    advanced_and_fourier = list(set(advanced_columns + fourier_columns))
    baseline_and_fourier_min = list(set(baseline_columns + fourier_min_columns))
    advanced_and_fourier_min = list(set(advanced_columns + fourier_min_columns))

    baseline_advanced_fourier = list(
        set(baseline_columns + advanced_columns + fourier_columns)
    )
    baseline_advanced_and_fourier_min = list(
        set(baseline_columns + advanced_columns + fourier_min_columns)
    )

    rv = {
        "baseline-advanced-and-fourier": baseline_advanced_fourier,
    }

    #logger.info(f"Features = {rv}")

    return rv


def get_annotation_columns(thisdf: pd.DataFrame) -> List[str]:
    """List of columns used for annotation.

    Args:
        thisdf (pd.DataFrame): Input dataframe.

    Returns:
        _type_: List of columns
    """
    return [c for c in thisdf.columns if c.startswith("an_")]


def annotate_df_with_additional_fields(
    name: str, dataframe: pd.DataFrame
) -> pd.DataFrame:
    """Add some metadata to each dataframe

    Args:
        name (str): Name of the csv/parquet file
        dataframe (pd.DataFrame): Dataframe

    Returns:
        pd.DataFrame: Dataframe with additional information
    """
    if "base32" in name or "b32" in name:
        dataframe["an_is_base32"] = 1
    else:
        dataframe["an_is_base32"] = 0
    dataframe["an_is_base32"] = dataframe["an_is_base32"].astype(np.bool_)

    if "encrypt" in name:
        dataframe["is_encrypted"] = 1
    else:
        dataframe["is_encrypted"] = 0
    dataframe["is_encrypted"] = dataframe["is_encrypted"].astype(np.bool_)

    if "v1" in name:
        dataframe["an_v1_encrypted"] = 1
    else:
        dataframe["an_v1_encrypted"] = 0
    dataframe["an_v1_encrypted"] = dataframe["an_v1_encrypted"].astype(np.bool_)

    if "v2" in name:
        dataframe["an_v2_encrypted"] = 1
    else:
        dataframe["an_v2_encrypted"] = 0
    dataframe["an_v2_encrypted"] = dataframe["an_v2_encrypted"].astype(np.bool_)

    if "v3" in name:
        dataframe["an_v3_encrypted"] = 1
    else:
        dataframe["an_v3_encrypted"] = 0
    dataframe["an_v3_encrypted"] = dataframe["an_v3_encrypted"].astype(np.bool_)

    def is_webp(filename: str) -> int:
        return 1 if ".webp" in filename else 0

    dataframe["an_is_webp"] = (
        dataframe["extended.base_filename"].map(is_webp).astype(np.bool_)
    )

    return dataframe


def load_data(input_directory: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load all pandas data files from a directory and annotate them with
    additional fields

    Args:
        input_directory (str): input directory

    Returns:
        pd.DataFrame: A combined dataframe of all files (Train)
        pd.DataFrame: A combined dataframe of all files (Test)
    """

    interesting_files = [
        "plaintext.base32.combined.csv.gz",
        "expanded.base32.des3.csv.gz",
        "n1.plaintext.base32.csv.gz",
        "expanded_encrypted_v3.csv.gz",
        "n1.expanded.plaintext.csv.gz",
        "n1.expanded.pyencrypted_v2.csv.gz",
        "n1.expanded.pyencrypted_v1.base32.csv.gz",
        "expanded.des3.csv.gz",
        "n1.expanded.pyencrypted_v2.base32.csv.gz",
        "expanded.pyencrypted_v1.csv.gz",
        "expanded.pyencrypted_v2.base32.csv.gz",
        "expanded_encrypted_v3_base32.csv.gz",
        "n1.expanded.pyencrypted_v3.base32.csv.gz",
        "plaintext.combined.csv.gz",
        "plaintext.expanded.csv.gz",
        "expanded.pyencrypted_v2.csv.gz",
        "expanded.plaintext.base32.csv.gz",
        "n1.expanded.pyencrypted_v3.csv.gz",
        "n1.expanded.plaintext.base32.csv.gz",
        "n1.plaintext.csv.gz",
        "expanded.pyencrypted_v1.b32.csv.gz",
        "n1.expanded.pyencrypted_v1.csv.gz",
    ]
    p = 0.1
    logger.info("Loading dataframes")
    if ONLY_LOAD_N_ROWS_FROM_DATASETS and ONLY_LOAD_N_ROWS_FROM_DATASETS is not None:
        dataframes = {
            # f: pd.read_csv(f, skiprows=lambda i: i > 0 and random.random() > p)
            f: pd.read_csv(f, nrows=ONLY_LOAD_N_ROWS_FROM_DATASETS)
            for f in tqdm.tqdm(
                glob.glob(f"{input_directory}{os.path.sep}*.csv.gz"), desc="Loading data"
            )
            if os.path.basename(f).lower() in interesting_files
        }
    else:
        dataframes = {
            # f: pd.read_csv(f, skiprows=lambda i: i > 0 and random.random() > p)
            f: pd.read_csv(f)
            for f in tqdm.tqdm(
                glob.glob(f"{input_directory}{os.path.sep}*.csv.gz"), desc="Loading data"
            )
            if os.path.basename(f).lower() in interesting_files
        }

    logger.info("Annotating dataframes with additional fields")
    dataframes = {
        f: annotate_df_with_additional_fields(f, df) for f, df in dataframes.items()
    }

    logger.info("Combining test dataframes into a single dataframe")
    test_df = (
        pd.concat([df for fname, df in dataframes.items() if "n1." in fname.lower()])
        .sample(frac=1)
        .reset_index(drop=True)
    )

    logger.info("Combining train dataframes into a single dataframe")
    train_df = (
        pd.concat(
            [df for fname, df in dataframes.items() if "n1." not in fname.lower()]
        )
        .sample(frac=1)
        .reset_index(drop=True)
    )

    
    logger.info("done...")
    return train_df, test_df
  

In [None]:
master_train_df, master_test_df = load_data("/Users/phantom/dev/NapierOne")

In [None]:
annot_columns = get_annotation_columns(master_train_df)
fscolumns = get_columns_and_types(master_train_df)["baseline-advanced-and-fourier"]
colnames = [c for c in fscolumns if "is_encrypted" not in c]
colnames = [c for c in colnames if c not in annot_columns]
colnames = [c for c in colnames if not c.startswith("an_")]
colnames = [c for c in colnames if not c.lower() == "extended.base_filename"]
            
            
train_df=master_train_df[colnames + ["is_encrypted"]].copy()
test_df=master_test_df[colnames + ["is_encrypted"]].copy()
            
print(f"{train_df.shape=}")
            
train_df = train_df.sample(frac=1, random_state=42).reset_index(drop=True)
X_train = train_df[colnames]
y_train = train_df["is_encrypted"].to_numpy().flatten()
X_test = test_df[colnames]
y_test = test_df["is_encrypted"].to_numpy().flatten()
            
gc_collect()        
pline = pipeline.Pipeline(
    [
        ('std', MinMaxScaler()),
        ('classif', RandomForestClassifier(n_jobs=PARALLEL_N_JOBS))
    ]
)
pline.fit(X_train, y_train)

In [None]:
def get_color(colname: str) -> str:
    colname = colname.lower()
    if colname.startswith("baseline"):
        return "green"
    elif colname.startswith("fourier"):
        return "blue"
    else:
        return "red"

feature_importances = pline.named_steps['classif'].feature_importances_
importance_df = pd.DataFrame(
    {
        "original_feature_name": colnames,
        "features": [rename.rename_column(c) for c in colnames],
        "importances": feature_importances,
        "colors": [get_color(c) for c in colnames]
    }
)

importance_df = importance_df.sort_values(by="importances", ascending=False).reset_index(drop=True)

plt_df = importance_df.head(BEESWARM_MAX_DISPLAY)

plt.figure(figsize=(4, 10))
plt.barh(
    plt_df["features"],
    plt_df["importances"],
    color=plt_df["colors"],
)

plt.gca().invert_yaxis()
legend_handles = [
                  plt.Rectangle((0, 0), 1, 1, color='blue'),
                  plt.Rectangle((0, 0), 1, 1, color='red'),
                  plt.Rectangle((0, 0), 1, 1, color='green')
                  ][::-1]
plt.legend(legend_handles, ["Fourier", "Advanced", "Baseline"][::-1])

In [None]:
importance_df.to_csv("rf-importances.csv")

In [None]:
print("Transforming X_train")
tr_X_train = pline.named_steps['std'].transform(X_train)
print("Transoforming X_test")
#X_test = test_df[colnames]
tr_X_test = pline.named_steps['std'].transform(X_test)
classif_fn = pline.named_steps['classif'].predict

print("Creating shap kernel explainer")
explainer = shap.KernelExplainer(
    classif_fn,
    shap.sample(tr_X_train, SHAPLEY_KERNEL_EXPLAINER_SAMPLES))
    #tr_X_train)

print("Getting shap values")
shap_values = explainer.shap_values(
    shap.sample(tr_X_train, SHAPLEY_KERNEL_EXPLAINER_SAMPLES), # tr_X_train
    nsamples="auto")

print("Final prediction")
shap.summary_plot(
    shap_values,
    tr_X_test,
    feature_names=[rename.rename_column(c) for c in colnames])

In [None]:
print("Creating shap kernel explainer")
explainer_ = shap.Explainer(
    pline.named_steps['classif'],
    shap.sample(tr_X_train, SHAPLEY_BEESWARM_EXPLAINER_SAMPLES),
    max_evals=MAX_EVALS)
    #tr_X_train)

print("Getting shap values")
explanation_ = explainer_(
    shap.sample(tr_X_test, SHAPLEY_BEESWARM_EXPLAINER_SAMPLES), # tr_X_train
    )

explanation2_ = shap.Explanation(
    values=explanation_.values, 
    base_values=explanation_.base_values, 
    data=explanation_.data,
    feature_names=[rename.rename_column(c) for c in colnames]
)
shap.plots.beeswarm(explanation2_, max_display=BEESWARM_MAX_DISPLAY)


In [None]:
tree_explainer = shap.TreeExplainer(pline.named_steps['classif'], seed=42)
tree_explanation = tree_explainer(
    pline.named_steps['std'].transform(tr_X_train),
    check_additivity=True)

tree_explanation2 = shap.Explanation(
    values=tree_explanation.values, 
    base_values=tree_explanation.base_values, 
    data=tree_explanation.data,
    feature_names=[rename.rename_column(c) for c in colnames]
)
shap.plots.beeswarm(tree_explanation2[:, :, 1], max_display=BEESWARM_MAX_DISPLAY)

In [None]:
!ls