In [14]:
import io
import json
import os
import sys
import uuid
import glob
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime

# Azure imports
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeDocumentRequest
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# Import content understanding client
parent_dir = Path(Path.cwd()).parent
sys.path.append(str(parent_dir))
from python.content_understanding_client import AzureContentUnderstandingClient

# LangChain imports for text processing and embeddings
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from langchain.vectorstores.azuresearch import AzureSearch

# Other imports
import fitz
from PIL import Image
from dotenv import load_dotenv

# Pull settings from a local .env file; override=True lets the file win over
# variables that are already set in the process environment.
load_dotenv(override=True)

# --- Azure AI Services (Content Understanding) ---
AZURE_AI_SERVICE_ENDPOINT = os.environ.get("AZURE_AI_SERVICE_ENDPOINT")
# `or` (rather than a .get default) so that an empty string also falls back.
AZURE_AI_SERVICE_API_VERSION = os.environ.get("AZURE_AI_SERVICE_API_VERSION") or "2024-12-01-preview"
AZURE_DOCUMENT_INTELLIGENCE_API_VERSION = os.environ.get("AZURE_DOCUMENT_INTELLIGENCE_API_VERSION") or "2024-11-30"

# --- Azure OpenAI (chat + embeddings) ---
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_CHAT_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
AZURE_OPENAI_CHAT_API_VERSION = os.environ.get("AZURE_OPENAI_CHAT_API_VERSION") or "2024-08-01-preview"
AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME")
AZURE_OPENAI_EMBEDDING_API_VERSION = os.environ.get("AZURE_OPENAI_EMBEDDING_API_VERSION") or "2023-05-15"

# --- Azure AI Search ---
AZURE_SEARCH_ENDPOINT = os.environ.get("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_KEY = os.environ.get("AZURE_SEARCH_KEY")
AZURE_SEARCH_INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX_NAME") or "multi-document-index"

# --- Document Intelligence ---
# NOTE(review): the endpoint variable is prefixed with AZURE_ while the key
# variable is not; confirm the .env file uses exactly these two names.
DOCUMENT_INTELLIGENCE_ENDPOINT = os.environ.get("AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT")
DOCUMENT_INTELLIGENCE_KEY = os.environ.get("DOCUMENT_INTELLIGENCE_KEY")

# --- Content Understanding analyzer ---
ANALYZER_TEMPLATE_PATH = "../analyzer_templates/image_chart_diagram_understanding.json"
# A fresh random suffix is generated every time this cell runs.
ANALYZER_ID = "content-understanding-search-sample-" + str(uuid.uuid4())

In [15]:
# Set up authentication
# Entra ID credential chain; the token provider issues bearer tokens for the
# Cognitive Services scope and is handed to the Content Understanding client.
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")

# Initialize clients
# - The API version comes from the config cell instead of a second hard-coded copy.
# - When no Document Intelligence key is configured, fall back to the Entra ID
#   credential rather than failing inside AzureKeyCredential(None).
# - `output` is a per-request option of begin_analyze_document (where
#   process_document already passes it), so it is not given to the constructor.
document_intelligence_client = DocumentIntelligenceClient(
    endpoint=DOCUMENT_INTELLIGENCE_ENDPOINT,
    api_version=AZURE_DOCUMENT_INTELLIGENCE_API_VERSION,
    credential=AzureKeyCredential(DOCUMENT_INTELLIGENCE_KEY) if DOCUMENT_INTELLIGENCE_KEY else credential,
)

# Content Understanding receives both the token provider and the optional
# subscription key read from AZURE_AI_SERVICE_KEY.
content_understanding_client = AzureContentUnderstandingClient(
    endpoint=AZURE_AI_SERVICE_ENDPOINT,
    api_version=AZURE_AI_SERVICE_API_VERSION,
    token_provider=token_provider,
    subscription_key=os.getenv("AZURE_AI_SERVICE_KEY"),
    x_ms_useragent="azure-ai-content-understanding-python/multi-document-processor"
)

In [16]:
# Create the analyzer for content understanding
# Registers ANALYZER_ID from the JSON template at ANALYZER_TEMPLATE_PATH and
# polls until the create operation finishes.
# Any failure is reported and swallowed on purpose so later cells can still run
# against an analyzer that already exists. The captured output of a later run
# shows exactly that case: a 409 Conflict when this cell is re-run with the
# same ANALYZER_ID.
try:
    response = content_understanding_client.begin_create_analyzer(ANALYZER_ID, analyzer_template_path=ANALYZER_TEMPLATE_PATH)
    result = content_understanding_client.poll_result(response)
    # The lookup below is inside the try, so an unexpected response shape is
    # also reported by the handler instead of raising.
    print(f'Created analyzer: {result["result"]["analyzerId"]}')
except Exception as e:
    print(f"Error creating analyzer: {str(e)}")
    print("Using existing analyzer ID if available")


Created analyzer: content-understanding-search-sample-2e7835a6-2b7e-41a2-bc98-e41e32947184


In [18]:
# Define helper functions
def crop_image_from_pdf_page(pdf_path, page_number, bounding_box):
    """
    Crops a region from a given page in a PDF and returns it as an image.

    Args:
        pdf_path: Path of the PDF file to open.
        page_number: Index of the page passed to ``load_page`` (the caller in
            this notebook passes the 1-based page number minus one).
        bounding_box: Four numbers ``(x0, y0, x1, y1)``. Each is multiplied by
            72 before building the clip rectangle
            (assumes the values are in inches, as for PDF layout results —
            TODO confirm for other input types).

    Returns:
        A PIL ``Image`` in RGB mode rendered with a 300/72 zoom factor.
    """
    doc = fitz.open(pdf_path)
    try:
        page = doc.load_page(page_number)

        # The rect requires the coordinates in the format (x0, y0, x1, y1).
        rect = fitz.Rect([coord * 72 for coord in bounding_box])
        zoom = 300 / 72
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), clip=rect)

        return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    finally:
        # Always release the document, even when loading or rendering fails.
        doc.close()

def format_content_understanding_result(content_understanding_result):
    """
    Formats the JSON output of the Content Understanding result as Markdown for downstream usage in text.

    Each field is rendered as ``**name**: value`` on its own line. Arrays are
    rendered as a comma-separated list and objects as a ``**name**`` line
    followed by one line per member, named ``name.member``.

    Args:
        content_understanding_result: Result dict; fields are read from
            ``["result"]["contents"][0]["fields"]``.

    Returns:
        The Markdown string. Fields that carry no value are skipped, and an
        empty string is returned when the result has no contents or no fields.

    Raises:
        KeyError: If the top-level ``"result"`` key is missing.
    """
    def _field_value(field):
        # A field stores its value under "value<Type>", e.g. "valueString" or
        # "valueDate"; the key is absent when nothing was extracted.
        field_type = field.get("type") or ""
        return field.get(f"value{field_type.capitalize()}")

    def _format_result(key, result):
        result_type = result.get("type")
        if result_type == "array":
            item_values = [_field_value(item) for item in result.get("valueArray") or []]
            return f"**{key}**: " + ', '.join(str(v) for v in item_values if v is not None) + "\n"
        if result_type == "object":
            members = result.get("valueObject") or {}
            return f"**{key}**\n" + ''.join(
                _format_result(f"{key}.{name}", member) for name, member in members.items()
            )
        # Every other type (string, integer, number, boolean, date, time, ...)
        # is treated as a scalar so unknown types never yield None.
        value = _field_value(result)
        if value is None:
            return ""
        return f"**{key}**: " + str(value) + "\n"

    contents = content_understanding_result['result'].get('contents') or []
    if not contents:
        return ""
    fields = contents[0].get('fields') or {}

    return ''.join(_format_result(name, field) for name, field in fields.items())

def insert_figure_contents(md_content, figure_contents, span_offsets):
    """
    Inserts the figure content for each of the provided figures in figure_contents
    before the span offset of that figure in the given markdown content.

    Args:
        md_content: The markdown text to annotate.
        figure_contents: One content string per offset, in the same order.
        span_offsets: Character offsets into ``md_content``; any sequence,
            strictly increasing.

    Returns:
        ``md_content`` with ``<!-- FigureContent="..." -->`` inserted at each
        offset. With no offsets the content is returned unchanged.

    Raises:
        ValueError: If the offsets are not strictly increasing, or there are
            fewer figure contents than offsets.
    """
    offsets = list(span_offsets)

    # Strictly increasing implies sorted, so one pairwise check is enough.
    if any(later <= earlier for earlier, later in zip(offsets, offsets[1:])):
        raise ValueError("span_offsets should be sorted and strictly increasing.")
    if len(figure_contents) < len(offsets):
        raise ValueError("figure_contents must provide one entry per span offset.")

    # Nothing to insert: keep the document instead of returning an empty string.
    if not offsets:
        return md_content

    # Text before the first figure is carried over untouched.
    pieces = [md_content[:offsets[0]] if offsets[0] > 0 else ""]

    # Each figure owns the text from its offset up to the next offset (or the end).
    segment_ends = offsets[1:] + [len(md_content)]
    for figure_content, start, end in zip(figure_contents, offsets, segment_ends):
        pieces.append(f"<!-- FigureContent=\"{figure_content}\" -->")
        pieces.append(md_content[start:end])

    return "".join(pieces)

In [None]:
def process_document(file_path):
    """
    Process a single document with Content Understanding and return its formatted content.

    Steps:
      1. Analyze the file with the Document Intelligence ``prebuilt-layout``
         model, requesting markdown output.
      2. For every detected figure, crop it from the PDF, save it under
         ``figures/<document name>/``, and analyze the saved image with the
         Content Understanding analyzer ``ANALYZER_ID``.
      3. Insert each figure's formatted content into the markdown at the
         figure's span offset.
      4. Write the result to ``cache/<document name>.cache`` as JSON.

    Args:
        file_path: Path of the PDF to process.

    Returns:
        dict with keys ``file_name``, ``content``, ``figures``, ``raw_result``
        and ``processed_at``.
    """
    file_name = os.path.basename(file_path)
    doc_stem = os.path.splitext(file_name)[0]
    print(f"Processing {file_name}...")

    with open(file_path, 'rb') as f:
        pdf_bytes = f.read()

    # Use Document Intelligence to analyze the document
    poller = document_intelligence_client.begin_analyze_document(
        "prebuilt-layout",
        AnalyzeDocumentRequest(bytes_source=pdf_bytes),
        output=["figures"],
        features=['ocrHighResolution'],
        output_content_format="markdown"
    )

    result: AnalyzeResult = poller.result()
    md_content = result.content

    # Process figures if available
    figure_contents = []
    span_offsets = []  # kept in step with figure_contents, one offset per processed figure
    if result.figures:
        print(f"Extracting {len(result.figures)} figure contents from {file_name}")

        # Create a folder structure with document name (once per document)
        doc_figures_dir = os.path.join("figures", doc_stem)
        os.makedirs(doc_figures_dir, exist_ok=True)

        for figure_idx, figure in enumerate(result.figures):
            # A figure without a region cannot be cropped and one without a
            # span cannot be placed in the markdown, so skip it.
            if not figure.bounding_regions or not figure.spans:
                print(f"Skipping figure {figure_idx + 1}: missing bounding region or span")
                continue

            # Take the page number and the box from the SAME (first) region so
            # the crop always happens on the page the polygon belongs to.
            region = figure.bounding_regions[0]
            page_number = region['pageNumber']

            # polygon is a flat [x0, y0, x1, y1, ...] list; the enclosing
            # axis-aligned box does not depend on the vertex order.
            xs = region.polygon[0::2]
            ys = region.polygon[1::2]
            bounding_box = (min(xs), min(ys), max(xs), max(ys))

            cropped_img = crop_image_from_pdf_page(file_path, page_number - 1, bounding_box)

            figure_filename = f"figure_{page_number}_{figure_idx + 1}.png"
            figure_filepath = os.path.join(doc_figures_dir, figure_filename)

            # Save the figure; Content Understanding reads it back from disk
            cropped_img.save(figure_filepath)

            # Analyze the figure with Content Understanding
            content_understanding_response = content_understanding_client.begin_analyze(ANALYZER_ID, figure_filepath)
            content_understanding_result = content_understanding_client.poll_result(content_understanding_response, timeout_seconds=1000)

            # Get figure content and metadata
            figure_content = format_content_understanding_result(content_understanding_result)

            figure_contents.append({
                "document": file_name,
                "page_number": page_number,
                "figure_index": figure_idx + 1,
                "content": figure_content,
                "figure_path": figure_filepath
            })
            span_offsets.append(figure.spans[0]["offset"])
            print(f"Processed figure {figure_idx + 1} on page {page_number}")

        # Insert figure content into corresponding location in document
        if figure_contents:
            md_content = insert_figure_contents(
                md_content,
                [f["content"] for f in figure_contents],
                span_offsets
            )

    # Save processed document to cache
    cache_dir = "cache"
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{doc_stem}.cache")

    document_data = {
        "file_name": file_name,
        "content": md_content,
        "figures": figure_contents,
        "raw_result": result.as_dict(),
        "processed_at": datetime.now().isoformat()
    }

    # Save to cache file (explicit encoding so the file is platform independent)
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(document_data, f)

    return document_data

In [None]:














def process_multiple_documents(file_paths):
    """
    Run process_document over every path and collect the successful results.

    A failure on one file is printed and that file is left out; the remaining
    files are still processed.

    Args:
        file_paths: Iterable of document paths.

    Returns:
        List of the dicts returned by process_document, in input order.
    """
    results = []

    for path in file_paths:
        try:
            document = process_document(path)
        except Exception as exc:
            print(f"Error processing {path}: {str(exc)}")
        else:
            results.append(document)

    return results

def chunk_document(document_data, chunk_size=512, chunk_overlap=20):
    """
    Split a document into chunks for indexing, preserving metadata.

    The markdown is first split on level 1-3 headers and then split further
    with a recursive character splitter. Every resulting chunk gets metadata
    describing its source file, position, section type and headers.

    Args:
        document_data: dict with at least ``file_name``, ``content`` and
            ``figures`` (as produced by process_document).
        chunk_size: Maximum chunk size passed to the character splitter.
        chunk_overlap: Overlap passed to the character splitter.

    Returns:
        List of chunk documents whose ``metadata`` has been replaced by:
        ``file_name``, ``chunk_id``, ``id``, ``section_type``, ``page_number``,
        plus ``figure_index``/``figure_path`` for figure chunks and
        ``header_1..3`` when present.
    """
    # Configure headers to split on
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3")
    ]
    header_metadata_keys = (
        ("Header 1", "header_1"),
        ("Header 2", "header_2"),
        ("Header 3", "header_3"),
    )

    # First split text using Markdown headers
    text_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
    header_chunks = text_splitter.split_text(document_data["content"])

    # Then further split the text using recursive character text splitting
    char_text_splitter = RecursiveCharacterTextSplitter(
        separators=["<!--", "\n\n", "#"],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        is_separator_regex=True
    )

    # An empty string is a substring of every chunk, so a figure with empty
    # content would otherwise claim all chunks. Only non-empty figures can match.
    matchable_figures = [fig for fig in document_data["figures"] if fig["content"]]
    id_prefix = document_data['file_name'].replace('.', '_')

    chunks_with_metadata = []

    for i, chunk in enumerate(char_text_splitter.split_documents(header_chunks)):
        # Start with base metadata
        metadata = {
            "file_name": document_data["file_name"],
            "chunk_id": i,
            "id": f"{id_prefix}_{i}"
        }

        # First figure whose full content appears inside this chunk, if any
        figure_info = next(
            (fig for fig in matchable_figures if fig["content"] in chunk.page_content),
            None
        )

        if figure_info:
            metadata.update({
                "figure_index": figure_info["figure_index"],
                "figure_path": figure_info["figure_path"],
                "section_type": "figure"
            })
            page_number = figure_info["page_number"]
        else:
            metadata["section_type"] = "text"
            page_number = None

        # The page is only known for figure chunks; everything else falls back to 1
        metadata["page_number"] = page_number or 1

        # Add any header information from the chunk
        for header_name, metadata_key in header_metadata_keys:
            if header_name in chunk.metadata:
                metadata[metadata_key] = chunk.metadata[header_name]

        # Update the chunk metadata
        chunk.metadata = metadata
        chunks_with_metadata.append(chunk)

    return chunks_with_metadata

def setup_vector_store(index_name, embedding_model_dimensions=1536):
    """
    Build the Azure Search vector store used for indexing and retrieval.

    Args:
        index_name: Name of the Azure Search index to bind to.
        embedding_model_dimensions: Accepted for interface compatibility; this
            function does not read it.

    Returns:
        The AzureSearch vector store instance.
    """
    # Per-field index options, keyed by field name.
    # NOTE(review): LangChain's AzureSearch documentation shows `fields` as a
    # list of SearchField objects that also contains the id / content /
    # content_vector / metadata fields. Confirm that this dict-of-dicts form is
    # accepted by the installed version.
    index_fields = {
        "file_name": {"type": "Edm.String", "filterable": True, "searchable": True},
        "page_number": {"type": "Edm.Int32", "filterable": True, "sortable": True},
        "section_type": {"type": "Edm.String", "filterable": True, "facetable": True},
        "figure_index": {"type": "Edm.Int32", "filterable": True},
        "figure_path": {"type": "Edm.String", "filterable": False},
        "chunk_id": {"type": "Edm.Int32", "filterable": True, "sortable": True},
        "header_1": {"type": "Edm.String", "filterable": True, "searchable": True},
        "header_2": {"type": "Edm.String", "filterable": True, "searchable": True},
        "header_3": {"type": "Edm.String", "filterable": True, "searchable": True}
    }

    # Embedding client; the key is read from AZURE_OPENAI_KEY at call time.
    embeddings = AzureOpenAIEmbeddings(
        azure_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME,
        openai_api_version=AZURE_OPENAI_EMBEDDING_API_VERSION,
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=os.getenv("AZURE_OPENAI_KEY"),
    )

    # Queries are embedded with the same model through embed_query.
    return AzureSearch(
        azure_search_endpoint=AZURE_SEARCH_ENDPOINT,
        azure_search_key=AZURE_SEARCH_KEY,
        index_name=index_name,
        embedding_function=embeddings.embed_query,
        fields=index_fields
    )

def process_and_index_documents(pdf_directory, index_name):
    """
    Main function to process multiple PDF files and index them.

    Args:
        pdf_directory: Directory searched (non-recursively) for ``*.pdf`` files.
        index_name: Name of the Azure Search index to write to.

    Returns:
        The vector store, or ``None`` when the directory contains no PDF files.
    """
    # Find all PDF files in the directory; sorted so that processing order (and
    # therefore the log output) is the same on every run.
    pdf_files = sorted(glob.glob(os.path.join(pdf_directory, "*.pdf")))

    if not pdf_files:
        print(f"No PDF files found in {pdf_directory}")
        return None

    print(f"Found {len(pdf_files)} PDF files to process")

    # Process all documents
    processed_documents = process_multiple_documents(pdf_files)
    print(f"Successfully processed {len(processed_documents)} documents")

    # Chunk each document and collect all chunks
    all_chunks = []
    for doc in processed_documents:
        doc_chunks = chunk_document(doc)
        all_chunks.extend(doc_chunks)
        print(f"Created {len(doc_chunks)} chunks from {doc['file_name']}")

    # Set up vector store and add documents
    vector_store = setup_vector_store(index_name)
    # Every document may have failed; do not send an empty upload in that case.
    if all_chunks:
        vector_store.add_documents(documents=all_chunks)

    print(f"Indexed {len(all_chunks)} chunks in total")

    return vector_store

def search_documents(vector_store, query, filter_criteria=None, top_k=5):
    """
    Run a similarity search against the vector store.

    Args:
        vector_store: Object exposing ``as_retriever``.
        query: The query text.
        filter_criteria: Optional filter expression; only forwarded when truthy.
        top_k: Value forwarded under the ``"top"`` search keyword.

    Returns:
        Whatever the retriever's ``invoke`` returns for the query.
    """
    # NOTE(review): confirm that the installed AzureSearch retriever reads the
    # result count and the filter from the "top" and "filter" keys used here.
    search_kwargs = {
        "top": top_k,
        **({"filter": filter_criteria} if filter_criteria else {}),
    }

    retriever = vector_store.as_retriever(search_type="similarity", search_kwargs=search_kwargs)
    return retriever.invoke(query)

def generate_answer(query, retrieved_docs, temperature=0.7):
    """
    Answer the query with the chat model, using the retrieved chunks as references.

    Args:
        query: The user question.
        retrieved_docs: Chunks exposing ``metadata`` (with an ``'id'`` entry)
            and ``page_content``.
        temperature: Sampling temperature passed to the chat model.

    Returns:
        The ``content`` of the chat model's reply.
    """
    # Keep the first chunk seen for each id; dicts preserve insertion order.
    first_by_id = {}
    for doc in retrieved_docs:
        first_by_id.setdefault(doc.metadata['id'], doc)

    # One "Source N" entry per unique chunk, separated by a rule.
    context = '\n---\n'.join(
        f'Source {idx} Metadata: {doc.metadata}\n'
        f'Source {idx} Content: {doc.page_content}'
        for idx, doc in enumerate(first_by_id.values())
    )

    # Prompt template; {context} and {question} are filled in below.
    prompt = """
    You are an expert in document analysis. You are proficient in reading and analyzing technical reports. You are good at numerical reasoning and have a good understanding of financial concepts. You are given a question which you need to answer based on the references provided. To answer this question, you may first read the question carefully to know what information is required or helpful to answer the question. Then, you may read the references to find the relevant information.

    If you find enough information to answer the question, you can first write down your thinking process and then provide a concise answer at the end.
    If you find that there is not enough information to answer the question, you can state that there is insufficient information.
    If you are not able or sure how to answer the question, say that you are not able to answer the question.
    Do not provide any information that is not present in the references.
    References are in markdown format, you may follow the markdown syntax to better understand the references.

    ---
    References:
    {context}
    ---

    Now, here is the question:
    ---
    Question:
    {question}
    ---
    Thinking Process::: 
    Answer::: 
    """

    formatted_prompt = prompt.format(question=query, context=context)

    # Chat client; the key is read from AZURE_OPENAI_KEY at call time.
    chat_llm = AzureChatOpenAI(
        model=AZURE_OPENAI_CHAT_DEPLOYMENT_NAME,
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=os.getenv("AZURE_OPENAI_KEY"),
        api_version=AZURE_OPENAI_CHAT_API_VERSION,
        temperature=temperature
    )

    return chat_llm.invoke(formatted_prompt).content

# Main execution
# End-to-end demo: index every PDF in the data directory, then run one example
# question through retrieval and answer generation.
if __name__ == "__main__":
    # Directory containing PDF files
    pdf_directory = "../data"

    # Process and index documents
    vector_store = process_and_index_documents(pdf_directory, AZURE_SEARCH_INDEX_NAME)

    # process_and_index_documents returns None (after printing why) when the
    # directory holds no PDFs; skip the example query in that case instead of
    # failing inside search_documents.
    if vector_store is not None:
        # Example search
        query = "Explain the defrost cycle in heat pumps"

        # Search with filters
        # Example filter: "file_name eq 'document1.pdf' and page_number gt 5"
        retrieved_docs = search_documents(vector_store, query, filter_criteria=None, top_k=5)

        # Generate answer
        answer = generate_answer(query, retrieved_docs)

        # Print the answer
        print(f"Query: {query}")
        print("=" * 50)
        print(answer)

Error creating analyzer: 409 Client Error: Conflict for url: https://gk-cu-aiservices.services.ai.azure.com/contentunderstanding/analyzers/content-understanding-search-sample-2e7835a6-2b7e-41a2-bc98-e41e32947184?api-version=2024-12-01-preview
Using existing analyzer ID if available
Found 50 PDF files to process
Processing AC-SVU003A-EN_12012023.pdf...


KeyboardInterrupt: 