In [None]:
!pip install mysql-connector-python openai requests --quiet

In [None]:
!pip install python-dotenv -q

In [1]:
import json
from openai import AzureOpenAI
import requests
from dotenv import load_dotenv
import os

In [2]:
load_dotenv()
print(os.environ.get('AZURE_OPENAI_KEY'))

d213aa73d27b4facb3898449b963ea11


In [3]:
endpoint = os.environ.get('ENDPOINT_URL')
deployment = os.environ.get('DEPLOYMENT_NAME')
api_key = os.environ.get('AZURE_OPENAI_KEY')

In [4]:
client = AzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2023-07-01-preview"
)

In [5]:
def call_openai_function_calling(messages, tools):
    response = client.chat.completions.create(
        model=deployment,
        messages=messages,
        tools = tools,
        tool_choice="auto",
        temperature=0
    )
    return response


In [6]:
import mysql.connector

def connect_to_database():
# Establish a connection to the MySQL database
    conn = mysql.connector.connect(
        host="localhost",
        user="mysql-user",
        password="rootroot",
        database="plantmanager"
    )
    return conn

In [13]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "execute_query",
            "description": "Executes a SQL query.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        }
    }
    # {
    #     "type": "function",
    #     "function":{
    #         "name": "get_top_products_by_price",
    #         "description": "Execute Query based on top Products",
    #         "parameters":{
    #             "type": "object",
    #             "properties":{
                    
    #             }
    #         }
    # }
]

In [8]:
def get_table_names(conn):
    cursor = conn.cursor()
    cursor.execute("SHOW TABLES;")
    tables = [table[0] for table in cursor.fetchall()]
    return tables

def get_column_names(conn, table_name):
    cursor = conn.cursor()
    cursor.execute(f"DESCRIBE {table_name};")
    columns = [column[0] for column in cursor.fetchall()]
    return columns

def get_database_info(conn):
    table_dicts = []
    table_names = get_table_names(conn)
    for table_name in table_names:
        columns_names = get_column_names(conn, table_name)
        table_dicts.append({"table_name": table_name, "column_names": columns_names})
    return table_dicts

In [9]:
conn = connect_to_database()
database_schema_dict = get_database_info(conn)
database_schema_string = "\n".join(
    [
        f"Table: {table['table_name']}\nColumns: {', '.join(table['column_names'])}"
        for table in database_schema_dict
    ]
)
print(database_schema_string)


Table: machine_operation
Columns: machine_id, operation_id
Table: machines
Columns: machine_id, machien_desc, machine_name
Table: operations
Columns: operation_id, operation_desc, operation_name
Table: product_operation
Columns: product_id, operation_id
Table: product_rawmaterial
Columns: product_id, rawmaterial_id
Table: products
Columns: product_id, pro_desc, pro_name, price
Table: rawmaterials
Columns: rawmaterial_id, raw_code, raw_name
Table: role
Columns: role_id, role
Table: user
Columns: user_id, contact_no, email, password, user_name
Table: user_role
Columns: user_id, role_id


In [10]:
def execute_query(conn,query):
    conn = connect_to_database()
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    return result


In [None]:
def handle_user_query(messages, tools):
    chat_response = call_openai_function_calling(messages, tools)
    function_call = chat_response.choices[0].message.function_call

    if function_call:
        function_name = function_call.name
        function_args = json.loads(function_call.arguments)
        if function_name == "execute_query":
            query = function_args["query"]
            result = execute_query(query)
            messages.append({"role": "function", "name": function_name, "content": str(result)})
            return result
    else:
        return chat_response.choices[0].message.content

In [11]:
def get_top_products_by_price(conn, limit=3):
    """
    Fetch the top N products by price from the database.
    
    Parameters:
    - conn: MySQL database connection object.
    - limit: The number of top products to fetch (default is 3).
    
    Returns:
    - List of tuples containing product names and their prices.
    """
    query = f"SELECT pro_name, price FROM products ORDER BY price DESC LIMIT {limit};"
    
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    return result

In [12]:
messages = [
    {"role": "system", "content": "You are a helpful assistant connected to a MySQL database."},
    {"role": "user", "content": "Give me the top 3 product pricing"}
]
response = call_openai_function_calling(messages,tools)

response_message = response.choices[0].message

print(response_message)

ChatCompletionMessage(content=None, role='assistant', function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_YiizdmP1G6HP5zTFWMz2bV70', function=Function(arguments='{"query":"SELECT product_name, price FROM products ORDER BY price DESC LIMIT 3"}', name='execute_query'), type='function')])


In [15]:
query = "SELECT pro_name, price FROM products ORDER BY price DESC LIMIT 3;"
top_product = execute_query(conn,query)
print(top_product)


[('Machine Delta', Decimal('499.99')), ('Device Alpha', Decimal('199.99')), ('Gadget X', Decimal('99.99'))]


In [None]:
messages = []
messages.append({"role": "system", "content": "Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous."})
messages.append({"role": "user", "content": "Give the top 3 products by price"})

chat_response = call_openai_function_calling(messages,tools)

tools_call = chat_response.choices[0].message.tool_calls

if tools_call:

  
    
                 



In [12]:
tools_call = response_message.tool_calls

if tools_call:
     tool_call_id = tools_call[0].id
     tool_function_name = tools_call[0].function.name
     tool_query_string = json.loads(tools_call[0].function.arguments)['query']
     if tool_function_name == 'execute_query':
          results = execute_query(conn,tool_query_string)
          messages.append({
            "role":"tool", 
            "tool_call_id":tool_call_id, 
            "name": tool_function_name, 
            "content":results
        })
             
             
         )
          msg_content = response.choices[0].message.content
          print(msg_content)
         
         

ProgrammingError: 1054 (42S22): Unknown column 'product_name' in 'field list'

In [None]:
def get_table_names(conn):
    cursor = conn.cursor()
    cursor.execute("SHOW TABLES;")
    tables = [table[0] for table in cursor.fetchall()]
    return tables

def get_column_names(conn, table_name):
    cursor = conn.cursor()
    cursor.execute(f"DESCRIBE {table_name};")
    columns = [column[0] for column in cursor.fetchall()]
    return columns

def get_database_info(conn):
    table_dicts = []
    table_names = get_table_names(conn)
    for table_name in table_names:
        columns_names = get_column_names(conn, table_name)
        table_dicts.append({"table_name": table_name, "column_names": columns_names})
    return table_dicts

In [None]:
conn = connect_to_database()
database_schema_dict = get_database_info(conn)
database_schema_string = "\n".join(
    [
        f"Table: {table['table_name']}\nColumns: {', '.join(table['column_names'])}"
        for table in database_schema_dict
    ]
)
print(database_schema_string)

In [None]:
def get_table_names(conn):
    """Return a list of table names."""
    table_names = []
    tables = conn.execute("SHOW TABLES;")
    for table in tables.fetchall():
        table_names.append(table[0])
    return table_names


def get_column_names(conn, table_name):
    """Return a list of column names."""
    column_names = []
    columns = conn.execute(f"SHOW COLUMNS FROM {table_name};").fetchall()
    for col in columns:
        column_names.append(col[1])
    return column_names


def get_database_info(conn):
    """Return a list of dicts containing the table name and columns for each table in the database."""
    table_dicts = []
    for table_name in get_table_names(conn):
        columns_names = get_column_names(conn, table_name)
        table_dicts.append({"table_name": table_name, "column_names": columns_names})
    return table_dicts

In [None]:
def ask_database(conn, query):
    """Function to query SQLite database with a provided SQL query."""
    try:
        results = str(conn.execute(query).fetchall())
    except Exception as e:
        results = f"query failed with error: {e}"
    return results

def execute_function_call(message):
    if message["tool_calls"][0]["function"]["name"] == "ask_database":
        query = json.loads(message["tool_calls"][0]["function"]["arguments"])["query"]
        results = ask_database(conn, query)
    else:
        results = f"Error: function {message['tool_calls'][0]['function']['name']} does not exist"
    return results

In [None]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "ask_database",
            "description": "Use this function to answer user questions about music. Input should be a fully formed SQL query.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": f"""
                                SQL query extracting info to answer the user's question.
                                SQL should be written using this database schema:
                                The query should be returned in plain text, not in JSON.
                                """,
                    }
                },
                "required": ["query"],
            },
        }
    }
]

In [None]:
# Fetching database schema information
database_schema_dict = get_database_info(conn)

# Formatting the schema information as a string
database_schema_string = "\n".join(
    [
        f"Table: {table['table_name']}\nColumns: {', '.join(table['column_names'])}"
        for table in database_schema_dict
    ]
)

In [None]:
function_definitions = [
    {
        "name": "get_top_product_pricing",
        "description": "Get the top products based on pricing.",
        "parameters": {
            "type": "object",
            "properties": {
                "limit": {"type": "integer", "description": "The number of top products to fetch."}
            },
            "required": ["limit"]
        }
    }
    
]

In [None]:
# def get_top_product_pricing(conn, limit=5):
#     """Fetch top products based on pricing."""
#     cursor = conn.cursor()
#     query = f"SELECT productName, price FROM product_rawmaterial ORDER BY price DESC LIMIT {limit};"
#     cursor.execute(query)
#     results = cursor.fetchall()
   
#     return results

cursor = conn.cursor(dictionary=True)
def get_product_details_and_top_pricing():
    query = "SELECT product_name, product_description, price FROM products ORDER BY price DESC LIMIT 5"
    cursor.execute(query)
    results = cursor.fetchall()
    return results

In [None]:
# Simulate a user message and process it
user_message = "Give me the top product pricing"

if "top product pricing" in user_message.lower():
    top_pricing_products = get_product_details_and_top_pricing()
    print("Top Priced Products:")
    for product in top_pricing_products:
        print(f"Name: {product['product_name']}, Price: {product['price']}, Description: {product['product_description']}")

# Close the cursor and connection
cursor.close()
conn.close()
