In [11]:
from elasticsearch import Elasticsearch
import pandas as pd
import ast
import numpy as np
import json

/bin/bash: line 1: elsds/bin/activate: No such file or directory


In [12]:
# --------------------------------------------
# Step 1: Connect to Elasticsearch
# --------------------------------------------
# Client options are gathered in one mapping so they can be reviewed in one place.
es_client_options = {
    # To authenticate, add an entry such as:
    # "basic_auth": ("elastic", "password"),
    "verify_certs": False,  # Adjust for production!
    "request_timeout": 30,
    "max_retries": 3,
    "retry_on_timeout": True,
}
es = Elasticsearch(["http://192.168.2.46:9200"], **es_client_options)

In [13]:
# --------------------------------------------
# Step 2: Query across all indices using wildcard
# --------------------------------------------
index_pattern = "*"  # Pull data from all indices
# NOTE(review): with a "*" pattern, indices that have no @timestamp mapping may
# report shard failures for this sort; consider adding "unmapped_type" to the
# sort clause -- confirm against the cluster before changing.
query_body = {
    "query": {"match_all": {}},
    "sort": [{"@timestamp": {"order": "desc"}}],
    "size": 1000
}
scroll_duration = "2m"  # Duration for which the scroll context is maintained

try:
    page = es.search(index=index_pattern, body=query_body, scroll=scroll_duration)
except Exception as e:
    print(f"Error during search: {e}")
    # Re-raise rather than calling exit(): inside a notebook exit() asks the
    # kernel/front-end to shut down instead of simply failing this cell, and it
    # throws away the traceback. Re-raising stops "Run All" at this cell and
    # still terminates a plain script with a non-zero status.
    raise

# Keep the scroll handle and the first page of hits for the collection step.
scroll_id = page.get('_scroll_id')
hits = page['hits']['hits']

In [14]:
# --------------------------------------------
# Step 3: Collect all documents using the scroll API
# --------------------------------------------
documents = []
documents.extend(hits)

try:
    # Keep requesting pages until one comes back without hits.
    while True:
        page = es.scroll(scroll_id=scroll_id, scroll=scroll_duration)
        # Track the most recent scroll id so the cleanup below releases the
        # context that is actually live.
        scroll_id = page.get('_scroll_id') or scroll_id
        hits = page['hits']['hits']
        if not hits:
            break
        documents.extend(hits)
finally:
    # Release the server-side scroll context instead of leaving it open until
    # the scroll duration expires. Cleanup is best-effort: a failure here is
    # reported but must not mask an error raised by the loop above.
    if scroll_id:
        try:
            es.clear_scroll(scroll_id=scroll_id)
        except Exception as e:
            print(f"Warning: could not clear scroll context: {e}")

In [15]:
# --------------------------------------------
# Step 4: Build a DataFrame from the documents
# --------------------------------------------
if not documents:
    print("No documents found.")
    # Raise SystemExit directly instead of calling exit(): in a notebook,
    # exit() asks the kernel to shut down, whereas a raised SystemExit just
    # halts this cell (and a "Run All"). A plain script still exits with
    # status 0, as before.
    raise SystemExit(0)

# One flat record per hit: the index name followed by every top-level _source
# field. A _source field literally named "_index" overrides the index name,
# matching the previous dict.update() behaviour.
data = [
    {'_index': doc.get('_index', ''), **doc.get('_source', {})}
    for doc in documents
]
df = pd.DataFrame(data)

In [16]:
# --------------------------------------------
# Step 5: Define a function to parse JSON-like strings
# --------------------------------------------
def parse_json_value(val):
    """
    Convert a JSON or JSON-like string into the Python object it describes.

    Only strings that, after stripping surrounding whitespace, are wrapped in
    ``{...}`` or ``[...]`` are parsed. Strict JSON is tried first so that
    ``true`` / ``false`` / ``null`` are understood; if that fails, the text is
    evaluated as a Python literal, which covers single-quoted, repr-style
    dictionaries and lists.

    Parameters
    ----------
    val : any
        The value to inspect.

    Returns
    -------
    any
        - the parsed object when parsing succeeds;
        - the whitespace-stripped string when ``val`` is a string that is not
          bracket-wrapped or cannot be parsed;
        - ``val`` unchanged when it is not a string.
    """
    if not isinstance(val, str):
        return val

    val = val.strip()
    looks_like_object = val.startswith("{") and val.endswith("}")
    looks_like_array = val.startswith("[") and val.endswith("]")
    if not (looks_like_object or looks_like_array):
        return val

    # Strict JSON first: ast.literal_eval cannot read true/false/null.
    try:
        return json.loads(val)
    except (ValueError, RecursionError):
        pass

    # Fall back to Python-literal syntax (e.g. single-quoted keys).
    try:
        return ast.literal_eval(val)
    except Exception:
        return val  # Unparseable: hand back the (stripped) string

In [17]:
# --------------------------------------------
# Step 6: Parse and flatten JSON columns
# --------------------------------------------
json_columns = ['agent', 'ecs', 'host', 'input', 'log']

for col in json_columns:
    if col not in df.columns:
        continue

    df[col] = df[col].apply(parse_json_value)
    # Anything that is neither a dict nor a list (plain strings, missing
    # values, ...) is replaced by an empty dict so the column is uniform.
    parsed = df[col].apply(lambda x: x if isinstance(x, (dict, list)) else {})

    # If the column contains only dictionaries, flatten it.
    if parsed.apply(lambda x: isinstance(x, dict)).all():
        expanded = pd.json_normalize(parsed.tolist())
        # concat(axis=1) aligns on index labels; give the expanded frame the
        # same index as df so rows stay paired even when df does not carry a
        # default 0..n-1 index.
        expanded.index = df.index
        expanded.columns = [f"{col}.{subcol}" for subcol in expanded.columns]
        df = pd.concat([df.drop(columns=[col]), expanded], axis=1)
    else:
        # At least one value is a list: keep the column unflattened, storing
        # the cleaned values (non dict/list entries are now {}).
        df[col] = parsed

In [18]:
# --------------------------------------------
# (Optional) Convert @timestamp to a datetime object
# --------------------------------------------
has_timestamp = '@timestamp' in df.columns
if has_timestamp:
    # errors='coerce' makes unparseable values become NaT instead of raising.
    parsed_timestamps = pd.to_datetime(df['@timestamp'], errors='coerce')
    df['@timestamp'] = parsed_timestamps

In [19]:
# --------------------------------------------
# Step 7: Write the Actual (Flattened) Data to CSV
# --------------------------------------------
# Destination file for the flattened export; the row index is not written.
actual_csv = "output.csv"
df.to_csv(path_or_buf=actual_csv, encoding="utf-8", index=False)
print(f"Actual data exported to {actual_csv}")

Actual data exported to output.csv


In [20]:
###STOP HERE IF NO NORMALIZED DATA NEEDED###
###SEE ELS_norm IPYNB FOR DATA NORMALIZATION PIPELINE###