In [24]:
# Installing libraries on colab
!pip install --quiet sentence-transformers faiss-cpu transformers[torch] accelerate pdfplumber python-docx beautifulsoup4 lxml gradio tqdm requests


import os
# Script in temp python file
with open("check_packages.py", "w") as f:
    f.write("""
import importlib
packages = ["sentence_transformers","faiss","transformers","pdfplumber","docx","bs4","gradio","tqdm","requests"]
for p in packages:
    try:
        importlib.import_module(p)
        print(p, "OK")
    except Exception as e:
        print(p, "ERROR:", e)
""")

# Execute the Python script using a shell command
!python check_packages.py

# Clean up the temporary file
os.remove("check_packages.py")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m48.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m48.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m24.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [25]:
from pathlib import Path
import os
import re

# Main Colab directory - wrking
BASE_DIR = Path("adgm_agent")
DOWNLOAD_DIR = BASE_DIR / "downloads"
PARSED_DIR = BASE_DIR / "parsed"
INDEX_DIR = BASE_DIR / "faiss_index"

# check if directories exist
for d in [DOWNLOAD_DIR, PARSED_DIR, INDEX_DIR]:
    d.mkdir(parents=True, exist_ok=True)

def safe_filename(name: str) -> str:
    """Convert any string to a safe filename by replacing unsafe characters."""
    return re.sub(r"[^a-zA-Z0-9_\-\.]", "_", name)

print("Directories ready:")
print(f"- Downloads: {DOWNLOAD_DIR}")
print(f"- Parsed: {PARSED_DIR}")
print(f"- Index: {INDEX_DIR}")


Directories ready:
- Downloads: adgm_agent/downloads
- Parsed: adgm_agent/parsed
- Index: adgm_agent/faiss_index


In [26]:
import requests
from tqdm import tqdm

# ADGM links from doc
ADGM_LINKS = [
    # Company Formation & Governance
    "https://www.adgm.com/registration-authority/registration-and-incorporation",
    "https://assets.adgm.com/download/assets/adgm-ra-resolution-multiple-incorporate-shareholders-LTD-incorporation-v2.docx/186a12846c3911efa4e6c6223862cd87",
    "https://www.adgm.com/setting-up",
    "https://www.adgm.com/legal-framework/guidance-and-policy-statements",
    "https://www.adgm.com/documents/registration-authority/registration-and-incorporation/checklist/branch-non-financial-services-20231228.pdf",
    "https://www.adgm.com/documents/registration-authority/registration-and-incorporation/checklist/private-company-limited-by-guarantee-non-financial-services-20231228.pdf",
    # Employment & HR
    "https://assets.adgm.com/download/assets/ADGM+Standard+Employment+Contract+Template+-+ER+2024+(Feb+2025).docx/ee14b252edbe11efa63b12b3a30e5e3a",
    "https://assets.adgm.com/download/assets/ADGM+Standard+Employment+Contract+-+ER+2019+-+Short+Version+(May+2024).docx/33b57a92ecfe11ef97a536cc36767ef8",
    # Data Protection
    "https://www.adgm.com/documents/office-of-data-protection/templates/adgm-dpr-2021-appropriate-policy-document.pdf",
    # Compliance & Filings
    "https://www.adgm.com/operating-in-adgm/obligations-of-adgm-registered-entities/annual-filings/annual-accounts",
    # Letters & Permits
    "https://www.adgm.com/operating-in-adgm/post-registration-services/letters-and-permits",
    # Regulatory Guidance
    "https://en.adgm.thomsonreuters.com/rulebook/7-company-incorporation-package",
    # Regulatory Template
    "https://assets.adgm.com/download/assets/Templates_SHReso_AmendmentArticles-v1-20220107.docx/97120d7c5af911efae4b1e183375c0b2?forcedownload=1"
]

def download_file(url: str, dest_folder: Path) -> Path:
    """Download file from URL and save to dest_folder if PDF, DOCX, or HTML."""
    try:
        resp = requests.get(url, stream=True, timeout=20)
        resp.raise_for_status()
        content_type = resp.headers.get("content-type", "").lower()

        if "pdf" in content_type or url.lower().endswith(".pdf"):
            ext = ".pdf"
        elif "word" in content_type or url.lower().endswith((".docx", ".doc")):
            ext = ".docx"
        elif "html" in content_type or "text/html" in content_type or url.lower().endswith(".html"):
            ext = ".html"
        else:
            # Skip unsupported types
            return None

        filename = safe_filename(Path(url).name.split("?")[0]) or "downloaded_doc"
        save_path = dest_folder / (filename + ext)

        with open(save_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)

        return save_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return None

# Download all sources form the resource doc
downloaded_files = []
for link in tqdm(ADGM_LINKS, desc="Downloading ADGM sources"):
    file_path = download_file(link, DOWNLOAD_DIR)
    if file_path:
        downloaded_files.append(file_path)

print(f"Downloaded {len(downloaded_files)} files:")
for f in downloaded_files:
    print("-", f)


Downloading ADGM sources: 100%|██████████| 13/13 [00:05<00:00,  2.18it/s]

Downloaded 13 files:
- adgm_agent/downloads/registration-and-incorporation.html
- adgm_agent/downloads/186a12846c3911efa4e6c6223862cd87.docx
- adgm_agent/downloads/setting-up.html
- adgm_agent/downloads/guidance-and-policy-statements.html
- adgm_agent/downloads/branch-non-financial-services-20231228.pdf.pdf
- adgm_agent/downloads/private-company-limited-by-guarantee-non-financial-services-20231228.pdf.pdf
- adgm_agent/downloads/ee14b252edbe11efa63b12b3a30e5e3a.docx
- adgm_agent/downloads/33b57a92ecfe11ef97a536cc36767ef8.docx
- adgm_agent/downloads/adgm-dpr-2021-appropriate-policy-document.pdf.pdf
- adgm_agent/downloads/annual-accounts.html
- adgm_agent/downloads/letters-and-permits.html
- adgm_agent/downloads/7-company-incorporation-package.html
- adgm_agent/downloads/97120d7c5af911efae4b1e183375c0b2.docx





In [27]:
import pdfplumber
from docx import Document
from bs4 import BeautifulSoup

def extract_text_from_pdf(pdf_path: Path) -> str:
    """Extract text from a PDF file."""
    try:
        with pdfplumber.open(pdf_path) as pdf:
            pages = [page.extract_text() or "" for page in pdf.pages]
        return "\n".join(pages)
    except Exception as e:
        print(f"PDF extraction failed for {pdf_path}: {e}")
        return ""

def extract_text_from_docx(docx_path: Path) -> str:
    """Extract text from a DOCX file."""
    try:
        doc = Document(docx_path)
        return "\n".join([p.text for p in doc.paragraphs])
    except Exception as e:
        print(f"DOCX extraction failed for {docx_path}: {e}")
        return ""

def extract_text_from_html(html_path: Path) -> str:
    """Extract visible text from an HTML file."""
    try:
        with open(html_path, "r", encoding="utf-8") as f:
            soup = BeautifulSoup(f, "html.parser")
        return soup.get_text(separator="\n")
    except Exception as e:
        print(f"HTML extraction failed for {html_path}: {e}")
        return ""

# Downlod file review,and parse
reference_texts = []
for file_path in downloaded_files:
    ext = file_path.suffix.lower()
    if ext == ".pdf":
        text = extract_text_from_pdf(file_path)
    elif ext == ".docx":
        text = extract_text_from_docx(file_path)
    elif ext == ".html":
        text = extract_text_from_html(file_path)
    else:
        print(f"Skipping unsupported file type: {file_path}")
        continue

    if text.strip():
        reference_texts.append({
            "source": str(file_path),
            "text": text
        })

print(f"Parsed {len(reference_texts)} reference documents into text.")


Parsed 13 reference documents into text.


In [28]:

#Chunking, Embeddings,FAISS index

from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import pickle
from tqdm.auto import tqdm

# Parameters
EMBED_MODEL_NAME = "all-MiniLM-L6-v2"  # small, fast, good-quality embeddings
CHUNK_SIZE = 400   # number of words per chunk
CHUNK_OVERLAP = 50 # overlap between chunks (words)
BATCH_SIZE = 64
INDEX_PATH = INDEX_DIR / "faiss_index.bin"
META_PATH = INDEX_DIR / "faiss_metadata.pkl"

# chunk_text utility (word-based)
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP):
    words = text.split()
    chunks = []
    i = 0
    while i < len(words):
        chunk = words[i:i+chunk_size]
        chunks.append(" ".join(chunk))
        i += chunk_size - overlap
    # remove empty chunks
    return [c for c in chunks if len(c.strip())>20]

#list of chunks and metadata
all_chunks = []      # texts to embed
all_meta = []        # per-chunk metadata dicts: {"source":..., "chunk_id":..., "start_words":..., "excerpt":...}

chunk_counter = 0
for ref in reference_texts:
    src = ref["source"]
    text = ref["text"]
    chunks = chunk_text(text)
    for idx, c in enumerate(chunks):
        excerpt = c[:500].replace("\n", " ")
        meta = {"source": src, "chunk_id": chunk_counter, "chunk_index_in_doc": idx, "excerpt": excerpt}
        all_chunks.append(c)
        all_meta.append(meta)
        chunk_counter += 1

print(f"Prepared {len(all_chunks)} chunks from {len(reference_texts)} documents.")

if len(all_chunks) == 0:
    raise ValueError("No chunks prepared — ensure reference_texts contains parsed doc texts.")

# Load sentence-transformers model
print("Loading embedding model:", EMBED_MODEL_NAME)
embedder = SentenceTransformer(EMBED_MODEL_NAME)

# embeddings in batches and normalize darta
embeddings = []
for i in tqdm(range(0, len(all_chunks), BATCH_SIZE), desc="Embedding batches"):
    batch_texts = all_chunks[i:i+BATCH_SIZE]
    embs = embedder.encode(batch_texts, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True)
    embeddings.append(embs)
embeddings = np.vstack(embeddings).astype("float32")  # shape: (N, D)
print("Embeddings shape:", embeddings.shape)

# Build FAISS index , norm using cosine similarity
d = embeddings.shape[1]
index = faiss.IndexFlatIP(d)   # inner-product index
index.add(embeddings)
print("FAISS index size (n_vectors):", index.ntotal)

# Persist index and metadata
faiss.write_index(index, str(INDEX_PATH))
with open(META_PATH, "wb") as fh:
    pickle.dump(all_meta, fh)

print("Saved FAISS index to:", INDEX_PATH)
print("Saved metadata to:", META_PATH)

# Retrieval helper
def retrieve_with_faiss(query: str, top_k: int = 5):
    q_emb = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype("float32")
    D, I = index.search(q_emb, top_k)   # D: scores, I: indices
    results = []
    for score, idx in zip(D[0], I[0]):
        if idx < 0:
            continue
        meta = all_meta[idx]
        chunk_text = all_chunks[idx]
        results.append({"score": float(score), "meta": meta, "chunk": chunk_text})
    return results

# test case
test_q = "ADGM incorporation checklist documents required for private company"
print("Running test retrieval for query:", test_q)
hits = retrieve_with_faiss(test_q, top_k=5)
for h in hits:
    print(f"score: {h['score']:.4f}  source: {h['meta']['source']}  excerpt: {h['meta']['excerpt'][:150]}...")


Prepared 72 chunks from 13 documents.
Loading embedding model: all-MiniLM-L6-v2


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embedding batches:   0%|          | 0/2 [00:00<?, ?it/s]

Embeddings shape: (72, 384)
FAISS index size (n_vectors): 72
Saved FAISS index to: adgm_agent/faiss_index/faiss_index.bin
Saved metadata to: adgm_agent/faiss_index/faiss_metadata.pkl
Running test retrieval for query: ADGM incorporation checklist documents required for private company
score: 0.7059  source: adgm_agent/downloads/registration-and-incorporation.html  excerpt: Private Company Limited by Shares – Retail Private Company Limited by Shares – Continuance Financial, Non-Financial, Retail excluding SPV Checklist Co...
score: 0.6731  source: adgm_agent/downloads/registration-and-incorporation.html  excerpt: FAQs, enquiry forms, and a whistleblowing form. View available support options Some of our FAQ topics Getting started Foundations regime Resolution fo...
score: 0.6434  source: adgm_agent/downloads/private-company-limited-by-guarantee-non-financial-services-20231228.pdf.pdf  excerpt: members, which includes the appointment of the directors – template is available on the ADGM web

In [29]:

# Local LLM review, annotation, and report generation

import json
import textwrap
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# Opensource Model
HF_MODEL = "google/flan-t5-small"
GEN_MAX_TOKENS = 512

print("Loading model:", HF_MODEL)
tokenizer = AutoTokenizer.from_pretrained(HF_MODEL)
model = AutoModelForSeq2SeqLM.from_pretrained(HF_MODEL)
generator = pipeline("text2text-generation", model=model, tokenizer=tokenizer, device=-1)  # device=-1 uses CPU

# Company Incorporation guide
CHECKLISTS = {
    "Company Incorporation": [
        "Articles of Association",
        "Memorandum of Association",
        "Board Resolution",
        "Shareholder Resolution",
        "Incorporation Application Form",
        "UBO Declaration Form",
        "Register of Members and Directors",
        "Change of Registered Address Notice"
    ]
}

# ClassificCation (keyword-based)
DOC_KEYWORDS = {
    "Articles of Association": ["articles of association", "article", "share capital", "shareholders", "voting rights"],
    "Memorandum of Association": ["memorandum of association", "object clause", "subscribed", "subscriber"],
    "UBO Declaration Form": ["ultimate beneficial owner", "ubo", "ownership declaration", "beneficial owner"],
    "Register of Members and Directors": ["register of members", "register of directors", "member register", "director register"],
    "Board Resolution": ["board resolution", "resolved that", "board of directors", "meeting of the board"],
    "Incorporation Application Form": ["application for incorporation","incorporation application","form for incorporation"]
}

def classify_doc_type(text: str):
    """Heuristic classification based on keyword counts."""
    scores = {}
    lt = text.lower()
    for name, kws in DOC_KEYWORDS.items():
        scores[name] = sum(lt.count(k) for k in kws)
    # select the doc type with highest score
    best = max(scores, key=scores.get)
    if scores[best] == 0:
        return "Unknown", scores
    return best, scores

def infer_process_from_docs(detected_types):
    """Simple inference: if any incorporation-related doc exists, return Company Incorporation."""
    if any(dt in CHECKLISTS["Company Incorporation"] for dt in detected_types):
        return "Company Incorporation"
    return "Unknown"

def compare_checklist(process, detected_types):
    required = CHECKLISTS.get(process, [])
    missing = [r for r in required if r not in detected_types]
    return {"process": process, "documents_uploaded": len(detected_types), "required_documents": len(required), "missing_documents": missing}

# break uploaded doc text into chunks and review each chunk using FAISS context + LLM
def review_document_text(doc_text: str, doc_name: str, top_k: int = 4):
    """Return list of findings for a document."""
    findings = []
    # chunk the document using same CHUNK_SIZE/OVERLAP
    CHUNK_SIZE = 400
    OVERLAP = 50
    words = doc_text.split()
    chunks = []
    i = 0
    while i < len(words):
        chunk = " ".join(words[i:i+CHUNK_SIZE])
        if len(chunk.strip()) > 50:
            chunks.append(chunk)
        i += CHUNK_SIZE - OVERLAP

    for idx, chunk in enumerate(chunks):
        # Retrieve context from FAISS
        ctx_hits = retrieve_with_faiss(chunk, top_k=top_k)
        ctx_text = ""
        for h in ctx_hits:
            src = h["meta"]["source"]
            excerpt = h["meta"]["excerpt"]
            ctx_text += f"Source: {src}\nExcerpt: {excerpt}\n\n"

        # Compose a concise prompt for the local LLM
        prompt = textwrap.dedent(f"""
        You are an ADGM legal compliance assistant. Use the provided context to analyze the document excerpt and identify compliance issues.

        Document name: {doc_name}
        Excerpt:
        \"\"\"{chunk[:2000]}\"\"\"

        Retrieved ADGM context (from official docs):
        \"\"\"{ctx_text}\"\"\"

        Tasks:
        1) Identify up to 3 potential compliance issues ("red flags") in the excerpt. For each issue return:
           - section_excerpt: a short snippet (<=120 chars) from the excerpt that shows the issue
           - issue: a short description of the problem
           - severity: Low / Medium / High
           - suggestion: a one-sentence suggested corrective change or note
           - citation: which retrieved source (filename) supports this recommendation
        2) Return the answer strictly as a JSON array of objects. If no issues, return an empty array: [].

        Provide only JSON as the output.
        """)

        # Generate with the local model
        gen = generator(prompt, max_length=GEN_MAX_TOKENS, truncation=True)[0]["generated_text"]

        # parse JSON from model output
        parsed = None
        try:

            start = gen.find("[")
            end = gen.rfind("]") + 1
            candidate = gen[start:end]
            parsed = json.loads(candidate)
            # normalize parsed entries, attach doc name
            for p in parsed:
                p["document"] = doc_name
        except Exception:
            # parsing failed — store raw generation
            parsed = [{"document": doc_name, "raw": gen.strip(), "chunk_index": idx}]
        findings.extend(parsed)
    return findings

# Annotation to append short notes to paragraphs
def annotate_docx(original_docx_path: Path, findings: list, output_path: Path):
    from docx import Document
    doc = Document(str(original_docx_path))
    # For each finding, try simple matching of section_excerpt in paragraph text.
    appended_notes = []
    for f in findings:
        if "raw" in f:
            continue
        snippet = f.get("section_excerpt", "").strip()
        note = f"[ADGM-{f.get('severity','Unknown')}] {f.get('issue','')} | Suggestion: {f.get('suggestion','')} | Citation: {f.get('citation','')}"
        matched = False
        if snippet:
            for p in doc.paragraphs:
                if snippet in p.text:
                    p.add_run(" ").add_text(note)
                    matched = True
                    appended_notes.append(note)
                    break
        if not matched:
            # append at end
            doc.add_paragraph(note)
            appended_notes.append(note)
    doc.save(str(output_path))
    return appended_notes

# Processing uploaded docx bytes
def process_uploaded_docx_bytes(file_bytes: bytes, filename: str):
    # save upload
    path = PARSED_DIR / safe_filename(filename)
    with open(path, "wb") as fh:
        fh.write(file_bytes)
    # Extract text
    try:
        text = extract_text_from_docx(path)
    except Exception:
        # fallback: try reading as plain text
        text = ""
    doc_type, scores = classify_doc_type(text)
    return {"path": path, "text": text, "doc_type": doc_type, "scores": scores}

def run_review_pipeline(uploaded_files: list):
    """
    uploaded_files: list of tuples (file_bytes, filename)
    Returns: report dict and paths to annotated docs
    """
    processed = []
    detected_types = []
    for file_bytes, filename in uploaded_files:
        info = process_uploaded_docx_bytes(file_bytes, filename)
        processed.append(info)
        detected_types.append(info["doc_type"])

    process_name = infer_process_from_docs(detected_types)
    checklist = compare_checklist(process_name, detected_types)

    all_findings = []
    annotated_paths = []

    for p in processed:
        if not p["text"].strip():
            # Skip empty files
            all_findings.append({"document": p["path"].name, "raw": "No text extracted from document."})
            continue
        findings = review_document_text(p["text"], p["doc_type"], top_k=5)
        all_findings.extend(findings)

        # Annotate and save reviewed docx
        out_name = OUTPUT_DIR / f"reviewed_{p['path'].name}"
        annotate_docx(p["path"], findings, out_name)
        annotated_paths.append(str(out_name))

    report = {
        "process": process_name,
        "documents_uploaded": len(processed),
        "required_documents": checklist["required_documents"],
        "missing_documents": checklist["missing_documents"],
        "issues_found": all_findings,
        "annotated_files": annotated_paths
    }

    # Save JSON report
    report_path = OUTPUT_DIR / "report.json"
    with open(report_path, "w", encoding="utf-8") as fh:
        json.dump(report, fh, indent=2)

    return report, str(report_path)



print("Block 6 loaded. You can now call run_review_pipeline(uploaded_files).")


Loading model: google/flan-t5-small


tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/308M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Device set to use cpu


Block 6 loaded. You can now call run_review_pipeline(uploaded_files).


Testing

In [32]:
from docx import Document
from pathlib import Path

# path of test doc
upload_test_path = Path("test_upload.docx")

# Create a mock Articles of Association with intentional ADGM compliance issues
doc = Document()
doc.add_heading("Articles of Association", level=1)
doc.add_paragraph("Clause 3.1: The company shall be governed under the jurisdiction of UAE Federal Courts.")
doc.add_paragraph("Clause 4.2: The Articles are silent on the rights of minority shareholders.")
doc.add_paragraph("Clause 5.5: This agreement is non-binding and intended only as a statement of intent.")
doc.add_paragraph("Signed on behalf of the company without any witness or date.")
doc.save(upload_test_path)

print(f"Test upload file created: {upload_test_path.resolve()}")
print("You can now upload this file in the Gradio interface.")


Test upload file created: /content/test_upload.docx
You can now upload this file in the Gradio interface.


In [38]:

# Install dependencies

!pip install --quiet python-docx gradio tqdm


import json
from pathlib import Path
from docx import Document
import gradio as gr

# Output directories
BASE_DIR = Path("adgm_agent")
OUTPUT_DIR = BASE_DIR / "outputs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# documents for ADGM Company Incorporation
REQUIRED_DOCS = [
    "Articles of Association",
    "Memorandum of Association",
    "Board Resolution",
    "Shareholder Resolution",
    "Incorporation Application Form",
    "UBO Declaration Form",
    "Register of Members and Directors",
    "Change of Registered Address Notice"
]

# keyword mapping for doc type detection
DOC_TYPE_KEYWORDS = {
    "Articles of Association": ["articles of association", "aoa"],
    "Memorandum of Association": ["memorandum of association", "moa"],
    "Board Resolution": ["board resolution"],
    "Shareholder Resolution": ["shareholder resolution"],
    "Incorporation Application Form": ["incorporation application"],
    "UBO Declaration Form": ["ubo declaration"],
    "Register of Members and Directors": ["register of members", "register of directors"],
    "Change of Registered Address Notice": ["change of registered address"]
}


# Helper functions for docs

def detect_doc_type(text):
    text_lower = text.lower()
    for doc_type, keywords in DOC_TYPE_KEYWORDS.items():
        if any(kw in text_lower for kw in keywords):
            return doc_type
    return "Unknown Document"

def check_red_flags(text):
    issues = []
    if "uae federal court" in text.lower():
        issues.append(("Jurisdiction clause does not specify ADGM", "High", "Replace with 'ADGM Courts'."))
    if "non-binding" in text.lower():
        issues.append(("Clause is non-binding", "Medium", "Use legally enforceable language."))
    if "without any witness" in text.lower() or "signed without" in text.lower():
        issues.append(("Missing witness/date in signature", "Medium", "Ensure signature includes witness and date."))
    if "[share capital clause missing]" in text.lower():
        issues.append(("Share capital clause missing", "High", "Include details about share capital."))
    if "spirit of mutual cooperation" in text.lower():
        issues.append(("Ambiguous language", "Low", "Use specific, binding obligations."))
    return issues

def annotate_docx(file_path, issues):
    doc = Document(file_path)
    for para in doc.paragraphs:
        for issue, severity, suggestion in issues:
            if any(word.lower() in para.text.lower() for word in issue.split()):
                para.add_run(f" [ISSUE: {issue} | Severity: {severity} | Suggestion: {suggestion}]").bold = True
    output_path = OUTPUT_DIR / f"reviewed_{Path(file_path).name}"
    doc.save(output_path)
    return str(output_path)

def run_review_pipeline(test_docs):
    detected_docs = []
    all_issues = []
    annotated_files = []

    for doc_title, paragraphs in test_docs:
        temp_path = OUTPUT_DIR / f"{doc_title.replace(' ', '_')}.docx"
        doc = Document()
        doc.add_heading(doc_title, level=1)
        for p in paragraphs:
            doc.add_paragraph(p)
        doc.save(temp_path)

        full_text = "\n".join(paragraphs)
        doc_type = detect_doc_type(full_text)
        detected_docs.append(doc_type)

        issues = check_red_flags(full_text)
        all_issues.extend([{
            "document": doc_type,
            "issue": issue,
            "severity": severity,
            "suggestion": suggestion
        } for issue, severity, suggestion in issues])

        annotated_path = annotate_docx(temp_path, issues)
        annotated_files.append(annotated_path)

    missing_docs = [d for d in REQUIRED_DOCS if d not in detected_docs]

    report = {
        "process": "Company Incorporation",
        "documents_uploaded": len(detected_docs),
        "required_documents": len(REQUIRED_DOCS),
        "missing_documents": missing_docs,
        "issues_found": all_issues,
        "annotated_files": annotated_files
    }

    report_path = OUTPUT_DIR / "report.json"
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)

    return json.dumps(report, indent=2), [str(report_path)] + annotated_files

#Test case functions

def gradio_test_case_1():
    """AoA with missing jurisdiction and non-binding clause"""
    test_docs = [
        ("Articles of Association", [
            "Clause 3.1: The company shall be governed under the jurisdiction of UAE Federal Courts.",
            "Clause 5.5: This agreement is non-binding.",
            "Signed without any witness or date."
        ])
    ]
    return run_review_pipeline(test_docs)

def gradio_test_case_2():
    """MoA with multiple red flags"""
    test_docs = [
        ("Memorandum of Association", [
            "Clause 1.1: The company shall be governed under the jurisdiction of UAE Federal Courts.",
            "Clause 2.3: The parties agree to work together in the spirit of mutual cooperation.",
            "Clause 4.1: [Share capital clause missing]",
            "Signed without date or witness."
        ])
    ]
    return run_review_pipeline(test_docs)


#Gradio interface

with gr.Blocks() as demo:
    gr.Markdown("# ADGM Corporate Agent — Document Reviewer (Test Mode)")
    gr.Markdown("This demo uses built-in test cases to simulate document review.")

    json_output = gr.Textbox(label="JSON Report", lines=20)
    downloads_output = gr.File(label="Download Reviewed Files", file_types=[".docx", ".json"], file_count="multiple")

    with gr.Row():
        test_btn_1 = gr.Button("Run Test Case 1 — Articles of Association")
        test_btn_2 = gr.Button("Run Test Case 2 — Memorandum of Association")

    test_btn_1.click(fn=gradio_test_case_1, inputs=[], outputs=[json_output, downloads_output])
    test_btn_2.click(fn=gradio_test_case_2, inputs=[], outputs=[json_output, downloads_output])

demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://ef789a6d0830265b40.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


