<a href="https://colab.research.google.com/github/incognito-hb-24/Arthena/blob/main/ArthenaAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q -U google-generativeai pandas matplotlib numpy
from IPython.display import clear_output
clear_output()
print("Libraries ready.")

Libraries ready.


In [2]:
import google.generativeai as genai
from google.colab import userdata

# Fetch the Gemini key stored under the Colab secret named "ARTHENA" and hand
# it to the SDK; an empty/missing value aborts the cell with a setup hint.
API_KEY = userdata.get("ARTHENA")
if API_KEY:
    genai.configure(api_key=API_KEY)
    print("Gemini API configured successfully!")
else:
    raise ValueError("No API key found! Go to Tools ‚Üí Secrets ‚Üí Key = 'ARTHENA'.")

Gemini API configured successfully!


In [3]:
import os
from datetime import date, datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, HTML

# --- Public contract mirrored from cfo.storage / cfo.logic ---
# Column schema of the transaction history table. load_history() pads a loaded
# CSV up to this set, and empty tables are created with exactly these columns.
COLS = ["Date","Module","Type","Category","Entity","Amount (‚Çπ)","Notes"]

def get_history_path() -> str:
    """Return the path of the CSV file that holds the transaction history.

    The location is a fixed file directly under ``/content``.
    """
    return "/content/arthena_history.csv"

def load_history() -> pd.DataFrame:
    """Load the saved transaction history from the history CSV.

    Returns:
        The stored table. If any column listed in ``COLS`` is absent from the
        file it is added (``0.0`` for the amount column, ``""`` otherwise) and
        the frame is reduced to ``COLS`` order. The amount column is always
        converted to numbers, with unparseable values replaced by ``0.0``.
        An empty frame with the ``COLS`` columns is returned when the file
        does not exist or cannot be read.
    """
    path = get_history_path()
    if not os.path.exists(path):
        return pd.DataFrame(columns=COLS)

    amount_col = "Amount (‚Çπ)"
    try:
        df = pd.read_csv(path)
        # ensure columns present
        missing = [c for c in COLS if c not in df.columns]
        if missing:
            for c in missing:
                df[c] = 0.0 if c == amount_col else ""
            df = df[COLS]
        # Blank or textual amounts would make the later abs()/sum() calls fail
        # or concatenate strings, so force the column to numeric up front.
        df[amount_col] = pd.to_numeric(df[amount_col], errors="coerce").fillna(0.0)
        return df
    except Exception as exc:
        # Still best-effort, but say so: the next save would otherwise replace
        # the unreadable file without the user ever learning it was ignored.
        print(f"Could not read history from {path} ({exc}); starting with an empty table.")
        return pd.DataFrame(columns=COLS)

def save_history(df: pd.DataFrame):
    """Write ``df`` to the history CSV file, leaving out the index column."""
    target = get_history_path()
    df.to_csv(target, index=False)

def summarize_context(df: pd.DataFrame) -> str:
    """Build the one-line totals summary that is fed to the model as context.

    Args:
        df: Transaction table with at least the "Type" and amount columns.

    Returns:
        ``"No transactions yet."`` for an empty table; otherwise a sentence
        listing income, expenses, investments and net savings, each rounded to
        a whole number. Expense and investment amounts are counted by absolute
        value; income amounts are summed as stored.
    """
    if df.empty:
        return "No transactions yet."

    kinds = df["Type"]
    amounts = df["Amount (‚Çπ)"]
    income = amounts[kinds == "Income"].sum()
    expenses = amounts[kinds == "Expense"].abs().sum()
    investments = amounts[kinds == "Investment"].abs().sum()
    net = income - expenses - investments
    return (
        f"Income ‚Çπ{income:.0f}, Expenses ‚Çπ{expenses:.0f}, "
        f"Investments ‚Çπ{investments:.0f}, Net Savings ‚Çπ{net:.0f}"
    )

def compute_summary(df: pd.DataFrame):
    """Compute headline totals for the dashboard.

    Args:
        df: Transaction table with at least the "Type" and amount columns.

    Returns:
        A dict with keys "Income", "Expenses", "Investments" and "Net", where
        Net is income minus expenses minus investments. Expense and investment
        amounts are counted by absolute value; income is summed as stored.
    """
    def _amounts_of(kind):
        # Amount values of every row whose Type equals `kind`.
        return df.loc[df["Type"] == kind, "Amount (‚Çπ)"]

    income = _amounts_of("Income").sum()
    expenses = abs(_amounts_of("Expense")).sum()
    investments = abs(_amounts_of("Investment")).sum()
    return {
        "Income": income,
        "Expenses": expenses,
        "Investments": investments,
        "Net": income - expenses - investments,
    }

def plot_type_distribution(df: pd.DataFrame):
    """Draw a bar chart of how many rows each transaction Type has.

    Args:
        df: Transaction table with a "Type" column.

    Returns:
        The matplotlib Figure holding the chart (it is not shown here).
    """
    counts = df["Type"].value_counts()
    fig, ax = plt.subplots(figsize=(6, 3))
    ax.bar(counts.index, counts.values)
    ax.set(title="Transaction type distribution")
    ax.grid(axis="y", alpha=0.3)
    return fig

# session-like state
# Module-level globals that the widget callbacks in later cells read and rebind.
history_df = load_history()  # transaction table, read from the history CSV when it exists
chat_memory = []  # list of (user, assistant)
print("Storage ready at:", get_history_path())


Storage ready at: /content/arthena_history.csv


In [4]:
from google.colab import output
# Switch on Colab's custom widget manager before any widgets are displayed.
# NOTE(review): presumably needed for the ipywidgets forms below to render in
# Colab — confirm whether the default manager would suffice.
output.enable_custom_widget_manager()

In [5]:
# --- Arthena Form: Add a Transaction (original layout) ---
import ipywidgets as widgets
from IPython.display import display, HTML
from datetime import datetime, date

# Load current session dataframe
# (re-reads the CSV, replacing whatever history_df held from the setup cell)
history_df = load_history()

# Module choices and the transaction types offered under each module.
modules = ["Wealth", "Cashflow"]
wealth_types = ["Asset", "Liability", "Goal", "Insurance"]
cashflow_types = ["Income", "Expense", "Investment", "Payment"]

# --- Form widgets ---
# type_dd starts with the cashflow types because mod_dd defaults to "Cashflow".
mod_dd   = widgets.Dropdown(options=modules, value="Cashflow", description="Module:")
type_dd  = widgets.Dropdown(options=cashflow_types, description="Type:")
cat_in   = widgets.Text(description="Category:")
entity_in= widgets.Text(description="Entity:")
amt_in   = widgets.FloatText(description="Amount (‚Çπ):", value=0.0, step=100.0)
date_in  = widgets.DatePicker(description="Date:", value=date.today())
notes_in = widgets.Text(description="Notes:")
add_btn  = widgets.Button(description="‚ûï Add Entry", button_style="success",
                          layout=widgets.Layout(width='200px'))
# Output area that receives the validation message or the post-add preview.
out_add  = widgets.Output()

# --- Dynamic type update when module changes ---
def _update_type(_):
    """Make the Type dropdown offer the types that belong to the chosen module."""
    if mod_dd.value == "Wealth":
        type_dd.options = wealth_types
    else:
        type_dd.options = cashflow_types
# Re-run the sync every time the Module dropdown's value changes.
mod_dd.observe(_update_type, names="value")

# --- Add entry callback ---
def _add_entry(_):
    """Button callback: validate the form, append one row, save, and preview.

    Requires a non-blank Entity and a non-zero amount. On success the new row
    is appended to the global ``history_df``, the table is written to the
    history CSV, and the last five rows are shown in ``out_add``.

    Args:
        _: The clicked button (unused).
    """
    global history_df
    with out_add:
        out_add.clear_output()
        has_entity = bool(entity_in.value.strip())
        has_amount = amt_in.value != 0
        if not (has_entity and has_amount):
            print("‚ö†Ô∏è Please fill Entity and a non-zero Amount (‚Çπ).")
            return

        # An emptied date picker falls back to today's date.
        when = date_in.value or date.today()
        new_row = pd.DataFrame([{
            "Date": when.strftime("%Y-%m-%d"),
            "Module": mod_dd.value,
            "Type": type_dd.value,
            "Category": cat_in.value,
            "Entity": entity_in.value,
            "Amount (‚Çπ)": float(amt_in.value),
            "Notes": notes_in.value,
        }])
        history_df = pd.concat([history_df, new_row], ignore_index=True)
        save_history(history_df)
        print("‚úÖ Entry added.")
        display(history_df.tail(5))

# Run _add_entry whenever the "Add Entry" button is clicked.
add_btn.on_click(_add_entry)

# --- Display form ---
# Headings first, then every form control stacked vertically with the output
# area last so messages appear directly under the button.
display(HTML("<h2>Arthena ‚Äî Personal CFO</h2>"))
display(HTML("<h3>üßæ Add a transaction</h3>"))
display(widgets.VBox([
    mod_dd, type_dd, cat_in, entity_in, amt_in, date_in, notes_in, add_btn, out_add
]))

VBox(children=(Dropdown(description='Module:', index=1, options=('Wealth', 'Cashflow'), value='Cashflow'), Dro‚Ä¶

In [6]:
# Storage controls: explicit save, in-memory reset, and sample-data import,
# plus the output area their callbacks print into.
save_btn   = widgets.Button(description="üíæ Save now", button_style="", layout=widgets.Layout(width='160px'))
clear_btn  = widgets.Button(description="üßπ Clear (session only)", button_style="warning", layout=widgets.Layout(width='200px'))
sample_btn = widgets.Button(description="‚Ü≥ Load sample rows", button_style="info", layout=widgets.Layout(width='200px'))
out_store  = widgets.Output()

def _save(_):
    """Button callback: write the current ``history_df`` to the history CSV.

    Args:
        _: The clicked button (unused).
    """
    with out_store:
        out_store.clear_output()
        save_history(history_df)
        saved_to = get_history_path()
        print("Saved:", saved_to)

def _clear(_):
    """Button callback: reset the in-memory table and the chat memory.

    Only the globals are replaced; nothing is written to the CSV here.

    Args:
        _: The clicked button (unused).
    """
    global history_df, chat_memory
    with out_store:
        out_store.clear_output()
        history_df, chat_memory = pd.DataFrame(columns=COLS), []
        print("Session cleared.")

def _load_sample(_):
    """Button callback: append the bundled sample rows to the history and save.

    Reads ``/content/data/sample_transactions.csv`` if it exists, concatenates
    it onto the global ``history_df`` and writes the result to the history
    CSV. Prints a notice instead when the sample file is absent.

    Args:
        _: The clicked button (unused).
    """
    global history_df
    with out_store:
        out_store.clear_output()
        sample_path = "/content/data/sample_transactions.csv"  # same relative as app.py expects
        if not os.path.exists(sample_path):
            print("No sample file found at:", sample_path)
            return
        sample_rows = pd.read_csv(sample_path)
        history_df = pd.concat([history_df, sample_rows], ignore_index=True)
        save_history(history_df)
        print("Sample data added.")

# Render the storage section: heading, the three buttons in a row, then the
# shared output area below them.
display(HTML("<h3>üìÅ Storage</h3>"))
display(widgets.HBox([save_btn, clear_btn, sample_btn]))
display(out_store)

# Attach each button to its callback.
save_btn.on_click(_save)
clear_btn.on_click(_clear)
sample_btn.on_click(_load_sample)

HBox(children=(Button(description='üíæ Save now', layout=Layout(width='160px'), style=ButtonStyle()), Button(but‚Ä¶

Output()

In [7]:
# Output area the dashboard is drawn into, and the button that redraws it.
out_insight = widgets.Output()
refresh_btn = widgets.Button(description="üîÑ Refresh Insights", button_style="info", layout=widgets.Layout(width='200px'))

def _render_insights(_=None):
    """Redraw the insights dashboard inside ``out_insight``.

    Working from a copy of the global ``history_df``, this prints the headline
    totals, shows the transaction-type bar chart, shows a month-by-month
    inflow/outflow bar chart, and lists the first ten rows when sorted by
    "Date" descending. When the table is empty it prints a hint and returns.

    Args:
        _: Ignored; accepted so the function can serve as a button callback.
    """
    with out_insight:
        out_insight.clear_output()
        df = history_df.copy()
        if df.empty:
            print("No data yet. Add your first entry above.")
            return

        # Headline metrics
        summary = compute_summary(df)
        print(f"Income (‚Çπ): {summary['Income']:,.0f}")
        print(f"Expenses (‚Çπ): {summary['Expenses']:,.0f}")
        print(f"Investments (‚Çπ): {summary['Investments']:,.0f}")
        print(f"Net Savings (‚Çπ): {summary['Net']:,.0f}")

        # Chart 1: Type distribution
        display(HTML("<b>Transaction type distribution</b>"))
        fig = plot_type_distribution(df.copy())
        # pyplot.show() takes no figure argument (only the keyword `block`);
        # passing the Figure positionally only worked by accident on the
        # inline backend and raises TypeError on others.
        plt.show()

        # Chart 2: Monthly cashflow (Inflows vs Outflows)
        display(HTML("<b>Monthly cashflow</b>"))
        plot_df = df.copy()
        # Unparseable dates become NaT and end up in a bucket labelled "NaT".
        plot_df["Date"] = pd.to_datetime(plot_df["Date"], errors="coerce")
        plot_df["YYYY-MM"] = plot_df["Date"].dt.to_period("M").astype(str)

        # NOTE(review): "Investment" counts as an inflow here, while
        # compute_summary() subtracts investments from net savings — confirm
        # which treatment is intended.
        inflow = plot_df[plot_df["Type"].isin(["Income","Investment"])].groupby("YYYY-MM")["Amount (‚Çπ)"].sum()
        # Outflows are plotted by magnitude; grouping by the full month column
        # relies on pandas aligning it to the filtered rows by index.
        outflow = plot_df[plot_df["Type"].isin(["Expense","Payment"])]["Amount (‚Çπ)"].abs().groupby(plot_df["YYYY-MM"]).sum()

        # Use the union of months so a month present on only one side still
        # gets a (zero-height) bar on the other.
        idx = sorted(set(inflow.index).union(outflow.index))
        y_in = [inflow.get(i, 0) for i in idx]
        y_out= [outflow.get(i, 0) for i in idx]
        x = np.arange(len(idx))
        width = 0.4

        fig2, ax2 = plt.subplots(figsize=(7,4))
        # Offset the two series by half a bar width so they sit side by side.
        ax2.bar(x - width/2, y_in, width, label="Inflows")
        ax2.bar(x + width/2, y_out, width, label="Outflows")
        ax2.set_xticks(x); ax2.set_xticklabels(idx, rotation=45, ha="right")
        ax2.set_ylabel("‚Çπ"); ax2.set_title("Inflows vs Outflows by Month")
        ax2.legend(); ax2.grid(axis="y", alpha=0.3)
        plt.show()

        # Recent entries
        display(HTML("<b>Recent entries</b>"))
        disp = df.sort_values("Date", ascending=False).head(10)
        display(disp)

# Render the dashboard section, hook up the refresh button, and draw once
# immediately so the section is populated without a click.
display(HTML("<h3>üìä Insights dashboard</h3>"))
display(widgets.VBox([refresh_btn, out_insight]))
refresh_btn.on_click(_render_insights)
_render_insights()

VBox(children=(Button(button_style='info', description='üîÑ Refresh Insights', layout=Layout(width='200px'), sty‚Ä¶

In [8]:
# List usable text models (same filter as app.py)
def list_text_models():
    """Return the names of models that support ``generateContent``, best first.

    Names are ordered by family ("flash" before "pro" before anything else),
    then by version tag ("latest" before "-001" before anything else), then
    alphabetically on the lower-cased name.

    Returns:
        A sorted list of model names, or an empty list (after printing the
        error) if the model listing call fails.
    """
    try:
        catalog = list(genai.list_models())
    except Exception as e:
        print("Could not list Gemini models:", e)
        return []

    def _can_generate(model) -> bool:
        # True when the model advertises the generateContent method.
        methods = getattr(model, "supported_generation_methods", []) or []
        return "generateContent" in set(methods)

    def _label(model) -> str:
        # Prefer `.name`, fall back to `.model`; empty string if neither is set.
        return getattr(model, "name", "") or getattr(model, "model", "")

    def _rank(name: str):
        # Sort key: (family rank, version rank, lower-cased name).
        lowered = name.lower()
        if "flash" in lowered:
            family = 0
        elif "pro" in lowered:
            family = 1
        else:
            family = 2
        if "latest" in lowered:
            version = 0
        elif "-001" in lowered:
            version = 1
        else:
            version = 2
        return (family, version, lowered)

    labels = [_label(m) for m in catalog if _can_generate(m)]
    return sorted((label for label in labels if label), key=_rank)

# Discover usable models; if listing failed or returned nothing, fall back to
# a single hard-coded model name so the dropdown is never empty.
available_models = list_text_models()
if not available_models:
    available_models = ["models/gemini-1.5-flash-latest"]

# Chat controls: model picker (preselects the top-ranked model), question box,
# submit button, and the output area for answers.
model_dd = widgets.Dropdown(options=available_models, value=available_models[0], description="Model:")
ask_in   = widgets.Textarea(placeholder="Ask a question (e.g., ‚ÄúHow can I reduce expenses this month?‚Äù)",
                            layout=widgets.Layout(width='750px', height='80px'))
ask_btn  = widgets.Button(description="Ask Arthena", button_style="primary", layout=widgets.Layout(width='160px'))
chat_out = widgets.Output()

def try_models_in_order(prompt: str, chosen: str, all_models: list[str]):
    """Send ``prompt`` to ``chosen``, falling back to the other models if needed.

    The chosen model is tried first, then the remaining entries of
    ``all_models`` in order. A failure whose message mentions "not found",
    "404" or "unsupported" moves on to the next model; any other failure stops
    the search immediately.

    Args:
        prompt: Text sent to the model.
        chosen: Name of the model to try first.
        all_models: Candidate model names used as fallbacks.

    Returns:
        ``(model_name, answer_text)`` on success, or ``(None, message)`` where
        the message lists up to the first five recorded failures.
    """
    failures = []
    candidates = [chosen, *(m for m in all_models if m != chosen)]
    for candidate in candidates:
        try:
            reply = genai.GenerativeModel(candidate).generate_content(prompt)
            return candidate, getattr(reply, "text", str(reply))
        except Exception as exc:
            reason = str(exc)
            failures.append((candidate, reason))
            lowered = reason.lower()
            model_unavailable = (
                "not found" in lowered or "404" in reason or "unsupported" in lowered
            )
            if not model_unavailable:
                # Not a "wrong model" error, so other models would not help.
                break
    report = "\n".join(f"- {name}: {why}" for name, why in failures[:5])
    return None, f"All model attempts failed.\n{report}"

def _ask(_):
    """Button callback: send the user's question to Gemini and show the reply.

    The prompt combines a fixed coaching instruction, the last three stored
    chat exchanges, the totals from ``summarize_context(history_df)`` and the
    question. The reply, or the failure report, is printed into ``chat_out``.
    Only successful answers are stored in ``chat_memory``, so an error report
    is never replayed to the model as a previous Arthena answer.

    Args:
        _: The clicked button (unused).
    """
    global chat_memory
    with chat_out:
        chat_out.clear_output()
        question = ask_in.value.strip()
        # Bail out only when there is neither data nor a question; either one
        # alone is enough to proceed.
        if history_df is None or (history_df.empty and not question):
            print("Add some data and type a question.")
            return
        asked = question or "(empty question)"
        context = summarize_context(history_df)
        history_str = "\n".join([f"User: {u}\nArthena: {a}" for u, a in chat_memory[-3:]])
        prompt = (
            "You are Arthena ‚Äî a personal finance coach. "
            "Use the user's data context to give simple, practical, India-friendly advice. "
            "Be concise (2‚Äì5 sentences) and avoid generic tips.\n\n"
            f"Recent chat:\n{history_str}\n\n"
            f"Data context: {context}\n\n"
            f"User question: {asked}"
        )
        used, ans = try_models_in_order(prompt, model_dd.value, available_models)
        if used:
            print(f"(Answered with model: {used})\n")
        print(ans)
        # `used` is None when every attempt failed and `ans` is the error
        # report; keep that out of the conversation history.
        if used:
            chat_memory.append((asked, ans))

# Render the AI section and run _ask when the button is clicked.
display(HTML("<h3>ü§ñ Arthena AI ‚Äî Ask for tips</h3>"))
display(widgets.VBox([model_dd, ask_in, ask_btn, chat_out]))
ask_btn.on_click(_ask)

VBox(children=(Dropdown(description='Model:', options=('models/gemini-flash-latest', 'models/gemini-flash-lite‚Ä¶

In [9]:
# Closing status banner: confirms setup finished and repeats the CSV location.
print("üéØ Colab backend mirror of app.py is ready.")
print("‚Ä¢ Add transactions, hit 'Refresh Insights', and try the AI section.")
print("‚Ä¢ CSV path:", get_history_path())

üéØ Colab backend mirror of app.py is ready.
‚Ä¢ Add transactions, hit 'Refresh Insights', and try the AI section.
‚Ä¢ CSV path: /content/arthena_history.csv
