# allganize-RAG-Evaluation data + multimodal ingestion
## Methodology
```
1. Load Document Readers
    1-1. Load DoclingPDFReader
        1-1-1. Initialize Docling Converter
        1-1-2. Initialize PSIKing Reader
    1-2. Load PDF2ImageReader
2. Load PDF File Data
3. Ingest Data
    3-1. (Reader) PDF File -> PSIKing Document
    3-2. (Splitter) Chunk Documents
4. Embed
5. Insert into DocumentStore, VectorStore
    5-1. Insert to DocStore
    5-2. Insert to VectorStore
6. Test Query
```

## Settings
[Dataset]
* real-life Korean finance pdf files from `allganize-RAG-Evaluation-Dataset-KO`
    * https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-KO
    * use 10 'finance' domain files

[Embedder]
* model: `jina-embeddings-v4-vllm-retrieval` [[hf link]](https://huggingface.co/jinaai/jina-embeddings-v4-vllm-retrieval)
    * served using vLLM `v0.9.1` docker image

In [1]:
import json
import os

from pathlib import Path
import time
from typing import Any, Dict, List, Optional

import pandas as pd
from pydantic import BaseModel
from tqdm import tqdm

from config import settings
# Artifacts should contain model weights downloaded using `docling-tools models download`
# Typically set to `~/.cache/docling/models`
# os.environ["DOCLING_ARTIFACTS_PATH"] = settings.docling_artifacts_path

In [2]:
## Import Core Schemas
from psiking.core.base.schema import Document, TextNode, ImageNode, TableNode

# 1. Load Document Readers

## 1-1. Load DoclingPDFReader

### 1-1-1. Initialize Docling Converter

In [3]:
from docling_core.types.doc import PictureItem

from docling.datamodel.base_models import InputFormat

from docling.datamodel.pipeline_options import (
    AcceleratorDevice,
    VlmPipelineOptions,
    PdfPipelineOptions,
    PictureDescriptionApiOptions,
    ResponseFormat,
    TableStructureOptions,
    TableFormerMode
)
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.vlm_pipeline import VlmPipeline
from docling.datamodel.pipeline_options_vlm_model import (
    ApiVlmOptions,
    InferenceFramework,
    InlineVlmOptions,
    ResponseFormat,
    TransformersModelType
)

In [4]:
pipeline_options = PdfPipelineOptions()

# If force_backend_text = True, text from backend will be used instead of generated text
pipeline_options.force_backend_text = False
pipeline_options.generate_picture_images = True

pipeline_options.images_scale = 1.5
pipeline_options.generate_page_images = True
pipeline_options.generate_picture_images = True
pipeline_options.do_ocr = False

# TableStructure
pipeline_options.do_table_structure = True
pipeline_options.table_structure_options = TableStructureOptions(mode=TableFormerMode.ACCURATE)

pipeline_options.accelerator_options.device = AcceleratorDevice.MPS

In [None]:
from psiking.core.reader.pdf.docling.picture_description import (
    openai_options as docling_openai_picture_description_options
) 

pipeline_options.do_picture_description = True
pipeline_options.enable_remote_services = True

print(settings.vlm_model)
pipeline_options.picture_description_options=docling_openai_picture_description_options(
    api_key=settings.vlm_api_key,
    model=settings.vlm_model
)

gpt-4.1-nano


In [6]:
converter = DocumentConverter(
    allowed_formats = [
        InputFormat.PDF,
    ],
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=pipeline_options,
        ),
    }
)

### 1-1-2. Initialize PSIKing Reader (DoclingPDFReader)

In [None]:
from psiking.core.reader.pdf.docling import DoclingPDFReader

# initalize reader
reader = DoclingPDFReader(converter=converter)

## 1-2. Load PDF2ImageReader

In [None]:
from psiking.core.reader.pdf.pdf2image import PDF2ImageReader

poppler_path = "/opt/homebrew/Cellar/poppler/25.07.0/bin"
pdf2img_reader = PDF2ImageReader(poppler_path=poppler_path)

# 2. Load PDF File Data
* 10 pdf files, convert to image with pdf2image

In [None]:
# PDF File directory
# pdf_dir = os.path.join(settings.data_dir, "retrieval_dataset/allganize-RAG-Evaluation-Dataset-KO/finance")
pdf_dir = '../data/pdf/finance'
pdf_fnames =[x for x in os.listdir(pdf_dir) if x.endswith(".pdf")]
print("num files:", len(pdf_fnames))
pdf_fnames[:10]

num files: 10


['7373884a-8255-482d-9e7c-00b919083526.pdf',
 '5484364a-38de-48b7-a0a6-b009f361bd9e.pdf',
 'b59c836c-ec57-44ba-b4a8-2ae3d58a22e4.pdf',
 '99d45724-817a-4c05-85e2-83e0aa8ac8c0.pdf',
 '03d95093-ed1f-4a66-83dc-5534dfbd87e3.pdf',
 'c94f675e-7d81-48bd-88f8-c5ff766190cc.pdf',
 '053248f8-4311-413e-b34b-9a65a4251f4f.pdf',
 '72b54f4b-7002-48ea-ad20-2c613d8360f6.pdf',
 'bbd035d6-51a2-41ba-b913-8357d89b7852.pdf',
 '980889bb-16cd-447f-b5eb-1384b84903cc.pdf']

In [None]:
# Map to FileIds
metadata_df = pd.read_csv('../data/metadata.tsv', sep='\t')

pdf_file_ids = [
    metadata_df[metadata_df.id==x.replace('.pdf', '')].iloc[0]['id'] for x in pdf_fnames
]

In [11]:
len(pdf_file_ids)

10

# 3. Ingest Data

In [12]:
import traceback

## 3-1. (Reader) PDF File -> PSIKing Document
[Note]
* Some files may fail to parse with error `RuntimeError: Invalid code point` due to error with docling PDF pase backend
    * Related Issues:
        * https://github.com/docling-project/docling/issues/1111
        * https://github.com/docling-project/docling-parse/issues/133
* Files exhibiting this error will be handled as PDF2Image

In [13]:
# Convert pages to image
documents = []
failed_fnames = []

for doc_i in tqdm(range(len(pdf_fnames))):
    fname=pdf_fnames[doc_i]
    file_path = os.path.join(pdf_dir, fname)
    file_id = pdf_file_ids[doc_i]
    
    try:
        document = reader.run(
            file_path, 
            extra_info = {
                "source_id": file_id,
                "domain": "finance"
            }
        )
        documents.append(document)
        continue
    except Exception as e:
        # print("[DOCLING READER] failed {} - {}".format(fname, str(e)))
        print("[DOCLING READER] failed {}, Falling back to PDF2IMG".format(fname))
        # print(traceback.format_exc())

    # Fallback - PDF2IMG
    try:
        document = pdf2img_reader.run(
            file_path,
            extra_info = {
                "source_id": file_id,
                "domain": "finance"
            }
        )
        documents.append(document)
    except Exception as e:
        print("[PDF2IMG READER] failed {} - {}".format(fname, str(e)))
        failed_fnames.append(fname)
    
for node in document.nodes[:3]:
    print(type(node))

 20%|██        | 2/10 [04:22<15:32, 116.62s/it]Encountered an error during conversion of document 02616dbc4dc47f992b7008e68e4f1d4cb49ccece229e7fad02a38a3470346a63:
Traceback (most recent call last):

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/pipeline/base_pipeline.py", line 160, in _build_document
    for p in pipeline_pages:  # Must exhaust!

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/pipeline/base_pipeline.py", line 126, in _apply_on_pages
    yield from page_batch

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/page_assemble_model.py", line 70, in __call__
    for page in page_batch:

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/table_structure_model.py", line 177, in __call__
    for page in page_batch:

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/layout_model.py", line 151, in __call__
    for page in page_batch:


[DOCLING READER] failed b59c836c-ec57-44ba-b4a8-2ae3d58a22e4.pdf, Falling back to PDF2IMG


 40%|████      | 4/10 [04:48<04:38, 46.47s/it] Encountered an error during conversion of document ce014774ce984417127bff298a0e883db7ad2652e7cb66d49bbbb2423cc4176c:
Traceback (most recent call last):

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/pipeline/base_pipeline.py", line 160, in _build_document
    for p in pipeline_pages:  # Must exhaust!

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/pipeline/base_pipeline.py", line 126, in _apply_on_pages
    yield from page_batch

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/page_assemble_model.py", line 70, in __call__
    for page in page_batch:

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/table_structure_model.py", line 177, in __call__
    for page in page_batch:

  File "/opt/miniconda3/envs/psiking/lib/python3.10/site-packages/docling/models/layout_model.py", line 151, in __call__
    for page in page_batch:


[DOCLING READER] failed 03d95093-ed1f-4a66-83dc-5534dfbd87e3.pdf, Falling back to PDF2IMG


100%|██████████| 10/10 [11:49<00:00, 70.93s/it] 

<class 'psiking.core.base.schema.TextNode'>
<class 'psiking.core.base.schema.TextNode'>
<class 'psiking.core.base.schema.TextNode'>





In [None]:
print(len(documents))

10


In [15]:
document.metadata

{'reader': 'DoclingPDFReader',
 'source_id': '980889bb-16cd-447f-b5eb-1384b84903cc',
 'domain': 'finance'}

In [16]:
# image = document.nodes[0].image

# # Crop to half
# width, height = image.size
# left_half = image.crop((0, 0, width, height//2))
# left_half

## 3-2. (Splitter) Chunk Documents
1. merge text nodes with `TextNodeMerger`
2. split texts into chunks with `LangchainRecursiveCharacterTextSplitter`

In [17]:
from psiking.core.processor.document.text_merger import TextNodeMerger
# Split Documents page-level
merger = TextNodeMerger()

merged_documents = []
for document in documents:
    merged_document = merger.run(document)
    merged_documents.append(merged_document)

In [18]:
# merged_documents[0]
merged_documents[0].nodes[0]

TextNode(id_='aac25e2f-9b7a-487f-831f-c94621894b44', metadata={'prov': '[{"page_no": 1, "bbox": {"l": 71.444, "t": 702.6370374023437, "r": 511.598, "b": 645.7080374023437, "coord_origin": "BOTTOMLEFT"}, "charspan": [0, 37]}]'}, text_type=<TextType.PLAIN: 'plain'>, label=<TextLabel.PLAIN: 'plain'>, resource=MediaResource(data=None, text='증권사 리서치센터장, 자산운용사 대표와 함께하는 제1회 증시 콘서트\n2019 하반기 증시 대전망\n|\xa0일\xa0시\xa0| 2019.\xa07.\xa02\xa0(화)\xa014:30\n|\xa0장\xa0소\xa0| 금융투자협회\xa03층\xa0불스홀', path=None, url=None, mimetype=None))

In [19]:
# Run Splitter
import copy
from psiking.core.splitter.text.langchain_text_splitters import LangchainRecursiveCharacterTextSplitter

splitter = LangchainRecursiveCharacterTextSplitter(
    chunk_size = 1024,
    chunk_overlap = 128
)

chunks = []
for document in merged_documents:
    document_chunks = []
    document_metadata = document.metadata
    
    for i, node in enumerate(document.nodes):
        # Run Splitter
        if isinstance(node, TextNode):
            try:
                split_nodes = splitter.run(node)
            except Exception as e:
                print(i, node)
                print(str(e))
                raise e
        else:
            split_nodes = [node]
            
        node_metadata = node.metadata
        # Add 
        chunk_metadata = copy.deepcopy(document_metadata)
        chunk_metadata['prov'] = node_metadata['prov']
        
        # Create New Document
        for split_node in split_nodes:
            # Each Document contains single node
            chunk = Document(
                nodes=[split_node],
                metadata=chunk_metadata
            )
            document_chunks.append(chunk)
    chunks.extend(document_chunks)
print(len(chunks))

1032


# 4. Embed

## 4-1. Initialize Embedder

In [None]:
import asyncio
from tqdm.asyncio import tqdm

from psiking.core.embedder.vllm.online_jina_emb_v4 import VLLMOnlineJinaEmbV4Embedder

embedder = VLLMOnlineJinaEmbV4Embedder(
    base_url=settings.multimodal_embedding_base_url,
    model=settings.multimodal_embedding_model
)

## 4-2. Embed Documents

In [None]:
import base64
from io import BytesIO
from PIL import Image

def img_to_base64(img: Image.Image, format="PNG"):
    buffer = BytesIO()
    img.save(buffer, format=format)         # or format="JPEG"
    buffer.seek(0)

    # 3. Base64-encode
    b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return b64

async def embed(semaphore, doc: Document):
    node = doc.nodes[0]
    
    text = doc.nodes[0].text
    messages = [
        {
            'role': 'user',
            'content': [
                {'type': 'text', 'text': text},
            ]
        }
    ]
    
    if isinstance(node, ImageNode) or isinstance(node, TableNode):
        if not node.image_data is None:
            image_base64 = img_to_base64(node.image)
            messages[0]['content'].append(
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image_base64}"}
                }
            )
    async with semaphore:
        try:
            embedding = await embedder.arun(
                input=messages,
                input_format='messages',
                pool=True,
                normalize=True
            )
        except Exception as e:
            print("ERR DOC {} {}".format(doc.id_, str(e)))
            raise e
    return embedding

In [22]:
semaphore = asyncio.Semaphore(16)

tasks = []
for chunk in chunks:
    task = embed(semaphore, chunk)
    tasks.append(task)

embeddings = await tqdm.gather(*tasks)

100%|██████████| 1032/1032 [1:08:40<00:00,  3.99s/it]


In [23]:
embeddings = [x.tolist() for x in embeddings]

In [24]:
# (num_chunks, seq_len, embedding_dim)
print(len(embeddings))
print(len(embeddings[0]))

1032
2048


In [25]:
type(embeddings[0][0])

float

# 5. Insert into DocumentStore, VectorStore

## 5-1. Insert to DocStore

In [26]:
from psiking.core.storage.docstore.in_memory import InMemoryDocumentStore

In [27]:
doc_store = InMemoryDocumentStore()

In [28]:
doc_store.add(chunks)

In [29]:
doc_store.count()

1032

In [None]:
doc_store.save('storage/docstore_v2507.json')

## 5-2. Insert to VectorStore

In [None]:
from qdrant_client import QdrantClient
from psiking.core.storage.vectorstore.qdrant import QdrantSingleVectorStore

# initialize client
# client = QdrantClient(":memory:")
client = QdrantClient(host="localhost", port=6333)
collection_name = "allganize-finance-multimodal-v2507"

vector_store = QdrantSingleVectorStore(
    collection_name=collection_name,
    client=client,
)

In [None]:
from qdrant_client.http import models

# embedding_dim = 1024
embedding_dim = len(embeddings[0])

vector_store.create_collection(
    on_disk_payload=True,  # store the payload on disk
    vectors_config = models.VectorParams(
        size=embedding_dim,
        distance=models.Distance.COSINE,
        on_disk=True,
        hnsw_config = {
            "m": 16,
            "ef_construct": 100,
        }
    )
)

In [33]:
len(chunks), len(embeddings)

(1032, 1032)

In [36]:
vector_store.add(
    documents=chunks,
    embeddings=embeddings,
    metadata_keys=["source_id", "domain", 'prov']
)

In [37]:
chunks[0].id_

'8bd9aa6b-06cf-4ef8-a617-41d7eec2ab89'

In [38]:
points = vector_store._client.retrieve(
    collection_name=vector_store.collection_name,
    ids=[chunks[0].id_],
    with_vectors=True
)

In [39]:
print(points[0].id)
print(points[0].payload)
print(len(points[0].vector))

8bd9aa6b-06cf-4ef8-a617-41d7eec2ab89
{'source_id': '7373884a-8255-482d-9e7c-00b919083526', 'domain': 'finance', 'prov': '[{"page_no": 1, "bbox": {"l": 71.444, "t": 702.6370374023437, "r": 511.598, "b": 645.7080374023437, "coord_origin": "BOTTOMLEFT"}, "charspan": [0, 37]}]'}
2048


# 6. Test Query

In [40]:
import numpy as np
from psiking.core.storage.vectorstore.schema import (
    MetadataFilters,
    FilterOperator,
    VectorStoreQuery,
    VectorStoreQueryMode,
    VectorStoreQueryOptions,
)   

In [41]:
# Use random query embedding
query_embedding = np.random.randn(embedding_dim)

vsquery=VectorStoreQuery(
    dense_embedding=query_embedding
)
vsoptions=VectorStoreQueryOptions(
    mode=VectorStoreQueryMode.DENSE,
    top_k=10
)

In [42]:
points = vector_store.query(
    query=vsquery,
    options=vsoptions
)

In [43]:
points

[ScoredPoint(id='f83d0deb-a3c4-4a89-9bdf-f1197ed971aa', version=13, score=0.056224834, payload={'source_id': '980889bb-16cd-447f-b5eb-1384b84903cc', 'domain': 'finance', 'prov': '[{"page_no": 57, "bbox": {"l": 494.632, "t": 580.8330073242187, "r": 501.125, "b": 498.1310073242187, "coord_origin": "BOTTOMLEFT"}, "charspan": [0, 21]}]'}, vector=None, shard_key=None, order_value=None),
 ScoredPoint(id='a805d829-95dc-4b88-a22d-76d453f9163b', version=16, score=0.048955273, payload={'source_id': '980889bb-16cd-447f-b5eb-1384b84903cc', 'domain': 'finance', 'prov': '[{"page_no": 134, "bbox": {"l": 430.1534118652344, "t": 105.8568115234375, "r": 477.24481201171875, "b": 58.15570068359375, "coord_origin": "BOTTOMLEFT"}, "charspan": [0, 0]}]'}, vector=None, shard_key=None, order_value=None),
 ScoredPoint(id='b90681d6-2aa5-4d0d-bf3e-f05343809908', version=9, score=0.048723314, payload={'source_id': '72b54f4b-7002-48ea-ad20-2c613d8360f6', 'domain': 'finance', 'prov': '[{"page_no": 35, "bbox": {"l": 

In [44]:
# Get Retrieved Result from docstore
retrieved_doc_id = points[0].id

retrieved_doc = doc_store.get(retrieved_doc_id)[0]

In [45]:
nodes = retrieved_doc.nodes
print(len(nodes))

1
