# Rule Engine DSL with Anomaly Detection 

Refer to [this](https://github.com/MokoSan/FSharpAdvent_2021/blob/main/src/Prototypes/RuleEngineDSL.ipynb) notebook for the preliminaries: the basic rule engine DSL that this notebook extends with anomaly detection.

In [None]:
#r "nuget:Microsoft.Diagnostics.Tracing.TraceEvent"
#r "nuget:XPlot.Plotly"
#r "nuget:XPlot.GoogleCharts"
#r "nuget:Microsoft.ML"
#r "nuget:Microsoft.ML.TimeSeries"

open System;
open System.Linq;
open Microsoft.Diagnostics.Tracing;
open Microsoft.Diagnostics.Tracing.Analysis;
open Microsoft.Diagnostics.Tracing.Etlx;
open Microsoft.Diagnostics.Tracing.Session;
open Microsoft.Diagnostics.Tracing.Parsers.Clr;
open Microsoft.Diagnostics.Tracing.Analysis.GC;
open Microsoft.Diagnostics.Symbols;
open Microsoft.ML;
open Microsoft.ML.Data;
open Microsoft.ML.Transforms.TimeSeries;
open System.Collections.Generic;

## Definitions Unchanged from the Previous Notebook

In [None]:
// Conditioner stuff: a Conditioner names the event and the payload property a condition inspects,
// written as "Event.Property" in a rule (e.g. "GCEnd.PauseTimeMSec").
// Conditioner Event is the name of the type of event (e.g. "GCEnd", "GC/AllocationTick").
type ConditionerEvent    = string
// Conditioner Property is the name of the property of the event (e.g. "SuspensionTimeMSec").
type ConditionerProperty = string

// The parsed "Event.Property" part of a condition.
type Conditioner = { ConditionerEvent : ConditionerEvent; ConditionerProperty : ConditionerProperty }

// Action stuff: an action is written as "<Operator> <Operand>" in a rule, e.g. "Print Alert".
// What to do when a rule fires; currently only printing is supported.
type ActionOperator = 
    |  Print

// What to print: an alert line with the payload value, or the event's call stack.
type ActionOperand =
    | Alert
    | CallStack

// The parsed action part of a rule.
type Action = { ActionOperator: ActionOperator; ActionOperand: ActionOperand }

/// Parses an action of the form "<ActionOperator> <ActionOperand>", e.g. "Print Alert" or
/// "Print CallStack". Tokens are whitespace-separated and matched case-insensitively.
/// Raises (via failwith) if the action does not consist of exactly two tokens or if either
/// token is unrecognized.
let parseAction (actionAsAString : string) : Action = 
    let splitAction : string[] = actionAsAString.Split(" ", StringSplitOptions.RemoveEmptyEntries)

    // Precondition check, mirroring parseCondition: a malformed action gets a readable error
    // instead of an IndexOutOfRangeException from splitAction.[1].
    if splitAction.Length <> 2
    then failwith($"Incorrect format of the action '{actionAsAString}'. Format is: ActionOperator ActionOperand. For example: Print Alert")

    // ActionOperator. ToLowerInvariant (not ToLower) so "PRINT" still matches under cultures such
    // as tr-TR, where 'I' lowercases to a dotless 'ı'.
    let parseActionOperator : ActionOperator = 
        match splitAction.[0].ToLowerInvariant() with
        | "print" -> ActionOperator.Print
        | _       -> failwith($"{splitAction.[0]} is an unrecognized Action Operator.")

    // ActionOperand 
    let parseActionOperand : ActionOperand = 
        match splitAction.[1].ToLowerInvariant() with
        | "alert"     -> ActionOperand.Alert
        | "callstack" -> ActionOperand.CallStack
        | _           -> failwith($"{splitAction.[1]} is an unrecognized Action Operand.")

    { ActionOperator = parseActionOperator; ActionOperand = parseActionOperand }

## Updated Condition + Parser

In [None]:
/// Comparison a condition performs. The first six compare the payload against a numeric
/// ConditionalValue; IsAnomaly is intended for use with an AnomalyDetectionType value
/// (e.g. "isAnomaly detectIIDSpike").
type ConditionType = 
    | LessThan
    | LessThanEqualTo
    | GreaterThan
    | GreaterThanEqualTo
    | Equal
    | NotEqual
    | IsAnomaly

/// Anomaly detection algorithms a rule can name ("detectIIDSpike" in rule text).
type AnomalyDetectionType =
    | DetectIIDSpike

/// Right-hand side of a condition: either a numeric threshold or an anomaly detection type.
/// Note the case AnomalyDetectionType shares its name with the type it wraps.
type ConditionalValue =
    | Value of double
    | AnomalyDetectionType of AnomalyDetectionType 

/// A parsed condition such as "GCEnd.PauseTimeMSec >= 300".
type Condition = 
    {  Conditioner      : Conditioner;
       ConditionType    : ConditionType;
       ConditionalValue : ConditionalValue }

/// Parses a condition of the form "Event.Property ConditionType ConditionalValue", e.g.
/// "GCEnd.SuspensionTimeMSec >= 298" or "GCEnd.PauseTimeMSec isAnomaly detectIIDSpike".
/// Tokens are whitespace-separated. ConditionType and ConditionalValue are case-insensitive,
/// and numbers are parsed with the invariant culture ('.' is the decimal separator).
/// Raises (via failwith) on malformed input, unknown tokens, or a condition type that does not
/// fit its value (isAnomaly needs an anomaly detection type; comparison operators need a number).
let parseCondition (conditionAsString : string) : Condition = 

    let splitCondition : string[] = conditionAsString.Split(" ", StringSplitOptions.RemoveEmptyEntries)
    
    // Precondition check
    if splitCondition.Length <> 3
    then failwith("Incorrect format of the condition. Format is: Event.Property Condition ConditionalValue. For example: GCEnd.SuspensionTimeMSec >= 298")
    
    // Condition Event and Property: exactly one '.' must separate them, otherwise indexing
    // splitConditioner.[1] would fail (or extra segments would be silently dropped).
    let parseConditioner : Conditioner = 
        let splitConditioner : string[] = splitCondition.[0].Split(".", StringSplitOptions.RemoveEmptyEntries)
        if splitConditioner.Length <> 2
        then failwith($"{splitCondition.[0]} is not of the form Event.Property. For example: GCEnd.PauseTimeMSec")

        { ConditionerEvent = splitConditioner.[0]; ConditionerProperty = splitConditioner.[1] }

    // Condition Type. ToLowerInvariant (not ToLower) so that e.g. "IsAnomaly" still matches
    // under cultures such as tr-TR, where 'I' lowercases to a dotless 'ı'.
    let parseConditionType : ConditionType =
        match splitCondition.[1].ToLowerInvariant() with
        | ">"  | "greaterthan"                                 -> ConditionType.GreaterThan 
        | "<"  | "lessthan"                                    -> ConditionType.LessThan
        | ">=" | "greaterthanequalto" | "greaterthanorequalto" -> ConditionType.GreaterThanEqualTo
        | "<=" | "lessthanequalto"    | "lessthanorequalto"    -> ConditionType.LessThanEqualTo
        | "="  | "equal"              | "equals"               -> ConditionType.Equal
        | "!=" | "notequal"                                    -> ConditionType.NotEqual
        | "isanomaly"                                          -> ConditionType.IsAnomaly
        | _                                                    -> failwith($"{splitCondition.[1]} is an unrecognized condition type.")

    // Condition Value: a number (same NumberStyles as Double.TryParse's default, but with the
    // invariant culture so "100.5" means the same thing on every machine), or else the name of
    // an anomaly detection type.
    let parseConditionValue : ConditionalValue =
        let conditionalValueAsString = splitCondition.[2].ToLowerInvariant()
        let numberStyles =
            System.Globalization.NumberStyles.Float ||| System.Globalization.NumberStyles.AllowThousands
        match Double.TryParse(conditionalValueAsString, numberStyles, System.Globalization.CultureInfo.InvariantCulture) with
        | true, v  -> ConditionalValue.Value(v)
        | false, _ ->
            match conditionalValueAsString with
            | "detectiidspike" -> ConditionalValue.AnomalyDetectionType(AnomalyDetectionType.DetectIIDSpike)
            | _                -> failwith($"{conditionalValueAsString} is an unrecognized anomaly detection type.")

    // Reject pairings that cannot be evaluated: isAnomaly with a number has no meaning, and a
    // comparison operator against an anomaly detection type would never fire.
    match parseConditionType, parseConditionValue with
    | ConditionType.IsAnomaly, ConditionalValue.Value _ ->
        failwith($"{splitCondition.[1]} expects an anomaly detection type such as detectIIDSpike, not the number {splitCondition.[2]}.")
    | ConditionType.IsAnomaly, ConditionalValue.AnomalyDetectionType _
    | _, ConditionalValue.Value _ -> ()
    | _, ConditionalValue.AnomalyDetectionType _ ->
        failwith($"{splitCondition.[1]} expects a numeric value, not the anomaly detection type {splitCondition.[2]}.")
        
    { Conditioner = parseConditioner; ConditionType = parseConditionType; ConditionalValue = parseConditionValue }

In [None]:
// A fully parsed rule "<Condition> : <Action>". OriginalRule keeps the source text (it is echoed
// in alert messages); Id is a fresh Guid assigned at parse time and is the key for per-rule
// anomaly history.
type Rule             = { Condition : Condition; Action : Action; OriginalRule : string; Id : Guid }
// Signatures for applying a rule to, or checking its condition against, a single trace event.
// NOTE(review): neither alias is referenced in the visible cells; confirm they are still needed.
type RuleApplier      = Rule * TraceEvent -> unit
type ConditionChecker = Rule * TraceEvent -> bool

### Testing

In [None]:
/// Parses a rule of the form "<Condition> : <Action>", e.g.
/// "GCEnd.SuspensionTimeMSec > 100 : Print CallStack", and assigns it a fresh Id.
/// Raises (via failwith) when the text does not contain exactly one ':' or when the condition
/// or action part is malformed (see parseCondition / parseAction).
let parseRule (ruleAsString : string) : Rule = 
    let splitRuleAsAString : string[] = ruleAsString.Split(":")

    // Exactly one ':' separates the condition from the action; report anything else clearly
    // instead of failing with IndexOutOfRangeException on splitRuleAsAString.[1].
    if splitRuleAsAString.Length <> 2
    then failwith($"Incorrect format of the rule '{ruleAsString}'. Format is: Condition : Action. For example: GCEnd.PauseTimeMSec >= 300 : Print Alert")

    let condition : Condition = parseCondition splitRuleAsAString.[0]
    let action : Action = parseAction splitRuleAsAString.[1]
    { Condition = condition; Action = action; OriginalRule = ruleAsString; Id = Guid.NewGuid() }

In [None]:
// Numeric threshold rule whose action prints the call stack of the matching event.
let testRule1 = "GCEnd.SuspensionTimeMSec > 100 : Print CallStack"
let parsedTestRule1 = testRule1 |> parseRule
display(parsedTestRule1)

Condition,Action,OriginalRule,Id
"{ { Conditioner = { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""SuspensionTimeMSec"" }  ConditionType = GreaterThan  ConditionalValue = Value 100.0 }: Conditioner: { { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""SuspensionTimeMSec"" }: ConditionerEvent: GCEnd, ConditionerProperty: SuspensionTimeMSec }, ConditionType: GreaterThan, ConditionalValue: { Value 100.0: Item: 100 } }","{ { ActionOperator = Print  ActionOperand = CallStack }: ActionOperator: Print, ActionOperand: CallStack }",GCEnd.SuspensionTimeMSec > 100 : Print CallStack,dacbb22b-feca-4b8e-83b0-00e55703bebc


In [None]:
// ">=" threshold rule whose action prints an alert.
let testRule2 = "GCEnd.PauseTimeMSec >= 300 : Print Alert"
let parsedTestRule2 = testRule2 |> parseRule
display(parsedTestRule2)

Condition,Action,OriginalRule,Id
"{ { Conditioner = { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""PauseTimeMSec"" }  ConditionType = GreaterThanEqualTo  ConditionalValue = Value 300.0 }: Conditioner: { { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""PauseTimeMSec"" }: ConditionerEvent: GCEnd, ConditionerProperty: PauseTimeMSec }, ConditionType: GreaterThanEqualTo, ConditionalValue: { Value 300.0: Item: 300 } }","{ { ActionOperator = Print  ActionOperand = Alert }: ActionOperator: Print, ActionOperand: Alert }",GCEnd.PauseTimeMSec >= 300 : Print Alert,d8d25a9d-f779-4444-bedd-b2887435edfb


In [None]:
// Anomaly detection rule; tokens are case-insensitive ("isAnomaly", "detectIIDSpike").
let testRule3 = "GCEnd.PauseTimeMSec isAnomaly detectIIDSpike : Print Alert"
let parsedTestRule3 = testRule3 |> parseRule
display(parsedTestRule3)

Condition,Action,OriginalRule,Id
"{ { Conditioner = { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""PauseTimeMSec"" }  ConditionType = IsAnomaly  ConditionalValue = AnomalyDetectionType DetectIIDSpike }: Conditioner: { { ConditionerEvent = ""GCEnd""  ConditionerProperty = ""PauseTimeMSec"" }: ConditionerEvent: GCEnd, ConditionerProperty: PauseTimeMSec }, ConditionType: IsAnomaly, ConditionalValue: { AnomalyDetectionType DetectIIDSpike: Item: DetectIIDSpike } }","{ { ActionOperator = Print  ActionOperand = Alert }: ActionOperator: Print, ActionOperand: Alert }",GCEnd.PauseTimeMSec isAnomaly detectIIDSpike : Print Alert,09ee4581-7e95-4410-99c5-6bd15ed78205


### Anomaly Detection: IIDSpike Detection

#### Anomaly Data Service

To decide whether a new data point is anomalous, the detector needs the data points observed earlier for the same rule, so we keep a per-rule history of (timestamp, value) points.

In [None]:
// Result of evaluating a single data point for anomalies.
type AnomalyDetectionOutput =
    {
        Timestamp : double;  // time of the data point (getAnomalies fills it from TimeStampRelativeMSec)
        Value     : double;  // the payload value that was evaluated
        IsAnomaly : bool;    // whether the point was flagged as an anomaly
        pValue    : double;  // p-value of the point; smaller means more anomalous
    }

/// Per-rule history of (timestamp, value) points, keyed by Rule.Id, used as the past data that
/// anomaly detection decides against.
/// Not thread-safe: it uses a plain Dictionary and List without locking, so callers updating it
/// from several threads must synchronize externally.
type AnomalyDetectionService() =
    let cache = Dictionary<Guid, List<double * double>>()

    /// Appends the point (timestamp, value) to the history of `rule`, creating that history on
    /// first use. Every point is recorded, including the one that creates the history.
    member this.UpdateData (rule : Rule) (timestamp : double) (value : double) : unit = 
        match cache.TryGetValue rule.Id with
        | true, points -> points.Add((timestamp, value))
        | false, _ ->
            let points = List<double * double>()
            points.Add((timestamp, value))
            cache.Add(rule.Id, points)

    /// Returns a snapshot (copy) of the points recorded for `rule`, in insertion order;
    /// empty if nothing has been recorded for it yet.
    member this.GetData (rule : Rule) : (double * double)[] =
        match cache.TryGetValue rule.Id with
        | true, points -> points.ToArray()
        | false, _     -> Array.empty

#### Anomaly Detection Implementation

In [None]:
// Shared ML.NET context used to load data and build the anomaly detection pipeline.
let ctx : MLContext = MLContext()

// One input row for ML.NET: a (timestamp, value) point of the monitored payload.
// NOTE(review): LoadColumn indices presumably only matter for text loading; rows here are
// built in code and loaded with LoadFromEnumerable.
type Input() =
    [<DefaultValue>]
    [<LoadColumn(0)>]
    val mutable public timestamp : double 

    // Read by the detector as its input column ("value"); ML.NET's time-series transforms
    // expect a Single (float32) column, hence float32 rather than double.
    [<DefaultValue>]
    [<LoadColumn(1)>]
    val mutable public value : float32

// One output row of the IID spike detector. Per the ML.NET DetectIidSpike docs the
// 3-element vector is [alert flag (0 or 1), raw score, p-value].
type Prediction() = 
    [<DefaultValue>]
    [<VectorType(3)>] // Prediction + value + p-value
    val mutable public Prediction : double[]

/// Placeholder: unconditionally reports the payload as an anomaly (IsAnomaly = true,
/// pValue = 0.0), stamped with the event's TimeStampRelativeMSec. `rule` is not used yet.
/// TODO(review): compute IsAnomaly / pValue from the rule's history instead of hard-coding them.
let getAnomalies (rule : Rule) (traceEvent: TraceEvent) (payload : double) : AnomalyDetectionOutput = 
    let eventTime = traceEvent.TimeStampRelativeMSec
    { Timestamp = eventTime
      Value     = payload
      IsAnomaly = true
      pValue    = 0.0 }

// double * double -> Timestamp * Value
/// Runs ML.NET's IID spike detector over a series of (timestamp, value) points and returns one
/// Prediction per input point, in input order. Each Prediction vector holds
/// [alert flag (0 or 1), raw score, p-value] (see the ML.NET DetectIidSpike docs).
/// The result is a lazy ML.NET enumerable: each enumeration re-runs the transform.
let getAnomaliesHelper (input : (double * double) seq ) =
    // Materialize the points once: ML.NET may enumerate its source more than once, and `input`
    // could be a one-shot or expensive-to-recompute sequence.
    let points : Input[] =
        input
        |> Seq.map (fun (timestamp, value) -> Input(timestamp = timestamp, value = float32 value))
        |> Seq.toArray

    let dataView = ctx.Data.LoadFromEnumerable<Input>(points)
        
    let anomalyPValueHistoryLength = 30
    let anomalyConfidence = 95.

    // https://github.com/dotnet/machinelearning/blob/510f0112d4fbb4d3ee233b9ca95c83fae1f9da91/src/Microsoft.ML.TimeSeries/SequentialAnomalyDetectionTransformBase.cs
    // Steps:
    // 1. Compute raw anomaly score - for this method, it's simply the input value: https://github.com/dotnet/machinelearning/blob/510f0112d4fbb4d3ee233b9ca95c83fae1f9da91/src/Microsoft.ML.TimeSeries/IidAnomalyDetectionBase.cs#L191
    // 2. Compute p-value based on kernel density estimate: 
    //  -> https://github.com/dotnet/machinelearning/blob/510f0112d4fbb4d3ee233b9ca95c83fae1f9da91/src/Microsoft.ML.TimeSeries/SequentialAnomalyDetectionTransformBase.cs#L562
    //  -> https://github.com/dotnet/machinelearning/blob/510f0112d4fbb4d3ee233b9ca95c83fae1f9da91/src/Microsoft.ML.TimeSeries/SequentialAnomalyDetectionTransformBase.cs#L475 
    // If p-value < (1 - confidence / 100.0) -> Alert i.e. anomaly.
    let anomalyPipeline =
        ctx.Transforms.DetectIidSpike(
        outputColumnName = "Prediction",
        inputColumnName = "value",
        side = AnomalySide.TwoSided,
        confidence = anomalyConfidence,  //  Alert Threshold = 1 - options.Confidence / 100;
        pvalueHistoryLength = anomalyPValueHistoryLength)

    // For this model, fitting doesn't matter.
    let trainedAnomalyModel = anomalyPipeline.Fit(ctx.Data.LoadFromEnumerable(List<Input>()))
    let transformedAnomalyData = trainedAnomalyModel.Transform(dataView)

    // reuseRowObject = false: every yielded Prediction is a distinct object, so callers may keep them.
    ctx.Data.CreateEnumerable<Prediction>(transformedAnomalyData, reuseRowObject = false)

In [None]:
/// Evaluates `rule` against one trace event and, when the rule's condition holds, performs the
/// rule's action (printing an alert line or the event's call stack).
///
/// The condition holds when all of these are true, checked in this order:
///   - traceEvent.EventName equals the rule's ConditionerEvent,
///   - the event has a payload named ConditionerProperty,
///   - that payload converts to a double and satisfies the rule's comparison.
/// Null or non-numeric payloads never satisfy a condition.
/// Anomaly detection conditions ("isAnomaly detectIIDSpike") are not implemented yet and never fire.
/// Raises (via failwith) for a rule that pairs isAnomaly with a numeric value, which cannot be evaluated.
let applyRule (rule : Rule) (traceEvent : TraceEvent) : unit =

    let condition : Condition = rule.Condition
    let propertyName : string = condition.Conditioner.ConditionerProperty

    // Converts the payload to a double directly, with no ToString/Parse round trip. Returns None
    // when the payload is null or not convertible, so such events don't match instead of throwing.
    let tryGetPayloadAsDouble () : double option =
        match traceEvent.PayloadByName(propertyName) with
        | :? IConvertible as convertible ->
            try
                Some (convertible.ToDouble(System.Globalization.CultureInfo.InvariantCulture))
            with
            | :? FormatException
            | :? InvalidCastException
            | :? OverflowException -> None
        | _ -> None

    // Compares the payload against a numeric threshold according to the condition type.
    let satisfiesThreshold (payload : double) (threshold : double) : bool =
        match condition.ConditionType with
        | ConditionType.Equal              -> payload = threshold
        | ConditionType.GreaterThan        -> payload > threshold
        | ConditionType.GreaterThanEqualTo -> payload >= threshold
        | ConditionType.LessThan           -> payload < threshold
        | ConditionType.LessThanEqualTo    -> payload <= threshold
        | ConditionType.NotEqual           -> payload <> threshold
        | ConditionType.IsAnomaly          ->
            failwith($"{rule.OriginalRule}: isAnomaly requires an anomaly detection type (e.g. detectIIDSpike), not a numeric value.")

    // Checks the payload against the condition's right-hand side.
    let checkConditionValue () : bool =
        match tryGetPayloadAsDouble () with
        | None -> false
        | Some payload ->
            match condition.ConditionalValue with
            | ConditionalValue.Value threshold -> satisfiesThreshold payload threshold
            | ConditionalValue.AnomalyDetectionType AnomalyDetectionType.DetectIIDSpike ->
                false // TODO: Fill This.

    // Event name, payload existence and payload value must all match; && short-circuits so the
    // payload is only read for events that have it.
    let conditionMet : bool =
        traceEvent.EventName = condition.Conditioner.ConditionerEvent
        && traceEvent.PayloadNames.Contains(propertyName)
        && checkConditionValue ()

    // Prints "Module!Method" for each frame, starting at `callStack` and following Caller links,
    // resolving symbols for a frame's module when its module file is known.
    let printCallStack (callStack : TraceCallStack) : unit =
        use symbolReader = new SymbolReader(System.IO.TextWriter.Null, SymbolPath.SymbolPathFromEnvironment)

        let printStackFrame (frame : TraceCallStack) : unit =
            let codeAddress = frame.CodeAddress
            if not (isNull codeAddress.ModuleFile) then
                codeAddress.CodeAddresses.LookupSymbolsForModule(symbolReader, codeAddress.ModuleFile)
                printfn "%s!%s" codeAddress.ModuleName codeAddress.FullMethodName

        let rec processFrame (frame : TraceCallStack) : unit =
            if not (isNull frame) then
                printStackFrame frame
                processFrame frame.Caller

        processFrame callStack

    // Performs the rule's action for this event.
    let apply (action : Action) : unit =
        match action.ActionOperator, action.ActionOperand with
        | ActionOperator.Print, ActionOperand.Alert ->
            printfn $"Alert!! {rule.OriginalRule} invoked as payload: {traceEvent.PayloadByName(propertyName).ToString()}!"
        | ActionOperator.Print, ActionOperand.CallStack ->
            printCallStack (traceEvent.CallStack())

    if conditionMet then apply rule.Action

In [None]:
// NOTE(review): hard-coded, machine-specific trace path; adjust before running elsewhere.
let ETL_FILEPATH = @"C:\Users\mukun\OneDrive\Documents\CallstackShmuff.etl\CallstackShmuff.etl" 

// NOTE(review): `session` is not used by any visible cell and is never disposed. A
// TraceEventSession constructed with a file name is meant for recording to that file, which is
// not needed just to read the trace through TraceLog. Confirm it can be removed.
let session = new TraceEventSession("TestSession2", ETL_FILEPATH)
// Opens the trace as a TraceLog (TraceEvent converts the .etl to .etlx if needed).
let traceLog = TraceLog.OpenOrConvert(ETL_FILEPATH)

// Up to the first 5 allocation-tick events of the GCRealTimeMon process. Seq.truncate rather
// than Seq.take, which throws when fewer than 5 matching events exist. The sequence is lazy,
// so every consumer re-scans traceLog.Events.
let sequenceOfAllocations : TraceEvent seq =
    traceLog.Events
    |> Seq.filter(fun e -> e.ProcessName = "GCRealTimeMon" && e.EventName.Contains("GC/AllocationTick"))
    |> Seq.truncate 5

In [None]:
// Threshold rule: prints an alert for each sampled allocation tick whose AllocationAmount
// payload exceeds 108000.
let rule1       : string = "GC/AllocationTick.AllocationAmount > 108000: Print Alert"
let parsedRule1 : Rule   = rule1 |> parseRule

// Evaluated immediately: runs the rule over every sampled allocation event.
let applyRule1 : unit =
    for allocationEvent in sequenceOfAllocations do
        applyRule parsedRule1 allocationEvent

Alert!! GC/AllocationTick.AllocationAmount > 108000: Print Alert invoked as payload: 117752!
Alert!! GC/AllocationTick.AllocationAmount > 108000: Print Alert invoked as payload: 109016!


In [None]:
// Anomaly rule over the same events. Its IIDSpike branch in applyRule is still a TODO that
// evaluates to false, so this rule does not currently fire.
let rule2 : string     = "GC/AllocationTick.AllocationAmount isAnomaly detectIIDSpike: Print Alert"
let parsedRule2 : Rule = rule2 |> parseRule

// Evaluated immediately: runs the rule over every sampled allocation event.
let applyRule2 : unit =
    for allocationEvent in sequenceOfAllocations do
        applyRule parsedRule2 allocationEvent