In [6]:
%load_ext autoreload
%autoreload 2

# Imports
import json
import tensorflow as tf
import recommender
from matrix import ReportTechniqueMatrix
from matrix_builder import ReportTechniqueMatrixBuilder
import random
import math
import importlib
import pandas as pd
import numpy as np
from utils import get_mitre_technique_ids_to_names

# Run tf.function-decorated code eagerly (op by op) rather than as traced graphs.
tf.config.run_functions_eagerly(True)

# Sanity check: fail fast in this cell if eager execution is not active.
assert tf.executing_eagerly()

# Explicitly re-import the local recommender package so that edits to it are picked up.
# As the cell's last expression, this call also echoes the module repr as the cell output.
importlib.reload(recommender)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<module 'recommender' from '/Users/mjturner/code/technique-inference-engine/models/recommender/__init__.py'>

In [7]:
def train_test_split(indices: list, values: list, test_ratio: float=0.1) -> tuple:
    """Randomly partitions parallel index/value lists into train and test parts.

    Args:
        indices: the index entries, one per observation.
        values: the values, aligned position-by-position with indices.
        test_ratio: fraction of the observations to place in the test part;
            the test size is floor(test_ratio * len(indices)).

    Returns:
        A tuple (train_indices, train_values, test_indices, test_values).
        Each list keeps the relative order of the input.
    """
    total = len(indices)
    assert len(values) == total

    # Draw the positions that are held out for testing.
    num_held_out = math.floor(test_ratio * total)
    held_out_positions = frozenset(random.sample(range(total), k=num_held_out))

    train_positions = [pos for pos in range(total) if pos not in held_out_positions]
    test_positions = [pos for pos in range(total) if pos in held_out_positions]

    train_indices = [indices[pos] for pos in train_positions]
    train_values = [values[pos] for pos in train_positions]
    test_indices = [indices[pos] for pos in test_positions]
    test_values = [values[pos] for pos in test_positions]

    return train_indices, train_values, test_indices, test_values


In [8]:
def view_prediction_performance_table_for_report(
        train_data: ReportTechniqueMatrix,
        test_data: ReportTechniqueMatrix,
        predictions: pd.DataFrame,
        report_id: int,
    ) -> pd.DataFrame:
    """Builds a side-by-side table of predictions, training and test data for one report.

    Args:
        train_data: matrix holding the training observations.
        test_data: matrix holding the held-out test observations.
        predictions: dataframe of predicted values; the row labelled
            report_id is used.
        report_id: label of the report row to look up in all three sources.

    Returns:
        A dataframe with the columns "predictions", "training_data",
        "test_data" and "technique_name", indexed like the selected rows.
    """
    def _labelled_row(frame: pd.DataFrame, label: str) -> pd.Series:
        # Select this report's row and name it so it becomes a named column.
        row = frame.loc[report_id]
        row.name = label
        return row

    def _technique_name(row: pd.Series):
        # Each row of the combined table is labelled with its technique id.
        return technique_names_by_id.get(row.name)

    train_row = _labelled_row(train_data.to_pandas(), "training_data")
    prediction_row = _labelled_row(predictions, "predictions")
    test_row = _labelled_row(test_data.to_pandas(), "test_data")

    report_data = pd.concat((prediction_row, train_row, test_row), axis=1)

    # Attach the human-readable technique name for convenience.
    technique_names_by_id = get_mitre_technique_ids_to_names("../enterprise-attack.json")
    report_data.loc[:, "technique_name"] = report_data.apply(_technique_name, axis=1)

    return report_data


In [9]:
# Experiment configuration.
test_ratio = 0.1
embedding_dimension = 10
random_seed = 0

# Seed the random sources up front so the train/test split below is identical on
# every Restart & Run All; without this the split (and the reported error) changes
# on each run.
random.seed(random_seed)
# NOTE(review): assumes the recommender draws its initial embeddings from
# TensorFlow's global generator - confirm in the recommender package.
tf.random.set_seed(random_seed)

# Build the full report x technique matrix from the combined dataset.
data_builder = ReportTechniqueMatrixBuilder(
    combined_dataset_filepath="../data/combined_dataset_full_frequency.json",
    enterprise_attack_filepath="../enterprise-attack.json",
)
data = data_builder.build()

# Randomly keep (1 - test_ratio) of the observed entries for training; the rest are held out.
# NOTE(review): the seed only makes this reproducible if data.indices has a stable order - confirm.
train_indices = frozenset(random.sample(data.indices, k=math.floor((1-test_ratio) * len(data.indices))))
test_indices = frozenset(data.indices).difference(train_indices)

training_data = data.mask(train_indices)
test_data = data.mask(test_indices)

# train
model = recommender.FactorizationRecommender(m=data.m, n=data.n, k=embedding_dimension)
model.fit(training_data.to_sparse_tensor(), num_iterations=1000, learning_rate=10., regularization_coefficient=0.1, gravity_coefficient=0.0)

# Evaluate on the held-out entries.
evaluation = model.evaluate(test_data.to_sparse_tensor())
print("MSE Error", evaluation)

# Predictions for every report, with one column per technique id.
predictions = model.predict()

predictions_dataframe = pd.DataFrame(predictions, columns=data.technique_ids)

MSE Error 0.07193438


In [10]:
# get best and worst test performance
test_ndarray = test_data.to_numpy()
predictions_ndarray = predictions_dataframe.to_numpy()
# where test data, use predictions, else, fill with Nan
test_performance = np.mean(np.square(predictions_ndarray - test_ndarray), axis=1, where=test_ndarray > 0.5)

best_test_perf = np.nanargmin(test_performance, )
worst_test_perf = np.nanargmax(test_performance)

best_performance_results = view_prediction_performance_table_for_report(
    train_data=training_data,
    test_data=test_data,
    predictions=predictions_dataframe,
    report_id=best_test_perf
)

worst_performance_results = view_prediction_performance_table_for_report(
    train_data=training_data,
    test_data=test_data,
    predictions=predictions_dataframe,
    report_id=worst_test_perf
)

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = um.true_divide(


In [11]:
# Show the best-predicted report, test entries first. A bare last expression uses the
# notebook's rich table display, which keeps all columns together; print() wraps the
# wide frame into separate text chunks.
best_performance_results.sort_values("test_data", ascending=False).head(15)

           predictions  training_data  test_data  \
T1027         1.001889            0.0        1.0   
T1552.001     0.848310            0.0        0.0   
T1561         0.326331            0.0        0.0   
T1573.001     0.939929            0.0        0.0   
T1485         0.451991            0.0        0.0   
T1190         0.950845            1.0        0.0   
T1132.001     0.248229            0.0        0.0   
T1078.004     0.772365            0.0        0.0   
T1056.001     0.943955            0.0        0.0   
T1001.002     0.234360            0.0        0.0   
T1095         0.881229            0.0        0.0   
T1110.003     0.727067            0.0        0.0   
T1078.002     0.852815            0.0        0.0   
T1218.011     0.918010            0.0        0.0   
T1553.004     0.275539            0.0        0.0   

                              technique_name  
T1027        Obfuscated Files or Information  
T1552.001               Credentials In Files  
T1561                     

In [12]:
# Show the worst-predicted report, test entries first. A bare last expression uses the
# notebook's rich table display, which keeps all columns together; print() wraps the
# wide frame into separate text chunks.
worst_performance_results.sort_values("test_data", ascending=False).head(15)

           predictions  training_data  test_data  \
T1106        -0.030872            0.0        1.0   
T1552.001    -0.026832            0.0        0.0   
T1561        -0.000862            0.0        0.0   
T1573.001    -0.027231            0.0        0.0   
T1485        -0.000736            0.0        0.0   
T1190        -0.031759            0.0        0.0   
T1132.001     0.010440            0.0        0.0   
T1078.004    -0.028884            0.0        0.0   
T1056.001    -0.029308            0.0        0.0   
T1001.002     0.004404            0.0        0.0   
T1095        -0.023185            0.0        0.0   
T1110.003    -0.021503            0.0        0.0   
T1078.002    -0.027470            0.0        0.0   
T1218.011    -0.031216            0.0        0.0   
T1553.004     0.005378            0.0        0.0   

                              technique_name  
T1106                             Native API  
T1552.001               Credentials In Files  
T1561                     

In [None]:
def predict_for_new_report(self, techniques: tuple[str, ...]) -> np.ndarray:
    """Predicts for a new, yet-unseen report.

    NOTE(review): only the signature and docstring are visible in this cell; no
    implementation follows here. The `self` parameter suggests this is a draft
    of a method meant to live on a class - confirm where it belongs.

    Args:
        techniques: an iterable of MITRE technique identifiers involved
            in the new report.

    Returns:
        A length n array of predicted values for each technique based on
        the projected embedding for this new report.
    """
