In [1]:
pip install python-docx docx2txt langchain openai llama-index chromadb pydantic gradio



In [11]:
import io
import json
import os
import re
import tempfile  # For handling temporary files
import traceback
import zipfile
from datetime import datetime
from difflib import SequenceMatcher
from typing import List, Dict

import docx
import gradio as gr
import openai

# Read the OpenAI API key from the environment ONLY.
# SECURITY: never put a literal key here (not even as a getenv() default) --
# anything committed in a notebook must be treated as leaked. The key that was
# previously hardcoded on this line has to be revoked and rotated.
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
    print("Warning: OPENAI_API_KEY is not set; OpenAI-backed features will fail until it is provided.")


In [12]:
# Heading patterns that are matched case-insensitively: the literal word or the
# numbering scheme carries the signal, not the letter case.
_HEADING_PATTERNS_ANY_CASE = tuple(
    re.compile(pattern, re.I)
    for pattern in (
        r"^(Clause|Section|Article)\s*\d+(\.\d+)*",  # Clause 3, Section 4.1, Article 2.3.4
        r"^\d+(\.\d+)+",                            # 2.1 or 3.1.4 etc.
        r"^\d+\)",                                  # 2) or 3) numbered lists
        r"^\([a-z]\)",                              # (a), (b) style sub-headings
        r"^\([ivx]+\)",                             # (i), (ii), (iii) style sub-headings
    )
)

# Heading patterns whose whole meaning IS the letter case. Matching these with
# re.I would classify any letters-and-spaces paragraph ("Payment may be
# delayed") as an ALL CAPS heading, and ordinary words such as "civil." as
# Roman numerals.
_HEADING_PATTERNS_EXACT_CASE = tuple(
    re.compile(pattern)
    for pattern in (
        r"^[A-Z][A-Z\s]{3,}$",                      # ALL CAPS headings (at least 4 characters)
        r"^[IVXLCDM]+\.",                           # Roman numerals (e.g., I., II., III.)
        r"^[A-Z]\.",                                # Single capital letter followed by period (e.g., A., B.)
    )
)


def _is_section_heading(para, text: str) -> bool:
    """Return True when a paragraph has a 'Heading' style or heading-like text."""
    # A paragraph may have no style, and a style may have no name.
    style_name = getattr(para.style, "name", None) or ""
    if "Heading" in style_name:
        return True
    if any(pattern.match(text) for pattern in _HEADING_PATTERNS_ANY_CASE):
        return True
    return any(pattern.match(text) for pattern in _HEADING_PATTERNS_EXACT_CASE)


def find_section_for_keyword(doc: "docx.document.Document", keyword: str, match_text: str, window: int = 500) -> str:
    """
    Return the section heading that governs the first occurrence of a text.

    The first paragraph containing ``match_text`` (or ``keyword`` when
    ``match_text`` is empty) is located with a case-insensitive substring
    search. The closest heading at or before that paragraph is returned; a
    paragraph counts as a heading when its style name contains "Heading" or
    its text matches one of the numbering / ALL-CAPS patterns above.

    Args:
        doc: Object exposing ``paragraphs``; each paragraph needs ``text`` and
            ``style`` (a python-docx document satisfies this).
        keyword: Fallback search string, used only when ``match_text`` is empty.
        match_text: Preferred search string.
        window: Currently unused; kept so existing callers keep working.

    Returns:
        The stripped heading text, or one of the ``"Unknown Section (...)"``
        sentinels when there is nothing to search for, the text is not found,
        or the document has no detectable heading. When no heading precedes
        the match, the first heading *after* it is returned as a best effort.
    """
    search_string = match_text if match_text else keyword
    if not search_string:
        return "Unknown Section (No Search String Provided)"

    # Read the paragraph list once instead of re-reading it for every pass.
    paragraphs = list(doc.paragraphs)
    needle = search_string.lower()

    match_index = next(
        (i for i, para in enumerate(paragraphs) if needle in para.text.lower()),
        -1,
    )
    if match_index == -1:
        return "Unknown Section (Match Not Found)"

    # Walk backwards from the matched paragraph (inclusive) to the nearest heading.
    for para in reversed(paragraphs[: match_index + 1]):
        text = para.text.strip()
        if text and _is_section_heading(para, text):
            return text

    # Nothing precedes the match: fall back to the first heading in the
    # document, which at this point can only be located after the match.
    for para in paragraphs[match_index + 1:]:
        text = para.text.strip()
        if text and _is_section_heading(para, text):
            return text

    return "Unknown Section (No Clear Heading Found Before Match)"


In [13]:
def insert_comments_in_docx(file_obj, issues: List[Dict]) -> io.BytesIO:
    """
    Annotate a DOCX with inline review markers and return it as a BytesIO.

    These are not native Word comments: for every issue that can be anchored
    to a paragraph, a red italic run ``[Review Needed: ...]`` is appended to
    that paragraph.

    Args:
        file_obj: Seekable binary file-like object holding the DOCX.
        issues: Dicts with optional keys
            ``"context"`` / ``"details"`` -- text snippet to locate in the
            document (``"context"`` wins; ``"details"`` is what
            ``analyze_documents`` supplies), also shown in the marker;
            ``"issue"`` -- issue label, used as a last-resort search text;
            ``"section"`` -- section title shown in the marker.

    Returns:
        BytesIO positioned at 0 with the annotated document. If saving fails,
        the original document with an appended error paragraph is returned,
        or the untouched original bytes as a last resort.
    """
    # --- 1. Load Document from File Object ---
    file_obj.seek(0)
    doc = docx.Document(file_obj)
    paragraphs = doc.paragraphs
    # Snapshot of the ORIGINAL text so markers added below never become search hits.
    lowered_texts = [para.text.lower() for para in paragraphs]

    # --- 2. Locate the Paragraph for Each Issue ---
    located = []
    for issue in issues:
        issue_label = str(issue.get("issue") or "")
        # Prefer the document snippet; the bare issue label is only a fallback
        # because it rarely appears verbatim in the document text.
        search_text = next(
            (str(issue[key]) for key in ("context", "details", "issue") if issue.get(key)),
            "",
        )
        if not search_text or "Processing Error" in search_text or "Processing Error" in issue_label:
            continue  # Nothing to anchor on, or an error record rather than a finding

        needle = search_text[:500].lower()  # Cap the length of the plain substring search
        hit = next((i for i, text in enumerate(lowered_texts) if needle in text), None)
        if hit is None:
            print(f"Warning: Could not find suitable paragraph for comment for issue: {issue.get('issue', 'N/A')}")
            continue

        # Do not annotate a paragraph that is only the section title: move to
        # the following paragraph when there is one.
        section_title = str(issue.get("section") or "").lower().strip()
        if section_title and section_title == lowered_texts[hit].strip() and hit + 1 < len(paragraphs):
            hit += 1
        located.append((hit, issue))

    # --- 3. Append Review Markers ---
    # Appending a run never adds or removes paragraphs, so the indices stay
    # valid and no particular processing order is required.
    for para_index, issue in located:
        para = paragraphs[para_index]
        issue_type = issue.get("issue", "N/A")
        context = str(issue.get("details", "N/A"))
        section = issue.get("section", "Unknown Section")

        # Format comment text (truncate long context)
        comment_text = f"Issue: {issue_type}\nSection: {section}\nContext: {context[:200]}{'...' if len(context) > 200 else ''}"

        # Avoid duplicate markers for the same paragraph
        if comment_text in para.text:
            continue
        try:
            run = para.add_run(f"  [Review Needed: {comment_text}]")
            run.italic = True
            run.font.color.rgb = docx.shared.RGBColor(0xFF, 0x00, 0x00)  # Red color
        except Exception as e:
            print(f"Error adding comment to paragraph {para_index}: {e}")

    # --- 4. Save Modified Document to BytesIO ---
    bio = io.BytesIO()
    try:
        doc.save(bio)
    except Exception as e:
        print(f"Error saving document after adding comments: {e}")
        # Fall back to the original file, ideally with an error note appended.
        file_obj.seek(0)
        original_content = file_obj.read()
        try:
            error_doc = docx.Document(io.BytesIO(original_content))
            error_doc.add_paragraph(f"[ERROR: Failed to add comments due to: {e}. See JSON report for details.]")
            bio = io.BytesIO()
            error_doc.save(bio)
        except Exception as e_save_error_doc:
            print(f"Further error saving error doc: {e_save_error_doc}")
            bio = io.BytesIO(original_content)

    bio.seek(0)
    return bio


In [14]:
# --- Mandatory Documents List ---
# Maps each supported legal process to the document types a complete
# submission is expected to contain.
MANDATORY_DOCS = {
    process_name: list(document_types)
    for process_name, document_types in (
        (
            "Company Incorporation",
            (
                "Articles of Association",
                "Memorandum of Association",
                "Board Resolution Template",
                "Shareholder Resolution Template",
                "Register of Members and Directors",
                "Incorporation Application Form",
            ),
        ),
    )
}

In [23]:

# Severity per red-flag keyword. Keywords that are not listed (including the
# specific "ambiguous wording ('...')" variants) fall back to "Low".
_SEVERITY_MAP = {
    "jurisdiction (non-ADGM)": "High",
    "jurisdiction (ADGM reference missing)": "High",
    "reference to UAE federal courts": "High",
    "explicit non-ADGM governing law": "High",
    "signature section missing": "Medium",
    "signature details potentially missing": "Low",
    "ambiguous wording": "Low",
    "governing law (non-ADGM)": "High",
    "governing law (ADGM law reference missing)": "High",
    "governing law clause unclear or incomplete": "Medium",
    "force majeure clause (ADGM context unclear)": "Low",
    "indemnity/liability clause (ADGM context unclear)": "Medium",
    "confidentiality clause (ADGM context unclear)": "Low",
    "GDPR or non-ADGM data protection reference": "Low",
    "non-AED/USD currency reference (convertibility unclear)": "Low",
    "potentially outdated date reference": "Low",
    "termination clause (details unclear/missing)": "Medium",
    "assignment/transfer restrictions (ADGM context unclear)": "Medium",
    "specific performance/equitable remedies clause (ADGM context unclear)": "Medium",
    "Processing Error": "High",  # Mark processing errors clearly
}

# Remediation hint per red-flag keyword.
_SUGGESTION_MAP = {
    "jurisdiction (non-ADGM)": "Ensure jurisdiction is explicitly set to ADGM Courts.",
    "jurisdiction (ADGM reference missing)": "Ensure jurisdiction is explicitly set to ADGM Courts.",
    "reference to UAE federal courts": "Replace references with ADGM Courts.",
    "explicit non-ADGM governing law": "Explicitly state that the agreement is governed by ADGM law.",
    "signature section missing": "Add authorized signatory section with name, title, and date fields.",
    "signature details potentially missing": "Verify name, title, and date fields are present for signatories.",
    "ambiguous wording ('may')": "Replace 'may' with clear, mandatory language (e.g., 'shall') if a requirement is intended.",
    "ambiguous wording ('might')": "Replace 'might' with clear, mandatory language (e.g., 'shall') if a requirement is intended.",
    "ambiguous wording ('should')": "Replace 'should' with 'shall' if a mandatory action is intended.",
    "ambiguous wording ('could')": "Replace 'could' with clear language indicating possibility or requirement.",
    "ambiguous wording ('possibly')": "Replace 'possibly' with clear language indicating possibility or requirement.",
    "ambiguous wording ('endeavour')": "Replace 'endeavour' with a firm commitment (e.g., 'shall use best efforts' or 'shall').",
    "ambiguous wording ('endeavor')": "Replace 'endeavor' with a firm commitment (e.g., 'shall use best efforts' or 'shall').",
    "ambiguous wording ('best efforts')": "Define 'best efforts' or replace with a clearer standard.",
    "ambiguous wording ('reasonable efforts')": "Define 'reasonable efforts' or replace with a clearer standard.",
    "governing law (non-ADGM)": "Ensure governing law is explicitly set to ADGM law.",
    "governing law (ADGM law reference missing)": "Ensure governing law is explicitly set to ADGM law.",
    "governing law clause unclear or incomplete": "Clarify and complete the governing law clause, specifying ADGM law.",
    "force majeure clause (ADGM context unclear)": "Review and ensure the Force Majeure clause aligns with ADGM regulations or common practice.",
    "indemnity/liability clause (ADGM context unclear)": "Review and ensure Indemnity and Liability clauses align with ADGM regulations or common practice.",
    "confidentiality clause (ADGM context unclear)": "Review and ensure the Confidentiality clause aligns with ADGM regulations or common practice.",
    "GDPR or non-ADGM data protection reference": "Assess if non-ADGM data protection references are necessary and ensure compliance with ADGM data protection regulations.",
    "non-AED/USD currency reference (convertibility unclear)": "Confirm if non-AED/USD currencies are acceptable or if values should be in AED/USD as per ADGM requirements.",
    "potentially outdated date reference": "Verify the date is correct and relevant to the agreement's validity.",
    "termination clause (details unclear/missing)": "Review and complete the termination clause, including notice periods and grounds.",
    "assignment/transfer restrictions (ADGM context unclear)": "Review assignment/transfer clauses for ADGM compliance.",
    "specific performance/equitable remedies clause (ADGM context unclear)": "Review remedies clause for ADGM compliance regarding specific performance or equitable relief.",
    "Processing Error": "Analysis failed for this document. Please check the file format or contact support.",
}


def _assess_red_flag(keyword):
    """Return ``(severity, suggestion)`` for a red-flag keyword, with generic fallbacks."""
    if "ambiguous wording" in keyword:
        fallback = "Review clause for clarity."
    else:
        fallback = "Review clause for ADGM compliance."
    return _SEVERITY_MAP.get(keyword, "Low"), _SUGGESTION_MAP.get(keyword, fallback)


def _load_uploaded_files(paths):
    """Read each path into a named in-memory buffer; unreadable files are logged and skipped."""
    file_objs = []
    for path in paths:
        try:
            with open(path, "rb") as file_data:
                bio = io.BytesIO(file_data.read())
            bio.name = os.path.basename(path)
            file_objs.append(bio)
        except Exception as e:
            print(f"Error reading uploaded file {path}: {e}")
    return file_objs


def _processing_error_flag(message):
    """Build the synthetic red flag that marks a document whose analysis failed."""
    return {"keyword": "Processing Error", "context": message, "adgm_present": False, "section": "N/A"}


def _comment_issues(red_flags):
    """Translate red flags into the issue dicts consumed by insert_comments_in_docx."""
    comment_issues = []
    for rf in red_flags:
        entry = {
            "issue": rf.get("keyword", "N/A"),
            "details": rf.get("context", "No context"),
            "section": rf.get("section", "Unknown Section"),
        }
        # Keep the snippet under "context" as well: that is the key the
        # annotator searches the document for. Without it, it can only look
        # for the issue label, which rarely occurs in the document text.
        if rf.get("context"):
            entry["context"] = rf["context"]
        comment_issues.append(entry)
    return comment_issues


def _write_zip(reviewed_docs):
    """Write the named buffers into a temporary .zip file and return its path."""
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip:
        with zipfile.ZipFile(tmp_zip, "w") as zf:
            for bio in reviewed_docs:
                bio.seek(0)
                zf.writestr(bio.name, bio.read())
        zip_file_path = tmp_zip.name
    print(f"Generated temporary ZIP file: {zip_file_path}, size: {os.path.getsize(zip_file_path)} bytes")
    return zip_file_path


def _write_json_report(structured_output):
    """Write the report to a temporary .json file; return its path, or None on failure."""
    try:
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp_json:
            tmp_json.write(json.dumps(structured_output, indent=2).encode("utf-8"))
            json_file_path = tmp_json.name
        print(f"Generated temporary JSON file: {json_file_path}, size: {os.path.getsize(json_file_path)} bytes")
        return json_file_path
    except Exception as e:
        print(f"Error generating temporary JSON report file: {e}")
        return None


def _render_per_file_markdown(issues_per_file):
    """Render the detailed per-file Markdown summary."""
    parts = ["**Per-file summary:**\n\n"]
    if not issues_per_file:
        parts.append("No files were successfully analyzed.")
        return "".join(parts)

    for file_info in issues_per_file:
        parts.append(f"  **{file_info['file_name']}** — type: {file_info['doc_type']} (score {file_info['doc_score']})\n")
        if not file_info["red_flags"]:
            parts.append("    No red flags found.\n\n")
            continue
        for rf in file_info["red_flags"]:
            keyword = rf.get('keyword', 'N/A')
            section = rf.get('section', 'Unknown Section')
            context = rf.get('context', 'No context')
            adgm_present_info = f" (ADGM context: {rf.get('adgm_present', 'N/A')})" if 'adgm_present' in rf else ''
            severity, suggestion = _assess_red_flag(keyword)
            parts.append(f"    - **Issue:** {keyword} {adgm_present_info}\n")
            parts.append(f"      Section: {section}\n")
            parts.append(f"      Context: {context[:150]}{'...' if len(context)>150 else ''}\n")  # Limit context length
            parts.append(f"      Severity: {severity}\n")
            parts.append(f"      Suggestion: {suggestion}\n")
        parts.append("\n")  # Blank line after the red flags of a file
    return "".join(parts)


def analyze_documents(uploaded_files):
    """
    Analyze uploaded .docx files and build every output of the Gradio UI.

    Args:
        uploaded_files: List of file paths (strings) supplied by Gradio, or a
            falsy value when nothing was uploaded.

    Returns:
        A 5-tuple ``(checklist_md, per_file_md, structured_output,
        zip_file_path, json_file_path)``: two Markdown strings, the report
        dict, and the paths of the temporary ZIP / JSON downloads (``None``
        when unavailable). On any failure the first element carries the
        message and the remaining ones are ``"", {}, None, None``.

    Notes:
        Relies on ``parse_docx_documents``, ``identify_document_types_with_score``,
        ``check_mandatory_documents`` and ``detect_red_flags_detailed``, which
        are expected to be defined by other notebook cells (not shown in this
        section). The temporary download files are created with
        ``delete=False`` and are not removed here.
    """
    try:
        if not uploaded_files:
            return "⚠️ Please upload one or more .docx files to start analysis.", "", {}, None, None

        file_objs = _load_uploaded_files(uploaded_files)
        if not file_objs:
            return "⚠️ No valid files were read for analysis.", "", {}, None, None

        # Identify doc types
        docs_text = parse_docx_documents(file_objs)
        detected_docs = identify_document_types_with_score(docs_text)
        doc_types_list = [d["doc_type"] for d in detected_docs]

        # Determine legal process & checklist
        legal_process = "Company Incorporation" if any(d in MANDATORY_DOCS["Company Incorporation"] for d in doc_types_list) else "Unknown"
        mandatory_list = MANDATORY_DOCS.get(legal_process, MANDATORY_DOCS["Company Incorporation"])
        checklist_found, checklist_missing = check_mandatory_documents(doc_types_list, mandatory_list)

        checklist_md = (
            f"**Process detected:** {legal_process} — (match hits: {len(checklist_found)}/{len(mandatory_list)})\n\n"
            f"**Checklist found:** {checklist_found}\n\n"
            f"**Checklist missing:** {checklist_missing}\n"
        )

        # Red flag detection
        issues_per_file, reviewed_docs = [], []
        # NOTE(review): keyed by base name, so two uploads sharing a file name
        # collapse into one entry -- confirm whether that can happen upstream.
        file_obj_dict = {f.name: f for f in file_objs}

        for d in detected_docs:
            fname = d["file_name"]
            current_file_obj = file_obj_dict.get(fname)

            if current_file_obj is None:
                print(f"Error: File object for {fname} not found in memory.")
                issues_per_file.append({
                    "file_name": fname,
                    "doc_type": d["doc_type"],
                    "doc_score": d["doc_score"],
                    "red_flags": [_processing_error_flag("Internal error: File object not found for analysis.")],
                })
                continue

            record = None  # Set once detection succeeded and the file has been reported
            try:
                current_file_obj.seek(0)  # Ensure file pointer is at the beginning
                doc_obj = docx.Document(current_file_obj)
                red_flags = detect_red_flags_detailed(fname, doc_obj)
                record = {
                    "file_name": fname,
                    "doc_type": d["doc_type"],
                    "doc_score": d["doc_score"],
                    "red_flags": list(red_flags),
                }
                issues_per_file.append(record)

                current_file_obj.seek(0)  # Rewind before the annotator re-reads the file
                marked_bio = insert_comments_in_docx(current_file_obj, _comment_issues(red_flags))
                marked_bio.name = f"reviewed_{fname}"
                reviewed_docs.append(marked_bio)
            except Exception as e:
                print(f"Error during red flag detection or commenting for {fname}: {e}")
                traceback.print_exc()  # Print traceback for debugging

                error_flag = _processing_error_flag(f"Error during analysis: {e}")
                if record is None:
                    issues_per_file.append({
                        "file_name": fname,
                        "doc_type": d["doc_type"],
                        "doc_score": d["doc_score"],
                        "red_flags": [error_flag],
                    })
                else:
                    # Detection already reported this file: attach the failure
                    # to that record instead of listing the file twice.
                    record["red_flags"].append(error_flag)

                # Still include the original file in the ZIP
                current_file_obj.seek(0)
                error_bio = io.BytesIO(current_file_obj.read())
                error_bio.name = f"error_{fname}"
                reviewed_docs.append(error_bio)

        # Create ZIP of reviewed docs and save to a temporary file
        if reviewed_docs:
            zip_file_path = _write_zip(reviewed_docs)
        else:
            print("No documents to include in ZIP.")
            zip_file_path = None

        # Flat issue list for the structured report
        issues_summary_list = []
        for file_info in issues_per_file:
            for rf in file_info["red_flags"]:
                severity, suggestion = _assess_red_flag(rf.get("keyword", "N/A"))
                issues_summary_list.append({
                    "document": file_info.get("doc_type", file_info.get("file_name", "Unknown File")),
                    "section": rf.get("section", "Unknown Section"),
                    "issue": rf.get("keyword", "Unknown Issue Type"),  # Use keyword as the issue type
                    "severity": severity,
                    "suggestion": suggestion
                })

        structured_output = {
            "process": legal_process,
            "documents_uploaded": len(detected_docs),
            "required_documents": len(mandatory_list),
            "missing_document": checklist_missing[0] if checklist_missing else None,  # Report only the first missing or None
            "issues_found": issues_summary_list
        }

        json_file_path = _write_json_report(structured_output)
        per_file_md = _render_per_file_markdown(issues_per_file)

        # Gradio file outputs take paths, not in-memory buffers
        return checklist_md, per_file_md, structured_output, zip_file_path, json_file_path

    except Exception as e:
        print(f"An unexpected top-level error occurred during analysis: {e}")
        traceback.print_exc()  # Print traceback for debugging
        # Return error messages and None for files if a top-level exception occurs
        return f"An unexpected error occurred during analysis: {e}", "", {}, None, None



In [24]:

# --- Gradio UI ---
# Declarative layout: components are rendered in the order they are created
# inside the Blocks context, so the statement order below is the page order.

with gr.Blocks() as demo:
    gr.Markdown("# ADGM-Compliant Corporate Agent - Document Review")
    gr.Markdown("Upload legal document(s) (.docx) for analysis and receive compliance feedback.")

    # Input: one or more .docx uploads, handed to the callback as file paths.
    uploaded_files = gr.File(
        file_types=[".docx"],
        file_count="multiple",
        type="filepath",  # returns list of file paths
        label="Upload .docx files"
    )

    analyze_btn = gr.Button("Analyze Documents")

    # Outputs: checklist summary, per-file details, and the structured report.
    checklist_out = gr.Markdown()
    per_file_out = gr.Markdown()
    structured_out = gr.JSON()
    # Download slots; analyze_documents fills them with temp-file paths (or None).
    download_zip = gr.File(label="Download Reviewed Documents (ZIP)", interactive=False)
    download_json = gr.File(label="Download Analysis Report (JSON)", interactive=False)

    # The outputs list must stay in the same order as the 5-tuple returned by
    # analyze_documents: (checklist_md, per_file_md, structured_output,
    # zip_file_path, json_file_path).
    analyze_btn.click(
        analyze_documents,
        inputs=[uploaded_files],
        outputs=[checklist_out, per_file_out, structured_out, download_zip, download_json]
    )

# Starts the app when the cell runs. NOTE(review): per the captured cell output,
# on a hosted notebook (Colab) Gradio switches to share=True automatically and
# publishes a public URL -- pass share=False explicitly if that is not wanted.
demo.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://891a81b1df2bec9243.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


