## PubMedQA - Metadata Indexing
This notebook demonstrates the process of indexing PubMedQA into Elasticsearch for advanced search and retrieval. The workflow includes:

- Loading and exploring the PubMedQA dataset
- Data preprocessing and deduplication
- Creating Elasticsearch indices with custom mappings for BM25 and KNN search
- Generating dense vector embeddings using Sentence Transformers
- Indexing enriched metadata into Elasticsearch for both BM25 and KNN retrieval

Each step below is a separate notebook cell, followed by its output where relevant. Before running the indexing steps, install the dependencies (`datasets`, `pandas`, `numpy`, `elasticsearch`, `sentence-transformers`, `python-dotenv`) and make sure an Elasticsearch instance is reachable with the credentials stored in your `.env` file.
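A quick pre-flight check that the packages imported in this notebook are available. Import names differ from the pip names for `sentence-transformers` and `python-dotenv`. The helper `missing_modules` is a hypothetical convenience, not part of the original notebook:

```python
import importlib.util

# Import names used by this notebook (pip names: sentence-transformers, python-dotenv)
REQUIRED_MODULES = ["datasets", "pandas", "numpy", "elasticsearch",
                    "sentence_transformers", "dotenv"]

def missing_modules(names):
    """Return the subset of `names` that cannot be imported in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

missing = missing_modules(REQUIRED_MODULES)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are installed.")
```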

In [1]:
import json
import os
import time
import warnings
from datetime import date
from pathlib import Path

import numpy as np
import pandas as pd
from datasets import load_dataset
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import RequestError

warnings.filterwarnings('ignore')

In [None]:
ds = load_dataset("qiaojin/PubMedQA", "pqa_labeled")

In [None]:
df_train = pd.DataFrame(ds['train'])
df_train.head()

|   | pubid | question | context | long_answer | final_decision |
|---|---|---|---|---|---|
| 0 | 21645374 | Do mitochondria play a role in remodelling lac... | {'contexts': ['Programmed cell death (PCD) is ... | Results depicted mitochondrial dynamics in viv... | yes |
| 1 | 16418930 | Landolt C and snellen e acuity: differences in... | {'contexts': ['Assessment of visual acuity dep... | Using the charts described, there was only a s... | no |
| 2 | 9488747 | Syncope during bathing in infants, a pediatric... | {'contexts': ['Apparent life-threatening event... | "Aquagenic maladies" could be a pediatric form... | yes |
| 3 | 17208539 | Are the long-term results of the transanal pul... | {'contexts': ['The transanal endorectal pull-t... | Our long-term study showed significantly bette... | no |
| 4 | 10808977 | Can tailored interventions increase mammograph... | {'contexts': ['Telephone counseling and tailor... | The effects of the intervention were most pron... | yes |


In [4]:
len(df_train)

1000

In [5]:
# Custom label: every row comes from the pqa_labeled subset, so mark it as labelled
df_train['labelled'] = "Yes"

In [6]:
df_train.head()

|   | pubid | question | context | long_answer | final_decision | labelled |
|---|---|---|---|---|---|---|
| 0 | 21645374 | Do mitochondria play a role in remodelling lac... | {'contexts': ['Programmed cell death (PCD) is ... | Results depicted mitochondrial dynamics in viv... | yes | Yes |
| 1 | 16418930 | Landolt C and snellen e acuity: differences in... | {'contexts': ['Assessment of visual acuity dep... | Using the charts described, there was only a s... | no | Yes |
| 2 | 9488747 | Syncope during bathing in infants, a pediatric... | {'contexts': ['Apparent life-threatening event... | "Aquagenic maladies" could be a pediatric form... | yes | Yes |
| 3 | 17208539 | Are the long-term results of the transanal pul... | {'contexts': ['The transanal endorectal pull-t... | Our long-term study showed significantly bette... | no | Yes |
| 4 | 10808977 | Can tailored interventions increase mammograph... | {'contexts': ['Telephone counseling and tailor... | The effects of the intervention were most pron... | yes | Yes |


In [6]:
dict_data = df_train['context'][0]  # context dict of the first question

In [7]:
dict_data

{'contexts': ['Programmed cell death (PCD) is the regulated death of cells within an organism. The lace plant (Aponogeton madagascariensis) produces perforations in its leaves through PCD. The leaves of the plant consist of a latticework of longitudinal and transverse veins enclosing areoles. PCD occurs in the cells at the center of these areoles and progresses outwards, stopping approximately five cells from the vasculature. The role of mitochondria during PCD has been recognized in animals; however, it has been less studied during PCD in plants.',
  'The following paper elucidates the role of mitochondrial dynamics during developmentally regulated PCD in vivo in A. madagascariensis. A single areole within a window stage leaf (PCD is occurring) was divided into three areas based on the progression of PCD; cells that will not undergo PCD (NPCD), cells in early stages of PCD (EPCD), and cells in late stages of PCD (LPCD). Window stage leaves were stained with the mitochondrial dye MitoT

## Distribution of final_decision

In [7]:
df_train['final_decision'].value_counts()

final_decision
yes      552
no       338
maybe    110
Name: count, dtype: int64
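The labels are imbalanced. Turning the counts above into shares (equivalent to `df_train['final_decision'].value_counts(normalize=True)`):

```python
# Counts copied from the value_counts() output above
counts = {"yes": 552, "no": 338, "maybe": 110}
total = sum(counts.values())  # 1000 labelled questions

# Share of each decision = count / total
shares = {label: n / total for label, n in counts.items()}
for label, share in shares.items():
    print(f"{label:>5}: {share:.1%}")
```

This gives yes 55.2%, no 33.8% and maybe 11.0%, so a baseline that always answers "yes" would already reach 55.2% accuracy on the labelled set.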

## 1. Indexing 

In [None]:
from dotenv import load_dotenv
import os

# Path to your .env file
env_path = "../.env"  # Change path if needed

# Load environment variables from .env
load_dotenv(dotenv_path=env_path)
# Access the environment variables
ES_URL = os.getenv("ES_URL")
ES_USER = os.getenv("ES_USER")
ES_PASS = os.getenv("ES_PASS")
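`os.getenv` returns `None` for any variable missing from `.env`, and the problem then only surfaces later, with a less obvious error, when the client is created. A small guard that reports missing settings right away (`missing_env_vars` is a hypothetical helper):

```python
import os

def missing_env_vars(names=("ES_URL", "ES_USER", "ES_PASS")):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in names if not os.getenv(name)]

missing = missing_env_vars()
if missing:
    print("Missing settings in .env:", ", ".join(missing))
else:
    print("Elasticsearch settings loaded.")
```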

In [None]:
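# Create a global Elasticsearch client.
# verify_certs=False skips TLS certificate verification (only acceptable for a local/dev cluster);
# request_timeout is in seconds, so 10000 leaves room for very long bulk requests.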
es_client = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    verify_certs=False,
    request_timeout=10000
)

In [None]:
print(es_client.info())

In [None]:
# BM25 (lexical) index for the enriched PubMedQA metadata
index_name = "research_index_bm25_pubmedqa_metadata"
index_mapping = {
    "settings": {
        "number_of_replicas": 0,
        "number_of_shards": 1,
        "refresh_interval": "1m",
        "analysis": {
            "filter": {
                "possessive_english_stemmer": {
                    "type": "stemmer",
                    "language": "possessive_english"
                },
                "light_english_stemmer": {
                    "type": "stemmer",
                    "language": "light_english"
                },
                "english_stop": {
                    "ignore_case": "true",
                    "type": "stop",
                    "stopwords": ["a", "about", "all", "also", "am", "an", "and", "any", "are", "as", "at",
                                  "be", "been", "but", "by", "can", "de", "did", "do", "does", "for", "from",
                                  "had", "has", "have", "he", "her", "him", "his", "how", "if", "in", "into",
                                  "is", "it", "its", "more", "my", "nbsp", "new", "no", "non", "not", "of",
                                  "on", "one", "or", "other", "our", "she", "so", "some", "such", "than",
                                  "that", "the", "their", "then", "there", "these", "they", "this", "those",
                                  "thus", "to", "up", "us", "use", "was", "we", "were", "what", "when", "where",
                                  "which", "while", "why", "will", "with", "would", "you", "your", "yours"]
                }
            },
            "analyzer": {
                "text_en_no_stop": {
                    "filter": [
                        "lowercase",
                        "possessive_english_stemmer",
                        "light_english_stemmer"
                    ],
                    "tokenizer": "standard"
                },
                "text_en_stop": {
                    "filter": [
                        "lowercase",
                        "possessive_english_stemmer",
                        "english_stop",
                        "light_english_stemmer"
                    ],
                    "tokenizer": "standard"
                },
                "whitespace_lowercase": {
                    "tokenizer": "whitespace",
                    "filter": [
                        "lowercase"
                    ]
                }
            },
            "normalizer": {
                "keyword_lowercase": {
                    "filter": [
                        "lowercase"
                    ]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "text"},
            "pubid": {"type": "text"},
            "contexts":{"type": "text"},
            "labels": {"type": "text"},
            "meshes": {"type": "text"},
            "keywords": {"type": "text"},
            "topics": {"type": "text"},
            "phrases": {"type": "text"},
            "synonyms": {"type": "text"},
            "achronym": {"type": "text"},
            "long_answer": {"type": "text"},
            "labelled":{"type": "text"}
        }
    }
}
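The analyzers defined under `settings.analysis` (`text_en_stop`, `text_en_no_stop`, `whitespace_lowercase`) and the `keyword_lowercase` normalizer only take effect when a field refers to them; as written, every `text` field falls back to Elasticsearch's `standard` analyzer. A sketch, assuming you want stemming and stop-word removal on the free-text fields, that returns a copy of a mapping with an analyzer attached. The helper name `with_analyzer` and the field selection are illustrative, not from the original:

```python
import copy

def with_analyzer(mapping, fields, analyzer):
    """Return a deep copy of `mapping` in which the listed `text` fields use `analyzer`."""
    new_mapping = copy.deepcopy(mapping)
    properties = new_mapping["mappings"]["properties"]
    for field in fields:
        # Only attach the analyzer to existing text fields; skip anything else
        if properties.get(field, {}).get("type") == "text":
            properties[field]["analyzer"] = analyzer
    return new_mapping

# Trimmed-down stand-in for `index_mapping`; in the notebook pass the real mapping
demo_mapping = {
    "mappings": {
        "properties": {
            "pubid": {"type": "text"},
            "contexts": {"type": "text"},
            "long_answer": {"type": "text"},
        }
    }
}
demo_result = with_analyzer(demo_mapping, ["contexts", "long_answer"], "text_en_stop")
print(demo_result["mappings"]["properties"]["contexts"])
# {'type': 'text', 'analyzer': 'text_en_stop'}
```

In the notebook this would be `create_index(index_name, with_analyzer(index_mapping, ["contexts", "long_answer", "keywords", "topics", "phrases"], "text_en_stop"))`. The analyzer of an existing field cannot be changed in place, so an index that already exists has to be deleted and recreated (or reindexed) for this to take effect.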

In [None]:
def create_index(index_name, mapping):
    """Create `index_name` from a {"settings": ..., "mappings": ...} dict; skip it if it already exists."""
    try:
        es_client.indices.create(
            index=index_name,
            settings=mapping["settings"],
            mappings=mapping["mappings"],
        )
        print(f"Index '{index_name}' created successfully.")
    except RequestError as e:
        if e.error == 'resource_already_exists_exception':
            print(f"Index '{index_name}' already exists.")
        else:
            print(f"An error occurred while creating index '{index_name}': {e}")

In [None]:
create_index(index_name,index_mapping)

In [None]:
import copy

# The KNN index reuses the BM25 analysis settings and text fields and adds a
# dense_vector field for the context embeddings
index_name_knn = 'research_index_knn_pubmedqa_metadata'
index_mapping_knn = copy.deepcopy(index_mapping)
index_mapping_knn["mappings"]["properties"]["contexts_embedding"] = {
    "type": "dense_vector",
    "dims": 384,             # embedding size of all-MiniLM-L6-v2
    "similarity": "cosine",
    "index": True,           # index the vectors for approximate kNN search
}

create_index(index_name_knn, index_mapping_knn)
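With `"similarity": "cosine"`, Elasticsearch ranks kNN hits by cosine similarity and, per its `dense_vector` documentation, reports `_score = (1 + cosine) / 2` so scores stay non-negative. A small sketch of that arithmetic, with toy 3-dimensional vectors standing in for the 384-dimensional MiniLM embeddings:

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def knn_score(query_vector, doc_vector):
    """_score reported for a kNN hit on a dense_vector field with cosine similarity."""
    return (1.0 + cosine(query_vector, doc_vector)) / 2.0

query = [1.0, 0.0, 0.0]
for doc in ([2.0, 0.0, 0.0], [0.0, 1.0, 0.0], [-1.0, 0.0, 0.0]):
    print(doc, round(cosine(query, doc), 3), round(knn_score(query, doc), 3))
```

The three documents score 1.0, 0.5 and 0.0: same direction, orthogonal, and opposite. Because cosine ignores vector length, `[2, 0, 0]` scores exactly like a unit vector pointing the same way.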

## Indexing Pipeline

In [9]:
from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 produces 384-dimensional embeddings, matching "dims": 384 in the KNN mapping
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [None]:
# List of JSON file paths
file_paths = [
    "output/pubmedqa/meta/pubmedqa_index_metadata_0to20000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_20000TO30000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_30000TO38468.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_38469TO40000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_40000to41000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_41000to42000.json",
    "output/pubmedqa/meta/code/pubmedqa_index_metadata_42000to45000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_45000to48000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_48000TO50000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_50000TO51000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_51000TO55000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_55000TO58000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_58000TO60000.json",
    "output/pubmedqa/meta/pubmedqa_index_metadata_60000TO63000.json",
]
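Hard-coded shard names are easy to get out of sync with the directory. A hedged alternative, assuming every shard matches `pubmedqa_index_metadata_*.json` somewhere under `output/pubmedqa/meta` (one listed file sits in a `code/` subfolder, hence the recursive search); `find_metadata_files` is a hypothetical helper:

```python
from pathlib import Path

def find_metadata_files(root="output/pubmedqa/meta",
                        pattern="pubmedqa_index_metadata_*.json"):
    """Recursively collect metadata shard paths under `root`, sorted lexicographically."""
    return sorted(str(path) for path in Path(root).rglob(pattern))

# file_paths = find_metadata_files()
```

The lexicographic order differs from the hand-written list (for example, the `code/` shard sorts before the top-level ones), and because deduplication keeps the first record per `pubid`, the order decides which copy of a duplicated article survives.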

In [None]:
import json
import os

# Merge all shards, keeping only the first record seen for each pubid
unique_data = []
seen_pubids = set()

# Reading and appending unique JSON data
for file_path in file_paths:
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
                for item in data:
                    pubid = item.get('pubid')
                    if pubid and pubid not in seen_pubids:
                        seen_pubids.add(pubid)
                        unique_data.append(item)
        except (json.JSONDecodeError, IOError) as e:
            print(f"Error reading {file_path}: {e}")
    else:
        print(f"File not found: {file_path}")

# Save the deduplicated data (adjust this absolute path for your environment)
output_file = "/home/magma/Abhilasha/Meta_Data_Enrichment/code/combined_unique_metadata.json"
try:
    with open(output_file, 'w', encoding='utf-8') as outfile:
        json.dump(unique_data, outfile, ensure_ascii=False, indent=4)
    print(f"Unique data successfully saved to {output_file}")
except IOError as e:
    print(f"Error writing to {output_file}: {e}")
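The merge keeps the first record seen for each `pubid` and silently drops records without one. The same logic as a pure function, easier to test in isolation (`dedupe_by_pubid` and the sample shards are hypothetical):

```python
def dedupe_by_pubid(records):
    """Keep the first record for each truthy `pubid`, preserving input order."""
    seen = set()
    unique = []
    for item in records:
        pubid = item.get("pubid")
        if pubid and pubid not in seen:
            seen.add(pubid)
            unique.append(item)
    return unique

shards = [
    [{"pubid": 1, "src": "a"}, {"pubid": 2, "src": "a"}],
    [{"pubid": 2, "src": "b"}, {"pubid": None, "src": "b"}, {"pubid": 3, "src": "b"}],
]
merged = dedupe_by_pubid(item for shard in shards for item in shard)
print([(d["pubid"], d["src"]) for d in merged])  # [(1, 'a'), (2, 'a'), (3, 'b')]
```

Note that the comparison is type-sensitive: if some shards store `pubid` as a string and others as an integer, the same article survives twice.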

In [None]:
print(len(unique_data))

In [None]:
# Count records that have a `passage_embedding` field
count = sum(1 for d in unique_data if 'passage_embedding' in d)
print(count)

In [None]:
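# Shallow copies, so the KNN steps below (embeddings, JSON serialisation) do not mutate the BM25 documents
index_doc = [dict(d) for d in unique_data]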

In [None]:
index_doc_knn =[]

#### Encoding the contexts with the Sentence Transformer

In [None]:
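# Join each record's context passages with spaces and encode them in batches.
# all-MiniLM-L6-v2 truncates inputs longer than 256 word pieces, so long abstracts are only partially encoded.
texts = [" ".join(data["contexts"]) for data in unique_data]
embeddings = model.encode(texts, batch_size=64, show_progress_bar=True)

for data, embedding in zip(unique_data, embeddings):
    doc = dict(data)  # copy, so the vector does not leak into the BM25 documents
    # Store under the field declared as dense_vector in the KNN mapping
    doc["contexts_embedding"] = embedding.tolist()
    index_doc_knn.append(doc)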

In [None]:
# `synonyms` and `achronym` are not plain strings; serialise them to JSON so they fit the `text` mapping
index_data = []
for data in index_doc:
    data['synonyms'] = json.dumps(data['synonyms'])
    data['achronym'] = json.dumps(data['achronym'])
    index_data.append(data)
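`json.dumps` turns each structured value into one string, which the `text` mapping then tokenizes with the standard analyzer (JSON punctuation such as braces and quotes is dropped, leaving the words). The record shape below is invented for illustration; the real structure of `synonyms` and `achronym` comes from the metadata files. The last line shows why each dict must be serialised only once: encoding an already-encoded string produces escaped quotes.

```python
import json

# Hypothetical record shape, for illustration only
record = {"synonyms": {"PCD": ["programmed cell death"]}, "achronym": ["PCD"]}

serialized = {key: json.dumps(value) for key, value in record.items()}
print(serialized["synonyms"])  # {"PCD": ["programmed cell death"]}

# Serialising the same field a second time double-encodes it
double = json.dumps(serialized["synonyms"])
print(double)  # "{\"PCD\": [\"programmed cell death\"]}"
```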


## BM25 indexing

In [None]:
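import time

# Bulk actions for the BM25 index; pubid doubles as the document _id so that
# re-running the cell overwrites documents instead of duplicating them
documents = []
for doc in index_data:
    documents.append(
        {
            "_index": index_name,  # change the index name if needed
            "_id": str(doc["pubid"]),
            "_source": doc,
        }
    )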

In [None]:
from elasticsearch import helpers

def chunk_documents(documents, num_chunks):
    """Split `documents` into `num_chunks` contiguous slices whose sizes differ by at most one."""
    chunk_size = len(documents) // num_chunks
    remainder = len(documents) % num_chunks

    start = 0
    for i in range(num_chunks):
        # The first `remainder` chunks receive one extra document
        chunk_end = start + chunk_size + (1 if i < remainder else 0)
        yield documents[start:chunk_end]
        start = chunk_end

total_docs = len(documents)
num_chunks = 100

start_time = time.time()
for i, chunk in enumerate(chunk_documents(documents, num_chunks)):
    print(f"Chunk {i+1}/{num_chunks}: {len(chunk)} documents")
    try:
        success, _ = helpers.bulk(es_client, chunk)
        print(f"Indexed {success} documents into '{index_name}'")
    except Exception as e:
        # helpers.bulk raises BulkIndexError when any document in the chunk is rejected
        print("An error occurred:", e)
print(f"Finished indexing {total_docs} documents in {time.time() - start_time:.1f}s")
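`chunk_documents` spreads the remainder over the first chunks, so chunk sizes differ by at most one. A standalone check with 10 documents and 3 chunks (the function is repeated from the cell above so the snippet runs on its own):

```python
def chunk_documents(documents, num_chunks):
    """Split `documents` into `num_chunks` contiguous slices whose sizes differ by at most one."""
    chunk_size = len(documents) // num_chunks
    remainder = len(documents) % num_chunks
    start = 0
    for i in range(num_chunks):
        chunk_end = start + chunk_size + (1 if i < remainder else 0)
        yield documents[start:chunk_end]
        start = chunk_end

chunks = list(chunk_documents(list(range(10)), 3))
print([len(c) for c in chunks])  # [4, 3, 3]
print(chunks[0])                 # [0, 1, 2, 3]
```

With fewer documents than `num_chunks`, the trailing chunks are empty. `helpers.bulk` also batches internally through its `chunk_size` argument (500 actions by default), so passing the full `documents` list in one call is an alternative to the manual loop.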

## KNN Indexing

In [None]:
# Serialise the structured synonym/acronym fields to JSON strings for the `text` mapping
index_data_knn = []
for data in index_doc_knn:
    data['synonyms'] = json.dumps(data['synonyms'])
    data['achronym'] = json.dumps(data['achronym'])
    index_data_knn.append(data)

In [None]:
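import time

# Bulk actions for the KNN index; pubid doubles as the document _id so that
# re-running the cell overwrites documents instead of duplicating them
documents = []
for doc in index_data_knn:
    documents.append(
        {
            "_index": index_name_knn,  # change the index name if needed
            "_id": str(doc["pubid"]),
            "_source": doc,
        }
    )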

In [None]:
index_doc_knn[0]

In [None]:
# Smoke test: index a single document (pubid as its _id) before the bulk run
es_client.index(index=index_name_knn, id=str(index_doc_knn[0]["pubid"]), document=index_doc_knn[0])

In [None]:
total_docs = len(documents)
num_chunks = 100

start_time = time.time()
for i, chunk in enumerate(chunk_documents(documents, num_chunks)):
    print(f"Chunk {i+1}/{num_chunks}: {len(chunk)} documents")
    try:
        success, _ = helpers.bulk(es_client, chunk)
        print(f"Indexed {success} documents into '{index_name_knn}'")
    except Exception as e:
        # helpers.bulk raises BulkIndexError when any document in the chunk is rejected
        print("An error occurred:", e)
print(f"Finished indexing {total_docs} documents in {time.time() - start_time:.1f}s")
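Once the indices have refreshed (with `refresh_interval` set to `1m`, new documents can take up to a minute to become searchable), both can be queried. A hedged sketch of request bodies for the two retrieval modes: a BM25 `multi_match` over a few text fields and an approximate kNN search over the `contexts_embedding` field declared in the KNN mapping. The field list, the boost, `k`, and `num_candidates` are illustrative assumptions; the helpers only build dicts, so they run without a cluster.

```python
def bm25_query(question, fields=("contexts", "long_answer^2", "keywords")):
    """Lexical query: BM25 scoring across several text fields (the ^2 boost is illustrative)."""
    return {"multi_match": {"query": question, "fields": list(fields)}}


def knn_clause(query_vector, k=10, num_candidates=100):
    """Approximate kNN clause over the dense_vector field declared in the KNN mapping."""
    if num_candidates < k:
        raise ValueError("num_candidates must be at least k")
    return {
        "field": "contexts_embedding",
        "query_vector": [float(x) for x in query_vector],
        "k": k,
        "num_candidates": num_candidates,  # candidates considered per shard
    }


# Usage against the live cluster (hypothetical question text):
# question = "Does regular exercise lower blood pressure in older adults?"
# bm25_hits = es_client.search(index=index_name, query=bm25_query(question), size=10)
# knn_hits = es_client.search(
#     index=index_name_knn,
#     knn=knn_clause(model.encode(question)),
#     source_includes=["pubid", "long_answer"],
# )
```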