In [9]:
!pip install PyMuPDF

Collecting PyMuPDF
  Downloading pymupdf-1.26.1-cp39-abi3-macosx_10_9_x86_64.whl.metadata (3.4 kB)
Downloading pymupdf-1.26.1-cp39-abi3-macosx_10_9_x86_64.whl (23.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.1/23.1 MB[0m [31m50.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: PyMuPDF
Successfully installed PyMuPDF-1.26.1


In [3]:
import os
import re
import fitz  # PyMuPDF
from PIL import Image
import pytesseract
import pandas as pd
from datetime import datetime
from typing import List, Optional

import gradio as gr
from dotenv import load_dotenv, find_dotenv

from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain.output_parsers.openai_functions import JsonKeyOutputFunctionsParser
from langchain_core.prompts.chat import HumanMessagePromptTemplate, ChatPromptTemplate

# Load environment variables from whatever .env file find_dotenv() locates (if any).
_ = load_dotenv(find_dotenv())
# Read the OpenAI key from the environment and fail fast when it is missing or empty.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY is not set.")

# Chat model that the extraction chain below is built on (gpt-4o, temperature 0).
model = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)

# Schema for one country's delegation entry as requested from the model.
# NOTE(review): documented with '#' comments rather than a class docstring because a
# docstring would presumably be picked up as the schema description sent to the model —
# confirm before adding one.
class DelegationSession(BaseModel):
    # Country the delegation belongs to (required).
    country: str = Field(description="Country name.")
    # Name lists, one per role; each falls back to a fresh empty list when not supplied.
    officials: Optional[List[str]] = Field(default_factory=list)
    representatives: Optional[List[str]] = Field(default_factory=list)
    alternate_representatives: Optional[List[str]] = Field(default_factory=list)
    advisers: Optional[List[str]] = Field(default_factory=list)
    other_attendees: Optional[List[str]] = Field(default_factory=list)
    # Required flag; the description is part of the schema given to the model.
    leader_present: bool = Field(description="True if any official is president or prime minister")
    # Session year as a string; "NA" is the placeholder for "unknown".
    year: str = Field(default="NA")

# Top-level payload of the function call: the list of extracted sessions.
# The class name is the function name forced via function_call, and "sessions" is the
# key the output parser of the extraction chain reads.
class DelegationData(BaseModel):
    sessions: List[DelegationSession]

# Single human message carrying the extraction instructions.
# Template variables: {country} (heading of the chunk) and {text} (body of the chunk).
prompt_template = HumanMessagePromptTemplate.from_template(
    template=(
        "You are extracting UN delegation session data from the following text for the country: {country}.\n\n"
        "Instructions:\n"
        "1. Extract **all** names under each category: officials, representatives, alternate representatives, advisers, other attendees.\n"
        "2. The lists might be long and span multiple lines. Be exhaustive and include every name.\n"
        "3. If the text mentions 'President' or 'Prime Minister', set leader_present = true.\n"
        "4. Use 'NA' if year is missing.\n"
        "5. Return the result as JSON matching the schema.\n\n"
        "Text:\n{text}"
    )
)

# Chat prompt made of that one message.
prompt = ChatPromptTemplate.from_messages([prompt_template])
# Expose the DelegationData schema as an OpenAI function definition.
functions = [convert_to_openai_function(DelegationData)]
# Bind the function to the model and force every call to use it.
gpt_func_model = model.bind(functions=functions, function_call={"name": "DelegationData"})
# Pipeline: prompt -> model -> parser that returns the value under the "sessions" key
# of the function-call arguments.
extraction_chain = prompt | gpt_func_model | JsonKeyOutputFunctionsParser(key_name="sessions")

def _ocr_page(doc, page_index, dpi):
    """Render page ``page_index`` of ``doc`` at ``dpi`` and return the text Tesseract reads from it."""
    page = doc.load_page(page_index)
    scale = dpi / 72  # same scaling on both axes, relative to the 72-dpi base render
    pix = page.get_pixmap(matrix=fitz.Matrix(scale, scale))
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    return pytesseract.image_to_string(img)


def _detect_year(text):
    """Return the first 19xx/20xx year found in ``text`` (the start year of a range), or "NA"."""
    match = re.search(r"\b(19|20)\d{2}(?:\s*[-\u2013]\s*(19|20)\d{2})?\b", text)
    if not match:
        return "NA"
    # For "1975-1976" / "1975 – 1976" keep only the first year.
    return re.split(r"[-–]", match.group(0))[0].strip()


def extract_full_ocr_text(pdf_path, dpi=300, start_page=4):
    """OCR a scanned PDF and return ``(full_text, year)``.

    Args:
        pdf_path: Path of the PDF to open.
        dpi: Resolution used to render each page before OCR.
        start_page: Zero-based index of the first page whose text is included in
            ``full_text``. Defaults to 4 (the fifth page), as before.

    Returns:
        A tuple ``(full_text, year)``. ``full_text`` is the OCR text of every page from
        ``start_page`` onwards, each page prefixed with a newline. ``year`` is the first
        19xx/20xx year found on the first page, or "NA" when none is found or the
        document has no pages.
    """
    doc = fitz.open(pdf_path)
    try:
        page_count = len(doc)
        # The year is looked up on the first page only.
        year = _detect_year(_ocr_page(doc, 0, dpi)) if page_count else "NA"
        first_body_page = max(start_page, 0)
        page_texts = [_ocr_page(doc, i, dpi) for i in range(first_body_page, page_count)]
    finally:
        # Always release the document, even when rendering or OCR fails.
        doc.close()

    full_text = "".join("\n" + page_text for page_text in page_texts)
    return full_text, year

def clean_text_no_internal_blank_lines(raw_text):
    """Return ``raw_text`` with every line stripped and noise lines removed.

    A line is dropped when it is empty after stripping or consists only of digits.
    The surviving lines are joined with single newlines.
    """
    kept = []
    for raw_line in raw_text.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        if re.fullmatch(r'\d+', candidate):
            continue
        kept.append(candidate)
    return '\n'.join(kept)

def split_text_by_Country(text):
    """Split OCR text into chunks that each start at an all-caps heading.

    The text is first cleaned with ``clean_text_no_internal_blank_lines``. A heading is
    a run starting with an uppercase letter and containing only uppercase letters,
    whitespace and ``- & . ,`` that begins and ends on line boundaries; because
    whitespace includes newlines, consecutive all-caps lines form a single heading.
    Any text before the first heading becomes a chunk of its own.

    Returns:
        ``(final_chunks, spaced_text)``: the non-empty stripped chunks, and the same
        chunks each prefixed with ``"~~~~~ "`` and separated by blank lines.
    """
    marker = "~~~~~"
    heading_re = re.compile(r'^(?P<country>[A-Z][A-Z\s\-&.,]*)$', flags=re.MULTILINE)

    def mark_heading(match):
        # Put the marker on a fresh line in front of the heading.
        return f"\n~~~~~ {match.group('country').strip()}"

    marked = heading_re.sub(mark_heading, clean_text_no_internal_blank_lines(text))

    final_chunks = []
    for piece in marked.split(marker):
        piece = piece.strip()
        if piece:
            final_chunks.append(piece)

    spaced_text = "\n\n".join(f"~~~~~ {chunk}" for chunk in final_chunks)
    return final_chunks, spaced_text

def filter_chunks_by_last_country(chunks, last_country="ZIMBABWE"):
    """Keep chunks up to and including the one headed by ``last_country``.

    The heading is the first line of a chunk, compared after stripping and
    upper-casing. Chunks with no lines are skipped. If no heading matches, every
    non-empty chunk is returned.
    """
    kept = []
    for block in chunks:
        block_lines = block.splitlines()
        if block_lines:
            heading = block_lines[0].strip().upper()
            kept.append(block)
            if heading == last_country.upper():
                # Everything after the last country is dropped.
                break
    return kept

def extract_sessions_from_text_chunks(chunks):
    """Run the extraction chain on every chunk and collect the returned sessions.

    The first line of a chunk is passed as ``country`` ("NA" for a chunk without
    lines) and the remaining lines as ``text``. Extraction is best-effort: a chunk
    whose call fails is reported with ``print`` and skipped.
    """
    collected = []
    for chunk in chunks:
        chunk_lines = chunk.splitlines()
        if chunk_lines:
            country = chunk_lines[0].strip()
        else:
            country = "NA"
        # Slicing past the heading yields an empty body for single-line chunks.
        body = "\n".join(chunk_lines[1:]).strip()
        payload = {"country": country, "text": body}
        try:
            collected.extend(extraction_chain.invoke(payload))
        except Exception as e:
            print(f"[ERROR] LangChain extraction failed for country {country}: {e}")
    return collected

def _as_flag(value) -> bool:
    """Coerce a ``leader_present`` value (bool, None, or a string such as "true") to a bool."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "yes", "1"}
    return bool(value)


def merge_sessions(sessions: List[dict]) -> List[dict]:
    """Merge session dicts that share the same ``(country, year)``.

    Args:
        sessions: Session dicts; missing or falsy ``country`` / ``year`` are treated
            as "NA", missing or null name lists as empty.

    Returns:
        One dict per ``(country, year)`` in first-seen order. Each name list is the
        de-duplicated, sorted union of the inputs. ``leader_present`` is always a real
        bool and is True if any merged session had it set, so a null value from the
        model no longer breaks the merge or later ``int()`` conversion.
    """
    role_keys = (
        "officials",
        "representatives",
        "alternate_representatives",
        "advisers",
        "other_attendees",
    )
    merged = {}
    for s in sessions:
        key = (s.get("country") or "NA", s.get("year") or "NA")
        entry = merged.get(key)
        if entry is None:
            entry = merged[key] = {role: set() for role in role_keys}
            entry["leader_present"] = False
        for role in role_keys:
            entry[role].update(s.get(role) or [])
        entry["leader_present"] = entry["leader_present"] or _as_flag(s.get("leader_present"))

    return [
        {
            "country": country,
            "year": year,
            **{role: sorted(entry[role]) for role in role_keys},
            "leader_present": entry["leader_present"],
        }
        for (country, year), entry in merged.items()
    ]

def save_sessions_to_excel(sessions, filename=None):
    """Write one row of head-counts per session to an Excel file.

    Args:
        sessions: Session dicts carrying ``country``, ``year``, ``leader_present`` and
            the per-role name lists.
        filename: Target path; when falsy a timestamped ``UN_Session_Counts_*.xlsx``
            name is generated.

    Returns:
        ``(filename, df)``: the path written and the DataFrame that was saved.
        ``attendees`` is the sum of officials, representatives, alternate
        representatives and advisers; ``other_attendees`` is not part of it.
    """
    counted_roles = ("officials", "representatives", "alternate_representatives", "advisers")

    if not filename:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"UN_Session_Counts_{stamp}.xlsx"

    def summarise(session_id, s):
        # Total head-count over the counted roles; missing/null lists count as zero.
        total = 0
        for role in counted_roles:
            total += len(s.get(role) or [])
        return {
            "session_id": session_id,
            "country": s["country"].title(),  # title-case the country name
            "year": s["year"],
            "officials": len(s["officials"]),
            "leader_present": int(s["leader_present"]),
            "representatives": len(s["representatives"]),
            "alternate_representatives": len(s["alternate_representatives"]),
            "advisers": len(s["advisers"]),
            "attendees": total,
        }

    records = [summarise(position, s) for position, s in enumerate(sessions, start=1)]
    df = pd.DataFrame(records)
    df.to_excel(filename, index=False)
    return filename, df

def process_uploaded_pdfs(pdf_files):
    """Run the full OCR -> split -> extract -> merge -> export pipeline on uploaded PDFs.

    Args:
        pdf_files: Paths of the PDFs to process; may be empty or None.

    Returns:
        ``(records, excel_path, cleaned_text_path)``: the summary rows as a list of
        dicts, the path of the Excel summary, and the path of a text file holding the
        marked-up text sent to the model (one section per PDF, separated by "=====").
        Returns ``([], None, None)`` when no files are given.
    """
    if not pdf_files:
        return [], None, None

    all_sessions = []
    cleaned_prompt_texts = []

    for pdf_path in pdf_files:
        full_text, detected_year = extract_full_ocr_text(pdf_path)

        # The marked-up text returned by the splitter covers the unfiltered chunks, so it
        # is discarded and rebuilt from the chunks that are actually sent to the model.
        chunks, _unfiltered_text = split_text_by_Country(full_text)
        chunks = filter_chunks_by_last_country(chunks, last_country="ZIMBABWE")
        cleaned_prompt_texts.append("\n\n".join(f"~~~~~ {chunk}" for chunk in chunks))

        sessions = extract_sessions_from_text_chunks(chunks)
        for s in sessions:
            # Sessions are raw JSON dicts from the model, so "year" may be missing, null
            # or empty as well as the literal "NA"; all of these fall back to the year
            # detected on the first page.
            if (s.get("year") or "NA") == "NA":
                s["year"] = detected_year

        all_sessions.extend(sessions)

    merged_sessions = merge_sessions(all_sessions)
    excel_path, df = save_sessions_to_excel(merged_sessions)

    cleaned_text_path = f"cleaned_prompt_text_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(cleaned_text_path, "w", encoding="utf-8") as f:
        for text in cleaned_prompt_texts:
            f.write(text + "\n\n=====\n\n")

    return df.to_dict(orient="records"), excel_path, cleaned_text_path

def create_gradio_app():
    """Build and return the Gradio Blocks UI for the extractor.

    The page has a multi-file PDF upload, a button, a JSON view of the extracted rows
    and two file outputs (Excel summary and cleaned prompt text). Clicking the button
    runs ``process_uploaded_pdfs`` on the uploaded file paths.
    """
    with gr.Blocks(title="UN Delegation Extractor") as demo:
        gr.Markdown("## UN Delegation Session Extractor (LangChain + GPT-4o + OCR)")

        # Input controls.
        uploads = gr.File(
            label="Upload scanned UN Session PDFs",
            file_types=[".pdf"],
            file_count="multiple",
            type="filepath",
        )
        run_button = gr.Button("Extract Delegation Data")

        # Output widgets, in the order process_uploaded_pdfs returns its values.
        records_view = gr.JSON(label="Extracted Structured Data")
        excel_file = gr.File(label="Download Excel Summary", visible=True)
        cleaned_text_file = gr.File(label="Download Cleaned Text for GPT Input", visible=True)
        output_widgets = [records_view, excel_file, cleaned_text_file]

        run_button.click(fn=process_uploaded_pdfs, inputs=[uploads], outputs=output_widgets)
    return demo

# Build and launch the UI when this cell/script is run directly.
# NOTE(review): share=True requests a public link to the app, and every extraction goes
# through the OpenAI-backed chain — confirm that this exposure is intended.
if __name__ == "__main__":
    app = create_gradio_app()
    # inbrowser / show_error are passed straight through to Gradio's launch().
    app.launch(share=True, inbrowser=True, show_error=True)


* Running on local URL:  http://127.0.0.1:7862
* To create a public link, set `share=True` in `launch()`.
