### Ingest data into OpenSearch
- Clean up text
- Split text into chunks
- Format text for OpenSearch

In [8]:
import re
import os
import ocrmypdf
from pathlib import Path
import pymupdf
import torch
import spacy
from tqdm.auto import tqdm
import sys

import json
import urllib.parse
import hashlib
import itertools

from opensearchpy import OpenSearch, helpers

sys.path.append('../src')

from utils import *
from config import *

In [4]:
# Run on GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Running code @ {device}')

Running code @ cpu


Load spaCy language model for Romanian

In [3]:
# load the pre-trained spacy language model "ro_core_news_lg"
nlp = spacy.load("ro_core_news_lg")

##### Clean up text

In [17]:
def cleanup_text(text: str) -> str:
    # remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    # # convert to lowercase
    # text = text.lower()

    # clean up table of contents 
    text = re.sub(r'\.{4,}\s*\d+', '', text).strip()

    return text

#### Parse PDF

In [16]:
def parse_pdf(pdf_path: str, url: str) -> dict:
    assert os.path.exists(pdf_path), f'Path {pdf_path} not found'

    # decode percent-encoded/URL-encoded filename -> get diacritics
    filename = urllib.parse.unquote(pdf_path.split('/')[-1])

    document = pymupdf.open(pdf_path, filetype="pdf")
    data = {
        "filename": filename,
        "path": pdf_path,
        "url": url,
        "sentences": [],
    }

    for page_number, page in enumerate(document):
        text = cleanup_text(page.get_text())

        doc = nlp(text)

        data["sentences"].extend([
            {
                "page_number": page_number + 1,
                "text": str(sentence)
            }
            for sentence in doc.sents
        ])
    
    return data

#### Split text into chunks

In [18]:
def split_text_into_chunks(data: dict, max_length: int = 1024) -> dict:
    paragraphs = []

    current_paragraph = []
    page_number = 1
    length = 0
    chunk_number = 1

    for sentence in data["sentences"]:
        if length + len(sentence["text"]) > max_length:
            paragraphs.append({
                "id": get_id(data["url"], chunk_number),
                "text": " ".join(current_paragraph),
                "url": data["url"],
                "type": "pdf",
                "filename": data["filename"],
                "page_number": page_number,
            })
            current_paragraph.clear()
            page_number = sentence["page_number"]
            length = 0
            chunk_number += 1
        
        current_paragraph.append(sentence["text"])
        length += len(sentence["text"])
    
    if current_paragraph:
        paragraphs.append({
            "id": get_id(data["url"], chunk_number),
            "text": " ".join(current_paragraph),
            "url": data["url"],
            "type": "pdf",
            "filename": data["filename"],
            "page_number": page_number,
        })

    return paragraphs

#### Ingest data into OpenSearch

In [None]:
ADMIN_PASSWD = os.environ['OPENSEARCH_INITIAL_ADMIN_PASSWORD']
INDEX_NAME = 'rag-knn-index'
MODEL_ID = ''

In [12]:
host = 'localhost'
port = 9200

# Create the client with SSL/TLS and hostname verification disabled.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = ('admin', ADMIN_PASSWD),
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
)

print(json.dumps(client.info(), indent=4))

{
    "name": "opensearch-node1",
    "cluster_name": "opensearch-cluster",
    "cluster_uuid": "exKC0fQ7Tomx9X3wA4fBbA",
    "version": {
        "distribution": "opensearch",
        "number": "2.19.1",
        "build_type": "tar",
        "build_hash": "2e4741fb45d1b150aaeeadf66d41445b23ff5982",
        "build_date": "2025-02-27T01:16:47.726162386Z",
        "build_snapshot": false,
        "lucene_version": "9.12.1",
        "minimum_wire_compatibility_version": "7.10.0",
        "minimum_index_compatibility_version": "7.0.0"
    },
    "tagline": "The OpenSearch Project: https://opensearch.org/"
}


#### Format data for OpenSearch index

In [19]:
def format_data(paragraphs: dict[str, str]) -> dict[str, str]:
    return [
        {"_index": INDEX_NAME, "_id": paragraph["id"]} | paragraph
        for paragraph in paragraphs
    ]

In [21]:
def ingest_data(path=DATA_DIR) -> None:
    directories = get_directories(path)
    directories.remove(OCR_DIR.name)

    for directory in directories:
        pdfs = get_files_by_extension(path=os.path.join(DATA_DIR, directory), extension=".pdf")

        metadata_filepath = Path(path) / directory / "metadata.json"
        metadata = read_metadata(metadata_filepath)

        for filename in pdfs:
            pdf_path = DATA_DIR / directory / filename
            if "ocr_path" in metadata[filename]:
                pdf_path = BASE_DIR / metadata[filename]["ocr_path"]

            data = parse_pdf(pdf_path=pdf_path.as_posix(), url=metadata[filename]["url"])

            paragraphs = split_text_into_chunks(data)
            # for paragraph in paragraphs[:10]:
            #     print(paragraph)

            entries = format_data(paragraphs)
            # print(entries)

            ret = helpers.parallel_bulk(
                client, 
                actions=entries, 
                chunk_size=10, 
                raise_on_error=False,
                raise_on_exception=False,
                max_chunk_bytes=20 * 1024 * 1024,
                request_timeout=60
            )

            print(list(ret))
            
ingest_data()

[(True, {'index': {'_index': 'rag-knn-index', '_id': 'fab2ac7fe58ff17b51af9bdd7be38904e3a4f541961db7da0259d45ceebee82e-1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1, 'status': 201}}), (True, {'index': {'_index': 'rag-knn-index', '_id': 'fab2ac7fe58ff17b51af9bdd7be38904e3a4f541961db7da0259d45ceebee82e-2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 3, '_primary_term': 1, 'status': 201}}), (True, {'index': {'_index': 'rag-knn-index', '_id': 'fab2ac7fe58ff17b51af9bdd7be38904e3a4f541961db7da0259d45ceebee82e-3', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 4, '_primary_term': 1, 'status': 201}}), (True, {'index': {'_index': 'rag-knn-index', '_id': 'fab2ac7fe58ff17b51af9bdd7be38904e3a4f541961db7da0259d45ceebee82e-4', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'fail

### Search data

In [22]:
query_text = "atributii responsabili de proces"

query = {
    "size": 3,
    "query": {
      "neural": {
        "passage_embedding": {
          "query_text": query_text,
          "model_id": MODEL_ID,
          "k": 3
        }
      }
  }
}

response = client.search(
    body = query,
    index = INDEX_NAME,
)

NameError: name 'MODEL_ID' is not defined