## Univariate anomaly detection functions

Using independent ensembling techniques provide an estimate of if a value with a group is an outlier from the other group values. The techniques used in this notebook are:

- **Mean and standard deviation:**
> - Values that are outside of the mean + 3 times the standard deviation are considered outliers
- **Median and median absolute deviation:**
> - Values that are outside of median + 3 times the median absolute deviation are consideredd outliers
- **Median and IQR ranges**
> - Values beyond 1.5 times the 25th or 75th IQR range

In [10]:
import pandas as pd
import numpy as np
from statsmodels import robust

def detect_univariate_statistical(
        dataframe,
        sensitivity_score,
        max_fractional_anomalies
):
    df_out = df.assign(is_anomaly=False, anomaly_score=0.0)
    return (df_out, [0,0,0], "No ensemble chosen.")

def check_stat(val:float,
               midpoint:float,
               distance:int,
               n:int):
    """
    Check if a given value is within a given range of a 
    midpoint value and a number of increments. If the value is within 
    this range return a percentage else return 1.0 indicating the value 
    is an statistical outlier
    """
    if (abs(val - midpoint) < (n * distance)):
        return abs(val-midpoint) / (n * distance)
    return 1.0

def check_sd(val:float,
             mean:float,
             sd:float,
             min_num_sd:int):
     """
     Check if a given value is a specified number of 
     standard deviations away from the mean
     """
     return check_stat(val, mean, sd, min_num_sd)

def check_mad(val:float,
              median:float,
              mad:float,
              min_num_mad:int):
    """
    Check if a given value is with the range of
    the median absolute value and a specific length or distance
    If the value is within the range return a percentage, else
    return 1.o indicating it is an outlier
    """
    return check_stat(val, median, mad, min_num_mad)

def check_iqr(val:float,
              median:float,
              p25:float,
              p75:float,
              iqr:float,
              min_iqr_diff:float):
    """
    Check if on which side of the median a value exists
    If below the median checks if the value is min_iqr_diff times below the p25 IQR
    if above checks if the value min_iqr_diff times above the p75.
    if the value passes those checks return 1.0 to suggest the value
    is an outlier
    """
    if val < median:
        if val > p25:
             return 0.0
        elif (p25 - val) < (min_iqr_diff * iqr):
             return abs(p25 - val) / (min_iqr_diff * iqr)
        else:
            return 1.0
    else:
        if val < p75:
            return 0.0
        elif (val - p75) < (min_iqr_diff * iqr):
            return abs(val - p75) / (min_iqr_diff * iqr)
        else:
            return 1.0
        

def run_tests(dataframe):
    """
    Pandas dataframe containing univariate data to perform 
    anomaly detection against
    """
    mean = dataframe.value.mean()
    sd = dataframe.value.std(0)
    p25 = np.quantile(dataframe.value, 0.25)
    p75 = np.quantile(dataframe.value, 0.75)
    iqr = p75 - p25
    median = dataframe.value.median()
    mad = robust.mad(dataframe.value)
    calculations = {
        "mean": mean, "sd": sd, "p25": p25,
        "p75": p75, "iqr": iqr, "median": median,
        "mad":mad
    }
    dataframe["sds"] = [check_sd(val, mean, sd, 3.0) for val in dataframe.value]
    dataframe["mads"] = [check_mad(val, median, mad, 3.0) for val in dataframe.value]
    dataframe["iqrs"] = [check_iqr(val, median, p25, p75, iqr, 1.5) for val in dataframe.value]
    
    return (dataframe, calculations)
    
def score_results(
        dataframe,
        weights
):
    """
    Take a dataframe and dictionary of weights
    """
    return dataframe.assign(anomaly_score=(
        dataframe["sds"] * weights["sds"] + 
        dataframe["iqrs"] * weights["iqrs"] +
        dataframe["mads"] * weights["mads"]
    ))

def determine_outliers(
        dataframe,
        sensitivity_score,
        max_fractional_anomalies
):
    sensitivity_score = (100 -  sensitivity_score) / 100.0
    max_fractional_anomaly_score = np.quantile(dataframe.anomaly_score,
                                           1.0 - max_fractional_anomalies)
    if max_fractional_anomaly_score > sensitivity_score and max_fractional_anomalies < 1.0:
        sensitivity_score = max_fractional_anomaly_score
        
    return dataframe.assign(
        is_anomaly=(dataframe.anomaly_score > sensitivity_score)
        )
    

def detect_univariate_statistical(
        dataframe,
        sensitivity_score,
        max_fractional_anomalies
):
    weights = {
        "sds": 0.25,
        "iqrs": 0.35,
        "mads": 0.45
    }
    #print(dataframe)
    if (dataframe.value.count() < 3):
        return (dataframe.assign(is_anomaly=False, anomaly_score=0.0), weights, "Must have minimum of 3 items for anomaly detection")
    elif (max_fractional_anomalies <= 0.0 or max_fractional_anomalies > 1.0):
        return (dataframe.assign(is_anomaly=False, anomaly_score=0.0), weights, "Must have valid max fraction of anomalies, 0 < x <= 1.0")
    elif (sensitivity_score <= 0 or sensitivity_score > 100):
        return (dataframe.assign(is_anomaly=False, anomaly_score=0.0), weights, "Must have valid sensitivity score, 0 < x <= 100.0")
    else:
        df_test, calculations = run_tests(dataframe)
        df_scored = score_results(df_test, weights)
        df_out = determine_outliers(df_scored, sensitivity_score, max_fractional_anomalies)
        return  (df_out, weights, {"message" : "Ensemble of [mean +/- 3*SD, median +/- 3*MAD, median +/- 1.5*IQR],",
                                "calculations": calculations}) 

### Example Run with a Small Dataset

In [4]:
df = pd.DataFrame([1,2,3,4,5,6,7,8,9,0,600, -50], columns=["value"])
detect_univariate_statistical(df, 20, .20)

(    value       sds      mads      iqrs  anomaly_score  is_anomaly
 0       1  0.093037  0.262302  0.090909       0.173113       False
 1       2  0.091122  0.187358  0.000000       0.107092       False
 2       3  0.089207  0.112415  0.000000       0.072888       False
 3       4  0.087292  0.037472  0.000000       0.038685       False
 4       5  0.085377  0.037472  0.000000       0.038207       False
 5       6  0.083462  0.112415  0.000000       0.071452       False
 6       7  0.081547  0.187358  0.000000       0.104698       False
 7       8  0.079632  0.262302  0.090909       0.169762       False
 8       9  0.077717  0.337245  0.212121       0.245432       False
 9       0  0.094952  0.337245  0.212121       0.249741       False
 10    600  1.000000  1.000000  1.000000       1.050000        True
 11    -50  0.190702  1.000000  1.000000       0.847675        True,
 {'sds': 0.25, 'iqrs': 0.35, 'mads': 0.45},
 {'message': 'Ensemble of [mean +/- 3*SD, median +/- 3*MAD, median +/- 

## Bring in a larger dataset of Zeek connection logs

In [11]:
zeek_columns = ['ts', 'uid', 'orig_h','orig_p','resp_h','resp_p',
 'proto','service','duration','orig_bytes','resp_bytes','conn_state',
 'local_orig','local_resp','missed_bytes','history', 'orig_pkts',
 'orig_ip_bytes','resp_pkts','resp_ip_bytes','tunnel_parents']

df = pd.read_csv("reduced_conn.log", sep="\t", skiprows=8, names=zeek_columns, low_memory=False, na_values=["-"])
df.fillna(value = 0, inplace=True)

## Pick a column to run an anomaly test against - using source bytes for this run

In [28]:
obytes = df.orig_bytes.tolist()

In [29]:
results_df, _, calculations = detect_univariate_statistical(
    pd.DataFrame(obytes, columns=["value"]), 80, .10)

In [38]:
results_df.head(40)

Unnamed: 0,value,sds,mads,iqrs,anomaly_score,is_anomaly
0,213.0,0.003056,1.0,1.0,0.800764,True
1,0.0,0.003063,1.0,0.0,0.450766,False
2,63.0,0.003061,1.0,0.287879,0.551523,False
3,40.0,0.003062,1.0,0.0,0.450765,False
4,71.0,0.003061,1.0,0.409091,0.593947,False
5,220.0,0.003056,1.0,1.0,0.800764,True
6,56.0,0.003061,1.0,0.181818,0.514402,False
7,216.0,0.003056,1.0,1.0,0.800764,True
8,236.0,0.003055,1.0,1.0,0.800764,True
9,42.0,0.003062,1.0,0.0,0.450765,False


## Checking the distribution of the outlier vs normal findings from the tests

In [37]:
display(results_df.is_anomaly.value_counts())

perc_outlier = len(results_df.query("is_anomaly == True")) / len(results_df.query("is_anomaly == False"))

print(f"\nPercent found an outlier, based on the sensitivity score and minimum threshold: {perc_outlier}")

is_anomaly
False    269995
True      29997
Name: count, dtype: int64


Percent found an outlier, based on the sensitivity score and minimum threshold: 0.11110205744550825
