# Elasticsearch Data Extraction Notebook

This Jupyter Notebook provides a step-by-step guide for extracting data from an Elasticsearch instance using the Python Elasticsearch library and pandas. The notebook covers the following main tasks:

1. Connecting to Elasticsearch.
2. Retrieving alerts from Elasticsearch based on severity levels.
3. Flattening columns containing nested dictionaries.
4. Extracting specific columns to create flow information.
5. Saving the extracted flow information to a CSV file.

The notebook is designed to be executed in sequence, with each cell representing a specific task in the data extraction process. It includes explanations, code comments, and sample output to help you understand and visualize each step.

**Prerequisites:**
- Python environment with required libraries installed (`pandas`, `elasticsearch`, `elasticsearch-dsl`, `urllib3`).
- Access to an Elasticsearch instance with the necessary credentials.

**Note:**
- Replace the authentication and configuration parameters in the first cell with your own Elasticsearch host, username, password, and desired version.
- Execute each cell in order to perform the corresponding data extraction task.

Feel free to adapt and customize this notebook for your specific use case. Happy data extraction!


# Import Libraries and Configuration


In [4]:
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import urllib3

# Set authentication and configuration parameters
username = 'jupyter'
password = 'zqjdi;qnsour'
elastic_host = 'https://192.168.43.10:9200'
version = 'import'  # The security onion Architecture used : 'import' or 'standalone'
severity = ['1', '2', '3']  # Select the severity levels of the alerts that will be retrived 
output_csv = './flow_info.csv'  # Output CSV file path


# Connect to Elasticsearch

In [6]:
try:
    es = Elasticsearch([elastic_host],
                       ca_certs=False, verify_certs=False, http_auth=(username, password))
    if not es.ping():
        print("Failed to connect to Elasticsearch!")
        is_connected = False
    else:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        is_connected = True
except Exception as e:
    print(f"Error occurred while connecting to Elasticsearch: {e}")
    is_connected = False

is_connected




True

# Retrieve Alerts from Elasticsearch

In [7]:
if is_connected:
    search_context = Search(using=es, index='*:so-*', doc_type='doc') \
        .query('query_string', query='event.module:suricata') \
        .filter('terms', **{'rule.severity': severity})

    response = search_context.execute()

    if not response.success():
        print("Failed to retrieve alerts.")
        alerts_df = None
    else:
        alerts_df = pd.DataFrame((d.to_dict() for d in search_context.scan()))
        print(f"Successfully retrieved {len(alerts_df)} alerts.")
else:
    alerts_df = None

alerts_df.head()


Successfully retrieved 244 alerts.


Unnamed: 0,import,log,destination,rule,source,message,tags,network,observer,@timestamp,ecs,destination_geo,imported,host,event,source_geo
0,"{'file': 'eve-2023-07-23-21:48.json', 'id': '6...",{'file': {'path': '/nsm/import/69a0849bf705ba1...,"{'geo': {'continent_name': 'Europe', 'region_i...","{'severity': 1, 'reference': 'https://doc.emer...","{'port': 62179, 'ip': '10.0.19.14'}","{""timestamp"":""2022-03-21T20:58:11.609747+0000""...",[import],{'community_id': '1:611zLtP7N471qhgDX887m6SWaF...,{'name': 'soimport'},2022-03-21T20:58:11.609Z,{'version': '8.0.0'},"{'ip': '188.166.154.118', 'organization_name':...",True,{'name': 'soimport'},"{'severity': 3, 'ingested': '2023-07-23T21:48:...",
1,"{'file': 'eve-2023-07-23-21:48.json', 'id': '6...",{'file': {'path': '/nsm/import/69a0849bf705ba1...,"{'geo': {'continent_name': 'Europe', 'region_i...","{'severity': 1, 'rev': 4, 'metadata': {'perfor...","{'port': 62179, 'ip': '10.0.19.14'}","{""timestamp"":""2022-03-21T20:58:11.609747+0000""...",[import],{'community_id': '1:611zLtP7N471qhgDX887m6SWaF...,{'name': 'soimport'},2022-03-21T20:58:11.609Z,{'version': '8.0.0'},"{'ip': '188.166.154.118', 'organization_name':...",True,{'name': 'soimport'},"{'severity': 3, 'ingested': '2023-07-23T21:48:...",
2,"{'file': 'eve-2023-07-23-21:48.json', 'id': '6...",{'file': {'path': '/nsm/import/69a0849bf705ba1...,"{'port': 62180, 'ip': '10.0.19.14'}","{'severity': 3, 'reference': 'https://doc.emer...","{'geo': {'continent_name': 'North America', 'r...","{""timestamp"":""2022-03-21T20:58:36.186262+0000""...",[import],{'community_id': '1:4KYsDrI30OqBxIWTRyJlef83tS...,{'name': 'soimport'},2022-03-21T20:58:36.186Z,{'version': '8.0.0'},,True,{'name': 'soimport'},"{'severity': 1, 'ingested': '2023-07-23T21:48:...","{'ip': '157.245.142.66', 'organization_name': ..."
3,"{'file': 'eve-2023-07-23-21:48.json', 'id': '6...",{'file': {'path': '/nsm/import/69a0849bf705ba1...,"{'port': 62182, 'ip': '10.0.19.14'}","{'severity': 3, 'reference': 'https://doc.emer...","{'geo': {'continent_name': 'North America', 'r...","{""timestamp"":""2022-03-21T20:59:37.194924+0000""...",[import],{'community_id': '1:yHKr2Akgz5VsxN9sENJvHJzeZl...,{'name': 'soimport'},2022-03-21T20:59:37.194Z,{'version': '8.0.0'},,True,{'name': 'soimport'},"{'severity': 1, 'ingested': '2023-07-23T21:48:...","{'ip': '157.245.142.66', 'organization_name': ..."
4,"{'file': 'eve-2023-07-23-21:48.json', 'id': '6...",{'file': {'path': '/nsm/import/69a0849bf705ba1...,"{'port': 62184, 'ip': '10.0.19.14'}","{'severity': 3, 'reference': 'https://doc.emer...","{'geo': {'continent_name': 'North America', 'r...","{""timestamp"":""2022-03-21T20:59:38.429842+0000""...",[import],{'community_id': '1:p6Z3aHGpVucbdmofeWZwkeGqPB...,{'name': 'soimport'},2022-03-21T20:59:38.429Z,{'version': '8.0.0'},,True,{'name': 'soimport'},"{'severity': 1, 'ingested': '2023-07-23T21:48:...","{'ip': '157.245.142.66', 'organization_name': ..."


# Flatten Columns Containing Dictionaries

In [8]:
if alerts_df is not None:
    dict_cols = [col for col in alerts_df.columns if isinstance(alerts_df[col].iloc[0], dict)]
    for col in dict_cols:
        flattened = pd.json_normalize(alerts_df[col])
        flattened.columns = [f"{col}.{subcol}" for subcol in flattened.columns]
        alerts_df = alerts_df.drop(col, axis=1).join(flattened)

alerts_df.head()


Unnamed: 0,message,tags,@timestamp,imported,source_geo,import.file,import.id,log.offset,log.file.path,log.id.uid,...,destination_geo.network,host.name,event.severity,event.ingested,event.module,event.category,event.dataset,event.severity_label,event.acknowledged,event.escalated
0,"{""timestamp"":""2022-03-21T20:58:11.609747+0000""...",[import],2022-03-21T20:58:11.609Z,True,,eve-2023-07-23-21:48.json,69a0849bf705ba17de1f4d49d0cdd20e,0,/nsm/import/69a0849bf705ba17de1f4d49d0cdd20e/s...,203297560709042,...,188.166.0.0/16,soimport,3,2023-07-23T21:48:19.131Z,suricata,network,alert,high,,
1,"{""timestamp"":""2022-03-21T20:58:11.609747+0000""...",[import],2022-03-21T20:58:11.609Z,True,,eve-2023-07-23-21:48.json,69a0849bf705ba17de1f4d49d0cdd20e,2708,/nsm/import/69a0849bf705ba17de1f4d49d0cdd20e/s...,203297560709042,...,188.166.0.0/16,soimport,3,2023-07-23T21:48:19.131Z,suricata,network,alert,high,,
2,"{""timestamp"":""2022-03-21T20:58:36.186262+0000""...",[import],2022-03-21T20:58:36.186Z,True,"{'ip': '157.245.142.66', 'organization_name': ...",eve-2023-07-23-21:48.json,69a0849bf705ba17de1f4d49d0cdd20e,6888,/nsm/import/69a0849bf705ba17de1f4d49d0cdd20e/s...,1804203672181747,...,,soimport,1,2023-07-23T21:48:19.131Z,suricata,network,alert,low,,
3,"{""timestamp"":""2022-03-21T20:59:37.194924+0000""...",[import],2022-03-21T20:59:37.194Z,True,"{'ip': '157.245.142.66', 'organization_name': ...",eve-2023-07-23-21:48.json,69a0849bf705ba17de1f4d49d0cdd20e,9581,/nsm/import/69a0849bf705ba17de1f4d49d0cdd20e/s...,598413082629892,...,,soimport,1,2023-07-23T21:48:19.132Z,suricata,network,alert,low,,
4,"{""timestamp"":""2022-03-21T20:59:38.429842+0000""...",[import],2022-03-21T20:59:38.429Z,True,"{'ip': '157.245.142.66', 'organization_name': ...",eve-2023-07-23-21:48.json,69a0849bf705ba17de1f4d49d0cdd20e,12270,/nsm/import/69a0849bf705ba17de1f4d49d0cdd20e/s...,1718789661686758,...,,soimport,1,2023-07-23T21:48:19.132Z,suricata,network,alert,low,,


# Extract Flow Information

In [10]:
if version == 'import':
    columns = ['log.id.uid', 'destination.ip', 'destination.port', 'source.port', 'source.ip',
               'network.transport', '@timestamp', 'host.name', 'import.id']
elif version == 'standalone':
    columns = ['log.id.uid', 'destination.ip', 'destination.port', 'source.port', 'source.ip',
               'network.transport', '@timestamp', 'host.name']
else:
    print("version should be either import or standalone")
    columns = None

if alerts_df is not None and columns is not None:
    flow_info = alerts_df[columns]

flow_info.head()


Unnamed: 0,log.id.uid,destination.ip,destination.port,source.port,source.ip,network.transport,@timestamp,host.name,import.id
0,203297560709042,188.166.154.118,80,62179,10.0.19.14,TCP,2022-03-21T20:58:11.609Z,soimport,69a0849bf705ba17de1f4d49d0cdd20e
1,203297560709042,188.166.154.118,80,62179,10.0.19.14,TCP,2022-03-21T20:58:11.609Z,soimport,69a0849bf705ba17de1f4d49d0cdd20e
2,1804203672181747,10.0.19.14,62180,443,157.245.142.66,TCP,2022-03-21T20:58:36.186Z,soimport,69a0849bf705ba17de1f4d49d0cdd20e
3,598413082629892,10.0.19.14,62182,443,157.245.142.66,TCP,2022-03-21T20:59:37.194Z,soimport,69a0849bf705ba17de1f4d49d0cdd20e
4,1718789661686758,10.0.19.14,62184,443,157.245.142.66,TCP,2022-03-21T20:59:38.429Z,soimport,69a0849bf705ba17de1f4d49d0cdd20e


# Save Flow Information to CSV

In [11]:
if flow_info is not None:
    flow_info.to_csv(output_csv, index=False)
    print(f"Flow information saved to {output_csv}")


Flow information saved to ./flow_info.csv
