# RAG ingestion pipeline
Starting from pre-processed artifacts generated with the document ingestion pipeline.

In [1]:
%pip install -q "instructlab[mps]"
%pip install -q haystack milvus_haystack
%pip install -q "sentence-transformers>=3.0.0"'
%pip install -q jq
%pip install -q pymilvus


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
zsh:1: unmatched '
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart 

In [2]:
from pathlib import Path

from haystack import Pipeline
from haystack.components.converters import JSONConverter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.routers import FileTypeRouter
from haystack.components.writers import DocumentWriter
from milvus_haystack import MilvusDocumentStore

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
milvus_db_uri = "./milvus.db"  # Milvus Lite
docs_collection_name = "UserDocs"
document_store = MilvusDocumentStore(
    connection_args={"uri": milvus_db_uri},
    collection_name=docs_collection_name,
    drop_old=True,
)

In [4]:
jq_expr = '.["main-text"][]'
json_converter = JSONConverter(
    jq_schema=jq_expr, content_key="text", extra_meta_fields={"type", "name"}
)

In [5]:

file_type_router = FileTypeRouter(mime_types=["text/plain", "application/pdf", "text/markdown"])
document_joiner = DocumentJoiner()

document_cleaner = DocumentCleaner()
document_splitter = DocumentSplitter(split_by="word", split_length=150, split_overlap=50)

document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
document_writer = DocumentWriter(document_store)

In [6]:
preprocessing_pipeline = Pipeline()
preprocessing_pipeline.add_component(instance=json_converter, name="json_converter")
preprocessing_pipeline.add_component(instance=document_joiner, name="document_joiner")
preprocessing_pipeline.add_component(instance=document_cleaner, name="document_cleaner")
preprocessing_pipeline.add_component(instance=document_splitter, name="document_splitter")
preprocessing_pipeline.add_component(instance=document_embedder, name="document_embedder")
preprocessing_pipeline.add_component(instance=document_writer, name="document_writer")

In [7]:
preprocessing_pipeline.connect("json_converter", "document_joiner")
preprocessing_pipeline.connect("document_joiner", "document_cleaner")
preprocessing_pipeline.connect("document_cleaner", "document_splitter")
preprocessing_pipeline.connect("document_splitter", "document_embedder")
preprocessing_pipeline.connect("document_embedder", "document_writer")

<haystack.core.pipeline.pipeline.Pipeline object at 0x33e8e4510>
🚅 Components
  - json_converter: JSONConverter
  - document_joiner: DocumentJoiner
  - document_cleaner: DocumentCleaner
  - document_splitter: DocumentSplitter
  - document_embedder: SentenceTransformersDocumentEmbedder
  - document_writer: DocumentWriter
🛤️ Connections
  - json_converter.documents -> document_joiner.documents (List[Document])
  - document_joiner.documents -> document_cleaner.documents (List[Document])
  - document_cleaner.documents -> document_splitter.documents (List[Document])
  - document_splitter.documents -> document_embedder.documents (List[Document])
  - document_embedder.documents -> document_writer.documents (List[Document])

In [8]:
output_dir = "output/docling-artifacts"
ingestion_results = preprocessing_pipeline.run(
    {"json_converter": {"sources": list(Path(output_dir).glob("**/*"))}}
)

print(f"count_documents: {document_store.count_documents()}")
print(f"document_writer.documents_written: {ingestion_results['document_writer']['documents_written']}")

Failed to extract text from output/docling-artifacts/Bash-commands-cheat-sheet-Red-Hat-Developer.md. Skipping it. Error: parse error: Invalid numeric literal at line 2, column 5
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/0'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/1'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/2'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/3'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/4'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/5'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/6'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$ref': '#/figures/7'}. Skipping it.
'text' not found in {'name': 'Picture', 'type': 'figure', '$re

count_documents: 994
document_writer.documents_written: 994


## Validating content

In [9]:
from pymilvus import MilvusClient
client = MilvusClient("./milvus.db")

In [10]:
client.list_collections()

['UserDocs']

In [11]:
from pymilvus import DataType
desc = client.describe_collection(docs_collection_name)
print(f"Collection: {desc['collection_name']}")
for f in desc['fields']:
  print(f"Field {f['name']}, of type {DataType(f['type'])._name_}")

Collection: UserDocs
Field file_path, of type VARCHAR
Field name, of type VARCHAR
Field type, of type VARCHAR
Field source_id, of type VARCHAR
Field page_number, of type INT64
Field split_id, of type INT64
Field split_idx_start, of type INT64
Field text, of type VARCHAR
Field id, of type VARCHAR
Field vector, of type FLOAT_VECTOR


In [12]:
res = client.query(
    collection_name=docs_collection_name,
    filter="type not in ['paragraph']",
    # output_fields=["file_path", "type", "text", "vector"],
    output_fields=["file_path", "type"],
    offset=5,
    limit=50
)
print(res)

data: ["{'id': '18a261bb15782711649fbd21d878ad076d50b0293096282ddc4b74b1f10c935a', 'file_path': 'output/docling-artifacts/Linux-commands-cheat-sheet-2023-Red-Hat-Developer.json', 'type': 'subtitle-level-1'}", "{'id': '193def2c038e6de34385d191d7427b72163033abd2efd316e601c3e0c7e16856', 'file_path': 'output/docling-artifacts/Bash-commands-cheat-sheet-Red-Hat-Developer.json', 'type': 'subtitle-level-1'}", "{'id': '1978b81e9367619f2c719daec6639c4e04a47cb024d414e3c31a712a3db460a6', 'file_path': 'output/docling-artifacts/Bash-commands-cheat-sheet-Red-Hat-Developer.json', 'type': 'subtitle-level-1'}", "{'id': '1a36f00d7283383c650ebda7f5fd419b615f26234ca10ce8fe0adf5043a89dc1', 'file_path': 'output/docling-artifacts/Bash-commands-cheat-sheet-Red-Hat-Developer.json', 'type': 'subtitle-level-1'}", "{'id': '2931182500b48f551efa25ffec5d077965597307561e4d85aeee231156880f30', 'file_path': 'output/docling-artifacts/Bash-commands-cheat-sheet-Red-Hat-Developer.json', 'type': 'subtitle-level-1'}", "{'id':

In [13]:
client.close()
# client.drop_collection(docs_collection_name)