# SEMANTIC INDEXING WITH NORMALIZED DOCUMENTS

## This notebook defines a step-by-step procedure for semantically indexing data in ElasticSearch.

First we import the required libraries and set the important variables

In [1]:
import sys
sys.path.append('../')
from elasticsearch import Elasticsearch, exceptions, NotFoundError, helpers
from time import sleep
import xmltodict
from mappings import mappings
from ingest_pipeline import get_ingest_pipeline
import json
import os
from openai import OpenAI
from lxml import etree
from utils import data_utils, index_utils, embed_utils
import numpy as np
from textwrap import fill
from dotenv import load_dotenv
from tiktoken import get_encoding
from itertools import islice
import re

# Load environment variables from the .env file
load_dotenv()

# Get sensitive configuration from .env file or define as environment variables.
# os.getenv returns None for any variable that is not set, so a missing value
# only surfaces later, when the corresponding client is used.
ELASTICSEARCH_URL = os.getenv('ELASTICSEARCH_URL')
ELASTIC_USERNAME = os.getenv('ELASTIC_USERNAME')
ELASTIC_PASSWORD = os.getenv('ELASTIC_PASSWORD')
OPENAI_KEY = os.getenv('OPENAI_KEY')

# Define other configuration variables
# Name of the index that is created, filled and inspected throughout this notebook
INDEX_NAME = 'distill_index'


# OpenAI client used below to generate the normalized summaries
OAIclient = OpenAI(api_key=OPENAI_KEY)

## Data preparation

Point variables to the XML and XSD files of the grants data.

In [3]:
# Relative paths to the grants XML data and to the XSD schema used to validate it
GRANTS_FILE = '../data/grants-20230602.xml'
GRANTS_SCHEMA = '../data/grants-20230530.xsd'

Validate the XML file using the schema defined in XSD file.

Go through the errors, if any, to ensure they are not critical.

In [4]:
# Validate the grants XML file against the XSD schema.
# Any validation errors appear in the cell output and should be reviewed before continuing.
data_utils.validate_xml_with_xsd(GRANTS_FILE, GRANTS_SCHEMA)

XML is not valid according to XSD: <string>:1376624:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1377711:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1389856:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1389913:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1389959:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1390059:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALID_1_2_1: Element 'is_limited': 'None' is not a valid value of the atomic type 'xs:integer'.
<string>:1390140:0:ERROR:SCHEMASV:SCHEMAV_CVC_DATATYPE_VALI

Convert XML data to dict format using function in data_utils.

In [5]:
# Parse the grants XML file into a nested dict
dict_data = data_utils.parse_xml_to_dict(GRANTS_FILE)

# Optional (disabled): dump the parsed data to JSON and load it back for inspection
# with open('grants.json', 'w') as f:
#     json.dump(dict_data, f)
# with open('grants.json', 'r') as f:
#     json_data = json.load(f)

# The individual grant records are stored under dict_data['grants_data']['grant']
print(f"Number of grants = {len(dict_data['grants_data']['grant'])}")

Number of grants = 92308


Clean the data using function in data_utils. This function renames some fields and converts to required format.

In [6]:
# Replace the parsed data with its cleaned version.
# NOTE(review): this rebinds dict_data to the result of cleaning itself, so re-running
# the cell cleans already-cleaned data — confirm clean_dict_data is idempotent.
dict_data = data_utils.clean_dict_data(dict_data)

In [7]:
# Show the field names of the first grant, wrapped to a width of 200 characters
print(fill(str(dict_data['grants_data']['grant'][0].keys()), width=200))

dict_keys(['@id', 'url', 'amount_info', 'site_grant_type', 'modified_date', 'application_url', 'title', 'all_titles', 'submission_info', 'all_grant_source_urls', 'status', 'description',
'eligibility', 'categories_display', 'limited_grant_info', 'user_categories', 'submit_date', 'is_limited', 'site_categories', 'cost_sharing', 'grant_source_url', 'deadlines', 'amounts', 'all_types',
'all_applicant_types', 'locations', 'sponsors'])


### Adding normalized summaries (This is an expensive operation!)

Since generating embeddings for long documents has several drawbacks, we developed a document normalization method:
1. use an LLM to generate detail-rich, structured summaries for all documents.
2. generate embeddings for these normalized documents and add to the index.
3. perform approximate kNN using the 'normalized embeddings'.

In [9]:
def get_prompt(grant):
    """Build the chat messages that ask the model for a structured grant summary.

    Args:
        grant: The grant record. It is interpolated into the user message
            through its string representation.

    Returns:
        A list of two chat messages: a system message that sets the summarizer
        role, and a user message that lists the eight numbered summary fields
        with their word limits, followed by the grant information.
    """
    system_content = (
        "You are a highly skilled summarizer specialized in grants. You are provided with detailed information about a grant "
        "and your task is to create a structured summary according to a specified format. The summary must be accurate and "
        "informative, following the given structure precisely."
    )

    # One line per requested field. The numbering is sequential (Activity is item 8,
    # not a second item 7) and the lines carry no source-code indentation, so the
    # model receives a clean list and no tokens are spent on padding spaces.
    field_lines = (
        "1. Title: [The name or title of the grant.](Limit: 10 words)\n"
        "2. Amount: [The funding details, including the minimum and maximum amounts, if specified.](Limit: 40 words)\n"
        "3. Deadline: [The submission deadline(s) for the grant application.](Limit: 15 words)\n"
        "4. Description: [A very detailed overview of the grant's purpose and objectives. This should be comprehensive and informative.](Limit: 200 words)\n"
        "5. Eligibility: [The detailed criteria for applicants to be eligible for the grant, including any specific requirements.](Limit: 50 words)\n"
        "6. Sponsor: [The organization or entity sponsoring the grant.](Limit: 20 words)\n"
        "7. Categories: [The areas or fields the grant supports.](Limit: 20 words)\n"
        "8. Activity: [The EXACT activity/activities the grant funds. This should be comprehensive and accurate](Limit: 70 words)\n\n"
    )

    user_content = (
        "Based on the provided grant information, generate a structured summary with the following fields:\n\n"
        + field_lines
        + "Ensure that the description and eligibility sections are detailed and comprehensive. "
        + "Reply in JSON format while following word limits.\nHere is the information for the grant:\n"
        + f"[{grant}]"
    )

    return [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

# Generate one structured summary per grant with the chat model and store it on the grant
# under 'normalized_info'. Grants that already carry a summary are skipped, so the cell can
# be re-run after an interruption without paying for the same completions again.
for index, grant in enumerate(dict_data['grants_data']['grant']):
    if 'normalized_info' not in grant:
        messages = get_prompt(grant)

        completion = OAIclient.chat.completions.create(
            model='gpt-4o',
            messages=messages,
            temperature=0.5)
        # The message content may be None; treat that like an empty reply.
        resp = completion.choices[0].message.content or ''
        # Keep only the span from the first '{' to the last '}', dropping any text around the JSON.
        match = re.search(r'\{.*\}', resp, re.DOTALL)
        if match is None:
            # Leave the grant without a summary so that a later re-run retries it.
            print(f"No JSON object found in the response for grant at index {index}; skipping")
        else:
            grant['normalized_info'] = match.group(0)
    if index % 100 == 0:
        print(f"Processed {index} grants")

Processed 0 grants
Processed 100 grants
Processed 200 grants
Processed 300 grants
Processed 400 grants
Processed 500 grants
Processed 600 grants
Processed 700 grants
Processed 800 grants
Processed 900 grants
Processed 1000 grants
Processed 1100 grants
Processed 1200 grants


KeyboardInterrupt: 

## Note :
This notebook creates embeddings for both normalized summaries and the aggregated embeddings for several fields like in the basic semantic indexing notebook. This is to allow comparisons between the two methods.

To add embeddings for the normalized documents only, simply alter the mappings.py and skip the 'pipeline_id' parameter in the construct_indexing_actions() method. Also skip the next cell.

In [8]:
def find_token_limit_index(tokenizer, text, max_tokens=8191):
    """Return how many leading characters of ``text`` fit within ``max_tokens`` tokens.

    Args:
        tokenizer: Object exposing ``encode(text)`` (returns a sliceable token
            sequence) and ``decode(tokens)`` (returns a string).
        text: The string to measure.
        max_tokens: Maximum number of tokens allowed.

    Returns:
        ``len(text)`` when the whole text encodes to at most ``max_tokens``
        tokens. Otherwise the length of the longest prefix of ``text`` that is
        reproduced by decoding the first ``max_tokens`` tokens.
    """
    tokens = tokenizer.encode(text)
    if len(tokens) <= max_tokens:
        return len(text)  # Return the full length of the text if it has fewer or equal tokens

    truncated_text = tokenizer.decode(tokens[:max_tokens])

    # Usual case: the decoded tokens are an exact prefix of the original text.
    if text.startswith(truncated_text):
        return len(truncated_text)

    # The decoded text can differ from the original near the cut (for example when the
    # cut falls inside a multi-byte character). Searching for it with str.find would
    # return -1 and silently corrupt the result, so count the matching leading
    # characters instead; that prefix never needs more than max_tokens tokens' worth of text.
    prefix_length = 0
    for original_char, decoded_char in zip(text, truncated_text):
        if original_char != decoded_char:
            break
        prefix_length += 1
    return prefix_length


# For each long text field, record the character index at which the field reaches the
# token limit, stored on the grant as '<field>_truncate_length'.
# NOTE(review): the indexing output further down shows the value 35422 being rejected
# because submission_info_truncate_length is mapped as `short`; these lengths can
# exceed that range, so confirm the field types in mappings.py.
tokenizer = get_encoding("cl100k_base")
TRUNCATED_FIELDS = ('description', 'submission_info', 'eligibility')
for data in dict_data['grants_data']['grant']:
    for field_name in TRUNCATED_FIELDS:
        field_text = str(data[field_name])
        data[f'{field_name}_truncate_length'] = find_token_limit_index(tokenizer, field_text)

In [12]:
# Show the field names of the first grant again, wrapped to a width of 200 characters,
# to check which fields the previous cells added
print(fill(str(dict_data['grants_data']['grant'][0].keys()), width=200))

dict_keys(['@id', 'url', 'amount_info', 'site_grant_type', 'modified_date', 'application_url', 'title', 'all_titles', 'submission_info', 'all_grant_source_urls', 'status', 'description',
'eligibility', 'categories_display', 'limited_grant_info', 'user_categories', 'submit_date', 'is_limited', 'site_categories', 'cost_sharing', 'grant_source_url', 'deadlines', 'amounts', 'all_types',
'all_applicant_types', 'locations', 'sponsors', 'description_truncate_length', 'submission_info_truncate_length', 'eligibility_truncate_length', 'normalized_info'])


Connect to your ElasticSearch client.

In [2]:
# Connect to the ElasticSearch container using the URL and the basic-auth
# credentials read from the environment
ESclient = Elasticsearch(
  ELASTICSEARCH_URL,
  basic_auth = (ELASTIC_USERNAME, ELASTIC_PASSWORD),
  request_timeout = 60
)

# Display the cluster information as the cell output; a successful response
# confirms that the connection works
ESclient.info()

ObjectApiResponse({'name': '601bb54c777c', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'FS5t3wmfS3y-5pkG0ZWv-A', 'version': {'number': '8.14.1', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '93a57a1a76f556d8aee6a90d1a95b06187501310', 'build_date': '2024-06-10T23:35:17.114581191Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

Create the index with the settings defined in mappings.py. Alter the "embeddings" field according to your model.

In [16]:
# Uncomment the next line to delete an existing index before recreating it
# ESclient.indices.delete(index = INDEX_NAME)
# Create the index from the `mappings` object imported from mappings.py
index_utils.create_index(ESclient, INDEX_NAME, mappings)

Index 'distill_index' created successfully.


Create an inference endpoint within ElasticSearch for embeddings.

In [3]:
# Add an OpenAI inference endpoint for embeddings
INFERENCE_ID = "openai-embeddings-small"     # id under which the endpoint is registered
EMBEDDING_MODEL = "text-embedding-3-small"   # OpenAI embedding model used by the endpoint
try: 
    # Register a text_embedding endpoint backed by the OpenAI service
    resp = ESclient.inference.put_model(
        task_type = "text_embedding",
        inference_id = INFERENCE_ID,
        body = {
            "service": "openai",
            "service_settings": {
                "api_key": OPENAI_KEY,
                "model_id": EMBEDDING_MODEL,
            },
        },
    )
    print(resp)
# NOTE(review): every exception is caught and only printed, so later cells still
# run even when the endpoint was not created.
except Exception as e:
    print(f"Unexpected error: {e}")


{'model_id': 'openai-embeddings-small', 'task_type': 'text_embedding', 'service': 'openai', 'service_settings': {'model_id': 'text-embedding-3-small', 'similarity': 'dot_product', 'dimensions': 1536}, 'task_settings': {}}


Create an ingestion pipeline that adds embeddings to documents as they are indexed. It is defined in ingest_pipeline.py

In [26]:
# Id under which the ingest pipeline is stored; it is passed to the indexing helpers below
PIPELINE_ID = "embedding_pipeline"
# Pipeline definition built for the inference endpoint created above
pipeline_json = get_ingest_pipeline(INFERENCE_ID)

# Uncomment the next line to delete an existing pipeline first
# ESclient.ingest.delete_pipeline(id=PIPELINE_ID)
try:
    resp = ESclient.ingest.put_pipeline(
        id=PIPELINE_ID,
        body = pipeline_json
    )
    print(resp)
# NOTE(review): every exception is caught and only printed, so later cells still
# run even when the pipeline was not stored.
except Exception as e:
    print(f"Unexpected error: {e}") 

{'acknowledged': True}


Index documents

In [19]:
def batched(iterable, n):
    """Yield successive tuples of up to n items drawn from iterable.

    Every tuple holds exactly n items except possibly the final one, which
    holds whatever is left over, e.g. batched('ABCDEFG', 3) --> ABC DEF G

    Raises ValueError if n is smaller than one. Because this is a generator
    function, the error is raised when iteration starts.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if not chunk:
            # Source exhausted: stop without yielding an empty tuple.
            return
        yield chunk

In [20]:
# Build the indexing actions for all grants, passing the ingest pipeline id,
# then split them into batches of 100.
# Note: this uses embed_utils.batched, not the `batched` function defined in this notebook.
body = index_utils.construct_indexing_actions(dict_data, INDEX_NAME, pipeline_id=PIPELINE_ID)
batches = embed_utils.batched(body, 100)
# print(f"Total batches = {len(batches)}")

In [21]:
# Bulk-index the documents one batch at a time, printing progress after each batch
for n,batch in enumerate(batches):
    index_utils.bulk_index_documents(ESclient, batch, chunk_size=100)
    print(f"Batch {n} done")

Successfully indexed 100 documents.
Batch 0 done
Successfully indexed 100 documents.
Batch 1 done
Successfully indexed 100 documents.
Batch 2 done
Successfully indexed 100 documents.
Batch 3 done
Failed to index document {'index': {'_index': 'distill_index', '_id': '418497', 'status': 400, 'error': {'type': 'document_parsing_exception', 'reason': "[1:120704] failed to parse field [submission_info_truncate_length] of type [short] in document with id '418497'. Preview of field's value: '35422'", 'caused_by': {'type': 'x_content_parse_exception', 'reason': '[1:120709] Numeric value (35422) out of range of Java short\n at [Source: (byte[])"{"embeddings":[-0.004931410624149012,-8.20359479678672E-4,-0.026459856601301738,0.008429200465821804,0.01257195472640755,-0.005687967692210576,-0.006992712086598717,0.03822942605271352,8.440860214778098E-4,0.02580706505288592,0.06802776369753853,-0.02848831872385207,0.04832537313993002,-0.03657953906368772,-0.01879884162674925,0.03254303538974431,-0.0297

In [27]:
# Refresh the index, then display its statistics as the cell output
ESclient.indices.refresh(index=INDEX_NAME)
ESclient.indices.stats(index=INDEX_NAME)

ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_all': {'primaries': {'docs': {'count': 1182, 'deleted': 0, 'total_size_in_bytes': 86466706}, 'shard_stats': {'total_count': 1}, 'store': {'size_in_bytes': 86467211, 'total_data_set_size_in_bytes': 86467211, 'reserved_in_bytes': 0}, 'indexing': {'index_total': 1182, 'index_time_in_millis': 3149, 'index_current': 0, 'index_failed': 0, 'delete_total': 0, 'delete_time_in_millis': 0, 'delete_current': 0, 'noop_update_total': 0, 'is_throttled': False, 'throttle_time_in_millis': 0, 'write_load': 0.005242660263708502}, 'get': {'total': 0, 'time_in_millis': 0, 'exists_total': 0, 'exists_time_in_millis': 0, 'missing_total': 0, 'missing_time_in_millis': 0, 'current': 0}, 'search': {'open_contexts': 0, 'query_total': 0, 'query_time_in_millis': 0, 'query_current': 0, 'fetch_total': 0, 'fetch_time_in_millis': 0, 'fetch_current': 0, 'scroll_total': 0, 'scroll_time_in_millis': 0, 'scroll_current': 0, 'suggest_total': 0, 'sugge

## Error Handling

In [28]:
# Check if all docs are indexed: number of grants minus the document count
# reported for the index's primaries (0 means nothing is missing)
len(dict_data['grants_data']['grant']) - ESclient.indices.stats(index=INDEX_NAME)['_all']['primaries']['docs']['count']

18

In [29]:
# Find the ids of docs which are not indexed: collect every grant's '@id'
# and ask the helper which of them are missing from the index
ids = [item['@id'] for item in dict_data['grants_data']['grant']]
failed_ids = index_utils.get_missing_ids(ESclient, INDEX_NAME, ids)
print(len(failed_ids))

18


In [31]:
# Rebuild the indexing actions for the missing documents only, then split them into batches of 100
body = index_utils.construct_actions_from_ids(dict_data, failed_ids, INDEX_NAME, pipeline_id=PIPELINE_ID)
batches = embed_utils.batched(body, 100)
# print(f"Total batches = {len(batches)}")

In [32]:
# Retry bulk indexing for the missing documents, one batch at a time
for n,batch in enumerate(batches):
    index_utils.bulk_index_documents(ESclient, batch, chunk_size=100)
    print(f"Batch {n} done")

Failed to index document {'index': {'_index': 'distill_index', '_id': '418497', 'status': 400, 'error': {'type': 'document_parsing_exception', 'reason': "[1:120773] failed to parse field [submission_info_truncate_length] of type [short] in document with id '418497'. Preview of field's value: '35422'", 'caused_by': {'type': 'x_content_parse_exception', 'reason': '[1:120778] Numeric value (35422) out of range of Java short\n at [Source: (byte[])"{"embeddings":[-0.0049305316348109305,-8.560951884688183E-4,-0.026457419478486635,0.00850732909740603,0.012545657315650998,-0.005710840093466355,-0.006989561287496217,0.03817654526047908,8.646485668558175E-4,0.02576429419408808,0.06800553745842478,-0.028453574230312476,0.04830401718780571,-0.03659440332510236,-0.018821429393144042,0.03255232802679213,-0.029765837743032413,0.00303543348732425,-0.01842438014009812,-0.019218422076305967,0.007155488203341389,-0.008316820321210028,-0.0202636696297617"[truncated 129489 bytes]; line: 1, column: 120778]'

In [33]:
# Count the documents that are still missing from the index after the retry
print(len(index_utils.get_missing_ids(ESclient, INDEX_NAME, ids)))

18


In [34]:
# Find the ids of docs for which the creation of embeddings failed.
# Note: this rebinds failed_ids, which previously held the ids missing from the index.
failed_ids = index_utils.get_failed_embedding_ids(ESclient, INDEX_NAME)
print(len(failed_ids))

0


In [None]:
# Rebuild the indexing actions for the documents whose embeddings failed,
# then split them into batches of 100.
body = index_utils.construct_actions_from_ids(dict_data, failed_ids, INDEX_NAME, pipeline_id=PIPELINE_ID)
# Materialize the batches so that len() works even if embed_utils.batched returns a
# lazy iterator (assumed from the commented-out len() calls in earlier cells — TODO confirm).
batches = list(embed_utils.batched(body, 100))
print(f"Total batches = {len(batches)}")

In [None]:
# Re-index the documents whose embeddings failed, one batch at a time
for n,batch in enumerate(batches):
    index_utils.bulk_index_documents(ESclient, batch, chunk_size=100)
    print(f"Batch {n} done")

In [35]:
# Count the documents that still have failed embeddings after the retry
print(len(index_utils.get_failed_embedding_ids(ESclient, INDEX_NAME)))

0
