In [2]:
import os
import re
import json
import base64
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import pandas as pd
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

In [3]:
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]  # read-only for safety
CLIENT_SECRET_PATH = "client_secret.json"   # download from GCP > APIs & Services > Credentials (Desktop App)
TOKEN_PATH = "token.json"                   # saved after first consent (delete to force re-consent)
ATTACH_SAVE_DIR = "gmail_attachments_poc"
os.makedirs(ATTACH_SAVE_DIR, exist_ok=True)

In [9]:
def build_gmail_service() -> Any:
    """On-screen OAuth: opens a browser for consent via run_local_server()."""
    creds = None
    if os.path.exists(TOKEN_PATH):
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(CLIENT_SECRET_PATH):
                raise FileNotFoundError(
                    f"Missing {CLIENT_SECRET_PATH}. Create OAuth Client ID (Desktop App) and place the JSON here."
                )
            flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_PATH, SCOPES)
            # This opens your browser for on-screen consent:
            creds = flow.run_local_server(port=0, prompt='consent')  # prompt='consent' ensures the screen appears
        with open(TOKEN_PATH, "w") as f:
            f.write(creds.to_json())
    return build("gmail", "v1", credentials=creds)
service = build_gmail_service()

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=627222616815-5i21ahrgjjbcesig2qcid2v15ob0cruf.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A49597%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.readonly&state=xz27W3Pb1e02OypzR8rslfeLqVpKh1&prompt=consent&access_type=offline


/etc/bash.bashrc: line 4: /usr/sbin/start-systemd-namespace: No such file or directory




In [25]:
# --- Core helpers ---
def list_messages(service, query, max_results=100):
    resp = service.users().messages().list(userId="me", q=query, maxResults=max_results).execute()
    return resp.get("messages", [])

def get_message(service, msg_id):
    return service.users().messages().get(userId="me", id=msg_id, format="full").execute()

def iter_pdf_attachments(msg):
    """Yield (filename, attachmentId) for PDF attachments found in message parts."""
    payload = msg.get("payload", {}) or {}
    stack = [payload]
    while stack:
        part = stack.pop()
        # push nested parts
        for sub in part.get("parts", []) or []:
            stack.append(sub)
        # check current part for attachment
        filename = part.get("filename") or ""
        body = part.get("body") or {}
        if filename and "attachmentId" in body and filename.lower().endswith(".pdf"):
            yield filename, body["attachmentId"]

def download_attachment(service, msg_id, attachment_id, filename):
    att = service.users().messages().attachments().get(
        userId="me", messageId=msg_id, id=attachment_id
    ).execute()
    data = att.get("data")
    if not data:
        return None
    file_bytes = base64.urlsafe_b64decode(data.encode("utf-8"))
    safe = re.sub(r'[\\/:*?"<>|]+', "_", filename) or f"{msg_id}.pdf"
    path = os.path.join(ATTACH_SAVE_DIR, safe)
    with open(path, "wb") as f:
        f.write(file_bytes)
    return path

In [50]:
from datetime import datetime, timezone
import os, re, json, base64, calendar


now = datetime.now(timezone.utc)
year = now.year
month = now.month
# compute previous month
prev_year = year if month > 1 else year - 1
prev_month = month - 1 if month > 1 else 12
start_day = 1
end_day = calendar.monthrange(prev_year, prev_month)[1]
# Gmail expects YYYY/MM/DD
after_str  = f"{prev_year}/{prev_month:02d}/{start_day:02d}"
before_str = f"{prev_year}/{prev_month:02d}/{end_day:02d}"

In [51]:
query = (
    f"subject:\"HSBC GOLD VISA e-Statement\" after:{after_str} before:{before_str}"
)

In [None]:
# --- Run: fetch PDFs for last month only ---
service = build_gmail_service()
msgs = list_messages(service, query, max_results=100)

saved = []
for m in msgs:
    full = get_message(service, m["id"])
    for fname, att_id in iter_pdf_attachments(full):
        out = download_attachment(service, m["id"], att_id, fname)
        if out:
            saved.append(out)

print(f"Search query:\n  {query}")
print(f"Matched emails: {len(msgs)}")
print("Saved files:")
for p in saved:
    print(" -", os.path.abspath(p))
print(f"\nDownload folder: {os.path.abspath(ATTACH_SAVE_DIR)}")

Search query:
  subject:"HSBC GOLD VISA e-Statement" after:2025/09/01 before:2025/09/30
Matched emails: 1
Saved files:
 - /root/dev/ledgerx/ledgerx-api/notebooks/gmail_attachments_poc/20250914.pdf

Download folder: /root/dev/ledgerx/ledgerx-api/notebooks/gmail_attachments_poc


In [56]:
from getpass import getpass
from pathlib import Path
import shutil
import sys

import pikepdf
import fitz  # PyMuPDF
import ocrmypdf

def has_selectable_text(pdf_path: str, password: str | None = None, sample_pages: int = 3) -> bool:
    """Return True if at least one of the first N pages has selectable text."""
    doc = fitz.open(pdf_path)  # just open
    if password:
        ok = doc.authenticate(password)
        if not ok:
            raise ValueError("Password incorrect")

    n = min(sample_pages, len(doc))
    for i in range(n):
        txt = doc.load_page(i).get_text().strip()
        if txt:
            return True
    return False

def decrypt_pdf(enc_pdf: str, out_pdf: str, password: str) -> None:
    """Fully decrypt (remove encryption) to a new PDF."""
    with pikepdf.open(enc_pdf, password=password) as pdf:
        pdf.save(out_pdf)

def ocr_to_searchable_pdf(input_pdf: str, output_pdf: str, lang: str = "eng") -> None:
    """
    Run OCR with ocrmypdf; keeps images, adds invisible text layer.
    """
    ocrmypdf.ocr(
        input_file=input_pdf,
        output_file=output_pdf,
        language=lang,
        deskew=True,
        rotate_pages=True,
        progress_bar=False,   # set True in notebooks if you want
        pdfa_image_compression="lossless",
    )

def process_encrypted_pdf(
    encrypted_pdf: str,
    output_pdf: str,
    lang: str = "eng",
    keep_plain_copy: bool = False,
) -> str:
    """
    1) Ask for password
    2) If PDF has text, just decrypt and save (or copy as-is)
    3) If no text, decrypt then OCR to searchable pdf
    Returns the path of the produced file.
    """
    enc_path = Path(encrypted_pdf)
    if not enc_path.exists():
        raise FileNotFoundError(f"Input not found: {enc_path}")

    password = getpass("Enter PDF password: ")

    # Quick text check without saving a decrypted copy yet
    text_exists = has_selectable_text(str(enc_path), password=password)

    tmp_decrypted = enc_path.with_suffix(".decrypted.tmp.pdf")
    decrypt_pdf(str(enc_path), str(tmp_decrypted), password=password)

    out_path = Path(output_pdf)

    if text_exists:
        # Already has text -> you might just want a decrypted copy
        if keep_plain_copy:
            shutil.move(str(tmp_decrypted), str(out_path))
            print(f"[OK] Decrypted (no OCR needed): {out_path}")
            return str(out_path)
        else:
            # If you still want to normalize to PDF/A (optional), you can feed it through ocrmypdf with --skip-text
            ocrmypdf.ocr(
                input_file=str(tmp_decrypted),
                output_file=str(out_path),
                language=lang,
                skip_text=True,        # do not OCR existing text
                progress_bar=False
            )
            tmp_decrypted.unlink(missing_ok=True)
            print(f"[OK] Decrypted + normalized (skip_text): {out_path}")
            return str(out_path)
    else:
        # No selectable text -> OCR it
        ocr_to_searchable_pdf(str(tmp_decrypted), str(out_path), lang=lang)
        tmp_decrypted.unlink(missing_ok=True)
        print(f"[OK] OCR complete: {out_path}")
        return str(out_path)

# ===== Run it =====
# Example usage:
# - encrypted input: "input_encrypted.pdf"
# - final output:    "output_searchable.pdf"
# - language packs:  "eng" or "eng+deu" etc.
try:
    produced = process_encrypted_pdf(
        encrypted_pdf="./gmail_attachments_poc/20250914.pdf",
        output_pdf="output_searchable.pdf",
        lang="eng",           # change to "eng+fil" etc. if you installed those packs
        keep_plain_copy=False # True = just decrypt if text already exists
    )
except Exception as e:
    print("Error:", e, file=sys.stderr)
    raise


GPL Ghostscript 10.02.1 (2023-11-01)
Copyright (C) 2023 Artifex Software, Inc.  All rights reserved.
This software is supplied under the GNU AGPLv3 and comes with NO WARRANTY:
see the file COPYING for details.
Processing pages 1 through 4.
Page 1
Loading font Helvetica (or substitute) from /usr/share/ghostscript/10.02.1/Resource/Font/NimbusSans-Regular
Page 2
GPL Ghostscript 10.02.1: Annotation set to non-printing,
 not permitted in PDF/A, annotation will not be present in output file
Page 3
Page 4

The following errors were encountered at least once while processing this file:
	stream inherited a resource

   
 This file had errors that were repaired or ignored.
   
 Please notify the author of the software that produced this
   
 file that it does not conform to Adobe's published PDF
   
 specification.


  warn(msg)


[OK] Decrypted + normalized (skip_text): output_searchable.pdf


In [59]:
from __future__ import annotations
import re, os, tempfile
from pathlib import Path
from typing import Optional, List, Tuple, Dict
from dateutil import parser as dtparser

import pikepdf
import fitz  # PyMuPDF
from pdf2image import convert_from_path
import pytesseract

# -------------------------------
# 1) Helpers: decrypt + text extraction
# -------------------------------
def decrypt_to_temp(encrypted_pdf: str, password: str) -> str:
    """Decrypts to a temporary PDF path and returns its filename."""
    tmp = Path(tempfile.gettempdir()) / (Path(encrypted_pdf).stem + ".decrypted.tmp.pdf")
    with pikepdf.open(encrypted_pdf, password=password) as pdf:
        pdf.save(str(tmp))
    return str(tmp)

def pdf_text_lines(pdf_path: str, max_pages: Optional[int] = None) -> List[str]:
    """Extract text lines with PyMuPDF (selectable text)."""
    lines: List[str] = []
    with fitz.open(pdf_path) as doc:
        n = len(doc) if max_pages is None else min(max_pages, len(doc))
        for i in range(n):
            page = doc.load_page(i)
            # Use 'text' (layout-agnostic) or 'blocks' if you need coordinates
            txt = page.get_text("text")
            if txt:
                lines.extend(s for s in txt.splitlines() if s.strip())
    return lines

def ocr_text_lines(pdf_path: str, dpi: int = 300, lang: str = "eng", max_pages: Optional[int] = None) -> List[str]:
    """OCR each page (no Ghostscript needed). Requires poppler + Tesseract installed."""
    images = convert_from_path(pdf_path, dpi=dpi)
    if max_pages is not None:
        images = images[:max_pages]
    lines: List[str] = []
    for idx, img in enumerate(images, 1):
        txt = pytesseract.image_to_string(img, lang=lang)
        lines.extend(s for s in txt.splitlines() if s.strip())
    return lines

def get_text_lines_smart(encrypted_pdf: str, password: str, lang: str = "eng") -> Tuple[List[str], str]:
    """Decrypt, try PyMuPDF; if no text found, fall back to OCR."""
    dec_path = decrypt_to_temp(encrypted_pdf, password)
    lines = pdf_text_lines(dec_path)
    if not any(lines):
        # Fallback to OCR (can be slow on big PDFs; adjust dpi/lang as needed)
        lines = ocr_text_lines(dec_path, dpi=300, lang=lang)
    return lines, dec_path

# -------------------------------
# 2) Parsing logic (Due Date & Amount)
# -------------------------------
DATE_KEYWORDS = [
    r"due\s*date", r"payment\s*due", r"pay\s*by", r"statement\s*due", r"bill\s*due",
    r"payment\s*deadline", r"date\s*due"
]
AMOUNT_KEYWORDS_PRIMARY = [
    r"total\s+amount\s+due", r"amount\s+due", r"total\s+due", r"statement\s+balance",
    r"outstanding\s+balance", r"current\s+balance"
]
AMOUNT_KEYWORDS_AVOID = [
    r"minimum\s+amount\s+due", r"minimum\s+due"
]

CURRENCY_SYMS = r"(?:₱|\bPHP\b|(?<!\S)Php)"
AMOUNT_NUM = r"(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d{2})?"
AMOUNT_RX = re.compile(rf"{CURRENCY_SYMS}?\s*({AMOUNT_NUM})", re.IGNORECASE)

def normalize_amount(s: str) -> Optional[float]:
    m = AMOUNT_RX.search(s)
    if not m:
        return None
    num = m.group(1).replace(",", "")
    try:
        return float(num)
    except ValueError:
        return None

def parse_date_any(s: str) -> Optional[str]:
    """Return ISO date (YYYY-MM-DD) if parsable."""
    s = s.strip()
    # Try several liberal parses (handles 'Oct 15, 2025', '15 Oct 2025', '10/15/2025')
    for dayfirst in (False, True):
        try:
            dt = dtparser.parse(s, dayfirst=dayfirst, fuzzy=True, yearfirst=False)
            return dt.date().isoformat()
        except Exception:
            continue
    return None

def find_nearby(lines: List[str], idx: int, window: int = 2) -> List[str]:
    L = max(0, idx - window)
    R = min(len(lines), idx + window + 1)
    return lines[L:R]

def match_any(line: str, patterns: List[str]) -> Optional[re.Match]:
    for pat in patterns:
        m = re.search(pat, line, flags=re.IGNORECASE)
        if m:
            return m
    return None

def extract_due_and_amount(lines: List[str]) -> Dict[str, Optional[str]]:
    """
    Strategy:
      - For due date: look for date keywords; parse date on the same line or the next 2 lines.
      - For amount: prioritize PRIMARY keywords; avoid 'Minimum Amount Due' if a better one exists.
      - If conflicting amounts exist, prefer the one nearest to a primary keyword.
    """
    due_date_iso: Optional[str] = None
    amount_value: Optional[float] = None
    amount_source: Optional[str] = None

    # Pass 1: Due Date
    for i, line in enumerate(lines):
        if match_any(line, DATE_KEYWORDS):
            # Try same line first
            dd = parse_date_any(line)
            if dd:
                due_date_iso = dd
                break
            # Try next lines (within small window)
            for ctx in find_nearby(lines, i, window=2):
                if ctx == line:
                    continue
                dd = parse_date_any(ctx)
                if dd:
                    due_date_iso = dd
                    break
            if due_date_iso:
                break

    # Pass 2: Amounts - collect candidates with simple scoring
    candidates: List[Tuple[float, int, str]] = []  # (amount, score, context)
    for i, line in enumerate(lines):
        # Skip minimum due if possible
        if match_any(line, AMOUNT_KEYWORDS_AVOID):
            am = normalize_amount(line)
            if am is not None:
                # Lower score for minimum due
                candidates.append((am, 1, line))
            continue

        pri_hit = match_any(line, AMOUNT_KEYWORDS_PRIMARY)
        am = normalize_amount(line)
        if am is not None:
            score = 3 if pri_hit else 2
            # small bonus if currency symbol present
            if re.search(CURRENCY_SYMS, line, flags=re.IGNORECASE):
                score += 1
            candidates.append((am, score, line))

        # Look-ahead: amount on next line after a primary keyword
        if pri_hit and i + 1 < len(lines):
            am2 = normalize_amount(lines[i + 1])
            if am2 is not None:
                candidates.append((am2, 4, lines[i] + " | " + lines[i + 1]))

    if candidates:
        # Pick highest score; if tie, pick the largest amount (credit card/utility “Total Due” is usually max)
        candidates.sort(key=lambda t: (t[1], t[0]), reverse=True)
        amount_value, _, amount_source = candidates[0]

    return {
        "due_date": due_date_iso,                          # e.g., "2025-10-15"
        "amount": f"{amount_value:.2f}" if amount_value is not None else None,  # stringified amount
        "amount_context": amount_source                    # the line where we found it (useful for debugging)
    }

# -------------------------------
# 3) One-call entry point
# -------------------------------
def extract_bill_fields(encrypted_pdf: str, password: str, lang: str = "eng") -> Dict[str, Optional[str]]:
    lines, dec_path = get_text_lines_smart(encrypted_pdf, password, lang=lang)
    out = extract_due_and_amount(lines)
    # Clean up decrypted temp file
    try:
        os.remove(dec_path)
    except Exception:
        pass
    return out

# -------------------------------
# Example usage
# -------------------------------
result = extract_bill_fields("./gmail_attachments_poc/20250914.pdf", password="20Oct1997814614", lang="eng")
print(result)
# -> {'due_date': '2025-10-17', 'amount': '12345.67', 'amount_context': 'Total Amount Due  ₱12,345.67'}


{'due_date': '2025-10-06', 'amount': '31833.61', 'amount_context': 'Total Due | 31,833.61'}


In [57]:
produced

'output_searchable.pdf'