In [25]:
import os, json, time, sqlite3, warnings
from typing import Dict, Any, List
import re
import easyocr
from euriai.langgraph import EuriaiLangGraph
from dotenv import load_dotenv

In [26]:

# Populate os.environ from a local .env file (if one exists) before reading config;
# variables already set in the real environment take precedence.
load_dotenv()
EURI_API_KEY    = os.getenv("EURI_API_KEY")
MODEL           = "gpt-4.1-nano"

if not EURI_API_KEY:
    # Fail loudly but non-fatally: the CLEAN/EXTRACT nodes have their own fallbacks.
    warnings.warn("EURI_API_KEY is not set (environment or .env); Euri API calls will likely fail.")

In [27]:
# --- Watcher configuration -------------------------------------------------
INPUT_DIR = "invoices"            # directory scanned for new invoice images
DB_PATH = "invoice.sqlite"        # SQLite file the extracted rows are written to
PROCESSED_LOG = "processed.json"  # JSON list of file names already processed
POLL_SEC = 5                      # seconds to wait between directory scans

In [28]:
# Ensure the watched folder exists so the polling loop's os.listdir does not fail.
os.makedirs(name=INPUT_DIR, exist_ok=True)

In [30]:
def load_seen(path=None) -> set:
    """Return the set of file names recorded in the processed log.

    The log is a JSON list of file names (the format ``save_seen`` writes).
    A missing log means nothing has been processed yet. An unreadable,
    malformed, or non-list log is reported and treated as empty, so the
    watcher keeps running (at the cost of re-processing files).

    Args:
        path: Log file to read; defaults to ``PROCESSED_LOG``.

    Returns:
        A set of file names (empty if the log is missing or unusable).
    """
    if path is None:
        path = PROCESSED_LOG
    if not os.path.exists(path):
        return set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        print(f"[SEEN:WARN] could not read {path!r} ({e}); starting with an empty set")
        return set()
    # set() of a JSON string or object would yield characters/keys, not file names.
    if not isinstance(loaded, list):
        print(f"[SEEN:WARN] {path!r} does not contain a JSON list; starting with an empty set")
        return set()
    try:
        return set(loaded)
    except TypeError as e:  # unhashable entries such as nested lists/objects
        print(f"[SEEN:WARN] invalid entries in {path!r} ({e}); starting with an empty set")
        return set()

In [31]:
def save_seen(seen: set, path=None) -> None:
    """Persist the processed-file set as a sorted JSON list.

    The data is written to a sibling ``.tmp`` file, which is then moved over
    the target with ``os.replace``. An interruption mid-write (e.g. Ctrl+C in
    the watch loop) therefore cannot leave a truncated log behind.

    Args:
        seen: File names to record.
        path: Destination log file; defaults to ``PROCESSED_LOG``.
    """
    if path is None:
        path = PROCESSED_LOG
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(sorted(seen), f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)

# In-memory set of already-processed file names, loaded once at startup.
# The watch loop checks it before processing a file and adds/saves each success.
seen = load_seen()

In [32]:
def ensure_schema(db_path=None) -> None:
    """Create the ``invoices`` table if needed and add columns missing from older versions.

    Safe to call repeatedly: the table uses ``CREATE TABLE IF NOT EXISTS``.
    The ``file_name`` and ``raw_json`` columns are only added when
    ``PRAGMA table_info`` shows they are absent.

    Args:
        db_path: SQLite database file; defaults to ``DB_PATH``.
    """
    if db_path is None:
        db_path = DB_PATH
    # Fixed DDL per column (no string-built SQL) for tables created before these columns existed.
    migrations = {
        "file_name": "ALTER TABLE invoices ADD COLUMN file_name TEXT;",
        "raw_json": "ALTER TABLE invoices ADD COLUMN raw_json TEXT;",
    }
    con = sqlite3.connect(db_path)
    try:
        cur = con.cursor()
        cur.execute(
            """
CREATE TABLE IF NOT EXISTS invoices(
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_name TEXT,
vendor TEXT,
number TEXT,
date TEXT,
total REAL,
currency TEXT,
raw_json TEXT
)
"""
        )
        # Column 1 of each PRAGMA table_info row is the column name.
        cur.execute("PRAGMA table_info(invoices);")
        cols = {row[1] for row in cur.fetchall()}
        for col, ddl in migrations.items():
            if col not in cols:
                cur.execute(ddl)
        con.commit()
    finally:
        # Always release the file handle, even if a statement above raised.
        con.close()
# Create/migrate the invoices table once, before the pipeline inserts any rows.
ensure_schema()

In [33]:
# Shared EasyOCR reader (English only, CPU only), built once and reused by NODE_OCR.
ocr = easyocr.Reader(lang_list=["en"], gpu=False)

Using CPU. Note: This module is much faster with a GPU.


In [34]:
# Euri LangGraph client dedicated to the CLEAN step; key and model come from config.
clean_graph = EuriaiLangGraph(
    default_model=MODEL,
    api_key=EURI_API_KEY,
)

In [35]:
# Creating Node name CLEAN
# Register a single AI node on clean_graph that rewrites noisy OCR as plain text.
# "{ocr_text}" is presumably filled from the "ocr_text" key of the dict passed to
# clean_graph.run (NODE_CLEAN passes {"ocr_text": ...}) — confirm against euriai docs.
clean_graph.add_ai_node(
    "CLEAN",  # node name; also used as entry and finish point below
    """
You clean noisy OCR to plain text.
- Keep facts.
- No guessing.
- Keep table rows readable.
OCR:
{ocr_text}"""
)
# One-node graph: CLEAN is both where execution starts and where it ends.
clean_graph.set_entry_point("CLEAN")
clean_graph.set_finish_point("CLEAN")

Added AI node: CLEAN (model: gpt-4.1-nano)
Set entry point: CLEAN
Set finish point: CLEAN


In [36]:
# Separate one-node graph for structured extraction; NODE_EXTRACT runs it with
# {"clean_text": ...} and parses the reply as JSON.
extract_graph = EuriaiLangGraph(api_key=EURI_API_KEY, default_model=MODEL)
# The doubled braces in "{{description, ...}}" presumably escape literal braces for the
# template formatter, while "{clean_text}" is the substituted input — confirm against euriai docs.
extract_graph.add_ai_node(
    "EXTRACT",  # node name; also used as entry and finish point below
    """
FROM CLEAN_TEXT, return STRICT JSON with keys exactly:
vendor, number, date, total, currency,
line_items (list of {{description, quantity, unit_price, amount}}).

Unknown -> null. Numbers numeric. Dates YYYY-MM-DD if possible.
JSON ONLY, no extra text.

CLEAN_TEXT:
{clean_text}"""
)
# One-node graph: EXTRACT is both the entry and the finish point.
extract_graph.set_entry_point("EXTRACT")
extract_graph.set_finish_point("EXTRACT")

Added AI node: EXTRACT (model: gpt-4.1-nano)
Set entry point: EXTRACT
Set finish point: EXTRACT


In [37]:
def pick_text(x,*,prefer_key=None):
    """Coerce a graph/node result into a plain string.

    Strings pass through unchanged. For a dict, ``prefer_key`` (if given) is
    tried first, then "output", "text", "CLEAN_output", "EXTRACT_output" in
    that order; the first one holding a string wins. A dict with no such
    string value is serialized to JSON. Any other object goes through ``str``.
    """
    if isinstance(x, str):
        return x
    if not isinstance(x, dict):
        return str(x)
    candidates = ([prefer_key] if prefer_key else []) + [
        "output", "text", "CLEAN_output", "EXTRACT_output",
    ]
    for key in candidates:
        value = x.get(key)
        if isinstance(value, str):
            return value
    return json.dumps(x, ensure_ascii=False)

In [38]:
# JSON parser for LLM replies: always yields a dict so downstream nodes can call .get().

def parser_json_safe(raw):
    """Parse an LLM reply into a dict, tolerating noise around the JSON.

    - A dict is returned unchanged.
    - A string is parsed as JSON. If that fails, or yields something other
      than an object (e.g. a list or number), the span from the first ``{``
      to the last ``}`` is parsed instead. This handles code fences or prose
      around the object.
    - Anything that cannot be turned into a JSON object is wrapped as
      ``{"__raw__": raw}``.

    Returns:
        Always a dict.
    """
    if isinstance(raw, dict):
        return raw
    if not isinstance(raw, str):
        return {"__raw__": raw}

    def _loads_object(text):
        # JSONDecodeError subclasses ValueError; RecursionError covers absurdly deep nesting.
        try:
            parsed = json.loads(text)
        except (ValueError, RecursionError):
            return None
        return parsed if isinstance(parsed, dict) else None

    parsed = _loads_object(raw)
    if parsed is not None:
        return parsed
    start, end = raw.find("{"), raw.rfind("}")
    if start != -1 and end > start:
        parsed = _loads_object(raw[start:end + 1])
        if parsed is not None:
            return parsed
    return {"__raw__": raw}

In [39]:
def _heuristic_extract(clean_text: str) -> dict:
    """Very simple regex-based extractor to keep DB flowing when AI is down."""
    def find(pat, s):
        m = re.search(pat, s, re.IGNORECASE)
        return m.group(1).strip() if m else None

    vendor = find(r"Vendor:\s*(.+)", clean_text)
    number = find(r"(?:Invoice Number|Invoice No\.?):\s*([A-Za-z0-9\-]+)", clean_text)
    date   = find(r"(?:Invoice Date|Date):\s*([0-9]{4}-[0-9]{2}-[0-9]{2})", clean_text)
    total  = find(r"Total:\s*([0-9]+(?:\.[0-9]+)?)", clean_text)
    curr   = find(r"Total:\s*[0-9]+(?:\.[0-9]+)?\s*([A-Za-z]{3})", clean_text) or find(r"Currency:\s*([A-Za-z]{3})", clean_text)

    try: total = float(total) if total is not None else None
    except: total = None

    return {
        "vendor": vendor,
        "number": number,
        "date":   date,
        "total":  total,
        "currency": curr,
        "line_items": []
    }


In [40]:
# OCR node: image file -> raw text, using the shared EasyOCR reader `ocr`.

def NODE_OCR(file_path: str) -> Dict[str, Any]:
    """Run OCR on one image and return ``{"ocr_text": ...}``.

    ``readtext`` is called with ``detail=0`` so its results can be joined
    directly as strings, one recognized fragment per line.
    """
    fragments = ocr.readtext(file_path, detail=0)
    return {"ocr_text": "\n".join(fragments)}

In [41]:
# Clean node: runs after OCR; on any failure it falls back to the raw OCR text.

def NODE_CLEAN(ocr_text: str) -> Dict[str, Any]:
    '''Normalize noisy OCR text with the CLEAN AI node.

    Returns ``{"clean_text": str, "CLEAN_raw": <graph result>}``. If the call
    raises or yields blank text, the OCR text is passed through unchanged with
    ``CLEAN_raw = {"fallback": True}`` and a ``[CLEAN:FALLBACK]`` line is printed.
    '''
    try:
        result = clean_graph.run({"ocr_text": ocr_text})
        text = pick_text(result, prefer_key="CLEAN_output")
        if isinstance(text, str) and text.strip():
            return {"clean_text": text, "CLEAN_raw": result}
        raise RuntimeError("Empty CLEAN output")
    except Exception as exc:
        print(f"[CLEAN:FALLBACK] {exc}")
        return {"clean_text": ocr_text, "CLEAN_raw": {"fallback": True}}

In [42]:
def NODE_EXTRACT(clean_text_any) -> dict:
    """Extract structured invoice JSON with the EXTRACT AI node; fall back to regex heuristics.

    The heuristic fallback (``_heuristic_extract``) is used when the AI call
    raises, or when its reply is blank or contains no ``{`` and so cannot hold
    a JSON object. This matches NODE_CLEAN, which also treats an empty reply as
    a failure.

    Args:
        clean_text_any: Cleaned text, or a result object ``pick_text`` can reduce to a string.

    Returns:
        ``{"raw_json": str, "EXTRACT_raw": ...}``. ``EXTRACT_raw`` is the graph
        result, or ``{"fallback": True}`` when the heuristic produced ``raw_json``.
    """
    clean_text = pick_text(clean_text_any, prefer_key="CLEAN_output")
    try:
        result = extract_graph.run({"clean_text": clean_text})
        raw_json = pick_text(result, prefer_key="EXTRACT_output")
        if not raw_json.strip():
            raise RuntimeError("Empty EXTRACT output")
        if "{" not in raw_json:
            # No JSON object to parse; the heuristics are a better source of fields.
            raise RuntimeError("EXTRACT output contains no JSON object")
        return {"raw_json": raw_json, "EXTRACT_raw": result}
    except Exception as e:
        print(f"[EXTRACT:FALLBACK] {e}")
        heuristic = _heuristic_extract(clean_text)
        return {"raw_json": json.dumps(heuristic, ensure_ascii=False), "EXTRACT_raw": {"fallback": True}}


In [43]:
# Validation node: schema and numeric sanity checks on the parsed extraction result.

def NODE_VALIDATION(data: Dict[str, Any]) -> Dict[str, Any]:
    '''Run minimal schema checks and numeric sanity on an extracted invoice.

    Checks:
      - vendor, number, date, currency are present and not null/empty;
      - total is present and a finite number (or a string float() accepts);
      - line_items, if present, is a list.

    Args:
        data: Parsed extraction result.

    Returns:
        ``{"valid": bool, "issues": [str, ...]}``, where ``valid`` is True
        only when ``issues`` is empty.
    '''
    import math  # local import: only this node needs it

    issues: List[str] = []  # human-readable descriptions of each problem found
    for k in ["vendor", "number", "date", "currency"]:
        # .get() covers both an absent key and an explicit null/empty value.
        if data.get(k) in (None, ""):
            issues.append(f"missing key: {k}")

    total = data.get("total")
    if total is None:
        issues.append("total is null")
    else:
        try:
            # bool subclasses int, so float(True) would silently pass as 1.0.
            numeric = None if isinstance(total, bool) else float(total)
        except (TypeError, ValueError, OverflowError):
            numeric = None
        # json.loads accepts NaN/Infinity and float() parses "nan"/"inf"; reject them.
        if numeric is None or not math.isfinite(numeric):
            issues.append(f"total not numeric: {total}")

    if not isinstance(data.get("line_items", []), list):
        issues.append("line_items not a list")
    return {"valid": not issues, "issues": issues}

In [44]:
# Persist node: store one extracted invoice in the SQLite database.

def NODE_PERSIST(file_name:str, data: dict, raw_json_any, db_path=None):
    """Insert one extracted invoice as a row of the ``invoices`` table.

    Args:
        file_name: Source image name, stored in ``file_name``.
        data: Parsed extraction result; vendor/number/date/total/currency are read with ``.get``.
        raw_json_any: The raw extraction output. Stored as-is if it is a string,
            otherwise JSON-serialized.
        db_path: SQLite database file; defaults to ``DB_PATH``.

    Returns:
        ``{"db": "sqlite", "rowid": <id of the inserted row>}``.
    """
    def _bindable(value):
        # sqlite3 cannot bind dicts/lists; an LLM may return e.g. vendor as
        # {"name": ...}, which would otherwise raise and fail the whole file.
        if isinstance(value, (dict, list, tuple)):
            return json.dumps(value, ensure_ascii=False)
        return value

    if db_path is None:
        db_path = DB_PATH
    raw_json_str = raw_json_any if isinstance(raw_json_any, str) else json.dumps(raw_json_any, ensure_ascii=False)
    params = (
        file_name,
        *(_bindable(data.get(k)) for k in ("vendor", "number", "date", "total", "currency")),
        raw_json_str,
    )
    con = sqlite3.connect(db_path)
    try:
        cur = con.execute(
            """
INSERT INTO invoices(file_name, vendor, number, date, total, currency, raw_json)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
            params,
        )
        rowid = cur.lastrowid
        con.commit()
    finally:
        # Release the connection even if the insert fails.
        con.close()
    print(f"[DB] Inserted row id={rowid} at {os.path.abspath(db_path)}")
    return {"db": "sqlite", "rowid": rowid}


In [45]:
# Notify node: print a one-line status summary for each processed invoice.

def NODE_NOTIFY(file_name:str, data:Dict[str, Any], valid:bool, issues:List[str]) -> None:
    """Print ``[OK]`` or ``[WARN: <issues>]`` followed by the key invoice fields.

    Args:
        file_name: Source image name.
        data: Parsed extraction result; vendor/number/total/currency are read with ``.get``.
        valid: Validation verdict.
        issues: Validation issues, shown only when ``valid`` is False.
    """
    status = "OK" if valid else f"WARN: {issues}"
    # Single quotes inside the f-string keep this valid before Python 3.12
    # (reusing the enclosing quote character requires PEP 701).
    print(
        f"[{status}] file='{file_name}' vendor = {data.get('vendor')} number = {data.get('number')} total={data.get('total')} {data.get('currency')}"
    )

In [None]:
# Pipeline stage names, in execution order. These lists describe the flow;
# run_pipeline_for_file calls the NODE_* functions directly.
NODES = ["WATCH", "OCR", "CLEAN", "EXTRACT", "VALIDATE", "PERSIST", "NOTIFY"]

# Each stage feeds the next, so the edges are the consecutive pairs of NODES.
EDGES = list(zip(NODES, NODES[1:]))


In [46]:
def run_pipeline_for_file(file_path: str) -> None:
    """Run OCR -> CLEAN -> EXTRACT -> VALIDATE -> PERSIST -> NOTIFY for one image.

    Exceptions from any stage propagate to the caller. The row is persisted
    even when validation reports issues; the issues are only surfaced through
    NODE_NOTIFY.
    """
    name = os.path.basename(file_path)

    # Text stages: image -> OCR text -> cleaned text -> raw extraction output.
    text = pick_text(NODE_OCR(file_path).get("ocr_text"))
    cleaned = NODE_CLEAN(text).get("clean_text")
    raw_json = NODE_EXTRACT(cleaned)["raw_json"]

    # Structure stages: parse into a dict, then check it.
    parsed = parser_json_safe(raw_json)
    verdict = NODE_VALIDATION(parsed)

    # Output stages.
    NODE_PERSIST(name, parsed, raw_json)
    NODE_NOTIFY(name, parsed, verdict["valid"], verdict["issues"])


In [47]:
# Failed files are not marked as seen, so without a cap they would be re-run
# every POLL_SEC forever. Each retry re-calls the LLM and can insert a duplicate
# row when the failure happens after NODE_PERSIST. After MAX_ATTEMPTS failures a
# file is skipped until the watcher is restarted.
MAX_ATTEMPTS = 3
failures = {}  # file name -> number of failed attempts in this session

print(f"\nWatching '{INPUT_DIR}' every {POLL_SEC}s. Only NEW .png/.jpg/.jpeg will be processed.")
try:
    while True:
        for fname in sorted(os.listdir(INPUT_DIR)):
            if not fname.lower().endswith((".png", ".jpg", ".jpeg")):
                continue
            if fname in seen:
                continue  # already processed (persists across restarts)
            if failures.get(fname, 0) >= MAX_ATTEMPTS:
                continue  # gave up on this file earlier in this session
            fpath = os.path.join(INPUT_DIR, fname)
            try:
                run_pipeline_for_file(fpath)  # trigger full pipeline
                seen.add(fname)               # mark as processed...
                save_seen(seen)               # ...and persist immediately
            except Exception as e:
                failures[fname] = failures.get(fname, 0) + 1
                print(f"[ERROR] {fname} (attempt {failures[fname]}/{MAX_ATTEMPTS}): {e}")
                if failures[fname] >= MAX_ATTEMPTS:
                    print(f"[SKIP] {fname}: giving up until the watcher is restarted")
        time.sleep(POLL_SEC)
except KeyboardInterrupt:
    print("Stopped watching.")


Watching 'invoices' every 5s. Only NEW .png/.jpg will be processed.
Stopped watching.
