# Data Ingestion for Document Retrieval Service

This notebook provides a comprehensive toolkit for ingesting data into our Weaviate vector database. This process is a fundamental part of our document retrieval service, as it prepares and populates the database with the information required for efficient and accurate searching.

For operational use, the code in this notebook can be packaged into a standalone script to automate data ingestion workflows. In other words, feel free to convert the notebook into a script.

In [None]:
import os
import pickle
import random
from collections import namedtuple

import requests
import weaviate
from weaviate import classes as wvc
from weaviate.classes.query import MetadataQuery, QueryReference
from weaviate.classes.config import Property, DataType, ReferenceProperty

## Data Structure

In [None]:
# Immutable record describing one preprocessed document.
#   docType    - document type label (e.g. "陳情"); ingestion selects documents by this field
#   docSummary - summary text; chunked and indexed alongside the content
#   docContent - full document text; stored whole and also chunked
ProcessedDoc = namedtuple("ProcessedDoc", "docType docSummary docContent")

## Constants

In [None]:
# Maximum chunk length handed to the embedding model: presumably the model's
# 512-token input limit, minus 15 tokens reserved for special tokens.
# NOTE(review): this value is passed to slicer() as chunk_length, which counts
# characters rather than tokens, so it assumes roughly one token per character
# (plausible for Chinese text) - confirm against the embedding tokenizer.
_EMBEDDING_MODEL_MAX_TOKENS = 512
_RESERVED_SPECIAL_TOKENS = 15
EMBEDDING_UPPER_LIMIT = _EMBEDDING_MODEL_MAX_TOKENS - _RESERVED_SPECIAL_TOKENS

# Endpoint of the embedding service *as seen by Weaviate* (not your own embedding
# service, and not reached from this notebook directly).
#
# IMPORTANT: A Weaviate collection is bound to its embedding service when the
# collection is created. To switch embedding services later, the collection
# must be recreated.
#
# The service should run in the same Docker network as Weaviate, and this
# hostname must match the service name defined in docker-compose.
EMBEDDING_ENDPOINT = "http://embedding:8080"

# Minimum chunk size for document processing; smaller chunks are meant to be DISCARDED.
# NOTE(review): not referenced by the ingestion code in this notebook - confirm
# where this limit is actually enforced.
MIN_CHUNK_SIZE = 8
# Maximum input size for the reranker service; longer articles are DISCARDED.
# NOTE(review): not referenced in this notebook; presumably enforced on the
# retrieval/reranker side - confirm.
MAX_RERANKER_SIZE = 8192

In [None]:
# %-style name template for the collection holding full (unchunked) documents;
# "%s" receives the chunk collection name, e.g. "Complaint" -> "ComplaintOrig".
DOCUMENT_COLLECTION_ORIG_NAME = "%sOrig"

# Document type label (ProcessedDoc.docType) -> Weaviate chunk collection name.
_DOCUMENT_CLASS_PAIRS = (
    ("簽", "Qian"),
    ("函", "Han"),
    ("陳情", "Complaint"),
)
DOCUMENT_CLASS_MAP = dict(_DOCUMENT_CLASS_PAIRS)

In [None]:
import atexit

# Connect to the local Weaviate instance with the client's default local settings.
# NOTE(review): the GraphQL cell posts to http://localhost:8080 directly - keep
# both in sync if the connection target changes.
client = weaviate.connect_to_local()

# The client is never closed explicitly elsewhere in this notebook; close it when
# the interpreter / kernel exits so its connection is released.
atexit.register(client.close)

## Helper Functions

In [None]:
def slicer(document: str, chunk_length: int, overlap: int) -> list[str]:
    """
    Slices a document into overlapping chunks.

    Consecutive chunks start ``chunk_length - overlap`` characters apart, so each
    chunk repeats the last ``overlap`` characters of the previous one. Slicing
    stops as soon as a chunk reaches the end of the document, so no trailing chunk
    is emitted that merely duplicates text already in the previous chunk. The
    final chunk may be shorter than ``chunk_length``.

    Args:
        document: The input string to be sliced.
        chunk_length: The desired length of each chunk, in characters.
        overlap: The number of characters to overlap between consecutive chunks.

    Returns:
        A list of strings, where each string is a chunk of the original document.
        An empty document yields an empty list.

    Raises:
        TypeError: If ``document`` is not a string.
        ValueError: If ``chunk_length`` is not a positive integer, ``overlap`` is
            not a non-negative integer, or ``overlap >= chunk_length``.

    Example:
        >>> slicer("abcdefg", chunk_length=3, overlap=1)
        ['abc', 'cde', 'efg']
    """
    if not isinstance(document, str):
        raise TypeError("Input 'document' must be a string.")
    if not isinstance(chunk_length, int) or chunk_length <= 0:
        raise ValueError("'chunk_length' must be a positive integer.")
    if not isinstance(overlap, int) or overlap < 0:
        raise ValueError("'overlap' must be a non-negative integer.")
    if overlap >= chunk_length:
        raise ValueError("'overlap' cannot be greater than or equal to 'chunk_length'.")

    step = chunk_length - overlap  # strictly positive thanks to the checks above
    doc_len = len(document)
    chunks = []
    start_index = 0

    while start_index < doc_len:
        end_index = start_index + chunk_length
        chunks.append(document[start_index:end_index])
        # This chunk already covers the end of the document; any further chunk
        # would lie entirely inside it and only duplicate content.
        if end_index >= doc_len:
            break
        start_index += step

    return chunks

## Getting Document Data

This is just an example; implement your own logic to prepare the data.

The system requires each document to be paired with a summary. Both the summary and the full content are chunked and indexed, so a query can match a document through either one.

In [None]:
# Prepare data: load the preprocessed documents (example data source only).
# Each element is expected to be a ProcessedDoc(docType, docSummary, docContent).
#
# SECURITY: pickle.load can execute arbitrary code while unpickling. Only load
# files produced by a trusted pipeline - never user-supplied pickles.
with open("../resources/processed_documents.pkl", "rb") as f:
    doc_db: list[ProcessedDoc] = pickle.load(f)

print(f"Loaded {len(doc_db)} documents.")

## Querying Documents

In [None]:
TARGET_DOCUMENT_CLASS = "陳情" # NOTE: Try other document class to see how it works!
user_query = "網路訊號不佳"

# Chunk collection holding documents of TARGET_DOCUMENT_CLASS.
TARGET_COLLECTION = DOCUMENT_CLASS_MAP[TARGET_DOCUMENT_CLASS] # No need to modify this line.

document_collection = client.collections.get(TARGET_COLLECTION)

# Hybrid (keyword + vector) search over chunk content, alpha=0.5 balancing the two.
# Each hit also fetches the "content" of the original document it references via "orig".
vd_query_response = document_collection.query.hybrid(
    query=user_query,
    query_properties=["content"],
    alpha=0.5,
    limit=3,
    return_metadata=MetadataQuery(score=True, explain_score=True),
    return_references=QueryReference(
        link_on="orig",
        return_properties=["content"]
    )
)

print(f"Query: {user_query}")
print(f"Collection: {TARGET_DOCUMENT_CLASS}")

for obj in vd_query_response.objects:
    print(f"Score: {obj.metadata.score}")
    print(f"Document Chunk: \n{obj.properties['content']}")
    print("\n")
    # Use the public `.objects` accessor rather than the name-mangled private
    # attribute `_CrossReference__objects`, and tolerate chunks without a reference.
    orig_ref = (obj.references or {}).get("orig")
    orig_objects = orig_ref.objects if orig_ref is not None else []
    if orig_objects:
        print(f"Original Document: \n{orig_objects[0].properties['content']}")
    else:
        print("Original Document: \n<no 'orig' reference found>")
    print("\n")

### GraphQL Query

In [None]:
# GraphQL template for a hybrid search on a chunk collection that also returns the
# referenced original document. The three %s placeholders are filled positionally
# with the % operator:
#   1. chunk collection name (e.g. "Complaint")
#   2. user query text - inserted between double quotes, so it must not contain
#      unescaped `"` or `\`
#   3. chunk collection name again - the template itself appends "Orig", which
#      must stay consistent with DOCUMENT_COLLECTION_ORIG_NAME ("%sOrig")
# NOTE: this template uses limit 2, while the client-based query above uses limit=3.
# The query logic is exactly the same as the one used in actual Retrieval service.
graphql_payload: str = """
{
  Get {
    %s( # Placeholder for document chunk collection
      hybrid: {
        query: "%s",         # Your user query
        alpha: 0.5,          # Balance between vector/keyword search
        properties: ["content"] # Properties for keyword (BM25) search
      },
      limit: 2 # Limit the number of results FROM Weaviate
    ) {
      # --- Specify properties you need for reranking or display ---
      content   # You specifically extracted this for the reranker

      # --- Add any other properties if needed ---
      # Example: other_property

      # --- Request metadata ---
      _additional {
        score        # The hybrid search score, needed for sorting
        explainScore # Breakdown of keyword/vector contribution
        id           # Useful for unique identification
      }
      orig {
        ... on %sOrig { # Placeholder for original document collection
          content
        }
      }
    }
  }
}
"""

In [None]:
import json

user_query = "TaipeiFree訊號很差"

# The template places the query between double quotes, so escape it as a string
# body first; otherwise a query containing `"` or `\` would break the request (or
# inject extra GraphQL). JSON and GraphQL share the same string escape sequences.
escaped_query = json.dumps(user_query, ensure_ascii=False)[1:-1]

response = requests.post(
    url="http://localhost:8080/v1/graphql",
    headers={"Content-Type": "application/json"},
    json={
        # NOTE: Apart from the escaping above, the query formatting mirrors the one
        # used in the actual Retrieval service.
        # NOTE(review): if the service interpolates the raw query, it has the same
        # quoting issue - consider escaping there as well.
        "query": graphql_payload % (TARGET_COLLECTION, escaped_query, TARGET_COLLECTION),
        "variables": None,
        "operationName": None,
    },
    timeout=30,  # avoid hanging forever if Weaviate is unreachable
)
# Surface HTTP-level failures; GraphQL-level errors still appear under "errors".
response.raise_for_status()

print(response.json())

## ⚠️ DANGEROUS ⚠️ Define Document Collection

**WARNING**: This procedure will drop the existing collection and create a new one. All data will be lost. Do **NOT** run this if you just want to insert new data.

In [None]:
TARGET_DOCUMENT_CLASS = "陳情" # ⚠️WARN: Modify as needed

# Derived collection names; these need not be modified.
COLLECTION = DOCUMENT_CLASS_MAP[TARGET_DOCUMENT_CLASS]
ORIGINAL_DOCUMENT_CLASS = DOCUMENT_COLLECTION_ORIG_NAME % COLLECTION

# Schema: two collections linked by a one-way cross-reference (chunk -> original).
#   - ORIGINAL_DOCUMENT_CLASS stores full documents and is not vectorized.
#   - COLLECTION stores chunks, vectorized through the embedding service at
#     EMBEDDING_ENDPOINT; each chunk points at its source document via "orig".
#
# ⚠️ Drop any existing versions first (ALL DATA IS LOST). The chunk collection is
# deleted before the original-document collection it references.
for _stale_name in (COLLECTION, ORIGINAL_DOCUMENT_CLASS):
    client.collections.delete(_stale_name)

# Collection for full, unchunked documents (no vectorizer).
documentsOrig = client.collections.create(
    name=ORIGINAL_DOCUMENT_CLASS,
    properties=[Property(name="content", data_type=DataType.TEXT)],
    vectorizer_config=wvc.config.Configure.Vectorizer.none(),
)

# Chunks are embedded by the OpenAI-compatible service reachable from Weaviate.
_chunk_vectorizer = wvc.config.Configure.Vectorizer.text2vec_openai(
    base_url=EMBEDDING_ENDPOINT,
)
_orig_link = ReferenceProperty(name="orig", target_collection=ORIGINAL_DOCUMENT_CLASS)

# Collection for chunks, each linked back to its original document.
documents = client.collections.create(
    name=COLLECTION,
    properties=[Property(name="content", data_type=DataType.TEXT)],
    vectorizer_config=_chunk_vectorizer,
    references=[_orig_link],
)

### Add Data to Collection

In [None]:
# Add the selected documents to the collections created above: the full text goes
# into the original-document collection, and overlapping chunks of both the summary
# and the content go into the chunk collection, each referencing the full document.
documentOrig = client.collections.get(ORIGINAL_DOCUMENT_CLASS)
chunkCollection = client.collections.get(COLLECTION)

# Chunking parameters are identical for every document, so set them once.
chunkSize = EMBEDDING_UPPER_LIMIT
overlap = 150 # You can try different overlaps.


def _insert_chunks(collection, text, orig_id, *, chunk_length, chunk_overlap, doc_index, label):
    """Slice `text` with slicer() and insert each chunk into `collection`.

    Every chunk gets an "orig" cross-reference to `orig_id`; `doc_index` and
    `label` are used only in the progress messages.
    """
    chunks = slicer(text, chunk_length=chunk_length, overlap=chunk_overlap)
    for chunk_index, chunk in enumerate(chunks):
        collection.data.insert(
            properties={"content": chunk},
            references={"orig": orig_id},  # UUID reference to the original document
        )
        print(f"Document #{doc_index}: {label} #{chunk_index} inserted.")


selected_docs = [x for x in doc_db if x.docType == TARGET_DOCUMENT_CLASS]
if not selected_docs:
    print(f"No documents with docType {TARGET_DOCUMENT_CLASS!r} found; nothing to insert.")

# NOTE(review): every insert is a separate request and nothing is rolled back, so a
# failure part-way through leaves the current document only partially ingested.
for i, doc in enumerate(selected_docs):
    # Insert the full document first so its UUID can be referenced by its chunks.
    docId = documentOrig.data.insert(
        properties={
            "content": doc.docContent, # Original document
        },
    )
    print(f"Document #{i}: Original document inserted.")

    # Index both the summary and the full content so either can match a query.
    _insert_chunks(
        chunkCollection, doc.docSummary, docId,
        chunk_length=chunkSize, chunk_overlap=overlap, doc_index=i, label="Summary Chunk",
    )
    _insert_chunks(
        chunkCollection, doc.docContent, docId,
        chunk_length=chunkSize, chunk_overlap=overlap, doc_index=i, label="Chunk",
    )

    print(f"Document #{i}: Done.")
