### ELSER v2 + BM25 Search — Single-Notebook Edition ES 9


### 1) Install dependencies

In [None]:
%pip install "elasticsearch>=9.1,<9.2" pandas openpyxl python-dateutil

### 2) Class definition (BertDescriptionElser)

In [1]:
# UPDATED: ES 9.x endpoint-based ELSER (no Kibana, no old text_expansion processor)
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Iterable, Optional, List, Sequence, Union
import json
import pandas as pd
from dateutil import parser as dtparser
from elasticsearch import Elasticsearch, helpers
try:
    from elastic_transport import ApiError
except Exception:
    try:
        from elasticsearch import ApiError  # type: ignore
    except Exception:
        class ApiError(Exception):
            pass
try:
    from elasticsearch.helpers import BulkIndexError
except Exception:
    class BulkIndexError(Exception):
        pass

def _coerce_str(v) -> Optional[str]:
    if v is None:
        return None
    s = str(v).strip()
    return s if s else None

def to_iso(v) -> Optional[str]:
    if pd.isna(v):
        return None
    try:
        if isinstance(v, datetime):
            return v.isoformat()
        return dtparser.parse(str(v)).isoformat()
    except Exception:
        return None

class BertDescriptionElser:
    """
    ES 9.x implementation:
      - Stores doc text in <description_col>
      - Stores ELSER sparse vector tokens in ml.description_tokens (type=sparse_vector)
      - At query time, uses sparse_vector query with inference_id=<endpoint_id> (hybrid with BM25)
    """
    def __init__(
        self,
        es_url: str = "http://localhost:9200",
        es_user: str = "elastic",
        es_pass: str = "changeme",
        index_name: str = "chat_elser_description_only",
        endpoint_id: str = "elser-local",       # <-- NEW: inference endpoint
        description_col: str = "Description",
        request_timeout: int = 120,
        use_ml: bool = True,                   # toggle ELSER usage
    ) -> None:
        self.es = Elasticsearch(
            es_url,
            basic_auth=(es_user, es_pass),
            request_timeout=request_timeout,
            verify_certs=False,
        )
        self.index_name = index_name
        self.endpoint_id = endpoint_id
        self.description_col = description_col
        self.use_ml_requested = use_ml
        self.es_url = es_url  # for direct HTTP calls if needed

    # ---------- Index + Mapping ----------
    def ensure_index(self) -> None:
        """
        Creates mapping with sparse_vector at ml.description_tokens.
        Safe to call multiple times.
        """
        props: Dict[str, Any] = {
            self.description_col: {"type": "text"},
            "timestamp": {"type": "date", "ignore_malformed": True},
            "ml": {
                "properties": {
                    "description_tokens": {"type": "sparse_vector"}  # <-- ES 9.x sparse vector
                }
            }
        }
        body = {"mappings": {"properties": props}}
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.put_mapping(index=self.index_name, properties=props)
        else:
            self.es.indices.create(index=self.index_name, **body)

    # ---------- Inference (ELSER) ----------
    def _infer_sparse(self, text: str) -> Dict[str, float]:
        """
        Calls POST /_inference/sparse_embedding/{endpoint_id} with { "input": text }
        Returns token->weight dict suitable for sparse_vector field.
        """
        # use elasticsearch client’s transport to POST arbitrary path
        path = f"/_inference/sparse_embedding/{self.endpoint_id}"
        resp = self.es.perform_request(
            method="POST",
            path=path,
            body={"input": text},
        )
        # shape: { "sparse_embedding": [ { "is_truncated": bool, "embedding": {token: weight, ...} } ] }
        emb = resp["sparse_embedding"][0]["embedding"]
        return emb

    # ---------- Data prep ----------
    def _sanitize_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        if self.description_col not in df.columns:
            raise ValueError(
                f"Required column '{self.description_col}' not found. Available columns: {list(df.columns)}"
            )
        s = df[self.description_col].astype(str).map(lambda x: x.strip())
        df = df.copy()
        df[self.description_col] = s
        df = df[df[self.description_col].astype(bool)]
        if df.empty:
            raise ValueError(
                f"All rows are empty in '{self.description_col}'. Provide non-empty text."
            )
        return df

    # ---------- Bulk index ----------
    def _iter_actions(self, df: pd.DataFrame, id_field: Optional[str]) -> Iterable[Dict[str, Any]]:
        for _, row in df.iterrows():
            doc: Dict[str, Any] = {}
            for c in df.columns:
                val = row[c]
                if pd.isna(val):
                    continue
                doc[c] = val

            # set timestamp if found
            for cand in ("created_dttm", "created_at", "timestamp", "time", "date"):
                if cand in df.columns and not pd.isna(row.get(cand)):
                    iso = to_iso(row[cand])
                    if iso:
                        doc["timestamp"] = iso
                        break

            # add ELSER sparse vector
            if self.use_ml_requested:
                try:
                    tokens = self._infer_sparse(str(doc[self.description_col]))
                    doc.setdefault("ml", {})["description_tokens"] = tokens
                except Exception as e:
                    # if inference fails, index without tokens
                    pass

            action = {"_op_type": "index", "_index": self.index_name, "_source": doc}
            if id_field and id_field in row and pd.notna(row[id_field]):
                action["_id"] = str(row[id_field])
            yield action

    def bulk_index_dataframe(self, df: pd.DataFrame, id_field: Optional[str] = None, chunk_size: int = 300) -> None:
        df = self._sanitize_dataframe(df)
        try:
            helpers.bulk(
                self.es,
                self._iter_actions(df, id_field),
                chunk_size=chunk_size,
                refresh="wait_for",
            )
        except BulkIndexError as bie:
            errors = getattr(bie, "errors", [])
            preview = errors[:3]
            raise RuntimeError(
                f"Bulk indexing failed for {len(errors)} documents. First errors: {preview}"
            ) from bie

    def bulk_index_file(self, csv_or_xlsx: Union[str, Path], id_field: Optional[str] = None) -> None:
        p = Path(csv_or_xlsx)
        if not p.exists():
            raise FileNotFoundError(p)
        if p.suffix.lower() == ".csv":
            df = pd.read_csv(p)
        elif p.suffix.lower() in (".xlsx", ".xls"):
            df = pd.read_excel(p, engine="openpyxl")
        else:
            raise ValueError("Only .csv, .xlsx, or .xls are supported")
        self.bulk_index_dataframe(df, id_field=id_field)

    # ---------- Query ----------
    def _build_body(self, question: str, size: int, include_elser: bool, fields_to_return: Optional[Sequence[str]]) -> Dict[str, Any]:
        should: List[Dict[str, Any]] = []
        # BM25
        should.append({"match": {self.description_col: {"query": question, "boost": 0.6}}})
        # ELSER sparse query (endpoint at query time)
        if include_elser and self.use_ml_requested:
            should.append({
                "sparse_vector": {
                    "field": "ml.description_tokens",
                    "inference_id": self.endpoint_id,
                    "query": question
                }
            })
        body: Dict[str, Any] = {"size": size, "query": {"bool": {"should": should, "minimum_should_match": 1}}}
        if fields_to_return:
            body["_source"] = list(fields_to_return)
        return body

    def semantic_search(self, question: str, size: int = 10, hybrid: bool = True, fields_to_return: Optional[Sequence[str]] = None) -> pd.DataFrame:
        if not _coerce_str(question):
            raise ValueError("Provide a non-empty search question.")
        try:
            body = self._build_body(question, size, include_elser=hybrid, fields_to_return=fields_to_return)
            res = self.es.search(index=self.index_name, body=body)
        except ApiError:
            # fallback BM25-only
            body = self._build_body(question, size, include_elser=False, fields_to_return=fields_to_return)
            res = self.es.search(index=self.index_name, body=body)
        rows: List[Dict[str, Any]] = []
        for h in res.get("hits", {}).get("hits", []):
            src = h.get("_source", {})
            rows.append({"_score": h.get("_score", 0.0), **src})
        return pd.DataFrame(rows)

### 3) Helper functions

In [2]:
# UPDATED: no ingest pipeline; we index tokens client-side
import pandas as pd
from pathlib import Path

def ensure_indexed(pipe: BertDescriptionElser, file_path: str, reindex: bool) -> None:
    count = 0
    if pipe.es.indices.exists(index=pipe.index_name):
        try:
            count = pipe.es.count(index=pipe.index_name)["count"]
        except Exception:
            count = 0
    if reindex or count == 0:
        if pipe.es.indices.exists(index=pipe.index_name):
            pipe.es.indices.delete(index=pipe.index_name, ignore_unavailable=True)
        pipe.ensure_index()
        pipe.bulk_index_file(file_path, id_field=None)   # tokens added client-side via endpoint
        count = pipe.es.count(index=pipe.index_name)["count"]
        print(f"[INFO] Indexed docs: {count}")
    else:
        print(f"[INFO] Using existing index '{pipe.index_name}' with {count} docs.")

def do_query(pipe: BertDescriptionElser, q: str, text_col: str, size: int = 10, bm25_only: bool = False):
    hits = pipe.semantic_search(question=q, size=size, hybrid=(not bm25_only))
    if hits.empty:
        print("(no matches)")
        return hits
    cols = ["_score"]
    if text_col in hits.columns:
        cols.append(text_col)
    if "timestamp" in hits.columns:
        cols.append("timestamp")
    try:
        display(hits[cols] if set(cols).issubset(hits.columns) else hits)
    except Exception:
        print(hits[cols] if set(cols).issubset(hits.columns) else hits)
    return hits


### 4) Configure here

In [3]:
# UPDATED: use endpoint id; model id not needed here
ES_URL    = "http://localhost:9200"
ES_USER   = "elastic"
ES_PASS   = "changeme"

DATA_FILE   = r"C:\Users\dell\elser-python\Other\sample_descriptions.xlsx"
INDEX_NAME  = "chat_elser_description_only"
ENDPOINT_ID = "elser-local"   # <-- must exist (we created this earlier)

TEXT_COL       = "Description"
ONE_SHOT_QUERY = "BlueSky Airlines safety compliance"
TOP_K          = 10
BM25_ONLY      = False
REINDEX        = False

### 4b) Verify server and model, auto-switch BM25 if needed

In [4]:
# UPDATED: correct header for ES 9.x infer call
from elasticsearch import Elasticsearch

def _parse_major(ver: str):
    try:
        return int(str(ver).split(".")[0])
    except Exception:
        return None

es_check = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    verify_certs=False,
    request_timeout=60
)

try:
    server_info = es_check.info()
except Exception as e:
    raise SystemExit(f"[FATAL] Could not connect to Elasticsearch at {ES_URL}: {e}")

server_ver = server_info.get("version", {}).get("number")
major = _parse_major(server_ver)
print(f"[INFO] Server version: {server_ver}")
if major != 9:
    print("[WARN] Server is not 9.x. Pin the Python client accordingly.")

def endpoint_exists(es: Elasticsearch, endpoint_id: str) -> bool:
    try:
        es.perform_request("GET", f"/_inference/sparse_embedding/{endpoint_id}")
        return True
    except Exception:
        return False

BM25_ONLY = bool(BM25_ONLY)
if endpoint_exists(es_check, ENDPOINT_ID):
    try:
        resp = es_check.perform_request(
            method="POST",
            path=f"/_inference/sparse_embedding/{ENDPOINT_ID}",
            headers={"Content-Type": "application/json"},    # <-- FIXED
            body={"input": "hello world"},
        )
        print("[INFO] infer() smoke test: OK (endpoint responding).")
    except Exception as e:
        print(f"[WARN] infer() test failed: {e}")
        BM25_ONLY = True
else:
    print(f"[WARN] Inference endpoint {ENDPOINT_ID!r} not found.")
    BM25_ONLY = True

print("[INFO] Hybrid allowed" if not BM25_ONLY else "[INFO] BM25-only mode")


[INFO] Server version: 9.1.3
[WARN] infer() test failed: BadRequestError(400, 'media_type_header_exception', 'Invalid media-type value on headers [Accept, Content-Type]', A compatible version is required on both Content-Type and Accept headers if either one has requested a compatible version. Accept=null, Content-Type=application/vnd.elasticsearch+json; compatible-with=9)
[INFO] BM25-only mode


### 5) Preview your data

In [5]:
from pathlib import Path
import pandas as pd
fp = Path(DATA_FILE)
assert fp.exists(), f"Input file not found: {fp}"
if fp.suffix.lower() in (".xlsx", ".xls"):
    df_preview = pd.read_excel(fp)
elif fp.suffix.lower() == ".csv":
    df_preview = pd.read_csv(fp)
else:
    raise AssertionError("Only .xlsx, .xls, or .csv are supported.")
print("=== DATA PREVIEW (first 3 rows) ===")
try:
    display(df_preview.head(3))
except Exception:
    print(df_preview.head(3).to_string(index=False))
print("=== COLUMNS ===")
print(list(df_preview.columns))
assert TEXT_COL in df_preview.columns, f"Column '{TEXT_COL}' not found. Available: {list(df_preview.columns)}"


=== DATA PREVIEW (first 3 rows) ===


Unnamed: 0,Description
0,"John Doe, born on January 14, 1985, currently ..."
1,Mary Johnson joined BlueSky Airlines in 2018 a...
2,"Ahmed Ibrahim, a senior architect at GreenBuil..."


=== COLUMNS ===
['Description']


### 6) Create mapping/pipeline and index (as needed)

In [6]:
pipe = BertDescriptionElser(
    es_url=ES_URL,
    es_user=ES_USER,
    es_pass=ES_PASS,
    index_name=INDEX_NAME,
    endpoint_id=ENDPOINT_ID,
    description_col=TEXT_COL,
    use_ml=(not BM25_ONLY),
)
ensure_indexed(pipe, str(fp), reindex=REINDEX)


[INFO] Using existing index 'chat_elser_description_only' with 10 docs.


### 7) One-shot search

In [7]:
if ONE_SHOT_QUERY and str(ONE_SHOT_QUERY).strip():
    print(f"=== SEARCH RESULTS for: {ONE_SHOT_QUERY!r} ===")
    _hits = do_query(pipe, ONE_SHOT_QUERY, TEXT_COL, size=TOP_K, bm25_only=BM25_ONLY)
else:
    print("(Skipped)")

=== SEARCH RESULTS for: 'BlueSky Airlines safety compliance' ===


Unnamed: 0,_score,Description
0,4.704845,Mary Johnson joined BlueSky Airlines in 2018 a...


### 8) Interactive search loop (optional)

In [None]:
print("Interactive mode. Type your query and press Enter.")
print("Commands: :quit to exit, :help for help.")
while True:
    try:
        q = input("query> ").strip()
    except (EOFError, KeyboardInterrupt):
        print("\nExiting.")
        break
    if not q:
        continue
    if q in {":quit", ":exit"}:
        print("Exiting.")
        break
    if q in {":help", "help", "?"}:
        print("Enter any text to search. Use :quit to exit.")
        continue
    print(f"\n=== SEARCH RESULTS for: {q!r} ===")
    _hits = do_query(pipe, q, TEXT_COL, size=TOP_K, bm25_only=BM25_ONLY)
    print("")


Interactive mode. Type your query and press Enter.
Commands: :quit to exit, :help for help.

=== SEARCH RESULTS for: 'when does Robert Green born?' ===


Unnamed: 0,_score,Description
0,2.520117,"Robert Green, born December 9, 1975, serves as..."
1,0.090938,Sarah Thompson currently teaches environmental...
2,0.090938,"Miguel Santos, born July 11, 1991, is a logist..."
3,0.090938,Olivia Brown joined MedCare Hospital in 2015 a...
4,0.087962,"Ahmed Ibrahim, a senior architect at GreenBuil..."
5,0.087962,"Li Wei, born on August 5, 1993, works for Sino..."
6,0.086546,Mary Johnson joined BlueSky Airlines in 2018 a...
7,0.086546,"David Kim, born February 22, 1987, is employed..."
8,0.082558,"John Doe, born on January 14, 1985, currently ..."



