# **Install Dependencies**

In [1]:
!pip install --quiet python-docx gradio

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/253.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25h

# **Imports + Create Folders + Default Rules + Sample docx**

In [2]:
import os, json, zipfile, re, textwrap
from docx import Document
import gradio as gr

# Working directories used by the rest of the notebook:
#   rules/        -> JSON rule definitions (adgm_rules.json)
#   rules_texts/  -> optional ADGM text files go here
#   examples/     -> sample .docx inputs
#   output/       -> reviewed documents, summary.json and the ZIP bundle
for _folder in ('rules', 'rules_texts', 'examples', 'output'):
    os.makedirs(_folder, exist_ok=True)

# Example ADGM rule set, keyed by process identifier.
# Replace or extend the entries in rules/adgm_rules.json with the actual checklists and snippets.
sample_rules = {
    "company_incorporation": {
        "display_name": "Company Incorporation",
        # documents that must be present for this process
        "required_docs": [
            "Articles of Association",
            "Memorandum of Association",
            "UBO Declaration Form",
            "Incorporation Application Form",
            "Register of Members and Directors",
        ],
        # document type -> keywords whose presence in an uploaded .docx identifies that type
        "doc_type_keywords": {
            "Articles of Association": [
                "articles of association",
                "aoa",
            ],
            "Memorandum of Association": [
                "memorandum of association",
                "moa",
                "memorandum",
            ],
            "UBO Declaration Form": [
                "ubo declaration",
                "ultimate beneficial owner",
            ],
            "Incorporation Application Form": [
                "incorporation application",
                "incorporation application form",
            ],
            "Register of Members and Directors": [
                "register of members",
                "register of directors",
            ],
        },
        # Red-flag rules.
        #   must_exist == True  -> the pattern is required; an issue is raised when it is missing.
        #   must_exist == False -> the pattern is a problem; an issue is raised when it is found.
        "red_flags": [
            {
                "pattern": "UAE Federal Courts",
                "label": "Incorrect jurisdiction",
                "must_exist": False,
                "severity": "High",
                "suggestion": "Replace with 'ADGM Courts'.",
            },
            {
                "pattern": "signature",
                "label": "Missing signature block",
                "must_exist": True,
                "severity": "High",
                "suggestion": "Add a signature block and dated signature.",
            },
            {
                "pattern": "ambiguous",
                "label": "Ambiguous language",
                "must_exist": False,
                "severity": "Medium",
                "suggestion": "Clarify the clause to remove ambiguous terms.",
            },
        ],
        # optional static citation text, looked up by red-flag label when writing comments
        "citation_snippets": {
            "Incorrect jurisdiction": "Per ADGM Companies Regulations 2020, jurisdiction must reference ADGM Courts (example snippet).",
            "Missing signature block": "ADGM requires signature blocks for executed documents per registration guidance (example snippet).",
        },
    },
}

rules_path = 'rules/adgm_rules.json'
# Seed the rules file only when it is absent, so an existing (possibly hand-edited) file is kept.
if os.path.exists(rules_path):
    print(f"Using existing rules at {rules_path}")
else:
    with open(rules_path, 'w') as f:
        f.write(json.dumps(sample_rules, indent=2))
    print(f"Created example rules at {rules_path} (edit this with ADGM content if you have it).")

# Build a small, deliberately flawed sample .docx so the pipeline can be tested immediately.
sample_doc = 'examples/sample_before.docx'
if os.path.exists(sample_doc):
    print(f"Sample doc already exists at {sample_doc}")
else:
    doc = Document()
    doc.add_heading('Articles of Association', level=1)
    sample_paragraphs = (
        'This is the Articles of Association for the company.',
        'Jurisdiction: UAE Federal Courts',  # intentionally bad to trigger the jurisdiction red flag
        'Some unclear ambiguous clause that needs clarity.',
        'Date: ',  # no signature block on purpose, to trigger missing-signature detection
    )
    for paragraph_text in sample_paragraphs:
        doc.add_paragraph(paragraph_text)
    doc.save(sample_doc)
    print(f"Sample doc created at {sample_doc}")

Created example rules at rules/adgm_rules.json (edit this with ADGM content if you have it).
Sample doc created at examples/sample_before.docx


# **Helper Functions**
**(extraction, detection, comment insertion, local retrieval)**

In [3]:
import io
from pathlib import Path

# Read the rule definitions from the JSON file created in the setup cell.
adgm_rules = json.loads(Path('rules/adgm_rules.json').read_text())

def extract_paragraphs(docx_path):
    """Return the text of every paragraph in a .docx file, in document order.

    Blank paragraphs are kept (as empty strings) so that positions in the
    returned list stay consistent with the document's paragraph indexes.
    """
    document = Document(docx_path)
    texts = []
    for paragraph in document.paragraphs:
        texts.append(paragraph.text)
    return texts

def text_of_doc(docx_path):
    """Return the whole document as one string, one paragraph per line."""
    return "\n".join(extract_paragraphs(docx_path))

def detect_doc_types_in_text(text, rules):
    """
    Identify which document types a text appears to contain.

    `rules` is one process's rules dict; its optional "doc_type_keywords"
    entry maps a document type name to a list of keywords. A type is reported
    (once, in mapping order) when any of its keywords occurs in `text` as a
    case-insensitive substring.
    """
    haystack = text.lower()
    keyword_map = rules.get("doc_type_keywords", {})
    return [
        doc_name
        for doc_name, keywords in keyword_map.items()
        if any(keyword.lower() in haystack for keyword in keywords)
    ]

def detect_red_flags_in_text(text, rules):
    """
    Check `text` against the red-flag rules in rules['red_flags'].

    Matching is a case-insensitive substring test. For each rule:
      * must_exist true  -> an issue is raised only when the pattern is absent.
      * must_exist false -> an issue is raised when the pattern is present; the
        first line of `text` containing it is reported as "Paragraph N" (1-based).

    Rules whose pattern is missing or blank are skipped: a blank pattern is a
    substring of every text and would otherwise flag every document.

    Returns a list of issue dicts with the keys: label, pattern, severity,
    suggestion, location, snippet.
    """
    issues = []
    paras = text.splitlines()
    ltext = text.lower()
    for rf in rules.get("red_flags", []):
        raw_pattern = rf.get("pattern") or ""
        pattern = raw_pattern.lower()
        if not pattern.strip():
            # malformed rule: nothing meaningful to search for
            continue
        label = rf.get("label", pattern)
        severity = rf.get("severity", "Medium")
        suggestion = rf.get("suggestion", "")
        found = pattern in ltext

        if rf.get("must_exist", False):
            # Required content: only its absence is an issue.
            if not found:
                issues.append({
                    "label": label,
                    "pattern": raw_pattern,
                    "severity": severity,
                    "suggestion": suggestion,
                    "location": "Not found (required content missing)",
                    "snippet": ""
                })
            continue

        if not found:
            continue

        # Problematic content: report the first line containing the pattern.
        # The fallback covers patterns that only match across a line break.
        location = "Found in document (no paragraph match)"
        snippet = ""
        for idx, para in enumerate(paras):
            if pattern in para.lower():
                snippet = para.strip()
                location = f"Paragraph {idx+1}"
                break
        issues.append({
            "label": label,
            "pattern": raw_pattern,
            "severity": severity,
            "suggestion": suggestion,
            "location": location,
            "snippet": snippet
        })
    return issues

def local_retrieve_snippets(pattern, texts_folder='rules_texts', radius=120):
    """
    Simulated local 'retrieval' from uploaded ADGM reference text files.

    Searches every readable file in `texts_folder` for the first
    case-insensitive occurrence of `pattern` and returns a list of
    (filename, snippet) tuples, in sorted filename order. Each snippet is the
    match plus up to `radius` characters on either side, in the file's
    original casing, with surrounding whitespace stripped.

    Returns [] when `pattern` is empty, the folder does not exist, or nothing
    matches.
    """
    matches = []
    # An empty pattern would "match" every file, so treat it as no query at all.
    if not pattern or not os.path.isdir(texts_folder):
        return matches
    needle = re.compile(re.escape(pattern), re.IGNORECASE)
    # sorted() keeps the result order stable across platforms and runs
    for fname in sorted(os.listdir(texts_folder)):
        path = os.path.join(texts_folder, fname)
        try:
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except OSError:
            # sub-directories and unreadable files are skipped (best effort)
            continue
        hit = needle.search(content)
        if hit is None:
            continue
        # Slice the original content (not a lower-cased copy) so the quoted
        # reference text keeps its casing.
        start = max(0, hit.start() - radius)
        end = min(len(content), hit.end() + radius)
        matches.append((fname, content[start:end].strip()))
    return matches

def append_review_comments(original_docx_path, issues, out_path, rules_for_process=None):
    """
    Save a reviewed copy of a .docx with automated reviewer comments appended.

    The original document is loaded, a comments section is added after its
    last paragraph, and the result is saved to `out_path` (the original file
    is not modified). For every issue the section lists its label and
    severity, location, an optional context snippet (capped at 500
    characters), the suggestion, an optional citation looked up by label in
    rules_for_process['citation_snippets'], and up to two matching excerpts
    from the local reference texts.

    Parameters:
        original_docx_path: path of the .docx to review.
        issues: list of issue dicts (label, severity, location, snippet,
            suggestion, pattern); may be empty or None.
        out_path: where to save the reviewed copy; its parent directory is
            created if needed.
        rules_for_process: optional rules dict of the detected process.

    Returns:
        out_path.
    """
    doc = Document(original_docx_path)
    doc.add_paragraph("")  # separator
    doc.add_paragraph("----- REVIEWER COMMENTS (automated) -----")
    if not issues:
        doc.add_paragraph("No issues detected by the offline checker.")
    citations = (rules_for_process or {}).get('citation_snippets') or {}
    for i, issue in enumerate(issues or [], start=1):
        doc.add_paragraph(f"COMMENT {i}: [{issue.get('label')}] (severity: {issue.get('severity')})")
        # location & snippet
        doc.add_paragraph(f"Location: {issue.get('location')}")
        snippet = issue.get('snippet') or ''
        if snippet:
            # keep snippet reasonably short
            doc.add_paragraph("Context snippet: " + (snippet if len(snippet) < 500 else snippet[:500] + "..."))
        # `or` (rather than a .get default) so that an empty or None suggestion
        # also falls back to the placeholder text
        doc.add_paragraph("Suggestion: " + (issue.get('suggestion') or 'No suggestion available.'))
        # citation from the rules file, looked up by issue label
        citation = citations.get(issue.get('label'))
        if citation:
            doc.add_paragraph("ADGM reference (local rules file): " + citation)
        # additionally look for matching excerpts in the local reference texts;
        # skipped without a pattern, since an empty query is not a real search
        pattern = issue.get('pattern') or ''
        retrieved = local_retrieve_snippets(pattern) if pattern else []
        if retrieved:
            doc.add_paragraph("Local ADGM snippet(s) from rules_texts:")
            for fname, snip in retrieved[:2]:
                doc.add_paragraph(f" - {fname}: {snip[:400]}...")
        doc.add_paragraph("")  # blank line between comments
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    doc.save(out_path)
    return out_path

# **Main Pipeline**
**(process multiple uploaded .docx files → produce reviewed docs + JSON summary + ZIP)**

In [4]:

def process_uploaded_files(uploaded_files):
    """
    Review uploaded .docx files and bundle the results.

    Each .docx is checked against every process in rules/adgm_rules.json
    (document-type keywords and red flags), and a reviewed copy with appended
    comments is written to output/. Non-.docx uploads are skipped. The process
    whose required documents were detected most often across all files is
    chosen, and its missing documents are reported.

    Parameters:
        uploaded_files: a single upload or a list/tuple of uploads. Each item
            is either a path or an object whose `.name` attribute is a path.

    Returns:
        (summary_dict, zip_path). The ZIP at output/reviewed_bundle.zip holds
        the reviewed documents plus summary.json. When `uploaded_files` is
        None the result is ({"error": "No files provided"}, None).
    """
    # normalize input
    if uploaded_files is None:
        return {"error": "No files provided"}, None
    if not isinstance(uploaded_files, (list, tuple)):
        uploaded_files = [uploaded_files]

    # the reviewed copies, summary.json and the ZIP are all written here
    os.makedirs('output', exist_ok=True)

    # load rules once
    with open('rules/adgm_rules.json', 'r') as rules_file:
        adgm_rules = json.load(rules_file)

    processed = []
    used_names = set()  # lower-cased reviewed file names already taken in this batch
    for uploaded in uploaded_files:
        # Gradio's uploaded file objects usually have a .name attribute with the temp path
        file_path = getattr(uploaded, "name", None) or uploaded
        if not str(file_path).lower().endswith('.docx'):
            # skip non-docx files
            continue
        base_name = os.path.basename(file_path)
        text = text_of_doc(file_path)

        # detect types & issues across all known processes
        detected_types = []
        issues_for_file = []
        for process_key, rules in adgm_rules.items():
            detected_types.extend(detect_doc_types_in_text(text, rules))
            issues = detect_red_flags_in_text(text, rules)
            # attach file and process info to each issue
            for iss in issues:
                iss['detected_in_file'] = base_name
                iss['process_key'] = process_key
            issues_for_file.extend(issues)
        detected_types = sorted(set(detected_types))

        # Build "<stem>_reviewed<ext>" with splitext so the suffix is added
        # whatever the extension's case and only at the end of the name.
        # Same-named uploads from different temp folders get a counter so they
        # neither overwrite each other nor duplicate a ZIP entry.
        stem, ext = os.path.splitext(base_name)
        reviewed_name = f"{stem}_reviewed{ext}"
        counter = 2
        while reviewed_name.lower() in used_names:
            reviewed_name = f"{stem}_reviewed_{counter}{ext}"
            counter += 1
        used_names.add(reviewed_name.lower())
        reviewed_path = os.path.join('output', reviewed_name)

        # Per-file best-guess process: the first one with the most detected
        # required documents. Its rules supply citations for the comments;
        # None when no process matched.
        tentative_process = None
        best_score = 0
        for pk, rules in adgm_rules.items():
            required = rules.get('required_docs', [])
            score = sum(1 for dt in detected_types if dt in required)
            if score > best_score:
                best_score = score
                tentative_process = pk
        rules_for_comments = adgm_rules.get(tentative_process) if tentative_process else None

        append_review_comments(file_path, issues_for_file, reviewed_path, rules_for_comments)
        processed.append({
            "original_path": file_path,
            "reviewed_path": reviewed_path,
            "detected_types": detected_types,
            "issues": issues_for_file
        })

    # every document type detected in any processed file
    detected_across = set()
    for p in processed:
        detected_across.update(p['detected_types'])

    # Overall process: the first one with the most distinct required documents
    # detected. Stays "Unknown" when nothing matched or the rules file defines
    # no processes (max() on an empty mapping would raise).
    chosen_process = "Unknown"
    best_count = 0
    for pk, rules in adgm_rules.items():
        count = len(detected_across & set(rules.get('required_docs', [])))
        if count > best_count:
            best_count = count
            chosen_process = pk

    if best_count == 0:
        chosen_process = "Unknown"
        required_documents = []
        missing_documents = []
    else:
        required_documents = adgm_rules[chosen_process]['required_docs']
        missing_documents = [r for r in required_documents if r not in detected_across]

    # prepare final JSON
    issues_found = []
    for p in processed:
        for iss in p['issues']:
            issues_found.append({
                "document": os.path.basename(iss.get('detected_in_file', 'Unknown')),
                "label": iss.get('label'),
                "section": iss.get('location'),
                "issue": f"Matched pattern: {iss.get('pattern')}",
                "severity": iss.get('severity'),
                "suggestion": iss.get('suggestion')
            })
    summary = {
        "process": adgm_rules.get(chosen_process, {}).get('display_name', chosen_process) if chosen_process!="Unknown" else "Unknown",
        "documents_uploaded": len(processed),
        "required_documents": len(required_documents),  # a count, not the list of names
        "missing_documents": missing_documents,
        "issues_found": issues_found
    }

    # write summary.json, then create the ZIP with all reviewed docs + summary.json
    summary_path = os.path.join('output', 'summary.json')
    with open(summary_path, 'w') as sf:
        json.dump(summary, sf, indent=2)
    zip_path = os.path.join('output', 'reviewed_bundle.zip')
    with zipfile.ZipFile(zip_path, 'w') as zf:
        for p in processed:
            zf.write(p['reviewed_path'], arcname=os.path.basename(p['reviewed_path']))
        zf.write(summary_path, arcname='summary.json')
    return summary, zip_path

# **Gradio UI**

In [6]:
def gradio_process(files):
    """Gradio callback: run the review pipeline on the uploaded files.

    Returns the summary rendered as indented JSON text, and the path of the
    results ZIP as returned by process_uploaded_files.
    """
    report, bundle_path = process_uploaded_files(files)
    report_text = json.dumps(report, indent=2)
    return report_text, bundle_path

# Gradio front end: a multi-file upload in; the analysis JSON and a downloadable ZIP out.
demo = gr.Interface(
    title="ADGM Corporate Agent by Shubhi",
    description="Upload .docx files. The agent runs with keyword-based checks, appends reviewer comments into a reviewed .docx, and returns a ZIP with reviewed files + summary.json.",
    fn=gradio_process,
    inputs=gr.File(label="Upload one or more .docx files", file_count="multiple"),
    outputs=[
        gr.Textbox(label="Analysis JSON"),
        gr.File(label="Download results (ZIP)"),
    ],
)

# In Colab, share=True makes the UI reachable through a public URL; share=False is possible,
# but Colab often needs share=True to expose the UI.
demo.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://20b46cc72c1763ce9a.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


