In [1]:
# Load libraries necessary for demo
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import Search
import es # This is part of this GitHub Repo
import retention # This is part of this GitHub Repo
import rollover # This is part of this GitHub Repo
import pandas as pd
import toml
import os
import json

In [2]:
# Connect to Elasticsearch on the local machine over plain HTTP (port 9200)
# Every later cell reuses this es_connection client object
es_connection = Elasticsearch('http://localhost:9200')

In [3]:
# Cleanup previous demo data - optional
# ignore_unavailable / allow_no_indices make a fresh cluster (nothing to delete) a no-op
# instead of an error, so the second delete still runs and Run All is not interrupted
es_connection.indices.delete(index="demo-nessus*", ignore_unavailable=True, allow_no_indices=True)
es_connection.indices.delete(index="demo-suricata*", ignore_unavailable=True, allow_no_indices=True)

{'acknowledged': True}

In [4]:
# Create the first rollover index (demo-nessus-000001) and attach the demo-nessus alias
# to it, flagged as the write index for that alias
es_connection.indices.create(index="demo-nessus-000001", body='{"aliases": { "demo-nessus": { "is_write_index": true }}}')
# Load Nessus sample data from demo/logs/nessus.json and insert it through the
# demo-nessus alias - uses bulk insertion
nessus_data = es.load_json_file('demo/logs/nessus.json')
es.bulk_insert_data_to_es(es_connection, nessus_data, "demo-nessus")

True

In [5]:
# Create the first rollover index (demo-suricata-000001) and attach the demo-suricata alias
# to it, flagged as the write index for that alias
# This can take a minute or two to run
es_connection.indices.create(index="demo-suricata-000001", body='{"aliases": { "demo-suricata": { "is_write_index": true }}}')
# Load Suricata sample data from demo/logs/suricata.json and insert it through the
# demo-suricata alias - uses bulk insertion
suricata_data = es.load_json_file('demo/logs/suricata.json')
es.bulk_insert_data_to_es(es_connection, suricata_data, "demo-suricata")

True

In [6]:
# Set our demo search context - the demo-* index pattern covers both the suricata and
# nessus demo indices, so queries built from search_context search both data sets
search_context = Search(using=es_connection, index='demo-*', doc_type='doc')

In [7]:
# Test query data from Suricata
s = search_context.query('query_string', query='src_ip:172.16.0.2 AND dest_port:80')
response = s.execute()
# Always bind df: when the search does not succeed, show an empty frame instead of
# raising NameError (or silently displaying a stale df left over from an earlier run).
# scan() walks every matching document, not only the first page returned by execute().
df = pd.DataFrame(d.to_dict() for d in s.scan()) if response.success() else pd.DataFrame()
df

Unnamed: 0,timestamp,flow_id,event_type,src_ip,src_port,dest_ip,dest_port,proto,app_proto,flow,tcp,in_iface,tx_id,alert,payload,payload_printable,stream,app_proto_ts
0,2017-05-27T21:59:44.005328+0000,103379704739020,flow,172.16.0.2,61555,10.6.66.80,80,TCP,http,"{'pkts_toserver': 8, 'pkts_toclient': 15, 'byt...","{'tcp_flags': 'de', 'tcp_flags_ts': 'de', 'tcp...",,,,,,,
1,2017-05-27T21:59:44.005395+0000,1374363606816801,flow,172.16.0.2,61549,10.6.66.80,80,TCP,http,"{'pkts_toserver': 5, 'pkts_toclient': 4, 'byte...","{'tcp_flags': 'de', 'tcp_flags_ts': 'de', 'tcp...",,,,,,,
2,2017-05-27T21:59:44.005434+0000,1099734807976441,flow,172.16.0.2,61550,10.6.66.80,80,TCP,http,"{'pkts_toserver': 5, 'pkts_toclient': 4, 'byte...","{'tcp_flags': 'de', 'tcp_flags_ts': 'de', 'tcp...",,,,,,,
3,2017-05-27T21:59:44.005468+0000,1100357578216922,flow,172.16.0.2,61544,10.6.66.80,80,TCP,http,"{'pkts_toserver': 9, 'pkts_toclient': 17, 'byt...","{'tcp_flags': 'de', 'tcp_flags_ts': 'de', 'tcp...",,,,,,,
4,2017-05-27T21:59:44.005519+0000,1669221701625458,flow,172.16.0.2,61551,10.6.66.80,80,TCP,http,"{'pkts_toserver': 5, 'pkts_toclient': 4, 'byte...","{'tcp_flags': 'de', 'tcp_flags_ts': 'de', 'tcp...",,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
624,2017-05-27T22:36:51.002670+0000,1680972874891285,flow,172.16.0.2,54420,192.168.0.53,80,TCP,,"{'pkts_toserver': 5, 'pkts_toclient': 0, 'byte...","{'tcp_flags': '02', 'tcp_flags_ts': '02', 'tcp...",,,,,,,
625,2017-05-27T22:37:06.002053+0000,887159843536861,flow,172.16.0.2,17528,192.168.0.54,80,TCP,,"{'pkts_toserver': 1, 'pkts_toclient': 0, 'byte...","{'tcp_flags': '02', 'tcp_flags_ts': '02', 'tcp...",,,,,,,
626,2017-05-27T22:40:44.015055+0000,233884594681590,flow,172.16.0.2,14883,192.168.2.52,80,TCP,,"{'pkts_toserver': 1, 'pkts_toclient': 0, 'byte...","{'tcp_flags': '02', 'tcp_flags_ts': '02', 'tcp...",,,,,,,
627,2017-05-27T22:40:46.000659+0000,871027960742373,flow,172.16.0.2,42290,192.168.0.53,80,TCP,,"{'pkts_toserver': 1, 'pkts_toclient': 0, 'byte...","{'tcp_flags': '02', 'tcp_flags_ts': '02', 'tcp...",,,,,,,


In [8]:
# There will be no output from loading this function
def query_es(index, query):
    """Run a query-string search and return every matching document as a DataFrame.

    Args:
        index: Index name, alias, or wildcard pattern to search (e.g. "demo-nessus").
        query: Query-string expression (e.g. "Category:Services").

    Returns:
        A pandas DataFrame with one row per matching document, or None when the
        initial search does not report success.
    """
    # The query text is a search expression, not a document type, so it must not
    # be handed to Search as doc_type.
    search = Search(using=es_connection, index=index).query('query_string', query=query)
    response = search.execute()
    if not response.success():
        return None
    # scan() iterates over all matching documents rather than only the hits
    # contained in the single execute() response.
    return pd.DataFrame(hit.to_dict() for hit in search.scan())

In [9]:
# Test the new function - searches the demo-nessus alias for documents whose
# Category field matches Services and displays the resulting DataFrame
query_es("demo-nessus","Category:Services")

Unnamed: 0,Time,Entry Location,Entry,Enabled,Category,Profile,description,Signer,Company,Image Path,Version,Launch String,MD5,SHA-1,PESHA-1,PESHA-256,SHA-256,IMP
0,20170501-040423,HKLM\System\CurrentControlSet\Services,,,Services,System-wide,,,,,,,,,,,,
1,,HKLM\System\CurrentControlSet\Services,DcomLaunch,enabled,Services,System-wide,The DCOMLAUNCH service launches COM and DCOM s...,,,c:\windows\syswow64\rpcss.dll,,%SystemRoot%\system32\rpcss.dll,,,,,,
2,,HKLM\System\CurrentControlSet\Services,DeviceInstall,enabled,Services,System-wide,Enables a computer to recognize and adapt to h...,,,c:\windows\syswow64\umpnpmgr.dll,,%SystemRoot%\system32\umpnpmgr.dll,,,,,,
3,,HKLM\System\CurrentControlSet\Services,dot3svc,enabled,Services,System-wide,The Wired AutoConfig (DOT3SVC) service is resp...,,,c:\windows\syswow64\dot3svc.dll,,%SystemRoot%\System32\dot3svc.dll,,,,,,
4,,HKLM\System\CurrentControlSet\Services,DsRoleSvc,enabled,Services,System-wide,This service hosts the DS Role Server used for...,,,c:\windows\syswow64\dsrolesrv.dll,,%SystemRoot%\system32\dsrolesrv.dll,,,,,,
5,,HKLM\System\CurrentControlSet\Services,FontCache,enabled,Services,System-wide,Optimizes performance of applications by cachi...,,,c:\windows\syswow64\fntcache.dll,,%SystemRoot%\system32\FntCache.dll,,,,,,
6,,HKLM\System\CurrentControlSet\Services,KtmRm,enabled,Services,System-wide,Coordinates transactions between the Distribut...,,,c:\windows\syswow64\msdtckrm.dll,,%systemroot%\system32\msdtckrm.dll,,,,,,
7,,HKLM\System\CurrentControlSet\Services,MMCSS,enabled,Services,System-wide,Enables relative prioritization of work based ...,,,c:\windows\syswow64\mmcss.dll,,%SystemRoot%\system32\mmcss.dll,,,,,,
8,,HKLM\System\CurrentControlSet\Services,netprofm,enabled,Services,System-wide,Identifies the networks to which the computer ...,,,c:\windows\syswow64\netprofmsvc.dll,,%SystemRoot%\System32\netprofmsvc.dll,,,,,,
9,20170110-193221,HKLM\System\CurrentControlSet\Services,nxlog,enabled,Services,System-wide,This service is responsible for running the nx...,(Verified) NXLog Ltd,NXLog Ltd,c:\program files (x86)\nxlog\nxlog.exe,3.0.1845.0,"C:\Program Files (x86)\nxlog\nxlog.exe"""" -c """"...",DFE4561FB63AA2F16A3E60EE1A08E864,A6428BBE6D1A31313D2108A3A535BBDF26D07618,120B449561199ADC5E2AC24F24C547FE388460B3,AF4397E043F98A712B574944008F2B4B4DDF827EB34B5C...,2772D7403E807FA329DD7200EE31D10582AEFF336A3E0E...,9CE11C92462D6D0B568AA07F902EC3A2


In [10]:
# In case you don't want a DataFrame, this function returns a standard list
# Again, no output for loading the function
def query_es_return_list(index, query):
    """Run a query-string search and return the raw hit entries of the response.

    Args:
        index: Index name, alias, or wildcard pattern to search (e.g. "demo-nessus").
        query: Query-string expression (e.g. "Category:Services").

    Returns:
        response['hits']['hits'] from a single search request, or None when the
        search does not report success. Only the hits contained in that one
        response are returned (no scrolling), so a large result set is limited
        to the search's page size.
    """
    # The query text is a search expression, not a document type, so it must not
    # be handed to Search as doc_type.
    search = Search(using=es_connection, index=index).query('query_string', query=query)
    response = search.execute()
    if not response.success():
        return None
    return response['hits']['hits']

In [11]:
# Test the new function - results may be easier for standard Python automation
# Each entry carries the hit metadata (_index, _id, _score, ...) plus the document in _source
query_es_return_list("demo-nessus","Category:Services")

[{'_index': 'demo-nessus-000001', '_type': '_doc', '_id': 'rm2C33YBoul7KFycm_-u', '_score': 2.0925374, '_source': {'Time': '20170501-040423', 'Entry Location': 'HKLM\\System\\CurrentControlSet\\Services', 'Entry': '', 'Enabled': '', 'Category': 'Services', 'Profile': 'System-wide', 'description': '', 'Signer': '', 'Company': '', 'Image Path': '', 'Version': '', 'Launch String': '', 'MD5': '', 'SHA-1': '', 'PESHA-1': '', 'PESHA-256': '', 'SHA-256': '', 'IMP': None}}, {'_index': 'demo-nessus-000001', '_type': '_doc', '_id': 'r22C33YBoul7KFycm_-u', '_score': 2.0925374, '_source': {'Time': '', 'Entry Location': 'HKLM\\System\\CurrentControlSet\\Services', 'Entry': 'DcomLaunch', 'Enabled': 'enabled', 'Category': 'Services', 'Profile': 'System-wide', 'description': 'The DCOMLAUNCH service launches COM and DCOM servers in response to object activation requests. If this service is stopped or disabled, programs using COM or DCOM will not function properly. It is strongly recommended that you ha

In [12]:
# Get all indices in the Elasticsearch cluster, returned as JSON with the default columns
es_connection.cat.indices(format="json")

[{'health': 'yellow',
  'status': 'open',
  'index': 'demo-suricata-000001',
  'uuid': '4W-cqiMSTz6EjGM4JHn1kg',
  'pri': '1',
  'rep': '1',
  'docs.count': '10000',
  'docs.deleted': '0',
  'store.size': '2.3mb',
  'pri.store.size': '2.3mb'},
 {'health': 'green',
  'status': 'open',
  'index': '.kibana_1',
  'uuid': '-3a5GbnES2e0XHc1_iFQCw',
  'pri': '1',
  'rep': '0',
  'docs.count': '2',
  'docs.deleted': '0',
  'store.size': '14.6kb',
  'pri.store.size': '14.6kb'},
 {'health': 'yellow',
  'status': 'open',
  'index': 'demo-nessus-000001',
  'uuid': 'lr85XylgQ5mK5lv3kvG04Q',
  'pri': '1',
  'rep': '1',
  'docs.count': '286',
  'docs.deleted': '0',
  'store.size': '266.6kb',
  'pri.store.size': '266.6kb'}]

In [13]:
# List every index in the cluster, restricted to the columns named in h, with sizes
# reported as raw byte counts (bytes="b"). Full column names are requested instead of
# the short ones, and rows are sorted by creation.date.
es_connection.cat.indices(
    format="json",
    h=(
        "health",
        "status",
        "index",
        "uuid",
        "shardsPrimary",
        "shardsReplica",
        "docsCount",
        "docsDeleted",
        "storeSize",
        "creation.date.string",
        "creation.date",
        "memory.total",
        "pri.store.size",
    ),
    s="creation.date",
    bytes="b",
)

[{'health': 'green',
  'status': 'open',
  'index': '.kibana_1',
  'uuid': '-3a5GbnES2e0XHc1_iFQCw',
  'shardsPrimary': '1',
  'shardsReplica': '0',
  'docsCount': '2',
  'docsDeleted': '0',
  'storeSize': '15026',
  'creation.date.string': '2021-01-07T22:03:02.754Z',
  'creation.date': '1610056982754',
  'memory.total': '3112',
  'pri.store.size': '15026'},
 {'health': 'yellow',
  'status': 'open',
  'index': 'demo-nessus-000001',
  'uuid': 'lr85XylgQ5mK5lv3kvG04Q',
  'shardsPrimary': '1',
  'shardsReplica': '1',
  'docsCount': '286',
  'docsDeleted': '0',
  'storeSize': '273090',
  'creation.date.string': '2021-01-08T01:00:47.036Z',
  'creation.date': '1610067647036',
  'memory.total': '10068',
  'pri.store.size': '273090'},
 {'health': 'yellow',
  'status': 'open',
  'index': 'demo-suricata-000001',
  'uuid': '4W-cqiMSTz6EjGM4JHn1kg',
  'shardsPrimary': '1',
  'shardsReplica': '1',
  'docsCount': '10000',
  'docsDeleted': '0',
  'storeSize': '2492877',
  'creation.date.string': '202

In [14]:
# Find out the date of the newest document in a given index (here the demo-suricata alias)
# NOTE(review): the first argument is passed as an empty string; its meaning is defined
# in es.py and is not visible here - confirm
date = es.get_newest_document_date_in_index("", "demo-suricata", es_connection)
# The returned value supports isoformat(), so it is displayed as an ISO 8601 string
date.isoformat()

'2021-01-08T01:00:50.463000'

In [15]:
# Forcemerge the indices behind the demo-suricata alias down to a single segment each
# Will timeout on normal-to-large indices
# When this times out it continues to run in the background
es_connection.indices.forcemerge(index='demo-suricata', max_num_segments=1)

{'_shards': {'total': 2, 'successful': 1, 'failed': 0}}

In [8]:
# Check cluster health - the returned dict includes the overall status and shard counts
es_connection.cluster.health()

{'cluster_name': 'demo',
 'status': 'yellow',
 'timed_out': False,
 'number_of_nodes': 1,
 'number_of_data_nodes': 1,
 'active_primary_shards': 3,
 'active_shards': 3,
 'relocating_shards': 0,
 'initializing_shards': 0,
 'unassigned_shards': 2,
 'delayed_unassigned_shards': 0,
 'number_of_pending_tasks': 0,
 'number_of_in_flight_fetch': 0,
 'task_max_waiting_in_queue_millis': 0,
 'active_shards_percent_as_number': 60.0}

In [16]:
# Start from the sample settings.toml.example and adjust it for the demo:
# client JSON files are looked up in the current working directory, and both the
# retention and rollover features are switched on
settings = toml.load("settings.toml.example")
settings['settings']['client_json_folder'] = os.getcwd()
for section in ('retention', 'rollover'):
    settings[section]['enabled'] = True
print(settings)

{'settings': {'client_json_folder': '/opt/elastic-ilm', 'debug': False, 'limit_to_client': '', 'password_authentication': False, 'ssl_enabled': False, 'ssl_certificate': 'disabled', 'check_hostname': False}, 'notification': {'smtp': 'disabled', 'ms-teams': 'disabled', 'jira': 'disabled'}, 'smtp': {'from_email': 'from@domain.com', 'username': 'user@domain.com', 'password': '', 'smtp_host': 'smtp.office365.com', 'smtp_port': 587}, 'ms-teams': {'webhook': ''}, 'retention': {'enabled': True, 'minutes_between_run': 60, 'health_check_level': 'yellow', 'ms-teams': True, 'jira': False}, 'rollover': {'enabled': True, 'minutes_between_run': 10, 'health_check_level': 'yellow', 'ms-teams': True, 'jira': False}, 'accounting': {'enabled': False, 'minutes_between_run': 240, 'output_folder': '', 'output_to_es': True, 'send_copy_to_client_name': '', 'health_check_level': 'green', 'retry_attempts': 10, 'retry_wait_in_seconds': 1200, 'fallback_health_check_level': 'yellow', 'ssd_cost': 0.03, 'sata_cost':

In [17]:
# Load sample client.json.example and override a few settings for demo use
with open('client.json.example') as f:
    client_config = json.load(f)
# Point the client at the local demo cluster
client_config['es_host'] = "localhost"
# Add a 45 day retention policy under the key demo, used by the policy checks in later cells
client_config['policy']['retention']['demo'] = 45
print(client_config)

{'client_name': 'demo', 'ca_file': '/home/jhenderson/elastic/ca.crt', 'es_host': 'localhost', 'es_port': 9200, 'ssl_enabled': False, 'ssl_certificate': 'required', 'check_hostname': False, 'password_authentication': False, 'es_user': 'elastic', 'es_password': 'password', 'policy': {'allocation': {'global': 30, '.monitoring': 7}, 'rollover': {'global': {'size': 'auto', 'days': 30}, 'demo-nessus': {'size': 0, 'days': 30}}, 'retention': {'global': 90, '.monitoring': 7, 'demo-suricata': 0, 'demo': 45}}}


In [19]:
# Let's validate the retention policy is applying correctly against an index starting with demo
retention_policies = es.get_retention_policy(client_config)
index_policy = es.check_index_retention_policy("demo-nessus-000001", retention_policies)
# This should print demo if it grabbed the retention policy starting with demo
print(index_policy)

demo


In [21]:
# Let's simulate having a policy apply for demo* but a more specific one winning for demo-suricata
client_config['policy']['retention']['demo-suricata'] = 40
# Re-read the policies after editing client_config so the check below does not depend on
# the earlier retention_policies object still aliasing the dict that was just modified
retention_policies = es.get_retention_policy(client_config)
index_policy = es.check_index_retention_policy("demo-suricata-000001", retention_policies)
# This should print demo-suricata if it grabbed the retention policy starting with demo-suricata
print(index_policy)

demo-suricata


In [22]:
# Same concept applies for rollover policies: read them from client_config, then ask
# which policy key applies to a given index name
rollover_policies = rollover.get_rollover_policy(client_config)
index_policy = es.check_index_rollover_policy("demo-suricata-000001", rollover_policies)
# This should print global because no rollover policy was created for demo
print(index_policy)

global


In [23]:
# Set rollover policy for demo and then test again
client_config['policy']['rollover']['demo'] = {'size': "auto", 'days': 30}
# Re-read the policies after editing client_config so the check below does not depend on
# the earlier rollover_policies object still aliasing the dict that was just modified
rollover_policies = rollover.get_rollover_policy(client_config)
index_policy = es.check_index_rollover_policy("demo-suricata-000001", rollover_policies)
# This should print demo because there now is a policy applying against indices starting with demo
print(index_policy)

demo


In [24]:
# Policy is used to validate if something should be deleted, rolled over, etc.
# Let's test a forced rollover: pass the client config, the current index name,
# its alias (demo-suricata) and the existing connection
es.rollover_index_with_connection(client_config, "demo-suricata-000001", 'demo-suricata', es_connection)

{'acknowledged': True}

In [25]:
# Check to see if the demo-suricata index was rolled over - if it was, demo-suricata-000002
# should now be listed as the write index for the demo-suricata alias
es_connection.cat.aliases('demo-suricata', format="json", h=("index","is_write_index"))

[{'index': 'demo-suricata-000001', 'is_write_index': 'false'},
 {'index': 'demo-suricata-000002', 'is_write_index': 'true'}]

In [28]:
# THIS SECTION ONLY WORKS IF YOU COPY settings.toml.example TO settings.toml
# AND client.json.example to client.json and make them able to connect to your demo cluster
# Should output processing rollovers and, apart from the nessus exception below, show no actions taken
# This is because the demo data imports with today's date and the policies are > 1 day
#
# The default client.json sets the rollover policy for nessus to 0 GB. Thus it will roll over
# every time you run apply_rollover_policies
# You can change the value and retest as you see fit
# NOTE(review): the meaning of the empty-string argument is defined in rollover.py - confirm
rollover.apply_rollover_policies("")

Processing rollovers for demo
Client demo has a healthy cluster (yellow)
Adding index demo-nessus-000003 to rollover due to Size Policy
Rollover successful for demo-nessus-000003
'Elasticsearch' object has no attribute 'get_index_operation_message'


In [30]:
# THIS SECTION ONLY WORKS IF YOU COPY settings.toml.example TO settings.toml
# AND client.json.example to client.json and make them able to connect to your demo cluster
# Should output processing retention but show no actions taken
# This is because the demo data imports with today's date and the policies are > 1 day
#
# The default client.json sets the retention policy for demo-suricata to 0 days. Thus it will delete
# all demo-suricata indices any time you run apply_retention_policies
# You can change the value and retest as you see fit
# NOTE(review): "yellow" looks like the cluster health level to check and "" a client
# filter - confirm against retention.py
retention.apply_retention_policies("yellow", "")

Processing retention for demo
Client demo has a healthy cluster (yellow)
