# Elasticsearch & Warden
This notebook illustrates how we organize our data in Elasticsearch.  

First, make sure a local Elasticsearch instance is running (the client below connects to `localhost:9200` by default).
  
  
Let's connect to Elasticsearch and test the connection.  
Run this cell before using the rest of the notebook.

In [2]:
from elasticsearch import Elasticsearch
import json

ES = Elasticsearch()

print('Testing connection...')
if ES.ping():
    print('Success!')
else:
    print('No connection...')

Testing connection...
Success!


## Index creation  
  
The first question we need to answer is *What will we put in our index?* We used to store the whole state of every endpoint in an organization in daily indices. However, the list of PII files contained in the state object is unbounded: it can grow to any size, so the state object could contain an unlimited number of nested objects. Too many nested objects in a single document (more than 10,000 by default, set by `index.mapping.nested_objects.limit`) makes Elasticsearch reject the document. This limit can be raised, but in the interest of robustness, scalability and best practices, we will instead post **individual files** to our index.  
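To make the limit concrete, here is a minimal sketch (not part of the project code) that counts the nested objects in one file document, assuming it follows the mapping defined below: each `content` entry is one nested object and each of its `correlations` is another. The 10,000 figure is Elasticsearch's default for `index.mapping.nested_objects.limit`; `count_nested_objects` and `fits_nested_limit` are hypothetical helper names.

```python
# Default value of the index setting index.mapping.nested_objects.limit.
NESTED_OBJECTS_LIMIT = 10_000


def count_nested_objects(file_doc: dict) -> int:
    """Count nested objects in one file document.

    Every entry of `content` is a nested object, and so is every entry
    of that entry's `correlations` list.
    """
    total = 0
    for entry in file_doc.get('content', []):
        total += 1                                   # the content entry itself
        total += len(entry.get('correlations', []))  # its nested correlations
    return total


def fits_nested_limit(docs, limit: int = NESTED_OBJECTS_LIMIT) -> bool:
    """True if the combined nested-object count of `docs` stays within `limit`,
    i.e. if their nested objects could all live in a single document."""
    return sum(count_nested_objects(d) for d in docs) <= limit
```

For example, a file with 3 PII types that each have 2 correlations counts 3 + 6 = 9 nested objects, so packing roughly 1,100 such files into one document would already exceed the default limit, before even counting the files themselves.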
  
We now define the mapping that represents a file object in our index.  
Every file posted as part of the same state carries the same **state** object, which lets us retrieve the content of a state with ease.


In [5]:
mappings = {
    'dynamic': 'strict',  # Reject any document containing a field not declared in this mapping
    'properties': {
        'path': {'type': 'text'},
        'score': {'type': 'float'},
        'mime_type': {'type': 'keyword'},
        'hash': {'type': 'keyword'},
        'encrypted': {'type': 'boolean'},
        'timestamp': {'type': 'date'},
        'content': {
            # Content level: one entry per PII type found in the file.
            # type_name and type_id are the name and id of the corresponding regex.
            'type': 'nested',
            'properties': {
                'type_name': {'type': 'text'},
                'type_id': {'type': 'keyword'},
                'amount': {'type': 'integer'},
                'correlations': {
                    # Correlation level: one entry per correlated entity type,
                    # so correlations are not limited to names.
                    'type': 'nested',
                    'properties': {
                        'type_name': {'type': 'text'},
                        'type_id': {'type': 'keyword'},
                        'correlation': {'type': 'float'},
                    }
                }
            }
        },
        'state': {
            'properties': {
                'timestamp': {'type': 'date'},
                'endpoint_id': {'type': 'keyword'},
                'build_number': {'type': 'keyword'},
            }
        }
    }
}
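Below is a sketch of how one endpoint state could be split into documents that fit this mapping. It assumes a hypothetical state layout (a dict with `timestamp`, `endpoint_id`, `build_number` and a `files` list); the notebook's own `Post`/`File` classes may structure it differently, and `state_to_file_docs` is an illustrative name.

```python
def state_to_file_docs(state: dict) -> list:
    """Split one endpoint state into one document per file.

    Every resulting document carries the same `state` sub-object, which is
    what lets us regroup the files of a state at query time.
    """
    state_info = {
        'timestamp': state['timestamp'],
        'endpoint_id': state['endpoint_id'],
        'build_number': state['build_number'],
    }
    docs = []
    for file_fields in state['files']:
        doc = dict(file_fields)          # shallow copy, the input is not modified
        doc['state'] = dict(state_info)  # identical state info on every file
        docs.append(doc)
    return docs
```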

The `settings` parameter still needs to be fine-tuned.

In [6]:
settings = {
    "number_of_shards": 1,
    "max_inner_result_window": 1000,  # Max from + size for top_hits aggregations and inner hits (default: 100)
}

The next questions we need to answer are:

- *When do we create a new index?*

  To be determined.

- *How do we format the names of our indices?*  

We need to identify which organization an index belongs to, so we put an organization identifier in its name. We also need to know which time period the index covers, so we add a date as well (below, the index creation time in milliseconds since epoch). Elasticsearch accepts wildcard patterns in index names, so with a format such as **mondata-{date}** we can fetch all indices of the same organization by querying `mondata-*`. Elasticsearch also supports date math in queries (e.g. `now-7d/d` in a range filter), so we can easily query specific time periods.
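The naming scheme can be captured in two small helpers. This is a sketch; `make_index_name` and `org_index_pattern` are hypothetical names, and the lowercase check reflects Elasticsearch's requirement that index names be lowercase.

```python
import time
from typing import Optional


def make_index_name(org_id: str, created_at_ms: Optional[int] = None) -> str:
    """Build '{org_id}-{creation time in ms since epoch}', as in the cell below."""
    if org_id != org_id.lower():
        raise ValueError('Elasticsearch index names must be lowercase')
    if created_at_ms is None:
        created_at_ms = int(time.time() * 1000)
    return f'{org_id}-{created_at_ms}'


def org_index_pattern(org_id: str) -> str:
    """Wildcard pattern matching every index of one organization."""
    return f'{org_id}-*'
```

`ES.search(index=org_index_pattern('mondata'), body=...)` would then search every index of the organization. One caveat of the `{org_id}-*` pattern: if one organization id is another id followed by a dash (e.g. `acme` and `acme-eu`), the first organization's pattern also matches the second's indices.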

In [7]:
import time

org_id = 'mondata'
created_at = int(round(time.time()*1000, 0)) # ms since epoch

index_name = f'{org_id}-{created_at}'
request_body = {'mappings': mappings, 'settings': settings}

print(f'Attempting to create index: {index_name}')

response = ES.indices.create(index=index_name, body=request_body)

print('Response: ', response)

index_id = response['index']

Attempting to create index: mondata-1588692159917
Response:  {'acknowledged': True, 'shards_acknowledged': True, 'index': 'mondata-1588692159917'}


The following cells can be used for testing purposes.

In [8]:
index_id = 'mondata-1588692159917' # If the notebook had to be restarted

In [None]:
ES.indices.put_mapping(body=mappings, index=index_id)  # Add new fields to the mapping (existing field types cannot be changed)

In [48]:
# ES.delete(index=index_id, id='PqdMVnEBD8xa8m9A2jYh')                   # delete a single document by id
# ES.delete_by_query(index=index_id, body={"query": {"match_all": {}}})  # delete every document, keep the index
# ES.indices.delete(index=index_id)                                      # delete the whole index

{'acknowledged': True}

In [54]:
# ES.search(index=index_id, body={'query': {'match_all': {}}})

{'took': 0,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 0, 'relation': 'eq'},
  'max_score': None,
  'hits': []}}

## Uploading data  
  
The process for uploading data is straightforward. We wrote code to generate realistic test data in any quantity, which makes it easy to test queries against the model under various loads.  
  
Here we generate a **post** object that contains a series of files; this is what we upload to our index.  
  
Since our mapping sets `'dynamic': 'strict'`, Elasticsearch will reject any document that doesn't fit the mapping.
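Because a strict mapping rejects a document as soon as it contains an undeclared field, it can help to check documents locally before sending them. The sketch below is a hypothetical helper, not an Elasticsearch feature: it walks a document alongside the mapping's `properties` and lists the field names the mapping does not declare. It does not check types, so a document that passes can still be rejected.

```python
def unknown_fields(doc: dict, properties: dict, prefix: str = '') -> list:
    """Return dotted paths of fields in `doc` that are absent from `properties`."""
    unknown = []
    for key, value in doc.items():
        path = f'{prefix}{key}'
        if key not in properties:
            unknown.append(path)
            continue
        sub_props = properties[key].get('properties')
        if sub_props is None:
            continue  # leaf field: nothing deeper to check
        # object fields hold a dict, nested fields hold a list of dicts
        children = value if isinstance(value, list) else [value]
        for child in children:
            if isinstance(child, dict):
                unknown.extend(unknown_fields(child, sub_props, path + '.'))
    return unknown
```

For instance, `unknown_fields(f.json(), mappings['properties'])` should return an empty list for every generated file, assuming `File.json()` returns a dict (if it returns a JSON string, pass it through `json.loads` first).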

In [13]:
from file import File
from post import Post

post = Post(amount_of_files=6, endpoint_index=0)

# The bulk api requires a particular format [ {action}, {object}, {action}, {object}, ... ]
temp_list = [ [{'index': {'_index': index_id}}, f.json()] for f in post.files ]
body = [val for sublist in temp_list for val in sublist]

ES.bulk(body=body)

{'took': 36,
 'errors': False,
 'items': [{'index': {'_index': 'mondata-1588692159917',
    '_type': '_doc',
    '_id': 'I3vx5XEBaKK3TgPP7Dur',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 6,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'mondata-1588692159917',
    '_type': '_doc',
    '_id': 'JHvx5XEBaKK3TgPP7Dur',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 7,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'mondata-1588692159917',
    '_type': '_doc',
    '_id': 'JXvx5XEBaKK3TgPP7Dur',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 8,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'mondata-1588692159917',
    '_type': '_doc',
    '_id': 'Jnvx5XEBaKK3TgPP7Dur',
    '_version': 1,
    'result': 'created',
  

## Querying an index  
  
Here we detail the process of querying the state of an endpoint at a specific point in time.  
  
We start by filtering every file posted at or before our reference timestamp that has the right *state.endpoint_id*. Next, we run a *terms* aggregation on *state.timestamp* to regroup the files that were posted together as part of a single state; ordering the buckets by key in descending order with `size: 1` keeps only the most recent state. From this bucket we get the top hits and the maximum *score*, which represent the most sensitive files in the state and the sensitivity score of the endpoint at that time.  

The *doc_count* of the bucket is the total number of files of that state found in the index. If the *top_hits* aggregation returned fewer files than this, we have an incomplete state. This is acceptable, since a state can get quite large, and because the hits are sorted by *score*, the files returned are the most sensitive ones. Note that the *top_hits* `size` cannot exceed the index's `max_inner_result_window` (1000 in our settings).
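A sketch of how the response of the query below could be summarized, assuming the aggregation names it uses (`states`, `score`, `files`); `latest_state_summary` is a hypothetical helper:

```python
from typing import Optional


def latest_state_summary(res: dict) -> Optional[dict]:
    """Summarize the 'states' aggregation; None if no state matched."""
    buckets = res['aggregations']['states']['buckets']
    if not buckets:
        return None
    bucket = buckets[0]  # size 1 ordered by _key desc: the most recent state
    files = [hit['_source'] for hit in bucket['files']['hits']['hits']]
    return {
        'timestamp': bucket['key'],             # state.timestamp, ms since epoch
        'score': bucket['score']['value'],      # max file score in the state
        'doc_count': bucket['doc_count'],       # files of this state in the index
        'files': files,                         # top hits, most sensitive first
        'incomplete': len(files) < bucket['doc_count'],
    }
```

`latest_state_summary(ES.search(index=index_id, body=query))` returns `None` when the endpoint has no state at or before `files_as_of`.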

In [12]:
files_as_of = 1986777120284  # reference time, ms since epoch
size = 100                   # max number of files returned for the state

query = {
    'query': {
        'bool': {
            'filter': [
                { "term":  { "state.endpoint_id": 'Nics endpoint' }},
                { "range": { "state.timestamp": { "lte":  files_as_of }}},
            ]
        }
    },
    'size': 0,
    'aggregations': {
        'states': {
            'terms': {'field': 'state.timestamp', 'size': 1, 'order': {'_key': 'desc'} },
            'aggregations': {
                'score': {
                    'max': {'field': 'score'}
                },
                'files': {
                    'top_hits': {
                        'sort': [
                            {"score": { "order": "desc"} },
                        ],
                        'size': size
                    }
                }
            }
        },
    },
}

res = ES.search(index=index_id, body=query)
with open("query_1.txt", "a") as f:
    f.write(json.dumps(res, sort_keys=True, indent=4, separators=(',', ':')))

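Next, we write a query to get the score histogram of a whole organization: the maximum score and the most sensitive files per calendar interval (one day here), across all endpoints.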

In [45]:
FROM = 0  # ms since epoch
TO = int(round(time.time()*1000, 0)) + 1000000000000  # now + 10^12 ms (roughly 31.7 years ahead)
timezone = '-05:00'
interval = '1d'
number_of_files = 3

query = {
    'query': {
        'bool': {
            'filter': [
                {'range': {'state.timestamp': {'time_zone': timezone, 'gte': FROM, 'lte': TO}}}
            ]
        }
    },
    'size': 0,
    'aggregations': {
        'calendar_slices': {
            'date_histogram': {
                'field': 'state.timestamp',
                'calendar_interval': interval,
                'time_zone': timezone,
            },
            'aggregations': {
                'score': {'max': {'field': 'score'}},
                'most_sensitive_files': {
                    'top_hits': {
                        'sort': [
                            {'score': {'order': 'desc'}}
                        ],
                        'size': number_of_files
                    }
                }
                
            }
        }
    }
}

res = ES.search(index=index_id, body=query)
with open("query_2.txt", "a") as f:
    f.write(json.dumps(res, sort_keys=True, indent=4, separators=(',', ':')))
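
The date histogram response can be flattened into one row per time slice. A sketch, assuming the aggregation names used above; empty slices between the first and last non-empty ones are returned by Elasticsearch with a `None` max score, and they are kept here so the series has no gaps (`score_histogram` is a hypothetical helper):

```python
def score_histogram(res: dict) -> list:
    """Turn the 'calendar_slices' buckets into (slice start, max score, top file paths)."""
    rows = []
    for bucket in res['aggregations']['calendar_slices']['buckets']:
        hits = bucket['most_sensitive_files']['hits']['hits']
        rows.append((
            bucket['key_as_string'],                      # human-readable slice start
            bucket['score']['value'],                     # None for empty slices
            [hit['_source'].get('path') for hit in hits],
        ))
    return rows
```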

We can do the same for a single endpoint by adding a *term* filter on *state.endpoint_id*.

In [None]:
FROM = 0  # ms since epoch
TO = int(round(time.time()*1000, 0)) + 1000000000000  # now + 10^12 ms (roughly 31.7 years ahead)
timezone = '-05:00'
interval = '1d'
number_of_files = 3
endpoint_id = 'Nics endpoint'

query = {
    'query': {
        'bool': {
            'filter': [
                { "term":  { "state.endpoint_id": endpoint_id }},
                {'range': {'state.timestamp': {'time_zone': timezone, 'gte': FROM, 'lte': TO}}}
            ]
        }
    },
    'size': 0,
    'aggregations': {
        'calendar_slices': {
            'date_histogram': {
                'field': 'state.timestamp',
                'calendar_interval': interval,
                'time_zone': timezone,
            },
            'aggregations': {
                'score': {'max': {'field': 'score'}},
                'most_sensitive_files': {
                    'top_hits': {
                        'sort': [
                            {'score': {'order': 'desc'}}
                        ],
                        'size': number_of_files
                    }
                }
                
            }
        }
    }
}

res = ES.search(index=index_id, body=query)
with open("query_3.txt", "a") as f:
    f.write(json.dumps(res, sort_keys=True, indent=4, separators=(',', ':')))
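
The organization and endpoint queries differ only by the *term* filter, so one way to avoid duplicating them is a small builder function. This is a hypothetical helper; its defaults mirror the values used in the cells above, and `max_window` assumes the `max_inner_result_window` of 1000 set in our index settings.

```python
def histogram_query(date_from, date_to, timezone='-05:00', interval='1d',
                    number_of_files=3, endpoint_id=None, max_window=1000):
    """Build the score-histogram query for an organization or a single endpoint."""
    if number_of_files > max_window:
        # top_hits size may not exceed the index's max_inner_result_window
        raise ValueError(f'number_of_files must be <= {max_window}')
    filters = [{'range': {'state.timestamp': {'time_zone': timezone,
                                              'gte': date_from, 'lte': date_to}}}]
    if endpoint_id is not None:
        # restrict the histogram to a single endpoint
        filters.insert(0, {'term': {'state.endpoint_id': endpoint_id}})
    return {
        'query': {'bool': {'filter': filters}},
        'size': 0,
        'aggregations': {
            'calendar_slices': {
                'date_histogram': {'field': 'state.timestamp',
                                   'calendar_interval': interval,
                                   'time_zone': timezone},
                'aggregations': {
                    'score': {'max': {'field': 'score'}},
                    'most_sensitive_files': {
                        'top_hits': {'sort': [{'score': {'order': 'desc'}}],
                                     'size': number_of_files},
                    },
                },
            },
        },
    }
```

`ES.search(index=index_id, body=histogram_query(FROM, TO, endpoint_id='Nics endpoint'))` would reproduce the last cell, and omitting `endpoint_id` reproduces the organization-wide query.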