Simulating the anomaly-detection job example from Elasticsearch X-Pack (job definition shown below)

![image.png](pictures/x-pack-job.json.png)

In [2]:
from dsio.helpers import detect_time
from dsio.main import restream_dataframe
from dsio.anomaly_detectors import Gaussian1D
import pandas as pd


def display_anomalies(dataframe, detector, detectors, influencers, bucket_span="10Min", timefield=None,
        speed=10, es_uri=None, kibana_uri=None, index_name='',
        entry_type='', bokeh_port=5001, cols=3, debug_csv_path=None):
    """
    Bucket a dataframe by time (x-pack style) and restream the per-bucket counts.

    The rows are grouped by ``bucket_span`` on the time column plus every
    partition field; each influencer is collected into a set per bucket and the
    number of rows per bucket is stored in a column named after the detector
    function. The aggregated frame is then handed to ``restream_dataframe``.

    dataframe: input frame; it is not modified (a filled copy is used)
    detector: the detector used for the anomalies, forwarded to restream_dataframe
    detectors: dict as used in x-pack (see above image); must contain the keys
        'function' (only 'high_count' is supported) and 'partition_fields'
    influencers: list of column names providing extra information; may be empty
        and is not modified
    bucket_span: pandas frequency string used for the time buckets
    timefield: name of the time column; auto-detected with detect_time if falsy
    debug_csv_path: optional path; when given, the aggregated frame is also
        written there as CSV for inspection
    rest arguments same as restream_dataframe function in dsio.main

    Returns None. Prints a message and returns early when no time column is
    available, when the function is unsupported, or when a referenced column
    (or detectors key) is missing.
    """

    # each influencer and partition field should be an existing column
    try:
        # code taken from normalize_timefield
        # minor changes to keep the original date format for re sampling to buckets

        unix = False
        if timefield:
            # Explicit column: numeric values are interpreted as epoch
            # milliseconds (same unit as the auto-detected branch), anything
            # else is parsed as a date string.
            # NOTE(review): confirm 'ms' is the right unit for caller-supplied columns.
            unix = pd.api.types.is_numeric_dtype(dataframe[timefield])
        else:
            # Try to auto detect timefield
            timefield, unix = detect_time(dataframe)

        if not timefield:
            print("No time column specified")
            return

        # Build the time index from the untouched column, before any NaN filling.
        if unix:
            time_index = pd.to_datetime(dataframe[timefield], unit='ms')
        else:
            time_index = pd.to_datetime(dataframe[timefield])

        # replace empty instances in data from float('nan') to empty string
        # if no substitution is made a value of zero for total attempts will appear
        # fillna returns a new frame, so the caller's dataframe stays untouched.
        bucket_input = dataframe.fillna('')
        bucket_input.index = time_index

        # groupby requested frequency and partition fields
        group_keys = [pd.Grouper(freq=bucket_span)] + list(detectors['partition_fields'])

        # only works for function count
        function_name = detectors['function']
        if function_name != 'high_count':
            print('Function not supported')
            return

        # Work on a copy so the caller's list is not extended with the function name.
        influencer_names = list(influencers)

        if not influencer_names:
            # size() yields a Series; promote it to a one-column frame so the
            # rest of the pipeline can treat both branches alike.
            grouped_size_df = bucket_input.groupby(group_keys).size().to_frame(name=function_name)
        else:
            aggregations = {name: [lambda values: set(values)] for name in influencer_names}

            # append to last influencer the count responsibility
            aggregations[influencer_names[-1]].append('count')

            grouped_size_df = bucket_input.groupby(group_keys).agg(aggregations)
            # flatten the (column, aggregation) header: one column per
            # influencer followed by the count column
            grouped_size_df.columns = influencer_names + [function_name]

        # exclude index_names column if included twice
        # (an influencer that is also a partition field would clash on reset_index)
        duplicated = [name for name in grouped_size_df.index.names
                      if name in grouped_size_df.columns]
        if duplicated:
            grouped_size_df = grouped_size_df.drop(columns=duplicated)

        grouped_size_df = grouped_size_df.reset_index()

        # optional dump of the aggregated frame for inspection
        if debug_csv_path is not None:
            grouped_size_df.to_csv(debug_csv_path, float_format='%.f')

        # sometimes glitches when running bokeh inside a notebook
        # works fine if write and read from a temp file

        restream_dataframe(
            grouped_size_df, detector,
            sensors=[function_name],
            speed=speed, es_uri=es_uri,
            kibana_uri=kibana_uri, index_name=index_name,
            entry_type=entry_type, bokeh_port=bokeh_port,
            cols=cols)

    except KeyError as e:
        print('Column', e, 'does not exists in dataframe')

In [3]:
from dsio.main import restream_dataframe
from dsio.anomaly_detectors import Gaussian1D
import pandas as pd

# Load the failed-authentication log (comma separated) from the local data folder.
dataframe = pd.read_csv(
    'C:/Users/Sotiris/Desktop/project/data/failed_auth_my_log.csv',
    sep=',',
)

# Detector handed over to display_anomalies / restream_dataframe.
detector = Gaussian1D

# Count failed logins per hostname bucket, keeping hostname, user and ip as
# influencers, and stream the result to the local Elasticsearch / Kibana.
display_anomalies(
    dataframe=dataframe,
    detector=detector,
    detectors={'function': 'high_count', 'partition_fields': ['hostname']},
    influencers=['hostname', 'user', 'ip'],
    speed=50000,
    es_uri="http://localhost:9200/",
    kibana_uri="http://localhost:5601/app/kibana",
    index_name="login_failed",
    entry_type="meassurement",
    bokeh_port=5001,
    cols=2,
)

data found from 2018-03-27 17:50:00 to 2018-04-20 16:50:00
Converting to milliseconds ...
Done
Adding time offset of -583416.55 seconds
Setting speed to 50000x
Done


PUT http://localhost:9200/.kibana/visualization/login_failed-high_count/_create?refresh=true [status:409 request:0.084s]
PUT http://localhost:9200/.kibana/dashboard/login_failed-dashboard/_create?refresh=true [status:409 request:0.002s]



Writing 27 rows dated 2018-03-20 22:46:23.450000 to 2018-03-20 22:46:26.450000
Deleting existing index login_failed
Creating index login_failed
....
Writing 25 rows dated 2018-03-20 22:46:26.450000 to 2018-03-20 22:46:29.450000
...
Writing 10 rows dated 2018-03-20 22:46:29.450000 to 2018-03-20 22:46:32.450000
..
Writing 5 rows dated 2018-03-20 22:46:32.450000 to 2018-03-20 22:46:35.450000
...
Writing 5 rows dated 2018-03-20 22:46:35.450000 to 2018-03-20 22:46:38.450000
...
Writing 15 rows dated 2018-03-20 22:46:38.450000 to 2018-03-20 22:46:41.450000
...
Writing 6 rows dated 2018-03-20 22:46:41.450000 to 2018-03-20 22:46:44.450000
..
Writing 10 rows dated 2018-03-20 22:46:44.450000 to 2018-03-20 22:46:47.450000
...
Writing 8 rows dated 2018-03-20 22:46:47.450000 to 2018-03-20 22:46:50.450000
...
Writing 9 rows dated 2018-03-20 22:46:50.450000 to 2018-03-20 22:46:53.450000
...
Writing 8 rows dated 2018-03-20 22:46:53.450000 to 2018-03-20 22:46:56.450000
..
Writing 8 rows dated 2018-03-