In [2]:
!pip install fastapi uvicorn apscheduler pydantic


Collecting apscheduler
  Downloading apscheduler-3.11.1-py3-none-any.whl.metadata (6.4 kB)
Downloading apscheduler-3.11.1-py3-none-any.whl (64 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.3/64.3 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: apscheduler
Successfully installed apscheduler-3.11.1


In [3]:
import sqlite3

DB = "invoiceflow.db"

def init_db():
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute(
        "CREATE TABLE IF NOT EXISTS invoices("
        "id TEXT PRIMARY KEY, raw_text TEXT, vendor TEXT, invoice_number TEXT, "
        "date TEXT, subtotal TEXT, tax TEXT, total TEXT, currency TEXT, status TEXT)"
    )
    c.execute(
        "CREATE TABLE IF NOT EXISTS approvals("
        "id TEXT PRIMARY KEY, invoice_id TEXT, approver TEXT, status TEXT, updated_at REAL)"
    )
    conn.commit()
    conn.close()

init_db()
print("Database initialized:", DB)



Database initialized: invoiceflow.db


In [4]:
import asyncio

class OCRAgent:
    async def run(self, file_bytes: bytes) -> str:
        await asyncio.sleep(0.05)
        return "INVOICE\nVendor: Acme Supplies\nDate: 2025-11-01\nInvoice#: INV-12345\nTotal: 1,250.00\nCurrency: INR\n"

ocr_agent = OCRAgent()
print("OCRAgent ready")


OCRAgent ready


In [5]:
import asyncio

class ExtractionAgent:
    async def run(self, text: str):
        await asyncio.sleep(0.05)
        return {
            "vendor": "Acme Supplies",
            "invoice_number": "INV-12345",
            "date": "2025-11-01",
            "subtotal": "1000.00",
            "tax": "250.00",
            "total": "1250.00",
            "currency": "INR",
            "due_date": "2025-12-01"
        }

extraction_agent = ExtractionAgent()
print("ExtractionAgent ready")


ExtractionAgent ready


In [6]:
import time
import uuid
import sqlite3
from apscheduler.schedulers.background import BackgroundScheduler

class ApprovalAgent:
    def __init__(self, scheduler=None):
        self.scheduler = scheduler or BackgroundScheduler()
        if not self.scheduler.running:
            self.scheduler.start()
        self.jobs = {}

    def _create_approval_record(self, approval_id, invoice_id, approver):
        conn = sqlite3.connect("invoiceflow.db")
        c = conn.cursor()
        c.execute("INSERT INTO approvals(id, invoice_id, approver, status, updated_at) VALUES(?,?,?,?,?)",
                  (approval_id, invoice_id, approver, "pending", time.time()))
        conn.commit()
        conn.close()

    def _update_approval_record(self, approval_id, status):
        conn = sqlite3.connect("invoiceflow.db")
        c = conn.cursor()
        c.execute("UPDATE approvals SET status=?, updated_at=? WHERE id=?", (status, time.time(), approval_id))
        conn.commit()
        conn.close()

    def _update_invoice_status(self, invoice_id, status):
        conn = sqlite3.connect("invoiceflow.db")
        c = conn.cursor()
        c.execute("UPDATE invoices SET status=? WHERE id=?", (status, invoice_id))
        conn.commit()
        conn.close()

    def start(self, invoice_id: str, approver: str, interval: int = 20):
        approval_id = str(uuid.uuid4())
        self._create_approval_record(approval_id, invoice_id, approver)
        def reminder():
            print(f"Reminder: Invoice {invoice_id} pending approval by {approver}")
        job_id = str(uuid.uuid4())
        self.scheduler.add_job(reminder, "interval", seconds=interval, id=job_id)
        self.jobs[approval_id] = job_id
        return approval_id

    def complete(self, approval_id: str, status: str):
        job_id = self.jobs.get(approval_id)
        if job_id:
            try:
                self.scheduler.remove_job(job_id)
            except Exception:
                pass
            self.jobs.pop(approval_id, None)
        self._update_approval_record(approval_id, status)
        conn = sqlite3.connect("invoiceflow.db")
        c = conn.cursor()
        c.execute("SELECT invoice_id FROM approvals WHERE id=?", (approval_id,))
        row = c.fetchone()
        conn.close()
        if row:
            invoice_id = row[0]
            self._update_invoice_status(invoice_id, status)

scheduler = BackgroundScheduler()
scheduler.start()
approval_agent = ApprovalAgent(scheduler)
print("ApprovalAgent ready")


ApprovalAgent ready


In [7]:
import uuid

class ManagerAgent:
    def __init__(self, ocr_agent, extraction_agent, approval_agent):
        self.ocr_agent = ocr_agent
        self.extraction_agent = extraction_agent
        self.approval_agent = approval_agent

    async def process_invoice(self, file_bytes: bytes):
        raw_text = await self.ocr_agent.run(file_bytes)
        fields = await self.extraction_agent.run(raw_text)

        invoice_id = str(uuid.uuid4())

        conn = sqlite3.connect("invoiceflow.db")
        c = conn.cursor()
        c.execute("INSERT INTO invoices(id, raw_text, vendor, invoice_number, date, subtotal, tax, total, currency, status) VALUES(?,?,?,?,?,?,?,?,?,?)",
                  (invoice_id, raw_text, fields["vendor"], fields["invoice_number"], fields["date"], fields["subtotal"], fields["tax"], fields["total"], fields["currency"], "pending"))
        conn.commit()
        conn.close()

        approval_id = self.approval_agent.start(invoice_id, "approver@example.com")

        return invoice_id, approval_id, fields

manager = ManagerAgent(ocr_agent, extraction_agent, approval_agent)
print("ManagerAgent ready")


ManagerAgent ready


In [10]:
import sqlite3

def list_invoices():
    conn = sqlite3.connect("invoiceflow.db")
    c = conn.cursor()
    c.execute("SELECT id, vendor, invoice_number, date, total, currency, status FROM invoices ORDER BY rowid DESC")
    rows = c.fetchall()
    conn.close()
    return [
        {
            "id": r[0],
            "vendor": r[1],
            "invoice_number": r[2],
            "date": r[3],
            "total": r[4],
            "currency": r[5],
            "status": r[6]
        }
        for r in rows
    ]

print("list_invoices() ready")


list_invoices() ready


In [11]:
import asyncio
import json

async def test_invoiceflow():
    fake_file_bytes = b"fake invoice data"
    invoice_id, approval_id, fields = await manager.process_invoice(fake_file_bytes)

    print("Invoice ID:", invoice_id)
    print("Approval ID:", approval_id)
    print("Extracted Fields:\n", json.dumps(fields, indent=2))

    print("\nCurrent invoices in DB:")
    print(list_invoices())

await test_invoiceflow()


Invoice ID: ece27299-170d-4ad5-af5f-3a0a3cda3d3e
Approval ID: 4d5fdccd-4570-45c7-8307-da6f02aa7f13
Extracted Fields:
 {
  "vendor": "Acme Supplies",
  "invoice_number": "INV-12345",
  "date": "2025-11-01",
  "subtotal": "1000.00",
  "tax": "250.00",
  "total": "1250.00",
  "currency": "INR",
  "due_date": "2025-12-01"
}

Current invoices in DB:
[{'id': 'ece27299-170d-4ad5-af5f-3a0a3cda3d3e', 'vendor': 'Acme Supplies', 'invoice_number': 'INV-12345', 'date': '2025-11-01', 'total': '1250.00', 'currency': 'INR', 'status': 'pending'}, {'id': 'fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e', 'vendor': 'Acme Supplies', 'invoice_number': 'INV-12345', 'date': '2025-11-01', 'total': '1250.00', 'currency': 'INR', 'status': 'pending'}]
Reminder: Invoice fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e pending approval by approver@example.com
Reminder: Invoice ece27299-170d-4ad5-af5f-3a0a3cda3d3e pending approval by approver@example.com
Reminder: Invoice fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e pending approval by approver@

In [12]:
import sqlite3
import time

def get_pending_approvals():
    conn = sqlite3.connect("invoiceflow.db")
    c = conn.cursor()
    c.execute("SELECT id, invoice_id FROM approvals WHERE status='pending' ORDER BY updated_at DESC")
    rows = c.fetchall()
    conn.close()
    return rows

def approve_latest():
    pending = get_pending_approvals()
    if not pending:
        return "No pending approvals found."
    
    approval_id, invoice_id = pending[0]
    
    conn = sqlite3.connect("invoiceflow.db")
    c = conn.cursor()
    c.execute("UPDATE approvals SET status=?, updated_at=? WHERE id=?", ("approved", time.time(), approval_id))
    c.execute("UPDATE invoices SET status=? WHERE id=?", ("approved", invoice_id))
    conn.commit()
    conn.close()
    
    return {
        "approval_id": approval_id,
        "invoice_id": invoice_id,
        "status": "approved"
    }

approve_latest()


{'approval_id': '4d5fdccd-4570-45c7-8307-da6f02aa7f13',
 'invoice_id': 'ece27299-170d-4ad5-af5f-3a0a3cda3d3e',
 'status': 'approved'}

Reminder: Invoice ece27299-170d-4ad5-af5f-3a0a3cda3d3e pending approval by approver@example.com
Reminder: Invoice fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e pending approval by approver@example.com
Reminder: Invoice ece27299-170d-4ad5-af5f-3a0a3cda3d3e pending approval by approver@example.com
Reminder: Invoice fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e pending approval by approver@example.com
Reminder: Invoice ece27299-170d-4ad5-af5f-3a0a3cda3d3e pending approval by approver@example.com


In [13]:

try:
    scheduler.remove_all_jobs()
    print("All reminders stopped.")
except:
    print("Scheduler not running or already cleared.")


All reminders stopped.


In [14]:
import sqlite3

def export_invoices_csv():
    conn = sqlite3.connect("invoiceflow.db")
    c = conn.cursor()
    c.execute("SELECT id, vendor, invoice_number, date, total, currency, status FROM invoices")
    rows = c.fetchall()
    conn.close()

    csv_lines = ["id,vendor,invoice_number,date,total,currency,status"]

    for r in rows:
        csv_lines.append(",".join([str(x) for x in r]))

    return "\n".join(csv_lines)

csv_output = export_invoices_csv()
print(csv_output)


id,vendor,invoice_number,date,total,currency,status
fae50abe-e8e0-4d7a-8e87-6f7e0a6b766e,Acme Supplies,INV-12345,2025-11-01,1250.00,INR,pending
ece27299-170d-4ad5-af5f-3a0a3cda3d3e,Acme Supplies,INV-12345,2025-11-01,1250.00,INR,approved
