# Processing Invoices Using Gemini

In [None]:
import json
import os
import sqlite3
from dotenv import load_dotenv
from google import genai
from PyPDF2 import PdfReader
from pydantic import BaseModel

load_dotenv(override=True)

In [None]:
# API key for Gemini, read from the environment (None when the variable is unset).
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# Name of the Gemini model used for every extraction request.
MODEL_NAME = "gemini-2.5-flash"
# Folder, relative to the working directory, that holds the invoice PDFs.
LOCAL_INVOICE_FOLDER = "invoices"

## Processing invoices

With the invoices stored locally, you can loop over them and process each one to extract its information.

Let's start by defining a Pydantic model that will help you process JSON results from Gemini.

In [None]:
# Shape of the JSON object requested from Gemini for each invoice; it is passed
# as the "response_schema" of the request made in extract_invoice_fields.
class Invoice(BaseModel):
    client_name: str  # client name found on the invoice
    invoice_amount: float  # invoice amount
    product_name: str  # product name found on the invoice

Now, let's create a function that you'll use to extract the fields from a given invoice.

In [None]:
def extract_invoice_fields(file: str, invoice: str):
    """
    Extract data from the supplied invoice text.

    Sends the invoice text to the Gemini model named by MODEL_NAME and asks
    for a JSON object that follows the Invoice schema.

    Args:
        file: Name of the invoice file; copied into the result under "file".
        invoice: Plain text of the invoice.

    Returns:
        A dict with the keys "client_name", "invoice_amount", "product_name"
        and "file", or None when the request or the JSON parsing fails (the
        failure is printed, not raised).
    """
    print(f"Extracting data from invoice {file}...")
    client = genai.Client(api_key=GEMINI_API_KEY)

    # Adjacent string literals are concatenated verbatim, so every fragment
    # must carry its own trailing separator.
    prompt = (
        "Extract the following information from this invoice text: "
        "1. Client name "
        "2. Invoice amount "
        "3. Product name "
        "Return the result as a JSON object. "
        "Use the following keys: "
        "1. client_name (string) "
        "2. invoice_amount (float) "
        "3. product_name (string) "
        "If the information is not found, return 'null' for the corresponding key.\n"
        f"Invoice text:\n{invoice}"
    )

    try:
        response = client.models.generate_content(
            model=MODEL_NAME,
            contents=prompt,
            config={
                # Ask for a JSON body shaped like the Invoice model.
                "response_mime_type": "application/json",
                "response_schema": Invoice,
            },
        )
        result = json.loads(response.candidates[0].content.parts[0].text)
        result["file"] = file
        print(json.dumps(result, indent=4))
        return result
    except Exception as e:
        # Best effort: report the failure and let the caller decide what to do.
        print(f"Failed to extract data using Gemini. Exception:{e}")
        return None

Let's set up the database where you'll store the information of every invoice.

In [None]:
# Open the local SQLite database file that stores the extracted invoice data.
connection = sqlite3.connect("invoices.db")
cursor = connection.cursor()
# One row per invoice file. The file name is the primary key, which is the
# conflict target of the INSERT ... ON CONFLICT(file) statement used when
# invoices are stored. IF NOT EXISTS makes re-running this cell harmless.
cursor.execute("""
    CREATE TABLE IF NOT EXISTS invoices (
        file TEXT PRIMARY KEY UNIQUE,
        client TEXT,
        amount REAL,
        product TEXT
    )
""")
connection.commit()

Let's now process all invoices in the local folder and extract the appropriate fields. After you process an invoice, you'll update the database with all of its data.

In [None]:
cursor = connection.cursor()

for file in os.listdir(LOCAL_INVOICE_FOLDER):
    # Only PDF files are treated as invoices; everything else is skipped.
    if not file.endswith(".pdf"):
        continue

    print(f"\nProcessing {file}...")
    try:
        reader = PdfReader(os.path.join(LOCAL_INVOICE_FOLDER, file))
        # extract_text() may return nothing for a page, hence the "or" fallback.
        text = "".join(page.extract_text() or "" for page in reader.pages)

        data = extract_invoice_fields(file, text)
        if data is None:
            # The extraction already printed its own error; there is nothing
            # to store for this invoice, so move on to the next file.
            print(f"Skipping {file}: no data was extracted.")
            continue

        print(f"Updating database with invoice {data['file']}...")
        # Upsert keyed on the file name so that re-processing an invoice
        # overwrites its previous row instead of failing.
        cursor.execute(
            """
            INSERT INTO invoices (file, client, amount, product)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(file) DO UPDATE SET
                client=excluded.client,
                amount=excluded.amount,
                product=excluded.product
            """,
            (
                data["file"],
                data["client_name"],
                data["invoice_amount"],
                data["product_name"],
            ),
        )
    except Exception as e:
        # Best effort: a failure in any step (reading the PDF, extracting the
        # fields, writing the row) must not stop the remaining invoices.
        print(f"Failed to process {file}. Exception: {e}")

connection.commit()

## Generating final reports

Finally, you want to generate a couple of reports with the data that you stored in the database.

In [None]:
print("\nInvoice Report")

cursor = connection.cursor()
cursor.execute("SELECT COUNT(*), SUM(amount) FROM invoices")
total_invoices, total_amount = cursor.fetchone()
# SUM() yields NULL (None in Python) when there are no non-NULL amounts,
# e.g. on an empty table; report that as zero instead of printing "None".
if total_amount is None:
    total_amount = 0.0

print(f"* Total invoices: {total_invoices}")
print(f"* Total amount: {total_amount}")

print("\nBreakdown by client:")
cursor.execute("SELECT client, COUNT(*), SUM(amount) FROM invoices GROUP BY client")
for client, count, amount in cursor.fetchall():
    # Same NULL handling as above, per client group.
    if amount is None:
        amount = 0.0
    print(f"* {client}: {count} invoices (${amount})")

# The connection is not used after the report, so release it here.
connection.close()