First, install the dependencies.

In [None]:
%pip install azure-ai-documentintelligence==1.0.0b1 azure-search-documents==11.4.0 unidecode==1.3.8 nltk==3.8.1

Then, download the required NLTK data.

In [None]:
from nltk import download

# Fetch the NLTK resources used by this notebook.
download("stopwords")
# word_tokenize() relies on the Punkt sentence tokenizer models; without them
# the chunking step fails with a LookupError on a fresh environment.
download("punkt")

Then, initialize the clients for Document Intelligence and AI Search.

In [16]:
from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.search.documents import SearchClient
from nltk.stem import SnowballStemmer
from nltk.tokenize import word_tokenize
from random import randint
from typing import Dict, Tuple
from unidecode import unidecode
import asyncio
import glob
import os.path
import re

# Document Intelligence: asynchronous client (imported from the ".aio" package),
# so its calls have to be awaited.
doc_endpoint = "https://claim-ai.cognitiveservices.azure.com"
doc_credential = AzureKeyCredential("xxx")  # Presumably a placeholder for the real API key
doc_client = DocumentIntelligenceClient(
    credential=doc_credential,
    endpoint=doc_endpoint,
)

# AI Search: synchronous client bound to the "trainings" index.
search_endpoint = "https://claim-ai.search.windows.net"
search_credential = AzureKeyCredential("xxx")  # Presumably a placeholder for the real API key
search_client = SearchClient(
    credential=search_credential,
    endpoint=search_endpoint,
    index_name="trainings",
)

Next, convert the PDFs to Markdown text. We use Document Intelligence for that.

Be warned that this step can take a while, depending on the size of the PDF: from about a minute for a few pages to 20 minutes for a 1,000-page PDF.

In [None]:
async def pdf_to_markdown(source: str) -> Tuple[str, str]:
    """Convert a PDF file to Markdown with Document Intelligence.

    The Markdown is cached next to the PDF as ``<source>.md``. When that file
    exists, its content is returned and the service is not called.

    Args:
        source: Path of the PDF file to convert.

    Returns:
        A ``(source, markdown)`` tuple, so callers awaiting many conversions
        can tell which document a result belongs to.

    Raises:
        HttpResponseError: Presumably raised by the Document Intelligence
            client when the analysis request fails (the caller catches it).
    """
    cache_path = source + ".md"

    if os.path.exists(cache_path):  # Test cache
        print(f"Skipping {source}, cached found")
        # The cache is always UTF-8: relying on the platform default encoding
        # corrupts or rejects accented characters on non-UTF-8 systems.
        with open(cache_path, "r", encoding="utf-8") as cache_file:
            return source, cache_file.read()

    with open(source, "rb") as pdf_file:  # Load file content
        print(f"Starting {source}, no cache found")
        await asyncio.sleep(randint(0, 5))  # Avoid API rate limit
        doc_poller = await doc_client.begin_analyze_document(
            analyze_request=pdf_file,
            content_type="application/octet-stream",
            locale="fr-FR",  # We only have French documents in this dataset
            model_id="prebuilt-layout",
            output_content_format="markdown",
        )

    # The request has been sent, so the PDF handle is no longer needed while
    # waiting for the (potentially long) analysis to complete.
    doc_result = await doc_poller.result()

    with open(cache_path, "w", encoding="utf-8") as cache_file:  # Store result in cache
        cache_file.write(doc_result.content)

    return source, doc_result.content


doc_results: Dict[str, str] = {}
doc_tasks = []

for source in glob.glob("dataset/*.pdf"):
    doc_tasks.append(asyncio.create_task(pdf_to_markdown(source)))

print("Waiting for results...")
for doc_task in asyncio.as_completed(doc_tasks):
    source = None
    try:
        source, content = await doc_task
        print(f"Ended {source}")
        doc_results[source] = content
    except HttpResponseError as e:
        print(f"Failed {source}: {e}")

Split the Markdown text into smaller blocks, which we'll call chunks. The content of each block is reduced with a stemmer.

In [None]:
# Language shared by the tokenizer and the stemmer
lang = "french"
stemmer = SnowballStemmer(lang)


def compress_and_clean(text: str) -> str:
    """Strip markup noise from *text* and return its stemmed tokens.

    Backslashes, colon-delimited lowercase tags (such as ``:selected:`` and
    ``:unselected:``) and HTML comments are removed. The remaining text is
    tokenized, each token is stemmed, and the stems are joined with single
    spaces.
    """
    cleaned = text.replace("\\", "")  # Remove all backslashes
    noise_patterns = (
        r":[a-z]*:",  # :unselected: and :selected: tags
        r"<!--[^<>]*-->",  # Comments
    )
    for pattern in noise_patterns:
        cleaned = re.sub(pattern, "", cleaned)
    words = word_tokenize(cleaned, language=lang)
    return " ".join(stemmer.stem(word) for word in words)


def data(content: str, source_uri: str, title: str, iterator: int) -> dict[str, str]:
    """Build the search document for one chunk.

    Args:
        content: Raw text of the chunk; stored after ``compress_and_clean``.
        source_uri: Location of the document the chunk comes from.
        title: Heading of the chunk.
        iterator: Number appended to the ID to tell chunks apart.

    Returns:
        A dictionary with the ``content``, ``id``, ``source_uri`` and
        ``title`` fields.
    """
    normalized_uri = unidecode(source_uri).lower()
    # Use deterministic ID to avoid duplicates after a new run
    uri_slug = "_".join(re.sub('[^a-z0-9]', ' ', normalized_uri).split())
    # Remove all special characters
    plain_title = " ".join(re.sub('[^a-z0-9]', ' ', unidecode(title).lower()).split())

    return {
        "content": compress_and_clean(content),
        "id": f"{uri_slug}-{iterator}",
        "source_uri": normalized_uri,
        "title": plain_title,
    }


# Search documents built from every Markdown section of every converted PDF
chuncks = []
iterator = 0

for source, content in doc_results.items():
    if not content:
        continue
    # Number the chunks per document. A counter shared by all documents makes
    # the IDs depend on the order in which the conversions finished, so a new
    # run would create duplicates instead of overwriting the previous chunks.
    iterator = 0
    # Every Markdown heading starts a new section
    for section in content.split("\n#"):
        if not section.strip():  # Nothing to index in a blank section
            continue
        lines = section.split("\n")
        # The split consumed only the first "#" of the heading; drop the
        # remaining ones so that "## Title" does not end up as "# # Title".
        title = lines[0].lstrip("#").strip()
        paragraph = " ".join(lines[1:])
        chuncks.append(data(
            f"# {title} {paragraph}",
            source,
            title,
            iterator,
        ))
        iterator += 1

print(chuncks[:5])
print(f"Created {len(chuncks)} chunks")

Finally, upload the chunks to AI Search.

In [None]:
# Azure AI Search accepts at most 1000 actions per indexing request, so the
# chunks are sent in batches instead of in a single call.
upload_batch_size = 1000

print(f"Uploading {len(chuncks)} documents to Azure Search")
for batch_start in range(0, len(chuncks), upload_batch_size):
    upload_results = search_client.merge_or_upload_documents(
        chuncks[batch_start:batch_start + upload_batch_size]
    )
    # A successful request can still contain documents that were rejected
    failed_keys = [result.key for result in upload_results if not result.succeeded]
    if failed_keys:
        print(f"Failed to upload {len(failed_keys)} documents: {failed_keys[:5]}")
print(f"There are {search_client.get_document_count()} documents in the index")

**Congratulations! 😎**

Bonus: Clean up the documents from the AI Search index.

In [26]:
# Azure AI Search accepts at most 1000 actions per indexing request, so the
# deletions are sent in batches.
delete_batch_size = 1000

# Repeat until a search returns no document at all
while True:
    docs = search_client.search(search_text="*", select=["id"])
    ids = [{"id": doc["id"]} for doc in docs if doc["id"]]
    if not ids:
        break
    print(ids[:5])
    for batch_start in range(0, len(ids), delete_batch_size):
        search_client.delete_documents(ids[batch_start:batch_start + delete_batch_size])
    print(f"Deleted {len(ids)} documents")
