### Get all image descriptions from ChromaDB file

In [None]:
import chromadb
import json


# Open the on-disk Chroma database and the collection that stores the
# image descriptions.
client = chromadb.PersistentClient(path = "chromadb")

collection = client.get_or_create_collection(
    name = "my_collection", metadata = {"hnsw:space": "cosine"})

# Query the collection once: ids and documents coming from the same result
# are guaranteed to be aligned, and the database is only hit a single time.
image_records = collection.get(where = {"type": "image"})
image_descriptions = image_records["documents"]

# NOTE(review): ids are assumed to look like "<file>_page_<n>_image_<k>",
# which is what the two split() calls below rely on - confirm against the
# code that populated the collection.
ids = []
for record_id, description in zip(image_records["ids"], image_descriptions):
    id_parts = record_id.split("_page_")
    ids.append({
        "file": id_parts[0],
        "page": int(id_parts[-1].split("_image_")[0]),
        "image_description": description,
    })

# Persist one {"file", "page", "image_description"} entry per image.
with open("image_descriptions/image_descriptions.json", "w") as f:
    json.dump(ids, f)


### Split documents into pages, with text only

In [None]:
from langchain_community.document_loaders import PyMuPDFLoader
import os


folder = "../sources"
files = []

for fname in os.listdir(folder):
    complete_path = os.path.join(folder, fname)
    if os.path.isfile(complete_path):
        files.append(complete_path)

docs = []
for file in files:
    loader = PyMuPDFLoader(file)
    async for doc in loader.alazy_load():
        docs.append(doc)


### Remove first page and index pages

In [4]:
# Headings (lower case) that mark a page as an index / table of contents.
index_headings = ("index", "table of contents", "índice")

# Keep a page only if it is not the first page of its file (page 0) and its
# text does not start with one of the index headings.
docs = [
    doc for doc in docs
    if doc.metadata["page"] != 0
    and not doc.page_content.lower().startswith(index_headings)
]

### Concatenate text and image descriptions

In [5]:
import json


# Load the image descriptions exported from the vector database.
with open("image_descriptions/image_descriptions.json", "r") as f:
    image_descriptions = json.load(f)

# Index the pages by (source path, page number) once instead of scanning the
# whole list for every description. setdefault keeps the first page found
# for a given key.
docs_by_location = {}
for doc in docs:
    docs_by_location.setdefault((doc.metadata["source"], doc.metadata["page"]), doc)

# Append each description to the text of the page its image belongs to.
# Descriptions whose page is not in `docs` are skipped.
for imd in image_descriptions:
    # Single quotes inside the f-string: reusing double quotes is only valid
    # syntax from Python 3.12 onwards.
    file = f"../sources/{imd['file']}.pdf"
    page = imd["page"]
    doc = docs_by_location.get((file, page))
    if doc is not None:
        doc.page_content += f"\n{imd['image_description']}"


### Clean text

In [6]:
import re


def decapitalize_content(pages: list):

    """Turns document content into lower case.

    `pages` is a list of document objects exposing a writable `page_content`
    string attribute (not plain strings). The objects are modified in place
    and nothing is returned."""

    for p in pages:
        p.page_content = p.page_content.lower()


def remove_non_ASCII(pages: list):

    """Removes non ASCII characters from document. Not suitable for many non english languages
    which have several non ASCII characters.

    `pages` is a list of document objects exposing `page_content` (str) and
    `metadata` (dict). Documents whose "keywords" metadata contains "non-en"
    are left untouched; all others are modified in place."""

    for p in pages:
        # The "keywords" entry may be absent (or empty) depending on the
        # source file; treat that as "no keywords" instead of raising KeyError.
        keywords = p.metadata.get("keywords") or ""
        if "non-en" not in keywords:
            p.page_content = re.sub(r"[^\x00-\x7F]+", "", p.page_content)


def remove_bullets(pages: list):

    """Removes bullets from document.

    Strips a leading bullet symbol, or a leading "<number>." list marker,
    from the start of every line. `pages` is a list of document objects
    exposing a writable `page_content` string; they are modified in place."""

    for p in pages:
        # Symbol bullets at the very start of a line, plus the whitespace after them.
        p.page_content = re.sub(r"^[→•▪\-*✔➢●✗]\s*", "", p.page_content, flags = re.MULTILINE)
        # Numbered list markers. Anchored to the start of a line so that numbers
        # inside running text ("in 2020. then ...", "3.x") are not deleted.
        p.page_content = re.sub(r"^[ \t]*\d+\.(?=\s*[a-zA-Z])", "", p.page_content,
                                flags = re.MULTILINE)


def remove_escape(pages: list):

    """Turns every run of whitespace (spaces, tabs, newlines, ...) into a single
    white space and strips leading/trailing whitespace.

    `pages` is a list of document objects exposing a writable `page_content`
    string attribute; they are modified in place."""

    for p in pages:
        # split() with no argument splits on any whitespace run and drops
        # empty pieces, so joining with " " normalises all spacing.
        p.page_content = " ".join(p.page_content.split())


# Run the cleaning steps in sequence. The order matters: bullets are matched
# at the start of each line, so remove_bullets has to run before remove_escape
# collapses the newlines into single spaces.
for clean_step in (remove_non_ASCII, decapitalize_content, remove_bullets, remove_escape):
    clean_step(docs)

### Chunking

In [10]:
from langchain_text_splitters import TokenTextSplitter
from langchain.docstore.document import Document
import os


def merge_and_split(docs: list[Document], splitter):

    """Merges the pages of each source file into one text and splits it into chunks.

    Pages are grouped by their "source" metadata, keeping the input order.
    The text of each group is joined and passed to `splitter.split_text`.
    Every resulting chunk becomes a new Document carrying the metadata of the
    group's first page, minus the "page" entry.

    Returns the list of chunk Documents, grouped by source in order of first
    appearance."""

    from collections import defaultdict


    # Group the pages by originating file.
    pages_by_source = defaultdict(list)
    for doc in docs:
        pages_by_source[doc.metadata["source"]].append(doc)

    chunks = []
    for source_pages in pages_by_source.values():
        # A chunk can span several pages, so the page number is dropped.
        metadata = {k: v for k, v in source_pages[0].metadata.items() if k != "page"}
        # Join with a space so the last word of a page is not fused with the
        # first word of the next one.
        merged_text = " ".join(page.page_content for page in source_pages)
        for chunk_text in splitter.split_text(merged_text):
            # Each chunk gets its own metadata dict so that later edits to one
            # chunk's metadata cannot leak into its siblings.
            chunks.append(Document(metadata = dict(metadata), page_content = chunk_text))

    return chunks


def save_chunks(pages: list, path: str):

    """Serializes every chunk to its own JSON file inside `path`.

    The directory (including missing parent directories) is created when
    needed. Files are named "chunk_1", "chunk_2", ... following the order of
    `pages`; existing files with the same name are overwritten."""

    from langchain_core.load import dumpd
    import json
    import os


    # makedirs also creates missing parents and is a no-op when the
    # directory already exists.
    os.makedirs(path, exist_ok = True)
    for number, page in enumerate(pages, start = 1):
        full_path = os.path.join(path, f"chunk_{number}")
        with open(full_path, "w") as ser_file:
            json.dump(dumpd(page), ser_file)


# Chunking configurations to generate: one folder per page-level chunking and
# one folder per (chunk_size, chunk_overlap) combination.
chunk_types = ["page_chunking", "fixed_number"]
chunk_sizes = [256, 384]
chunk_overlaps = [0, 20, 50, 100]
base_path = "chunkings/Text+Images/"

for chunk_type in chunk_types:
    if chunk_type == "fixed_number":
        for chunk_size in chunk_sizes:
            for chunk_overlap in chunk_overlaps:
                splitter = TokenTextSplitter(chunk_size = chunk_size, chunk_overlap = chunk_overlap)
                split_docs = merge_and_split(docs, splitter)
                # os.path.join avoids the doubled "/" that the trailing slash
                # of base_path would otherwise produce.
                path = os.path.join(base_path, f"{chunk_size}_{chunk_overlap}")
                # makedirs creates the missing parent folders and does not fail
                # when the cell is run again.
                # NOTE: on a re-run, chunk files are overwritten by name; files
                # left over from a previous run with more chunks are not removed.
                os.makedirs(path, exist_ok = True)
                save_chunks(split_docs, path)
    else:
        # Page chunking: every (cleaned) page is stored as one chunk.
        path = os.path.join(base_path, "page_chunking")
        os.makedirs(path, exist_ok = True)
        save_chunks(docs, path)


### Define embedding models

In [3]:
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import AzureOpenAIEmbeddings
import os
import getpass


# Read the API key interactively instead of hard-coding it in the notebook.
os.environ["OPENAI_API_KEY"] = getpass.getpass()

# Short registry key -> Hugging Face model identifier.
hf_model_names = {
    "mpnet_base_v2": "sentence-transformers/all-mpnet-base-v2",
    "minilm_l6": "sentence-transformers/all-MiniLM-L6-v2",
    "minilm_l12": "sentence-transformers/all-MiniLM-L12-v2",
    "multilingual": "intfloat/multilingual-e5-large",
}

# Registry of every embedding model available to the following cells.
all_embeddings = {
    key: HuggingFaceEmbeddings(model_name = model_id)
    for key, model_id in hf_model_names.items()
}

# NOTE(review): `azure_endpoint` holds a full deployment URL and the deployment
# named in it differs from `model` - confirm both against the
# AzureOpenAIEmbeddings documentation.
all_embeddings["text_embedding_3_large"] = AzureOpenAIEmbeddings(
    azure_endpoint="https://keystone1.openai.azure.com/openai/deployments/text-embedding-3-large-2/embeddings?api-version=2023-05-15",
    api_key = os.environ["OPENAI_API_KEY"],
    model = "TextEmbedding3LargeDeployment",
    api_version = "2023-05-15",
    show_progress_bar = True,
    chunk_size = 128
)

### Create vector stores

In [None]:
from langchain_core.vectorstores import InMemoryVectorStore
import os


def load_chunks(path: str):

    """Loads every serialized chunk stored in the directory `path`.

    Each file is read as JSON and revived with `langchain_core.load.load`.
    Chunks are returned in a deterministic order: shorter file names first,
    then alphabetically, so "chunk_2" comes before "chunk_10".

    Returns an empty list when the directory does not exist."""

    import os
    import json


    # Only the directory listing is guarded: a missing folder means nothing
    # has been saved for this configuration, whereas an unreadable or broken
    # chunk file should still raise.
    try:
        fnames = os.listdir(path)
    except FileNotFoundError:
        return []

    # Imported after the early return, so the library is only needed when
    # there is something to load.
    from langchain_core.load import load

    # os.listdir returns names in arbitrary order; sort for reproducibility.
    fnames.sort(key = lambda fname: (len(fname), fname))

    pages = []
    for fname in fnames:
        with open(os.path.join(path, fname), "r") as file:
            pages.append(load(json.load(file)))

    return pages


# Configuration of the vector store to build or load.
model_name = "text_embedding_3_large"
chunk_type = "fixed_number"
chunk_size = 256
chunk_overlap = 100
base_path = "chunkings/Text+Images"

# Name of the chunking configuration; it is also the folder name of its chunks.
if chunk_type == "page_chunking":
    chunking = chunk_type

elif chunk_type == "fixed_number":
    chunking = f"{chunk_size}_{chunk_overlap}"

else:
    # Fail with a clear message instead of a NameError further down.
    raise ValueError(f"Unknown chunk_type: {chunk_type!r}")

path = f"{base_path}/{chunking}"

docs = load_chunks(path)

embeddings = all_embeddings[model_name]
vector_store_path = f"models/Text+Images/{model_name}/{chunking}/{chunking}_{model_name}"

# Reuse a previously saved vector store when there is one; otherwise embed
# the chunks and save the result for the next run.
if os.path.exists(vector_store_path):
    vector_store = InMemoryVectorStore.load(path = vector_store_path, embedding = embeddings)

else:
    # load_chunks returns an empty list when the chunk folder is missing.
    # Stop here rather than saving an empty store that later runs would load.
    if not docs:
        raise FileNotFoundError(f"No chunks found in {path!r}; run the chunking cell first")
    vector_store = InMemoryVectorStore.from_documents(documents = docs, embedding = embeddings)
    vector_store.dump(vector_store_path)
