In [0]:
%pip install databricks-langchain langchain databricks-sdk mlflow
# Restart the Python process so the packages installed above are importable in later cells.
# NOTE(review): versions are unpinned, and the later `unitycatalog.ai` imports are presumably
# pulled in transitively by databricks-langchain — confirm and pin for reproducible workshops.
dbutils.library.restartPython()


# Tools

We are going to create a couple of tools that the agent will use during the customer conversation:

1. SQL Tool: Extract customer information (based on customer credentials)
2. SQL Tool: Extract customer transactions based on customer ID
3. SQL Tool: Extract customer portfolio based on customer ID
4. SQL Tool: Extract fraud alerts based on transaction ID
5. Vector Search Retriever Tool: vector search over advisory notes, fraud guidelines, and policy/investment guidelines
6. Process Fraud Suspension
7. Appointment Scheduler
8. Genie Agent Tool (optional, with structured data)

In [0]:
# Every workshop user works in their own catalog, named after the local part of their
# Databricks login email with the dots removed (e.g. "jane.doe@example.com" -> "janedoe").
user_email = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
email = str(user_email).split('@')
catalog = email[0].replace('.', '')

schema = "agent_workshop"


def _uc_name(object_name):
    """Return the three-level Unity Catalog name `<catalog>.<schema>.<object_name>`."""
    return f"{catalog}.{schema}.{object_name}"


# Unity Catalog functions that are registered as agent tools.
func_name_cust_details = _uc_name("extract_customer_details")
func_name_portfolio_details = _uc_name("extract_portfolio_details")
func_name_customer_transactions = _uc_name("extract_customer_transactions")
func_name_fraud_analysis = _uc_name("fraud_analysis")
func_name_customer_fraud = _uc_name("extract_customer_fraud")
func_name_transaction_details = _uc_name("extract_transaction_details")
func_name_rebalancing = _uc_name("simulate_rebalance_and_compare")
func_name_unstructured_vs_tool = _uc_name("internal_knowledge_search")

# Source tables and the vector search index that the tools read from.
transaction_table = _uc_name("transactions")
customer_table = _uc_name("customers")
portfolio_table = _uc_name("portfolio")
fraud_table = _uc_name("fraud")
vs_index_name = _uc_name("guidance_gold_index")


# Extract Customer Details

In [0]:
# Tool: look up a customer's ID and email from their first name, last name and postal code.
# The parameter COMMENTs are exposed to the agent as argument descriptions, in the same way as
# the other tool functions in this notebook.
spark.sql(f"DROP FUNCTION IF EXISTS {func_name_cust_details}")

query = f"""
CREATE OR REPLACE FUNCTION {func_name_cust_details} (
  customer_first_name STRING COMMENT 'Customer first name, matched exactly against the customers table',
  customer_last_name STRING COMMENT 'Customer last name, matched exactly against the customers table',
  post_code STRING COMMENT 'Customer postal code'
)
returns table (customer_id STRING,
               Email STRING
               )
 COMMENT "This functions extracts customer ID and Email based on customer first , last name and PostCode" 
 RETURN
  SELECT customer_id, Email FROM {customer_table}
  where FirstName = customer_first_name 
  and LastName = customer_last_name
  and PostalCode = post_code
  """
spark.sql(query)

In [0]:
# Smoke-test the customer lookup with a known customer.
customer_lookup_df = spark.sql(f"select * from {func_name_cust_details}('Danielle','Johnson','2461')")
display(customer_lookup_df)


# Extract Portfolio Details

In [0]:
# Tool: return every holding in a customer's portfolio.
# A bare DECIMAL in Spark SQL means DECIMAL(10,0), which silently rounds monetary values and
# percentages to whole numbers, so the numeric outputs are declared as DOUBLE instead.
# The explicit column list keeps the query aligned with the declared return columns even if
# the table gains extra columns later.
spark.sql(f"DROP FUNCTION IF EXISTS {func_name_portfolio_details}")

query = f"""
CREATE OR REPLACE FUNCTION {func_name_portfolio_details} (
  cust_id STRING COMMENT 'Customer ID, e.g. CUST001'
)
returns table (
          customer_id STRING,
          stock_name STRING,
          quantity INTEGER,
          current_value DOUBLE,
          gain_loss_percentage DOUBLE
               )
 COMMENT "This functions extracts portfolio details based on customer ID" 
 RETURN
  SELECT customer_id, stock_name, quantity, current_value, gain_loss_percentage
  FROM {portfolio_table}
  where customer_id = cust_id
  """
spark.sql(query)


In [0]:
# Smoke-test the portfolio tool for a sample customer.
portfolio_preview_df = spark.sql(f"select * from {func_name_portfolio_details}('CUST001')")
display(portfolio_preview_df)

In [0]:
# spark.sql("ALTER TABLE ananyaroy.finOps.customers ADD COLUMNS (`status` STRING)")
# spark.sql("UPDATE ananyaroy.finOps.customers SET `status` = 'active' WHERE `status` IS NULL")


# Extract transaction details

In [0]:
# Tool: list every transaction recorded for a customer.
# `amount` is declared DOUBLE because a bare DECIMAL in Spark SQL is DECIMAL(10,0) and would
# round amounts to whole units. Columns are listed explicitly rather than via SELECT *.
spark.sql(f"DROP FUNCTION IF EXISTS {func_name_transaction_details}")

query = f"""
CREATE OR REPLACE FUNCTION {func_name_transaction_details} (
  cust_id STRING COMMENT 'Customer ID, e.g. CUST001'
)
RETURNS TABLE (
          transaction_id STRING,
          customer_id STRING,
          amount DOUBLE,
          date DATE, 
          location STRING
               )
 COMMENT "This functions extracts customer transaction details based on customer ID" 
 RETURN
  SELECT transaction_id, customer_id, amount, `date`, location
  FROM {transaction_table}
  where customer_id = cust_id
  """
spark.sql(query)

In [0]:
# Smoke-test the transactions tool for a sample customer.
transactions_preview_df = spark.sql(f"select * from {func_name_transaction_details}('CUST001')")
display(transactions_preview_df)


# Portfolio Rebalancing Simulation

In [0]:
# Tool: what-if simulation that moves `percentage`% of the `from_stock` holding's current value
# into `to_stock`. It returns the change in the forecasted portfolio total, where each holding is
# forecast as current_value * (1 + gain_loss_percentage / 100).
# The body runs as a Python UDF. It receives the portfolio as the string form of a list of dicts
# and parses it with ast.literal_eval, so it does not query any table itself.
# NOTE(review): `customer_id` is accepted but not used to filter `portfolio_list`.
spark.sql(f"DROP FUNCTION IF EXISTS {func_name_rebalancing}")

query = f"""
          CREATE OR REPLACE FUNCTION {func_name_rebalancing}(
          portfolio_list STRING COMMENT 'Customer Portfolio List',
          customer_id STRING COMMENT 'Customer ID',
          from_stock STRING COMMENT 'Stock symbol to reduce allocation from',
          to_stock STRING COMMENT 'Stock symbol to increase allocation to',
          percentage DOUBLE COMMENT 'Percentage of the from_stock value to reallocate to to_stock'
        )
        RETURNS DOUBLE
        LANGUAGE PYTHON
        DETERMINISTIC
        COMMENT "Simulates rebalancing of a customers stock portfolio and compares projected values before and after.
        Args:
            portfolio_list (str): a string list of dictionaries containing the customer\'s portfolio.
            customer_id (str): Unique identifier for the customer whose portfolio will be rebalanced.
            from_stock (str): The stock symbol to reduce allocation from.
            to_stock (str): The stock symbol to increase allocation to.
            percentage (float): Percentage of the `from_stock` value to reallocate to `to_stock`.

        Returns:
            difference (float): The difference in forecasted portfolio value after rebalancing. Positive indicates gain.

        Steps:
            1. Fetch the customer\'s portfolio.
            2. Forecast portfolio value based on current value and gain/loss %.
            3. Identify `from_stock` and `to_stock`.
            4. Reallocate the specified percentage.
            5. Forecast updated portfolio.
            6. Return difference in projected values."
        AS 
        $$
          import pandas as pd 
          import ast

          # Moving more than 100% (or a negative share) would create negative holdings
          # or silently reverse the direction of the transfer.
          if not 0 <= percentage <= 100:
            raise ValueError("Percentage must be between 0 and 100.")

          customer_portfolio = ast.literal_eval(portfolio_list)
          columns = ['customer_id', 'stock_name', 'quantity', 'current_value', 'gain_loss_percentage']

          customer_portfolio = pd.DataFrame(customer_portfolio, columns=columns)

          # Baseline forecast before any reallocation.
          original_df = customer_portfolio.copy()
          original_df['forecasted_value'] = original_df['current_value'] * (1 + original_df['gain_loss_percentage'] / 100)
          orig_total = original_df['forecasted_value'].sum()

          source_portfolio = customer_portfolio[customer_portfolio['stock_name'] == from_stock]
          target_portfolio = customer_portfolio[customer_portfolio['stock_name'] == to_stock]

          if source_portfolio.empty or target_portfolio.empty:
            raise ValueError("Stock names provided are not valid for this customer.")

          from_value = source_portfolio['current_value'].values[0]
          reallocation_value = from_value * (percentage / 100)

          customer_portfolio.loc[customer_portfolio['stock_name'] == from_stock, 'current_value'] -= reallocation_value
          customer_portfolio.loc[customer_portfolio['stock_name'] == to_stock, 'current_value'] += reallocation_value

          # Forecast again after the reallocation and report the change.
          rebalanced_df = customer_portfolio.copy()
          rebalanced_df['forecasted_value'] = rebalanced_df['current_value'] * (1 + rebalanced_df['gain_loss_percentage'] / 100)
          reb_total = rebalanced_df['forecasted_value'].sum()
          difference = reb_total - orig_total

          return difference
        $$
"""
spark.sql(query)

In [0]:
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

# Build the `portfolio_list` argument for the rebalancing function: the customer's holdings
# rendered as the Python string form of a list of row dicts.
# NOTE(review): ast.literal_eval in the UDF cannot parse Decimal/date reprs, so this assumes the
# portfolio columns come back as plain str/int/float values — confirm against the table schema.
client = DatabricksFunctionClient()
customer_id = 'CUST001'
query = f"select * from {portfolio_table}  where customer_id = '{customer_id}'"
customer_portfolio = spark.sql(query).collect()
portfolio_records = [row.asDict() for row in customer_portfolio]
customer_portfolio_str = str(portfolio_records)
customer_portfolio_str

In [0]:
# Call the rebalancing UDF through the UC function client: shift 5% of the Amazon holding
# into Netflix and show the function's returned value.
rebalance_parameters = dict(
    percentage=5.0,
    customer_id="CUST001",
    portfolio_list=customer_portfolio_str,
    to_stock="Netflix",
    from_stock="Amazon",
)
result = client.execute_function(
    function_name=func_name_rebalancing,
    parameters=rebalance_parameters,
)
result.value


# Fraud Analysis

In [0]:
# Tool: join a customer's transactions with the fraud alerts raised on them. Because this is an
# inner join, only transactions that have a fraud record are returned.
# `amount` is declared DOUBLE because a bare DECIMAL in Spark SQL is DECIMAL(10,0) and would
# round amounts to whole units.
spark.sql(f"DROP FUNCTION IF EXISTS {func_name_fraud_analysis}")

query = f"""
CREATE OR REPLACE FUNCTION {func_name_fraud_analysis} (
  cust_id STRING COMMENT 'Customer ID, e.g. CUST001'
)
returns table (
        transaction_id STRING,
        customer_id STRING,
        alert_description STRING,
        severity_level STRING,
        amount DOUBLE,
        date DATE, 
        location STRING,
        analyst_notes STRING
               )
 COMMENT "This functions extracts transaction and fraud details from respective tables to assess for fraud in customers account. Use this function when customers asks about suspiscious activity in their email and want to understand if any fraud has happened" 
 RETURN
  SELECT 
  frd.transaction_id,
  trxn.customer_id ,
  frd.alert_description,
  frd.severity_level, 
  trxn.amount,
  trxn.date, 
  trxn.location ,
  frd.analyst_notes
  FROM 
    {transaction_table} trxn 
  inner join 
      {fraud_table} frd 
  on trxn.transaction_id = frd.transaction_id
  where trxn.customer_id = cust_id
  """
spark.sql(query)


In [0]:
# Smoke-test the fraud analysis tool for a sample customer.
fraud_preview_df = spark.sql(f"select * from {func_name_fraud_analysis}('CUST001')")
display(fraud_preview_df)


# Vector Search Retriever Tool

In [0]:
# Vector-search retriever exposed as a Unity Catalog table function. The SQL COMMENTs on the
# function and its parameter are what the agent reads when deciding to call it and what to pass.
vs_num_results = 3  # number of matching documents returned per search

query = f"""
CREATE OR REPLACE FUNCTION {func_name_unstructured_vs_tool} (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching any internal knowledge on guidance on different matter.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on internal knowledge for guidance on portfolio and fraud.Guidance can be on matters like advisory note ,rebalancing,investment portfolio , fraud identification and policies , investment suggestions etc.'
 RETURN
SELECT
  file_content as page_content,
  map('doc_uri', file_name) as metadata
FROM
  vector_search(
    index => '{vs_index_name}', -- Define the vector search index name
    query => query,
    num_results => {vs_num_results}
  )"""
spark.sql(query)

In [0]:
from databricks_langchain import ChatDatabricks
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import AgentExecutor, create_tool_calling_agent

# End-to-end check of the retriever on its own: wrap only the vector search UC function as a
# LangChain tool, hand it to a tool-calling agent, and ask a guidance question.
client = DatabricksFunctionClient()

sample_quesetion = "What is the company guideline for any suspiscious activity on transactions?"

# Only the retriever function is registered, so the agent can answer solely via vector search.
toolkit = UCFunctionToolkit(function_names=[func_name_unstructured_vs_tool], client=client)
tools = toolkit.tools

LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"
model = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant. Use tools for computations if applicable."),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

# The same tool list is given to the agent (for tool selection) and the executor (to run calls).
agent = create_tool_calling_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

agent_executor.invoke({"input": sample_quesetion})