# Step 0

Go to the config and update the resource names as you prefer.

Spin up a cluster with Databricks Runtime 16.X+ ML. Make sure it is the ML version so that the correct dependencies are available.

In [0]:
!pip install transformers datasets mlflow langchain databricks_langchain
dbutils.library.restartPython()

In [0]:
from IPython.display import Markdown
import os

catalog = "genai_in_production_demo_catalog"
agent_schema = "agents"
demo_schema = "demo_data"

dbutils.widgets.text("catalog_name", catalog)
dbutils.widgets.text("agent_schema", agent_schema)
dbutils.widgets.text("demo_schema", demo_schema)
base_url = f'https://{spark.conf.get("spark.databricks.workspaceUrl")}/serving-endpoints'

# Get started immediately with your data using AI Functions

We have a number of AI Functions, exposed as SQL functions, that you can call from a SQL cell or the SQL editor to apply LLMs directly to your data:

1. ai_analyze_sentiment
2. ai_classify
3. ai_extract
4. ai_fix_grammar
5. ai_gen
6. ai_mask
7. ai_similarity
8. ai_summarize
9. ai_translate
10. ai_query

We will demo several of these functions below.




### ai_fix_grammar
The ai_fix_grammar() function allows you to invoke a state-of-the-art generative AI model to correct grammatical errors in a given text using SQL. This function uses a chat model serving endpoint made available by Databricks Foundation Model APIs.

Documentation: https://docs.databricks.com/en/sql/language-manual/functions/ai_fix_grammar.html

In [0]:
%sql
-- verify that we're running on a SQL Warehouse
SELECT assert_true(current_version().dbsql_version is not null, 'YOU MUST USE A SQL WAREHOUSE, not a cluster');

SELECT ai_fix_grammar('This sentence have some mistake');

### ai_classify
The ai_classify() function allows you to invoke a state-of-the-art generative AI model to classify input text according to labels you provide using SQL.

Documentation: https://docs.databricks.com/en/sql/language-manual/functions/ai_classify.html

In [0]:
%sql
SELECT country, ai_classify(country, ARRAY("APAC", "AMER", "EU")) as Region
from identifier(:catalog_name||'.'||:demo_schema||'.'||'franchises')
limit 5;

### ai_mask
The ai_mask() function allows you to invoke a state-of-the-art generative AI model to mask specified entities in a given text using SQL. 

Documentation: https://docs.databricks.com/en/sql/language-manual/functions/ai_mask.html

In [0]:
%sql
SELECT first_name, last_name,
  (first_name || " " || last_name || " lives at " || address) as unmasked_output,
  -- Same text as unmasked_output (including the space between the names), passed through ai_mask
  ai_mask(first_name || " " || last_name || " lives at " || address, array("person", "address")) as Masked_Output
from identifier(:catalog_name||'.'||:demo_schema||'.'||'customers')
limit 5

### ai_query
The ai_query() function allows you to query machine learning models and large language models served using Mosaic AI Model Serving. It invokes an existing Mosaic AI Model Serving endpoint, then parses and returns the response. Databricks recommends using ai_query with Model Serving for batch inference.

Documentation: https://docs.databricks.com/en/large-language-models/ai-functions.html#ai_query

We can switch models depending on what we are trying to do. See how the performance varies between the 70B model and the 8B model below. Because this is a simple spell-check task, we could likely use the 8B model instead of the 70B model, which saves cost and increases speed.

In [0]:
%sql 
SELECT
  `Misspelled_Make`,   -- Placeholder for the input column
  ai_query(
    'databricks-meta-llama-3-3-70b-instruct',
    CONCAT('You will always receive a make of a car. Check to see if it is misspelled and a real car. Correct the mistake. Only provide the corrected make. Never add additional details. Make: ', `Misspelled_Make`)    -- Prompt text followed by the input column
  ) AS ai_guess  -- Placeholder for the output column
FROM identifier(:catalog_name||'.'||:demo_schema||'.'||'synthetic_car_data')
-- limit 3;


In [0]:
%sql 
SELECT
  `Misspelled_Make`,   -- Placeholder for the input column
  ai_query(
    'databricks-meta-llama-3-1-8b-instruct',
    CONCAT('You will always receive a make of a car. Check to see if it is misspelled and a real car. Correct the mistake. Only provide the corrected make. Never add additional details. Make: ', `Misspelled_Make`)    -- Prompt text followed by the input column
  ) AS ai_guess  -- Placeholder for the output column
FROM identifier(:catalog_name||'.'||:demo_schema||'.'||'synthetic_car_data')
-- limit 3;
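
The two cells above differ only in the endpoint name, so the statement can be generated from one template when comparing models. This is a minimal sketch, not part of the original notebook: `build_spellcheck_query` and `PROMPT` are hypothetical names, and the helper only builds the SQL text (you would still pass it to `spark.sql` yourself).

```python
from typing import Optional

# Prompt taken from the cells above; the trailing "Make: " keeps the
# instruction from running straight into the column value.
PROMPT = (
    "You will always receive a make of a car. Check to see if it is misspelled "
    "and a real car. Correct the mistake. Only provide the corrected make. "
    "Never add additional details. Make: "
)


def build_spellcheck_query(endpoint_name: str, table: str, limit: Optional[int] = None) -> str:
    """Return the ai_query statement for one serving endpoint (hypothetical helper)."""
    query = (
        "SELECT `Misspelled_Make`, "
        f"ai_query('{endpoint_name}', CONCAT('{PROMPT}', `Misspelled_Make`)) AS ai_guess "
        f"FROM {table}"
    )
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    return query


table = "genai_in_production_demo_catalog.demo_data.synthetic_car_data"
for endpoint in (
    "databricks-meta-llama-3-3-70b-instruct",
    "databricks-meta-llama-3-1-8b-instruct",
):
    print(build_spellcheck_query(endpoint, table, limit=3))
```

Keeping the prompt in one place means both endpoints are guaranteed to see identical instructions, so any difference in output comes from the model.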


### Takeaway
Many of our use cases simply need a reliable, out-of-the-box way to use AI. AI Functions enable this for our customers, and ai_query makes it easy to scale AI workloads across your data.

# Batch Inferencing

ai_query is Databricks' solution for batch inference. We recommend using ai_query for cost-effective use of your LLMs.

As you saw earlier, you can provide your own models and prompts to process a large amount of your data. There are additional features to improve on how ai_query processes the data. 

We will walk through an exercise below that compares the performance of Llama 70B and Llama 8B. Llama 8B will perform significantly worse than Llama 70B, so we will use ai_query's structured output to constrain its responses and improve its performance.

### First we need to make sure our data is set up

# Testing Batch Inference

The dataset contains financial news headlines. We will use ai_query to classify the sentiment of each one as bearish, bullish, or neutral.

We will compare the outputs from both LLMs and see how well they do. 

First, let's see how well Llama 70B does

In [0]:
catalog_name = "genai_in_production_demo_catalog"
schema_name = "agents"
table_name = "twitter_financial_data"

In [0]:
import time

# endpoint_name = "databricks-meta-llama-3-1-8b-instruct" #12 seconds average 
endpoint_name = "databricks-meta-llama-3-3-70b-instruct" #180 seconds average
start_time = time.time()

command = f"""
    SELECT text,  
    ai_query(
        \'{endpoint_name}\', --endpoint name
        CONCAT('Classify the financial news-related Tweet sentiment as 0 for bearish, 1 for bullish, or 2 for neutral. Give just the number. Tweet: ', text)
    ) AS sentiment_pred,
    label as sentiment_gt 
    FROM {catalog_name}.{schema_name}.{table_name}_val
"""

result = spark.sql(command)

display(result)

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [0]:
result_baseline_pd_llama_70b = result.toPandas()

result_baseline_pd_llama_70b.sentiment_pred.value_counts()

In [0]:
result_baseline_pd_llama_70b.sentiment_gt.value_counts()

In [0]:
confusion_matrix = result_baseline_pd_llama_70b.pivot_table(index='sentiment_gt', columns='sentiment_pred', aggfunc='size', fill_value=0)
display(confusion_matrix)
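
A confusion matrix is easier to compare across models when it is paired with a single accuracy number. The following is a minimal sketch in plain Python, using made-up labels rather than real model output; `accuracy` and `confusion_counts` are hypothetical helper names introduced here.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    """Fraction of predictions that exactly match the ground-truth label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        return 0.0
    # Compare as stripped strings because ai_query returns text, not integers.
    matches = sum(str(t) == str(p).strip() for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


def confusion_counts(y_true, y_pred):
    """Count (ground_truth, prediction) pairs, similar to pivot_table(aggfunc='size')."""
    return Counter((str(t), str(p).strip()) for t, p in zip(y_true, y_pred))


# Illustrative values only, not real model output.
sentiment_gt = [0, 1, 2, 2, 1]
sentiment_pred = ["0", "1", "2", "1", "1"]

print(accuracy(sentiment_gt, sentiment_pred))
print(confusion_counts(sentiment_gt, sentiment_pred))
```

In the notebook, the same helpers could presumably be fed the `sentiment_gt` and `sentiment_pred` columns of the pandas DataFrame converted to lists.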

### Now let's see how Llama 8B does

In [0]:
import time

endpoint_name = "databricks-meta-llama-3-1-8b-instruct" #20 seconds average 
# endpoint_name = "databricks-meta-llama-3-3-70b-instruct" #180 seconds average
start_time = time.time()

command = f"""
    SELECT text,  
    ai_query(
        \'{endpoint_name}\', --endpoint name
        CONCAT('Classify the financial news-related Tweet sentiment as 0 for bearish, 1 for bullish, or 2 for neutral. Give just the number. Tweet: ', text)
    ) AS sentiment_pred,
    label as sentiment_gt 
    FROM {catalog_name}.{schema_name}.{table_name}_val
"""

result = spark.sql(command)

display(result)

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [0]:
result_baseline_pd = result.toPandas()

result_baseline_pd.sentiment_pred.value_counts()

In [0]:
result_baseline_pd.sentiment_gt.value_counts()

In [0]:
confusion_matrix = result_baseline_pd.pivot_table(index='sentiment_gt', columns='sentiment_pred', aggfunc='size', fill_value=0)
display(confusion_matrix)

## Takeaways

Llama 8B performs worse than Llama 70B and produces outputs that do not match our instructions, which makes it unsuitable for production use cases.

However, Llama 8B is faster and more cost-effective. We need a way to enforce the output format so that Llama 8B becomes usable.
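
One way to quantify "outputs that do not match our instructions" is to count the predictions that fall outside the allowed label set. This is a minimal sketch with made-up outputs; `VALID_LABELS` and `split_valid` are names introduced here for illustration.

```python
VALID_LABELS = {"0", "1", "2"}  # 0 = bearish, 1 = bullish, 2 = neutral


def split_valid(predictions):
    """Separate predictions that are exactly one allowed label from everything else."""
    valid, invalid = [], []
    for raw in predictions:
        text = str(raw).strip()
        if text in VALID_LABELS:
            valid.append(text)
        else:
            invalid.append(text)
    return valid, invalid


# Illustrative outputs only: the kind of format drift a small model might produce.
raw_outputs = ["1", " 2 ", "Bullish", "The sentiment is 0", "0"]
valid, invalid = split_valid(raw_outputs)
print(f"{len(invalid)} of {len(raw_outputs)} outputs are off-format: {invalid}")
```

Tracking this count before and after a change gives a simple measure of whether format enforcement is working.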

## Using Structured Output with Batch Inference

We can enforce formatting constraints rather than relying on prompts alone. This is particularly useful for smaller models that struggle with accuracy. If we know which outputs are valid, we can enforce them.

Documentation: https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query#enforce-output-schema-with-structured-output

In [0]:
response_schema = """
{
    "type": "json_schema",
    "json_schema":
        {
        "name": "sentiment_score",
        "schema":
            {
            "type": "object",
            "properties":
            {
            "sentiment": { "type": "string" ,
                        "enum": ["0", "1", "2"]}
            }
            },
        "strict": true
        }
}
"""

Let's try Llama 8B again

In [0]:
import time
endpoint_name = "databricks-meta-llama-3-1-8b-instruct" #14 seconds average 

start_time = time.time()

result_structured = spark.sql(
f"""
    SELECT text,  
    ai_query(
        \'{endpoint_name}\', --endpoint name
        CONCAT('Classify the financial news-related Tweet sentiment as 0 for bearish, 1 for bullish, or 2 for neutral. Give just the number. Tweet: ', text),
        responseFormat => '{response_schema}'
    ) AS sentiment_pred,
    label as sentiment_gt,
    CAST(get_json_object(sentiment_pred, '$.sentiment') AS LONG) AS sentiment_pred_value
    FROM {catalog_name}.{schema_name}.{table_name}_val
""")

display(result_structured)

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

In [0]:
result_structured_pd = result_structured.toPandas()

result_structured_pd.sentiment_pred_value.value_counts()

In [0]:
result_structured_pd.sentiment_gt.value_counts()

In [0]:
confusion_matrix_structured = result_structured_pd.pivot_table(index='sentiment_gt', columns='sentiment_pred_value', aggfunc='size', fill_value=0)
display(confusion_matrix_structured)
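
The query above extracts the label in SQL with `get_json_object`; the same parsing can also be done in Python after the results are collected. This is a minimal sketch, assuming each response is a JSON string shaped like the schema above; the sample strings are made up and `parse_sentiment` is a hypothetical helper.

```python
import json
from typing import Optional


def parse_sentiment(response: Optional[str]) -> Optional[int]:
    """Return the sentiment label as an int, or None if the response is unusable."""
    try:
        return int(json.loads(response)["sentiment"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # Covers malformed JSON, a missing field, a null response, and non-numeric labels.
        return None


samples = ['{"sentiment": "1"}', '{"sentiment": "2"}', "not json", '{"other": "x"}', None]
print([parse_sentiment(s) for s in samples])
```

Returning `None` instead of raising keeps one bad row from failing a whole batch and makes unusable responses easy to count.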

# Workshop: Make your own Response Structure!

Use a combination of prompt engineering and the response structure to do the following: 

## Task 1
Your first task is to add additional analysis to the news that's relevant to your financial firm. You need to add the following details: 

1. A summary of the news. (Hint: This should just be text)
2. Key tickers that may be relevant to the news. (Hint: This should be a list of tickers)
3. Impact this news will have on the market by classifying it as low, medium or high. (Hint: This should be a list of impact values)

These should come as three new columns on top of the sentiment classification column you made earlier, for a total of four columns. All four columns MUST be output.

In Cell 39, you will have a partially filled in response_schema that you can complete to accomplish the three items above. Everything you need to fill in is marked as _**TODO**_.

## Task 2
Because you added 3 new outputs, you will need to adjust your prompt to clarify what these outputs should do. This is because, while the response structure enforces a structure type, you still need to instruct the LLM what it should do to generate these outputs. 

In Cell 40, you will have a partially filled in prompt. It will follow what we call a "routine" strategy which is simply defining all the steps the LLM must perform. You just need to fill out an instruction per task. 

Once you complete both tasks, run both cells to see what happens! 
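
The "routine" strategy amounts to a numbered list of steps followed by a statement of the expected output. The following is a minimal sketch of a prompt builder; `build_routine_prompt` is a hypothetical helper, and the three placeholder steps are deliberately left for you to write as part of the workshop.

```python
def build_routine_prompt(task, steps, fields):
    """Render a numbered-step prompt that ends by naming the expected JSON fields."""
    for text in [task, *steps]:
        if "'" in text:
            # The prompt is embedded in a single-quoted SQL literal in the notebook.
            raise ValueError("single quotes would end the SQL string literal early")
    lines = [f"{task}:"]
    lines += [f"{number}. {step}" for number, step in enumerate(steps, start=1)]
    quoted = [f'"{name}"' for name in fields]
    if len(quoted) > 1:
        field_list = ", ".join(quoted[:-1]) + f", and {quoted[-1]}"
    else:
        field_list = quoted[0]
    lines += ["", f"Respond with JSON containing fields for {field_list}."]
    return "\n".join(lines)


prompt = build_routine_prompt(
    "Analyze this financial tweet and provide the following",
    [
        "Classify sentiment as 0 for bearish, 1 for bullish, or 2 for neutral",
        "<placeholder: your instruction for the summary>",
        "<placeholder: your instruction for the tickers>",
        "<placeholder: your instruction for the impact level>",
    ],
    ["sentiment", "summary", "key_tickers", "impact_level"],
)
print(prompt)
```

Writing one step per output field keeps the prompt aligned with the response schema: if a field is added to the schema, a matching step is added to the list.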

In [0]:
response_schema = """
{
    "type": "json_schema",
    "json_schema": {
        "name": "financial_tweet_analysis",
        "schema": {
            "type": "object",
            "properties": {
                "sentiment": { 
                    "type": "string",
                    "enum": ["0", "1", "2"]
                },
                "summary": { 
                    "type": "TODO" 
                },
                "key_tickers": { 
                    "type": "TODO",
                    "TODO": [TODO]
                },
                "impact_level": {
                    "type": "TODO",
                    "TODO": [TODO]
                }
            },
            "required": [TODO]
        }
    }
}
"""

In [0]:
import time
endpoint_name = "databricks-meta-llama-3-1-8b-instruct" #14 seconds average 
start_time = time.time()

result_structured = spark.sql(
f"""
    SELECT text,  
    ai_query(
        \'{endpoint_name}\', --endpoint name
        CONCAT('Analyze this financial tweet and provide the following:
1. Classify sentiment as 0 for bearish, 1 for bullish, or 2 for neutral
2. TODO
3. TODO
4. TODO

Respond with JSON containing fields for "sentiment", "summary", "key_tickers", and "impact_level". Tweet: ', text),
        responseFormat => '{response_schema}'
    ) AS analysis_result,
    CAST(get_json_object(analysis_result, '$.sentiment') AS LONG) AS sentiment_pred_value,
    get_json_object(analysis_result, 'TODO') AS TODO,
    get_json_object(analysis_result, 'TODO') AS TODO,
    get_json_object(analysis_result, 'TODO') AS TODO
    FROM {catalog_name}.{schema_name}.{table_name}_val
""")
display(result_structured)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Example Answers Below

In [0]:
response_schema = """
{
    "type": "json_schema",
    "json_schema": {
        "name": "financial_tweet_analysis",
        "schema": {
            "type": "object",
            "properties": {
                "sentiment": { 
                    "type": "string",
                    "enum": ["0", "1", "2"]
                },
                "summary": { 
                    "type": "string"
                },
                "key_tickers": { 
                    "type": "string",
                    "enum": ["AAPL", "AMZN", "GOOGL", "MSFT", "TSLA", "NVDA", "META", "NIO", "TSLA", "AMC", "NFLX", "NKE", "PYPL", "DIS", "INTC", "FB", "CMCSA", "BABA", "SBU", "T", "VZ", "XOM", "JPM", "GS", "BAC", "WFC", "C", "PFE", "MRK", "UNH", "ABBV", "JNJ", "V", "WMT", "HD", "MA", "CAT", "KO", "MCD", "WBA", "PEP", "M", "CVX", "COST", "PM", "DOW", "VOO", "VTI", "QQQ", "DIA", "SPY", "XLK", "XLV", "XLI", "XLB", "XLY", "XLP", "XLF", "XLE"]
                },
                "impact_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"]
                }
            },
            "required": ["sentiment", "summary", "key_tickers", "impact_level"]
        }
    }
}
"""

In [0]:
import time
endpoint_name = "databricks-meta-llama-3-1-8b-instruct"
start_time = time.time()

result_structured = spark.sql(
f"""
    SELECT text,  
    ai_query(
        \'{endpoint_name}\', --endpoint name
        CONCAT('Analyze this financial tweet and provide the following:
1. Classify sentiment as 0 for bearish, 1 for bullish, or 2 for neutral
2. Write a brief one-sentence summary of the key information
3. Pick the single most relevant stock ticker mentioned or implied
4. Rate the market impact as low, medium, or high

Respond with JSON containing fields for "sentiment", "summary", "key_tickers", and "impact_level". Tweet: ', text),
        responseFormat => '{response_schema}'
    ) AS analysis_result,
    CAST(get_json_object(analysis_result, '$.sentiment') AS LONG) AS sentiment_pred_value,
    get_json_object(analysis_result, '$.summary') AS tweet_summary,
    get_json_object(analysis_result, '$.key_tickers') AS relevant_tickers,
    get_json_object(analysis_result, '$.impact_level') AS market_impact
    FROM {catalog_name}.{schema_name}.{table_name}_val
""")
display(result_structured)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

# Agent Code Tools

### Why even use tools to begin with? 

Function calling, or tool calling, helps ensure the LLM has the most accurate information possible. By giving it access to many different sources of data, it can generate more reliable answers.

Each framework, such as LangChain or LlamaIndex, handles tool calling differently. You can also implement tool calling in plain Python. However, you then have to recreate the tool each time you want to use it, and it cannot be shared with other applications. You also have to manage security yourself for any tool that accesses external sources.
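
For reference, tool calling in plain Python usually comes down to a registry of functions plus a dispatcher that runs whichever tool the model names. This is a minimal sketch with a hard-coded stand-in for the model's request; `TOOLS`, `tool`, `call_tool`, and `get_top_destinations` are all hypothetical names, and the purchase data is made up.

```python
import json

TOOLS = {}


def tool(description):
    """Register a function as a tool together with the description an LLM would see."""
    def decorator(func):
        TOOLS[func.__name__] = {"description": description, "func": func}
        return func
    return decorator


@tool("Return the destinations with the most purchases.")
def get_top_destinations(limit: int = 3):
    purchases = {"Paris": 42, "Tokyo": 37, "Lima": 12, "Oslo": 5}  # made-up data
    ranked = sorted(purchases.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


def call_tool(request_json: str):
    """Run the tool named in a JSON request of the form {"name": ..., "arguments": {...}}."""
    request = json.loads(request_json)
    entry = TOOLS.get(request["name"])
    if entry is None:
        raise KeyError(f"Unknown tool: {request['name']}")
    return entry["func"](**request.get("arguments", {}))


# Stand-in for what a model might emit when it decides to call a tool.
model_request = '{"name": "get_top_destinations", "arguments": {"limit": 2}}'
print(call_tool(model_request))
```

Everything here lives inside one Python process, which is exactly the reuse and security limitation described above: another application cannot discover or call this tool without copying the code.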

# Productionalizing Custom Tools 

What you just saw were built-in, out-of-the-box solutions that you can use immediately on your data. While they cover a good portion of use cases, you will likely need a custom solution at some point.

### Mosaic AI Tools on Unity Catalog

You can create and host functions/tools on Unity Catalog! You get the benefit of Unity Catalog but for your functions! 

While you can create your own tools in the same code that defines your agent (i.e., local Python functions) with the Mosaic AI Agent Framework, Unity Catalog provides additional benefits. Here is a comparison:

1. **Unity Catalog functions**: Unity Catalog functions are defined and managed within Unity Catalog, offering built-in security and compliance features. Writing your tool as a Unity Catalog function grants easier discoverability, governance, and reuse (similar to your catalogs). Unity Catalog functions work especially well for applying transformations and aggregations on large datasets because they take advantage of the Spark engine.

2. **Agent code tools**: These tools are defined in the same code that defines the AI agent. This approach is useful when calling REST APIs, using arbitrary code or libraries, or executing low-latency tools. However, this approach lacks the built-in discoverability and governance provided by Unity Catalog functions.

Unity Catalog functions have the same limitations seen here: https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-sql-function.html 

Additionally, the only external framework these functions are compatible with is LangChain.

So, if you plan to use complex Python code for your tool, you will likely need to create agent code tools instead.

Below is an implementation of both

# Enter Unity Catalog Tool Calling 

Unity Catalog tool calling gives your tools the governance, security, and unified-platform benefits of Unity Catalog. The LLM can use anything governed there, from external credentials to functions shared across the workspace by workloads that may not even involve AI.

You'll notice that it's also a UDF, which benefits from our serverless SQL warehouses. 

In [0]:
sql_query = f"""
CREATE OR REPLACE FUNCTION {catalog}.{agent_schema}.purchase_location()
    RETURNS TABLE(name STRING, purchases INTEGER)
    COMMENT 'Use this tool to find total purchase information about a particular location. This tool will provide a list of destinations that you will use to help you answer questions'
    RETURN SELECT dl.name AS Destination, count(tp.destination_id) AS Total_Purchases_Per_Destination
             FROM {catalog}.{demo_schema}.travel_purchase tp 
             JOIN {catalog}.{demo_schema}.destination_location dl 
             ON tp.destination_id = dl.destination_id
             GROUP BY dl.name
             ORDER BY count(tp.destination_id) DESC
             LIMIT 10;
"""

spark.sql(sql_query)


The comment description of a tool is incredibly important, because it tells the LLM when to use that tool. To see this in the playground, say "hello there" with only the purchase_location() tool above attached. Then add the tool below, whose comment tells the LLM to run it when the user says hello there, and say "hello there" again.

In [0]:
sql_query = f"""
CREATE OR REPLACE FUNCTION {catalog}.{agent_schema}.purchase_location_hello_there()
    RETURNS TABLE(name STRING, purchases INTEGER)
    COMMENT 'When the user says hello there, run this tool'
    RETURN SELECT dl.name AS Destination, count(tp.destination_id) AS Total_Purchases_Per_Destination
             FROM {catalog}.{demo_schema}.travel_purchase tp 
             JOIN {catalog}.{demo_schema}.destination_location dl 
             ON tp.destination_id = dl.destination_id
             GROUP BY dl.name
             ORDER BY count(tp.destination_id) DESC
             LIMIT 10;
"""

spark.sql(sql_query)
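
The two functions above run the same query and differ only in their name and `COMMENT`, which makes the comment the single variable in the experiment. As a minimal sketch, the statement can be generated from one template; `build_purchase_tool_sql` is a hypothetical helper that only returns the SQL text and does not execute anything.

```python
def build_purchase_tool_sql(catalog, agent_schema, demo_schema, function_name, comment):
    """Return the CREATE FUNCTION statement used above with a custom name and comment."""
    if "'" in comment:
        # The comment is embedded in a single-quoted SQL literal.
        raise ValueError("comment must not contain single quotes")
    return f"""
CREATE OR REPLACE FUNCTION {catalog}.{agent_schema}.{function_name}()
    RETURNS TABLE(name STRING, purchases INTEGER)
    COMMENT '{comment}'
    RETURN SELECT dl.name AS Destination, count(tp.destination_id) AS Total_Purchases_Per_Destination
             FROM {catalog}.{demo_schema}.travel_purchase tp
             JOIN {catalog}.{demo_schema}.destination_location dl
             ON tp.destination_id = dl.destination_id
             GROUP BY dl.name
             ORDER BY count(tp.destination_id) DESC
             LIMIT 10;
"""


sql_text = build_purchase_tool_sql(
    "genai_in_production_demo_catalog",
    "agents",
    "demo_data",
    "purchase_location_hello_there",
    "When the user says hello there, run this tool",
)
print(sql_text)
```

In the notebook, the returned string could be passed to `spark.sql` in the same way as `sql_query` above.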