In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports edited modules
# before each cell runs, so changes to local .py files are picked up without a kernel restart.
%reload_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import sqlite3
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
import uuid

from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_google_vertexai import VertexAIEmbeddings

from typing import TypedDict
from dataclasses import dataclass
from langgraph.runtime import get_runtime
from langgraph.checkpoint.memory import InMemorySaver
from langchain_community.utilities import SQLDatabase
from langchain_core.tools import tool
from langchain.agents import create_agent



from dotenv import load_dotenv
load_dotenv()

  from google.cloud.aiplatform.utils import gcs_utils


True

In [3]:
import os 
import json
import asyncio
from langchain_community.tools import DuckDuckGoSearchRun

# Initialize search: one shared DuckDuckGo tool instance, used by
# get_merchant_category_async for every merchant lookup.
search_tool = DuckDuckGoSearchRun()
# JSON file (relative to the working directory) where lookup results are
# persisted between runs by _load_cache / _save_cache.
MERCHANT_CACHE_FILE = "merchant_cache.json"

# Helper to load/save cache
def _load_cache():
    """Load the merchant-lookup cache from ``MERCHANT_CACHE_FILE``.

    Returns:
        dict: The cached mapping. An empty dict is returned when the file is
        missing, cannot be read, is not valid JSON, or does not hold a JSON object.
    """
    if not os.path.exists(MERCHANT_CACHE_FILE):
        return {}
    try:
        with open(MERCHANT_CACHE_FILE, 'r') as f:
            cached = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        # Catching only these keeps KeyboardInterrupt and real bugs visible.
        print(f"Could not read {MERCHANT_CACHE_FILE} ({e}); starting with an empty cache.")
        return {}
    # Callers index the cache by description, so anything but a JSON object is unusable.
    return cached if isinstance(cached, dict) else {}

def _save_cache(cache):
    """Persist ``cache`` to ``MERCHANT_CACHE_FILE`` as JSON.

    The data is written to a sibling ``.tmp`` file and then swapped into place
    with ``os.replace``. An interrupted or failed dump therefore cannot leave a
    truncated cache file, which ``_load_cache`` would treat as empty.

    Args:
        cache: JSON-serialisable mapping of description -> lookup result.
    """
    tmp_path = f"{MERCHANT_CACHE_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(cache, f)
        os.replace(tmp_path, MERCHANT_CACHE_FILE)
    finally:
        # After a successful swap the temp file is gone; this only cleans up
        # after a failed dump or replace.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

async def get_merchant_category_async(description, cache, sem):
    """Fetch web-search context describing the merchant behind ``description``.

    Args:
        description: Raw transaction description to look up.
        cache: Mapping of description -> earlier result. It is only read here;
            storing new results is left to the caller.
        sem: Async context manager (e.g. ``asyncio.Semaphore``) that bounds the
            number of searches running at once.

    Returns:
        The cached value on a cache hit, otherwise the search tool's result.
        The string "Unknown" is returned when the description is blank or not a
        string, or when the search raises.
    """
    if description in cache:
        return cache[description]

    # Missing CSV cells arrive as NaN/None; searching the web for "nan" only
    # burns a request and stores noise, so short-circuit them.
    if not isinstance(description, str) or not description.strip():
        return "Unknown"

    async with sem:
        try:
            # Short fixed pause before each request to ease pressure on the search backend
            await asyncio.sleep(0.05)
            print(f"   üîç Web searching for: {description}...")
            # Run the search asynchronously
            return await search_tool.ainvoke(f"What type of business / store is '{description}'? Whats is location")
        except Exception as e:
            # Best effort: one failed lookup must not abort the whole ingestion.
            print(f"   ‚ö†Ô∏è Search failed for {description}: {e}")
            return "Unknown"

async def ingest_csv(file_path, llm, db_path="money_rag.db"):
    """Parse a bank/credit-card CSV export and append it to the ``transactions`` table.

    Steps:
      1. Ask ``llm`` to map the CSV's columns onto date / description / amount /
         category and to say which sign the file uses for spending.
      2. Build a standardized frame in which spending is always positive.
      3. Enrich each unique description with web-search context (cached on disk).
      4. Append the rows to the SQLite database at ``db_path``.

    Args:
        file_path: Path of the CSV file to ingest.
        llm: LangChain chat model (any Runnable usable in ``prompt | llm | parser``).
        db_path: SQLite database file that receives the rows.

    Raises:
        KeyError: If the LLM's mapping lacks a required key or names a date /
            description / amount column that is not in the CSV.
        ValueError: If the date or amount column cannot be parsed.
    """
    file_name = os.path.basename(file_path)
    print(f"üìÇ Processing {file_path}...")
    df = pd.read_csv(file_path)
    headers = df.columns.tolist()
    sample_data = df.head(10).to_json()

    # 1. LLM Mapping
    prompt = ChatPromptTemplate.from_template("""
    Act as a financial data parser. Analyze this CSV data:
    Filename: {filename}
    Headers: {headers}
    Sample Data: {sample}

    TASK:
    1. Map the CSV columns to standard fields: date, description, amount, and category.
    2. Determine the 'sign_convention' for spending.
       
       RULES:
       - If the filename suggests 'Discover' credit card, spending are usually POSITIVE.
       - If the filename suggests 'Chase' credit card, spending are usually NEGATIVE.
                                              
       - Analyze the 'sign_convention' for spending (outflows):
          - Look at the sample data for known merchants or spending patterns.
          - If spending (like a restaurant or store) is NEGATIVE (e.g., -25.00), the convention is 'spending_is_negative'.
          - If spending is POSITIVE (e.g., 25.00), the convention is 'spending_is_positive'.

    OUTPUT FORMAT (JSON ONLY):
    {{
      "date_col": "column_name",
      "desc_col": "column_name",
      "amount_col": "column_name",
      "category_col": "column_name or null",
      "sign_convention": "spending_is_negative" | "spending_is_positive"
    }}
    """)

    chain = prompt | llm | JsonOutputParser()
    # Await the async variant so the model round-trip does not block the event loop.
    mapping = await chain.ainvoke({"headers": headers, "sample": sample_data, "filename": file_path})

    # 2. Standardization
    standard_df = pd.DataFrame()
    standard_df['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
    standard_df['transaction_date'] = pd.to_datetime(df[mapping['date_col']])
    standard_df['description'] = df[mapping['desc_col']]

    # Normalization Logic: after this step spending is always stored as a positive amount.
    raw_amounts = pd.to_numeric(df[mapping['amount_col']])
    if mapping['sign_convention'] == "spending_is_negative":
        standard_df['amount'] = raw_amounts * -1
    else:
        standard_df['amount'] = raw_amounts

    # The model may answer None, the literal string "null", or a header that does
    # not exist; only trust the answer when it names a real column.
    category_col = mapping.get('category_col')
    if category_col in df.columns:
        standard_df['category'] = df[category_col]
    else:
        standard_df['category'] = 'Uncategorized'
    standard_df['source_file'] = file_name

    # 3. --- Async Enrichment Step ---
    print("   ‚ú® Enriching descriptions (Async)...")
    unique_descriptions = standard_df['description'].unique()

    # Load cache once. Entries recorded as "Unknown" are failed lookups from an
    # earlier run; drop them so they get another chance instead of sticking forever.
    cache = {key: value for key, value in _load_cache().items() if value != "Unknown"}

    # Create a semaphore to limit concurrent web searches (e.g. 5)
    sem = asyncio.Semaphore(5)

    # One lookup task per unique description, run in parallel
    tasks = [get_merchant_category_async(desc, cache, sem) for desc in unique_descriptions]
    results = await asyncio.gather(*tasks)

    # Update cache object with new results; failures are deliberately not persisted.
    for desc, res in zip(unique_descriptions, results):
        if res != "Unknown":
            cache[desc] = res

    # Save cache back to disk
    _save_cache(cache)

    # Map back
    desc_map = dict(zip(unique_descriptions, results))
    standard_df['enriched_info'] = standard_df['description'].map(desc_map).fillna("")

    # 4. Save to DB (close the connection even when the insert fails)
    conn = sqlite3.connect(db_path)
    try:
        standard_df.to_sql("transactions", conn, if_exists="append", index=False)
    finally:
        conn.close()

    print(f"‚úÖ Ingested {len(standard_df)} rows from {file_name}. Logic: {mapping['sign_convention']}")

In [4]:
path1 = "/Users/sawale/Documents/learning/money_rag/demo_data/Discover-AllAvailable-20260110.csv"
path2 = "/Users/sawale/Documents/learning/money_rag/demo_data/Chase5282_Activity20240110_20260110_20260111.CSV"

# Initialize the Gemini model via Vertex AI
vertex_llm = init_chat_model(
    "gemini-2.5-flash", 
    model_provider="google_vertexai",
    project='gen-lang-client-0311515393',
    location='us-central1',
)

# Run async functions in Jupyter
# We run them sequentially here to avoid file lock issues with SQLite if both try to write at once
# (Though SQLite handles concurrency, keeping ingestion strictly ordered is safer for the demo)
await ingest_csv(path1, vertex_llm)
await ingest_csv(path2, vertex_llm)

üìÇ Processing /Users/sawale/Documents/learning/money_rag/demo_data/Discover-AllAvailable-20260110.csv...
   ‚ú® Enriching descriptions (Async)...
   üîç Web searching for: BACK MARKET BROOKLYN NY...
   üîç Web searching for: TEMU.COM 8884958368 DE...
   üîç Web searching for: WALMART STORE 00332 HUNTSVILLE AL...
   üîç Web searching for: $100 STATEMENT CREDIT W 1ST PU...
   üîç Web searching for: PY *KUNG-FU TEA AL HUNTSVILLE AL...
   üîç Web searching for: MADISON MONTGOMERY AL...
   üîç Web searching for: INTERNET PAYMENT - THANK YOU...
   üîç Web searching for: GRUBHUB - UNIVERSITY OF HUNTSVILLE AL...
   üîç Web searching for: MINT MOBILE 800-683-7392 CA...
   üîç Web searching for: POPEYES 2577 HUNTSVILLE AL...
   üîç Web searching for: 88 BUFFET HUNTSVILLE AL...
   üîç Web searching for: VIET HUONG VIETNAMESE RE HUNTSVILLE AL...
   üîç Web searching for: CASHBACK BONUS REDEMPTION PYMT/STMT CRDT...
   üîç Web searching for: SPO*THECURRYMODERNINDIAN HUNTSVILLE AL...


In [5]:

import sqlite3
import pandas as pd

# Connect to the database
conn = sqlite3.connect("money_rag.db")
try:
    # Query all transactions
    df_view = pd.read_sql_query("SELECT * FROM transactions", conn)
finally:
    # Close connection even if the query fails (e.g. the table does not exist yet)
    conn.close()

# Display the data
df_view

Unnamed: 0,id,transaction_date,description,amount,category,source_file,enriched_info
0,cf2f48c5-34ab-4544-b1f4-542681fd5017,2024-10-17 00:00:00,BACK MARKET BROOKLYN NY,231.19,Merchandise,Discover-AllAvailable-20260110.csv,"Online Retailer in New York, NY . See BBB rati..."
1,16913dce-52bf-43f4-853d-7bb55e09aac5,2024-10-18 00:00:00,TEMU.COM 8884958368 DE,16.51,Merchandise,Discover-AllAvailable-20260110.csv,Temu ' s business model has allowed it to beco...
2,5917cb10-f0e6-49fa-9987-776cee28688c,2024-10-18 00:00:00,WALMART STORE 00332 HUNTSVILLE AL,146.73,Merchandise,Discover-AllAvailable-20260110.csv,Posts Walmart Huntsville - University Drive De...
3,bd66a49e-5064-43ca-bdb2-94947297b6d5,2024-10-18 00:00:00,$100 STATEMENT CREDIT W 1ST PU,-100.00,Awards and Rebate Credits,Discover-AllAvailable-20260110.csv,"That said, I woke up yesterday and saw a $ 100..."
4,8b9b16d4-dd81-4ebe-820f-62fdc827a0bb,2024-11-02 00:00:00,PY *KUNG-FU TEA AL HUNTSVILLE AL,8.09,Restaurants,Discover-AllAvailable-20260110.csv,"Jan 22, 2021 ¬∑ Best part to me--besides the ro..."
...,...,...,...,...,...,...,...
245,239f983e-2ed8-4454-90ad-0e8477bf7893,2025-06-18 00:00:00,PANDA EXPRESS #2005,52.87,Food & Drink,Chase5282_Activity20240110_20260110_20260111.CSV,"Aug 29, 2025 ¬∑ The group's headquarters is loc..."
246,0ecba974-5491-4ef2-ba52-bf3c37d6e854,2025-06-14 00:00:00,Payment Thank You-Mobile,-62.07,,Chase5282_Activity20240110_20260110_20260111.CSV,"Sep 25, 2025 ¬∑ Thank You Mobile is not a compa..."
247,fb966472-d77a-4723-a9e6-15e63bda4cd2,2025-06-12 00:00:00,STARS AND STRIKES - HUNTS,21.80,Entertainment,Chase5282_Activity20240110_20260110_20260111.CSV,"At our Huntsville , AL location , we pride our..."
248,b6bc2e1d-d68b-4acd-a201-46d8cf5175e5,2025-06-11 00:00:00,WAL-MART #332,4.47,Groceries,Chase5282_Activity20240110_20260110_20260111.CSV,3 days ago ¬∑ Walmart Inc. is an American multi...


In [6]:
# df_view["amount"].sum()

In [7]:
# SQL handle over the same SQLite file that ingest_csv writes to; handed to the
# agent tools through RuntimeContext.
db = SQLDatabase.from_uri("sqlite:///money_rag.db")

# Embedding model used when texts are added to and searched in the vector store.
embeddings = VertexAIEmbeddings(model_name="text-embedding-005")


# Initialize Qdrant with disk persistence
# "path" creates a local directory to store the vectors
qdrant_client = QdrantClient(path="qdrant_db") 
# Collection that sync_to_qdrant drops and recreates on every call.
COLLECTION_NAME = "transactions"

def sync_to_qdrant(db_path: str):
    """Rebuild the Qdrant collection from the SQLite ``transactions`` table.

    The collection named ``COLLECTION_NAME`` is deleted (if present) and
    recreated, then every transaction is embedded as
    "<description> - <enriched_info>" (or just the description when there is no
    enrichment). ``id``, ``amount``, ``category`` and ``transaction_date`` are
    stored as metadata so search hits can be joined back to SQL rows.

    Args:
        db_path: Path of the SQLite database holding the ``transactions`` table.

    Returns:
        QdrantVectorStore: Store bound to the freshly populated collection.
    """
    # Load data from the database (close the connection even if the read fails)
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query("SELECT * FROM transactions", conn)
    finally:
        conn.close()

    # Recreate collection to ensure clean state on re-ingestion
    if qdrant_client.collection_exists(COLLECTION_NAME):
        qdrant_client.delete_collection(COLLECTION_NAME)

    qdrant_client.create_collection(
        collection_name=COLLECTION_NAME,
        # NOTE(review): 768 must equal the embedding model's output dimension —
        # confirm whenever the model behind `embeddings` is changed.
        vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    )

    vector_store = QdrantVectorStore(
        client=qdrant_client,
        collection_name=COLLECTION_NAME,
        embedding=embeddings,
    )

    # Use description + enrichment as the main text for embedding
    enrichments = df['enriched_info'] if 'enriched_info' in df.columns else [''] * len(df)
    texts = []
    for description, enriched in zip(df['description'], enrichments):
        # A missing enrichment can surface as NaN, which is truthy; test it
        # explicitly so the embedded text never ends in " - nan".
        if pd.notna(enriched) and str(enriched).strip():
            texts.append(f"{description} - {enriched}")
        else:
            texts.append(str(description))

    # Store other fields as metadata for correlation
    metadatas = df[['id', 'amount', 'category', 'transaction_date']].to_dict('records')
    # Convert timestamps to string for metadata compatibility
    for m in metadatas:
        m['transaction_date'] = str(m['transaction_date'])

    # Skip the embedding call entirely when the table is empty.
    if texts:
        vector_store.add_texts(texts=texts, metadatas=metadatas)
    print(f"‚úÖ Synced {len(texts)} records to Qdrant at 'qdrant_db/'.")
    return vector_store

# Initialize store from DB. Every run of this cell drops the collection and
# re-embeds all rows of the transactions table (see sync_to_qdrant).
vector_store = sync_to_qdrant("money_rag.db")



‚úÖ Synced 250 records to Qdrant at 'qdrant_db/'.


In [8]:
@dataclass
class RuntimeContext:
    """Dependencies passed to the agent per invocation and read by the tools via ``get_runtime``."""
    # SQL database handle that execute_sql runs queries against
    db: SQLDatabase
    # Vector store that semantic_search queries
    vector_store: QdrantVectorStore

@tool
def execute_sql(query: str) -> str:
    """Execute a SQLite command and return results."""
    # NOTE: the docstring above doubles as the tool description sent to the
    # model, so it is intentionally left unchanged; details live in comments.
    #
    # The system prompt promises read-only access, but the SQL is model-generated
    # from user chat, so enforce that promise here instead of trusting the prompt.
    import re  # local import: nothing else in the notebook needs it

    # Blank out quoted literals so merchant text such as '%UPDATE FEE%' cannot
    # trip the keyword scan, then look at what the statement actually does.
    scannable = re.sub(r"'(?:[^']|'')*'", "''", query).strip().lstrip("(").lstrip()
    leading = re.match(r"[A-Za-z]+", scannable)
    leading_kw = leading.group(0).lower() if leading else ""
    mutating = re.search(
        r"\b(?:insert|update|delete|drop|alter|create|attach|detach|vacuum|reindex|replace\s+into)\b",
        scannable,
        flags=re.IGNORECASE,
    )
    if leading_kw not in {"select", "with", "explain"} or mutating:
        # Returned (not raised) so the agent sees it as a tool result and can retry.
        return "Error: only read-only queries (SELECT / WITH / EXPLAIN) are allowed."

    runtime = get_runtime(RuntimeContext)
    db = runtime.context.db
    try:
        return db.run(query)
    except Exception as e:
        # Surface SQL errors to the model as text so it can correct the query.
        return f"Error: {e}"


@tool
def semantic_search(query: str, topk: int = 5) -> str:
    """
    Search for transactions semantically when exact category or description matches are unknown.
    Use this to find specific merchants or types of spending (e.g., 'streaming services' or 'fast food').
    Returns a list of matching transactions with their IDs and metadata.
    """
    # NOTE: the docstring above doubles as the tool description sent to the
    # model, so it is intentionally left unchanged.
    runtime = get_runtime(RuntimeContext)
    vs = runtime.context.vector_store

    # The count is model-supplied: coerce it to an int and never ask for fewer
    # than one hit; fall back to the declared default when it is unusable.
    try:
        k = max(1, int(topk))
    except (TypeError, ValueError):
        k = 5

    try:
        results = vs.similarity_search(query, k=k)
    except Exception as e:
        # Same convention as execute_sql: report failures as text so the agent
        # can react instead of the whole run crashing.
        return f"Error: {e}"

    output = [f"Result: {doc.page_content} | Metadata: {doc.metadata}" for doc in results]

    return "\n".join(output) if output else "No semantically similar transactions found."


# System prompt for the agent. Kept as a plain (non-f) string: it has no
# placeholders, and a plain literal lets future edits include literal braces
# (e.g. a JSON example) without breaking.
SYSTEM = """You are a sophisticated financial analyst with access to both a SQLite database and a semantic search tool.

Workflow:
1. **Identify the Need**: If the user's request uses vague terms (e.g., "junk food", "travel stuff") or you don't know the exact category/description name, start with `semantic_search`.
2. **Semantic Discovery**:
    - Call `semantic_search(query, topk=...)`. 
    - **Pro Tip**: Use a higher `topk` (e.g., 10 or 20) if you suspect there are many relevant transactions to find.
    - **Relevance Check**: Not all results in the `topk` list may be relevant. You have permission to select only the few that match the user's intent and discard the rest.
    - **Reiteration Logic**: 
        - If the initial results seem too narrow, or if you suspect more relevant transactions exist but were cut off, reiterate by calling `semantic_search` again with a higher `topk` (e.g., 20, 50).
        - **Stop Condition**: If you found a few matching transactions and are confident that's all of them (i.e., the rest of the results are clearly irrelevant), do NOT iterate further.
3. **Filter & Extract**:
    - Review the results from semantic search. Manually filter out any that aren't relevant to the user's specific intent.
    - Extract the unique `id` values from the metadata of relevant results.
4. **Handling No Results**:
    - If `semantic_search` returns nothing useful, or if the results are clearly not what the user asked for (e.g., user asked for "coffee" but results are all "gas stations"), STOP.
    - Return a clear message: "I couldn't find any transactions related to [topic]."
    - **DO NOT GUESS**. It is better to say you found nothing than to sum up unrelated transactions.
5. **SQL Execution**:
    - Use `execute_sql` to perform the final calculation or retrieval.
    - Correlate results by using the discovered IDs in your query: `SELECT SUM(amount) FROM transactions WHERE id IN ('uuid1', 'uuid2', ...)`.
    - You can also use discovered merchant names if they share a common pattern.

Rules:
- **Thinking**: Explain your plan before calling any tools.
- **Read-only**: No modifications (INSERT/UPDATE/DELETE/etc.) to the database.
- **Spending Logic**: 
    - Spending = POSITIVE values (> 0). 
    - Payments/Refunds = NEGATIVE values (< 0). EXCLUDE negative values when calculating spending.
- **SQL Formatting**: Limit results to 5 rows for non-aggregation queries. Use `SUM()` for totals.
"""

# Assemble the agent: Gemini model + the two tools + the SYSTEM prompt.
# RuntimeContext is the schema of the per-call `context=` argument the tools read;
# the InMemorySaver checkpointer is what lets a thread_id carry conversation state
# (presumably only for the life of this kernel — confirm before relying on it).
agent = create_agent(
    model=vertex_llm,
    tools=[execute_sql, semantic_search],
    system_prompt=SYSTEM,
    context_schema=RuntimeContext,
    checkpointer=InMemorySaver(),
)

In [9]:
# Smoke test of the vector store on its own, without the agent.
# `vector_store` comes from the sync_to_qdrant cell above; re-run that cell to rebuild it.

# Test search
query = "Where did I spend money on groceries or food?"
results = vector_store.similarity_search(query, k=3)

# Each hit prints the embedded text (description + enrichment) and its stored metadata.
for doc in results:
    print(f"Match: {doc.page_content} | Metadata: {doc.metadata}")

Match: TST*WOKS UP HUNTSVILLE AL00075396024313993332AA - Take a look at these nine new businesses that have just opened or announced an opening here in the Rocket City. Check Dress Up - Huntsville in Huntsville , AL, 920 Bob Wallace Ave SW #317 on Cylex and find ‚òé (256) 585-2..., contact info, ‚åö opening hours. We're posted at the WeUp location for just a couple more hours ‚Äîso if you're hungry, now's the time to pull up ! We've had some amazing folks swing through today, including the owner of Ms. Juju's Kitchen and one of your favorite local barbers Mike ‚Äîand YES, they're repeat customers because the food speaks for itself! Detailed info and reviews on 27 top companies and startups in Huntsville in 2026. Get the latest updates on their products, jobs, funding, investors, founders and more. Check Your Statement: Look for entries starting with "TST" on your debit card statement. Note the date, amount, and any additional details like a business name or code. Match with Receipts: C

In [10]:
# Ask the agent a single question and print the newest message of every streamed state.
question = "ow much did i spend in haircuts"
# Every streamed state is kept here for inspection after the run.
steps = []

for step in agent.stream(
    {"messages": [{"role": "user", "content": question}]},
    # thread_id "1" is also used by the chat loop below, so both share one conversation.
    {"configurable": {"thread_id": "1"}},
    stream_mode="values",
    # Gives the tools the database and vector store they fetch via get_runtime().
    context=RuntimeContext(db=db, vector_store=vector_store)
):
    step["messages"][-1].pretty_print()
    steps.append(step)


ow much did i spend in haircuts
Tool Calls:
  semantic_search (f8fc4ebf-658f-4c58-a163-6bc4308ae46a)
 Call ID: f8fc4ebf-658f-4c58-a163-6bc4308ae46a
  Args:
    topk: 10.0
    query: haircuts
Name: semantic_search

Result: TST*WOKS UP HUNTSVILLE AL00075396024313993332AA - Take a look at these nine new businesses that have just opened or announced an opening here in the Rocket City. Check Dress Up - Huntsville in Huntsville , AL, 920 Bob Wallace Ave SW #317 on Cylex and find ‚òé (256) 585-2..., contact info, ‚åö opening hours. We're posted at the WeUp location for just a couple more hours ‚Äîso if you're hungry, now's the time to pull up ! We've had some amazing folks swing through today, including the owner of Ms. Juju's Kitchen and one of your favorite local barbers Mike ‚Äîand YES, they're repeat customers because the food speaks for itself! Detailed info and reviews on 27 top companies and startups in Huntsville in 2026. Get the latest updates on their products, jobs, funding, inves

In [None]:
# Interactive Chat Loop
# Reads questions until the user types exit/quit/q, presses Ctrl-C, or stdin closes.
print("üí¨ Chat with your financial data! (Type 'exit' to stop)")

while True:
    try:
        # Strip so that " exit " still quits and whitespace-only input counts as empty.
        user_input = input("User: ").strip()
        if user_input.lower() in ["exit", "quit", "q"]:
            print("Goodbye!")
            break
        if not user_input:
            # Nothing typed: don't spend a model call (and a memory slot) on an empty message.
            continue

        print("\n" + "-"*50)

        # Stream the agent's response
        for step in agent.stream(
            {"messages": [{"role": "user", "content": user_input}]},
            {"configurable": {"thread_id": "1"}}, # Keeps memory of the conversation
            stream_mode="values",
            context=RuntimeContext(db=db, vector_store=vector_store)
        ):
            step["messages"][-1].pretty_print()

        print("-" * 50 + "\n")

    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or a closed input stream both end the session cleanly.
        print("\nGoodbye!")
        break

üí¨ Chat with your financial data! (Type 'exit' to stop)

--------------------------------------------------

or db won't be in github..

It seems like your last message might be incomplete or a comment about something else. Could you please let me know what you'd like me to do or if you have a question about your spending? I'm ready to help analyze your transactions.
--------------------------------------------------

