### Create a UC Function-Tool
This notebook defines Unity Catalog SQL functions (metadata) and a Python UDF for real statistics.

## Unity Catalog Functions & UDF (Canonical Order)
1. `check_catalog_exist(catalog_name_to_find)` – verify catalog existence
2. `check_table_exist(catalog_name_to_find, schema_name_to_find, table_name_to_find)` – verify table existence
3. `check_schema_exist(catalog_name_to_find, schema_name_to_find)` – verify schema existence
4. `get_table_columns(catalog_name, schema_name, table_name)` – JSON array of column metadata
5. `get_column_statistics(catalog_name, schema_name, table_name, column_name)` – Python UDF returning real stats (unique_count, null_count, min, max, mean)
6. `get_table_summary(catalog_name, schema_name, table_name)` – lightweight overview (no heavy scans)

Notes:
- Function 5 is a Python UDF (registered as a SQL callable) providing actual statistics.
- Function 4 and 6 return JSON assembled from information_schema; 6 intentionally skips heavy stats.
- Use Function 5 per-column when deeper statistics are required.


In [0]:
# Install Unity Catalog AI integration packages with the Databricks extra
%pip install unitycatalog-ai[databricks]
%pip install unitycatalog-langchain[databricks]
%pip install -U -qqqq mlflow databricks-openai databricks-agents

# Install the Databricks LangChain integration package
%pip install databricks-langchain


dbutils.library.restartPython()

In [0]:
import sys
path = '/Workspace' + '/'.join(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split("/")[0:-2])
sys.path.append(path)

In [0]:
from configs import variables

In [0]:
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

client = DatabricksFunctionClient()

In [0]:
%sql
-- Function 1: Check if a catalog exists in Unity Catalog
-- This function queries the system.information_schema.catalogs table to verify
-- whether a specific catalog exists in the Databricks workspace

CREATE OR REPLACE FUNCTION agentic_ai.synthia_data_agent.check_catalog_exist(catalog_name_to_find STRING COMMENT 'Catalog name to search')
RETURNS STRING
COMMENT "This function checks if a catalog exists in the Databricks workspace. Returns 'TRUE' if the catalog exists, 'FALSE' otherwise."
RETURN
SELECT CASE WHEN EXISTS (
    SELECT *
    FROM system.information_schema.catalogs c
    WHERE c.catalog_name = catalog_name_to_find
)
THEN 'TRUE'
ELSE 'FALSE'
END

In [0]:
%sql
-- Function 2: Check if a table exists in Unity Catalog
-- This function queries the system.information_schema.tables to verify whether
-- a specific table exists within the given catalog and schema combination
CREATE OR REPLACE FUNCTION agentic_ai.synthia_data_agent.check_table_exist(
  catalog_name_to_find STRING COMMENT 'Catalog name to search',
  schema_name_to_find STRING COMMENT 'Schema name to search',
  table_name_to_find STRING COMMENT 'Table name to search'
)
RETURNS STRING
COMMENT "This function checks if a table exists in the Databricks workspace. Returns 'TRUE' if the table exists"
RETURN
SELECT CASE WHEN EXISTS (
    SELECT *
    FROM system.information_schema.tables c
    WHERE c.table_catalog = catalog_name_to_find AND c.table_schema = schema_name_to_find AND c.table_name = table_name_to_find
)
THEN 'TRUE'
ELSE 'FALSE'
END

In [0]:
%sql
-- Function 3: Check if a schema exists in Unity Catalog
CREATE OR REPLACE FUNCTION agentic_ai.synthia_data_agent.check_schema_exist(
  catalog_name_to_find STRING COMMENT 'Catalog name to search',
  schema_name_to_find STRING COMMENT 'Schema name to search'
)
RETURNS STRING
COMMENT "This function checks if a schema exists in the Databricks workspace. Returns 'TRUE' if the schema exists"
RETURN
SELECT CASE WHEN EXISTS (
    SELECT *
    FROM system.information_schema.tables c
    WHERE c.table_catalog = catalog_name_to_find AND c.table_schema = schema_name_to_find
)
THEN 'TRUE'
ELSE 'FALSE'
END

In [0]:
%sql
-- Function 4: Get table columns metadata as JSON array
-- Returns JSON array with objects: column_name, data_type, is_nullable, comment, ordinal_position

CREATE OR REPLACE FUNCTION agentic_ai.synthia_data_agent.get_table_columns(
  catalog_name STRING COMMENT 'Catalog name where the table is located',
  schema_name STRING COMMENT 'Schema name where the table is located',
  table_name STRING COMMENT 'Table name to inspect'
)
RETURNS STRING
COMMENT "Returns JSON array with columns (column_name,data_type,is_nullable,comment,ordinal_position) for the specified table or an error JSON if table not found."
RETURN
SELECT CASE WHEN EXISTS (
    SELECT * FROM system.information_schema.tables t
    WHERE t.table_catalog = catalog_name
      AND t.table_schema = schema_name
      AND t.table_name = table_name
)
THEN (
    SELECT CONCAT('[', concat_ws(',', collect_list(
        CONCAT('{"column_name":"', c.column_name,
               '","data_type":"', c.data_type,
               '","is_nullable":"', c.is_nullable,
               '","comment":"', COALESCE(c.comment,''),
               '","ordinal_position":', CAST(c.ordinal_position AS STRING), '}')
    )), ']')
    FROM system.information_schema.columns c
    WHERE c.table_catalog = catalog_name
      AND c.table_schema = schema_name
      AND c.table_name = table_name
)
ELSE '{"error":"Table not found"}'
END

In [0]:
def get_column_statistics(catalog_name, schema_name, table_name, column_name):
    from pyspark.sql.functions import col
    import json
    
    full_table = f"{catalog_name}.{schema_name}.{table_name}"
    df = spark.read.table(full_table)
    if column_name not in df.columns:
        return json.dumps({"error": f"Column {column_name} not found"})
    distinct_count = df.select(column_name).distinct().count()
    null_count = df.filter(col(column_name).isNull()).count()
    try:
        numeric = df.select(column_name).summary("min", "max").collect()
        min_val = numeric[0][1]
        max_val = numeric[1][1]
    except:
        min_val, max_val = None, None
    return json.dumps({
        "column": column_name,
        "distinct_values": distinct_count,
        "null_count": null_count,
        "min": min_val,
        "max": max_val
    })

In [0]:
%sql
    
-- Function 6: Get comprehensive table summary from Unity Catalog
-- Returns a JSON object summarizing columns (name, type, nullable, comment, placeholder summary)
-- NOTE: Heavy statistics are intentionally excluded here to avoid scanning large tables in a scalar function.
CREATE OR REPLACE FUNCTION agentic_ai.synthia_data_agent.get_table_summary(
  catalog_name STRING COMMENT 'Catalog name where the table is located',
  schema_name STRING COMMENT 'Schema name where the table is located',
  table_name STRING COMMENT 'Table name to summarize'
)
RETURNS STRING
COMMENT "Returns JSON summary of all columns (name, data_type, is_nullable, comment, placeholder summary). Heavy stats should be computed via dedicated Python tools/UDFs." 
RETURN
SELECT CASE WHEN EXISTS (
    SELECT * FROM system.information_schema.tables t
    WHERE t.table_catalog = catalog_name AND t.table_schema = schema_name AND t.table_name = table_name
)
THEN (
    SELECT CONCAT('{"table_name":"', table_name,
                  '","total_columns":', CAST(COUNT(*) AS STRING),
                  ',"columns":[',
                  CONCAT_WS(',', collect_list(
                      CONCAT('{"column_name":"', c.column_name,
                             '","data_type":"', c.data_type,
                             '","is_nullable":"', c.is_nullable,
                             '","comment":"', COALESCE(c.comment,''),
                             '","summary":"Detailed statistics require sampling (use get_column_statistics for specifics)"}')
                  )),
                  ']}')
    FROM system.information_schema.columns c
    WHERE c.table_catalog = catalog_name
      AND c.table_schema = schema_name
      AND c.table_name = table_name
)
ELSE '{"error":"Table not found"}'
END

### Test Function

In [0]:
# Test the get_table_summary function
# This test retrieves a comprehensive summary of the NYC taxi trips table
# Expected result: JSON with complete table summary including all columns and their characteristics
result_summary = client.execute_function(
    function_name=f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.get_table_summary",
    parameters={
        "catalog_name": "samples",
        "schema_name": "nyctaxi", 
        "table_name": "trips"
    }
)

print("Table summary result:")
print(result_summary.value)

###  Agent Tool
Wrap the function using the UCFunctionToolkit to make it accessible to agent authoring libraries. The toolkit ensures consistency across different gen AI libraries and adds helpful features like auto-tracing for retrievers.

In [0]:
# Create LangChain Agent with Unity Catalog Function Tools
# This section demonstrates how to integrate the UC functions into a conversational agent
# The agent can use these functions to inspect Unity Catalog resources and answer questions
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate
from databricks_langchain import (
  ChatDatabricks,
  UCFunctionToolkit,
)
import mlflow

# Initialize the LLM (using Meta Llama 3.3 70B model hosted on Databricks)
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME, temperature=0.1)

# Create UC Function Toolkit with all our Unity Catalog functions
# This toolkit wraps our SQL functions to make them accessible to the LangChain agent
toolkit = UCFunctionToolkit(
    warehouse_id=None,  # Uses the default warehouse
    function_names=[
        # Resource validation functions
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.check_catalog_exist",
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.check_schema_exist", 
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.check_table_exist",
        
        # Table metadata and analysis functions
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.get_table_columns",
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.get_column_statistics",
        f"{variables.CATALOG_NAME}.{variables.SCHEMA_NAME}.get_table_summary"
    ]
)

# Get the tools from the toolkit
tools = toolkit.get_tools()

# Define the conversational prompt template
# The agent will use the available tools to answer user questions about Unity Catalog resources
prompt = ChatPromptTemplate.from_messages(
  [
    (
      "system",
      "You are a helpful assistant specialized in Unity Catalog data inspection. Use the available tools to provide comprehensive information about catalogs, schemas, tables, and column statistics. Make sure to use tools for additional functionality.",
    ),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
  ]
)

# Enable automatic tracing for monitoring and debugging
mlflow.langchain.autolog()

# Create the tool-calling agent with LLM, tools, and prompt
agent = create_tool_calling_agent(llm, tools, prompt)

# Create the agent executor that handles the conversation flow
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# Test the agent with a comprehensive question about table analysis
# This example demonstrates how the agent uses multiple functions for complete table inspection
agent_executor.invoke({"input": "Can you provide a complete analysis of the table 'trips' in schema 'nyctaxi' and catalog 'samples'? Include column information, statistics, and a summary view."})