# Indexing and Retrieval — Activity 1 (Elasticsearch)

This notebook preprocesses news + Wikipedia data, plots word frequency distributions (raw vs cleaned), indexes into Elasticsearch (`ESIndex-v1.0`), and measures latency/throughput and simple functional metrics.

Prereqs: run `pip install -r requirements.txt`, and start Elasticsearch locally (e.g., Docker: `docker run -p 9200:9200 -e xpack.security.enabled=false docker.elastic.co/elasticsearch/elasticsearch:8.15.3`).


In [3]:
%pip install -r ..\requirements.txt

import os
from typing import Dict, List

import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

from datasets import load_dataset

from preprocess import TextPreprocessor, PreprocessConfig
from es_index import get_es, ensure_index, bulk_index
from metrics import measure_latency, measure_throughput, percentile_latencies, precision_recall_at_k

ES_INDEX = "ESIndex-v1.0"
SAMPLE_SIZE_PER_SOURCE = 5000  # Adjust if you need a smaller/faster run



Collecting matplotlib>=3.8.0 (from -r ..\requirements.txt (line 3))
  Downloading matplotlib-3.10.7-cp313-cp313-win_amd64.whl.metadata (11 kB)
Collecting nltk>=3.8.1 (from -r ..\requirements.txt (line 5))
  Downloading nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Collecting regex>=2023.10.3 (from -r ..\requirements.txt (line 6))
  Downloading regex-2025.10.23-cp313-cp313-win_amd64.whl.metadata (41 kB)
Collecting elasticsearch>=8.14.0 (from -r ..\requirements.txt (line 8))
  Downloading elasticsearch-9.2.0-py3-none-any.whl.metadata (8.9 kB)
Collecting elastic-transport>=8.13.0 (from -r ..\requirements.txt (line 9))
  Downloading elastic_transport-9.2.0-py3-none-any.whl.metadata (3.9 kB)
Collecting scipy>=1.11.0 (from -r ..\requirements.txt (line 10))
  Downloading scipy-1.16.3-cp313-cp313-win_amd64.whl.metadata (60 kB)
Collecting contourpy>=1.0.1 (from matplotlib>=3.8.0->-r ..\requirements.txt (line 3))
  Downloading contourpy-1.3.3-cp313-cp313-win_amd64.whl.metadata (5.5 kB)
Collectin


[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip
  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def load_webz_io_sample(n: int) -> pd.DataFrame:
    # Webz.io sample via webhose free news datasets often come as CSV/JSON lines.
    # For simplicity we’ll use a small remote subset via datasets if available; otherwise expect a local file path set.
    # Fallback: empty DataFrame.
    try:
        ds = load_dataset("webhose/news-category-dataset")  # may not exist; replace with actual if available
        df = ds["train"].to_pandas()
        df = df.head(n)
        df = df.rename(columns={"headline": "title", "short_description": "text"})
        df["source"] = "webz"
        df["doc_id"] = df.index.map(lambda i: f"webz-{i}")
        return df[["doc_id", "title", "text", "source"]].dropna()
    except Exception:
        return pd.DataFrame(columns=["doc_id", "title", "text", "source"])  # placeholder


def load_wikipedia_en_sample(n: int) -> pd.DataFrame:
    # HuggingFace: wikimedia/wikipedia split 20231101.en
    ds = load_dataset("wikimedia/wikipedia", "20231101.en", split="train")
    # Columns: e.g., id, url, title, text
    df = ds.to_pandas().head(n)
    df["source"] = "wiki"
    df["doc_id"] = df["id"].apply(lambda x: f"wiki-{x}")
    df["title"] = df.get("title", "")
    df["text"] = df.get("text", "")
    return df[["doc_id", "title", "text", "source"]].dropna()


webz_df = load_webz_io_sample(SAMPLE_SIZE_PER_SOURCE)
wikip_df = load_wikipedia_en_sample(SAMPLE_SIZE_PER_SOURCE)

data_df = pd.concat([webz_df, wikip_df], ignore_index=True)
print(f"Loaded documents: {len(data_df)} (webz={len(webz_df)}, wiki={len(wikip_df)})")



Downloading data:   0%|          | 0/41 [00:00<?, ?files/s]Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Downloading data:   2%|▏         | 1/41 [00:18<12:20, 18.52s/files]Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Downloading data:   5%|▍         | 2/41 [00:50<17:07, 26.35s/files]Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Downloading data:   7%|▋         | 3/41 [01:19<17:25, 27.51s/files]Xet Storage is enabled for this repo, but th

In [None]:
# Word frequency plots (raw vs preprocessed)
raw_texts = (data_df["title"].fillna("") + "\n" + data_df["text"].fillna("")).tolist()

pp_raw = TextPreprocessor(PreprocessConfig(lowercase=False, remove_stopwords=False, stem=False))
pp_clean = TextPreprocessor(PreprocessConfig(lowercase=True, remove_stopwords=True, stem=True))

from collections import Counter
raw_counter = Counter()
clean_counter = Counter()

SAMPLE_FOR_PLOTS = min(10000, len(raw_texts))
for t in raw_texts[:SAMPLE_FOR_PLOTS]:
    raw_counter.update(pp_raw.tokenize(t))
    clean_counter.update(pp_clean.tokenize(t))

raw_top = raw_counter.most_common(30)
clean_top = clean_counter.most_common(30)

fig, axes = plt.subplots(1, 2, figsize=(18, 6))
axes[0].bar([w for w,_ in raw_top], [c for _,c in raw_top])
axes[0].set_title("Top 30 words (raw)")
axes[0].tick_params(axis='x', rotation=90)

axes[1].bar([w for w,_ in clean_top], [c for _,c in clean_top])
axes[1].set_title("Top 30 words (preprocessed)")
axes[1].tick_params(axis='x', rotation=90)
plt.show()


In [None]:
# Index into Elasticsearch
es = get_es()
ensure_index(es, ES_INDEX)

records = data_df.to_dict(orient="records")
success_count, failed_count = bulk_index(es, ES_INDEX, records, batch_size=1000)
print({"indexed": success_count, "failed": failed_count})


In [None]:
# Simple search wrapper for metrics
from elasticsearch import Elasticsearch

def es_search_fn(q: str):
    body = {
        "query": {
            "query_string": {
                "query": q,
                "fields": ["title^2", "text"],
                "default_operator": "AND"
            }
        },
        "size": 20
    }
    resp = es.search(index=ES_INDEX, body=body)
    hits = resp.get("hits", {}).get("hits", [])
    return [h.get("_id") for h in hits]

# Create a small diverse query set (replace with LLM-probed set and justification in report)
query_set = [
    '"climate change" AND policy',
    '"football" AND (world OR cup)',
    '"quantum computing" AND algorithms',
    'NOT "covid" AND vaccination',
    '("space exploration" AND mars) OR mission',
]

# Latency and throughput
latencies, perc = measure_latency(es_search_fn, query_set)
qps = measure_throughput(es_search_fn, query_set * 5)
print({"latency_ms": perc, "throughput_qps": qps})


In [None]:
# Functional metrics (placeholder) — requires relevance judgments
# Provide a relevance list per query (doc_id list). Replace with your judged results.
relevance_judgments = {
    "climate change policy": [],
    "football world cup": [],
    "quantum computing algorithms": [],
    "covid vaccination effectiveness": [],
    "space exploration mars mission": [],
}

precisions = {}
recalls = {}
for q in query_set:
    predicted = es_search_fn(q)
    relevant = relevance_judgments.get(q, [])
    p, r = precision_recall_at_k(predicted, relevant, k=10)
    precisions[q] = p
    recalls[q] = r

print({"precision@10": precisions, "recall@10": recalls})


In [None]:
# Memory footprint (process RSS)
import psutil, os
process = psutil.Process(os.getpid())
rss_mb = process.memory_info().rss / (1024*1024)
print({"process_rss_mb": round(rss_mb, 2)})


In [None]:
# Build and query SelfIndex variants
from self_index import SelfIndex

# Variant controls: x (info), y (datastore), z (compression), i (optim), q (qproc)
# x: BOOLEAN|WORDCOUNT|TFIDF  -> 1|2|3
# y: CUSTOM -> 1 (only y=1 implemented here)
# z: NONE|CODE|CLIB -> 0|1|2 (we'll use CODE and CLIB)
# i: Null|Skipping -> 0|1 (skipping present internally but not toggled here)
# q: TERMatat|DOCatat -> T|D

variant = dict(info="TFIDF", dstore="CUSTOM", compr="CODE", qproc="TERMatat", optim="Null")
idx = SelfIndex(core='SelfIndex', info=variant['info'], dstore=variant['dstore'], qproc=variant['qproc'], compr=variant['compr'], optim=variant['optim'])
index_id = f"{idx.identifier_short}"

# Prepare files iterable from dataframe
files_iter = [(r['doc_id'], (str(r['title']) + "\n" + str(r['text']))) for _, r in data_df.iterrows()]

idx.create_index(index_id, files_iter)
idx.load_index(f"indices/{index_id}")

import json as _json

def self_search_fn(q: str):
    res = _json.loads(idx.query(q))
    return [r['doc_id'] for r in res.get('results', [])]

# Compare latency/throughput with ES for the same query set
latencies_self, perc_self = measure_latency(self_search_fn, query_set)
qps_self = measure_throughput(self_search_fn, query_set * 5)
print({"self_latency_ms": perc_self, "self_throughput_qps": qps_self})
