In [None]:
import itertools
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator,
)
from pyspark.ml.feature import Bucketizer, StringIndexer
from pyspark.ml.feature import VectorAssembler

from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
%run "Common"

## ML processing parameters
`filesystem_endpoint`: The Azure Synapse storage account and container hosting the data.

`data_folder`: The path to the folder on the Azure Synapse storage account containing the CSV file to process.

`csv_file_name`: The CSV file to process.

`algorithm`: The ML algorithm to use to train, test, and generate predictions. See the `MLAlgorithm` class defined in the `Common` notebook for a list of algorithms.

`train_pct`: The fraction of the data (between 0 and 1, e.g. `0.7` for 70%) to use for training.

In [None]:
# Input location. The file is read from f"{data_folder}/{csv_file_name}" on the
# storage endpoint below.
csv_file_name = "StarReconNoneEdgesWithHops.csv"
filesystem_endpoint = "smtcrbsynfs@smtcrbsyndl.dfs.core.windows.net"
data_folder = "/data/uwf"

# MLAlgorithm is not defined in this notebook; it is expected to come from the
# `Common` notebook pulled in by `%run "Common"`.
algorithm = MLAlgorithm.NB
# Training share handed to run_ml_algorithm (a fraction, not a percentage).
train_pct = 0.7

In [None]:
# Full path of the input CSV inside the storage container.
file_path = f"{data_folder}/{csv_file_name}"

# Last path component (the file name, extension included); reused later when
# naming the results file.
dataset = file_path.rsplit("/", 1)[-1]

# load_csv is provided outside this notebook (see `%run "Common"`).
conn_df = load_csv(filesystem_endpoint, file_path)

## Train, test, and generate predictions.

This cell runs the algorithm on the dataset once for every non-empty combination of feature columns.

This cell also saves the algorithm's metrics, including the confusion matrix. See the schema in the `Common` notebook to see exactly which results are collected.

In [None]:
# Train/evaluate the selected algorithm once per non-empty subset of the
# candidate feature columns and collect one row of metrics per subset.
#
# Reads:  conn_df, algorithm, train_pct (earlier cells); spark, schema and
#         run_ml_algorithm (not defined in this notebook).
# Writes: result_df - one row per feature combination, built with `schema`.

# Column holding the class label.
target_cols = ["Tactic"]

# Candidate feature columns.
all_cols = [
    "From",
    "To",
    "Avg_Duration",
    "Total_Duration",
    "Avg_Bytes",
    "Total_Bytes",
    "Count",
    "Hop_Count",
]

# Every non-empty subset of all_cols, smallest subsets first.
cols_combinations = [
    combination
    for r in range(1, len(all_cols) + 1)
    for combination in itertools.combinations(all_cols, r)
]

cols_combinations_len = len(cols_combinations)

# Metric rows are accumulated in a plain list and turned into a DataFrame once
# after the loop; chaining one union() per iteration builds a needlessly deep
# query plan.
result_rows = []

for i, combination in enumerate(cols_combinations):
    print(f"Iteration {i+1} of {cols_combinations_len}")

    # Drop the candidate columns that are not part of this combination, plus
    # the row identifier.
    drop_cols = [name for name in all_cols if name not in combination]
    additional_drop_cols = ["Id"]
    drop_cols = drop_cols + additional_drop_cols

    print(f"Feature columns: {combination}")
    print(f"Dropped columns: {drop_cols}")

    iter_df = conn_df.drop(*drop_cols)
    iter_df = iter_df.na.drop(how="any")

    numeric_cols = [
        name for name, dtype in iter_df.dtypes if dtype in ("int", "double", "bigint")
    ]

    string_cols = [name for name, dtype in iter_df.dtypes if dtype == "string"]

    # String columns are label-indexed; numeric columns are bucketed into
    # (-inf, 10), [10, 100) and [100, +inf). Both write "<column>_processed".
    indexers = [
        StringIndexer(inputCol=column, outputCol=column + "_processed").fit(iter_df)
        for column in string_cols
    ]

    numeric_bucketing = [
        Bucketizer(
            splits=[-float("inf"), 10, 100, float("inf")],
            inputCol=column,
            outputCol=column + "_processed",
        )
        for column in numeric_cols
    ]

    stages_ = indexers + numeric_bucketing

    iter_df = Pipeline(stages=stages_).fit(iter_df).transform(iter_df)

    # All processed columns except the processed target are features.
    # The loop variable is deliberately not called `col`: a module-level
    # `for col ...` would overwrite pyspark.sql.functions.col brought in by the
    # star import at the top of the notebook.
    feature_cols = [
        name
        for name, _ in iter_df.dtypes
        if "_processed" in name and target_cols[0] not in name
    ]

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    iter_df = assembler.transform(iter_df)

    label_col = target_cols[0] + "_processed"

    # run_ml_algorithm receives the whole frame together with train_pct, so no
    # train/test split is made here (the previous local split was never used).
    predictions = run_ml_algorithm(
        algorithm, feature_cols, label_col, iter_df, train_pct
    )

    predictions_and_labels = predictions.select(["prediction", label_col])

    metrics = MulticlassMetrics(predictions_and_labels.rdd.map(tuple))

    # metricName must be given explicitly: the evaluator's default is "f1",
    # which would silently be stored under the name `accuracy`.
    mc_evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName="accuracy"
    )
    accuracy = mc_evaluator.evaluate(predictions)

    # NOTE(review): AUC is computed from the hard "prediction" column rather
    # than a score/probability column - confirm this is intended.
    bin_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="prediction", labelCol=label_col, metricName="areaUnderROC"
    )
    auc_roc = bin_evaluator.evaluate(predictions)

    feature_col_str = "&".join(combination)

    # NOTE(review): precision and false-positive rate are taken for label 1.0
    # while recall and F-measure (beta=2.0) are taken for label 0.0 - confirm
    # the mix of labels is intentional.
    # Fall back to the weighted value when the per-label lookup fails.
    # `except Exception` (not a bare except) keeps Ctrl-C / kernel interrupt
    # working during the long loop.
    try:
        precision = metrics.precision(1.0)
    except Exception:
        precision = metrics.weightedPrecision

    recall = metrics.recall(0.0)
    fmeasure = metrics.fMeasure(0.0, 2.0)

    try:
        fprate = metrics.falsePositiveRate(1.0)
    except Exception:
        fprate = metrics.weightedFalsePositiveRate

    # Flatten the confusion matrix row by row into plain Python floats.
    confusion_matrix = metrics.confusionMatrix().toArray()
    confusion_matrix_values = tuple(float(x) for x in confusion_matrix.flatten())

    # A 1x1 matrix (single class observed) is padded to four cells.
    if len(confusion_matrix_values) == 1:
        confusion_matrix_values = (confusion_matrix_values[0], 0.0, 0.0, 0.0)

    result_metrics = (
        feature_col_str,
        accuracy,
        precision,
        recall,
        fmeasure,
        fprate,
        auc_roc,
    ) + confusion_matrix_values

    result_rows.append(result_metrics)

# Build the results DataFrame in one go; an empty list still yields an empty
# DataFrame with the expected schema.
result_df = spark.createDataFrame(result_rows, schema=schema)

## Save results

The results are saved to Azure Synapse storage, in the same folder as the input data.

In [None]:
# Destination sits in the input data folder; the name combines the algorithm's
# value with the input file name.
results_path = f"{data_folder}/{algorithm.value}_results_{dataset}.csv"

# save_df_to_csv is provided outside this notebook (see `%run "Common"`).
save_df_to_csv(result_df, filesystem_endpoint, results_path)