In [7]:
import json
from elasticsearch import Elasticsearch, helpers
import urllib3
import os


elastic_password = os.getenv('ELASTIC_PASSWORD_SERVER')

es = Elasticsearch(
    ['https://localhost:9200'],
    basic_auth=('elastic', elastic_password),
    verify_certs=False,
    ca_certs="C:/Users/linus/http_ca.crt"
)

urllib3.disable_warnings()

In [8]:
# Test the connection
es.info()

ObjectApiResponse({'name': 'b3472380ffa2', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'DQvmIapdSNS30vfmGkeR8w', 'version': {'number': '8.13.1', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '9287f29bba5e270bd51d557b8daccb7d118ba247', 'build_date': '2024-03-29T10:05:29.787251984Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

### Indexing Documents with Embeddings into Elasticsearch for Vector Similarity Search

initialize the index with the appropriate mapping for the dense vector field.

In [14]:
# drop the index if it already exists
if es.indices.exists(index='pubmed_emb_index'):
   es.indices.delete(index='pubmed_emb_index')

In [7]:
# Define the index name
index_name = "pubmed_emb_index"

# Check if the index already exists
if not es.indices.exists(index=index_name):
    # Define the mapping
    mapping = {
        "mappings": {
            "properties": {
                "embeddings": {"type": "dense_vector", "dims": 768}  # Adjust the dimension size as needed
                # Add other field mappings as necessary
            }
        }
    }
    
    # Create the index with the defined mapping
    es.indices.create(index=index_name, body=mapping)



load the JSONL files containing the PubMed documents, extract the embeddings, and index the documents into Elasticsearch.

In [15]:
from pathlib import Path
import os
import json
from tqdm import tqdm

source_directory = Path('C:/Users/linus/big_data/pubmed/first100JSONLembedded/')

index_name = "pubmed_emb_index"

def bulk_index_documents(source_directory, index_name):
    if not source_directory.exists():
        print("The source directory does not exist.")
        return

    actions = [] 

    for file_name in tqdm(os.listdir(source_directory)):
        if file_name.endswith('.jsonl'):
            source_file = source_directory / file_name
            
            with open(source_file, 'r') as json_file:
                for line in json_file:
                    try:
                        doc = json.loads(line)
                        action = {
                            "_index": index_name,
                            "_source": doc
                        }
                        actions.append(action)

                        if len(actions) == 600: # Bulk 600 docs
                            helpers.bulk(es, actions)
                            actions = [] 
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON: {e}")
                    except Exception as e:
                        print(f"An error occurred: {e}")

    if actions:
        helpers.bulk(es, actions)

    print('Indexing complete')

bulk_index_documents(source_directory, index_name)

 63%|██████▎   | 63/100 [35:47<51:34, 83.63s/it]

An error occurred: Connection timed out


100%|██████████| 100/100 [48:28<00:00, 29.09s/it]


ConnectionTimeout: Connection timed out

In [9]:
count_result = es.count(index='pubmed_emb_index')

# Print the count
print(f"Index contains 100 JSONL Chunks with {count_result['count']} documents.")

Index contains 100 JSONL Chunks with 1795307 documents.


In [10]:
response = es.indices.stats(index='pubmed_emb_index')
index_size = response['_all']['total']['store']['size_in_bytes']

print(f"Die Grösse des Indexes ist {round(index_size/1000000000, 2)} GB.")

Die Grösse des Indexes ist 30.2 GB.
