In [1]:
from pprint import pprint
from elasticsearch  import Elasticsearch, helpers
import json

#Connect to elasticsearch
es = Elasticsearch('http://localhost:9200')
client_info = es.info()
print("Connected to Elasticsearch")
pprint(client_info.body)

Connected to Elasticsearch
{'cluster_name': 'docker-cluster',
 'cluster_uuid': '4Xf3-_8JQxCX6HVz9r0yvA',
 'name': '5a0a9ef6b26a',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2024-08-05T10:05:34.233336849Z',
             'build_flavor': 'default',
             'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '9.11.1',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.17.0',
             'number': '8.15.0'}}


In [2]:
# Delete old index if it exists
es.indices.delete(index='hdfs', ignore_unavailable=True)

# Create HDFS index
es.indices.create(
    index="hdfs",
    settings={
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 2
        }
    },
    mappings={
        "properties": {
            # Parsed / normalized timestamps (you'll populate these when ingesting)
            "timestamp_iso": {
                "type": "date",
                "format": "strict_date_time||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS"
            },
            "timestamp_epoch": {"type": "long"},

            # Raw date/time fragments from the HDFS log line
            # Example: "081109 203615 148 INFO dfs.DataNode$PacketResponder: ..."
            "date_raw":   {"type": "keyword"},   # e.g. "081109"
            "time_raw":   {"type": "keyword"},   # e.g. "203615"
            "millis":     {"type": "integer"},   # e.g. 148

            # Log meta info
            "log_level":  {"type": "keyword"},   # INFO / WARN / ERROR ...
            "component":  {"type": "keyword"},   # e.g. "dfs.DataNode$PacketResponder"

            # Message & full line
            "message":    {"type": "text"},      # e.g. "PacketResponder 1 for block blk_... terminating"
            "raw_line":   {"type": "text"},      # full unparsed log line

            # Optional: where the log came from
            "source_path": {"type": "keyword"}   # e.g. "C:\\Users\\nikhi\\loghub\\HDFS\\HDFS_2k.log"
        }
    }
)

print("Index created: hdfs")

Index created: hdfs


In [3]:
# Bulk import NDJSON for the hdfs index
import json
from elasticsearch import helpers

NDJSON_FILE = r"C:\Users\nikhi\loghub\HDFS\hdfs_bulk.ndjson"

def generate_actions():
    with open(NDJSON_FILE, "r", encoding="utf-8") as f:
        lines = f.readlines()

        # NDJSON follows the same pattern:
        # { "index": {} }
        # { actual_document }
        for i in range(0, len(lines), 2):
            _ = json.loads(lines[i])            # index meta (not used)
            doc = json.loads(lines[i + 1])      # actual HDFS log document

            yield {
                "_index": "hdfs",
                "_source": doc
            }

helpers.bulk(es, generate_actions())

print("Bulk upload completed for HDFS!")


Bulk upload completed for HDFS!


In [5]:
resp = es.search(
    index="hdfs",
    query={"match_all": {}},
    size=10
)

resp

ObjectApiResponse({'took': 136, 'timed_out': False, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 2000, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'hdfs', '_id': '0U5jgZoBd5gvbmloCY5C', '_score': 1.0, '_source': {'timestamp_epoch': 1226243287, 'timestamp_iso': '2008-11-09T20:38:07.222000Z', 'date_raw': '081109', 'time_raw': '203807', 'millis': '222', 'level': 'INFO', 'component': 'dfs.DataNode$PacketResponder', 'message': 'PacketResponder 0 for block blk_-6952295868487656571 terminating', 'raw_line': '081109 203807 222 INFO dfs.DataNode$PacketResponder: PacketResponder 0 for block blk_-6952295868487656571 terminating'}}, {'_index': 'hdfs', '_id': '0k5jgZoBd5gvbmloCY5C', '_score': 1.0, '_source': {'timestamp_epoch': 1226243405, 'timestamp_iso': '2008-11-09T20:40:05.035000Z', 'date_raw': '081109', 'time_raw': '204005', 'millis': '35', 'level': 'INFO', 'component': 'dfs.FSNamesystem', 'message': 'BLOCK* NameSystem.addStoredB

In [6]:
resp = es.search(
    index="hdfs",
    query={
        "term": {
            "log_level": "INFO"
        }
    },
    size=10
)

resp


ObjectApiResponse({'took': 4, 'timed_out': False, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}})