<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/vector_stores/postgres.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Issue [#17](https://github.com/ai-cfia/llamaindex-db/issues/17)


In [1]:
# %pip install -r ../../requirements.txt

In [85]:
import logging
import sys
from llama_index.core import StorageContext
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.core import Settings
import os
from dotenv import load_dotenv
from llama_index.storage.index_store.postgres import PostgresIndexStore
from llama_index.storage.docstore.postgres import PostgresDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from pprint import pprint
import psycopg
import pickle
from llama_index.readers.web import SimpleWebPageReader
from llama_index.core.extractors import QuestionsAnsweredExtractor
from psycopg.sql import SQL, Identifier
import nest_asyncio
import os
import re
from llama_index.core.schema import Document

# Patch asyncio so event loops can be nested inside the kernel's already-running loop.
# NOTE(review): presumably required because library code starts its own loop while
# cells (e.g. the top-level `await` used to load documents) run inside the kernel's loop — confirm.
nest_asyncio.apply()
# Read credentials (DB_*, API_KEY, AZURE_ENDPOINT, API_VERSION) from a local .env file.
# Kept as the cell's last expression: its return value is the displayed output.
load_dotenv()

# # Uncomment to see debug logs
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

True

In [64]:
FOLDER = "issue17/exclude/"


def save_to_pickle(data, filename, folder=FOLDER):
    """Pickle ``data`` to ``<folder>/<filename>``, creating ``folder`` if needed.

    ``folder`` may be given with or without a trailing slash; an empty string
    means the current working directory.
    """
    if folder:
        # os.makedirs("") raises FileNotFoundError, so only create real folders.
        os.makedirs(folder, exist_ok=True)
    # os.path.join inserts the separator when the folder has no trailing slash.
    with open(os.path.join(folder, filename), "wb") as file:
        pickle.dump(data, file)


def load_from_pickle(filename, folder=FOLDER):
    """Load and return the object pickled at ``<folder>/<filename>``.

    Only use this on files written by this notebook: unpickling untrusted data
    can execute arbitrary code.
    """
    with open(os.path.join(folder, filename), "rb") as file:
        return pickle.load(file)

## Setup LLM and Embed Model


In [3]:
# Chat model served from the "ailab-llm" Azure deployment; credentials come from .env.
llm = AzureOpenAI(
    deployment_name="ailab-llm",
    model="gpt-4",
    azure_endpoint=os.getenv("AZURE_ENDPOINT"),
    api_version=os.getenv("API_VERSION"),
    api_key=os.getenv("API_KEY"),
)

# Embedding model served from the "ada" deployment (vectors stored with embed_dim=1536).
embed_model = AzureOpenAIEmbedding(
    deployment_name="ada",
    model="text-embedding-ada-002",
    azure_endpoint=os.getenv("AZURE_ENDPOINT"),
    api_version=os.getenv("API_VERSION"),
    api_key=os.getenv("API_KEY"),
    # Very large retry budget, presumably to ride out rate limiting during bulk embedding.
    max_retries=100000,
)

# Register both as llama_index-wide defaults.
Settings.llm = llm
Settings.embed_model = embed_model

## Variables


In [127]:
# Connection settings for the source database, read from the environment
# (populated from .env by load_dotenv()).
database, host, password, port, user = (
    os.getenv(env_var)
    for env_var in ("DB_NAME", "DB_HOST", "DB_PASSWORD", "DB_PORT", "DB_USER")
)
# Database and schema that receive the llamaindex vector store built below.
llamaindex_db = "llamaindex_db_legacy"
llamaindex_schema = "v_0_0_4"


def get_url_id(url):
    """Return the ``/<lang>/<id>/<id>`` suffix identifying an inspection.canada.ca page.

    Only the URL's path is matched, so a query string, fragment or trailing
    slash no longer hides the id. Returns ``None`` for an empty/``None`` URL or
    a path that does not end with an id.

    >>> get_url_id("https://inspection.canada.ca/eng/1/2?x=1")
    '/eng/1/2'
    """
    from urllib.parse import urlsplit

    if not url:
        return None
    path = urlsplit(url).path.rstrip("/")
    match = re.search(r"/[a-z]{3}/[0-9]+/[0-9]+$", path)
    return match.group() if match else None


def get_url_from_id(url_id):
    """Turn a ``/<lang>/<id>/<id>`` suffix back into an absolute page URL.

    Falsy ids (``None``, ``""``) yield ``None``.
    """
    if not url_id:
        return None
    return f"https://inspection.canada.ca{url_id}"


def get_minimal_url(url):
    """Normalize ``url`` to ``https://inspection.canada.ca/<lang>/<id>/<id>`` (or None)."""
    url_id = get_url_id(url)
    return get_url_from_id(url_id)


def verify(context_nodes, answer_urls, question):
    """Print and return whether retrieval found one of the expected pages.

    Args:
        context_nodes: retrieved nodes; each must carry a ``"url"`` metadata key.
        answer_urls: accepted page URLs in minimal
            ``https://inspection.canada.ca/<lang>/<id>/<id>`` form; the first
            one is echoed in the printed line.
        question: the query text, echoed in the printed line.

    Returns:
        True if any node's normalized URL is in ``answer_urls``, otherwise False.
    """
    # Guard against an empty expectation list instead of raising IndexError.
    expected_url = answer_urls[0] if answer_urls else None
    for node in context_nodes:
        if get_minimal_url(node.metadata["url"]) in answer_urls:
            print(f"✅ '{question}, {expected_url}'")
            return True

    print(f"❌ '{question}, {expected_url}'")
    return False


def test_index(schema, questions_urls, similarity_top_k=10):
    """Print per-question hit/miss lines and the overall accuracy of one index.

    Args:
        schema: Postgres schema (in ``llamaindex_db``) holding the vector store.
        questions_urls: iterable of ``(question, expected_urls)`` pairs.
        similarity_top_k: number of nodes retrieved per question; defaults to
            10, the value previously hard-coded.
    """
    # Materialize so generators also work with len() below.
    questions_urls = list(questions_urls)
    vector_store = PGVectorStore.from_params(
        database=llamaindex_db,
        host=host,
        password=password,
        port=port,
        user=user,
        embed_dim=1536,
        schema_name=schema,
    )
    retriever = VectorStoreIndex.from_vector_store(vector_store).as_retriever(
        similarity_top_k=similarity_top_k
    )
    hits = 0
    for question, urls in questions_urls:
        if verify(retriever.retrieve(question), urls, question):
            hits += 1
    print(f"Accuracy: {hits}/{len(questions_urls)}")

## Observed problem

After embedding the metadata, we noticed a drastic drop in accuracy.


### Before the change


In [128]:
# Regression set of (question, [expected page URL]) pairs. A question counts as
# answered when any retrieved node comes from one of its expected URLs.
questions_urls = [
    (
        "Who to consult when regulatory action is required on the regulated party's permissions?",
        ["https://inspection.canada.ca/eng/1651258246384/1651258246869"],
    ),
    (
        "Comment sera évalué le résultat de la détection de Lm dans un produit de catégorie 2B?",
        ["https://inspection.canada.ca/fra/1540575149362/1540575299761"],
    ),
    (
        "Peut-on fournir un rapport d'analyse (RA) externe du SIESAL à la partie réglementée, pour les échantillons soumis à l'ACIA ou aux laboratoires contractuels tiers?",
        ["https://inspection.canada.ca/fra/1540234969218/1540235089869"],
    ),
]
# Baseline: the index built from the legacy crawler's chunks and embeddings.
test_index("v_0_0_1", questions_urls)

✅ 'Who to consult when regulatory action is required on the regulated party's permissions?, https://inspection.canada.ca/eng/1651258246384/1651258246869'
✅ 'Comment sera évalué le résultat de la détection de Lm dans un produit de catégorie 2B?, https://inspection.canada.ca/fra/1540575149362/1540575299761'
✅ 'Peut-on fournir un rapport d'analyse (RA) externe du SIESAL à la partie réglementée, pour les échantillons soumis à l'ACIA ou aux laboratoires contractuels tiers?, https://inspection.canada.ca/fra/1540234969218/1540235089869'
Accuracy: 3/3


### After the change


In [129]:
# Index v_0_0_2 was renamed to v_0_0_2_ once the fix in this notebook was applied.
test_index(schema="v_0_0_2_", questions_urls=questions_urls)

✅ 'Who to consult when regulatory action is required on the regulated party's permissions?, https://inspection.canada.ca/eng/1651258246384/1651258246869'
❌ 'Comment sera évalué le résultat de la détection de Lm dans un produit de catégorie 2B?, https://inspection.canada.ca/fra/1540575149362/1540575299761'
❌ 'Peut-on fournir un rapport d'analyse (RA) externe du SIESAL à la partie réglementée, pour les échantillons soumis à l'ACIA ou aux laboratoires contractuels tiers?, https://inspection.canada.ca/fra/1540234969218/1540235089869'
Accuracy: 1/3


## Root cause

What's changed from `v_0_0_1` to `v_0_0_2_`:

1. The chunking strategy

   `v_0_0_1` is based on the data generated by the legacy crawler into `louis_v005`. The legacy crawler used a custom algorithm to chunk the pages:

   - Group headings and the immediately following texts. The headings used were `h2` and higher, but not `h1`, hence the original issue [#17](https://github.com/ai-cfia/llamaindex-db/issues/17)
   - Concatenate enough groups to make a chunk of `~256` tokens
   - Try to break chunks of more than `512` tokens into smaller chunks, with no guarantee of success

   In `v_0_0_2_`, I use open source tools for chunking:

   - Read the page's html into a `BeautifulSoup`
   - Remove unnecessary sections like header and footer
   - Transform the html to markdown text with `html2text`
   - Create the metadata from the html. From the metadata, only `title` and `keywords` will be embedded.
   - Break the text into chunks using llamaindex's `SentenceSplitter`. I used a chunk size of `768` for metadata + text.

2. The embeddings

   `v_0_0_1` directly used the chunks and their associated embeddings that were generated by the legacy crawler into `louis_v005`.

   `v_0_0_2_` is built from scratch with newly generated embeddings and included the `title` and `keywords` metadata.

Is the chunk size the major cause of the problem? I rebuilt the index (`v_0_0_3`), this time with only `title` as metadata and a chunk size of `256`. Keeping `keywords` wouldn't allow chunk sizes much below `768`.


In [130]:
# v_0_0_3: title embedded together with the text, chunk size 256.
test_index(schema="v_0_0_3", questions_urls=questions_urls)

❌ 'Who to consult when regulatory action is required on the regulated party's permissions?, https://inspection.canada.ca/eng/1651258246384/1651258246869'
❌ 'Comment sera évalué le résultat de la détection de Lm dans un produit de catégorie 2B?, https://inspection.canada.ca/fra/1540575149362/1540575299761'
❌ 'Peut-on fournir un rapport d'analyse (RA) externe du SIESAL à la partie réglementée, pour les échantillons soumis à l'ACIA ou aux laboratoires contractuels tiers?, https://inspection.canada.ca/fra/1540234969218/1540235089869'
Accuracy: 0/3


The results are even worse. This suggests that including the metadata is the main cause of the problem. To confirm it, I built another index `v_0_0_4` (now renamed `v_0_0_2`) with no metadata in the embeddings and a chunk size of `256`.


In [131]:
# This index was formerly named v_0_0_4. Renaming indexes is not best practice,
# but it lets finesse switch to another index without being redeployed.
test_index(schema="v_0_0_2", questions_urls=questions_urls)

✅ 'Who to consult when regulatory action is required on the regulated party's permissions?, https://inspection.canada.ca/eng/1651258246384/1651258246869'
✅ 'Comment sera évalué le résultat de la détection de Lm dans un produit de catégorie 2B?, https://inspection.canada.ca/fra/1540575149362/1540575299761'
✅ 'Peut-on fournir un rapport d'analyse (RA) externe du SIESAL à la partie réglementée, pour les échantillons soumis à l'ACIA ou aux laboratoires contractuels tiers?, https://inspection.canada.ca/fra/1540234969218/1540235089869'
Accuracy: 3/3


It is still unclear to me why including the metadata in the embeddings has such a negative effect on the index. My guess is that it introduces noise into the embeddings.

Metadata is still very important for classifying webpages. An improvement could be to create one dedicated chunk per page that contains the metadata, instead of including the metadata in every chunk's embedding.


## Back to the initial `h1` title issue

It turns out that the h1 title was included in the text for some pages but not for others.


### List of all the titles


In [90]:
documents: list[Document] = load_from_pickle("documents.pkl")
# One (title, url_id) pair per crawled document.
titles_urlids = [
    (doc.metadata["title"], get_url_id(doc.metadata["url"]))
    for doc in documents
]
print(f"{len(titles_urlids)}")

11054


### Documents for which the content doesn't contain the title


In [107]:
connection_string = (
    f"dbname={database} "
    f"user={user} "
    f"password={password} "
    f"host={host} "
    f"port={port}"
)

legacy_schema = "louis_v005"
SAMPLE_SIZE = 100  # number of pages checked; set to None to check every page

# strpos() is a literal substring test. LIKE '%' || title || '%' would treat any
# '%' or '_' inside a title as a wildcard and report false matches.
missing_titles_query = SQL(
    """
    SELECT tt.title, tt.url_id
    FROM tmp_titles tt
    WHERE NOT EXISTS (
        SELECT 1
        FROM {}.documents d
        WHERE strpos(d.content, tt.title) > 0
    )
    """
).format(Identifier(legacy_schema))

with psycopg.connect(connection_string) as conn:
    with conn.cursor() as cur:
        cur.execute("CREATE TEMP TABLE tmp_titles (title TEXT, url_id TEXT)")
        cur.executemany(
            "INSERT INTO tmp_titles (title, url_id) VALUES (%s, %s)",
            titles_urlids[:SAMPLE_SIZE],
        )
        cur.execute(missing_titles_query)
        results = cur.fetchall()
        cur.execute("DROP TABLE tmp_titles")
        print(len(results))
        save_to_pickle(results, "titles_not_in_content.pkl")

44


On 100 sample pages, 44 did not have the title in the content text.


### Testing


In [108]:
results = load_from_pickle("titles_not_in_content.pkl")
# Each missing title becomes a query whose expected answer is its own page.
titles_urls = [
    (title, [get_url_from_id(url_id)])
    for title, url_id in results
]
print(titles_urls[0])
save_to_pickle(titles_urls, "titles_urls.pkl")

('Alpha Meat Packers Ltd. brand Beef Burgers and Lean Ground Beef recalled due to E. coli O157:H7', ['https://inspection.canada.ca/eng/1569265402509/1569265408887'])


In [132]:
test_index(schema="v_0_0_1", questions_urls=titles_urls)  # legacy chunks/embeddings

✅ 'Alpha Meat Packers Ltd. brand Beef Burgers and Lean Ground Beef recalled due to E. coli O157:H7, https://inspection.canada.ca/eng/1569265402509/1569265408887'
❌ 'Evaluant les systèmes d'inspection des produits de viande de porc et de volaille destinés à l'exportation au Canada, https://inspection.canada.ca/fra/1530280996600/1530281090649'
✅ 'DD1998-27 : Détermination du risque associé au cotonnier (Gossypium hirsutum L.) BXNMC de Calgene, https://inspection.canada.ca/fra/1312575778642/1312575878895'
✅ 'L'ACIA suspend la licence pour la salubrité des aliments au Canada de Hannibal Inc., https://inspection.canada.ca/fra/1631811887170/1631811887513'
✅ 'Néomycine et Chlorhydrate d'oxytétracycline (NEOTC) – Notices sur les substances médicatrices, https://inspection.canada.ca/fra/1591116481035/1591116578791'
✅ 'Aflatoxins in Selected Corn Products, Nuts, Nut Products, Raisins, Cocoa Powder, Chili Powder and Paprika - April 1, 2012 to March 31, 2013, https://inspection.canada.ca/eng/15571

In [133]:
test_index(schema="v_0_0_2", questions_urls=titles_urls)  # rebuilt, no metadata embedded

❌ 'Alpha Meat Packers Ltd. brand Beef Burgers and Lean Ground Beef recalled due to E. coli O157:H7, https://inspection.canada.ca/eng/1569265402509/1569265408887'
✅ 'Evaluant les systèmes d'inspection des produits de viande de porc et de volaille destinés à l'exportation au Canada, https://inspection.canada.ca/fra/1530280996600/1530281090649'
✅ 'DD1998-27 : Détermination du risque associé au cotonnier (Gossypium hirsutum L.) BXNMC de Calgene, https://inspection.canada.ca/fra/1312575778642/1312575878895'
✅ 'L'ACIA suspend la licence pour la salubrité des aliments au Canada de Hannibal Inc., https://inspection.canada.ca/fra/1631811887170/1631811887513'
✅ 'Néomycine et Chlorhydrate d'oxytétracycline (NEOTC) – Notices sur les substances médicatrices, https://inspection.canada.ca/fra/1591116481035/1591116578791'
✅ 'Aflatoxins in Selected Corn Products, Nuts, Nut Products, Raisins, Cocoa Powder, Chili Powder and Paprika - April 1, 2012 to March 31, 2013, https://inspection.canada.ca/eng/15571

Two conclusions from the results:
- Even though the title is not included in the chunks, `v_0_0_1` can still match the content. It's a semantic search, after all.
- `v_0_0_2` still misses some searches by title, which indicates that there are other problems worth investigating.


## Creating `v_0_0_4` now renamed `v_0_0_2`


In [None]:
conn_string = (
    f"dbname={database} "
    f"user={user} "
    f"password={password} "
    f"host={host} "
    f"port={port}"
)
# Every distinct page id stored by the legacy crawler.
query = """
    SELECT DISTINCT url_id
    FROM louis_v005.unique_documents
    WHERE url_id IS NOT NULL;
    """
with psycopg.connect(conn_string) as conn, conn.cursor() as cur:
    results = cur.execute(query).fetchall()
    # Rows are single-column tuples.
    url_ids = [url_id for (url_id,) in results]

pprint(url_ids[:5])
save_to_pickle(url_ids, "url_ids.pkl")

### Create documents from `url_ids`

#### Utility classes and functions


In [11]:
import re
from datetime import datetime, timezone
from typing import Callable
from urllib.parse import urljoin
import asyncio
import aiohttp

import html2text
from bs4 import BeautifulSoup
from llama_index.core.schema import Document


# Site against which url ids are resolved.
DEFAULT_BASE_URL = "https://inspection.canada.ca"
# Worker count used by AiLabWebPageReader.load_documents.
DEFAULT_N_WORKERS = 100
# Candidate metadata keys to exclude; presumably from embeddings/LLM views.
# NOTE(review): not referenced in the visible code (load_document excludes every key).
DEFAULT_EXCLUDED_KEYS = [
    # keys set by the reader itself
    "url_id", "last_crawled", "url",
    # keys coming from <meta> tags
    "description", "viewport", "language", "type", "subject",
    "creator", "issued", "modified", "audience",
]
# Trailing /<lang>/<id>/<id> path that identifies a page.
DEFAULT_URL_ID_REGEX = r"/[a-z]{3}/[0-9]+/[0-9]+$"


class AsyncWorkerQueue:
    """Run coroutine functions concurrently on a fixed pool of asyncio workers.

    Tasks are ``(coroutine_function, args, kwargs)`` triples. Each ``run`` call
    returns the results of the tasks that succeeded, in submission order;
    failed tasks are reported by the worker and omitted from the results.
    """

    def __init__(self, num_workers, maxsize=0):
        self.num_workers = num_workers
        # maxsize=0 means an unbounded queue (asyncio.Queue semantics).
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.futures = []
        self.workers = []

    async def worker(self, name):
        """Process queued tasks until a ``None`` sentinel task is received."""
        while True:
            future, task, args, kwargs = await self.queue.get()
            if task is None:
                break
            try:
                result = await task(*args, **kwargs)
                future.set_result(result)
            except Exception as e:
                print(f"Worker {name} encountered an error: {e}")
                future.set_exception(e)
            finally:
                self.queue.task_done()

    async def run(self, tasks_with_params):
        """Run all ``(task, args, kwargs)`` items and return successful results.

        Results come back in submission order. Failed tasks are skipped, so the
        list can be shorter than the input.
        """
        loop = asyncio.get_running_loop()
        # Reset per run; otherwise results of earlier runs would be returned again.
        self.futures = []
        self.workers = [
            asyncio.create_task(self.worker(f"Worker-{i}"))
            for i in range(self.num_workers)
        ]

        for task, args, kwargs in tasks_with_params:
            future = loop.create_future()
            self.futures.append(future)
            await self.queue.put((future, task, args, kwargs))

        await self.queue.join()

        # One sentinel per worker so that every worker loop exits.
        for _ in self.workers:
            await self.queue.put((None, None, None, None))
        await asyncio.gather(*self.workers, return_exceptions=True)

        results = []
        for future in self.futures:
            try:
                results.append(await future)
            except Exception:
                # Already reported by the worker; skip the failed task. Unlike a
                # bare `finally: continue`, this lets cancellation propagate.
                continue

        return results


class WebPageReader:
    """Minimal async helper that fetches HTML pages."""

    @classmethod
    async def get_html_from_url(cls, url):
        """Retrieve HTML content from a given URL asynchronously.

        Raises:
            aiohttp.ClientResponseError: on an error status (``raise_for_status``).
            ValueError: if the response is not declared as ``text/html``.
        """
        # NOTE(review): opens a new ClientSession per request; sharing one session
        # across requests would avoid repeated connection setup.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                # A missing header used to crash with TypeError ("in None"); media
                # types are case-insensitive, so compare in lower case.
                content_type = (response.headers.get("content-type") or "").lower()
                if "text/html" not in content_type:
                    raise ValueError("The returned content is not HTML")
                return await response.text()


class MetadataExtractor:
    """Collect ``<meta name=... content=...>`` pairs from a page's ``<head>``."""

    @classmethod
    async def extract_from_soup(cls, soup: BeautifulSoup):
        """Return a dict of meta name -> content, with the ``dcterms.`` prefix stripped.

        Tags missing either a name or a content are ignored; a page without a
        ``<head>`` yields an empty dict.
        """
        head = soup.find("head")
        if not head:
            return {}
        metadata: dict[str, str] = {}
        for tag in head.find_all("meta"):
            key = tag.get("name", "").replace("dcterms.", "")
            value = tag.get("content")
            if key and value:
                metadata[key] = value
        return metadata


class AiLabWebPageReader:
    """Load inspection.canada.ca pages into llama_index ``Document`` objects.

    Each page is fetched by URL id, optionally converted from HTML to markdown
    text, and tagged with metadata from ``metadata_fn``. Every metadata key is
    excluded from both the embedding and LLM views of the document.
    """

    html_to_text: bool
    base_url: str
    # Must be an async callable: it is awaited in load_document.
    _metadata_fn: Callable[[BeautifulSoup], dict] | None
    url_id_regex: str

    def __init__(
        self,
        html_to_text: bool = False,
        base_url: str = DEFAULT_BASE_URL,
        url_id_regex: str = DEFAULT_URL_ID_REGEX,
        metadata_fn: Callable[[BeautifulSoup], dict] | None = None,
    ) -> None:
        """Initialize with parameters."""
        self.html_to_text = html_to_text
        self.base_url = base_url
        self.url_id_regex = url_id_regex
        self._metadata_fn = metadata_fn

    async def load_document(self, url_id: str):
        """Fetch one page by URL id and wrap it in a ``Document``.

        Args:
            url_id: page path such as ``/eng/<id>/<id>``; must match ``url_id_regex``.

        Returns:
            A ``Document`` whose id is the absolute page URL.

        Raises:
            AssertionError: if ``url_id`` does not match ``url_id_regex``.
            ValueError: if the page is not HTML (from ``WebPageReader``).
        """
        assert re.match(self.url_id_regex, url_id), "Invalid URL ID"
        url = urljoin(self.base_url, url_id)
        html = await WebPageReader.get_html_from_url(url)
        soup = BeautifulSoup(html, "html.parser")

        # Without a metadata_fn the document simply has no metadata (this used
        # to crash on metadata.keys() because metadata stayed None).
        metadata: dict[str, str] = {}
        if self._metadata_fn is not None:
            metadata = await self._metadata_fn(soup)
            metadata["url"] = url
            metadata["url_id"] = url_id
            metadata["last_crawled"] = datetime.now(timezone.utc).isoformat()

        # Exclude all metadata from embeddings: embedding title/keywords with
        # the text hurt retrieval accuracy (see issue #17 analysis).
        excluded_keys = list(metadata)

        # Prune irrelevant sections (header, footer, ...) by keeping only <main>.
        content = (soup.find("main") or soup).prettify()
        if self.html_to_text:
            content = html2text.html2text(content)

        return Document(
            text=content,
            id_=url,
            metadata=metadata,
            excluded_embed_metadata_keys=excluded_keys,
            excluded_llm_metadata_keys=excluded_keys,
        )

    async def load_documents(
        self,
        url_ids: list[str],
        n_workers: int = DEFAULT_N_WORKERS,
    ) -> list[Document]:
        """Fetch many pages concurrently using ``n_workers`` workers.

        Pages that fail to load are reported by the worker queue and omitted,
        so the result may be shorter than ``url_ids``.
        """
        assert isinstance(url_ids, list), "urls must be a list of strings."
        tasks = [(self.load_document, (u,), {}) for u in url_ids]
        async_queue = AsyncWorkerQueue(num_workers=n_workers)
        documents = await async_queue.run(tasks)
        return documents

#### Creating the documents


In [None]:
url_ids = load_from_pickle("url_ids.pkl", folder="")
pprint(url_ids[2:4])
documents = await AiLabWebPageReader(
    html_to_text=True, metadata_fn=MetadataExtractor.extract_from_soup
).load_documents(url_ids)
save_to_pickle(documents, "documents.pkl")

In [21]:
documents = load_from_pickle("documents.pkl")
print(f"len(documents): {len(documents)}")
# Metadata text the splitter accounts for (private llama_index helper);
# "None" means every metadata key is excluded.
print(
    "metadata to be included: "
    f"{SentenceSplitter()._get_metadata_str(documents[0]) or 'None'}"
)

len(documents): 11054
metadata to be included: None


### Create nodes from documents

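By trial, I found that when metadata is embedded (as in `v_0_0_2_`), a chunk size of `512` tokens raises an error: for some nodes, the metadata alone is already larger than `512` tokens, leaving no room for the actual node text. I resorted to using `768` there, which is still reasonable. This index (`v_0_0_4`) excludes all metadata from the embeddings, so the cell below uses a chunk size of `256`.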

The `chunk_overlap` parameter used below helps ensure that contextual information isn't lost when the text is cut abruptly from one chunk to the next.


In [22]:
# 256-token chunks, with 50 tokens shared between consecutive chunks.
parser = SentenceSplitter(chunk_overlap=50, chunk_size=256)
nodes = parser.get_nodes_from_documents(documents, show_progress=True)
save_to_pickle(nodes, "nodes.pkl")

  from .autonotebook import tqdm as notebook_tqdm
Parsing nodes: 100%|██████████| 11054/11054 [00:59<00:00, 186.24it/s]


In [6]:
nodes = load_from_pickle("nodes.pkl")
print(f"len(nodes): {len(nodes)}")
# "None" confirms that no metadata ends up in the chunk text.
print(
    "metadata to be included "
    f"{SentenceSplitter()._get_metadata_str(nodes[0]) or 'None'}"
)

len(nodes): 237183
metadata to be included None


- ✅ No metadata will be included in the embeddings: every metadata key, including `title` and `keywords`, is excluded (the output above prints `None`).


### Create the vector store table


In [7]:
# Resume point: after a kernel restart, first reload nodes from "nodes.pkl".
vector_store = PGVectorStore.from_params(
    user=user,
    password=password,
    host=host,
    port=port,
    database=llamaindex_db,
    schema_name=llamaindex_schema,
    embed_dim=1536,  # dimension of the ada-002 embeddings configured earlier
)

storage_context = StorageContext.from_defaults(vector_store=vector_store)

In [8]:
# create the table with a single entry
index = VectorStoreIndex(nodes[:1], storage_context=storage_context)

### Create the `hnsw` index on the vector store table


In [9]:
connection_string = (
    f"dbname={llamaindex_db} "
    f"user={user} "
    f"password={password} "
    f"host={host} "
    f"port={port}"
)

schema = Identifier(llamaindex_schema)
# A fixed name plus IF NOT EXISTS makes the cell safe to re-run. An unnamed
# CREATE INDEX adds another identical hnsw index every time it is executed.
hnsw_index_name = Identifier("data_llamaindex_embedding_hnsw_idx")
index_query = SQL(
    "CREATE INDEX IF NOT EXISTS {} ON {}.data_llamaindex "
    "USING hnsw (embedding vector_cosine_ops)"
).format(hnsw_index_name, schema)

# NOTE(review): pgvector recommends adding indexes after bulk loading for
# faster loads; consider running this after the insert cell.
with psycopg.connect(connection_string) as conn:
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute(index_query)

### Populate the vector store table


In [None]:
# Insert every node except the first, which was used to create the table.
index.insert_nodes(
    nodes[1:],
    show_progress=True,
)