In [1]:
from rag_data_uploader import ElasticsearchUploader, OpensearchUploader
from rag_data_uploader.utils.mappings import ES7MappingNewCPI, ES7MappingCPI

import os
import json
import requests


In [2]:
# Connection settings for the vector store. They are read from the environment so
# credentials never have to be written into (and committed with) the notebook; the
# defaults reproduce the original local, unauthenticated setup.
url = os.environ.get("VECTOR_STORE_URL", "http://localhost:9200")
username = os.environ.get("VECTOR_STORE_USERNAME", "")
password = os.environ.get("VECTOR_STORE_PASSWORD", "")

In [3]:
# Alias that will expose all uploaded data, plus the first data set (NR-RAN).
# The index name is "<last folder component>_<date>".
alias = "rag_data"
folder = "../data/new_json/nr-ran"
date = "2023-10-01"
dataset_name = folder.rsplit("/", 1)[-1]
index = dataset_name + "_" + date

#### First, we have to make sure that a vector store is running locally at the URL `http://localhost:9200`, started with one of the docker-compose YAML files in this repo. Once it is up, we can instantiate the uploader that corresponds to the vector store type we are running.

In [4]:
# Uploader class per vector store type. Select the key that matches the store
# started via docker-compose ("opensearch" when running OpenSearch).
uploader_classes = {
    "elasticsearch": ElasticsearchUploader,
    "opensearch": OpensearchUploader,
}
uploader = uploader_classes["elasticsearch"](url, username, password)

##### The uploader has a couple of upload methods, depending on whether the documents are stored as files or kept in memory. The method that uploads from files does so in batches, so that large numbers of documents can be loaded without reading them all into memory at once. Currently, at most 10,000 files are uploaded at a time.

In [6]:
# Upload the documents stored as files in `folder` (batched by the uploader)
# into `index`, and attach that index to `alias`.
mapping = ES7MappingNewCPI()
uploader.upload_from_folder(
    folder,
    index,
    alias,
    mapping=mapping,
)

URL:  http://localhost:9200
Response from document store at URL http://localhost:9200:  OK
Some documents had an empty or non-existing content field and were excluded from upload


Uploading documents: 100%|██████████| 7681/7681 [00:02<00:00, 3805.27it/s]


Number of successfully uploaded documents:  7681


##### The method that uploads documents from memory takes similar inputs and works in a similar way. Let's load a few documents from the radio folder into memory and upload them.


In [7]:
# Load a small, reproducible sample of the radio data set into memory.
# Filter for JSON *before* slicing so we really get up to MAX_FILES JSON files,
# and sort because os.listdir returns entries in arbitrary order.
MAX_FILES = 10
folder = "../data/new_json/radio"
index = f"{folder.split('/')[-1]}_{date}"
file_names = sorted(name for name in os.listdir(folder) if name.endswith(".json"))[:MAX_FILES]
documents_to_upload = []
for name in file_names:
    with open(os.path.join(folder, name), "r", encoding="utf-8") as f:
        # NOTE(review): assumes each file holds a JSON list of documents,
        # since the result is used to extend a list — confirm against the data.
        documents_to_upload.extend(json.load(f))

In [8]:
# Upload the in-memory documents into the radio index and attach it to the alias.
uploader.upload_documents(
    documents_to_upload,
    mapping=mapping,
    alias=alias,
    index=index,
)

Some documents had an empty or non-existing content field and were excluded from upload
Number of documents: 379


Uploading documents: 100%|██████████| 379/379 [00:00<00:00, 4625.11it/s]

Number of successfully uploaded documents:  379





##### Now we can see that both indices that we have uploaded are included in the specified alias

In [9]:
# Ask the store which indices the alias currently points to.
# Credentials are sent only when configured (matching the uploader), and the
# timeout keeps the cell from hanging if the store is unreachable.
response = requests.get(
    f"{url}/_alias/{alias}",
    auth=(username, password) if username else None,
    timeout=10,
)
response.json()

{'radio_2023-10-01': {'aliases': {'rag_data': {}}},
 'nr-ran_2023-10-01': {'aliases': {'rag_data': {}}}}

##### If we upload an index with the same base name but a different date, the uploader will create the new index and replace the old index in the alias with the new one. This allows for simple version control.

In [10]:
# Upload the same radio documents again under a newer date, which yields a new
# index name for the same data set.
date = "2023-12-01"
index = "{}_{}".format(folder.rsplit("/", 1)[-1], date)
uploader.upload_documents(documents_to_upload, alias=alias, index=index, mapping=mapping)

Some documents had an empty or non-existing content field and were excluded from upload
Number of documents: 379


Uploading documents: 100%|██████████| 379/379 [00:00<00:00, 5383.45it/s]

Number of successfully uploaded documents:  379
Index/indices radio_2023-10-01 removed from alias rag_data





##### If we again check which indices are in the alias, we see that the old radio index has been replaced with the new one

In [11]:
# Re-check the alias after the re-upload.
# Credentials are sent only when configured, and the request is bounded by a timeout.
response = requests.get(
    f"{url}/_alias/{alias}",
    auth=(username, password) if username else None,
    timeout=10,
)
response.json()

{'radio_2023-12-01': {'aliases': {'rag_data': {}}},
 'nr-ran_2023-10-01': {'aliases': {'rag_data': {}}}}

##### The old index is still stored in the vector store, so we can roll back which data is searchable whenever we want to.

In [12]:
# List every index on the store; names starting with "." are skipped.
# raise_for_status() stops an error body (e.g. {"error": ..., "status": 401})
# from being silently reported as a list of index names.
response = requests.get(
    f"{url}/*",
    auth=(username, password) if username else None,
    timeout=10,
)
response.raise_for_status()
indices = sorted(name for name in response.json() if not name.startswith("."))
print("Indices in vector store:", indices)

Indices in vector store: ['nr-ran_2023-10-01', 'radio_2023-10-01', 'radio_2023-12-01']


##### If we try to upload documents to an existing index with a mapping that differs from the index's current mapping, the uploader automatically switches to the existing index's mapping. Let's try this by uploading documents with a mapping that includes an embedding key.

In [14]:
# Upload into the already existing index with a different mapping class; the
# printed output shows the uploader falling back to the existing mapping.
mapping = ES7MappingCPI()
uploader.upload_documents(
    documents_to_upload,
    mapping=mapping,
    index=index,
    alias=alias,
)

Mapping has been changed to that of alias/index rag_data
Some documents had an empty or non-existing content field and were excluded from upload
Number of documents: 379


Uploading documents:   0%|          | 0/379 [00:00<?, ?it/s]

Uploading documents: 100%|██████████| 379/379 [00:00<00:00, 4021.26it/s]

Number of successfully uploaded documents:  379





##### If we upload documents to a new index under an alias that already exists, the uploader will enforce the alias's mapping.

In [15]:
# A new date yields a brand-new index name, but it is still attached to the
# existing alias, whose mapping the uploader enforces.
date = "2023-12-15"
dataset_name = folder.rsplit("/", 1)[-1]
index = dataset_name + "_" + date
uploader.upload_documents(documents_to_upload, alias=alias, mapping=mapping, index=index)

User-defined mapping is in conflict with alias mapping.
Changing mapping to alias mapping.
Some documents had an empty or non-existing content field and were excluded from upload
Number of documents: 379


Uploading documents: 100%|██████████| 379/379 [00:00<00:00, 2086.78it/s]


Number of successfully uploaded documents:  379
Index/indices radio_2023-12-01 removed from alias rag_data


#### Now we can see the indices in the vector store as well as the alias

In [16]:
# List every index now on the store (names starting with "." are skipped).
# Fail loudly on an HTTP error instead of listing the error body's keys as indices.
response = requests.get(
    f"{url}/*",
    auth=(username, password) if username else None,
    timeout=10,
)
response.raise_for_status()
indices = sorted(name for name in response.json() if not name.startswith("."))
print("Indices in vector store:", indices)

Indices in vector store: ['nr-ran_2023-10-01', 'radio_2023-10-01', 'radio_2023-12-01', 'radio_2023-12-15']


In [17]:
# Final look at which indices the alias points to.
# Credentials are sent only when configured, and the request is bounded by a timeout.
response = requests.get(
    f"{url}/_alias/{alias}",
    auth=(username, password) if username else None,
    timeout=10,
)
response.json()

{'radio_2023-12-15': {'aliases': {'rag_data': {}}},
 'nr-ran_2023-10-01': {'aliases': {'rag_data': {}}}}