In [None]:
# ✅ Install Required Libraries
!pip install transformers PyPDF2 python-docx

# ✅ Import Libraries
import base64
import email
import os
import re
import io
import json
from typing import Dict, List, Tuple
from transformers import pipeline
import PyPDF2
import docx

# ✅ Load Zero-Shot Classifier
# Module-level pipeline object; classify_request_types() calls it directly.
classifier = pipeline(
    task="zero-shot-classification",
    model="facebook/bart-large-mnli",
)

# ✅ Sample PDF and DOCX content encoded in base64
def encode_pdf(text):
    """Render ``text`` into a one-page PDF and return the file as base64 text.

    The PDF is written into a private temporary directory that is deleted
    before returning, so no ``temp.pdf`` is left behind in the working
    directory and two calls can never overwrite each other's file.

    Args:
        text: Text placed on the page via ``multi_cell`` (Arial, size 12).

    Returns:
        The bytes of the generated PDF, base64-encoded and decoded to ``str``.
    """
    # Function-scope imports: only this sample-data helper needs them.
    import tempfile

    from fpdf import FPDF

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, text)

    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_path = os.path.join(tmp_dir, "temp.pdf")
        pdf.output(pdf_path)
        with open(pdf_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

def encode_docx(text):
    """Build a DOCX containing one paragraph and return it as base64 text.

    The document is saved to an in-memory stream rather than to a fixed
    ``temp.docx`` path, so nothing is left behind on disk.

    Args:
        text: Content of the single paragraph added to the document.

    Returns:
        The bytes of the generated DOCX, base64-encoded and decoded to ``str``.
    """
    from docx import Document

    doc = Document()
    doc.add_paragraph(text)

    # Document.save() accepts a file-like object as well as a path.
    buffer = io.BytesIO()
    doc.save(buffer)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")

# ✅ Write sample EML file with given content
def write_sample_eml(file_path: str, body_text: str, pdf_text: str, docx_text: str) -> None:
    """Write a hand-assembled multipart/mixed sample e-mail to ``file_path``.

    The message has three parts separated by the literal boundary
    ``BOUNDARY``: a text/plain body, a base64 PDF attachment named
    ``sample.pdf`` and a base64 DOCX attachment named ``sample.docx``.

    Args:
        file_path: Destination of the ``.eml`` file (written as UTF-8 text).
        body_text: Text inserted as the plain-text part.
        pdf_text: Text rendered into the PDF attachment via ``encode_pdf``.
        docx_text: Text stored in the DOCX attachment via ``encode_docx``.
    """
    pdf_payload = encode_pdf(pdf_text)
    docx_payload = encode_docx(docx_text)

    message_lines = [
        # Top-level headers.
        "From: testuser@example.com",
        "To: loans@examplebank.com",
        "Subject: Loan Servicing Inquiry",
        "MIME-Version: 1.0",
        'Content-Type: multipart/mixed; boundary="BOUNDARY"',
        "",
        # Part 1: plain-text body.
        "--BOUNDARY",
        "Content-Type: text/plain",
        "",
        f"{body_text}",
        "",
        # Part 2: PDF attachment.
        "--BOUNDARY",
        "Content-Type: application/pdf",
        "Content-Transfer-Encoding: base64",
        'Content-Disposition: attachment; filename="sample.pdf"',
        "",
        f"{pdf_payload}",
        "",
        # Part 3: DOCX attachment.
        "--BOUNDARY",
        "Content-Type: application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "Content-Transfer-Encoding: base64",
        'Content-Disposition: attachment; filename="sample.docx"',
        "",
        f"{docx_payload}",
        "",
        "--BOUNDARY--",
        "",  # yields the single trailing newline after the closing boundary
    ]

    with open(file_path, "w", encoding="utf-8") as eml_file:
        eml_file.write("\n".join(message_lines))

# ✅ Parse EML and extract components
def parse_eml(file_path: str) -> dict:
    """Parse an ``.eml`` file into headers, body text and raw attachments.

    Every MIME part that carries a filename is collected as an attachment.
    Remaining ``text/plain`` and ``text/html`` parts are decoded and
    concatenated, in walk order, into the body.

    Body bytes are decoded with the charset declared on the part, falling
    back to UTF-8 when none is declared or the declared one is unusable.
    Undecodable bytes are dropped rather than raising.

    Args:
        file_path: Path of the ``.eml`` file to read.

    Returns:
        A dict with the keys ``subject``, ``from``, ``to``, ``cc``, ``date``
        (missing headers become ``""``), ``body`` (str) and ``attachments``
        (list of ``{"filename": str, "data": decoded payload}`` dicts).
    """
    with open(file_path, "rb") as f:
        msg = email.message_from_bytes(f.read())

    email_data = {
        "subject": msg.get("subject", ""),
        "from": msg.get("from", ""),
        "to": msg.get("to", ""),
        "cc": msg.get("cc", ""),
        "date": msg.get("date", ""),
        "body": "",
        "attachments": []
    }

    body_chunks = []
    for part in msg.walk():
        filename = part.get_filename()
        if filename:
            email_data["attachments"].append(
                {"filename": filename, "data": part.get_payload(decode=True)}
            )
            continue

        if part.get_content_type() not in ("text/plain", "text/html"):
            continue

        payload = part.get_payload(decode=True)
        if not payload:
            continue

        # Honour the charset from the Content-Type header; decoding everything
        # as UTF-8 silently loses non-ASCII characters of other encodings.
        charset = part.get_content_charset() or "utf-8"
        try:
            body_chunks.append(payload.decode(charset, errors="ignore"))
        except (LookupError, UnicodeError):
            # Unknown or unusable charset label: keep the best-effort
            # behaviour and fall back to UTF-8 instead of dropping the part.
            body_chunks.append(payload.decode("utf-8", errors="ignore"))

    email_data["body"] = "".join(body_chunks)
    return email_data

# ✅ Extract Text from Attachments
def extract_text_from_attachments(attachments: List[dict]) -> str:
    """Concatenate the text found in PDF and DOCX attachments.

    Attachments are dispatched on their lower-cased filename suffix:
    ``.pdf`` is read page by page with PyPDF2, ``.docx`` paragraph by
    paragraph with python-docx. Any other suffix is ignored. A failure while
    reading one attachment is recorded inline as an ``[Error reading ...]``
    marker instead of being raised.

    Args:
        attachments: Dicts with a ``filename`` and a ``data`` entry.

    Returns:
        All extracted text, each page or paragraph followed by a newline.
    """
    pieces = []
    for item in attachments:
        name = item["filename"].lower()
        payload = item["data"]

        if name.endswith(".pdf"):
            try:
                pdf_reader = PyPDF2.PdfReader(io.BytesIO(payload))
                for pdf_page in pdf_reader.pages:
                    # extract_text() may yield nothing; treat that as "".
                    pieces.append((pdf_page.extract_text() or "") + "\n")
            except Exception as e:
                pieces.append(f"\n[Error reading PDF: {e}]\n")
        elif name.endswith(".docx"):
            try:
                document = docx.Document(io.BytesIO(payload))
                for paragraph in document.paragraphs:
                    pieces.append(paragraph.text + "\n")
            except Exception as e:
                pieces.append(f"\n[Error reading DOCX: {e}]\n")

    return "".join(pieces)

# ✅ Zero-Shot Classification
def classify_request_types(text: str, candidate_labels: List[str]) -> Dict[str, float]:
    """Score ``text`` against each candidate label.

    Blank or whitespace-only text is not sent to the model; every label then
    gets a score of ``0.0``. Otherwise the module-level ``classifier`` is
    called with ``multi_label=True`` and its labels are paired with its scores.

    Args:
        text: Text to classify.
        candidate_labels: Labels to score the text against.

    Returns:
        Mapping of label to score.
    """
    if text.strip():
        prediction = classifier(text, candidate_labels, multi_label=True)
        return dict(zip(prediction["labels"], prediction["scores"]))

    # Nothing to classify.
    return dict.fromkeys(candidate_labels, 0.0)

# ✅ Apply Domain Rules
def apply_domain_rules(scores_dict: Dict[str, float]) -> Tuple[str, List[str]]:
    """Split scored labels into one primary request and the sub-requests.

    Labels are ranked by descending score, with ties keeping the mapping's
    own order. The first ranked label that belongs to the known request
    types becomes the primary; every other label is returned as a
    sub-request in ranked order.

    Note: the known types are used for membership only. Their listed order
    does not influence which label wins.

    Args:
        scores_dict: Mapping of label to score.

    Returns:
        ``(primary, sub_list)``. ``primary`` is ``None`` when no label is a
        known request type.
    """
    known_types = frozenset((
        "Money Movement-Inbound", "Money Movement-Outbound", "Commitment Change",
        "Fee Payment", "Closing Notice", "AU Transfer", "Adjustments",
    ))
    ranked = sorted(scores_dict, key=scores_dict.get, reverse=True)

    primary = None
    sub_list = []
    for candidate in ranked:
        if primary is not None or candidate not in known_types:
            sub_list.append(candidate)
        else:
            primary = candidate
    return primary, sub_list

# ✅ Field Extraction
def extract_key_fields(text: str) -> dict:
    """Extract key fields from ``text``; currently only a monetary amount.

    Amount selection:
      * A number is either comma-grouped (``500,000``) or a plain digit run
        (``5000``), optionally with decimals and a leading ``$``. Plain runs
        are matched whole, so ``$5000`` is no longer cut down to ``$500``.
      * The first number that carries a currency marker (leading ``$`` or a
        trailing ``USD`` / ``dollars``) is preferred.
      * If no number has a marker, the first number in the text is used.

    Args:
        text: Text to scan. ``None`` or ``""`` produces no amount.

    Returns:
        Dict with ``deal_name`` and ``expiration_date`` (always ``None``
        here) and ``amount``: the matched number including any leading ``$``
        but without the unit suffix, or ``None`` when no number is found.
    """
    amount_pattern = re.compile(
        r'(?P<amount>\$?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)'
        r'(?P<unit>\s?(?:USD|dollars)\b)?'
    )

    amount_extracted = None
    first_number = None
    for match in amount_pattern.finditer(text or ""):
        amount = match.group("amount")
        if amount.startswith("$") or match.group("unit"):
            # Explicit currency marker: this is the best candidate.
            amount_extracted = amount
            break
        if first_number is None:
            first_number = amount

    if amount_extracted is None:
        # No currency-qualified number; fall back to the first bare number.
        amount_extracted = first_number

    return {
        "deal_name": None,
        "amount": amount_extracted,
        "expiration_date": None
    }

# ✅ Check Duplicate (Placeholder)
def check_duplicate(email_data: dict, text: str) -> Tuple[bool, str]:
    """Placeholder duplicate check that never flags anything.

    Args:
        email_data: Parsed e-mail dict (unused by this placeholder).
        text: Combined e-mail text (unused by this placeholder).

    Returns:
        ``(False, "")``: not a duplicate, with an empty reason.
    """
    is_duplicate = False
    reason = ""
    return is_duplicate, reason

# ✅ Process a Single EML File
def process_email(file_path: str, request_types: List[str]) -> dict:
    """Run the full pipeline on one ``.eml`` file and build its report.

    Steps: parse the message, extract attachment text, classify the combined
    body and attachment text, pick primary and sub-request labels, extract
    key fields and run the duplicate check.

    Args:
        file_path: Path of the ``.eml`` file to process.
        request_types: Candidate labels passed to the classifier.

    Returns:
        Dict with ``file``, ``primary_request_type`` (label, confidence,
        reasoning), ``sub_request_types``, ``extracted_fields``,
        ``duplicate_flag`` and ``duplicate_reason``. The primary label is
        ``"Unknown"`` with confidence ``0.0`` when no known type was found.
    """
    parsed = parse_eml(file_path)
    attachment_text = extract_text_from_attachments(parsed["attachments"])
    full_text = "\n".join((parsed["body"] or "", attachment_text))

    label_scores = classify_request_types(full_text, request_types)
    primary, secondary = apply_domain_rules(label_scores)
    fields = extract_key_fields(full_text)
    is_duplicate, duplicate_why = check_duplicate(parsed, full_text)

    # No label matched a known request type.
    primary = "Unknown" if primary is None else primary

    primary_entry = {
        "label": primary,
        "confidence": label_scores.get(primary, 0.0),
        "reasoning": f"Detected {primary} with highest confidence; domain rules applied."
    }
    secondary_entries = []
    for sub_label in secondary:
        secondary_entries.append(
            {"label": sub_label, "confidence": label_scores.get(sub_label, 0.0)}
        )

    return {
        "file": file_path,
        "primary_request_type": primary_entry,
        "sub_request_types": secondary_entries,
        "extracted_fields": fields,
        "duplicate_flag": is_duplicate,
        "duplicate_reason": duplicate_why
    }

# ✅ Create Test Samples and Run on Directory
# Each sample is (request label, e-mail body, PDF attachment text).
# The label also seeds the DOCX attachment text.
os.makedirs("test_emails", exist_ok=True)

samples = [
    (
        "Money Movement-Inbound",
        "Please initiate an inbound transfer for $500,000.",
        "Funding details enclosed.",
    ),
    (
        "Fee Payment",
        "We need to process the ongoing fee payment.",
        "Fee details are in the document.",
    ),
    (
        "Commitment Change",
        "Requesting a decrease in commitment.",
        "Attached is the approval letter.",
    ),
]

for idx, (label, body, attach) in enumerate(samples, start=1):
    write_sample_eml(
        file_path=f"test_emails/mail_{idx}.eml",
        body_text=body,
        pdf_text=attach,
        docx_text=f"{label} supporting content.",
    )

# ✅ Define request types
# Candidate labels handed to the zero-shot classifier.
SAMPLE_REQUEST_TYPES = [
    "Adjustments",
    "AU Transfer",
    "Closing Notice",
    "Commitment Change",
    "Fee Payment",
    "Money Movement-Inbound",
    "Money Movement-Outbound",
]

# ✅ Process All EML Files in Directory
# os.listdir() returns entries in arbitrary order; sorting the names keeps the
# order of the report reproducible from run to run.
results = []
for fname in sorted(os.listdir("test_emails")):
    if fname.endswith(".eml"):
        result = process_email(os.path.join("test_emails", fname), SAMPLE_REQUEST_TYPES)
        results.append(result)

# ✅ Display Results
print(json.dumps(results, indent=2))
