In [None]:
# ir_datasets identifier of the collection to ingest. The documents are the
# MS MARCO passage corpus; "train/split200-train" presumably refers to the
# train queries with 200 held out for validation — confirm in the
# ir_datasets catalog.
# NOTE(review): later cells use INDEX_NAME, which is not defined in any cell
# visible here — define it alongside this setting or confirm its origin.
IR_DATASET_NAME = "msmarco-passage/train/split200-train"

In [None]:
# Open an OpenSearch client through the project's helper module; every
# following cell talks to the cluster through this shared `client`.
from open_search_connect import connect
import open_search_connect

client = connect()

In [None]:
# Load the dataset and report how many documents the index already holds.
# ir_datasets is imported here because this is its first use; without it the
# cell raises NameError on Restart & Run All.
import ir_datasets

dataset = ir_datasets.load(IR_DATASET_NAME)

# The index is only created in the next cell, so on a fresh cluster it does
# not exist yet; counting it unconditionally would raise a not-found error.
if client.indices.exists(index=INDEX_NAME):
    existing_doc_count = client.count(index=INDEX_NAME)["count"]
else:
    existing_doc_count = 0
existing_doc_count

In [None]:
# Create the OpenSearch index with explicit mappings:
#   - doc_id: keyword field (stored verbatim, exact-match lookups)
#   - text:   full-text field analysed with the built-in "english" analyzer
# One primary shard and zero replicas; "knn": True enables the k-NN plugin
# setting on the index (no vector field is mapped yet).
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "knn": True,
    },
    "mappings": {
        "properties": {
            "doc_id": {"type": "keyword"},
            "text": {"type": "text", "analyzer": "english"},
        }
    },
}

# indices.create fails when the index already exists, so check first to keep
# this cell safe to re-run (e.g. under Restart & Run All).
if client.indices.exists(index=INDEX_NAME):
    print(f"Index {INDEX_NAME!r} already exists; skipping creation")
else:
    client.indices.create(index=INDEX_NAME, body=index_body)

In [None]:
# Smoke test: push the first corpus document through the index and its
# mapping. refresh=True makes it searchable as soon as the call returns.
import ir_datasets
dataset = ir_datasets.load(IR_DATASET_NAME)

doc = next(dataset.docs_iter())
print(doc)

client.index(
    index=INDEX_NAME,
    id=doc.doc_id,
    body=dict(doc_id=doc.doc_id, text=doc.text),
    refresh=True,  # IMPORTANT: ensures document is immediately searchable
)

In [None]:
# Bulk-ingestion setup: imports (stdlib first, then third-party), the
# dataset handle and the ingestion limits used by the cells below.
import time

import ir_datasets
from opensearchpy import OpenSearch, helpers
from opensearchpy.helpers import bulk
from tqdm import tqdm

dataset = ir_datasets.load(IR_DATASET_NAME)

MAX_DOCS = 10_000_000  # upper bound on documents yielded by index_docs_bulk
BATCH_SIZE = 1000      # documents per bulk request; safe value for laptops


def index_docs():
    """Index documents one request at a time (slower but simpler than bulk).

    Sends one ``client.index`` call per document from ``dataset.docs_iter()``
    and stops after ``MAX_DOCS`` documents, the same cap ``index_docs_bulk``
    applies.

    The index is refreshed once at the end rather than per document. Passing
    ``refresh=True`` on every request forces a refresh per document, which
    makes a full-corpus load impractically slow.
    """
    for position, doc in enumerate(dataset.docs_iter()):
        if position >= MAX_DOCS:
            break
        client.index(
            index=INDEX_NAME,
            id=doc.doc_id,
            body={"doc_id": doc.doc_id, "text": doc.text},
        )
    # Make everything indexed above visible to search in a single step.
    client.indices.refresh(index=INDEX_NAME)

def index_docs_bulk(docs=None, limit=None, index_name=None):
    """Yield bulk-API actions (for ``helpers.bulk`` / ``streaming_bulk``).

    Each action indexes one document into ``index_name`` and uses the
    document's ``doc_id`` as the OpenSearch ``_id``.

    Args:
        docs: iterable of documents exposing ``doc_id`` and ``text``
            attributes. Defaults to ``dataset.docs_iter()``.
        limit: maximum number of actions to yield. Defaults to ``MAX_DOCS``;
            zero or a negative value yields nothing.
        index_name: target index. Defaults to ``INDEX_NAME``.

    Yields:
        dicts with ``_index``, ``_id`` and ``_source`` keys.
    """
    from itertools import islice

    # Defaults are resolved lazily (when iteration starts) so the current
    # values of the notebook globals are used, as in the original version.
    if docs is None:
        docs = dataset.docs_iter()
    if limit is None:
        limit = MAX_DOCS
    if index_name is None:
        index_name = INDEX_NAME

    for doc in islice(docs, max(limit, 0)):
        yield {
            "_index": index_name,
            "_id": doc.doc_id,
            "_source": {
                "doc_id": doc.doc_id,
                "text": doc.text,
            },
        }

# Bulk-load the corpus with streaming_bulk so progress can be tracked per doc.
# Periodic refresh is switched off for the duration of the load and restored
# in `finally`, so the index is left searchable even if ingestion fails.

# Size the progress bar from the real corpus size when ir_datasets knows it
# (docs_count() may return None), capped at MAX_DOCS.
expected_docs = dataset.docs_count()
total_docs = MAX_DOCS if expected_docs is None else min(MAX_DOCS, expected_docs)

client.indices.put_settings(
    index=INDEX_NAME,
    body={"index": {"refresh_interval": "-1"}},
)

start_time = time.time()
doc_count = 0
error_count = 0
first_error = None

try:
    with tqdm(total=total_docs, desc="Indexing documents") as pbar:
        for success, info in helpers.streaming_bulk(
            client,
            index_docs_bulk(),
            chunk_size=BATCH_SIZE,
            request_timeout=120,
            # Report per-document failures instead of raising on the first
            # one; otherwise error_count could never be non-zero.
            raise_on_error=False,
        ):
            if success:
                doc_count += 1
            else:
                error_count += 1
                if first_error is None:
                    first_error = info
            pbar.update(1)
finally:
    end_time = time.time()
    # Re-enable refresh interval and refresh index after bulk load
    client.indices.put_settings(
        index=INDEX_NAME,
        body={"index": {"refresh_interval": "1s"}},
    )
    client.indices.refresh(index=INDEX_NAME)

# Print final statistics about the indexing operation
elapsed = end_time - start_time
rate = doc_count / elapsed if elapsed > 0 else 0.0

count_in_index = client.count(index=INDEX_NAME)["count"]

print("\n====== INGESTION COMPLETE ======")
print(f"Documents indexed: {doc_count}")
print(f"Errors: {error_count}")
print(f"Elapsed time: {elapsed:.2f} seconds")
print(f"Indexing rate: {rate:.2f} docs/sec")
print(f"Docs in index: {count_in_index}")
if first_error is not None:
    print(f"First error: {first_error}")

# `client` is intentionally left open: the next cell reuses it, and closing
# the transport here would make every later request fail.

In [None]:
# Alternative bulk indexing approach (simpler but less progress feedback than streaming_bulk).
# helpers.bulk raises on the first failed document by default and otherwise
# returns (successful_count, errors); the count is reported below.
# Each action reuses doc_id as _id, so re-running overwrites documents by id.
import json
from opensearchpy import OpenSearch, helpers

print("Indexing documents...")
indexed_count, _ = helpers.bulk(
    client,
    index_docs_bulk(),
    chunk_size=100,
    request_timeout=120,
)

print(f"✅ Indexing complete ({indexed_count} documents)")