# Build a Data Analyst AI Agent from Scratch

## Project Setup  

This section outlines the steps required to set up the project:  
- Import necessary libraries  
- Load the required API keys  
- Establish the database connection  
- Load the sample data into the database  


In [1]:
!pip install openai



In [2]:
from openai import OpenAI
import json
import inspect
import requests
from teradataml import *

In [3]:
import pandas as pd

In [4]:
# Load runtime settings from a local JSON file. The encoding is pinned to
# UTF-8 so decoding does not depend on the platform's default locale.
with open('configs.json', 'r', encoding='utf-8') as config_file:
    configs = json.load(config_file)

In [5]:
# Create the OpenAI API client, authenticating with the key stored under
# 'llm-api-key' in the loaded configuration.
client = OpenAI(api_key = configs['llm-api-key'])

In [6]:
# Run the shared startup notebook inside the current namespace (-i).
# NOTE(review): presumably this is what prompts for and defines `password` — confirm.
%run -i ../startup.ipynb
# Open the teradataml connection context and keep the returned engine; it is
# reused later when queries are executed through pandas.
eng = create_context(host = 'host.docker.internal', username='demo_user', password = password)
print(eng)

Performing setup ...
Setup complete



Enter password:  ·············


... Logon successful
Connected as: teradatasql://demo_user:xxxxx@host.docker.internal/dbc
Engine(teradatasql://demo_user:***@host.docker.internal)


In [None]:
# SQL statements that create the demo database and its sample tables.
# Order matters: the database is created first, then each table is created
# inside it from a SELECT over a CSV file referenced by LOCATION.
data_loading_queries = [
'''
CREATE DATABASE teddy_retailers
AS PERMANENT = 50000000;
''',
'''
CREATE TABLE teddy_retailers.fct_order_details AS
(
  SELECT order_id,product_id,customer_id,order_date,unit_price,quantity,amount
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/fct_order_details.csv') as order_details
) WITH DATA;''',

'''
CREATE TABLE teddy_retailers.dim_customers AS
(
  SELECT customer_id,first_name,last_name,email
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_customers.csv') as customers
) WITH DATA;
''',
'''
CREATE TABLE teddy_retailers.dim_orders AS
(
  SELECT order_id,order_date,order_status
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_orders.csv') as orders
) WITH DATA;
''',
'''
CREATE TABLE teddy_retailers.dim_products AS
(
  SELECT product_id,product_name,product_category,price_dollars
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_products.csv') as products
) WITH DATA;
'''
]
# Execute the statements one at a time, in list order.
for query in data_loading_queries:
    execute_sql(query)

## Agent Configuration  

This section covers the configuration of the agent, including:  
* Defining the data context that the agent will interact with  
* Setting up the routine the agent will follow as a system prompt (embedding the data context)  
* Establishing the list of tools available for the agent to complete its tasks  


In [17]:
def query_dbt_catalog(path):
    """Download a dbt catalog and return it as parsed JSON.

    Args:
        path: URL of the catalog JSON document.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    # Without a timeout, requests waits indefinitely on an unresponsive host.
    response = requests.get(path, timeout=30)
    # Fail loudly on an HTTP error instead of trying to decode an error page.
    response.raise_for_status()
    return response.json()

In [19]:
# URL of the dbt catalog JSON that describes the data available to the agent.
dbt_catalog_path = 'https://raw.githubusercontent.com/Teradata/simple_data_ai_agent/refs/heads/main/catalogs/catalog.json'

In [20]:
# System prompt for the agent. The dbt catalog is fetched when this cell runs
# and embedded via json.dumps so that it really is the "JSON structure" the
# prompt describes; interpolating the dict directly would embed its Python
# repr (single quotes, True/None) instead.
system_prompt_full = f"""
You are an advanced data analyst for a retail company, specializing in analyzing data from a Teradata system. Your primary responsibility is to assist users by answering business-related questions using SQL queries on the Teradata database. Follow these steps:

1. Understanding User Requests
   - Users provide business questions in plain English.
   - Extract relevant data points needed to construct a meaningful response.

2. Generating SQL Queries
   - Construct an optimized Teradata SQL query to retrieve the necessary data.
   - The query must be a **single-line string** without carriage returns or line breaks.
   - The catalog of databases, tables, and columns to query is in the following JSON structure, which is a dbt catalog. Use only the dimensional and fact tables, which are prefixed with `dim_` and `fct_`, respectively, to build the query.  
     {json.dumps(query_dbt_catalog(dbt_catalog_path))}
   - Ensure that the SQL query adheres to **Teradata SQL syntax** and avoids unsupported keywords such as `LIMIT`.
   - Apply necessary joins between tables if the user's question requires it.
   - Apply appropriate filtering, grouping, and ordering to enhance performance and accuracy.

3. Executing the Query
   - Show me the query and the reasoning you followed to produce it.
   - Show me the catalog you retrieved for building the query and your reasoning for using it.
   - Run the query in the Teradata database.

4. Responding to the User
   - Respond to the user's question based on the result of running the query.
"""

In [21]:
def function_to_schema(func) -> dict:
    """Build a function-calling tool schema from a Python callable.

    Each parameter becomes a property whose JSON type is derived from its
    annotation; unannotated or unrecognised annotations fall back to
    ``"string"``. Parameters without a default value are listed as required.
    The callable's docstring (stripped) is used as the tool description.

    Args:
        func: The callable to describe.

    Returns:
        A dict of the form ``{"type": "function", "function": {...}}`` with
        the callable's name, description and parameter schema.

    Raises:
        ValueError: If no signature can be obtained for ``func``.
    """
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        ) from e

    params = signature.parameters.values()

    # dict.get never raises KeyError, so no exception handling is needed here.
    properties = {
        param.name: {"type": type_map.get(param.annotation, "string")}
        for param in params
    }

    # Identity check against the public sentinel: `==` would invoke the
    # default value's own __eq__, which may misreport or even raise.
    required = [
        param.name
        for param in params
        if param.default is inspect.Parameter.empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }

In [22]:
def clean_statement(sql_statement):
    """Strip a row-limiting clause that follows ORDER BY from a SQL statement.

    If the statement contains ORDER BY, everything from the start up to (but
    not including) the first LIMIT or FETCH keyword after it is kept; if no
    such keyword follows, the statement is kept through its end. Statements
    without ORDER BY are returned unchanged. The result is always stripped of
    surrounding whitespace. Matching is case-insensitive and works across
    line breaks.

    Args:
        sql_statement: The SQL text to clean.

    Returns:
        The cleaned SQL text.
    """
    # Imported locally so the function does not rely on `re` leaking into the
    # namespace through a wildcard import.
    import re

    pattern = r"""
        ^(.*?)                # capture everything lazily up to...
        \bORDER\s+BY\b        # ...the ORDER BY keyword
        [\s\S]*?              # ...and all that follows (ASC/DESC, etc.), non-greedy
        (?=(\bLIMIT\b|\bFETCH\b)|$)  # stop before LIMIT or FETCH or end of string
    """
    # DOTALL lets the leading `.*?` span newlines; without it a query whose
    # ORDER BY is not on the first line never matches and keeps its LIMIT/FETCH.
    match = re.search(
        pattern, sql_statement, re.IGNORECASE | re.VERBOSE | re.DOTALL
    )

    cleaned = match.group(0) if match else sql_statement
    return cleaned.strip()

In [23]:
def query_teradata_database(sql_statement: str) -> str:
    """Run a SQL query on the Teradata database and return the rows as JSON.

    The docstring doubles as the tool description sent to the model, so it
    must not be left empty.

    Args:
        sql_statement: Teradata SQL text. A LIMIT/FETCH clause that follows
            ORDER BY is removed before execution.

    Returns:
        The query result serialised as a JSON string.
    """
    query_statement = clean_statement(sql_statement)
    result = pd.read_sql(query_statement, eng)
    return result.to_json()

## Agent Runtime
This section covers the code executed while the agent is in action, including:
* Preparing the tools for use by the agent
* The agent's runtime function

In [24]:
# Callables the agent is allowed to invoke as tools.
tools = [query_teradata_database]
# One schema per registered tool, in the same order as `tools`.
tool_schemas = list(map(function_to_schema, tools))
# Lookup table from a tool's name to the callable that implements it.
tools_map = {fn.__name__: fn for fn in tools}

In [25]:
def execute_tool_call(tool_call, tools_map):
    """Dispatch a tool call requested by the model to its Python callable.

    Args:
        tool_call: Object exposing ``function.name`` and ``function.arguments``
            (a JSON-encoded object of keyword arguments).
        tools_map: Mapping from tool name to the callable implementing it.

    Returns:
        Whatever the selected callable returns.
    """
    requested = tool_call.function
    tool_name = requested.name
    kwargs = json.loads(requested.arguments)
    # Echo the call so the notebook user can see what the agent is doing.
    print(f"Assistant: {tool_name}({kwargs})")
    handler = tools_map[tool_name]
    return handler(**kwargs)

In [26]:
def run_full_turn(system_message, messages):
    """Run the agent until the model answers without requesting a tool.

    Each iteration sends the system prompt plus the conversation so far to the
    model, appends the model's reply to ``messages``, executes any tool calls
    the reply contains, and appends one ``tool`` message per call. The loop
    ends on the first reply that contains no tool calls.

    Args:
        system_message: Text of the system prompt.
        messages: Conversation history; it is extended in place.

    Returns:
        The messages appended during this turn (assistant replies and tool
        results), in order.
    """
    # Remember where this turn starts so only the new messages are returned.
    num_init_messages = len(messages)

    while True:
        print(f"just logging messages {messages}")
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system_message}] + messages,
            tools=tool_schemas or None,
            seed = 2
        )
        print(f"logging response {response}")
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant Response:", message.content)

        # No tool calls means the model has produced its final answer.
        if not message.tool_calls:
            break

        # Run every requested tool and feed each result back to the model.
        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    return messages[num_init_messages:]

## Running the Agent  
Ask a business question and receive a response. Since this is a simple agent, it can only handle basic questions. While its capabilities can be enhanced, such improvements are currently out of scope.

In [27]:
# Read one question from the user and start the conversation with it.
user_input = input("User: ")
messages = [{"role": "user", "content": user_input}]
# Hand the conversation to the agent loop.
new_messages = run_full_turn(system_prompt_full, messages)

User:  what is the best selling product that we have


just logging messages [{'role': 'user', 'content': 'what is the best selling product that we have'}]
logging response ChatCompletion(id='chatcmpl-BJy0aAXpZBDCoAo3lWzEkbHIEBCLT', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, refusal=None, role='assistant', annotations=[], audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_mKerQsBTGrmuwyHacxmi08x5', function=Function(arguments='{"sql_statement":"SELECT dp.product_name, SUM(fod.quantity) AS total_quantity_sold FROM teddy_retailers.dim_products AS dp JOIN teddy_retailers.fct_order_details AS fod ON dp.product_id = fod.product_id GROUP BY dp.product_name ORDER BY total_quantity_sold DESC FETCH FIRST 1 ROWS ONLY;"}', name='query_teradata_database'), type='function')]))], created=1744098604, model='gpt-4o-mini-2024-07-18', object='chat.completion', service_tier='default', system_fingerprint='fp_b376dfbbd5', usage=CompletionUsage(completion_tokens

## Cleaning Up the Test Data

In [None]:
# SQL statements that remove the demo data. Order matters: the database's
# contents are deleted first, then the database itself is dropped.
data_cleaning_queries = [
'''
DELETE DATABASE teddy_retailers ALL;
''',
'''
DROP DATABASE teddy_retailers
'''
]
# Execute the statements one at a time, in list order.
for query in data_cleaning_queries:
    execute_sql(query)