# Digital Fingerprinting Azure Inference + Triage and Summary with an LLM
## Introduction

In this notebook, we will build and run a DFP pipeline that performs inference on Azure logs. The goal is to use the pretrained models generated in the Duo Training notebook to generate anomaly scores for each log. Security teams can use these anomaly scores to detect abnormal behavior as it happens, so that the proper action can be taken.

We will also build upon the Azure Inference-only notebook by passing thresholded and filtered events to a Llama 3 8B model for natural-language explanation and initial triage. You can choose to self-host the model using NVIDIA Inference Microservices (NIM) or use the API endpoint at build.nvidia.com.

<div class="alert alert-block alert-info">
<b>Note:</b> For more information on DFP, the Morpheus pipeline, and setup steps to run this notebook, please refer to the corresponding DFP training materials.
</div>

We'll begin by installing the LangChain packages and importing the modules used in the rest of this notebook.

In [4]:
!pip install langchain
!pip install langchain_nvidia_ai_endpoints
!pip install langchain_community


In [5]:
%load_ext autoreload
%autoreload 2

import sys
import os

# Ensure that the morpheus directory is in the python path. This may not need to be run depending on the environment setup



In [6]:
import functools
import logging
import os
import typing
import mlflow
import json

import time
import cudf

from datetime import datetime
from functools import partial
import pandas as pd

from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage
from dfp.stages.dfp_inference_stage import DFPInferenceStage
from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from dfp.stages.multi_file_source import MultiFileSource
from dfp.utils.regex_utils import iso_date_regex
from dfp.stages.dfp_string_create_stage import DFPStringCreateStage
from dfp.stages.dfp_rag_concat_stage import DFPRAGConcatStage
from dfp.stages.dfp_rag_upload_stage import DFPRAGUploadStage
from dfp.llm.nim_task_handler import NIMTaskHandler
from dfp.llm.retriever_context_node import RetrieverContextNode
from dfp.llm.llm_engine_utils import build_engine_llm_service, build_engine_rag_context
from dfp.llm import nemo_retriever_client as nrc

from morpheus.common import FileTypes
from morpheus.common import FilterSource
from morpheus.cli.utils import get_log_levels
from morpheus.cli.utils import get_package_relative_file
from morpheus.cli.utils import load_labels_file
from morpheus.cli.utils import parse_log_level
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.pipeline import LinearPipeline
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.column_info import DateTimeColumn
from morpheus.utils.column_info import DistinctIncrementColumn
from morpheus.utils.column_info import IncrementColumn
from morpheus.utils.column_info import RenameColumn
from morpheus.utils.column_info import StringCatColumn
from morpheus.utils.file_utils import date_extractor
from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.utils.logger import configure_logging
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.llm_generate_node import LLMGenerateNode
from morpheus.llm.nodes.prompt_template_node import PromptTemplateNode
from morpheus.llm.services.llm_service import LLMService
from morpheus.llm.services.nemo_llm_service import NeMoLLMService
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes


# Left align all tables
from IPython.display import HTML
table_css = 'table {align:left;display:block}'
HTML('<style>{}</style>'.format(table_css))

## High Level Configuration of DFP

***This section is configured exactly the same as the Azure Inference demo notebook in this directory. If you are familiar with that, you can skip over this.***

The following options significantly alter the functionality of the pipeline. These options are separated from the individual stage options since they may affect more than one stage. Additionally, the matching Python script to this notebook, `dfp_azure_pipeline.py`, configures these options via command line arguments.

### Options

| Name | Type | Description |
| --- | --- | :-- |
| `train_users` | One of `["none"]` | For inference, this option should always be `"none"` |
| `skip_users` | List of strings | Any user in this list will be dropped from the pipeline. Useful for debugging to remove automated accounts with many logs. |
| `cache_dir` | string | The location to store cached files. To aid with development and reduce bandwidth, the Morpheus pipeline will cache data from several stages of the pipeline. This option configures the location for those caches. |
| `input_files` | List of strings | List of files to process. Can specify multiple arguments for multiple files. Also accepts glob (\*) wildcards and schema prefixes such as `s3://`. For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. Refer to the `fsspec` documentation for a list of possible options. |
| `model_name_formatter` | string | A format string to use when building the model name. The model name is constructed by calling `model_name_formatter.format(user_id=user_id)`. For example, `model_name_formatter="my_model-{user_id}"` with a user ID of `"first:last"` would result in the model name `"my_model-first:last"`. This should match the value used in `DFPMLFlowModelWriterStage`. Available keyword arguments: `user_id`, `user_md5`. |
| `experiment_name_formatter` | string | A format string (without the `f`) that will be used when creating an experiment in MLflow. Available keyword arguments: `user_id`, `user_md5`, `reg_model_name`. |
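
As a minimal sketch of how `model_name_formatter` expands, the snippet below formats a name using both documented keyword arguments. The helper `build_model_name` is hypothetical, and computing `user_md5` as the hex MD5 digest of the user ID is an assumption rather than confirmed DFP behavior.

```python
import hashlib


def build_model_name(formatter: str, user_id: str) -> str:
    """Hypothetical helper: expand a model-name format string for one user."""
    # Assumption: user_md5 is the hex MD5 digest of the UTF-8 encoded user ID
    user_md5 = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    # str.format silently ignores keyword arguments the format string does not use
    return formatter.format(user_id=user_id, user_md5=user_md5)


print(build_model_name("my_model-{user_id}", "first:last"))  # my_model-first:last
print(build_model_name("DFP-azure-{user_id}", "alice@example.com"))  # DFP-azure-alice@example.com
```

Because unused keyword arguments are ignored, the same call works for formatters that reference only `user_id`, only `user_md5`, or both.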


In [7]:
# Global options
train_users = "none"

# Enter any users to skip here
skip_users: typing.List[str] = []

# Location where cache objects will be saved
cache_dir = "./.cache/dfp"

# Input files to read from
input_files = [
    "../../../../data/dfp/azure-inference-data/AZUREAD_*.json",
]

# The format to use for models
model_name_formatter = "DFP-azure-{user_id}"

# === Derived Options ===
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"

# To include individual, we must be either training or inferring
include_individual = train_users != "generic"

# None indicates we aren't training anything
is_training = train_users != "none"

# Tracking URI
tracking_uri = "http://mlflow:5000"

### Set MLflow Tracking URI
Set the MLflow tracking URI so that the inference stage can download the trained models.

In [8]:
mlflow.set_tracking_uri(tracking_uri)

### Global Config Object
Before creating the pipeline, we need to set up logging and set the parameters for the Morpheus config object. This config object is responsible for the following:
 - Indicating whether to use C++ or Python stages
    - C++ stages are not supported for the DFP pipeline. This should always be `False`
 - Setting the number of threads to use in the pipeline. Defaults to the thread count of the OS.
 - Setting the pipeline mode
    - This notebook uses `PipelineModes.NLP` so that Morpheus can apply the NLP-specific GPU optimizations needed by the LLM stages
 - Setting the feature column names that will be used in model training
    - This option allows extra columns to be used in the pipeline that will not be part of the training algorithm.
    - The final features that the model will be trained on will be an intersection of this list with the log columns.
 - Setting the column name that indicates the user's unique identifier
    - It is required for DFP to have a user ID column
 - Setting the column name that indicates the timestamp for the log
    - It is required for DFP to know when each log occurred
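
The intersection rule described above can be sketched in a few lines of Python. This is an illustration that assumes the order of `feature_columns` is preserved; `select_features` is a hypothetical helper, not part of Morpheus.

```python
def select_features(feature_columns, log_columns):
    """Hypothetical helper: keep configured features that actually appear in the logs."""
    available = set(log_columns)
    # Preserve the order of feature_columns (an assumption made for readability)
    return [col for col in feature_columns if col in available]


features = ["appDisplayName", "clientAppUsed", "logcount", "locincrement"]
log_columns = ["timestamp", "username", "appDisplayName", "logcount", "location"]
print(select_features(features, log_columns))  # ['appDisplayName', 'logcount']
```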


In [9]:
# Enable the Morpheus logger
configure_logging(log_level=logging.INFO)

config = Config()

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()

# NLP mode lets Morpheus apply the NLP-specific GPU optimizations needed by the LLM portion of the pipeline
config.mode = PipelineModes.NLP

config.ae = ConfigAutoEncoder()

config.ae.feature_columns = [
    "appDisplayName", "clientAppUsed", "deviceDetailbrowser", "deviceDetaildisplayName", "deviceDetailoperatingSystem", "statusfailureReason", "appincrement", "locincrement", "logcount", 
]
config.ae.userid_column_name = "username"
config.ae.timestamp_column_name = "timestamp"

In [10]:
# Specify the column names to ensure all data is uniform
source_column_info = [
    DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"),
    RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"),
    RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"),
    ColumnInfo(name="category", dtype=str),
    RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"),
    RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"),
    RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"),
    RenameColumn(name="deviceDetailoperatingSystem",
                    dtype=str,
                    input_name="properties.deviceDetail.operatingSystem"),
    StringCatColumn(name="location",
                    dtype=str,
                    input_columns=[
                        "properties.location.city",
                        "properties.location.countryOrRegion",
                    ],
                    sep=", "),
    RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"),
]

source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info)
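
The schema above refers to nested fields through dotted names such as `properties.deviceDetail.browser`. Our assumption is that listing `properties` in `json_columns` produces dotted column names like these. As a rough, pure-Python illustration (not the Morpheus implementation), the sketch below flattens a made-up record's nested `properties` object into dotted keys, then applies a few of the `RenameColumn` mappings and the `StringCatColumn` join by hand.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Recursively flatten nested dicts into dot-separated keys."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


# A made-up, heavily trimmed Azure AD sign-in record for illustration
raw_log = {
    "time": "2022-08-01T03:02:04.918Z",
    "category": "SignInLogs",
    "properties": {
        "userPrincipalName": "alice@example.com",
        "deviceDetail": {"browser": "Chrome 103.0", "operatingSystem": "Windows 10"},
        "location": {"city": "Seattle", "countryOrRegion": "US"},
    },
}

flat = flatten(raw_log)

# RenameColumn: output name -> flattened input name (a subset of the schema above)
renames = {
    "username": "properties.userPrincipalName",
    "deviceDetailbrowser": "properties.deviceDetail.browser",
    "deviceDetailoperatingSystem": "properties.deviceDetail.operatingSystem",
}
row = {new_name: flat.get(old_name) for new_name, old_name in renames.items()}

# StringCatColumn: join the city and country columns with ", "
row["location"] = ", ".join(
    flat[col] for col in ["properties.location.city", "properties.location.countryOrRegion"]
)
print(row)
```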


In [11]:
# Preprocessing schema
preprocess_column_info = [
    ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime),
    ColumnInfo(name=config.ae.userid_column_name, dtype=str),
    ColumnInfo(name="appDisplayName", dtype=str),
    ColumnInfo(name="clientAppUsed", dtype=str),
    ColumnInfo(name="deviceDetailbrowser", dtype=str),
    ColumnInfo(name="deviceDetaildisplayName", dtype=str),
    ColumnInfo(name="deviceDetailoperatingSystem", dtype=str),
    ColumnInfo(name="statusfailureReason", dtype=str),

    # Derived columns
    IncrementColumn(name="logcount",
                    dtype=int,
                    input_name=config.ae.timestamp_column_name,
                    groupby_column=config.ae.userid_column_name),
    DistinctIncrementColumn(name="locincrement",
                            dtype=int,
                            input_name="location",
                            groupby_column=config.ae.userid_column_name,
                            timestamp_column=config.ae.timestamp_column_name),
    DistinctIncrementColumn(name="appincrement",
                            dtype=int,
                            input_name="appDisplayName",
                            groupby_column=config.ae.userid_column_name,
                            timestamp_column=config.ae.timestamp_column_name)
]

preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"])
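
To make the derived columns more concrete, here is a pure-Python approximation of `logcount`, `locincrement`, and `appincrement` on a list of dictionaries. It assumes the counters are kept per user per calendar day, start at 1, and count distinct values seen so far; the exact windowing used by `IncrementColumn` and `DistinctIncrementColumn` may differ, and `add_increment_columns` is a hypothetical helper.

```python
from collections import defaultdict


def add_increment_columns(rows):
    """Approximate the logcount / locincrement / appincrement derived columns.

    Assumptions: counters are kept per user per calendar day, start at 1,
    and timestamps are ISO 8601 strings (so string order equals time order).
    """
    counts = defaultdict(int)
    seen_locations = defaultdict(set)
    seen_apps = defaultdict(set)
    output = []
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        key = (row["username"], row["timestamp"][:10])  # (user, YYYY-MM-DD)
        counts[key] += 1
        seen_locations[key].add(row["location"])
        seen_apps[key].add(row["appDisplayName"])
        output.append({
            **row,
            "logcount": counts[key],
            "locincrement": len(seen_locations[key]),
            "appincrement": len(seen_apps[key]),
        })
    return output


logs = [
    {"username": "alice", "timestamp": "2022-08-01T09:00:00", "location": "Seattle, US", "appDisplayName": "Outlook"},
    {"username": "alice", "timestamp": "2022-08-01T10:00:00", "location": "Seattle, US", "appDisplayName": "Teams"},
    {"username": "alice", "timestamp": "2022-08-01T11:00:00", "location": "Lagos, NG", "appDisplayName": "Teams"},
]
for r in add_increment_columns(logs):
    print(r["logcount"], r["locincrement"], r["appincrement"])
```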


## Pipeline Construction
From this point on, we begin constructing the stages that will make up the pipeline. To make testing easier, the pipeline object is constructed and the DFP stages are added in a single cell, which can be rerun as needed for debugging. The LLM stages are appended to the same `pipeline` object in later cells, so rerun those cells as well after rerunning this one.

### Source Stage (`MultiFileSource`)

This pipeline reads input logs from one or more input files. This source stage constructs a list of files to be processed and passes them to downstream stages. It can read files from many different source types, both local and remote, by using the `fsspec` library (similar to `pandas`). Refer to the [`fsspec`](https://filesystem-spec.readthedocs.io/) documentation for more information on the supported file systems. Once all of the logs have been read, the source completes.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `filenames` | List of strings | | Any files to read into the pipeline. All files will be combined into a single `DataFrame` |

### File Batcher Stage (`DFPFileBatcherStage`)

To improve performance, multiple small input files can be batched together into a single DataFrame for processing. This stage is responsible for determining the timestamp of input files, grouping input files into batches by time, and sending the batches on to be processed into a single DataFrame. Repeated batches of files will be loaded from cache, which improves performance. For example, when performing a 60-day training run with a period of `"D"` and retraining once per day, 59 of the days can be loaded from cache.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `period` | `str` | `"D"` | The period to create batches. Refer to `pandas` windowing frequency documentation for available options.  |
| `date_conversion_func` | Function of `typing.Callable[[fsspec.core.OpenFile], datetime]` | | A callback which is responsible for determining the date for a specified file. |
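
To make the batching concrete, here is a pure-Python sketch that extracts a timestamp from each filename and groups files by day, mirroring `period="D"`. The regular expression is a simplified stand-in that we assume matches filenames like `AZUREAD_2022-08-01T03_02_04.918Z.json`; the notebook itself uses `iso_date_regex` from `dfp.utils.regex_utils`, whose exact pattern may differ.

```python
import re
from collections import defaultdict
from datetime import datetime

# Simplified stand-in for iso_date_regex: YYYY-MM-DDTHH_MM_SS or YYYY-MM-DDTHH:MM:SS
DATE_PATTERN = re.compile(r"(\d{4})-(\d{2})-(\d{2})T(\d{2})[:_](\d{2})[:_](\d{2})")


def extract_date(filename: str) -> datetime:
    """Parse the first ISO-like timestamp found in a filename."""
    match = DATE_PATTERN.search(filename)
    if match is None:
        raise ValueError(f"no ISO timestamp found in {filename!r}")
    return datetime(*(int(part) for part in match.groups()))


def batch_by_day(filenames):
    """Group filenames by calendar day, mimicking period="D"."""
    batches = defaultdict(list)
    for name in filenames:
        batches[extract_date(name).date()].append(name)
    return dict(sorted(batches.items()))


files = [
    "AZUREAD_2022-08-01T03_02_04.918Z.json",
    "AZUREAD_2022-08-02T00_00_00.000Z.json",
    "AZUREAD_2022-08-01T10_00_00.000Z.json",
]
for day, names in batch_by_day(files).items():
    print(day, names)
```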

### File to DataFrame Stage (`DFPFileToDataFrameStage`)

After files have been batched into groups, this stage is responsible for reading the files and converting them into a DataFrame. The specified input schema converts the raw DataFrame into one suitable for caching and processing. Any columns that are not needed should be excluded from the schema.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `schema` | `DataFrameInputSchema` | | After the raw `DataFrame` is read from each file, this schema will be applied to ensure a consistent output from the source. |
| `file_type` | `FileTypes` | `FileTypes.Auto` | Allows overriding the file type. When set to `Auto`, the file extension will be used. Options are `CSV`, `JSON`, `PARQUET`, `Auto`. |
| `parser_kwargs` | `dict` | `{}` | This dictionary will be passed to the `DataFrame` parser class. Allows for customization of log parsing. |
| `cache_dir` | `str` | `./.cache/dfp` | The location to write cached input files to. |

### Split Users Stage (`DFPSplitUsersStage`)

Once the input logs have been read into a `DataFrame`, this stage is responsible for breaking that single `DataFrame` with many users into multiple `DataFrame`s for each user. This is also where the pipeline chooses whether to train individual users or the generic user (or both).

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `include_generic` | `bool` | | Whether or not to combine all user logs into a single `DataFrame` with the username 'generic_user' |
| `include_individual` | `bool` | | Whether or not to output individual `DataFrame` objects for each user |
| `skip_users` | List of `str` | `[]` | Any users to remove from the `DataFrame`. Useful for debugging to remove automated accounts with many logs. Mutually exclusive with `only_users`. |
| `only_users` | List of `str` | `[]` | Only allow these users in the final `DataFrame`. Useful for debugging to focus on specific users. Mutually exclusive with `skip_users`. |
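
A simplified, pure-Python version of the splitting logic, using lists of dictionaries in place of DataFrames, might look like the following. `split_users` is a hypothetical helper; the `"generic_user"` name comes from the table above, while everything else is an illustrative assumption.

```python
from collections import defaultdict


def split_users(rows, include_generic, include_individual, skip_users=(), only_users=()):
    """Hypothetical sketch: split a list of log dicts into per-user groups."""
    if skip_users and only_users:
        raise ValueError("skip_users and only_users are mutually exclusive")
    kept = [
        row for row in rows
        if row["username"] not in skip_users and (not only_users or row["username"] in only_users)
    ]
    groups = {}
    if include_generic:
        # All remaining logs combined under a single synthetic user
        groups["generic_user"] = kept
    if include_individual:
        per_user = defaultdict(list)
        for row in kept:
            per_user[row["username"]].append(row)
        groups.update(per_user)
    return groups


logs = [{"username": u} for u in ["alice", "bob", "alice", "svc-backup"]]
print(sorted(split_users(logs, include_generic=False, include_individual=True, skip_users=["svc-backup"])))
# ['alice', 'bob']
```

With this notebook's settings (`train_users = "none"`), `include_generic` is `False` and `include_individual` is `True`, so only the per-user groups are produced.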

### Rolling Window Stage (`DFPRollingWindowStage`)

The Rolling Window Stage performs several key pieces of functionality for DFP.
1. This stage keeps a moving window of logs on a per user basis
   1. These logs are saved to disk to reduce memory requirements between logs from the same user
1. It only emits logs when the window history requirements are met
   1. Until all of the window history requirements are met, no messages will be sent to the rest of the pipeline.
   1. Configuration options for defining the window history requirements are detailed below.
1. It repeats the necessary logs to properly calculate log dependent features.
   1. To support all column feature types, incoming log messages can be combined with existing history and sent to downstream stages.
   1. For example, to calculate a feature that increments a counter for the number of logs a particular user has generated in a single day, we must have the user's log history for the past 24 hours. To support this, this stage will combine new logs with existing history into a single `DataFrame`.
   1. It is the responsibility of downstream stages to distinguish between new logs and existing history.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `min_history` | `int` | `1` | The minimum number of logs a user must have before emitting any messages. Logs below this threshold will be saved to disk. |
| `min_increment` | `int` or `str` | `0` | Once the min history requirement is met, this stage must receive `min_increment` *new* logs before emitting another message. Logs received before this threshold is met will be saved to disk. Can be specified as an integer count or a string duration. |
| `max_history` | `int` or `str` | `"1d"` | Once `min_history` and `min_increment` requirements have been met, this puts an upper bound on the maximum number of messages to forward into the pipeline and also the maximum number of messages to retain in the history. Can be specified as an integer count or a string duration. |
| `cache_dir` | `str` | `./.cache/dfp` | The location to write log history to disk. |
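
The emit rules can be sketched with a small, count-based class. This is a simplification under stated assumptions: `min_increment` and `max_history` are treated as integer counts only (the real stage also accepts durations such as `"1d"`), history is kept in memory rather than on disk, and `RollingWindow` is a hypothetical name.

```python
class RollingWindow:
    """Hypothetical, count-based sketch of one user's rolling window."""

    def __init__(self, min_history=1, min_increment=0, max_history=100):
        self.min_history = min_history
        self.min_increment = min_increment
        self.max_history = max_history
        self.history = []          # kept in memory here; the real stage caches to disk
        self.new_since_emit = 0
        self.has_emitted = False

    def add(self, logs):
        """Add new logs; return the window to emit, or None if requirements are unmet."""
        self.history.extend(logs)
        self.new_since_emit += len(logs)
        # Bound the retained history to the most recent max_history logs
        self.history = self.history[-self.max_history:]
        if len(self.history) < self.min_history:
            return None
        if self.has_emitted and self.new_since_emit < self.min_increment:
            return None
        self.has_emitted = True
        self.new_since_emit = 0
        # Emit history plus new logs; downstream stages must tell them apart
        return list(self.history)


window = RollingWindow(min_history=3, min_increment=2, max_history=4)
print([window.add([i]) for i in range(1, 6)])
# [None, None, [1, 2, 3], None, [2, 3, 4, 5]]
```

With the inference settings used in this notebook (`min_history=1`, `min_increment=0`), every incoming batch is emitted immediately.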

### Preprocessing Stage (`DFPPreprocessingStage`)

This stage performs the final, row dependent, feature calculations as specified by the input schema object. Once calculated, this stage can forward on all received logs, or optionally can only forward on new logs, removing any history information.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `input_schema` | `DataFrameInputSchema` | | The final, row dependent, schema to apply to the incoming columns |

### Inference Stage (`DFPInferenceStage`)

This stage performs several tasks to aid in performing inference. This stage will:
1. Download models as needed from MLFlow
1. Cache previously downloaded models to improve performance
   1. Models in the cache will be periodically refreshed from MLFlow at a configured rate
1. Perform inference using the downloaded model

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `model_name_formatter` | `str` | `""` | A format string to use when building the model name. The model name is constructed by calling `model_name_formatter.format(user_id=user_id)`. For example, `model_name_formatter="my_model-{user_id}"` with a user ID of `"first:last"` would result in the model name `"my_model-first:last"`. This should match the value used in `DFPMLFlowModelWriterStage`. |
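
The download-and-cache behavior listed above can be illustrated with a small cache keyed by model name. This is a hypothetical sketch: `ModelCache`, the `fake_download` loader (standing in for an MLflow download), and the 10-second refresh interval are all illustrative assumptions, not the stage's actual implementation or configuration.

```python
import time


class ModelCache:
    """Hypothetical per-user model cache with periodic refresh."""

    def __init__(self, load_model, refresh_seconds=600.0, clock=time.monotonic):
        self._load_model = load_model        # stand-in for an MLflow download
        self._refresh_seconds = refresh_seconds
        self._clock = clock
        self._cache = {}                     # model name -> (model, load time)

    def get(self, model_name_formatter, user_id):
        name = model_name_formatter.format(user_id=user_id)
        now = self._clock()
        entry = self._cache.get(name)
        # Load on first use, or reload once the cached copy is older than the refresh interval
        if entry is None or now - entry[1] >= self._refresh_seconds:
            entry = (self._load_model(name), now)
            self._cache[name] = entry
        return entry[0]


downloads = []


def fake_download(name):
    downloads.append(name)
    return f"model:{name}"


fake_now = [0.0]
cache = ModelCache(fake_download, refresh_seconds=10.0, clock=lambda: fake_now[0])
cache.get("DFP-azure-{user_id}", "alice")   # downloads
cache.get("DFP-azure-{user_id}", "alice")   # served from cache
fake_now[0] = 11.0
cache.get("DFP-azure-{user_id}", "alice")   # refreshed
print(downloads)  # ['DFP-azure-alice', 'DFP-azure-alice']
```

Injecting the clock keeps the sketch deterministic; in practice the default `time.monotonic` would be used.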

### Filter Detection Stage (`FilterDetectionsStage`)
This stage filters the output from the inference stage for any anomalous messages. Logs which exceed the specified Z-score threshold will be passed on to the next stage. All remaining logs will be dropped. For the purposes of the DFP pipeline, this stage is configured to use the `mean_abs_z` column of the DataFrame as the filter criteria.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `threshold` | `float` | `0.5` | The threshold value above which logs are considered to be anomalous. The default is `0.5`; the standard DFP pipeline uses a value of `2.0`, and this notebook uses a value of `6`. All normal logs will be filtered out and anomalous logs will be passed on. |
| `copy` | `bool` | `True` | When the `copy` argument is `True` (default), rows that meet the filter criteria are copied into a new dataframe. When `False` sliced views are used instead. This is a performance optimization, and has no functional impact. |
| `filter_source` | `FilterSource` | `FilterSource.Auto` | Indicates if the filter criteria exists in an output tensor (`FilterSource.TENSOR`) or a column in a DataFrame (`FilterSource.DATAFRAME`). |
| `field_name` | `str` | `probs` | Name of the tensor (`filter_source=FilterSource.TENSOR`) or DataFrame column (`filter_source=FilterSource.DATAFRAME`) to use as the filter criteria. |
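
Conceptually, the filter keeps rows whose score exceeds the threshold. The sketch below uses lists of dictionaries instead of DataFrames, the scores are made up, and `filter_detections` is a hypothetical helper; it also shows how raising the threshold from `2.0` to `6` narrows the set of events passed downstream.

```python
def filter_detections(rows, threshold=2.0, field_name="mean_abs_z"):
    """Keep only rows whose score in field_name exceeds threshold."""
    return [row for row in rows if row[field_name] > threshold]


scored = [
    {"username": "alice", "mean_abs_z": 0.8},
    {"username": "bob", "mean_abs_z": 6.4},
    {"username": "carol", "mean_abs_z": 2.5},
]
print([r["username"] for r in filter_detections(scored, threshold=2.0)])  # ['bob', 'carol']
print([r["username"] for r in filter_detections(scored, threshold=6)])    # ['bob']
```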

### Post Processing Stage (`DFPPostprocessingStage`)
This stage adds a new `event_time` column to the DataFrame indicating the time at which Morpheus detected the anomalous messages, and replaces any `NaN` values with the string value `'NaN'`.
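
A minimal sketch of these two steps on a list of dictionaries follows. `postprocess` is a hypothetical helper, and formatting `event_time` as an ISO 8601 UTC timestamp shared by the whole batch is an assumption made for illustration.

```python
import math
from datetime import datetime, timezone


def postprocess(rows):
    """Add an event_time column and replace float NaN values with the string 'NaN'."""
    # Assumption: event_time is an ISO 8601 UTC timestamp shared by the whole batch
    event_time = datetime.now(timezone.utc).isoformat()
    processed = []
    for row in rows:
        cleaned = {
            key: "NaN" if isinstance(value, float) and math.isnan(value) else value
            for key, value in row.items()
        }
        cleaned["event_time"] = event_time
        processed.append(cleaned)
    return processed


print(postprocess([{"username": "alice", "statusfailureReason": float("nan")}]))
```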

### Serialize Stage (`SerializeStage`)
This stage controls which columns in the DataFrame will be included in the output. For the purposes of the DFP pipeline, we will exclude columns that are used internally by the pipeline which are not of interest to the end-user.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `include` | List of `str` | `[]` | List of regular expression patterns matching columns to include in the output. Specifying an empty list causes all columns not explicitly excluded to be included. |
| `exclude` | List of `str` | `[r'^ID$', r'^_ts_']` | List of regular expression patterns matching columns to exclude from the output. |
| `fixed_columns` | `bool` | `True` | When `True`, it is assumed that the DataFrame in every message contains the same columns as the first message received. |
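
The include/exclude rules can be approximated with Python's `re` module. This sketch assumes `re.search` semantics; the stage's actual matching function may differ, and `select_columns` is a hypothetical helper.

```python
import re


def select_columns(columns, include=(), exclude=(r"^ID$", r"^_ts_")):
    """Hypothetical sketch of regex-based column selection."""
    include_patterns = [re.compile(p) for p in include]
    exclude_patterns = [re.compile(p) for p in exclude]
    selected = []
    for col in columns:
        # An empty include list means every column is a candidate
        if include_patterns and not any(p.search(col) for p in include_patterns):
            continue
        if any(p.search(col) for p in exclude_patterns):
            continue
        selected.append(col)
    return selected


cols = ["ID", "_ts_start", "username", "mean_abs_z", "_batch_id"]
print(select_columns(cols))  # ['username', 'mean_abs_z', '_batch_id']
print(select_columns(cols, exclude=[r"^ID$", r"^_ts_", r"^_batch_id$"]))  # ['username', 'mean_abs_z']
```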

***You'll notice here that we don't have a write to file stage for the DFP pipeline. That is because, in the next section, we will add stages for LLM processing.***

In [12]:
pipeline = LinearPipeline(config)

# Source stage
pipeline.set_source(MultiFileSource(config, filenames=input_files))

# Batch files into batches by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
    DFPFileBatcherStage(config,
                        period="D",
                        date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))

# Output is a list of fsspec files. Convert to DataFrames. This caches downloaded data
pipeline.add_stage(
    DFPFileToDataFrameStage(config,
                            schema=source_schema,
                            file_type=FileTypes.JSON,
                            parser_kwargs={
                                "lines": False, "orient": "records"
                            },
                            cache_dir=cache_dir))


# This will split users or just use one single user
pipeline.add_stage(
    DFPSplitUsersStage(config,
                        include_generic=include_generic,
                        include_individual=include_individual,
                        skip_users=skip_users))

# Next, have a stage that will create rolling windows
pipeline.add_stage(
    DFPRollingWindowStage(
        config,
        min_history=300 if is_training else 1,
        min_increment=300 if is_training else 0,
        # For inference, we only ever want 1 day max
        max_history="60d" if is_training else "1d",
        cache_dir=cache_dir))

# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

# Perform inference on the preprocessed data
pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter))

pipeline.add_stage(MonitorStage(config, description="DFP Inference rate", smoothing=0.001))

# Filter for only the anomalous logs
pipeline.add_stage(
    FilterDetectionsStage(config, threshold=6, filter_source=FilterSource.DATAFRAME, field_name='mean_abs_z'))
pipeline.add_stage(DFPPostprocessingStage(config))

# Exclude the columns we don't want in our output
pipeline.add_stage(SerializeStage(config, exclude=['batch_count', 'origin_hash', '_row_hash', '_batch_id']))

# Monitor throughput at the tail-end of the DFP specific portion of the pipeline
pipeline.add_stage(MonitorStage(config, description="DFP Serialization rate", smoothing=0.001))


## LLM-based Triage

Now that we have the pipeline set up to pass through the Digital Fingerprinting piece of the code, the output of our last `MonitorStage` can be sent into a series of stages that:

1. **Transform the data into a column that can be used in an LLM prompt template.**
2. **Apply a prompt template and make calls to the LLM using the Morpheus `LLMEngine`.**
3. **Collect LLM output and add that to a column of our final output DataFrame.**
4. **Write the output to a file.**

### Exploring a Prompt Template

We will use the following prompt template for the Azure Active Directory (Azure AD) log use case for DFP. The template will need to change for other log sources so that the LLM is given the right context.

```
        You are an L1 SOC analyst. Your task is to triage and explain an alert received from an ML workflow that uses an autoencoder to perform anomaly detection, per user, on AZURE AD log telemetry.

        The log contains the following kinds of fields: 

        timestamp: timestamp of the event
        username: username of the user
        appDisplayName: the name of the running app
        category: the type of authentication event
        clientAppUsed: the type of browser/app doing the authentication
        deviceDetailBrowser: specifics about the browser such as versions, etc. 
        deviceDetailDisplayName: displayName of the user if available
        location: location where the activity originated
        statusfailureReason: if the auth event failed, why. 
        event_time: time the event was logged
        logcount: number of authentication logs for this user in this time period
        locincrement: number of distinct locations for that user
        appincrement: number of distinct apps used to authenticate
        <field>_pred: autoencoder output of the feature
        <field>_z_loss: z-score of the standard scaled loss between feature and prediction
        mean_abs_z: mean absolute z-score of reconstruction error across all logs for that user
        max_abs_z: max absolute z-score of reconstruction error across all logs for that user

        Given the feature explanations above, I would like you to use that, and any internal knowledge of cyber you have to triage the following event.

        The event is: 
        ### EVENT ######
        {{event}}
        #### END EVENT ####

        In your output, please keep it concise. Explain which fields were most anomalous, and any cyber-specific context that may be helpful around that that'll speed up the triage that a human will do after you. Split your response into the following sections using it as a template:
        
        ##Start Report##
        **Triage Overview**
        <provide estimated severity 1/10 scale>
        <provide an overview of the event and likelihood of malicious activity based on your cyber knowledge> 
        **Most Anomalous Fields**
        <list the most anomalous fields and interpret their z-scores>
        **Cyber Triage**
        <cyber-specific content and triage in logical bullet points>
        **Recommendations**
        <recommendations on next steps for human triage and investigation> <DO NOT provide recommendations on changes to posture, policy etc. Just next steps in investigation>
        ##End Report##
        
        Please only return ascii characters, no special characters like sigma or bullet points. Only A-Z,a-z, 0-9, *, and \n
```

Here, the `{{event}}` portion of the template is where a dictionary-like version of the model output per event will be inserted to make the triage request. 
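
The double-brace `{{event}}` placeholder uses Jinja-style syntax. As a minimal sketch, with plain string replacement standing in for whatever templating the engine actually applies, filling it looks like this; `render_prompt` is a hypothetical helper.

```python
def render_prompt(template: str, event: str) -> str:
    """Substitute the event string into the double-brace {{event}} placeholder."""
    return template.replace("{{event}}", event)


template = "### EVENT ######\n{{event}}\n#### END EVENT ####"
print(render_prompt(template, "{'username': 'alice@example.com', 'mean_abs_z': 6.4}"))

# str.format treats doubled braces as escaped literals, so it would NOT fill the placeholder
print("{{event}}".format(event="ignored"))  # {event}
```

The last line is a reminder that this template cannot be filled with Python's `str.format`.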

### Pipeline Stages

Now, let us explore the various new Morpheus stages we will chain together to form the latter half of the larger pipeline.

#### String Create Stage (`DFPStringCreateStage`)
This is a custom stage that aggregates the values of every column in the `MessageMeta` DataFrame into a single column called `event`, which contains a string representation of the row as a dictionary of `key:value` pairs.

The `DFPStringCreateStage` will also remove any non-ascii characters in the content of the strings to prevent errors in Morpheus pipeline logging.

It passes messages through unchanged apart from adding the `event` column, and does not require any configurable arguments.
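
A rough approximation of what this stage produces for one row is shown below. The exact string format emitted by `DFPStringCreateStage` is not documented here, so using Python's `str()` of a dict and dropping non-ASCII characters via `encode(..., errors="ignore")` are assumptions made for illustration; `build_event_string` is hypothetical.

```python
def build_event_string(row: dict) -> str:
    """Render a row as a dict-style string and drop any non-ASCII characters."""
    text = str(dict(row))
    return text.encode("ascii", errors="ignore").decode("ascii")


row = {"username": "alice@example.com", "location": "São Paulo, BR", "mean_abs_z": 6.4}
print(build_event_string(row))
# {'username': 'alice@example.com', 'location': 'So Paulo, BR', 'mean_abs_z': 6.4}
```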

#### Deserialize Stage (`DeserializeStage`)
This stage converts the `MessageMeta` objects emitted by the preceding DFP stages into `ControlMessage` objects with the structure expected by the `LLMEngine`.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `ensure_sliceable_index` | `bool` | `True` | Whether or not to call `ensure_sliceable_index()` on all incoming `MessageMeta`, which will replace the index of the underlying DataFrame if the existing one is not unique and monotonic. |
| `message_type` | `MultiMessage` or `ControlMessage` | `MultiMessage` | Sets the type of message to be emitted from this stage. |
| `task_type` | `str` | `None` | If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` is set to `ControlMessage`. If not `None`, `task_payload` must also be specified. |
| `task_payload` | `dict` | `None` | If specified, adds the specified task to the `ControlMessage`. This parameter is only valid when `message_type` is set to `ControlMessage`. If not `None`, `task_type` must also be specified. |

In our case, `task_type` is set to `"llm_engine"` and `task_payload` is set to `{"task_type": "completion", "task_dict": {"input_keys": ["event"]}}`, which requests a completion task that applies the prompt template to the `event` column of our DataFrame.
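
The two levels of `task_type` are easy to confuse: the outer one is the `DeserializeStage` argument (`"llm_engine"`), while the inner one lives inside the payload (`"completion"`). The hypothetical `build_task` helper below, which is not a Morpheus API, shows that nesting and enforces the pairing rule from the table; the dictionary shape it returns is an illustration only.

```python
def build_task(task_type=None, task_payload=None):
    """Hypothetical helper that enforces the task_type/task_payload pairing rule."""
    if (task_type is None) != (task_payload is None):
        raise ValueError("task_type and task_payload must be specified together")
    if task_type is None:
        return None
    return {"type": task_type, "payload": task_payload}


first_completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["event"]}}
task = build_task("llm_engine", first_completion_task)
print(task["type"], task["payload"]["task_dict"]["input_keys"])  # llm_engine ['event']
```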

#### Write to File Stage (`WriteToFileStage`)

This final stage will write all received messages to a single output file in either CSV or JSON format.

| Name | Type | Default | Description |
| --- | --- | --- | :-- |
| `filename` | `str` | | The file to write anomalous log messages to. |
| `overwrite` | `bool` | `False` | If the file specified in `filename` already exists, it will be overwritten if this option is set to `True` |

***All the other stages we will see in the pipeline have already been introduced. They just might be initialized with different parameters.***

### Building an `LLMEngine` Stage

An `LLMEngine` stage is a stage that can consist of multiple `nodes` that perform various LLM-based tasks. In our case, we will build an `LLMEngine` with the following components:

1. `ExtracterNode`, which extracts the relevant `event` column values from the input DataFrame.
2. `PromptTemplateNode`, which applies the provided prompt template to the extracted events.
3. `LLMGenerateNode`, which makes API calls to the LLM service for completion.
4. `SimpleTaskHandler`, which copies fields from an `LLMContext` to columns in the DataFrame contained in the `ControlMessage` payload.

Behind the scenes, the `LLMEngine` handles concurrent, asynchronous API calls and data-processing to boost performance. 
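
The four components above can be pictured as a small asynchronous chain. The sketch below is a plain-`asyncio` analogy, not the `LLMEngine` API: `echo_llm` is a fake stand-in for an LLM service, and `run_engine` and its column names are hypothetical. It extracts the `event` values, applies a template, issues all generate calls concurrently with `asyncio.gather`, and copies the responses into an output column.

```python
import asyncio


async def echo_llm(prompt: str) -> str:
    """Fake LLM service: yields control once, then returns the prompt upper-cased."""
    await asyncio.sleep(0)
    return prompt.upper()


async def run_engine(rows, template, generate, input_key="event", output_column="incident_summary"):
    """Hypothetical analogy of the extract, template, generate, and handle chain."""
    # 1. Extract: pull the input column out of each row
    events = [row[input_key] for row in rows]
    # 2. Prompt template: substitute each event into the {{event}} placeholder
    prompts = [template.replace("{{event}}", event) for event in events]
    # 3. Generate: issue every LLM call concurrently; gather preserves input order
    responses = await asyncio.gather(*(generate(prompt) for prompt in prompts))
    # 4. Task handler: copy each response into an output column on its row
    return [{**row, output_column: response} for row, response in zip(rows, responses)]


rows = [{"event": "login from new country"}, {"event": "many failed logins"}]
result = asyncio.run(run_engine(rows, "Triage: {{event}}", echo_llm))
print([r["incident_summary"] for r in result])
# ['TRIAGE: LOGIN FROM NEW COUNTRY', 'TRIAGE: MANY FAILED LOGINS']
```

Inside a Jupyter cell, where an event loop is already running, use `result = await run_engine(...)` instead of `asyncio.run(...)`.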


### Adding Event Summarization

Now that we have the pieces in place to build our first `LLMEngineStage`, let's add it to the pipeline to create a stage that takes multiple anomalous detections for a user and summarizes them into an incident based on the most anomalous events.

In [13]:
# Load prompt templates from file (the context manager closes the file afterwards)
with open("dfp/llm/prompt_templates.json", "r") as f:
    templates = json.load(f)

first_completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["event"]}}

pipeline.add_stage(DFPStringCreateStage(config))

pipeline.add_stage(
    DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=first_completion_task))

pipeline.add_stage(MonitorStage(config, description="LLM Summary Deserialize rate", unit="req", delayed_start=True))

pipeline.add_stage(LLMEngineStage(config, engine=build_engine_llm_service(prompt_template = templates["incident_summary"], 
                                                                           llm_service="NIM", output_column="incident_summary")))

pipeline.add_stage(MonitorStage(config, description="LLM Summary Inference rate", unit="req", delayed_start=True))

pipeline.add_stage(SerializeStage(config, exclude=['event']))

<serialize-17; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>

## Retrieval Augmented Generation (RAG) for Threat Intelligence Correlation and Recommendations

In this section, we'll use RAG to look up threat intelligence reports, stored in a NeMo Retriever collection, that are relevant to the anomalous event, and then enrich the event with the resulting threat intelligence findings and recommendations.


### Using a NIM to Generate a Search Optimized Query

The first part of this process creates an `LLMEngineStage` that generates a search-optimized query for the Retriever, capturing the relevant aspects of the anomalous event. We'll use the following prompt template:

```
        You are an L1 SOC analyst. You will be given an incident summary generated from an ML workflow that uses an autoencoder to perform anomaly detection, per user, on AZURE AD log telemetry.

        The summary was generated based on the following kinds of fields: 

        timestamp: timestamp of the event
        username: username of the user
        appDisplayName: the name of the running app
        category: the type of authentication event
        clientAppUsed: the type of browser/app doing the authentication
        deviceDetailBrowser: specifics about the browser such as versions, etc. 
        deviceDetailDisplayName: displayName of the user if available
        location: location where the activity originated
        statusfailureReason: if the auth event failed, why. 
        event_time: time the event was logged
        logcount: number of authentication logs for this user in this time period
        locincrement: number of distinct locations for that user
        appincrement: number of distinct apps used to authenticate
        <field>_pred: autoencoder output of the feature
        <field>_z_loss: z-score of the standard scaled loss between feature and prediction
        mean_abs_z: mean absolute z-score of reconstruction error across all logs for that user
        max_abs_z: max absolute z-score of reconstruction error across all logs for that user


        The event is: 
        ### EVENT ######
        {{incident_summary}}
        #### END EVENT ####

        Your task is to use the incident summary to create an optimized search query which can be used to search collections of Threat Intelligence documents for similar events either by threat actor, threat vector, or other similar characteristics. 
        
        Please only return ascii characters, no special characters like sigma or bullet points. Only A-Z,a-z, 0-9 and '?'. Please only generate the search query, no text before or after it. 
        
        An example of your response could be: "Similar events and recommendatios for <description of anomaly> which could be indicative of <cyber triage summary>
        
        Format your response as:
        
        ##Begin Response##
        <prompt>
        ##End Response##
```
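Because the template asks the model to wrap its answer in `##Begin Response##` and `##End Response##` markers and to stick to a restricted character set, a post-processing step could enforce both before the query reaches the retriever. The hypothetical `extract_rag_query` below is one way to do it. The pipeline output later in this notebook still shows the query wrapped in quotes, so whether the actual stages clean the response like this is not shown here.

```python
import re

# Non-greedy match so only the text between the first pair of markers is captured.
_RESPONSE = re.compile(r"##Begin Response##(.*?)##End Response##", re.DOTALL)


def extract_rag_query(llm_output: str) -> str:
    """Hypothetical post-processing for the rag_query prompt's response format."""
    match = _RESPONSE.search(llm_output)
    # Fall back to the whole output if the model ignored the requested markers.
    text = match.group(1) if match else llm_output
    # Keep only letters, digits, '?' and spaces, as the prompt requests.
    text = re.sub(r"[^A-Za-z0-9? ]+", " ", text)
    # Collapse runs of whitespace left behind by the substitution.
    return " ".join(text.split())


raw = '##Begin Response##\n"Brute force attack from multiple locations"\n##End Response##'
print(extract_rag_query(raw))
```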


In [14]:
#configure_logging(log_level=logging.INFO)

second_completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["incident_summary"]}}

pipeline.add_stage(
    DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=second_completion_task))

pipeline.add_stage(MonitorStage(config, description="LLM RAG Prompt Deserialize rate", unit="req", delayed_start=True))

#build optimized query
pipeline.add_stage(LLMEngineStage(config, engine=build_engine_llm_service(prompt_template=templates["rag_query"],
    llm_service="NIM", output_column="rag_query")))

pipeline.add_stage(MonitorStage(config, description="LLM RAG Prompt Inference rate", unit="req", delayed_start=True))

pipeline.add_stage(SerializeStage(config, exclude=['event']))

<serialize-22; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>

### Collect RAG Context from the NeMo Retriever

We'll build a separate, decoupled `LLMEngineStage` that takes the RAG query generated in the previous stage and sends requests to NeMo Retriever to collect context text for every query.

While building the `LLMEngine` for this stage, we see a new type of node: `RetrieverContextNode`. It is used inside an `LLMEngine` to asynchronously query an NVIDIA NeMo Retriever deployment and collect the text chunks relevant to a given query. It accepts a `RetrieverClient` object initialized with the relevant collection information.
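Conceptually, a retrieval step ranks stored chunks by similarity to the query and returns the best matches. The toy sketch below shows that ranking with a bag-of-words vector and cosine similarity. A real NeMo Retriever deployment uses learned embeddings and a vector index, so `embed`, `cosine`, and `retrieve` here are hypothetical illustrations only, and the sample chunks are made up.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding'; a real retriever uses a learned embedding model."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, chunks: list[str], top_k: int = 2) -> list[str]:
    """Return the top_k chunks most similar to the query (stable order on ties)."""
    q = embed(query)
    ranked = sorted(chunks, key=lambda c: cosine(q, embed(c)), reverse=True)
    return ranked[:top_k]


chunks = [
    "Brute force password spraying against cloud apps",
    "Quarterly revenue report",
    "Impossible travel sign-ins from multiple countries",
]
print(retrieve("brute force attack on apps", chunks, top_k=1))
```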


In [15]:
third_completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["rag_query"]}}

pipeline.add_stage(
    DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=third_completion_task))

pipeline.add_stage(MonitorStage(config, description="NeMo RAG Context Deserialize rate", unit="req", delayed_start=True))

#add threat intelligence
retriever_client_cyber_enrichment = nrc.RetrieverClient()


base_directory = "upload_intel/intel/cyber_enrichment/"
input_files = [f"{i}.txt" for i in range(1, 14)]
output_file = base_directory+"merged.txt"

with open(output_file, 'w') as outfile:
    # Iterate through the list of input files
    for file_name in input_files:
        with open(base_directory+file_name, 'r') as infile:
            content = infile.read()
            outfile.write(content)
            outfile.write('\n')

print(f"Files have been merged into {output_file}")

#load in text file
from langchain_community.document_loaders import TextLoader
loader = TextLoader(output_file)

document = loader.load()
print("text file loaded")


document_chunks = retriever_client_cyber_enrichment.add_files(document)

Files have been merged into upload_intel/intel/cyber_enrichment/merged.txt
text file loaded
Number of chunks from the document: 370
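The cell reports 370 chunks for the merged file, but how `add_files` splits the document is not shown in this notebook. A common approach, assumed here purely for illustration, is fixed-size windows that overlap so that text near a boundary appears in both neighbouring chunks. The `chunk_size` and `overlap` defaults are hypothetical, not the retriever's settings.

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that share `overlap` characters."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - overlap
    # Stop before any start position whose window would hold only already-covered text.
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]


print(chunk_text("0123456789", chunk_size=4, overlap=1))
```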


In [16]:
#use optimized query to extract context
pipeline.add_stage(LLMEngineStage(config, engine=build_engine_rag_context(retriever_client_cyber_enrichment)))

<llm-engine-25; LLMEngineStage(engine=<morpheus._lib.llm.LLMEngine object at 0x7f1671f878f0>)>

In [17]:
pipeline.add_stage(MonitorStage(config, description="NeMo RAG Context Inference rate", unit="req", delayed_start=True))

pipeline.add_stage(SerializeStage(config, exclude=['event']))

<serialize-27; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>

### Using RAG Context Alongside The Triaged Event to add a Threat Intelligence Enrichment Section

Now that we've generated an incident summary and collected the threat intelligence reports that most closely match the incident, we can synthesize this information into a single event summary report.
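`DFPRAGConcatStage` runs just before the fourth completion task, whose `input_keys` is `["event"]`, which suggests it assembles the incident summary and the retrieved context into a single input for the enrichment prompt. Its implementation is not shown here; the sketch below is a hypothetical version of that merge, and the `rag_context` column name and section labels are assumptions.

```python
def concat_for_enrichment(row: dict, context_key: str = "rag_context") -> str:
    """Hypothetical merge of an incident summary with its retrieved threat-intel chunks."""
    chunks = row.get(context_key) or []
    # Separate chunks visibly so the LLM can tell where one report excerpt ends.
    context_text = "\n---\n".join(chunks) if chunks else "(none retrieved)"
    return (
        "INCIDENT SUMMARY:\n" + row["incident_summary"]
        + "\n\nTHREAT INTELLIGENCE CONTEXT:\n" + context_text
    )


row = {
    "incident_summary": "Possible brute force against attacktarget@domain.com",
    "rag_context": ["Report A excerpt", "Report B excerpt"],
}
row["event"] = concat_for_enrichment(row)  # the column the fourth task reads
print(row["event"])
```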

In [18]:
configure_logging(log_level=logging.INFO)

fourth_completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["event"]}}

pipeline.add_stage(DFPRAGConcatStage(config))

pipeline.add_stage(
    DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=fourth_completion_task))

pipeline.add_stage(MonitorStage(config, description="LLM RAG Enrich Deserialize rate", unit="req", delayed_start=True))

pipeline.add_stage(LLMEngineStage(config, engine=build_engine_llm_service(prompt_template = templates["enrichment"],
    llm_service="NIM")))

pipeline.add_stage(MonitorStage(config, description="LLM RAG Enrich Inference rate", unit="req", delayed_start=True))

pipeline.add_stage(SerializeStage(config, exclude=['event']))

pipeline.add_stage(DFPRAGUploadStage(config)) #upload a file into /intel/user_summaries

pipeline.add_stage(WriteToFileStage(config, filename="dfp_detections_triaged.csv", overwrite=True))

# Run the pipeline
await pipeline.run_async()

====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Segment: linear_segment_0====

Added source: <from-multi-file-0; MultiFileSource(filenames=['../../../../data/dfp/azure-inference-data/AZUREAD_*.json'], watch=False, watch_interval=1.0)>
  └─> fsspec.OpenFiles

Added stage: <dfp-file-batcher-1; DFPFileBatcherStage(date_conversion_func=functools.partial(<function date_extractor at 0x7f157b3a8430>, filename_regex=re.compile('(?P<year>\\d{4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2})(?:T(?P<hour>\\d{1,2})(?::|_|\\.)(?P<minute>\\d{1,2})(?::|_|\\.)(?P<second>\\d{1,2})(?:\\.(?P<microsecond>\\d{0,6}))?)?(?P<zulu>Z)?')), period=D, sampling_rate_s=None, start_time=None, end_time=None, sampling=None)>
  └─ fsspec.OpenFiles -> Tuple[fsspec.core.OpenFiles, int]

Added stage: <dfp-file-to-df-2; DFPFileToDataFrameStage(schema=DataFrameInputSchema(json_columns=['properties'], column_info=[DateTimeColumn(name='timestamp', dtype='datetime64[ns]', input_name='time'), RenameColumn(name='username', dtype='str', input_name='properties.userPrincipalName'), RenameColumn(name='appDisplayName', dtype='str', input_name='properties.appDisplayName'), ColumnInfo(name='category', dtype='str'), RenameColumn(name='clientAppUsed', dtype='str', input_name='properties.clientAppUsed'), RenameColumn(name='deviceDetailbrowser', dtype='str', input_name='properties.deviceDetail.browser'), RenameColumn(name='deviceDetaildisplayName', dtype='str', input_name='properties.deviceDetail.displayName'), RenameColumn(name='deviceDetailoperatingSystem', dtype='str', input_name='properties.deviceDetail.operatingSystem'), StringCatColumn(name='location', dtype='str', input_columns=['properties.location.city', 'properties.location.countryOrRegion'], sep=', '), RenameColumn(name='stat

Added stage: <dfp-split-users-3; DFPSplitUsersStage(include_generic=False, include_individual=True, skip_users=[], only_users=None)>
  └─ pandas.DataFrame -> dfp.DFPMessageMeta

Added stage: <dfp-rolling-window-4; DFPRollingWindowStage(min_history=1, min_increment=0, max_history=1d, cache_dir=./.cache/dfp)>
  └─ dfp.DFPMessageMeta -> dfp.MultiDFPMessage

Added stage: <dfp-preproc-5; DFPPreprocessingStage(input_schema=DataFrameInputSchema(json_columns=[], column_info=[ColumnInfo(name='timestamp', dtype='datetime64[ns]'), ColumnInfo(name='username', dtype='str'), ColumnInfo(name='appDisplayName', dtype='str'), ColumnInfo(name='clientAppUsed', dtype='str'), ColumnInfo(name='deviceDetailbrowser', dtype='str'), ColumnInfo(name='deviceDetaildisplayName', dtype='str'), ColumnInfo(name='deviceDetailoperatingSystem', dtype='str'), ColumnInfo(name='statusfailureReason', dtype='str'), IncrementColumn(name='logcount', dtype='int', input_name='timestamp', groupby_column='username', period='D'), DistinctIncrementColumn(name='locincrement', dtype='int', input_name='location', groupby_column='username', period='D', timestamp_column='timestamp'), DistinctIncrementColumn(name='appincrement', dtype='int', input_name='appDisplayName', groupby_column='username', period='D', timestamp_column='timestamp')], preserve_columns=re.compile('(_batch_id)'), row_fil

Added stage: <dfp-inference-6; DFPInferenceStage(model_name_formatter=DFP-azure-{user_id})>
  └─ dfp.MultiDFPMessage -> morpheus.MultiAEMessage

Added stage: <monitor-7; MonitorStage(description=DFP Inference rate, smoothing=0.001, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage

Added stage: <filter-8; FilterDetectionsStage(threshold=6, copy=True, filter_source=FilterSource.DATAFRAME, field_name=mean_abs_z)>
  └─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage

Added stage: <dfp-postproc-9; DFPPostprocessingStage()>
  └─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage

Added stage: <serialize-10; SerializeStage(include=None, exclude=['batch_count', 'origin_hash', '_row_hash', '_batch_id'], fixed_columns=True)>
  └─ morpheus.MultiAEMessage -> morpheus.MessageMeta

Added stage: <monitor-11; MonitorStage(description=DFP Serialization rate, smoothing=0.001, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta

Added stage: <dfp-string-create-12; DFPStringCreateStage(top_k=5, grouper=username, sort_key=max_abs_z)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta

Added stage: <deserialize-13; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus._lib.messages.ControlMessage'>, task_type=llm_engine, task_payload={'task_type': 'completion', 'task_dict': {'input_keys': ['event']}})>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage

Added stage: <monitor-14; MonitorStage(description=LLM Summary Deserialize rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <llm-engine-15; LLMEngineStage(engine=<morpheus._lib.llm.LLMEngine object at 0x7f16bc19d6f0>)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <monitor-16; MonitorStage(description=LLM Summary Inference rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <serialize-17; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>
  └─ morpheus.ControlMessage -> morpheus.MessageMeta

====Pipeline Started====

Added stage: <deserialize-18; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus._lib.messages.ControlMessage'>, task_type=llm_engine, task_payload={'task_type': 'completion', 'task_dict': {'input_keys': ['incident_summary']}})>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage

Added stage: <monitor-19; MonitorStage(description=LLM RAG Prompt Deserialize rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <llm-engine-20; LLMEngineStage(engine=<morpheus._lib.llm.LLMEngine object at 0x7f157b3f34b0>)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <monitor-21; MonitorStage(description=LLM RAG Prompt Inference rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <serialize-22; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>
  └─ morpheus.ControlMessage -> morpheus.MessageMeta

Added stage: <deserialize-23; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus._lib.messages.ControlMessage'>, task_type=llm_engine, task_payload={'task_type': 'completion', 'task_dict': {'input_keys': ['rag_query']}})>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage

Added stage: <monitor-24; MonitorStage(description=NeMo RAG Context Deserialize rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <llm-engine-25; LLMEngineStage(engine=<morpheus._lib.llm.LLMEngine object at 0x7f1671f878f0>)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <monitor-26; MonitorStage(description=NeMo RAG Context Inference rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <serialize-27; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>
  └─ morpheus.ControlMessage -> morpheus.MessageMeta

Added stage: <dfp-rag-concat-28; DFPRAGConcatStage()>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta

Added stage: <deserialize-29; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus._lib.messages.ControlMessage'>, task_type=llm_engine, task_payload={'task_type': 'completion', 'task_dict': {'input_keys': ['event']}})>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage

Added stage: <monitor-30; MonitorStage(description=LLM RAG Enrich Deserialize rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <llm-engine-31; LLMEngineStage(engine=<morpheus._lib.llm.LLMEngine object at 0x7f157c1365b0>)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <monitor-32; MonitorStage(description=LLM RAG Enrich Inference rate, smoothing=0.05, unit=req, delayed_start=True, determine_count_fn=None, log_level=LogLevels.INFO)>
  └─ morpheus.ControlMessage -> morpheus.ControlMessage

Added stage: <serialize-33; SerializeStage(include=None, exclude=['event'], fixed_columns=True)>
  └─ morpheus.ControlMessage -> morpheus.MessageMeta

Added stage: <dfp-rag-upload-34; DFPRAGUploadStage()>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta

Added stage: <to-file-35; WriteToFileStage(filename=dfp_detections_triaged.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta

====Building Segment Complete!====

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
W20240827 03:42:47.408099   881 meta.cpp:259] Dataframe is not a cudf dataframe, converting to cudf dataframe

Optimized RAG Query:
"Brute force attack or masquerading attempts from multiple locations and apps using anomalous client apps and mismatched display names"

DFP Inference rate[Complete]: 339 messages [00:26, 12.81 messages/s]
DFP Serialization rate[Complete]: 83 messages [00:19,  4.35 messages/s]
LLM Summary Deserialize rate[Complete]: 1 req [00:00, 117.43 req/s]
LLM Summary Inference rate[Complete]: 1 req [00:00, 155.63 req/s]
LLM RAG Prompt Deserialize rate[Complete]: 1 req [00:00, 257.40 req/s]
LLM RAG Prompt Inference rate[Complete]: 1 req [00:00, 167.44 req/s]

Documents have been written to /workspace/examples/digital_fingerprinting/production/morpheus/workspace/upload_intel/intel/user_summaries/3.txt
====Pipeline Complete====

## Final Generated Report
Below, we can see what a final user summary report looks like for the user attacktarget@domain.com.

In [19]:
import pandas as pd

results = pd.read_csv("dfp_detections_triaged.csv").to_dict(orient='records')

print(results[0]['response'])

##Start Report##

**Event Overview**
Username: attacktarget@domain.com
Time Range: 2022-08-31 23:20:54 - 2022-08-31 23:54:50
Apps: Box, Google Cloud / G Suite Connector by Microsoft, Spike Email - Mail & Team Chat, WeVideo
Devices: ATTACKTARGET-LT, Windows 10, Chrome 100.0.4896

**Triage Overview**
This event is likely indicative of malicious activity. The high number of login attempts from different locations and apps, as well as the discrepancy between predicted and actual values for various fields, suggests anomalous behavior.

**Most Anomalous Fields**
1. logcount: High z-scores (247.99 - 258.22) indicate a significant increase in login attempts, potentially indicating a brute-force attack.
2. appincrement: High z-scores (600.43 - 645.85) suggest an unusual number of apps being used to authenticate, possibly indicating a malicious attempt to access the account.
3. appDisplayName: Mismatch between predicted and actual values (InviteDesk vs. various other apps) indicates potential ma

## RAG Upload to User Summaries Vector Database

Finally, we will upload all of the per-user event summary reports to a new vector database.

In [20]:
retriever_client_user_summaries = nrc.RetrieverClient() #create new retriever client

base_directory_user_summaries = "upload_intel/intel/user_summaries/"
input_files = [f"{i}.txt" for i in range(1, 4)]
output_file = base_directory_user_summaries+"merged.txt"



with open(output_file, 'w') as outfile:
    # Iterate through the list of input files
    for file_name in input_files:
        with open(base_directory_user_summaries+file_name, 'r') as infile:
            content = infile.read()
            outfile.write(content)
            outfile.write('\n')

print(f"Files have been merged into {output_file}")

#load in text file
loader = TextLoader(output_file)

document = loader.load()
print("text file loaded")


document_chunks = retriever_client_user_summaries.add_files(document)

Files have been merged into upload_intel/intel/user_summaries/merged.txt
text file loaded
Number of chunks from the document: 32


## Testing Our RAG Vector Database
Below, we can search using example user queries and see what the vector database might return.

In [21]:
pd.set_option('display.max_colwidth', None)

df1 = pd.DataFrame(retriever_client_user_summaries.search("tell me about attacktarget@domain.com"))
print("Here's what we know about the user attacktarget:")
print(df1[0][0])

df2 = pd.DataFrame(retriever_client_user_summaries.search("tell me about june@domain.com"))
print("\n\n\n\n\n\nHere's what we know about the user June:")
print(df2[0][0])

df3 = pd.DataFrame(retriever_client_user_summaries.search("tell me about daniel@domain.com"))
print("\n\n\n\n\n\nHere's what we know about the user Daniel:")
print(df3[0][0])

Here's what we know about the user attacktarget:
These IOCs should be correlated with the event to determine if they are related to the activity observed in this incident.
##Start Report##

**Event Overview**
Username: attacktarget@domain.com
Time Range: 2022-08-31 23:20:54 - 2022-08-31 23:54:50
Apps: Box, Google Cloud / G Suite Connector by Microsoft, Spike Email - Mail & Team Chat, WeVideo
Devices: ATTACKTARGET-LT, Windows 10, Chrome 100.0.4896






Here's what we know about the user June:
The event appears to be a collection of authentication logs for a single user, june@domain.com, over a long period of time. The logs are from various applications and devices, but a majority of them are from Box and Google Cloud / G Suite Connector by Microsoft. The time range of the event is from August 31, 2022, to May 30, 2024, which is a significant amount of time






Here's what we know about the user Daniel:
The event appears to be a collection of authentication logs for a single user, dan