# Elasticsearch Demo: Classic Search, Vectors, and RAG


## History

* Lucene
* Solr
* Elasticsearch
* OpenSearch
* Vector search
* RAG

## Internals

* Shards
* Indices
* Data types
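
To make these three terms concrete, here is a minimal sketch of a create-index request body with an explicit shard count and typed fields. The helper name `build_index_body`, the shard and replica numbers, and the choice of `keyword` for `service`, `severity`, and `tenant` are assumptions for illustration; the demo below relies on dynamic mapping instead.

```python
import json

def build_index_body(shards=1, replicas=0):
    """Return a create-index body (hypothetical helper, not part of the demo)."""
    return {
        "settings": {
            "number_of_shards": shards,      # static: set at index creation
            "number_of_replicas": replicas,  # dynamic: can be changed later
        },
        "mappings": {
            "properties": {
                "title": {"type": "text"},        # analyzed for full-text search
                "description": {"type": "text"},
                "service": {"type": "keyword"},   # exact values for filters and aggregations
                "severity": {"type": "keyword"},
                "tenant": {"type": "keyword"},
            }
        },
    }

body = build_index_body(shards=1, replicas=0)
print(json.dumps(body["settings"]))
```

With the `es` helper from the first cell, such a body could be sent as `es("PUT", "/tickets", json=body)` before any documents are indexed.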

In [1]:
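import os, requests, json
from IPython.display import JSON as pprint  # rich JSON display for notebook output
from openai import OpenAI

# Connection settings and model names, overridable via environment variables
ES=os.getenv("ES_URL","http://localhost:9200")
ES_INDEX_TICKETS=os.getenv("ES_INDEX_TICKETS","tickets")
ES_INDEX_KB=os.getenv("ES_INDEX_KB","kb_articles")
EMBED_MODEL=os.getenv("OPENAI_EMBED_MODEL","text-embedding-3-small")
RAG_MODEL=os.getenv("OPENAI_RAG_MODEL","gpt-5-mini")

client=OpenAI()  # reads OPENAI_API_KEY from the environment

def es(method,path,**kw):
    """Send an HTTP request to Elasticsearch and return the requests.Response."""
    return requests.request(method,ES+path,**kw)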


## 1. "Old School" Elasticsearch: Keyword + Filters


In [2]:
# Create a tiny tickets index with sample docs if it does not exist yet
r=es("GET",f"/{ES_INDEX_TICKETS}")
if r.status_code==404:
    tickets=[
        {"id":1,"title":"Payment timeout","description":"payment service timeout for tenant acme","service":"billing","severity":"critical","tenant":"acme"},
        {"id":2,"title":"Login failure","description":"users cannot log in after password reset","service":"auth","severity":"high","tenant":"acme"},
        {"id":3,"title":"Slow search","description":"search is very slow for large customers","service":"search","severity":"medium","tenant":"globex"},
        {"id":4,"title":"Invoice email bounce","description":"invoice emails bounce for some domains","service":"billing","severity":"low","tenant":"initech"}
    ]
    for t in tickets:
        es("POST",f"/{ES_INDEX_TICKETS}/_doc/{t['id']}",json=t)
    es("POST",f"/{ES_INDEX_TICKETS}/_refresh")  # refresh only this index so the new docs are searchable right away
    r=es("GET",f"/{ES_INDEX_TICKETS}")
pprint(r.json(), expanded=True)



In [3]:
# Simple "Google for tickets" query
txt="timeout error"
q={"query":{"match":{"description":txt}},"size":5}
res=es("GET",f"/{ES_INDEX_TICKETS}/_search",json=q).json()
[hit["_source"] for hit in res["hits"]["hits"]]


[{'id': 1,
  'title': 'Payment timeout',
  'description': 'payment service timeout for tenant acme',
  'service': 'billing',
  'severity': 'critical',
  'tenant': 'acme'}]

In [4]:
# Text + filters + aggregation
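q={
 "query":{
  "bool":{
   "must":{"match":{"description":"timeout"}},
   # term = exact match, so target the un-analyzed keyword sub-field (as the aggregation does)
   "filter":[{"term":{"service.keyword":"billing"}}]
  }
 },
 "aggs":{
  "by_tenant":{"terms":{"field":"tenant.keyword"}}
 },
 "size":5
}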
res=es("GET",f"/{ES_INDEX_TICKETS}/_search",json=q).json()
pprint(res)



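
In the response to the query above, the terms aggregation is returned under `aggregations.by_tenant.buckets`, with one bucket per distinct tenant and a `doc_count` for each. A minimal sketch of flattening that into a plain dict follows; `bucket_counts` is a hypothetical helper, and `sample_res` is a hand-written stand-in shaped like a search response, not real output from this cluster.

```python
def bucket_counts(res, agg_name):
    """Map each bucket key of a terms aggregation to its doc_count."""
    buckets = res.get("aggregations", {}).get(agg_name, {}).get("buckets", [])
    return {b["key"]: b["doc_count"] for b in buckets}

# Hand-written stand-in for a search response (assumption, not real output)
sample_res = {
    "hits": {"hits": []},
    "aggregations": {
        "by_tenant": {"buckets": [{"key": "acme", "doc_count": 1}]}
    },
}
print(bucket_counts(sample_res, "by_tenant"))
```

In the notebook the same call would be `bucket_counts(res, "by_tenant")` on the live response.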

## 2. OpenAI Embeddings Helper


In [5]:
# text-embedding-3-small -> 1536 dims, good default
print("Embedding model:",EMBED_MODEL)

def embed_all(texts,model=EMBED_MODEL):
    if isinstance(texts,str):
        texts=[texts]
    r=client.embeddings.create(model=model,input=texts)
    return [d.embedding for d in r.data]

# Quick smoke test (won't hit ES)
embed_all("test sentence")[0][:8]


Embedding model: text-embedding-3-small


[-0.01057574711740017,
 -0.024866271764039993,
 0.01539737731218338,
 -0.018164508044719696,
 -0.0324474535882473,
 0.0042909481562674046,
 0.007452300284057856,
 -0.012508947402238846]

## 3. Vector Index: Knowledge Base Articles


In [6]:
# Create KB index with dense_vector field matching embedding dims (1536 for text-embedding-3-small)
kb_mapping={
 "mappings":{
  "properties":{
   "title":{"type":"text"},
   "body":{"type":"text"},
   "embedding":{
    "type":"dense_vector",
    "dims":1536,
    "index":True,
    "similarity":"cosine"
   }
  }
 }
}
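# Creating an index that already exists returns HTTP 400, so create it only when missing
if es("HEAD",f"/{ES_INDEX_KB}").status_code==404:
    res=es("PUT",f"/{ES_INDEX_KB}",json=kb_mapping).json()
else:
    res=es("GET",f"/{ES_INDEX_KB}/_mapping").json()
pprint(res, expanded=True)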



In [7]:
# Index a few KB articles with embeddings
kb_docs=[
 {"id":1,"title":"Reset password","body":"To reset your password, go to settings, click reset password, and follow the email link."},
 {"id":2,"title":"Fix payment timeouts","body":"Payment timeouts usually mean the gateway is slow or unreachable. Check gateway status, retry with exponential backoff, and alert on error rate spikes."},
 {"id":3,"title":"Tenant onboarding","body":"New tenants are onboarded via the admin portal. Make sure billing details are verified before enabling production traffic."},
 {"id":4,"title":"Improve search performance","body":"To speed up search, add proper filters, avoid wildcards at the beginning of terms, and keep indices time-based for logs."}
]
vecs=embed_all([d["body"] for d in kb_docs])
for d,v in zip(kb_docs,vecs):
    doc=dict(d)
    doc["embedding"]=v
    es("POST",f"/{ES_INDEX_KB}/_doc/{d['id']}",json=doc)
es("POST",f"/{ES_INDEX_KB}/_refresh")  # refresh only this index so the new docs are searchable right away


<Response [200]>
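
One request per document is fine for four articles. For larger batches, the `_bulk` endpoint accepts newline-delimited JSON: an action line followed by the document itself, with a trailing newline at the end of the body. The sketch below only builds that payload; `to_bulk_ndjson` is a hypothetical helper and the two short docs are made up for the example.

```python
import json

def to_bulk_ndjson(index, docs):
    """Build a _bulk request body: one action line plus one source line per doc."""
    lines = []
    for d in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": str(d["id"])}}))
        lines.append(json.dumps(d))
    return "\n".join(lines) + "\n"  # the bulk body must end with a newline

payload = to_bulk_ndjson("kb_articles", [
    {"id": 1, "title": "Reset password"},
    {"id": 2, "title": "Fix payment timeouts"},
])
print(payload)
```

With the `es` helper it could then be sent as `es("POST", "/_bulk", data=payload, headers={"Content-Type": "application/x-ndjson"})`.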

## 4. Live Vector Search Demo
Ask a natural-language question and retrieve semantically similar docs.
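
The `cosine` similarity configured in the mapping compares the direction of two vectors and ignores their length. A small pure-Python sketch with toy 3-dimensional vectors shows the ranking it produces (the real embeddings here have 1536 dimensions); the vectors and the function name are illustrative only.

```python
import math

def cosine(a, b):
    """Cosine similarity: dot product divided by the product of the norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [1.0, 0.0, 1.0]
docs = {
    "same direction": [2.0, 0.0, 2.0],
    "orthogonal": [0.0, 1.0, 0.0],
    "opposite": [-1.0, 0.0, -1.0],
}
# Highest similarity first, as a nearest-neighbour search would return them
ranked = sorted(docs, key=lambda name: cosine(query, docs[name]), reverse=True)
print(ranked)
```

For `cosine`, Elasticsearch documents the kNN `_score` as `(1 + cosine) / 2`, so scores fall between 0 and 1 rather than between -1 and 1.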


In [8]:
q="why are payments failing for acme customers?"
qv=embed_all(q)[0]
knn_body={
 "knn":{
  "field":"embedding",
  "query_vector":qv,
  "k":3,
  "num_candidates":50
 }
}
res=es("POST",f"/{ES_INDEX_KB}/_search",json=knn_body).json()
kb_hits=res["hits"]["hits"]
sources = []
for h in kb_hits:
    src = dict(h["_source"])  # shallow copy so kb_hits stays untouched for the RAG step
    src["embedding"] = src.get("embedding", [])[:5] + ["..."]  # truncate the vector for display
    sources.append(src)

pprint(sources, expanded=True)


## 5. Tiny RAG Demo: ES + OpenAI
Use Elasticsearch as the retriever and the LLM as the explainer.
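
The retrieve-then-generate flow can be sketched as one function that takes the retriever and the LLM as callables, which keeps it runnable without a cluster or an API key. `rag_answer`, `fake_retrieve`, and `fake_generate` are hypothetical names for this sketch; in the notebook their real counterparts are the kNN search from section 4 and the OpenAI call in this section.

```python
def rag_answer(question, retrieve, generate, k=3):
    """Retrieve k docs, pack them into a prompt, and ask the generator."""
    docs = retrieve(question, k)
    context = "\n\n".join(f"{d['title']}: {d['body']}" for d in docs)
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )
    return generate(prompt), prompt

# Stand-ins so the sketch runs offline (assumptions, not the real services)
def fake_retrieve(question, k):
    kb = [{"title": "Fix payment timeouts", "body": "Check gateway status."}]
    return kb[:k]

def fake_generate(prompt):
    return f"(stub answer based on {prompt.count('Context:')} context block)"

answer, prompt = rag_answer("why are payments failing?", fake_retrieve, fake_generate)
print(answer)
```

Passing the two steps in as functions also makes it easy to swap the retriever (keyword, vector, or both) without touching the prompt logic.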


In [9]:
context="\n\n".join(f"{h['_source']['title']}: {h['_source']['body']}" for h in kb_hits)
rag_prompt=f"Answer the question using only the context below.\n\nContext:\n{context}\n\nQuestion: {q}"
print(rag_prompt[:600]+"...\n")


Answer the question using only the context below.

Context:
Fix payment timeouts: Payment timeouts usually mean the gateway is slow or unreachable. Check gateway status, retry with exponential backoff, and alert on error rate spikes.

Tenant onboarding: New tenants are onboarded via the admin portal. Make sure billing details are verified before enabling production traffic.

Reset password: To reset your password, go to settings, click reset password, and follow the email link.

Question: why are payments failing for acme customers?...



In [11]:
resp=client.responses.create(model=RAG_MODEL,input=rag_prompt)
print(resp.output_text)

Likely causes (from the provided info)
- Payment timeouts: the gateway may be slow or unreachable.
- Tenant onboarding: Acme may not have verified billing details before being enabled for production.

What to do (from the provided info)
- Check gateway status, implement retries with exponential backoff, and alert on error‑rate spikes.
- Confirm Acme’s billing details are verified in the admin portal before allowing production traffic.


## Questions?