In [8]:
import requests
import json
import time
import http.client
from datetime import datetime

# Increase header limit (http.client rejects responses carrying more headers
# than this; note that _MAXHEADERS is a private, process-wide attribute).
http.client._MAXHEADERS = 1000

# Configuration
HOST = 'localhost'
PORT = 19200
PROTOCOL = 'http'
# NOTE(review): credentials are hardcoded in the notebook; prefer loading them
# from the environment or a secrets store before sharing or committing.
AUTH = ('admin', 'OpenSearch@2024')
VERIFY_SSL = False  # passed to requests as `verify=`
MODEL_ID = "RLmMzpoBDLC7DRstN6vK"
TARGET_HOST = "patroni1"  # 🎯 The specific host we want to index

# Root URL of the cluster. The scheme separator must be "://"; the previous
# "::" produced a malformed URL such as "http::localhost:19200".
BASE_URL = f"{PROTOCOL}://{HOST}:{PORT}"

def run_request(method, endpoint, body=None, timeout=60):
    """Send one HTTP request to the configured OpenSearch endpoint.

    Args:
        method: HTTP verb; one of "GET", "POST", "PUT", "DELETE" or "HEAD".
        endpoint: Path appended after "{PROTOCOL}://{HOST}:{PORT}/", given
            without a leading slash. "" targets the cluster root.
        body: Optional JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the server. Without a timeout, requests
            waits indefinitely and a stalled cluster would hang the cell.

    Returns:
        The requests.Response, or None when the method is unsupported or the
        request could not be completed (the reason is printed).
    """
    if method not in ("GET", "POST", "PUT", "DELETE", "HEAD"):
        print(f"Unsupported method: {method}")
        return None

    url = f"{PROTOCOL}://{HOST}:{PORT}/{endpoint}"
    headers = {"Content-Type": "application/json"}
    try:
        return requests.request(
            method,
            url,
            auth=AUTH,
            verify=VERIFY_SSL,
            json=body,
            headers=headers,
            timeout=timeout,
            # requests.head() does not follow redirects by default while the
            # other verb helpers do; keep that distinction.
            allow_redirects=(method != "HEAD"),
        )
    except requests.exceptions.RequestException as e:
        # Covers connection failures, timeouts and invalid URLs/bodies.
        print(f"Connection Error: {e}")
        return None

# Probe the cluster root and report whether it answered with HTTP 200.
print(f"Connecting to {BASE_URL}...")
resp = run_request("GET", "")
is_connected = resp is not None and resp.status_code == 200
print("‚úÖ Connected to OpenSearch" if is_connected else "‚ùå Failed to connect")

Connecting to http::localhost:19200...
‚úÖ Connected to OpenSearch


In [12]:
# 1. Verify Model Availability
# Fetch the model's metadata from the ML plugin and report its state.
print(f"Checking Model ID: {MODEL_ID}")
resp = run_request("GET", f"_plugins/_ml/models/{MODEL_ID}")
if resp is None or resp.status_code != 200:
    # Any non-200 outcome (including no response at all) lands here.
    print("‚ùå Model not found. Please check the ID.")
else:
    state = resp.json().get("model_state")
    print(f"Model State: {state}")
    if state != "DEPLOYED":
        print("‚ö†Ô∏è Model is not deployed. Please deploy it using the previous notebook.")

Checking Model ID: RLmMzpoBDLC7DRstN6vK
Model State: DEPLOYED


In [13]:
# 2. Create Filtering Pipeline
# This pipeline does two things:
# 1. DROPS any document where host.name is NOT TARGET_HOST (documents with no
#    host field are dropped too, because of the null-safe `?.` access).
# 2. Generates embeddings for the remaining documents.

pipeline_id = "future-host-pipeline"

pipeline_body = {
  "description": f"Filter logs for {TARGET_HOST} and vectorize",
  "processors": [
    {
      "drop": {
        # TARGET_HOST is interpolated into the script text as a quoted
        # literal, so it must not itself contain a single quote.
        "if": f"ctx.host?.name != '{TARGET_HOST}'"
      }
    },
    {
      "text_embedding": {
        "model_id": MODEL_ID,
        # Source field -> field that receives the generated vector.
        "field_map": {
          "_raw": "message_embedding"
        }
      }
    }
  ]
}

print(f"Creating Pipeline '{pipeline_id}' for host '{TARGET_HOST}'...")
resp = run_request("PUT", f"_ingest/pipeline/{pipeline_id}", pipeline_body)
# run_request returns None when the request could not be completed; report
# that instead of raising AttributeError on `.text`.
if resp is None:
    print(f"Pipeline '{pipeline_id}' was not created: no response from OpenSearch.")
else:
    print(resp.text)

Creating Pipeline 'future-host-pipeline' for host 'patroni1'...
{"acknowledged":true}


In [14]:
# 3. Create Future Index
# We create a new index that uses the pipeline by default.
index_name = "patroni-future-host"

# Remove any previous copy so the cell can be re-run; the response is
# deliberately ignored, so a missing index is not treated as an error.
run_request("DELETE", index_name)

index_body = {
  "settings": {
    "index.knn": True,
    # Every document written to this index passes through the pipeline.
    "default_pipeline": pipeline_id
  },
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "host": { 
          "properties": {
              "name": { "type": "keyword" }
          }
      },
      "_raw": { "type": "text" },
      "message_embedding": {
        "type": "knn_vector",
        # NOTE(review): presumably the output size of the model behind
        # MODEL_ID -- confirm if the model is changed.
        "dimension": 384,
        "method": {
          "name": "hnsw",
          "engine": "lucene",
          "parameters": { "m": 16, "ef_construction": 128 }
        }
      }
    }
  }
}

print(f"Creating Index '{index_name}'...")
resp = run_request("PUT", index_name, index_body)
# run_request returns None when the request could not be completed; report
# that instead of raising AttributeError on `.text`.
if resp is None:
    print(f"Index '{index_name}' was not created: no response from OpenSearch.")
else:
    print(resp.text)

Creating Index 'patroni-future-host'...
{"acknowledged":true,"shards_acknowledged":true,"index":"patroni-future-host"}


In [15]:
# 4. Simulate Future Log Ingestion
# We will send a mix of logs from 'patroni1' (Target) and 'patroni2' (Ignored).
# Only 'patroni1' logs should be indexed.

logs = [
    # TARGET HOST (patroni1) - Should be indexed
    {
        "timestamp": datetime.now().isoformat(),
        "host": { "name": "patroni1" },
        "_raw": "2025-11-30 10:00:01 FATAL: database system is shutting down due to lack of disk space"
    },
    {
        "timestamp": datetime.now().isoformat(),
        "host": { "name": "patroni1" },
        "_raw": "2025-11-30 10:00:05 INFO: acquired leader lock"
    },
    # IGNORED HOST (patroni2) - Should be dropped
    {
        "timestamp": datetime.now().isoformat(),
        "host": { "name": "patroni2" },
        "_raw": "2025-11-30 10:00:02 INFO: following new leader"
    },
    {
        "timestamp": datetime.now().isoformat(),
        "host": { "name": "patroni2" },
        "_raw": "2025-11-30 10:00:03 WARNING: connection to master lost"
    }
]

print("Simulating ingestion of 4 logs (2 from target, 2 from others)...")

failed_requests = 0
for log in logs:
    # We post directly to the index. The default_pipeline will intercept.
    resp = run_request("POST", f"{index_name}/_doc", log)
    # Surface transport failures and HTTP errors instead of discarding them;
    # otherwise a broken pipeline only shows up later as a wrong hit count.
    if resp is None or not resp.ok:
        failed_requests += 1
        detail = "no response" if resp is None else f"HTTP {resp.status_code}: {resp.text}"
        print(f"Failed to ingest log from {log['host']['name']}: {detail}")

# Refresh to make data available
run_request("POST", f"{index_name}/_refresh")
if failed_requests:
    print(f"Ingestion finished with {failed_requests} failed request(s).")
else:
    print("Ingestion complete.")

Simulating ingestion of 4 logs (2 from target, 2 from others)...
Ingestion complete.


In [16]:
# 5. Verify Filtering
# We expect exactly 2 documents in the index (only from patroni1).

print("--- Verifying Index Content ---")
resp = run_request("GET", f"{index_name}/_search")
if resp is None:
    # run_request already printed the reason; fall through with no hits so
    # the check below reports a failure instead of raising on `.json()`.
    print("Search request failed: no response from OpenSearch.")
    hits = []
else:
    # A body-less _search returns only the first page of hits, which is
    # enough for the handful of documents used here.
    hits = resp.json().get("hits", {}).get("hits", [])

print(f"Total Documents Indexed: {len(hits)}")
for hit in hits:
    src = hit["_source"]
    print(f"Indexed Log from: {src.get('host', {}).get('name')} | Message: {src.get('_raw')}")

if len(hits) == 2:
    print("\n‚úÖ SUCCESS: Only target host logs were indexed.")
else:
    print(f"\n‚ùå FAILURE: Expected 2 documents, found {len(hits)}.")

--- Verifying Index Content ---
Total Documents Indexed: 2
Indexed Log from: patroni1 | Message: 2025-11-30 10:00:01 FATAL: database system is shutting down due to lack of disk space
Indexed Log from: patroni1 | Message: 2025-11-30 10:00:05 INFO: acquired leader lock

‚úÖ SUCCESS: Only target host logs were indexed.


In [17]:
# 6. Run Neural Search on Future Data
# Now we search this new index.

def neural_search(query_text, k=3):
    """Run a neural query against `index_name` and print the scored hits.

    Args:
        query_text: Text sent as the neural query's `query_text`.
        k: Number of nearest neighbours requested from the
            `message_embedding` field (defaults to the previous fixed 3).

    Returns:
        None. Each hit is printed as "[score] [host] message". Nothing is
        returned so that calling this as the last line of a cell produces no
        extra cell output.
    """
    query = {
        "query": {
            "neural": {
                "message_embedding": {
                    "query_text": query_text,
                    "model_id": MODEL_ID,
                    "k": k
                }
            }
        },
        # Only these source fields are needed for the printout below.
        "_source": ["_raw", "host"]
    }

    print(f"\nüîç Query: '{query_text}'")
    resp = run_request("GET", f"{index_name}/_search", query)
    if resp is None:
        # run_request already printed the reason; avoid raising on `.json()`.
        print("   Search request failed: no response from OpenSearch.")
        return

    hits = resp.json().get("hits", {}).get("hits", [])

    for hit in hits:
        source = hit.get("_source", {})
        score = hit.get("_score")
        msg = source.get("_raw")
        host = source.get("host", {}).get("name")
        print(f"   [{score:.4f}] [{host}] {msg}")

# Run the two sample queries in order.
for sample_query in ("disk space issue", "leader election"):
    neural_search(sample_query)


üîç Query: 'disk space issue'
   [0.5224] [patroni1] 2025-11-30 10:00:01 FATAL: database system is shutting down due to lack of disk space
   [0.3282] [patroni1] 2025-11-30 10:00:05 INFO: acquired leader lock

üîç Query: 'leader election'
   [0.4185] [patroni1] 2025-11-30 10:00:05 INFO: acquired leader lock
   [0.3499] [patroni1] 2025-11-30 10:00:01 FATAL: database system is shutting down due to lack of disk space
