# Digital Fingerprinting (DFP) with Morpheus - Azure Integrated Training
## Introduction

In this notebook, we build and run a DFP pipeline that performs both training and inference on Azure AD logs. The goal is to train a PyTorch autoencoder model to recognize the patterns of users in the sample data. The model is then used by a second fork of the pipeline (inference) to generate an anomaly score for each individual log. Security teams can use these anomaly scores to detect abnormal behavior as it happens so that the proper action can be taken.

<div class="alert alert-block alert-info">
<b>Note:</b> For more information on DFP, the Morpheus pipeline, and setup steps to run this notebook, please refer to the corresponding DFP integrated training materials.
</div>
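
To make the idea of an anomaly score concrete, the sketch below scales the reconstruction loss of new logs by the loss distribution seen during training, so that a log the model reconstructs poorly gets a large score. This is a simplified assumption for illustration, not the scoring code used by the pipeline's autoencoder; the function name `z_scores` and the sample losses are hypothetical.

```python
from statistics import mean, pstdev


def z_scores(train_losses, new_losses):
    """Scale new reconstruction losses by the training-loss distribution."""
    mu = mean(train_losses)
    # Fall back to 1.0 when every training loss is identical (zero spread).
    sigma = pstdev(train_losses) or 1.0
    return [(loss - mu) / sigma for loss in new_losses]


# Hypothetical per-log reconstruction losses (not taken from the sample data).
train_losses = [1.0, 2.0, 3.0]
scores = z_scores(train_losses, [2.0, 6.0])

# A loss equal to the training mean scores 0; a much larger loss scores high.
for loss, score in zip([2.0, 6.0], scores):
    print(f"loss={loss:.1f} z={score:.2f}")
```

The `_loss` and `_z_loss` columns in the inference results at the end of this notebook can be read in the same spirit: the higher the value, the less the log resembles what the model learned.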

In [1]:
%load_ext autoreload
%autoreload 2

# Ensure that the morpheus directory is in the python path. This may not need to be run depending on the environment setup
import sys
import os
sys.path.insert(0, os.path.abspath("../../morpheus"))

In [2]:
import logging
import typing
import cudf
from datetime import datetime

# When segment modules are imported, they're added to the module registry. 
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
import dfp.modules  # noqa: F401
from dfp.utils.config_generator import ConfigGenerator
from dfp.utils.config_generator import generate_ae_config
from dfp.utils.dfp_arg_parser import DFPArgParser
from dfp.utils.schema_utils import Schema
from dfp.utils.schema_utils import SchemaBuilder

import morpheus.loaders  # noqa: F401
import morpheus.modules  # noqa: F401
from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage
from morpheus.stages.input.control_message_file_source_stage import ControlMessageFileSourceStage

# Left align all tables
from IPython.core.display import HTML
table_css = 'table {align:left;display:block}'
HTML('<style>{}</style>'.format(table_css))



## High Level Configuration

The pipeline's functionality can be significantly altered by the following options, which are utilized across the entire pipeline. However, module-specific options also exist. The matching Python script for this notebook, `dfp_integrated_training_batch_pipeline.py`, configures these options through command line arguments.

### Options

| Name                   | Type                                       | Description                                                                                                                                                                                                                                                                            | Default Value |
|------------------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| `source`            | One of `["duo", "azure"]`           | Indicates what type of logs are going to be used in the workload.                                                                                                                                                                                                            | -             |
| `train_users`    | One of `["all", "generic", "individual"]` | Indicates whether or not to train per user or a generic model for all users. Selecting none runs the inference pipeline.                                                                                                                                                                    | -             |
| `skip_user`        | List of strings                                       | User IDs to skip. Mutually exclusive with `only_user`.                                                                                                                                                                                                                                               | -             |
| `only_user`        | List of strings                                       | Only users specified by this option will be included. Mutually exclusive with `skip_user`.                                                                                                                                                                                                 | -             |
| `start_time`       | `str`                                         | The start of the time window, if undefined start_date will be `now()-duration`.                                                                                                                                                                                                  | -             |
| `duration`         | `str`                                         | The training duration to run starting from `start_time`.                                                                                                                                                                                                                                              | -             |
| `cache_dir`        | `str`                                         | The location to cache data such as S3 downloads and pre-processed data.                                                                                                                                                                                                                    | -             |
| `log_level`        | `str`                                         | Specify the logging level to use.                                                                                                                                                                                                                                                                    | `info`        |
| `sample_rate_s`    | `int`                                         | Minimum time step, in milliseconds, between object logs.                                                                                                                                                                                                                                           | `0`        |
| `silence_monitors`    | `bool`                                         | Controls whether monitors will be verbose logs.                                                                                                                                                                                                                                           | `False`        |
| `tracking_uri`     | `str`                                         | The MLflow tracking URI to connect to the tracking backend.                                                                                                                                                                                                                                    | -             |
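
As a rough sketch of how `start_time`, `duration`, `skip_user`, and `only_user` interact, the snippet below resolves a time window and rejects conflicting user filters. The helpers `parse_duration`, `resolve_window`, and `check_user_filters` are hypothetical names introduced for illustration; they are not part of `DFPArgParser`, and the real parser may accept more duration formats than the simple `<number><unit>` form assumed here.

```python
import re
from datetime import datetime, timedelta

# Assumed unit suffixes for illustration: seconds, minutes, hours, days.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse strings such as '60d' or '12h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def resolve_window(start_time, duration, now=None):
    """Return (start, end); a missing start defaults to now - duration."""
    span = parse_duration(duration)
    if start_time is None:
        now = now or datetime.now()
        start_time = now - span
    return start_time, start_time + span


def check_user_filters(skip_user, only_user):
    """skip_user and only_user are mutually exclusive."""
    if skip_user and only_user:
        raise ValueError("skip_user and only_user are mutually exclusive")


start, end = resolve_window(datetime(2022, 8, 1), "60d")
print(start.date(), "->", end.date())
```

With the values used in this notebook (`start_time` of 2022-08-01 and a `duration` of `60d`), the window under these assumptions ends on 2022-09-30.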

In [3]:
# Source
source = "azure"

# Global options
train_users = "all"

# Start time
start_time = datetime.strptime("2022-08-01", "%Y-%m-%d")

# Duration
duration = "60d"

# Sample rate in seconds
sample_rate_s = 0

# MLflow tracking URI
tracking_uri = "http://mlflow:5000"

# Enter any users to skip here
skip_user: typing.List[str] = []

# Only users
only_user: typing.List[str] = []

# Setting Log level
log_level = logging.INFO

# Location where cache objects will be saved
cache_dir = "./.cache/dfp"

# Silence monitors
silence_monitors = True

# Control messages as input files
load_train_only_input_files = [
    "./resource/azure_payload_lt.json"
]
load_train_inference_input_files = [
    "./resource/azure_payload_lti.json"
]

### Arguments Parser

The [DFPArgParser](../../../production/morpheus/dfp/utils/dfp_arg_parser.py) class parses and stores the arguments used by a pipeline for training, model generation, and inference. It provides several properties and methods to transform, store, and access those arguments.

In [4]:
dfp_arg_parser = DFPArgParser(
    skip_user,
    only_user,
    start_time,
    log_level,
    cache_dir,
    sample_rate_s,
    duration,
    source,
    tracking_uri,
    silence_monitors,
    train_users
)

# Initialize parser
dfp_arg_parser.init()

In [5]:
# Create global config object for the pipeline
config: Config = generate_ae_config(
    source,
    userid_column_name="username",
    timestamp_column_name="timestamp",
    use_cpp=True,
)

# Construct the dataframe Schema which is used to normalize incoming azure logs
schema_builder = SchemaBuilder(config, source)
schema: Schema = schema_builder.build_schema()

### DFP Deployment Module Configuration
This module sets up a modular Digital Fingerprinting integrated training pipeline instance.

### Configurable Parameters

| Parameter           | Type | Description                               | Example Value | Default Value |
|---------------------|------|-------------------------------------------|---------------|---------------|
| `inference_options` | dict | Options for the inference pipeline module | See Below     | `[Required]`  |
| `training_options`  | dict | Options for the training pipeline module  | See Below     | `[Required]`  |

### Training Options Parameters

| Parameter                    | Type | Description                                    | Example Value        | Default Value |
|------------------------------|------|------------------------------------------------|----------------------|---------------|
| `batching_options`           | dict | Options for batching the data                  | See Below            | `-`           |
| `cache_dir`                  | str  | Directory to cache the rolling window data     | "/path/to/cache/dir" | `./.cache`    |
| `dfencoder_options`          | dict | Options for configuring the data frame encoder | See Below            | `-`           |
| `mlflow_writer_options`      | dict | Options for the MLflow model writer            | See Below            | `-`           |
| `preprocessing_options`      | dict | Options for preprocessing the data             | See Below            | `-`           |
| `stream_aggregation_options` | dict | Options for aggregating the data by stream     | See Below            | `-`           |
| `timestamp_column_name`      | str  | Name of the timestamp column used in the data  | "my_timestamp"       | `timestamp`   |
| `user_splitting_options`     | dict | Options for splitting the data by user         | See Below            | `-`           |

### Inference Options Parameters

| Parameter                    | Type | Description                                    | Example Value        | Default Value  |
|------------------------------|------|------------------------------------------------|----------------------|----------------|
| `batching_options`           | dict | Options for batching the data                  | See Below            | `-`            |
| `cache_dir`                  | str  | Directory to cache the rolling window data     | "/path/to/cache/dir" | `./.cache`     |
| `detection_criteria`         | dict | Criteria for filtering detections              | See Below            | `-`            |
| `fallback_username`          | str  | User ID to use if user ID not found            | "generic_user"       | `generic_user` |
| `inference_options`          | dict | Options for the inference module               | See Below            | `-`            |
| `model_name_formatter`       | str  | Format string for the model name               | "model_{timestamp}"  | `[Required]`   |
| `num_output_ports`           | int  | Number of output ports for the module          | 3                    | `-`            |
| `timestamp_column_name`      | str  | Name of the timestamp column in the input data | "timestamp"          | `timestamp`    |
| `stream_aggregation_options` | dict | Options for aggregating the data by stream     | See Below            | `-`            |
| `user_splitting_options`     | dict | Options for splitting the data by user         | See Below            | `-`            |
| `write_to_file_options`      | dict | Options for writing the detections to a file   | See Below            | `-`            |

### `batching_options`

| Key                      | Type            | Description                         | Example Value                               | Default Value              |
|--------------------------|-----------------|-------------------------------------|---------------------------------------------|----------------------------|
| `end_time`               | datetime/string | Endtime of the time window          | "2023-03-14T23:59:59"                       | `None`                     |
| `iso_date_regex_pattern` | string          | Regex pattern for ISO date matching | "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}" | `<iso_date_regex_pattern>` |
| `parser_kwargs`          | dictionary      | Additional arguments for the parser | {}                                          | `{}`                       |
| `period`                 | string          | Time period for grouping files      | "1d"                                        | `D`                        |
| `sampling_rate_s`        | integer         | Sampling rate in seconds            | 0                                          | `None`                       |
| `start_time`             | datetime/string | Start time of the time window       | "2023-03-01T00:00:00"                       | `None`                     |
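
The sketch below illustrates how `iso_date_regex_pattern`, `start_time`, and `end_time` could work together: a timestamp is extracted from each file name with the example pattern from the table, and only files inside the time window are kept. The helper names and the file names are hypothetical, and treating `end_time` as an exclusive bound is an assumption; the batching module itself may behave differently.

```python
import re
from datetime import datetime

# Example pattern from the table above.
ISO_DATE_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")


def timestamp_from_name(filename):
    """Return the first ISO timestamp found in the file name, or None."""
    match = ISO_DATE_PATTERN.search(filename)
    if match is None:
        return None
    return datetime.strptime(match.group(0), "%Y-%m-%dT%H:%M:%S")


def in_window(ts, start_time=None, end_time=None):
    """Keep timestamps in [start_time, end_time); None means unbounded."""
    after_start = start_time is None or ts >= start_time
    before_end = end_time is None or ts < end_time
    return after_start and before_end


# Hypothetical file names, for illustration only.
files = [
    "AZUREAD_2023-03-01T10:15:00.json",
    "AZUREAD_2023-03-20T00:00:00.json",
    "notes.txt",
]
start = datetime(2023, 3, 1)
end = datetime(2023, 3, 14, 23, 59, 59)

selected = []
for name in files:
    ts = timestamp_from_name(name)
    if ts is not None and in_window(ts, start, end):
        selected.append(name)

print(selected)
```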

### `dfencoder_options`

| Parameter         | Type  | Description                            | Example Value                                                                                                                                                                                                                                                 | Default Value |
|-------------------|-------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| `feature_columns` | list  | List of feature columns to train on    | ["column1", "column2", "column3"]                                                                                                                                                                                                                             | `-`           |
| `epochs`          | int   | Number of epochs to train for          | 50                                                                                                                                                                                                                                                            | `-`           |
| `model_kwargs`    | dict  | Keyword arguments to pass to the model | {"encoder_layers": [64, 32], "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, "progress_bar": false, "device": "cpu"} | `-`           |
| `validation_size` | float | Size of the validation set             | 0.1                                                                                                                                                                                                                                                           | `-`           |

### `monitor_options`

| Key                          | Type    | Description                                                | Example Value | Default Value |
| ----------------------------|---------|------------------------------------------------------------|---------------|---------------|
| `description`               | string  | Name to show for this Monitor Stage in the console window  | "Progress"    | `Progress`    |
| `silence_monitors`          | bool    | Silence the monitors on the console                        | See Below     | `None`        |
| `smoothing`                 | float   | Smoothing parameter to determine how much the throughput should be averaged | 0.01 | `0.05` |
| `unit`                      | string  | Units to show in the rate value                             | "messages"    | `messages`    |
| `delayed_start`             | bool    | When delayed_start is enabled, the progress bar will not be shown until the first message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing immediately. In large pipelines, this option may be desired to give a more accurate timing. | True  | `False`   |
| `determine_count_fn_schema` | string  | Custom function for determining the count in a message      | "Progress"    | `Progress`    |
| `log_level`                 | string  | Enable this stage when the configured log level is at `log_level` or lower. | "DEBUG" | `INFO` |


### `mlflow_writer_options`

| Key                         | Type       | Description                       | Example Value                 | Default Value |
|-----------------------------|------------|-----------------------------------|-------------------------------|---------------|
| `conda_env`                 | string     | Conda environment for the model   | "path/to/conda_env.yml"       | `[Required]`  |
| `databricks_permissions`    | dictionary | Permissions for the model         | See Below                     | `None`        |
| `experiment_name_formatter` | string     | Formatter for the experiment name | "experiment_name_{timestamp}" | `[Required]`  |
| `model_name_formatter`      | string     | Formatter for the model name      | "model_name_{timestamp}"      | `[Required]`  |
| `timestamp_column_name`     | string     | Name of the timestamp column      | "timestamp"                   | `timestamp`   |

### `stream_aggregation_options`

| Parameter               | Type   | Description                                                 | Example Value | Default Value |
|-------------------------|--------|-------------------------------------------------------------|---------------|---------------|
| `cache_mode`            | string | Mode used to manage the cache of aggregated user data       | "batch"       | `batch`       |
| `min_history`           | int    | Minimum history to trigger a new training event             | 1             | `1`           |
| `max_history`           | int    | Maximum history to include in a new training event          | 0             | `0`           |
| `timestamp_column_name` | string | Name of the column containing timestamps                    | "timestamp"   | `timestamp`   |
| `aggregation_span`      | string | Lookback timespan for training data in a new training event | "60d"         | `60d`         |
| `cache_to_disk`         | bool   | Whether or not to cache streaming data to disk              | false         | `false`       |
| `cache_dir`             | string | Directory to use for caching streaming data                 | "./.cache"    | `./.cache`    |

### `user_splitting_options`

| Key                     | Type | Description                                          | Example Value               | Default Value  |
|-------------------------|------|------------------------------------------------------|-----------------------------|----------------|
| `fallback_username`     | str  | The user ID to use if the user ID is not found       | "generic_user"              | `generic_user` |
| `include_generic`       | bool | Whether to include a generic user ID in the output   | false                       | `false`        |
| `include_individual`    | bool | Whether to include individual user IDs in the output | true                        | `false`        |
| `only_users`            | list | List of user IDs to include; others will be excluded | ["user1", "user2", "user3"] | `[]`           |
| `skip_users`            | list | List of user IDs to exclude from the output          | ["user4", "user5"]          | `[]`           |
| `timestamp_column_name` | str  | Name of the column containing timestamps             | "timestamp"                 | `timestamp`    |
| `userid_column_name`    | str  | Name of the column containing user IDs               | "username"                  | `username`     |
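
To show how these options relate, here is a minimal sketch that splits log rows into per-user groups and, optionally, a generic group keyed by `fallback_username`. The function `split_by_user` is hypothetical and works on plain dictionaries rather than dataframes; applying the `only_users`/`skip_users` filters before building the generic group is an assumption.

```python
def split_by_user(rows, *, userid_column_name="username",
                  fallback_username="generic_user",
                  include_generic=False, include_individual=True,
                  only_users=(), skip_users=()):
    """Group rows by user ID, honoring the include/skip/only options."""
    if only_users and skip_users:
        raise ValueError("only_users and skip_users are mutually exclusive")

    groups = {}
    for row in rows:
        user = row[userid_column_name]
        if only_users and user not in only_users:
            continue
        if user in skip_users:
            continue
        if include_individual:
            groups.setdefault(user, []).append(row)
        if include_generic:
            # Every retained row also feeds the shared generic group.
            groups.setdefault(fallback_username, []).append(row)
    return groups


# Hypothetical rows, for illustration only.
rows = [{"username": "a"}, {"username": "b"}, {"username": "a"}]
groups = split_by_user(rows, include_generic=True, skip_users=["b"])
print({user: len(items) for user, items in groups.items()})
```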

### `detection_criteria`

| Key          | Type  | Description                              | Example Value | Default Value |
|--------------|-------|------------------------------------------|---------------|---------------|
| `threshold`  | float | Threshold for filtering detections       | 0.5           | `0.5`         |
| `field_name` | str   | Name of the field to filter by threshold | "score"       | `probs`       |
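
A minimal sketch of threshold filtering, assuming a detection is any row whose `field_name` value is strictly greater than `threshold`. The function name, the strict comparison, and the sample rows are assumptions made for illustration.

```python
def filter_detections(rows, threshold=0.5, field_name="probs"):
    """Keep only rows whose score field exceeds the threshold."""
    return [row for row in rows if row[field_name] > threshold]


# Hypothetical scored rows, for illustration only.
scored = [
    {"username": "a", "score": 0.2},
    {"username": "b", "score": 0.9},
    {"username": "c", "score": 0.5},
]
detections = filter_detections(scored, threshold=0.5, field_name="score")
print([row["username"] for row in detections])
```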

### `inference_options`

| Parameter               | Type   | Description                                          | Example Value           | Default Value |
|-------------------------|--------|------------------------------------------------------|-------------------------|---------------|
| `model_name_formatter`  | string | Formatter for model names                            | "user_{username}_model" | `[Required]`  |
| `fallback_username`     | string | Fallback user to use if no model is found for a user | "generic_user"          | `generic_user`|
| `timestamp_column_name` | string | Name of the timestamp column                         | "timestamp"             | `timestamp`   |
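
The interaction between `model_name_formatter` and `fallback_username` can be sketched as follows: the formatter builds a per-user model name, and if no such model exists, the generic user's model is used instead. The function `resolve_model_name` and the set of available models are hypothetical; the formatter string is the example value from the table.

```python
def resolve_model_name(username, available_models,
                       model_name_formatter="user_{username}_model",
                       fallback_username="generic_user"):
    """Pick the user's model if it exists, otherwise the fallback model."""
    candidate = model_name_formatter.format(username=username)
    if candidate in available_models:
        return candidate
    fallback = model_name_formatter.format(username=fallback_username)
    if fallback in available_models:
        return fallback
    raise LookupError(f"No model found for {username!r} or the fallback user")


# Hypothetical set of registered model names, for illustration only.
available = {"user_alice_model", "user_generic_user_model"}
print(resolve_model_name("alice", available))
print(resolve_model_name("bob", available))
```

This mirrors what the inference results at the end of the notebook show: some rows are scored by a per-user model and others by the `generic_user` model.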

### `write_to_file_options`

| Key                 | Type      | Description                              | Example Value   | Default Value    |
|---------------------|-----------|------------------------------------------|-----------------|------------------|
| `filename`          | string    | Path to the output file                  | "output.csv"    | `None`           |
| `file_type`         | string    | Type of file to write                    | "CSV"           | `AUTO`           |
| `flush`             | bool      | If true, flush the file after each write | false           | `false`          |
| `include_index_col` | bool      | If true, include the index column        | false           | `true`           |
| `overwrite`         | bool      | If true, overwrite the file if it exists | true            | `false`          |


In [6]:
# Create a config helper that generates the configuration parameters for the DFP module.
# It populates the minimum required configuration parameters with sensible default values.
config_generator = ConfigGenerator(config, dfp_arg_parser, schema)

dfp_deployment_module_config = config_generator.get_module_conf()

## Pipeline Construction
From this point on we begin constructing the stages that will make up the pipeline. To make testing easier, constructing the pipeline object, adding the stages, and running the pipeline, is provided as a single cell. The below cell can be rerun multiple times as needed for debugging.

### Source Stage (`ControlMessageFileSourceStage`)

This pipeline reads control message definitions from one or more input files. The source stage constructs control messages from these files and passes them to the downstream stages. It can read files from many different source types, both local and remote, by using the `fsspec` library (similar to `pandas`). Refer to the [`fsspec`](https://filesystem-spec.readthedocs.io/) documentation for more information on the supported file types. Once all of the logs have been read, the source completes.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `filenames` | List of strings | | Any control message definition files to read into the pipeline |
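
For orientation, the sketch below builds a control message definition in Python and serializes it to JSON. The field names (`inputs`, `tasks`, `type`, `properties`, `loader_id`, `files`, `metadata`, `data_type`) and their values are assumed here for illustration and should be checked against the payload files under `./resource`; the helper `make_control_message` and the glob path are hypothetical.

```python
import json


def make_control_message(files, task_types):
    """Build a control message with one load task plus the given task types."""
    tasks = [{
        "type": "load",
        "properties": {"loader_id": "fsspec", "files": list(files)},
    }]
    tasks += [{"type": task_type, "properties": {}} for task_type in task_types]
    return {"inputs": [{"tasks": tasks, "metadata": {"data_type": "payload"}}]}


# Placeholder path; point this at the actual training data.
message = make_control_message(["/path/to/azure-training-data/*.json"], ["training"])
payload = json.dumps(message, indent=2)
print(payload)
```

Under these assumptions, a definition that requests both training and inference would simply list both task types after the load task.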

### DFP Deployment Module (`MultiPortModulesStage`)

`MultiPortModulesStage` is used to load modules that return more than one output. The DFP deployment module sets up a modular Digital Fingerprinting pipeline instance and performs integrated training, as shown in the diagram below. For more information on the options passed to this module, see [here](../../../../../docs/source/modules/examples/digital_fingerprinting/dfp_deployment.md).


## End-to-End Workflow Architecture

![Integrated Training Pipeline](../../../../../docs/source/img/dfp_integrated_training_file_pipeline.png)



In [7]:
def construct_pipeline(input_files: list[str]):
    # Create a pipeline object
    pipeline = Pipeline(config)

    # ControlMessage file source stage.
    source_stage = pipeline.add_stage(ControlMessageFileSourceStage(config, filenames=input_files))

    # DFP deployment (integrated training) module stage.
    dfp_deployment_stage = pipeline.add_stage(
        MultiPortModulesStage(config,
                                dfp_deployment_module_config,
                                input_ports=["input"],
                                output_ports=["output_0", "output_1"]))

    # Connect stages with edges.
    pipeline.add_edge(source_stage, dfp_deployment_stage)

    return pipeline

### Training
To ensure a smooth deployment of the inference tasks to the pipeline, it is imperative to have at least one version of the trained model available on the MLflow server. Once the initial model training is complete, we can proceed with the publishing of the inference tasks to the pipeline.

In [None]:
input_files = load_train_only_input_files

pipeline = construct_pipeline(input_files)
await pipeline.run_async()

### Training and Inference
Now that we have a trained model available in MLflow, we can begin executing both training and inference tasks in parallel. 

In [None]:
input_files = load_train_inference_input_files

pipeline = construct_pipeline(input_files)
await pipeline.run_async()

### Inference Results
The pipeline writes the inference results to `dfp_detections_azure.csv`.

In [11]:
df = cudf.read_csv("dfp_detections_azure.csv")
df

| Unnamed: 0.1 | Unnamed: 0 | timestamp | username | appDisplayName | clientAppUsed | deviceDetailbrowser | deviceDetaildisplayName | deviceDetailoperatingSystem | statusfailureReason | logcount | ... | locincrement_loss | logcount_z_loss | clientAppUsed_pred | clientAppUsed_loss | appDisplayName_loss | statusfailureReason_loss | appincrement_loss | appDisplayName_z_loss | model_version | event_time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10 | 2022-08-30T08:31:49.739107000Z | tprice@domain.com | SD ECDN | Browser | Rich Client 3.19.8.16603 | THOMASPRICE-LT | Windows 10 | | 3 | ... | 0.610670 | 0.027359 | Mobile Apps and Desktop clients | 1.174857 | 2.780367 | 1.605560 | 0.735984 | 1.979742 | DFP-azure-tprice@domain.com:1 | 2023-07-20T20:24:22Z |
| 1 | 18 | 2022-08-31T00:21:46.153050000Z | attacktarget@domain.com | Citrix ShareFile | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 0 | ... | 5.914863 | 0.633756 | Mobile Apps and Desktop clients | 0.591306 | 1.697818 | 0.607544 | 2.277889 | 2.968441 | DFP-azure-attacktarget@domain.com:2 | 2023-07-20T20:25:09Z |
| 2 | 20 | 2022-08-31T00:27:44.169328000Z | attacktarget@domain.com | Altoura | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 2 | ... | 5.936103 | 0.109216 | Mobile Apps and Desktop clients | 0.603926 | 1.713131 | 0.621300 | 3.988265 | 2.393491 | DFP-azure-attacktarget@domain.com:2 | 2023-07-20T20:25:09Z |
| 3 | 21 | 2022-08-31T00:44:58.235390000Z | attacktarget@domain.com | LumApps | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 3 | ... | 5.942259 | 0.376158 | Mobile Apps and Desktop clients | 0.599671 | 1.697026 | 0.614225 | 14.026583 | 2.998179 | DFP-azure-attacktarget@domain.com:2 | 2023-07-20T20:25:09Z |
| 4 | 22 | 2022-08-31T00:47:59.808993000Z | attacktarget@domain.com | Linux Foundation Training | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 4 | ... | 5.936529 | 0.562854 | Mobile Apps and Desktop clients | 0.589449 | 1.683457 | 0.599916 | 30.273712 | 3.507625 | DFP-azure-attacktarget@domain.com:2 | 2023-07-20T20:25:09Z |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 104 | 123 | 2022-08-31T23:54:50.435683000Z | attacktarget@domain.com | Box | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 16 | ... | 6.032503 | 2.419621 | Mobile Apps and Desktop clients | 0.418121 | 1.608195 | 0.420876 | 619.997925 | 6.333486 | DFP-azure-attacktarget@domain.com:2 | 2023-07-20T20:26:07Z |
| 105 | 312 | 2022-08-31T22:23:06.806213000Z | attacktarget@domain.com | Stormboard | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 9 | ... | 0.503085 | 0.252396 | Mobile Apps and Desktop clients | 0.807381 | 4.478906 | 1.358579 | 31.562899 | 1.308199 | DFP-azure-generic_user:53 | 2023-07-20T20:26:10Z |
| 106 | 313 | 2022-08-31T22:43:42.029787000Z | djohnson@domain.com | Box | Mobile Apps and Desktop clients | Edge 87.11424 | DAVIDJOHNSON-LT | Windows 10 | | 0 | ... | 0.514629 | 0.155598 | Mobile Apps and Desktop clients | 0.799728 | 4.384770 | 1.430044 | 36.937981 | 0.934225 | DFP-azure-generic_user:53 | 2023-07-20T20:26:10Z |
| 107 | 314 | 2022-08-31T22:43:47.300043000Z | attacktarget@domain.com | CallPlease | Mobile Apps and Desktop clients | Chrome 100.0.4896 | ATTACKTARGET-LT | Windows 10 | | 10 | ... | 0.535288 | 0.019980 | Mobile Apps and Desktop clients | 0.783910 | 4.378347 | 1.345962 | 43.152668 | 0.908708 | DFP-azure-generic_user:53 | 2023-07-20T20:26:10Z |
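
One simple way to triage the results is to sort the rows by one of the loss columns. The sketch below does this with the standard library on a three-row excerpt copied from the output above (rows 0, 104, and 106, reduced to three columns); the helper `top_rows` is a hypothetical name, and in practice the same ranking could be done directly on the dataframe.

```python
import csv
import io

# Excerpt of rows 0, 104, and 106 from the output above, three columns only.
SAMPLE = """username,appDisplayName_z_loss,model_version
tprice@domain.com,1.979742,DFP-azure-tprice@domain.com:1
attacktarget@domain.com,6.333486,DFP-azure-attacktarget@domain.com:2
djohnson@domain.com,0.934225,DFP-azure-generic_user:53
"""


def top_rows(csv_text, column, n=2):
    """Return the n rows with the largest value in the given column."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return sorted(rows, key=lambda row: float(row[column]), reverse=True)[:n]


top = top_rows(SAMPLE, "appDisplayName_z_loss")
for row in top:
    print(row["username"], row["appDisplayName_z_loss"], row["model_version"])
```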
