In [7]:
# !pip3 install PyPDF2
# !pip3 install langdetect

In [8]:
import os
import re
from typing import Optional, Dict
from PyPDF2 import PdfReader
from dateutil import parser as date_parser
from datetime import datetime
from langdetect import detect, DetectorFactory

# Ensure consistent results
DetectorFactory.seed = 0

SUPPORTED_DOCUMENT_TYPES = {
    "TRADE LICENSE": "Trade License",
    "MEMORANDUM OF ASSOCIATION": "MOA / AOA",
    "BOARD RESOLUTION": "Board Resolution",
    "PASSPORT": "ID",
    "UAE PASSPORT": "ID",
    "BANK": "Bank Letter",
    "VAT REGISTRATION": "VAT / TRN",
    "BALANCE SHEET": "Financial Statement",
    "PROFIT & LOSS": "Financial Statement",
}


class DocumentProcessor:
    def extract_text(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += page.extract_text() + "\n"
        return text

    def detect_language(self, text: str) -> str:
        try:
            lang = detect(text)
            return lang  # e.g., "en", "ar", etc.
        except:
            return "unknown"

    def classify_document(self, text: str) -> Dict:
        confidence = 0.0
        detected_type = "Unsupported"

        text_upper = text.upper()
        for keyword, doc_type in SUPPORTED_DOCUMENT_TYPES.items():
            if keyword in text_upper:
                detected_type = doc_type
                confidence += 0.5

        # Boost confidence if strong header match
        first_lines = text_upper[:300]
        for keyword in SUPPORTED_DOCUMENT_TYPES.keys():
            if keyword in first_lines:
                confidence += 0.4
                break

        confidence = min(confidence, 0.99)

        return {
            "classType": detected_type,
            "confidence": round(confidence, 2)
        }

    def extract_date(self, text: str, label_patterns: list) -> Optional[str]:
        for pattern in label_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    parsed_date = date_parser.parse(match.group(1), dayfirst=True)
                    return parsed_date.date().isoformat()
                except:
                    continue
        return None

    def extract_issue_and_expiry(self, text: str) -> Dict:
        issue_patterns = [
            r"ISSUE DATE:\s*(.+)",
            r"DATE OF ISSUE:\s*(.+)",
            r"REGISTRATION DATE:\s*(.+)"
        ]

        expiry_patterns = [
            r"EXPIRY DATE:\s*(.+)",
            r"DATE OF EXPIRY:\s*(.+)"
        ]

        issue_date = self.extract_date(text, issue_patterns)
        expiry_date = self.extract_date(text, expiry_patterns)

        return {
            "issueDate": issue_date,
            "expiryDate": expiry_date
        }

    def process_document(self, file_path: str) -> Dict:
        text = self.extract_text(file_path)

        classification = self.classify_document(text)
        dates = self.extract_issue_and_expiry(text)
        language = self.detect_language(text)

        return {
            "fileName": os.path.basename(file_path),
            "classType": classification["classType"],
            "issueDate": dates["issueDate"],
            "expiryDate": dates["expiryDate"],
            "confidence": classification["confidence"],
            "language": language,  # ✅ added language detection
            "processedAt": datetime.utcnow().isoformat()
        }


def run_pipeline(folder_path: str):
    processor = DocumentProcessor()
    results = []

    for file in os.listdir(folder_path):
        if file.endswith(".pdf"):
            result = processor.process_document(os.path.join(folder_path, file))
            results.append(result)

    return results


# -----------------------------
# RUN
# -----------------------------
folder = "synthetic_documents"
output = run_pipeline(folder)

for doc in output:
    print(doc)
    classType=doc["classType"]
    issueDate=doc["issueDate"]
    expiryDate=doc["expiryDate"]
    confidence=doc["confidence"]
    language=doc["language"]
    print(classType,issueDate,expiryDate,confidence,language)

{'fileName': 'profit_loss.pdf', 'classType': 'Financial Statement', 'issueDate': None, 'expiryDate': None, 'confidence': 0.9, 'language': 'en', 'processedAt': '2026-02-25T11:36:17.940006'}
Financial Statement None None 0.9 en
{'fileName': 'id_john_smith.pdf', 'classType': 'ID', 'issueDate': '2022-01-01', 'expiryDate': '2032-01-01', 'confidence': 0.99, 'language': 'en', 'processedAt': '2026-02-25T11:36:17.941451'}
ID 2022-01-01 2032-01-01 0.99 en
{'fileName': 'balance_sheet.pdf', 'classType': 'Financial Statement', 'issueDate': None, 'expiryDate': None, 'confidence': 0.99, 'language': 'en', 'processedAt': '2026-02-25T11:36:17.943035'}
Financial Statement None None 0.99 en
{'fileName': 'bank_letter.pdf', 'classType': 'Bank Letter', 'issueDate': None, 'expiryDate': None, 'confidence': 0.9, 'language': 'en', 'processedAt': '2026-02-25T11:36:17.944345'}
Bank Letter None None 0.9 en
{'fileName': 'trade_license.pdf', 'classType': 'Trade License', 'issueDate': '2024-01-01', 'expiryDate': '2026

  "processedAt": datetime.utcnow().isoformat()


In [9]:
import os
import re
from typing import Dict, List, Optional
from PyPDF2 import PdfReader
from dateutil import parser as date_parser
from datetime import datetime
import json

# =========================================================
# Utility: Standard Field Builder (Traceable & Auditable)
# =========================================================

def build_field(value, source, confidence, method="regex_v1"):
    return {
        "value": value,
        "sourceDocument": source,
        "confidence": round(confidence, 2),
        "extractionMethod": method
    }

# =========================================================
# Document Processor
# =========================================================

class KYBExtractionPipeline:

    SUPPORTED_TYPES = {
        "TRADE LICENSE": "Trade License",
        "MEMORANDUM OF ASSOCIATION": "MOA / AOA",
        "BOARD RESOLUTION": "Board Resolution",
        "PASSPORT": "ID",
        "BANK": "Bank Letter",
        "VAT REGISTRATION": "VAT / TRN",
        "BALANCE SHEET": "Balance Sheet",
        "PROFIT & LOSS": "Profit & Loss"
    }

    def __init__(self):
        pass

    # -------------------------
    # TEXT EXTRACTION
    # -------------------------

    def extract_text(self, file_path: str) -> str:
        reader = PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += page.extract_text() + "\n"
        return text.upper()

    # -------------------------
    # CLASSIFICATION
    # -------------------------

    def classify_document(self, text: str) -> Dict:
        confidence = 0.0
        doc_type = "Unsupported"

        for keyword, dtype in self.SUPPORTED_TYPES.items():
            if keyword in text:
                doc_type = dtype
                confidence += 0.6

        if doc_type != "Unsupported":
            confidence += 0.3

        return {
            "classType": doc_type,
            "confidence": min(round(confidence, 2), 0.99)
        }

    # -------------------------
    # DATE EXTRACTION
    # -------------------------

    def extract_date(self, text: str, patterns: List[str]) -> Optional[str]:
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                try:
                    return date_parser.parse(match.group(1)).date().isoformat()
                except:
                    continue
        return None

    def extract_issue_expiry(self, text: str):
        issue_patterns = [
            r"ISSUE DATE:\s*(.+)",
            r"DATE OF ISSUE:\s*(.+)",
            r"REGISTRATION DATE:\s*(.+)"
        ]
        expiry_patterns = [
            r"EXPIRY DATE:\s*(.+)",
            r"DATE OF EXPIRY:\s*(.+)"
        ]

        return {
            "issueDate": self.extract_date(text, issue_patterns),
            "expiryDate": self.extract_date(text, expiry_patterns)
        }

    # -------------------------
    # FIELD EXTRACTIONS
    # -------------------------

    def extract_company_profile(self, text, file):
        profile = {}
        patterns = {
            "legalName": r"COMPANY NAME:\s*(.+)",
            "registrationNumber": r"LICENSE NUMBER:\s*(.+)",
            "jurisdiction": r"JURISDICTION:\s*(.+)",
            "legalForm": r"LEGAL FORM:\s*(.+)",
            "licenseIssuingAuthority": r"ISSUING AUTHORITY:\s*(.+)"
        }

        for field, pattern in patterns.items():
            match = re.search(pattern, text)
            if match:
                profile[field] = build_field(match.group(1).strip(), file, 0.95)
        return profile

    def extract_shareholders(self, text, file):
        shareholders = []
        matches = re.findall(r"-\s*(.+?):\s*(\d+)%", text)
        for name, pct in matches:
            shareholders.append({
                "name": build_field(name.strip(), file, 0.9),
                "ownershipPercentage": build_field(float(pct), file, 0.9),
                "controlType": build_field("Direct", file, 0.8)
            })
        return shareholders

    def extract_signatories(self, text, file):
        signatories = []
        matches = re.findall(r"MR\.?\s*(.+?),\s*(CEO|CFO|DIRECTOR)", text)
        for name, role in matches:
            signatories.append({
                "name": build_field(name.strip(), file, 0.85),
                "role": build_field(role.strip(), file, 0.85),
                "authoritySource": build_field("Board Resolution", file, 0.8)
            })
        return signatories

    def extract_financials(self, text, file):
        financials = {}
        numeric_patterns = {
            "revenue": r"REVENUE:\s*([\d,]+)",
            "netProfit": r"NET PROFIT:\s*([\d,]+)",
            "totalAssets": r"TOTAL ASSETS:\s*([\d,]+)",
            "totalLiabilities": r"TOTAL LIABILITIES:\s*([\d,]+)"
        }

        for field, pattern in numeric_patterns.items():
            match = re.search(pattern, text)
            if match:
                value = float(match.group(1).replace(",", ""))
                financials[field] = build_field(value, file, 0.95)

        audit = re.search(r"AUDIT STATUS:\s*(.+)", text)
        if audit:
            financials["auditStatus"] = build_field(audit.group(1).strip(), file, 0.9)

        period = re.search(r"FY\s*(\d{4})", text)
        if period:
            financials["financialPeriod"] = build_field(period.group(1), file, 0.85)

        return financials

    # -------------------------
    # SINGLE FILE UPDATE
    # -------------------------

    def update_unified_object(self, unified: Dict, file_path: str) -> None:
        file_name = os.path.basename(file_path)
        text = self.extract_text(file_path)
        classification = self.classify_document(text)
        dates = self.extract_issue_expiry(text)

        # Append document metadata
        unified["documents"].append({
            "fileName": file_name,
            "classType": classification["classType"],
            "confidence": classification["confidence"],
            "issueDate": dates["issueDate"],
            "expiryDate": dates["expiryDate"],
            "processedAt": datetime.utcnow().isoformat()
        })

        # Update companyProfile (merge)
        unified["companyProfile"].update(self.extract_company_profile(text, file_name))
        unified["shareholders"].extend(self.extract_shareholders(text, file_name))
        unified["signatories"].extend(self.extract_signatories(text, file_name))
        unified["financialIndicators"].update(self.extract_financials(text, file_name))

        # Detect missing fields dynamically
        self.detect_missing_fields(unified)

    # -------------------------
    # MISSING FIELD DETECTION
    # -------------------------

    def detect_missing_fields(self, output):
        required_profile = ["legalName", "registrationNumber", "jurisdiction"]
        required_financials = ["totalAssets", "totalLiabilities"]

        # Reset missing fields each time
        output["missingFields"] = []

        for field in required_profile:
            if field not in output["companyProfile"]:
                output["missingFields"].append(field)
        for field in required_financials:
            if field not in output["financialIndicators"]:
                output["missingFields"].append(field)


# =========================================================
# RUN: PROCESS ALL PDFs INTO SINGLE UNIFIED OBJECT
# =========================================================

pipeline = KYBExtractionPipeline()

folder_path = "synthetic_documents"
unified_company = {
    "companyProfile": {},
    "licenseDetails": {},
    "addresses": {},
    "shareholders": [],
    "ubos": [],
    "documents": [],
    "signatories": [],
    "financialIndicators": {},
    "riskAssessment": {
        "financialRiskScore": 0,
        "riskBand": "",
        "riskDrivers": [],
        "confidenceLevel": ""
    },
    "complianceIndicators": {},
    "missingFields": []
}

# Get all PDFs in folder
pdf_files = [
    os.path.join(folder_path, f)
    for f in os.listdir(folder_path)
    if f.lower().endswith(".pdf")
]

# Process each PDF into the same object
for file_path in pdf_files:
    print(f"Processing: {file_path}")
    pipeline.update_unified_object(unified_company, file_path)

# Print final unified company object
unified_company

Processing: synthetic_documents/profit_loss.pdf
Processing: synthetic_documents/id_john_smith.pdf
Processing: synthetic_documents/balance_sheet.pdf
Processing: synthetic_documents/bank_letter.pdf
Processing: synthetic_documents/trade_license.pdf
Processing: synthetic_documents/vat_certificate.pdf
Processing: synthetic_documents/moa_aoa.pdf
Processing: synthetic_documents/board_resolution.pdf


  "processedAt": datetime.utcnow().isoformat()


{'companyProfile': {'legalName': {'value': 'ACME FINTECH SOLUTIONS LLC',
   'sourceDocument': 'moa_aoa.pdf',
   'confidence': 0.95,
   'extractionMethod': 'regex_v1'},
  'registrationNumber': {'value': 'TL-2026-987654',
   'sourceDocument': 'trade_license.pdf',
   'confidence': 0.95,
   'extractionMethod': 'regex_v1'},
  'legalForm': {'value': 'LIMITED LIABILITY COMPANY',
   'sourceDocument': 'moa_aoa.pdf',
   'confidence': 0.95,
   'extractionMethod': 'regex_v1'},
  'licenseIssuingAuthority': {'value': 'DUBAI DEPARTMENT OF ECONOMIC DEVELOPMENT',
   'sourceDocument': 'trade_license.pdf',
   'confidence': 0.95,
   'extractionMethod': 'regex_v1'},
  'jurisdiction': {'value': 'UAE',
   'sourceDocument': 'vat_certificate.pdf',
   'confidence': 0.95,
   'extractionMethod': 'regex_v1'}},
 'licenseDetails': {},
 'addresses': {},
 'shareholders': [{'name': {'value': 'ALICE JOHNSON',
    'sourceDocument': 'moa_aoa.pdf',
    'confidence': 0.9,
    'extractionMethod': 'regex_v1'},
   'ownershipPe

In [10]:
from datetime import datetime
from typing import Dict, List

class RiskEngine:

    def __init__(self):
        self.score = 0
        self.risk_drivers = []  # ← track all risk drivers
        self.exceptions = []

    # ------------------------------
    # HELPER METHODS
    # ------------------------------

    def add_risk(self, points: int, reason: str):
        self.score += points
        self.risk_drivers.append(reason)

    def add_exception(self, severity: str, fields: List[str], action: str):
        self.exceptions.append({
            "severity": severity,
            "impactedFields": fields,
            "requiredReviewerAction": action
        })

    # ------------------------------
    # FINANCIAL RISK EVALUATION
    # ------------------------------

    def evaluate_financial_risk(self, data: Dict):

        financials = data.get("financialIndicators", {})
        documents = data.get("documents", [])

        # 1️⃣ Conservative default for missing financials
        if not financials:
            self.add_risk(40, "Missing financial statements (conservative default)")
            self.add_exception(
                "High",
                ["financialIndicators"],
                "Obtain latest audited financial statements"
            )
            return

        assets = financials.get("totalAssets", {}).get("value")
        liabilities = financials.get("totalLiabilities", {}).get("value")
        net_profit = financials.get("netProfit", {}).get("value")
        audit_status = financials.get("auditStatus", {}).get("value")
        period = financials.get("financialPeriod", {}).get("value")

        # 2️⃣ Conservative default for missing numeric values
        if assets is None:
            self.add_risk(20, "Total assets missing (conservative default)")
            self.add_exception("High", ["totalAssets"], "Obtain total assets")
        if liabilities is None:
            self.add_risk(20, "Total liabilities missing (conservative default)")
            self.add_exception("High", ["totalLiabilities"], "Obtain total liabilities")
        if net_profit is None:
            self.add_risk(20, "Net profit missing (conservative default)")
            self.add_exception("High", ["netProfit"], "Obtain net profit/loss")

        # 3️⃣ Rule-based scoring
        if net_profit is not None and net_profit < 0:
            self.add_risk(30, "Net loss reported")
            self.add_exception(
                "High",
                ["netProfit"],
                "Assess sustainability of business model"
            )

        if assets is not None and liabilities is not None and liabilities > assets:
            self.add_risk(25, "Liabilities exceed assets")
            self.add_exception(
                "High",
                ["totalAssets", "totalLiabilities"],
                "Review solvency position"
            )

        if audit_status:
            if audit_status.upper() == "UNAUDITED":
                self.add_risk(15, "Financial statements unaudited")
                self.add_exception(
                    "Medium",
                    ["auditStatus"],
                    "Request audited statements"
                )
        else:
            self.add_risk(20, "Audit status unknown (conservative default)")
            self.add_exception(
                "Medium",
                ["auditStatus"],
                "Clarify audit status"
            )

        if period:
            try:
                year = int(period)
                if datetime.utcnow().year - year > 1:
                    self.add_risk(20, "Outdated financial statements")
                    self.add_exception(
                        "Medium",
                        ["financialPeriod"],
                        "Obtain latest financial period"
                    )
            except:
                self.add_risk(10, "Financial period parse failed (conservative default)")
                self.add_exception(
                    "Medium",
                    ["financialPeriod"],
                    "Verify financial period"
                )
        else:
            self.add_risk(15, "Financial period missing (conservative default)")
            self.add_exception(
                "Medium",
                ["financialPeriod"],
                "Obtain latest financial period"
            )

    # ------------------------------
    # DOCUMENT VALIDATION
    # ------------------------------

    def validate_documents(self, data: Dict):
        documents = data.get("documents", [])
        mandatory_types = ["Trade License", "Balance Sheet", "Profit & Loss"]
        present_types = [doc["classType"] for doc in documents]

        # Missing mandatory documents → conservative high risk
        for required in mandatory_types:
            if required not in present_types:
                self.add_risk(30, f"Missing mandatory document: {required}")
                self.add_exception(
                    "High",
                    [required],
                    f"Obtain {required}"
                )

        # Expired documents
        for doc in documents:
            expiry = doc.get("expiryDate")
            if expiry:
                expiry_date = datetime.fromisoformat(expiry)
                if expiry_date < datetime.utcnow():
                    self.add_risk(25, f"Expired document: {doc['classType']}")
                    self.add_exception(
                        "High",
                        [doc["classType"]],
                        "Obtain renewed document"
                    )

        # Low confidence → conservative addition
        for doc in documents:
            if doc.get("confidence", 1) < 0.6:
                self.add_risk(10, f"Low classification confidence: {doc['classType']}")
                self.add_exception(
                    "Low",
                    [doc["classType"]],
                    "Manual verification required"
                )

    # ------------------------------
    # FINALIZE RISK SCORE
    # ------------------------------

    def finalize(self):
        # Cap score to 100
        self.score = min(self.score, 100)

        # Risk band
        if self.score <= 30:
            band = "Low"
        elif self.score <= 60:
            band = "Medium"
        else:
            band = "High"

        return {
            "financialRiskScore": self.score,
            "riskBand": band,
            "riskDrivers": self.risk_drivers,  # ✅ fully explainable
            "confidenceLevel": "High" if self.score < 40 else "Medium"
        }, self.exceptions

In [11]:
# pipeline = KYBExtractionPipeline()
# unified_output = pipeline.process_folder("synthetic_documents")

engine = RiskEngine()
engine.evaluate_financial_risk(unified_company)
engine.validate_documents(unified_company)

risk_result, exceptions = engine.finalize()

unified_company["riskAssessment"] = risk_result
unified_company["complianceIndicators"] = {
    "exceptions": exceptions
}



  if datetime.utcnow().year - year > 1:
  if expiry_date < datetime.utcnow():


In [12]:
import json
print(json.dumps(unified_company, indent=2))

{
  "companyProfile": {
    "legalName": {
      "value": "ACME FINTECH SOLUTIONS LLC",
      "sourceDocument": "moa_aoa.pdf",
      "confidence": 0.95,
      "extractionMethod": "regex_v1"
    },
    "registrationNumber": {
      "value": "TL-2026-987654",
      "sourceDocument": "trade_license.pdf",
      "confidence": 0.95,
      "extractionMethod": "regex_v1"
    },
    "legalForm": {
      "value": "LIMITED LIABILITY COMPANY",
      "sourceDocument": "moa_aoa.pdf",
      "confidence": 0.95,
      "extractionMethod": "regex_v1"
    },
    "licenseIssuingAuthority": {
      "value": "DUBAI DEPARTMENT OF ECONOMIC DEVELOPMENT",
      "sourceDocument": "trade_license.pdf",
      "confidence": 0.95,
      "extractionMethod": "regex_v1"
    },
    "jurisdiction": {
      "value": "UAE",
      "sourceDocument": "vat_certificate.pdf",
      "confidence": 0.95,
      "extractionMethod": "regex_v1"
    }
  },
  "licenseDetails": {},
  "addresses": {},
  "shareholders": [
    {
      "name": 