In [1]:
!pip install -q google-adk google-cloud-firestore google-cloud-storage google-auth \
               google-genai gradio python-dotenv


In [2]:
import getpass
import os

import google.genai as genai

# Resolve the API key without hardcoding it: prefer the GEMINI_API_KEY
# environment variable and fall back to an interactive (non-echoing) prompt.
# Previously GEMINI_API_KEY was used here without ever being defined, so this
# cell raised NameError on a fresh kernel.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") or getpass.getpass("Gemini API key: ")
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
client = genai.Client(api_key=GEMINI_API_KEY)

# Quick sanity check: one cheap text-only request to confirm the key works.
resp = client.models.generate_content(
    model="gemini-2.0-flash-001",
    contents="Say hello in one short sentence."
)
print(resp.text)




Hello there!



In [3]:
import sqlite3
import json
import datetime

# --- Storage configuration -------------------------------------------------
DB_PATH = "expenses.db"          # SQLite file created next to the notebook
DEMO_USER_ID = "demo-user-001"   # single hard-wired user for this demo

# One row per parsed receipt; `raw_json` keeps the full model output and
# `created_at` is the insertion timestamp used for "most recent" ordering.
_EXPENSES_SCHEMA = """
CREATE TABLE IF NOT EXISTS expenses (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id   TEXT,
    date      TEXT,
    merchant  TEXT,
    category  TEXT,
    total     REAL,
    currency  TEXT,
    notes     TEXT,
    raw_json  TEXT,
    created_at TEXT
)
"""

# check_same_thread=False allows the connection to be used from threads other
# than the one that opened it.
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cur = conn.cursor()

# Idempotent: safe to re-run the cell against an existing database.
cur.execute(_EXPENSES_SCHEMA)
conn.commit()

def insert_expense(user_id: str, expense: dict) -> int:
    """Persist one parsed expense and return the id of the new row.

    Args:
        user_id: Owner of the expense.
        expense: Parsed receipt data. The keys ``date``, ``merchant``,
            ``category``, ``total``, ``currency`` and ``notes`` are read;
            missing or ``None`` values are stored as ``""`` (``0`` for
            ``total``). The whole dict is also stored as JSON in ``raw_json``.

    Returns:
        The autoincrement ``id`` assigned to the inserted row.

    Raises:
        ValueError / TypeError: if ``total`` cannot be converted to ``float``.
    """

    def _text(key: str) -> str:
        # `.get(key, "")` would still return None for an explicit JSON null,
        # which would be written as SQL NULL; normalise both cases to "".
        value = expense.get(key)
        return "" if value is None else value

    # Naive-UTC ISO string, same format as the deprecated utcnow().isoformat(),
    # so new rows sort consistently with rows that are already stored.
    created_at = (
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
    )

    row = (
        user_id,
        _text("date"),
        _text("merchant"),
        _text("category"),
        float(expense.get("total", 0) or 0),
        _text("currency"),
        _text("notes"),
        json.dumps(expense, ensure_ascii=False),
        created_at,
    )

    # `with conn` commits on success and rolls back if the INSERT raises.
    # conn.execute() creates a cursor private to this call, so `lastrowid`
    # cannot be overwritten by another thread using a shared cursor.
    with conn:
        cursor = conn.execute(
            """
        INSERT INTO expenses (user_id, date, merchant, category, total, currency, notes, raw_json, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
            row,
        )
    return cursor.lastrowid

def get_recent_expenses(user_id: str, limit: int = 50) -> list[dict]:
    """Return up to ``limit`` of the user's expenses, newest first.

    Args:
        user_id: Owner whose rows are selected.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with the keys ``id``, ``date``, ``merchant``,
        ``category``, ``total``, ``currency``, ``notes`` and ``raw``. ``raw`` is
        the decoded ``raw_json`` column, or ``{}`` when that column is missing
        or not valid JSON.
    """
    # conn.execute() gives this call its own cursor, so a concurrent request
    # cannot replace the result set between execute() and fetchall().
    # `id DESC` breaks ties between rows that share a created_at timestamp.
    rows = conn.execute(
        """
        SELECT id, date, merchant, category, total, currency, notes, raw_json
        FROM expenses
        WHERE user_id = ?
        ORDER BY created_at DESC, id DESC
        LIMIT ?
        """,
        (user_id, limit),
    ).fetchall()

    columns = ("id", "date", "merchant", "category", "total", "currency", "notes")
    results = []
    for *fields, raw_json in rows:
        try:
            raw = json.loads(raw_json)
        except (TypeError, ValueError):
            # TypeError: NULL column; ValueError: malformed JSON text.
            raw = {}
        record = dict(zip(columns, fields))
        record["raw"] = raw
        results.append(record)
    return results


In [4]:
from google import genai
from google.genai import types
import os, json
from io import BytesIO
from PIL import Image

# Re-create the client after the imports above rebound the `genai` name.
client = genai.Client(api_key=GEMINI_API_KEY)

# Text sanity check
resp = client.models.generate_content(
    model="gemini-2.0-flash-001",  # or gemini-2.5-flash if allowed
    contents="Say hello in one short sentence.",
)
# resp.text may be None when the response carries no text; guard before
# slicing (same `or ""` guard used in extract_expense_from_receipt).
print("TEXT OK:", (resp.text or "")[:80], "...")


TEXT OK: Hello there!
 ...


In [5]:
from io import BytesIO
from PIL import Image

import re
from io import BytesIO
from PIL import Image
from google.genai import types

def _parse_expense_json(raw_text: str) -> "dict | None":
    """Return the first JSON object embedded in ``raw_text``, or None if none decodes."""
    decoder = json.JSONDecoder()
    start = raw_text.find("{")
    while start != -1:
        try:
            # raw_decode stops at the end of the first complete value, so any
            # trailing prose or code-fence text after the object is ignored.
            candidate, _ = decoder.raw_decode(raw_text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = raw_text.find("{", start + 1)
    return None


def extract_expense_from_receipt(image: Image.Image) -> dict:
    """Ask Gemini to read a receipt image and return structured expense data.

    The image is sent as PNG together with a prompt requesting a single JSON
    object. The first JSON object found in the reply is used even if the model
    wrapped it in extra text.

    Args:
        image: Receipt image as a PIL image.

    Returns:
        A dict that always has the keys ``date``, ``merchant``, ``category``,
        ``total``, ``currency`` and ``notes``. Keys the model omitted or set to
        ``null`` get defaults (``""``, ``"other"`` for category, ``0`` for
        total). If no JSON object can be decoded, all fields are defaults and
        ``notes`` holds the first 500 characters of the raw reply.
    """
    # Convert PIL image to PNG bytes
    buf = BytesIO()
    image.save(buf, format="PNG")
    img_bytes = buf.getvalue()

    prompt = """
You are an assistant that reads receipt images and extracts structured expense data.

You MUST respond with ONLY one JSON object and nothing else:
no markdown, no code fences, no explanations.

The JSON object should have exactly these keys:
- "date": ISO date string (yyyy-mm-dd)
- "merchant": store / merchant name
- "category": one of ["groceries", "restaurant", "coffee", "transport", "shopping", "other"]
- "total": number (no currency symbol)
- "currency": 3-letter code (e.g. "USD", "EUR", "INR")
- "notes": short free-text summary
"""

    parts = [
        types.Part.from_text(text=prompt),
        types.Part.from_bytes(data=img_bytes, mime_type="image/png"),
    ]

    response = client.models.generate_content(
        model="gemini-2.5-flash",   # vision-capable model
        contents=parts,
    )

    raw_text = (response.text or "").strip()

    defaults = {
        "date": "",
        "merchant": "",
        "category": "other",
        "total": 0,
        "currency": "",
        "notes": "",
    }

    data = _parse_expense_json(raw_text)
    if data is None:
        # Fallback: keep the app running and at least store what the model said
        return {**defaults, "notes": raw_text[:500]}

    # Fill in keys that are missing OR explicitly null; setdefault() alone
    # would leave a JSON null in place.
    for key, default in defaults.items():
        if data.get(key) is None:
            data[key] = default
    return data


In [6]:
# Smoke test: a blank white 200x100 image is enough to confirm that the
# extraction call runs end to end without raising.
test_img = Image.new(mode="RGB", size=(200, 100), color="white")

print("Calling extract_expense_from_receipt on dummy image...")
test_result = extract_expense_from_receipt(image=test_img)
print("Got result:", test_result)


Calling extract_expense_from_receipt on dummy image...
Got result: {'date': '', 'merchant': '', 'category': 'other', 'total': 0.0, 'currency': 'XXX', 'notes': 'Blank receipt image, no expense data found.'}


In [7]:
def answer_question_over_expenses(user_id: str, question: str, max_rows: int = 50) -> str:
    """Answer a question about the user's stored expenses using Gemini.

    The most recent expenses are loaded from SQLite, serialised to JSON and
    placed in the prompt, and the model is told to answer from that data only.
    When the user has no stored expenses, the model is instead asked to explain
    that receipts must be uploaded first.

    Args:
        user_id: Whose expenses to load.
        question: The user's free-text question.
        max_rows: Maximum number of recent expenses to include as context.

    Returns:
        The model's reply text, or a short apology if the reply has no text.
    """
    expenses = get_recent_expenses(user_id, limit=max_rows)

    if not expenses:
        prompt = (
            "The user has no stored expenses yet. "
            "Kindly explain that they need to upload some receipts first."
        )
    else:
        context = json.dumps(expenses, ensure_ascii=False, indent=2)
        prompt = f"""
You are a personal expense assistant.

Here is the user's expense history as JSON:
{context}

Answer the following question using ONLY this data:
\"\"\"{question}\"\"\".

- Show totals in the same currency as the data.
- If you are unsure because the data doesn't cover it, say so explicitly.
- You may show a brief breakdown by merchant/category if helpful.
"""

    resp = client.models.generate_content(
        model="gemini-2.0-flash-001",
        contents=prompt,
    )
    # resp.text can be None; never hand None back to the chat UI.
    return resp.text or "Sorry, I couldn't generate an answer. Please try again."


In [8]:
import gradio as gr

def chat_fn(message, history, image):
    """Gradio click handler: store an uploaded receipt or answer a question.

    Args:
        message: Text from the message box (may be empty or None).
        history: Chat history as a list of ``(user, assistant)`` tuples, or None.
        image: Uploaded receipt image, or None when nothing was uploaded.

    Returns:
        ``("", history, None)`` — the cleared message box, the updated history
        and a cleared image input. ``history`` is appended to in place.

    Failures in either flow are reported as a chat reply instead of being
    raised, so one bad request does not break the session.
    """
    if history is None:
        history = []

    # ----- Receipt upload flow -----
    if image is not None:
        user_msg = message or "Store this receipt"

        try:
            expense = extract_expense_from_receipt(image)
            expense_id = insert_expense(DEMO_USER_ID, expense)
            reply = (
                f"I parsed and stored this receipt as expense #{expense_id}:\n"
                f"- Date: {expense.get('date','')}\n"
                f"- Merchant: {expense.get('merchant','')}\n"
                f"- Category: {expense.get('category','')}\n"
                f"- Total: {expense.get('total','')} {expense.get('currency','')}\n"
            )
        except Exception as e:
            reply = f"Sorry, I ran into an error while parsing the receipt: {e}"

        # Receipt emoji prefix (was mis-encoded as mojibake).
        history.append(("🧾 " + user_msg, reply))
        # clear message + image after handling
        return "", history, None

    # ----- Simple chat / RAG question -----
    if message:
        try:
            answer = answer_question_over_expenses(DEMO_USER_ID, message)
        except Exception as e:
            # Mirror the receipt flow: surface the problem in the chat.
            answer = f"Sorry, I ran into an error while answering your question: {e}"
        history.append((message, answer))

    return "", history, None


# Layout: chat, message box, receipt upload and Send button on the left;
# short usage notes on the right.
with gr.Blocks() as demo:
    gr.Markdown("# 💳 Personal Expense Assistant (Gemini + SQLite)")
    with gr.Row():
        with gr.Column(scale=3):
            # 👈 default tuple format: chat_fn appends (user, assistant) tuples.
            # NOTE(review): the recorded run shows Gradio emitting a warning that
            # points at this line — presumably the tuple-format deprecation;
            # confirm before migrating to type="messages".
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="Message")
            image = gr.Image(label="Upload receipt (optional)", type="pil")
            send_btn = gr.Button("Send")
        with gr.Column(scale=1):
            gr.Markdown(
                "### How to use\n"
                "- Upload a receipt image and click **Send** to store it.\n"
                "- Then ask questions like:\n"
                "  - *How much have I spent on groceries?*\n"
                "  - *What is my total spending this month?*"
            )

    # chat_fn returns (cleared message, updated history, cleared image),
    # matching the three outputs below.
    send_btn.click(
        chat_fn,
        inputs=[msg, chatbot, image],
        outputs=[msg, chatbot, image],
    )

# share=True asks Gradio for a public share link in addition to the local server.
demo.launch(share=True)


  chatbot = gr.Chatbot(height=400)   # 👈 default tuple format


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://35f947d19571c1600c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


