### Imports & environment

In [1]:
import os
from dotenv import load_dotenv

import pandas as pd
import oracledb
from elasticsearch import Elasticsearch, helpers

# Load .env from project root
env_loaded = load_dotenv()
print("ENV loaded:", env_loaded)

# ---- Oracle ----
ORACLE_HOST = os.getenv("ORACLE_HOST")
ORACLE_PORT = os.getenv("ORACLE_PORT")
ORACLE_SERVICE = os.getenv("ORACLE_SERVICE")
ORACLE_USER = os.getenv("ORACLE_USER")
ORACLE_PASSWORD = os.getenv("ORACLE_PASSWORD")

# Build DSN
ORACLE_DSN = f"{ORACLE_HOST}:{ORACLE_PORT}/{ORACLE_SERVICE}"

print("Oracle DSN:", ORACLE_DSN)
print("Oracle User:", ORACLE_USER)

# ---- Elasticsearch ----
ES_URL = os.getenv("ES_URL", "http://localhost:9200")
ES_USER = os.getenv("ES_USER", "elastic")
ES_PASS = os.getenv("ES_PASS", os.getenv("ELASTIC_PASSWORD"))

ES_INDEX = os.getenv("ES_INDEX", "oracle_elser_index_v2")
ES_PIPELINE = "elser-oracle-pipeline"

print("ES URL:", ES_URL)
print("ES INDEX:", ES_INDEX)
print("ES PIPELINE:", ES_PIPELINE)

ENV loaded: True
Oracle DSN: localhost:1521/XEPDB1
Oracle User: es_user
ES URL: http://localhost:9200
ES INDEX: oracle_elser_index_v2
ES PIPELINE: elser-oracle-pipeline


### Test Oracle connection + confirm row counts

In [2]:
conn = oracledb.connect(user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN)
print("Oracle connected OK")

with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM docs")
    total = cur.fetchone()[0]
    print("DOCS row count:", total)

    # quick peek (latest 5)
    cur.execute("""
        SELECT id, title, updated_at
        FROM docs
        ORDER BY updated_at DESC NULLS LAST
        FETCH FIRST 5 ROWS ONLY
    """)
    rows = cur.fetchall()
    print("\nLatest 5 rows (id, title, updated_at):")
    for r in rows:
        print(r)

conn.close()
print("\nOracle connection closed.")

Oracle connected OK
DOCS row count: 5

Latest 5 rows (id, title, updated_at):
('5', '5', datetime.datetime(2025, 2, 5, 0, 0))
('4', '4', datetime.datetime(2025, 2, 1, 0, 0))
('3', '3', datetime.datetime(2025, 1, 22, 0, 0))
('2', '2', datetime.datetime(2025, 1, 15, 0, 0))
('1', '1', datetime.datetime(2025, 1, 10, 0, 0))

Oracle connection closed.


### Pull rows from Oracle and index them into Elasticsearch (with your ingest pipeline)

#### First, confirm what pipelines exist

In [5]:
# STEP 4A: List ingest pipelines and confirm the correct pipeline id

from elasticsearch import Elasticsearch

es = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    request_timeout=120
)

pipelines = es.ingest.get_pipeline()
print("Pipelines found:", len(pipelines))
for pid in sorted(pipelines.keys()):
    print("-", pid)


Pipelines found: 7
- behavioral_analytics-events-final_pipeline
- elser_oracle_pipeline
- ent-search-generic-ingestion
- logs-default-pipeline
- logs@default-pipeline
- logs@json-message
- logs@json-pipeline


#### Then, set the correct pipeline id

In [6]:
# STEP 4B: Set correct pipeline id (most likely)

ES_PIPELINE = "elser_oracle_pipeline"
print("Using ES_PIPELINE:", ES_PIPELINE)


Using ES_PIPELINE: elser_oracle_pipeline


In [None]:
# STEP 5: Convert Oracle LOB/CLOB to string before sending to Elasticsearch

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import oracledb
from datetime import datetime

ES_PIPELINE = "elser_oracle_pipeline"

def lob_to_str(v):
    """Convert Oracle LOB/CLOB to Python str safely."""
    if v is None:
        return ""
    # oracledb LOB objects have .read()
    if hasattr(v, "read"):
        return v.read() or ""
    return str(v)

# --- Connect to ES
es = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    request_timeout=120
)

# --- Fetch docs from Oracle (id/title/body/updated_at)
conn = oracledb.connect(user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN)

docs = []
with conn.cursor() as cur:
    cur.execute("""
        SELECT id, title, body, updated_at
        FROM docs
        ORDER BY updated_at DESC NULLS LAST
        FETCH FIRST 500 ROWS ONLY
    """)
    for (doc_id, title, body, updated_at) in cur.fetchall():

        # Normalize updated_at to ISO string
        if isinstance(updated_at, datetime):
            updated_iso = updated_at.isoformat()
        else:
            updated_iso = str(updated_at) if updated_at is not None else None

        docs.append({
            "id": str(doc_id),
            "title": str(title) if title is not None else "",
            "body": lob_to_str(body),               # IMPORTANT FIX
            "updated_at": updated_iso
        })

conn.close()

print("Fetched docs from Oracle:", len(docs))
if docs:
    print("Sample doc types:",
          type(docs[0]["id"]).__name__,
          type(docs[0]["title"]).__name__,
          type(docs[0]["body"]).__name__,
          type(docs[0]["updated_at"]).__name__)
    print("Sample doc preview:", {k: docs[0].get(k) for k in ["id","title","updated_at"]})
    print("Sample body preview:", (docs[0].get("body","")[:200] + "...") if docs[0].get("body") else "(empty)")

# --- Prepare bulk actions
actions = [
    {
        "_op_type": "index",
        "_index": ES_INDEX,
        "_id": d["id"],
        "pipeline": ES_PIPELINE,
        "_source": d
    }
    for d in docs
]

# --- Execute bulk
ok, errors = bulk(es, actions, raise_on_error=False, raise_on_exception=False)
print("Bulk OK:", ok)
print("Bulk Errors:", len(errors))
for e in errors[:5]:
    print(e)

# --- Refresh + count
es.indices.refresh(index=ES_INDEX)
print("ES count after ingest:", es.count(index=ES_INDEX)["count"])


Fetched docs from Oracle: 5
Sample doc types: str str str str
Sample doc preview: {'id': '5', 'title': '5', 'updated_at': '2025-02-05T00:00:00'}
Sample body preview: Truck unloading accident resulting in minor material damage but no injuries....
Bulk OK: 5
Bulk Errors: 0
ES count after ingest: 10


### STEP 6 â€” Verify the ingest pipeline actually ran (ELSER fields exist)

In [None]:
# STEP 6: Verify ELSER enrichment exists on a known incident document

from elasticsearch import Elasticsearch
import json

ES_PIPELINE = "elser_oracle_pipeline"
ELSER_FIELD = "ml.inference.body_expanded"

es = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    request_timeout=120
)

doc_id = "5"  # pick one incident id you ingested

resp = es.get(
    index=ES_INDEX,
    id=doc_id,
    _source_includes=["id", "title", "body", "updated_at", ELSER_FIELD, "ml.inference_error"]
)

src = resp.get("_source", {})
print("Found doc:", src.get("id"), "| title:", src.get("title"))
print("Has ELSER field?:", ELSER_FIELD in src)

# Show a small sample of the expanded tokens/weights if present
expanded = src.get("ml", {}).get("inference", {}).get("body_expanded", {})
print("ELSER feature count:", len(expanded))

# Print top 20 features by weight (for sanity check)
top20 = sorted(expanded.items(), key=lambda kv: kv[1], reverse=True)[:20]
print("\nTop 20 ELSER features:")
for k, v in top20:
    print(f"{k:20s} {v}")

# If any inference error was captured, show it
inf_err = src.get("ml", {}).get("inference_error")
if inf_err:
    print("\nINFERENCE ERROR:", inf_err)
