In [1]:
import fitz  # PyMuPDF
import pytesseract
from pdf2image import convert_from_path
import os
import ast
import json
import uuid
import re
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
import openpyxl
from openpyxl.utils import get_column_letter

In [2]:
# Path of the bookmarked PDF that the cells below open and process.
# NOTE(review): absolute, machine-specific Windows path — adjust before running elsewhere.
pdf_path = r'D:\works\silversea-analytics\demo\demo_tax_api\frontend\public\OCR\files\testing_bookmark.pdf'

In [None]:
# Open the PDF; the try/finally guarantees the document is closed even if
# rendering or OCR fails part-way through the loop.
doc = fitz.open(pdf_path)
try:
    # Walk every bookmark (table of contents entry) of the document.
    toc = doc.get_toc()
    for item in toc:
        level, title, page_num = item
        print(f"Bookmark: {title} - Page: {page_num}")

        # TOC page numbers are 1-based. A value below 1 means the bookmark has
        # no page inside this document; without this guard load_page() would be
        # given a negative index and silently OCR an unrelated page.
        if page_num < 1:
            print(f"==> Bookmark {title} has no target page, skipped")
            continue

        # Render the page to an image.
        page = doc.load_page(page_num - 1)
        pix = page.get_pixmap(dpi=300)
        img_path = f'temp_page_{page_num}.png'
        try:
            pix.save(img_path)

            # OCR the rendered image to get its text.
            text = pytesseract.image_to_string(img_path)
        finally:
            # The PNG is only an intermediate for tesseract; do not leave it behind.
            if os.path.exists(img_path):
                os.remove(img_path)
        print(text)

        # Inspect the OCR text to decide which form this page holds.
        if 'W-2' in text:
            print(f"==> Detected W-2 form at bookmark {title}")

            # W-2 specific processing would go here.
        elif '1099-DIV' in text:
            print(f"==> Detected 1099-DIV form at bookmark {title}")

            # 1099-DIV specific processing would go here.
        else:
            print(f"==> Unknown form at bookmark {title}")
finally:
    doc.close()


Bookmark: 1099-div - Page: 1
==> Detected 1099-DIV form at bookmark 1099-div
Bookmark: w2 - Page: 2
==> Detected W-2 form at bookmark w2


In [21]:
# Azure Document Intelligence configuration.
# The API key must never be stored in the notebook: it is read from the
# environment instead. The key that used to be hard-coded here has been
# exposed and should be rotated in the Azure portal.
endpoint_pro = os.environ.get(
    "DOCUMENTINTELLIGENCE_ENDPOINT",
    "https://henryphamocr.cognitiveservices.azure.com/",
)
key_pro = os.environ.get("DOCUMENTINTELLIGENCE_API_KEY", "")
if not key_pro:
    print("WARNING: DOCUMENTINTELLIGENCE_API_KEY is not set; Azure requests will be rejected.")

# Intermediate text dump written and re-read by the OCR_form_* functions.
# NOTE(review): absolute, machine-specific Windows path — adjust before running elsewhere.
result_txt = r'D:\works\silversea-analytics\demo\demo_tax_api\frontend\public\OCR\files\result.txt'

# The former `json = []` assignment was removed: it rebound the name of the
# imported `json` module to a list, which breaks any later `json.dump(...)`
# that runs without re-importing the module.

In [17]:
# Shared Document Intelligence client used by every OCR_form_* handler.
client = DocumentIntelligenceClient(
    endpoint=endpoint_pro,
    credential=AzureKeyCredential(key_pro),
)

In [46]:
def OCR_form_w2(filepath):
    """Analyze a W-2 file with Azure Document Intelligence and flatten the result.

    The file is sent to the ``prebuilt-tax.us`` model through the module-level
    ``client``. Every returned document is dumped to ``result_txt`` (document
    type plus the textual form of its fields), the dump is parsed back into
    plain dictionaries, and one flat record is built per document.

    Args:
        filepath: Path of the file to analyze; it is opened in binary mode.

    Returns:
        list[dict]: One record per parsed document. Each record contains the
        keys ``type`` (always ``"w2"``), ``document_number``,
        ``SocialSecurityNumber``, ``TS``, the ``Employer_*`` values, the scalar
        box values, and the ``StateTaxInfos`` / ``LocalTaxInfos`` lists.
        Fields missing from the analysis are returned as ``""``.

    Side effects:
        Prints progress messages and overwrites the file at ``result_txt``.
    """
    print(filepath)

    with open(filepath, "rb") as f:
        poller = client.begin_analyze_document("prebuilt-tax.us", body=f)
        result = poller.result()

    # Step 1: dump every analyzed document to the intermediate text file.
    # NOTE(review): the dump/parse round trip relies on str(doc.fields) being a
    # valid Python literal; the parser below records an error entry otherwise.
    with open(result_txt, "w", encoding="utf-8") as out_file:
        for i, doc in enumerate(result.documents):
            out_file.write(f"--- Document {i+1} ---\n")
            out_file.write(f"Type: {doc.doc_type}\n")
            out_file.write("All Fields:\n")
            out_file.write(f"{doc.fields}\n\n")

    print("✅ step1")

    def parse_fields(raw_lines):
        """Turn the dumped field lines back into a dict (or an error entry)."""
        try:
            parsed = ast.literal_eval("\n".join(raw_lines))
        except Exception as e:
            # Best effort: keep the document and record why parsing failed.
            return {"error": str(e)}
        if not isinstance(parsed, dict):
            # e.g. a document dumped as "None"; the extraction below needs a dict.
            return {"error": f"unexpected fields payload: {type(parsed).__name__}"}
        return parsed

    # Step 2: read the dump back, one record per "--- Document N ---" section.
    documents = []
    current_doc = {}
    field_lines = []
    inside_fields = False

    with open(result_txt, "r", encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()

            if line.startswith("--- Document"):
                # A new section starts: flush the previous one, if any.
                if field_lines:
                    current_doc["fields"] = parse_fields(field_lines)
                    documents.append(current_doc)
                current_doc = {"document_number": int(re.findall(r'\d+', line)[0])}
                field_lines = []
                inside_fields = False

            elif line.startswith("Type:"):
                current_doc["type"] = line.replace("Type:", "").strip()

            elif line.startswith("All Fields:"):
                inside_fields = True
                field_lines = []

            elif inside_fields:
                field_lines.append(line)

    # Flush the last section.
    if field_lines:
        current_doc["fields"] = parse_fields(field_lines)
        documents.append(current_doc)

    print(f"✅ step2")

    def safe_get(d, key, default=""):
        """Return the first populated value* entry of ``d[key]``, else ``default``.

        Only ``None`` and the empty string count as "missing", so a legitimate
        amount of 0 is preserved instead of being replaced by the default.
        """
        entry = d.get(key) or {}
        for tag in ("valueString", "valueNumber", "valueBoolean"):
            value = entry.get(tag)
            if value is not None and value != "":
                return value
        return default

    def extract_fields(doc):
        """Flatten one parsed document into the W-2 output record."""
        fields = doc.get("fields", {})
        employer = fields.get("Employer", {}).get("valueObject", {})
        employer_address = employer.get("Address", {}).get("valueAddress", {})

        employee = fields.get("Employee", {}).get("valueObject", {})
        employee_SocialSecurityNumber = employee.get("SocialSecurityNumber", {}).get("valueString", '')

        state_infos = fields.get("StateTaxInfos", {}).get("valueArray", [])
        local_infos = fields.get("LocalTaxInfos", {}).get("valueArray", [])

        result = {
            "type": "w2",
            "document_number": doc.get("document_number", ""),
            "SocialSecurityNumber": employee_SocialSecurityNumber,
            "TS": "T",
            "Employer_IdNumber": safe_get(employer, "IdNumber"),
            "Employer_Name": safe_get(employer, "Name"),
            # Address parts that are exported; houseNumber, road and unit are
            # intentionally left out.
            "Employer_Address_StreetAddress": employer_address.get("streetAddress", ""),
            "Employer_Address_City": employer_address.get("city", ""),
            "Employer_Address_State": employer_address.get("state", ""),
            "Employer_Address_PostalCode": employer_address.get("postalCode", ""),
        }

        # Remaining scalar (number / string) boxes, copied under their own name.
        keys = [
            "ControlNumber",
            "WagesTipsAndOtherCompensation",
            "FederalIncomeTaxWithheld",
            "SocialSecurityWages",
            "SocialSecurityTaxWithheld",
            "MedicareWagesAndTips",
            "MedicareTaxWithheld",
            "SocialSecurityTips",
            "AllocatedTips",
            "DependentCareBenefits",
            "NonQualifiedPlans",
            "IsStatutoryEmployee",
            "IsRetirementPlan",
            "IsThirdPartySickPay"
        ]
        for key in keys:
            result[key] = safe_get(fields, key)

        # State tax entries (empty list when absent).
        result["StateTaxInfos"] = []
        for state_item in state_infos:
            val = state_item.get("valueObject", {})
            result["StateTaxInfos"].append({
                "State": safe_get(val, "State"),
                "StateWagesTipsEtc": safe_get(val, "StateWagesTipsEtc"),
                "StateIncomeTax": safe_get(val, "StateIncomeTax")
            })

        # Local tax entries (empty list when absent).
        result["LocalTaxInfos"] = []
        for local_item in local_infos:
            val = local_item.get("valueObject", {})
            result["LocalTaxInfos"].append({
                "LocalWagesTipsEtc": safe_get(val, "LocalWagesTipsEtc"),
                "LocalIncomeTax": safe_get(val, "LocalIncomeTax")
            })

        return result

    # Step 3: flatten every parsed document.
    filtered_result = [extract_fields(doc) for doc in documents]

    print(f"✅ step3")
    return filtered_result


In [47]:
def OCR_form_1099_div(filepath):
    """Analyze a 1099-DIV file with Azure Document Intelligence and flatten the result.

    The file is sent to the ``prebuilt-tax.us.1099DIV`` model through the
    module-level ``client``. Every returned document is dumped to
    ``result_txt`` (document type plus the textual form of its fields), the
    dump is parsed back into plain dictionaries, and one flat record is built
    per document.

    Args:
        filepath: Path of the file to analyze; it is opened in binary mode.

    Returns:
        list[dict]: One record per parsed document with the keys ``type``
        (always ``"1099-div"``), ``document_number``, the payer / account
        values and the ``Box*`` values of the first transaction only. Fields
        missing from the analysis are returned as ``""``.

    Side effects:
        Prints progress messages and overwrites the file at ``result_txt``.
    """
    with open(filepath, "rb") as f:
        poller = client.begin_analyze_document(
            "prebuilt-tax.us.1099DIV", body=f
        )
        analysis = poller.result()

    # Step 1: dump every analyzed document to the intermediate text file.
    # NOTE(review): the dump/parse round trip relies on str(doc.fields) being a
    # valid Python literal; the parser below records an error entry otherwise.
    with open(result_txt, "w", encoding="utf-8") as out_file:
        for i, doc in enumerate(analysis.documents):
            out_file.write(f"--- Document {i+1} ---\n")
            out_file.write(f"Type: {doc.doc_type}\n")
            out_file.write("All Fields:\n")
            out_file.write(f"{doc.fields}\n\n")

    print(f"✅ step1: {result_txt}")

    def parse_fields(raw_lines):
        """Turn the dumped field lines back into a dict (or an error entry)."""
        try:
            parsed = ast.literal_eval("\n".join(raw_lines))
        except Exception as e:
            # Best effort: keep the document and record why parsing failed.
            return {"error": str(e)}
        if not isinstance(parsed, dict):
            # e.g. a document dumped as "None"; the extraction below needs a dict.
            return {"error": f"unexpected fields payload: {type(parsed).__name__}"}
        return parsed

    # Step 2: read the dump back, one record per "--- Document N ---" section.
    documents = []
    current_doc = {}
    field_lines = []
    inside_fields = False

    with open(result_txt, "r", encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()

            if line.startswith("--- Document"):
                # A new section starts: flush the previous one, if any.
                if field_lines:
                    current_doc["fields"] = parse_fields(field_lines)
                    documents.append(current_doc)
                current_doc = {"document_number": int(re.findall(r'\d+', line)[0])}
                field_lines = []
                inside_fields = False

            elif line.startswith("Type:"):
                current_doc["type"] = line.replace("Type:", "").strip()

            elif line.startswith("All Fields:"):
                inside_fields = True
                field_lines = []

            elif inside_fields:
                field_lines.append(line)

    # Flush the last section.
    if field_lines:
        current_doc["fields"] = parse_fields(field_lines)
        documents.append(current_doc)

    print(f"✅ step2: txt to json")

    def safe_get(d, key, default=""):
        """Return the first populated value* entry of ``d[key]``, else ``default``.

        Only ``None`` and the empty string count as "missing", so a legitimate
        amount of 0 (or a ``False`` flag) is preserved instead of being
        replaced by the default.
        """
        entry = d.get(key) or {}
        for tag in ("valueString", "valueNumber", "valueBoolean"):
            value = entry.get(tag)
            if value is not None and value != "":
                return value
        return default

    def extract_fields(doc):
        """Flatten one parsed document into the 1099-DIV output record."""
        fields = doc.get("fields", {})
        document_number = doc.get("document_number", "")

        payer = fields.get("Payer", {}).get("valueObject", {})
        recipient = fields.get("Recipient", {}).get("valueObject", {})
        payer_address = payer.get("Address", {}).get("valueAddress", {})

        # Only the first transaction is exported; its state-tax entries are
        # not part of the output record.
        transactions = fields.get("Transactions", {}).get("valueArray", [])
        first_txn = transactions[0].get("valueObject", {}) if transactions else {}

        return {
            "type": "1099-div",
            "document_number": document_number,
            "Payer_Name": safe_get(payer, "Name"),
            "Payer_TIN": safe_get(payer, "TIN"),
            "Account_Number": safe_get(recipient, "AccountNumber"),

            "Payer_PostalCode": payer_address.get("postalCode", ""),
            "Payer_City": payer_address.get("city", ""),
            "Payer_State": payer_address.get("state", ""),

            "Box1a": safe_get(first_txn, "Box1a"),
            "Box1b": safe_get(first_txn, "Box1b"),
            "Box2a": safe_get(first_txn, "Box2a"),
            "Box2b": safe_get(first_txn, "Box2b"),
            "Box2c": safe_get(first_txn, "Box2c"),
            "Box2d": safe_get(first_txn, "Box2d"),

            "Box7": safe_get(first_txn, "Box7"),

            "Box9": safe_get(first_txn, "Box9"),
            "Box10": safe_get(first_txn, "Box10"),
            "Box12": safe_get(first_txn, "Box12"),
            "Box13": safe_get(first_txn, "Box13")
        }

    # Step 3: flatten every parsed document.
    filtered_result = [extract_fields(doc) for doc in documents]

    print(f"✅ Step3: formating JSON")
    return filtered_result
 

In [None]:
# Dispatch table used by process_pdf: form-type label (as returned by
# detect_form_type) -> function that OCRs a single-page PDF of that form.
FORM_HANDLER_MAP = {
    "W-2": OCR_form_w2,
    "1099-DIV": OCR_form_1099_div,
    # "1098": handle_1098,
    # Register handlers for further form types here.
}
def detect_form_type(text):
    """Classify OCR text as "W-2", "1099-DIV" or "UNKNOWN".

    Markers are tried in order and the first one found in ``text`` wins, so
    text mentioning both forms is reported as "W-2".
    """
    # "W-2 Wage and Tax Statement" always contains "W-2", so a single
    # substring test per form type is enough.
    for marker in ("W-2", "1099-DIV"):
        if marker in text:
            return marker
    return "UNKNOWN"

In [48]:
result_json = []
def process_pdf(filepath):
    """OCR every bookmarked page of a PDF and run the matching form handler.

    For each table-of-contents entry the target page is rendered at 300 dpi,
    OCR'd with tesseract and classified with ``detect_form_type``. When
    ``FORM_HANDLER_MAP`` has a handler for the detected type, the page is
    saved as a single-page PDF, passed to that handler, and the handler output
    is appended to the module-level ``result_json`` list together with the
    bookmark title, page number and form type.

    Args:
        filepath: Path of the bookmarked PDF to process.

    Returns:
        None. Results accumulate in ``result_json``; calling the function
        again appends to the same list.
    """
    doc = fitz.open(filepath)
    try:
        for level, title, page_num in doc.get_toc():
            # TOC page numbers are 1-based. A value below 1 means the bookmark
            # has no page inside this document; without this guard load_page()
            # would be given a negative index and OCR an unrelated page.
            if page_num < 1:
                print(f"⚠ Bookmark '{title}' has no target page, skipped")
                continue

            page = doc.load_page(page_num - 1)

            # OCR the page through a temporary PNG.
            pix = page.get_pixmap(dpi=300)
            img_path = f"temp_page_{page_num}.png"
            try:
                pix.save(img_path)
                text = pytesseract.image_to_string(img_path)
            finally:
                # The PNG is only an intermediate for tesseract.
                if os.path.exists(img_path):
                    os.remove(img_path)

            form_type = detect_form_type(text)
            print(f"Detected form: {form_type} at page {page_num}")

            if form_type not in FORM_HANDLER_MAP:
                print(f"⚠ No handler defined for {form_type}")
                continue

            print(f"Processing with handler for {form_type}...")
            # The handlers take a file path, so the page is written out as a
            # single-page PDF that is removed once the handler returns.
            single_page_pdf_path = f"temp_{form_type}_{page_num}.pdf"
            try:
                single_doc = fitz.open()
                try:
                    single_doc.insert_pdf(doc, from_page=page_num-1, to_page=page_num-1)
                    single_doc.save(single_page_pdf_path)
                finally:
                    single_doc.close()

                # Call the handler registered for this form type.
                output = FORM_HANDLER_MAP[form_type](single_page_pdf_path)
            finally:
                if os.path.exists(single_page_pdf_path):
                    os.remove(single_page_pdf_path)

            # One result entry per bookmark.
            result_json.append({
                "title": title,
                "page_number": page_num,
                "form_type": form_type,
                "data": output   # list of flattened records returned by the handler
            })

            print(f"✅ Processed {form_type}, Output:", output)
    finally:
        # Always release the source document, even when a page fails.
        doc.close()

In [49]:
# Run the pipeline on the configured PDF; results are appended to result_json.
process_pdf(pdf_path)

Detected form: 1099-DIV at page 1
Processing with handler for 1099-DIV...
✅ step1: D:\works\silversea-analytics\demo\demo_tax_api\frontend\public\OCR\files\result.txt
✅ step2: txt to json
✅ Step3: formating JSON
✅ Processed 1099-DIV, Output: [{'document_number': 1, 'Payer_Name': 'MULTI-STRATEGY FUND LLC C/O STATE', 'Payer_TIN': '12-3456789', 'Account_Number': '1234', 'Payer_PostalCode': '02206', 'Payer_City': 'BOSTON', 'Payer_State': 'MA', 'Box1a': 5455.92, 'Box1b': 2234.56, 'Box2a': 1179.56, 'Box2b': 123456, 'Box2c': 1435.56, 'Box2d': 1234.56, 'Box7': 1565.12, 'Box9': 2565.12, 'Box10': 1129.12, 'Box12': 3505.45, 'Box13': 2512.12}]
Detected form: W-2 at page 2
Processing with handler for W-2...
temp_W-2_2.pdf
✅ step1
✅ step2
✅ step3
✅ Processed W-2, Output: [{'document_number': 1, 'SocialSecurityNumber': '987-65-4321', 'TS': 'T', 'Employer_IdNumber': '12-3456789', 'Employer_Name': 'APPLE INC.', 'Employer_Address_StreetAddress': '1234 EMPIRE AVENUE', 'Employer_Address_City': 'BURBANK', 

In [50]:
# Show the aggregated results collected by process_pdf.
# NOTE(review): the printed records include SocialSecurityNumber / Payer_TIN values,
# which end up in the saved notebook output — clear the output before sharing.
print(result_json)

[{'title': '1099-div', 'page_number': 1, 'form_type': '1099-DIV', 'data': [{'document_number': 1, 'Payer_Name': 'MULTI-STRATEGY FUND LLC C/O STATE', 'Payer_TIN': '12-3456789', 'Account_Number': '1234', 'Payer_PostalCode': '02206', 'Payer_City': 'BOSTON', 'Payer_State': 'MA', 'Box1a': 5455.92, 'Box1b': 2234.56, 'Box2a': 1179.56, 'Box2b': 123456, 'Box2c': 1435.56, 'Box2d': 1234.56, 'Box7': 1565.12, 'Box9': 2565.12, 'Box10': 1129.12, 'Box12': 3505.45, 'Box13': 2512.12}]}, {'title': 'w2', 'page_number': 2, 'form_type': 'W-2', 'data': [{'document_number': 1, 'SocialSecurityNumber': '987-65-4321', 'TS': 'T', 'Employer_IdNumber': '12-3456789', 'Employer_Name': 'APPLE INC.', 'Employer_Address_StreetAddress': '1234 EMPIRE AVENUE', 'Employer_Address_City': 'BURBANK', 'Employer_Address_State': 'CA', 'Employer_Address_PostalCode': '91504', 'ControlNumber': '', 'WagesTipsAndOtherCompensation': 39928.93, 'FederalIncomeTaxWithheld': 9882.68, 'SocialSecurityWages': 39928.93, 'SocialSecurityTaxWithheld

In [51]:
import json

# Persist the aggregated per-bookmark results as indented UTF-8 JSON;
# ensure_ascii=False keeps non-ASCII characters readable in the file.
with open("result.json", mode="w", encoding="utf-8") as out_file:
    json.dump(result_json, out_file, ensure_ascii=False, indent=4)
