In [1]:
# %%
# INSTALL DEPENDENCIES
%pip install --upgrade google-cloud-aiplatform vertexai google-auth python-dotenv

Collecting google-cloud-aiplatform
  Using cached google_cloud_aiplatform-1.113.0-py2.py3-none-any.whl.metadata (40 kB)


In [2]:
# %%
import vertexai
from vertexai.preview.generative_models import GenerativeModel
from pyspark.sql import SparkSession
import math

# %%
# -----------------------------
# SPARK SESSION
# -----------------------------
spark = SparkSession.builder \
    .appName("TransactionTipsBatch") \
    .getOrCreate()

In [3]:
# -----------------------------
# CONFIG
# -----------------------------
PROJECT_ID = "mymoneytracker-471819"
LOCATION = "us-central1"
MODEL_NAME = "gemini-2.5-flash"
CSV_INPUT = "transactions.csv"
OUTPUT_DIR = "transactions_with_tips_batched"

# Init Vertex AI
vertexai.init(project=PROJECT_ID, location=LOCATION)

# Load Gemini model once
model = GenerativeModel(model_name=MODEL_NAME)

# %%
# -----------------------------
# Load transactions
# -----------------------------
df = spark.read.csv(CSV_INPUT, header=True, inferSchema=True)
transactions = df.collect()

print(f"Loaded {len(transactions)} transactions.")

# %%
# -----------------------------
# Batch function
# -----------------------------
def generate_tips_batch(transaction_batch):
    """
    Takes a list of Spark Row objects and returns a list of tips.
    """
    # Build prompt with multiple transactions
    prompt = "You are a financial advisor. For each transaction, provide one short, practical financial tip.\n\n"
    for idx, txn in enumerate(transaction_batch, start=1):
        prompt += f"Transaction {idx}: Description={txn['Description']}, Category={txn['Category']}, Amount={txn['Amount']}\n"

    prompt += "\nReturn the tips as a numbered list matching the transaction numbers."

    response = model.generate_content(prompt)
    text = response.text.strip()

    # Parse numbered list output into tips
    tips = []
    for line in text.split("\n"):
        if line.strip() and (line[0].isdigit() or line.startswith("-")):
            # Example: "1. Coffee spending is high..." → extract tip
            tip = line.split(".", 1)[-1].strip()
            tips.append(tip)

    # Safety check: align lengths
    if len(tips) != len(transaction_batch):
        print("⚠️ Warning: Mismatch in generated tips count, filling missing with blank")
        while len(tips) < len(transaction_batch):
            tips.append("No tip generated")

    return tips

Loaded 326 transactions.


In [4]:
# %%
# -----------------------------
# Process in batches
# -----------------------------
BATCH_SIZE = 10
num_batches = math.ceil(len(transactions) / BATCH_SIZE)

results = []
for b in range(num_batches):
    batch_txns = transactions[b*BATCH_SIZE : (b+1)*BATCH_SIZE]
    tips = generate_tips_batch(batch_txns)
    for txn, tip in zip(batch_txns, tips):
        row_with_tip = txn.asDict()
        row_with_tip["Financial_Tip"] = tip
        results.append(row_with_tip)
    print(f"✅ Processed batch {b+1}/{num_batches}")

✅ Processed batch 1/33
✅ Processed batch 2/33
✅ Processed batch 3/33
✅ Processed batch 4/33
✅ Processed batch 5/33
✅ Processed batch 6/33
✅ Processed batch 7/33
✅ Processed batch 8/33
✅ Processed batch 9/33
✅ Processed batch 10/33
✅ Processed batch 11/33
✅ Processed batch 12/33
✅ Processed batch 13/33
✅ Processed batch 14/33
✅ Processed batch 15/33
✅ Processed batch 16/33
✅ Processed batch 17/33
✅ Processed batch 18/33
✅ Processed batch 19/33
✅ Processed batch 20/33
✅ Processed batch 21/33
✅ Processed batch 22/33
✅ Processed batch 23/33
✅ Processed batch 24/33
✅ Processed batch 25/33
✅ Processed batch 26/33
✅ Processed batch 27/33
✅ Processed batch 28/33
✅ Processed batch 29/33
✅ Processed batch 30/33
✅ Processed batch 31/33
✅ Processed batch 32/33
✅ Processed batch 33/33


In [9]:
# %%
# -----------------------------
# Save results back to Spark
# -----------------------------
results_df = spark.createDataFrame(results)
results_df.show(5, truncate=False)

+-------+------+-----------+----------+-------------------+------------------------------------------------------------------------------------------------+--------------+----+
|Account|Amount|Category   |Date      |Description        |Financial_Tip                                                                                   |Location      |_c6 |
+-------+------+-----------+----------+-------------------+------------------------------------------------------------------------------------------------+--------------+----+
|A123   |50.0  | Groceries |01/01/2023| Grocery Shopping  |**Grocery Shopping:** Plan your meals and make a grocery list to avoid impulse purchases.       | New York     |NULL|
|B456   |30.0  | Fuel      |02/01/2023| Gasoline Refill   |**Gasoline Refill:** Combine errands and drive efficiently to conserve fuel.                    | Los Angeles  |NULL|
|C789   |100.0 | Dining Out|03/01/2023| Restaurant Dinner |**Restaurant Dinner:** Set a monthly dining-out budget a

In [10]:
# %%
# -----------------------------
# TRAIN / TEST SPLIT
# -----------------------------
train_df, test_df = results_df.randomSplit([0.8, 0.2], seed=42)

print(f"Training set size: {train_df.count()} rows")
print(f"Test set size: {test_df.count()} rows")

Training set size: 255 rows
Test set size: 71 rows


In [11]:
# -----------------------------
# SAVE AS SINGLE CSV FILES
# -----------------------------
train_output_dir = OUTPUT_DIR + "_train"
test_output_dir = OUTPUT_DIR + "_test"

# Save training set to a single CSV
train_df.coalesce(1).write.mode("overwrite").option("header", True).csv(train_output_dir)

# Save test set to a single CSV
test_df.coalesce(1).write.mode("overwrite").option("header", True).csv(test_output_dir)

print(f"✅ Training dataset saved as single CSV under {train_output_dir}")
print(f"✅ Test dataset saved as single CSV under {test_output_dir}")

✅ Training dataset saved as single CSV under transactions_with_tips_batched_train
✅ Test dataset saved as single CSV under transactions_with_tips_batched_test
