<a href="https://colab.research.google.com/github/Meghaa-N/Budgeteer/blob/main/Budgeteer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Budgeteer
Budgeteer is a Smart Personal Finance Assistant. This application will act as a chat-bot that gets everyday expense statements from the user in natural language, process the informations to store them in the DBs, and dynamically give budget insights, like your own finance advisor, or a friend (that's your choice!) :)

### MongoDB Utilities
The section contains all the utility functions that help in querying the database, inserting new expenses in the database and many other such functionalities.

In [None]:
!pip install langchain_openai
%pip install llama-index-llms-gemini llama-index
from langchain_openai import OpenAIEmbeddings
from llama_index.llms.gemini import Gemini
!pip install pymongo

In [2]:
from pymongo.mongo_client import MongoClient

uri = "mongodb+srv://meghaan:********************************************************"
client = MongoClient(uri, tls=True, tlsAllowInvalidCertificates=True)

try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [3]:
def get_collection(db_name, collection_name):
    db = client[db_name]
    collection = db[collection_name]
    return collection

In [4]:
def get_currently_available_tags(month_index):
    collection = get_collection("Budgeteer","Budget Plan")
    budget_plan = collection.find_one({"_id":str(month_index)})
    if budget_plan == None:
      budget_plan = collection.find_one({"_id":"default"})
    tags = budget_plan['values'].keys()
    return tags

In [5]:
get_currently_available_tags(1)

dict_keys(['groceries', 'food', 'transport', 'medical', 'miscellaneous', 'house'])

In [6]:
def get_budget_plan_for_the_month(month_index):
  collection = get_collection("Budgeteer","Budget Plan")
  budget = collection.find_one({"_id":str(month_index)})
  if budget == None:
    budget = collection.find_one({"_id":"default"})
  return budget["values"]

In [7]:
get_budget_plan_for_the_month(1)

{'groceries': '5',
 'food': '4',
 'transport': '10',
 'medical': '30',
 'miscellaneous': '30',
 'house': '10'}

In [8]:
def get_current_expense_for_the_month(month_index):
  collection = get_collection("Budgeteer","Monthly Expenses")
  monthly_expense = collection.find_one({"_id":str(month_index)})
  print(monthly_expense)
  amount_split = {}
  expense_split = {}
  for key in monthly_expense["values"].keys():
    expenses = monthly_expense["values"][key]
    print(expenses)
    for expense in expenses:
      amount_split[expense[2]] =  amount_split.get(expense[2],0) + expense[1]
      if expense[2] in expense_split:
        expense_split[expense[2]][expense[0]] = expense_split[expense[2]].get(expense[0],0) + expense[1]
      else:
        expense_split[expense[2]] = {}
        expense_split[expense[2]][expense[0]] = expense_split[expense[2]].get(expense[0],0) + expense[1]
  return amount_split, expense_split

In [9]:
get_current_expense_for_the_month(2)

{'_id': '2', 'values': {'01-02-2025': [['ice cream', 200, 'groceries'], ['dinner', 500, 'food']], '02-02-2025': [['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport']], '14-02-2025': [['flight tickets', 2000, 'transport'], ['flight tickets', 2000.0, 'transport'], ['flight tickets', 2000.0, 'transport'], ['flight tickets', 2000.0, 'transport']]}}
[['ice cream', 200, 'groceries'], ['dinner', 500, 'food']]
[['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport']]
[['flight tickets', 2000, 'transport'], ['flight tickets', 2000.0, 'transport'], ['flight tickets', 2000.0, 'transport'], ['flight tickets', 2000.0, 'transport']]


({'groceries': 200, 'food': 500, 'transport': 43000.0},
 {'groceries': {'ice cream': 200},
  'food': {'dinner': 500},
  'transport': {'flight': 35000, 'flight tickets': 8000.0}})

In [10]:

def get_income_statement_for_the_month(month_index):
  collection = get_collection("Budgeteer","Income Statements")
  income = collection.find_one({"_id":str(month_index)})
  return income["values"]

def get_total_income_of_current_month(month_index):
  income = get_income_statement_for_the_month(month_index)
  income_amount = 0
  for key in income.keys():
    income_amount += income[key]
  return income_amount

def get_calculated_budget_for_the_month(month_index):
  income = get_total_income_of_current_month(month_index)
  budget = get_budget_plan_for_the_month(month_index)
  budget_amount = {}
  for key in budget.keys():
    budget_amount[key] = (float(budget[key]) * income)//100
  return budget_amount

In [11]:
get_calculated_budget_for_the_month(1)

{'groceries': 7575.0,
 'food': 6060.0,
 'transport': 15150.0,
 'medical': 45450.0,
 'miscellaneous': 45450.0,
 'house': 15150.0}

In [12]:
def add_expenses(expenses):
    collection = get_collection("Budgeteer", "Monthly Expenses")
    if not expenses:
        return []

    if isinstance(expenses, str):
        try:
            match = re.search(r"\[\[(.*?)\]\]", expenses)  # Extract content inside [[ ]]
            if match:
                extracted_text = match.group(1)  # Get the inside part
                items = extracted_text.split("], [")  # Split by ], [

                cleaned_expenses = []
                for item in items:
                    values = item.replace("'", "").split(", ")  # Remove quotes & split
                    values[2] = float(values[2])  # Convert amount to number
                    cleaned_expenses.append(values)

                expenses = cleaned_expenses  # Assign back to expenses
            else:
                print("Error: No valid expense data found in string")
                return []
        except Exception as e:
            print("Error: Failed to process expenses.", e)
            return []

    if not isinstance(expenses, list) or not all(isinstance(item, list) for item in expenses):
        print(f"Error: Invalid expense format after conversion: {expenses}")
        return []

    for curr_expense in expenses:
        date, name, amount, category = curr_expense  # Unpacking the expense tuple
        month_index = int(date.split("-")[1])

        # Check if the month document exists
        expense_doc = collection.find_one({"_id": str(month_index)})

        if expense_doc is None:
            expense_doc = {"_id": str(month_index), "values": {}}

        if date not in expense_doc["values"]:
            expense_doc["values"][date] = []

        # Append new expense
        expense_doc["values"][date].append([name, amount, category])

        # Update or insert the document
        collection.update_one({"_id": str(month_index)}, {"$set": {"values": expense_doc["values"]}}, upsert=True)


In [13]:
import re

def is_exceeding_budget(expenses):
    """
    Function to determine if multiple expenses crossed the budget limit for the month.
    Returns a list of results for each expense.
    """

    if not expenses:
        return []

    if isinstance(expenses, str):
        try:
            match = re.search(r"\[\[(.*?)\]\]", expenses)  # Extract content inside [[ ]]
            if match:
                extracted_text = match.group(1)  # Get the inside part
                items = extracted_text.split("], [")  # Split by ], [

                cleaned_expenses = []
                for item in items:
                    values = item.replace("'", "").split(", ")  # Remove quotes & split
                    values[2] = float(values[2])  # Convert amount to number
                    cleaned_expenses.append(values)

                expenses = cleaned_expenses  # Assign back to expenses
            else:
                print("Error: No valid expense data found in string")
                return []
        except Exception as e:
            print("Error: Failed to process expenses.", e)
            return []

    if not isinstance(expenses, list) or not all(isinstance(item, list) for item in expenses):
        print(f"Error: Invalid expense format after conversion: {expenses}")
        return []

    results = []

    for expense in expenses:
        if not isinstance(expense, list) or len(expense) != 4:
            print(f"Error: Invalid expense format: {expense}")
            continue  # Skip this expense

        try:
            month_index = int(expense[0].split("-")[1])
            amount_split, expense_split = get_current_expense_for_the_month(month_index)
            budget_amount = get_calculated_budget_for_the_month(month_index)

            category = expense[3]
            amount = expense[2]

            # Update the current spending in that category
            amount_split[category] = amount_split.get(category, 0) + amount

            # Check if spending exceeds the budget
            exceeds = amount_split[category] > budget_amount.get(category, float("inf"))
            results.append((exceeds))

        except Exception as e:
            print(f"Error processing expense {expense}: {e}")

    return results


In [14]:
import datetime
from datetime import datetime
def todays_date():
  return datetime.now().strftime("%d-%m-%Y")
todays_date()

'15-02-2025'

In [15]:
new_expense = ["02-01-2025","flight",5000,"transport"]
add_expenses([new_expense])
is_exceeding_budget([new_expense])


{'_id': '1', 'values': {'01-01-2025': [['ice cream', 200, 'groceries'], ['dinner', 500, 'food']], '02-01-2025': [['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport']]}}
[['ice cream', 200, 'groceries'], ['dinner', 500, 'food']]
[['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 'transport'], ['flight', 5000, 't

[True]

In [None]:
!pip install langchain
%pip install -qU pypdf langchain_community
!pip install llama-index

In [None]:
!pip install langchain_community

In [None]:
!pip uninstall llama-index -y
!pip install llama-index


In [None]:
!pip install langchain-google-genai


In [52]:
# Initialize Model

# Import LlamaIndex from llama_index instead of langchain.llms
from langchain_openai import OpenAIEmbeddings
from llama_index.llms.gemini import Gemini
from langchain_google_genai import ChatGoogleGenerativeAI
import os

GOOGLE_API_KEY = "**********************************************"
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

gemini_llm = Gemini(  # Assign the Gemini instance to a variable
    model="models/gemini-1.5-flash",
)
myLLM = ChatGoogleGenerativeAI(model="gemini-1.5-flash").bind()

In [21]:
# Experimenting Agents to achieve results

from pymongo import MongoClient
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.tools import Tool
from langchain.agents import initialize_agent
from langchain.memory import ConversationBufferMemory

In [22]:
import json

def smart_data_extraction(input_str):
    prompt_template = PromptTemplate.from_template(
        """
        Extract structured transaction data from the following input:
        "{input_str}"
        Return a JSON **list** (only JSON, no extra text):
        [["14-02-2025", "flight tickets", 2000, "transport"]]
        """
    )

    chain = prompt_template | myLLM

    try:
        response = chain.invoke({"input_str": input_str})
        response_text = response.content.strip() if hasattr(response, "content") else str(response).strip()

        # Extract the first valid JSON list
        start_idx = response_text.find("[")
        end_idx = response_text.rfind("]") + 1
        if start_idx == -1 or end_idx == 0:
            raise ValueError("No valid JSON found in response")

        clean_json = response_text[start_idx:end_idx]

        extracted_data = json.loads(clean_json)

        # Ensure it's a list of lists
        if isinstance(extracted_data, list) and all(isinstance(item, list) for item in extracted_data):
            return extracted_data
        else:
            print("Unexpected format:", extracted_data)
            return []

    except json.JSONDecodeError:
        print("AI returned invalid JSON:", response_text)
        return []
    except Exception as e:
        print("Unexpected error:", e)
        return []



In [23]:
# Establishing all the needed functions as tools for the AI to use

tools = [
    Tool(name="Get Expense Tags", func=get_currently_available_tags, description="On passing the month index (1 is January and so on), we get the budget tags that we must categorize each expense in"),
    Tool(name="Get Expense Statements from user input", func=smart_data_extraction, description="This function returns an expense statement in the structured fromat from the user input which is in natural language"),
    Tool(name="Get Budget Split", func=get_budget_plan_for_the_month, description="This give's the user's current budget plan. Each tag that we use for our expenses have a percentage value in the budget. Groceries: 3 means in a month we are allowed to spend 3% of the total income of the month in groceries"),
    Tool(name="Check if expense is greater than the budget", func=is_exceeding_budget,
         description="This function returns True if this expense has exceeded the budget limit in that category or not, and also returns current amount spent in the category and the actual budget available in the category"),
    Tool(name="Add Expense", func=add_expenses, description="This accepts a list of lists as parameter. Based on the date of expense (usually the first value in the expense list), ot adds this expense in the database"),
    Tool(name="Get Income Statement", func=get_income_statement_for_the_month, description="This gives the user's income statements of the month"),
    Tool(name="Get Total Income", func=get_total_income_of_current_month, description="This gives the user's total income of the month"),
    Tool(name="Get Current Expenses", func=get_current_expense_for_the_month, description="This gives the user's current expenses of the month. We have two objects returned. The first object gives the cumulative amount spent in each category. The second object gives the amount spent in each category for a particular entry. For example the second list add more specifics like icecream:300"),
    Tool(name="Get Calculated Budget",func=get_calculated_budget_for_the_month, description="This gives the maximum amount we can spend on each category from the current total income"),
    Tool(name="Get Today's date", func=lambda x: todays_date(), description="No input needed. This gives the current date in the format YYYY-MM-DD. The agent can use this if the user says they did something today or yesterday. It gives current date for agent to arrive at user date")
    ]

In [24]:
memory = ConversationBufferMemory(memory_key="chat_history")

  memory = ConversationBufferMemory(memory_key="chat_history")


In [25]:
system_message = """
You are a personal budget management assistant and your name is Budgeteer.
Your job is to help the user track their expenses, analyze their spending, and provide insights on budgeting.

Remember, You **must** always check if an expense exceeds the budget before logging it into the database.
1. First, extract the structured expense data from the user input.
2. Then, call `Is Expense Exceeding the Budget` to determine if it surpasses the budget. Appreciate or make friendly wwarning to the user.
If the expense exceeds the budget (True), return: 'Expense exceeds budget. Do not add.'
If it does not exceed (False), return: 'Expense is within budget. Proceed to add.'
3. Finally, call `Add Expense` to record the transaction.
Always follow this order if it is a request to add expenses.

Use the available tools to fetch budget details, monthly expenses, and category-wise spending. Ensure your responses are clear, actionable, and aligned with the user's financial goals. Automatically categorize transactions based on common spending categories.
"""

In [26]:
agent = initialize_agent(
    tools=tools,
    llm=myLLM,
    agent="zero-shot-react-description",
    memory=memory,
    verbose=True,
    system_message=system_message
)


  agent = initialize_agent(


In [None]:
agent.run("I spent 2000 for booking flight tickets to go to Goa today")

### StreamLit


In [28]:
!pip install streamlit ngrok
!pip install pyngrok streamlit

Collecting streamlit
  Downloading streamlit-1.42.0-py2.py3-none-any.whl.metadata (8.9 kB)
Collecting ngrok
  Downloading ngrok-1.4.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.42.0-py2.py3-none-any.whl (9.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m72.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ngrok-1.4.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m99.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none

In [38]:
!ngrok authtoken 2t2auHomTGUwgsbafz2wvj4YhJK_33GSuf1VX2N4jVqG63bvs

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
!pip install fastapi uvicorn nest-asyncio
!killall ngrok


In [None]:
from fastapi import FastAPI
from pydantic import BaseModel
from pyngrok import ngrok
import time
import nest_asyncio  # Import nest_asyncio
import uvicorn

app = FastAPI()

class QueryModel(BaseModel):
    query: str

@app.post("/agent_response")
async def get_response(query: QueryModel):
    user_input = query.query  # Extract user input
    print(f"Received input: {user_input}")  # Debugging log

    response_text = agent.run(user_input)  # Ensure it waits for completion
    print(f"Agent Response: {response_text}")  # Debugging log

    return {"response": response_text}


# Start the server
ngrok_tunnel = ngrok.connect(8000)
print("Backend running at:", ngrok_tunnel.public_url)

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Run uvicorn with the FastAPI app
uvicorn.run(app, host="0.0.0.0", port=8000)

In [48]:
!nohup streamlit run streamlit_app.py --server.port 8501 --server.address 0.0.0.0 > streamlit.log 2>&1 &


In [49]:
!ls

sample_data  streamlit_app.py  streamlit.log


In [50]:
from pyngrok import ngrok
public_url = ngrok.connect(8501)
print("Streamlit is live at:", public_url)

Streamlit is live at: NgrokTunnel: "https://c0c6-34-23-88-68.ngrok-free.app" -> "http://localhost:8501"
