# Document Extractor

This notebook contains the document extraction pipeline used by the `document_processor` agent. It includes code to:

- Load PDFs and images.
- Preprocess documents (OCR, image correction, cropping).
- Extract structured fields and tables.
- Save extracted results to the project data directory.

Changes made

- Ensure any helper functions or configuration imports use the `src` package imports (e.g., `from src.config import ...`) so the notebook runs when the repository root is in `PYTHONPATH` or run from the project root.

How to run

1. Activate the project's virtual environment:

```bash
source venv/bin/activate
```

2. From the repository root, open the notebook (e.g., in JupyterLab) and execute the cells in order. Make sure `src/` is on `PYTHONPATH` if you run cells from a different working directory.

Notes

- If you added or modified helper modules, confirm they are imported at the top of the notebook. If imports fail, run `export PYTHONPATH=.` (or use the Python module runner) from the project root.
- This cell was added automatically to document the code changes; update it with more details if you add new extraction steps or dependencies.


In [None]:
# --- Optional: install (Colab) ---
#%pip install pdfplumber pandas openpyxl

#from tkinter import Tk, filedialog

#Tk().withdraw()  # hides the root window
#pdf_path = filedialog.askopenfilename(
#    title="Select a PDF file",
#    filetypes=[("PDF files", "*.pdf")]
#)
 
#print("Selected:", pdf_path)

import json, re, os
import pdfplumber
import pandas as pd

# ========= SET YOUR PDF PATH HERE =========
# Absolute path of the statement PDF to process. This is a machine-specific
# Windows path; edit it (or switch to one of the relative alternatives below)
# before running the cell on another machine.
pdf_path = r"D:\git\utility-billing-ai\data\raw\National Grid Usage Statement-With Overcharge.pdf"

# Alternative inputs, resolved relative to the current working directory:
#pdf_path = "National Grid Usage Statement-With Overcharge.pdf"
#pdf_path = "National Grid Usage Statement-Without Overcharge.pdf"
# Fail fast with a readable message when the file is missing.
# NOTE(review): `assert` is skipped when Python runs with -O; an explicit
# raise would be sturdier if this code ever leaves the notebook.
assert os.path.exists(pdf_path), f"PDF not found at: {pdf_path}"

# ========= YOUR CHAIN OBJECTS =========
# Assumes you already defined: SchemaMain, llm, and create_extraction_chain(SchemaMain, llm)
# If these aren't defined yet, define/import them before running this cell.

# ---------------- helpers ----------------
# A word counts as a date token only when its whole text is M/D/YY or M/D/YYYY.
DATE_RE = re.compile(r"^\d{1,2}/\d{1,2}/\d{2,4}$")


def _join_words_by_line(words):
    """Rebuild text from word dicts: one line per rounded ``top``, words sorted by ``x0``."""
    lines = {}
    for w in words:
        lines.setdefault(round(w["top"], 1), []).append(w)
    return "\n".join(
        " ".join(w["text"] for w in sorted(lines[y], key=lambda z: z["x0"]))
        for y in sorted(lines)
    )


def rebuild_text_from_words(page, *, crop_to_table=True):
    """
    Rebuild page text from positioned words.

    Steps: (1) extract words with geometry, (2) optionally crop to the table
    body using a simple heuristic, (3) group words by line (rounded ``top``)
    and sort each line by ``x0``.

    Args:
        page: object exposing ``extract_words(**kwargs)`` (returning dicts with
            at least ``text``, ``top`` and ``x0``) and a ``height`` attribute.
        crop_to_table: when True, keep only words between the first line that
            contains a date token and the first "Page" footer token below it.

    Returns:
        ``(page_raw_text, table_text)``. ``table_text`` equals the raw text
        when ``crop_to_table`` is False.
    """
    # Pull words with geometry; keep blanks and flow left-to-right
    words = page.extract_words(
        x_tolerance=1.5, y_tolerance=3,
        keep_blank_chars=True, use_text_flow=True,
        extra_attrs=["size"]
    )

    # Raw page text (best-effort reconstruction) — never altered later
    raw_text = _join_words_by_line(words)

    # Heuristic crop to the table body to avoid header shards leaking into rows:
    # - the first line that contains a date token is the top of the table body
    # - the first 'Page' token *below that line* is the footer (bottom bound)
    date_tops = [w["top"] for w in words if DATE_RE.match(w["text"])]
    first_date_top = min(date_tops) if date_tops else None
    table_top = first_date_top - 2 if date_tops else 0

    # Only accept a footer candidate that sits below the first date line.
    # Otherwise a "Page" token in the page header would put the bottom bound
    # above the top bound and silently empty the table text.
    footer_tops = [
        w["top"] for w in words
        if w["text"].lower() == "page"
        and (first_date_top is None or w["top"] > first_date_top)
    ]
    table_bottom = min(footer_tops) - 2 if footer_tops else page.height

    tbl_words = [
        w for w in words
        if (not crop_to_table) or (table_top <= w["top"] <= table_bottom)
    ]

    # Rebuild table-only text
    table_text = _join_words_by_line(tbl_words)

    return raw_text, table_text

# ---------------- 1) Run your chain over each page ----------------
# `final` collects exactly one dict per page: the chain output merged with
# bookkeeping keys on success, or an `_error` record on failure.
final = []
laparams = {"char_margin": 2.0, "word_margin": 0.1, "line_margin": 0.3}  # tighter word joins
with pdfplumber.open(pdf_path, laparams=laparams) as pdf:
    # `pi` is 1-based for the progress messages; the stored `_page_index` is 0-based.
    for pi, page in enumerate(pdf.pages, start=1):
        print(f"on page {pi}")
        # Rebuild robust text from absolute word positions
        page_raw, table_text = rebuild_text_from_words(page, crop_to_table=True)

        # Keep both: truly raw (best effort) & table-only (cleaner for LLM)
        try:
            # Fall back to the whole page when the cropped table text is empty.
            inp = table_text.strip() or page_raw  # prefer the table body
            # SchemaMain, llm and create_extraction_chain are not defined in
            # this cell; they must already exist in the notebook namespace.
            chain = create_extraction_chain(SchemaMain, llm)
            output = chain.run(inp)
            # also store the raw page text for auditability
            # A non-dict chain result is kept under `_raw_response` instead of being merged.
            final.append({"_page_index": pi-1, "_raw_text": page_raw, **(output if isinstance(output, dict) else {"_raw_response": output})})
        except Exception as e:
            # Best-effort: record the failure and continue with the next page.
            # NOTE(review): this also catches a NameError from an undefined
            # chain factory, so a misconfigured run still "succeeds" with
            # nothing but `_error` rows.
            print(f"Error processing page {pi}: {e}")
            final.append({"_error": str(e), "_page_index": pi-1, "_raw_text": page_raw})

# ---------------- 2) Normalize chain outputs ----------------
def extract_json(obj):
    """
    Coerce a chain output into a JSON-compatible Python value.

    - dicts and lists are returned unchanged;
    - strings are parsed as JSON; if that fails, the outermost ``{...}`` and
      ``[...]`` spans are tried, earliest-starting span first, so that an
      array of objects wrapped in prose is recovered as the array rather than
      rejected;
    - anything that cannot be parsed is wrapped as ``{"_raw_response": <text>}``.

    Returns:
        The parsed value (whatever ``json.loads`` yields), or the
        ``_raw_response`` wrapper dict.
    """
    if isinstance(obj, (dict, list)):
        return obj
    if not isinstance(obj, str):
        return {"_raw_response": str(obj)}

    s = obj.strip()
    try:
        return json.loads(s)
    except (ValueError, RecursionError):
        pass  # not pure JSON; look for an embedded object/array below

    # Candidate spans: first opening bracket to last matching closing bracket.
    spans = []
    for open_ch, close_ch in (("{", "}"), ("[", "]")):
        start = s.find(open_ch)
        end = s.rfind(close_ch)
        if start != -1 and end > start:
            spans.append((start, end))

    # Try the span that starts first (the outermost structure), then the other.
    for start, end in sorted(spans):
        try:
            return json.loads(s[start:end + 1])
        except (ValueError, RecursionError):
            continue
    return {"_raw_response": s}

# Normalize every per-page record; entries of `final` that are already dicts
# pass through extract_json unchanged.
objs = [extract_json(x) for x in final]

# Save raw NDJSON (includes _raw_text) so the untouched per-page records are kept
# alongside the parsed outputs. The file is opened in "w" mode, so each run
# replaces the previous file. One JSON document per line; ensure_ascii=False
# keeps non-ASCII characters readable.
with open("llm_page_outputs.ndjson", "w", encoding="utf-8") as f:
    for o in objs:
        f.write(json.dumps(o, ensure_ascii=False) + "\n")

# ---------------- 3) Build the Summary sheet (top-level only) ----------------
# Flatten one level of nesting: {"a": {"b": 1}} becomes column "a.b".
summary_df = pd.json_normalize(objs, max_level=1)

# Drop nested lists/dicts from the summary
# A column is dropped if ANY of its cells still holds a list or dict.
nested_cols = [c for c in summary_df.columns
               if summary_df[c].apply(lambda v: isinstance(v, (list, dict))).any()]
summary_df = summary_df.drop(columns=nested_cols, errors="ignore")

# Light parsing for dates/numbers (keeps original values in NDJSON)
def to_num(x):
    """
    Parse a currency/number-like cell into a numeric value.

    Strips ``$`` and thousands separators before converting.

    Returns:
        ``pd.NA`` for missing scalars (None/NaN/NA), the parsed number when
        the text is numeric, and NaN when it cannot be parsed. Non-scalar
        cells (lists, dicts, arrays) are treated as unparseable text.
    """
    # pd.isna() on a list-like returns an array whose truth value is
    # ambiguous, so only ask "is it missing?" for scalars.
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return pd.NA
    s = str(x).replace("$", "").replace(",", "").strip()
    return pd.to_numeric(s, errors="coerce")

# Columns whose NAME contains one of these substrings (case-insensitive) are
# parsed as datetimes; unparseable cells become NaT.
# NOTE(review): this is a plain substring search, so e.g. "end" also matches
# names such as "vendor" — confirm against the real schema.
date_like = [c for c in summary_df.columns if re.search(r"(date|period|start|end|due)", c, re.I)]
for c in date_like:
    # Best-effort: if conversion of a column raises, the column is left as-is.
    try: summary_df[c] = pd.to_datetime(summary_df[c], errors="coerce")
    except Exception: pass

# Same substring heuristic for numeric columns; each cell goes through to_num.
# NOTE(review): a column can match both patterns; it is then converted to
# datetime first and passed through to_num afterwards — verify this is intended.
num_like = [c for c in summary_df.columns if re.search(r"(kwh|amount|charge|tax|rate|demand|rkva|usage|total|balance|price|qty)", c, re.I)]
for c in num_like:
    summary_df[c] = summary_df[c].apply(to_num)

# ---------------- 4) Split repeated list keys to extra sheets ----------------
# Collect every top-level key that holds a list in at least one page record.
list_keys = set()
for o in objs:
    if isinstance(o, dict):
        for k, v in o.items():
            if isinstance(v, list):
                list_keys.add(k)

# Sheet name -> DataFrame; "Summary" is always present.
sheets = {"Summary": summary_df}
for key in list_keys:
    frames = []
    for idx, o in enumerate(objs):
        # Skip records that lack this key or hold a non-list under it.
        if not isinstance(o, dict) or key not in o or not isinstance(o[key], list):
            continue
        # Scalar top-level fields of the record are repeated on every item row.
        meta = {mk: mv for mk, mv in o.items() if not isinstance(mv, (list, dict))}
        df_li = pd.json_normalize(o, record_path=[key])
        # Position of the record in `objs` (one record per page).
        df_li["SourcePageIndex"] = idx
        for mk, mv in meta.items():
            # NOTE(review): a meta key equal to an item column name overwrites that column.
            df_li[mk] = mv
        frames.append(df_li)
    if frames:
        li_df = pd.concat(frames, ignore_index=True)
        for c in li_df.columns:
            # NOTE(review): the pattern includes "unit"; a column of unit labels
            # (non-numeric text) would be coerced to NaN by to_num — confirm.
            if re.search(r"(kwh|qty|amount|charge|tax|rate|demand|rkva|usage|total|price|unit)", str(c), re.I):
                li_df[c] = li_df[c].apply(to_num)
        # Keys are truncated to 31 characters here.
        # NOTE(review): a key named "Summary", or two keys sharing the same
        # first 31 characters, would overwrite an existing sheet entry.
        sheets[key[:31] or "Items"] = li_df

# ---------------- 5) Save files ----------------
# All outputs are written to the current working directory.
summary_df.to_csv("bill_extraction_summary.csv", index=False)

with pd.ExcelWriter("bill_extraction.xlsx", engine="openpyxl") as writer:
    for name, df in sheets.items():
        # Replace the characters : \ / ? * [ ] with "_" and cap the sheet name at 31 characters.
        safe = re.sub(r"[:\\/?*\[\]]", "_", name)[:31] or "Sheet"
        df.to_excel(writer, sheet_name=safe, index=False)

print("Wrote:")
print(" - bill_extraction_summary.csv")
print(" - bill_extraction.xlsx (Summary + one sheet per nested list)")
print(" - llm_page_outputs.ndjson (raw per-page with _raw_text)")


on page 1
Error processing page 1: name 'create_extraction_chain' is not defined
on page 2
Error processing page 2: name 'create_extraction_chain' is not defined
on page 3
Error processing page 3: name 'create_extraction_chain' is not defined
on page 4
Error processing page 4: name 'create_extraction_chain' is not defined
Wrote:
 - bill_extraction_summary.csv
 - bill_extraction.xlsx (Summary + one sheet per nested list, e.g., charges/line_items)
 - llm_page_outputs.ndjson (raw per-page LLM outputs for debugging)


In [None]:
# ==== Utility bill "Monthly Electric History" -> CSV ====
# Strong cleanup + diagnostics so we can see why rows were zero.

import pandas as pd, re, unicodedata
from datetime import datetime
from pathlib import Path
from difflib import SequenceMatcher

# Input: the summary CSV written by the extraction cell; its cells are
# concatenated into one text blob further down, so it must contain the raw page text.
INPUT_TEXT_CSV = Path("bill_extraction_summary.csv")              # <- ensure this is the CSV with the raw text
# Output: one parsed "Monthly Electric History" row per line.
OUTPUT_CSV     = Path("bill_extraction_summary_formatted.csv")

# Column order of the output CSV.
DEST_COLS = [
    "Bill Account","Customer","Bill Date","Read Date","Days Used","Billed Kwh",
    "Billed Demand","Load Factor","Billed Rkva","Bill Amount","Sales Tax Amt",
    "Bill Amount w/Sales Tax","Retracted Amt","Sales Tax Factor",
]

# ---------------- helpers ----------------
def normspace(s: str) -> str:
    """Return *s* NFKC-normalized with every whitespace run collapsed to one space.

    ``None`` yields an empty string; any other value is converted with ``str``.
    Leading and trailing whitespace is removed.
    """
    if s is None:
        return ""
    normalized = unicodedata.normalize("NFKC", str(s))
    normalized = normalized.replace("\u00A0", " ")
    collapsed = re.sub(r"\s+", " ", normalized)
    return collapsed.strip()

# Regex fragments shared by the cleanup and parsing steps.
DATE      = r"\d{1,2}/\d{1,2}/\d{2,4}"      # M/D/YY or M/D/YYYY
ACCOUNT   = r"\b\d{7,12}\b"                 # standalone run of 7-12 digits
DATE_RE   = re.compile(rf"^{DATE}$")        # whole-token date check

def as_date(tok: str):
    """
    Normalize a slash-separated date token to ``M/D/YYYY`` without zero padding.

    Returns:
        ``None`` when the stripped token does not look like a date at all;
        the normalized date string when it parses as month/day/year;
        the stripped token unchanged when it has the date shape but is not a
        valid calendar date (e.g. ``13/45/2020``), so callers still treat it
        as a date-like token.
    """
    tok = tok.strip()
    if not DATE_RE.match(tok):
        return None
    # Only month/day/year layouts can occur here: DATE_RE caps the first field
    # at two digits, so year-first formats would never match.
    for fmt in ("%m/%d/%Y", "%m/%d/%y"):
        try:
            dt = datetime.strptime(tok, fmt)
        except ValueError:
            continue
        return f"{dt.month}/{dt.day}/{dt.year}"
    return tok

def is_intlike(tok: str):
    """Return True when *tok* is an optional minus sign followed only by digits and commas."""
    matched = re.fullmatch(r"^-?[\d,]+$", tok)
    return matched is not None
def parse_intlike(tok: str):
    """
    Format an integer-like token with thousands separators.

    Everything except digits and ``-`` is discarded before parsing.

    Returns:
        ``""`` when no digits remain, the formatted integer (e.g. ``"15,680"``)
        on success, and *tok* unchanged when the remaining text is not a valid
        integer (e.g. ``"--5"`` or ``"1-2"``) — the same fallback the float
        and money parsers use, instead of raising ValueError.
    """
    digits = re.sub(r"[^\d\-]", "", tok)
    if digits in ("", "-"):
        return ""
    try:
        return f"{int(digits):,}"
    except ValueError:
        return tok

def is_num(tok: str):
    """Return True when *tok*, with commas removed, is an optionally signed integer or decimal."""
    without_commas = tok.replace(",", "")
    return re.fullmatch(r"^-?[\d,]*\.?\d+$", without_commas) is not None
def parse_float(tok: str, nd=2):
    """
    Round a numeric token to *nd* decimals and drop trailing fractional zeros.

    Args:
        tok: numeric text; commas are ignored.
        nd: number of decimal places to round to.

    Returns:
        The compact decimal string (``"44.0"`` -> ``"44"``, ``"0.50"`` ->
        ``"0.5"``), or *tok* unchanged when it cannot be parsed or formatted.
    """
    s = tok.replace(",", "")
    try:
        out = f"{float(s):.{nd}f}"
    except (ValueError, TypeError):
        return tok
    # Strip zeros only from the fractional part: with nd=0 there is no decimal
    # point, and stripping would corrupt integers such as "100".
    if "." in out:
        out = out.rstrip("0").rstrip(".")
    return out

def is_money(tok: str):
    """Return True when *tok* is an amount: optional ``$``, optional ``-``, digits/commas, optional decimals."""
    matched = re.fullmatch(r"^\$?-?[\d,]*\.?\d+$", tok)
    return matched is not None
def parse_money(tok: str):
    """
    Format an amount token as ``$#,###.##``.

    Returns:
        ``""`` when nothing numeric is left after removing ``$`` and commas,
        the formatted amount on success, and *tok* unchanged when the
        remaining text is not a number.
    """
    s = tok.replace("$", "").replace(",", "")
    if s in ("", "-", ".", "-."):
        return ""
    try:
        return f"${float(s):,.2f}"
    except ValueError:
        # Only a failed float conversion is expected here; anything else
        # should surface rather than be swallowed.
        return tok

def pull(text, pats, group=1, default=""):
    """Return the whitespace-normalized capture *group* of the first pattern in *pats* found in *text*.

    Patterns are tried in order, case-insensitively; *default* is returned
    when none of them matches.
    """
    # Lazy generator: searching stops at the first pattern that matches.
    hits = (re.search(pat, text, flags=re.IGNORECASE) for pat in pats)
    first = next((hit for hit in hits if hit), None)
    if first is None:
        return default
    return normspace(first.group(group))

def is_page_footer(tokens, k):
    """
    Tell whether ``tokens[k:k+4]`` reads ``Page <n> of <m>``.

    "Page" and "of" are compared case-insensitively; ``<n>`` and ``<m>`` may
    contain digits and commas. The window needs ``k + 3 < len(tokens)``.

    Returns:
        A real ``bool`` (consistent with the other ``is_*`` predicates)
        rather than a regex match object or ``None``.
    """
    if k + 3 >= len(tokens):
        return False
    # Checks short-circuit left to right, so a later token is only inspected
    # once the earlier ones have matched.
    return bool(
        tokens[k].lower() == "page"
        and re.fullmatch(r"[\d,]+", tokens[k+1] or "")
        and tokens[k+2].lower() == "of"
        and re.fullmatch(r"[\d,]+", tokens[k+3] or "")
    )

# ---------------- load text ----------------
# Read every cell as a string (no NaN conversion) so nothing is reinterpreted.
df_raw = pd.read_csv(INPUT_TEXT_CSV, encoding="utf-8-sig", engine="python", dtype=str, keep_default_na=False)
# Join the cells of each row with spaces, then the rows with newlines.
full_text = "\n".join(df_raw.apply(lambda r: " ".join(r.astype(str)), axis=1))
# normspace collapses ALL whitespace, including the newlines just inserted,
# so from here on full_text is a single space-separated line.
full_text = normspace(full_text)

# ---- Strong normalization ----
# Each step rewrites `full_text` in place; the order matters because later
# patterns assume the repairs made by earlier ones.
# 0) fix weird header shards that invade dates (Buffalo-style)
full_text = re.sub(r"A/1R", "/1", full_text)   # 4A/1R6/2021 -> 4/16/2021
full_text = re.sub(r"A0R/", "0/", full_text)   # 1A0R/14/2021 -> 10/14/2021
# generic variants
full_text = re.sub(r"(?<=\d)\s*[A-Z]{1,3}\s*/\s*(?=\d{1,2}/\d{2,4})", "/", full_text)
# Drop only the stray "A"/"R" letters and keep the slash when there is one
# (4A/2R6/2021 -> 4/26/2021); discarding the slash would fuse month and day.
full_text = re.sub(r"(?<=\d)\s*A(/?\d)R(?=\d)", r"\1", full_text)

# 1) ensure space between ANY account and the start of an ALL-CAPS name
# The ACCOUNT pattern ends in \b, which can never hold between a digit and a
# letter, so the glued case needs its own boundary-free pattern (same 7-12
# digit length as ACCOUNT, not preceded by another digit).
full_text = re.sub(r"(?<!\d)(\d{7,12})(?=[A-Z])", r"\1 ", full_text)

# 2) ensure space between ANY letters and a date that follows immediately (e.g., 'DEP4/16/2021' -> 'DEP 4/16/2021')
full_text = re.sub(rf"([A-Z])(?={DATE})", r"\1 ", full_text)

# 3) split glued dates (…/YY)(MM/…) -> add a space
full_text = re.sub(rf"({DATE})(?={DATE})", r"\1 ", full_text)

# 4) start new potential row at each account number
# Any whitespace before an account number is replaced by a single newline.
full_text = re.sub(rf"\s*(?={ACCOUNT})", "\n", full_text)

# ---------------- robust customer inference ----------------
def _norm_caps(s: str) -> str:
    s = re.sub(r"\s+", " ", s or "").strip()
    return re.sub(r"[^A-Z0-9 &'.,\-\/]", "", s.upper())

def _sim(a, b): return SequenceMatcher(None, a, b).ratio()

def _extract_header_customer(text: str) -> str:
    """Return the normalized customer name that follows a ``Customer`` label, or ``""``.

    The name is captured lazily up to the whitespace preceding the next known
    header label, a ``Page <n>`` marker, or the end of the text.
    """
    pattern = r"Customer[:\s]+(.+?)(?=\s+(?:Post Office:|Service Address:|Bill Account:|Monthly|Page\s+\d+|$))"
    found = re.search(pattern, text, flags=re.IGNORECASE | re.DOTALL)
    if found is None:
        return ""
    return _norm_caps(found.group(1))

def _extract_row_customers(text: str):
    """Return the normalized name found between each account number and the date that follows it.

    One entry is produced per match, in order of appearance; duplicates are kept.
    """
    row_pattern = rf"{ACCOUNT}\s*([A-Z0-9 &'.,\-\/]{{3,}}?)\s+(?={DATE})"
    return [_norm_caps(hit.group(1)) for hit in re.finditer(row_pattern, text)]

def _merge_customer(header_cand: str, row_cands):
    if not row_cands:  # fallback
        return header_cand
    freq = {}
    for c in row_cands:
        if c: freq[c] = freq.get(c, 0) + 1
    best = sorted(freq.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True)[0][0]
    if header_cand and best.startswith(header_cand) and len(best) > len(header_cand):
        return best
    if header_cand:
        scored = sorted(freq.keys(), key=lambda c: (_sim(header_cand, c), len(c)), reverse=True)
        if scored and _sim(header_cand, scored[0]) >= 0.80 and len(scored[0]) >= len(header_cand):
            return scored[0]
    return best

def infer_customer_name(text: str) -> str:
    """Infer the customer name from *text* by reconciling the header name with the per-row names."""
    return _merge_customer(
        _extract_header_customer(text),
        _extract_row_customers(text),
    )

# Statement-level fields; the same values are stamped onto every parsed row.
bill_account = pull(full_text, [rf"Bill Account[:\s]+({ACCOUNT})"])
customer     = infer_customer_name(full_text)

# ---------------- diagnostics (so we see why rows might be zero) ----------------
# Count account-number matches and adjacent date pairs in the cleaned text.
acc_hits = len(re.findall(ACCOUNT, full_text))
date_pairs = list(re.finditer(rf"{DATE}\s+{DATE}", full_text))
print(f"accounts found: {acc_hits}")
print(f"date-pairs found: {len(date_pairs)}")
print("first 300 chars after cleanup:\n", full_text[:300], "\n")

# Show up to 50 characters of context on each side of the first five date pairs.
for m in date_pairs[:5]:
    a, b = m.span()
    s = max(0, a-50); e = b+50
    print("…", full_text[s:e], "…")

# ---------------- tokenization + parsing ----------------
# Walk the whitespace-separated tokens looking for rows shaped like:
#   <bill date> <read date> <days> <kwh> <demand> <load factor> <rkva>
#   <bill amt> <sales tax> <bill w/ tax> <retracted amt> [<sales tax factor>]
# `i` marks the candidate row start and `j` the next unread token. Whenever an
# expected field is missing, the candidate is abandoned and the scan resumes
# one token after `i`.
tokens = full_text.split()
rows = []
i, N = 0, len(tokens)

while i + 1 < N:
    # A row starts with two consecutive date tokens.
    d1 = as_date(tokens[i])
    d2 = as_date(tokens[i+1]) if d1 else None
    if not (d1 and d2):
        i += 1
        continue

    j = i + 2
    if j >= N or not is_intlike(tokens[j]): i += 1; continue
    days = parse_intlike(tokens[j]); j += 1

    # Leading minus signs are stripped for the check only; the original token is parsed.
    if j >= N or not is_intlike(tokens[j].lstrip("-")): i += 1; continue
    kwh = parse_intlike(tokens[j]); j += 1

    if j >= N or not is_num(tokens[j]): i += 1; continue
    demand = parse_float(tokens[j], nd=1); j += 1

    if j >= N or not is_num(tokens[j]): i += 1; continue
    load_factor = parse_float(tokens[j], nd=2); j += 1

    if j >= N or not is_intlike(tokens[j]): i += 1; continue
    rkva = parse_intlike(tokens[j]); j += 1

    if j >= N or not is_money(tokens[j]): i += 1; continue
    bill_amt = parse_money(tokens[j]); j += 1

    if j >= N or not is_money(tokens[j]): i += 1; continue
    sales_tax_amt = parse_money(tokens[j]); j += 1

    if j >= N or not (is_money(tokens[j]) or is_num(tokens[j])): i += 1; continue
    # NOTE(review): both branches of this conditional call parse_money, so the
    # is_money test has no effect here.
    bill_with_tax = parse_money(tokens[j]) if is_money(tokens[j]) else parse_money(tokens[j]); j += 1

    # ghost money BEFORE retracted
    # While the next two tokens are both amounts/numbers AND the token after
    # them is a number, a date, or "page", skip the first of them.
    while (j + 2 < N and (is_money(tokens[j]) or is_num(tokens[j]))
           and (is_money(tokens[j+1]) or is_num(tokens[j+1]))
           and (is_num(tokens[j+2]) or as_date(tokens[j+2]) or tokens[j+2].lower()=="page")):
        j += 1

    if j >= N or not (is_money(tokens[j]) or is_num(tokens[j])): i += 1; continue
    retracted_amt = parse_money(tokens[j]); j += 1

    # ghost money AFTER retracted
    # Skip further money tokens, stopping at one that is directly followed by
    # a number, a date, or "page" (or when a non-money token is reached).
    while j < N and is_money(tokens[j]):
        nxt = tokens[j+1] if j+1 < N else ""
        if is_num(nxt) or as_date(nxt) or nxt.lower() == "page":
            break
        j += 1

    # find factor by scanning to boundary
    # The boundary is the next date, page footer, account number, or one of
    # the header words "Bill" / "Customer" / "Monthly"; it defaults to the end.
    k = j
    boundary = N
    while k < N:
        if as_date(tokens[k]) or is_page_footer(tokens, k) \
           or re.match(ACCOUNT, tokens[k] or "") or tokens[k] in ("Bill","Customer","Monthly"):
            boundary = k
            break
        k += 1

    # Forward search: first qualifying number <= 100 within 12 tokens of j.
    # NOTE(review): is_money also accepts plain numbers without a "$", so
    # "is_num and not is_money" appears to reject ordinary decimals such as
    # "0.08" — confirm whether the intent was "does not start with $".
    factor = ""
    for t in range(j, min(boundary, j+12)):
        if is_num(tokens[t]) and not is_money(tokens[t]):
            try: val = float(tokens[t].replace(",",""))
            except: val = 9999
            if val <= 100:
                factor = parse_float(tokens[t], nd=2)
                j = t + 1
                break
    # Backward search from the boundary when the forward search found nothing.
    if factor == "":
        for t in range(boundary-1, max(j-1, boundary-12), -1):
            if is_num(tokens[t]) and not is_money(tokens[t]):
                try: val = float(tokens[t].replace(",",""))
                except: val = 9999
                if val <= 100:
                    factor = parse_float(tokens[t], nd=2)
                    j = t + 1
                    break

    # Skip any dates and "Page n of m" footers that directly follow the row.
    while j < N:
        if as_date(tokens[j]): j += 1; continue
        if is_page_footer(tokens, j): j += 4; continue
        break

    rows.append({
        "Bill Account": bill_account or "",
        "Customer":     customer or "",
        "Bill Date":    d1,
        "Read Date":    d2,
        "Days Used":    days,
        "Billed Kwh":   kwh,
        "Billed Demand":demand,
        "Load Factor":  load_factor,
        "Billed Rkva":  rkva,
        "Bill Amount":  bill_amt,
        "Sales Tax Amt":sales_tax_amt,
        "Bill Amount w/Sales Tax": bill_with_tax,
        "Retracted Amt":retracted_amt,
        "Sales Tax Factor": factor,
    })
    # Resume scanning after everything consumed by this row.
    i = j

# ---------------- output ----------------
df_out = pd.DataFrame(rows)
# With zero parsed rows the frame has no columns, so add any missing
# destination column as empty strings before reordering.
for c in DEST_COLS:
    if c not in df_out.columns: df_out[c] = ""
# Keep only the destination columns, in their declared order.
df_out = df_out[DEST_COLS]
df_out.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
print(f"\n✅ Parsed {len(df_out)} rows -> {OUTPUT_CSV.resolve()}")
print(df_out.head(30))


Parsed 73 rows -> D:\git\utility-billing-ai\src\agents\document_processor\bill_extraction_summary_formatted.csv


Unnamed: 0,Bill Account,Customer,Bill Date,Read Date,Days Used,Billed Kwh,Billed Demand,Load Factor,Billed Rkva,Bill Amount,Sales Tax Amt,Bill Amount w/Sales Tax,Retracted Amt,Sales Tax Factor
0,3288390002,TOWN OF HALFMOON,7/9/2019,7/2/2019,28,15680,44.0,0.53,0,$702.11,$0.00,$702.11,$0.00,
1,3288390002,TOWN OF HALFMOON,8/8/2019,8/2/2019,31,19120,48.8,0.53,0,$806.48,$0.00,$806.48,$0.00,
2,3288390002,TOWN OF HALFMOON,9/10/2019,9/4/2019,33,18720,44.0,0.54,0,$694.54,$0.00,$694.54,$0.00,
3,3288390002,TOWN OF HALFMOON,10/8/2019,10/2/2019,28,15520,50.4,0.46,0,$740.16,$0.00,$740.16,$0.00,
4,3288390002,TOWN OF HALFMOON,11/6/2019,10/31/2019,29,16960,55.2,0.44,0,$831.72,$0.00,$831.72,$0.00,
5,3288390002,TOWN OF HALFMOON,12/9/2019,12/3/2019,33,29600,80.8,0.46,0,"$1,245.98",$0.00,"$1,245.98",$0.00,
6,3288390002,TOWN OF HALFMOON,1/9/2020,1/3/2020,31,34000,88.8,0.51,0,"$1,354.17",$0.00,"$1,354.17",$0.00,
7,3288390002,TOWN OF HALFMOON,2/7/2020,2/3/2020,31,33440,90.4,0.5,0,"$1,338.99",$0.00,"$1,338.99",$0.00,
8,3288390002,TOWN OF HALFMOON,3/9/2020,3/3/2020,29,33040,85.6,0.55,0,"$1,325.69",$0.00,"$1,325.69",$0.00,
9,3288390002,TOWN OF HALFMOON,4/8/2020,4/2/2020,30,25360,81.6,0.43,0,"$1,209.09",$0.00,"$1,209.09",$0.00,
