In [0]:
%pip install faker databricks-langchain langchain databricks-sdk mlflow
dbutils.library.restartPython()


# Financial Wealth Intelligence Assistant

Through this workshop, we are building a **conversational financial Weath Intelligence Assisst** that assists customers and enterprises with understanding, optimising, and protecting their investment portfolios.

The assistant is capable of:
- 📊 Analysing a customer's portfolio performance
- 📈 Suggesting improvements and rebalancing strategies
- 🚨 Identifying anomalies or fraud-related risks
- 🧠 Answering questions using internal financial guidance
- 🔔 Triggering alerts or next steps when risks are detected


# Generate synthetic data for this use case 

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

def generate_synth_content(stock,system_prompt,user_prompt):

  w = WorkspaceClient()
  response = w.serving_endpoints.query(
      name="databricks-claude-3-7-sonnet",
      messages=[
          ChatMessage(role=ChatMessageRole.SYSTEM, content=system_prompt),
          ChatMessage(role=ChatMessageRole.USER, content=user_prompt),
      ],
  )
  notes = response.choices[0].message.content

  return notes

In [0]:
#Define the catalog name for each user 
workspace_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
digits = ''.join(filter(str.isdigit, workspace_url))
catalog = 'catalog_' + digits

#Define the schema name for the workshop
schema="agent_workshop"
full_schema_name = f"{catalog}.{schema}"

# spark.sql(f"create CATALOG if not exists labuser")
spark.sql(f"create SCHEMA if not exists {full_schema_name}")
spark.sql(f"DESCRIBE  SCHEMA {full_schema_name}")

#Define necessary tables
customer_table = f"{catalog}.{schema}.customers"
portfolio_table = f"{catalog}.{schema}.portfolio"
transaction_table = f"{catalog}.{schema}.transactions"
fraud_table = f"{catalog}.{schema}.fraud"


## Generate structured synthetic data

Let's generate synthetic data for a financial agent demo, including four core tables:

- **`customers`**: 300 fake customers with personal and contact info
- **`portfolio`**: Simulated stock holdings per customer across 7 majot tech stocks
- **`transactions`**: 1,000 fake transactions with amount, date, and location
- **`fraud`**: Around 200 flagged transactions with alert descriptions and analyst notes

Data is generated using `Faker`, `random`, and `numpy`, and saved into Delta tables for downstream querying and agent use.

In [0]:
import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker('en_AU')
Faker.seed(42)
random.seed(42)
np.random.seed(42)

# --- 1. Generate Customers Table ---
num_customers = 300

customers = []
for i in range(1, num_customers + 1):
    customers.append({
        "customer_id": 'CUST00'+ str(i),
        "FirstName": fake.first_name(),
        "LastName": fake.last_name(),
        "Email": fake.email(),
        "PhoneNumber": fake.phone_number(),
        "Address": fake.address(),
        "City": fake.city(),
        "State": fake.state(),
        "PostalCode": fake.postcode(),
        "Country": fake.country(),
        "DateOfBirth": fake.date_of_birth(minimum_age=18, maximum_age=80),
        "RegistrationDate": fake.date_this_decade()
    })
customers_df = pd.DataFrame(customers)

#store the customer table into delta
spark.createDataFrame(customers_df).write.mode("overwrite").saveAsTable(customer_table)
display(customers_df.head(5))


# --- 2. Generate Portfolio Table ---
portfolio = []
stocks = ["Apple", "Tesla", "Amazon", "Microsoft", "NVIDIA", "Netflix", "ETHI"]

for _ in range(1000):
    customer_id = 'CUST00' + str(random.randint(1, num_customers))
    stock = random.choice(stocks)
    quantity = random.randint(1, 100)
    value_per_stock = round(random.uniform(50, 2000), 2)
    current_value = round(quantity * value_per_stock, 2)
    gain_loss_percentage = round(random.uniform(-20, 40), 2)
    advisor_note = fake.sentence(nb_words=10)

    portfolio.append({
        "customer_id": customer_id,
        "stock_name": stock,
        "quantity": quantity,
        "current_value": current_value,
        "gain_loss_percentage": gain_loss_percentage
    })

#generate portfolio data
portfolio_df = pd.DataFrame(portfolio)
spark.createDataFrame(portfolio_df).write.mode("overwrite").saveAsTable(portfolio_table)


# --- 3. Generate Transactions Table ---
transaction_ids = []
transactions = []

for i in range(1000):
    transaction_id = f"T00{i+1:04d}"
    transaction_ids.append(transaction_id)
    customer_id = 'CUST00' + str(random.randint(1, num_customers))
    amount = round(random.uniform(10, 5000), 2)
    txn_date = datetime.now() - timedelta(days=random.randint(1, 365))
    location = fake.city()

    transactions.append({
        "transaction_id": transaction_id,
        "customer_id": customer_id,
        "amount": amount,
        "date": txn_date.strftime("%Y-%m-%d"),
        "location": location
    })

transactions_df = pd.DataFrame(transactions)
spark.createDataFrame(transactions_df).write.mode("overwrite").saveAsTable(transaction_table)


# --- 4. Generate Fraud Alerts Table ---
fraud_alerts = []
fraud_sample = random.sample(transaction_ids, k=200)  # ~20% transactions flagged

# Predefined realistic fraud alert descriptions
alert_templates = [
    "Suspicious foreign ATM withdrawal",
    "Multiple failed login attempts detected",
    "Unusual high-value online purchase",
    "Card skimming suspected at local store",
    "IP address mismatch from different countries",
    "Large fund transfer flagged without prior notice",
    "Transaction outside of usual customer location",
    "Rapid successive purchases over short period",
    "Abnormal account access time",
    "Attempted transaction blocked due to policy violation"
]

# Predefined analyst notes
analyst_note_templates = [
    "Escalate to fraud investigation team immediately.",
    "Customer notification pending confirmation.",
    "Freeze account temporarily pending investigation.",
    "Monitor customer account for next 48 hours.",
    "Review transaction logs for linked accounts.",
    "Manual verification required before releasing funds.",
    "Severity flagged based on historical fraud pattern.",
    "Advise customer to reset credentials immediately.",
    "Flagged under standard AML compliance rules.",
    "Notify compliance team for further action."
]

for txn_id in fraud_sample:
    severity = random.choices(["Low", "Medium", "High"], weights=[0.3, 0.4, 0.3])[0]
    alert_description = random.choice(alert_templates)
    analyst_notes = random.choice(analyst_note_templates)

    fraud_alerts.append({
        "transaction_id": txn_id,
        "alert_description": alert_description,
        "severity_level": severity,
        "analyst_notes": analyst_notes
    })

fraud_df = pd.DataFrame(fraud_alerts)
spark.createDataFrame(fraud_df).write.mode("overwrite").saveAsTable(fraud_table)

In [0]:
display(customers_df)
display(portfolio_df)
display(transactions_df)
display(fraud_df)