In [33]:
import openai
import os
import json
import gradio as gr
import base64
from io import BytesIO
from PIL import Image
from IPython.display import Audio, display
from typing import Optional
import pandas as pd  
import requests
from week2.baq_svc import get_baq
import dotenv
from urllib.parse import quote_plus
from itertools import product
from math import ceil


In [20]:
system_message = """You are an AI assistant used internally at Pedigo Products by CSRs. You will used primarily as a tool for interacting with Epicor."""
system_message += "You will always display returned information from tool responses in a human-readable format."
system_message += "Whenever a User asks about an order, you should use the query_order_tracker tool to find relevant information."
system_message += "If you return no results for a po_number, suggest that the user try the provided number as an order_number"
system_message += "If you return no results for an order_number, suggest that the user try the provided number po_number"
system_message += "if the user provides a tracking number, use the order_number method"


gpt_model = 'gpt-4o-mini'


In [None]:


# Load environment variables from .env file
dotenv.load_dotenv(dotenv_path=".env")
CREDS = os.getenv("EPICOR_ENCODED_CREDENTIALS")

# Retrieve necessary environment variables
EPICOR_API_BASE_URL_V2 = os.getenv("EPICOR_API_BASE_URL_V2")
API_KEY = os.getenv("EPICOR_API_KEY")  # Assuming you use an API key for authorization

# Validate essential environment variables
if not EPICOR_API_BASE_URL_V2:
    raise EnvironmentError("EPICOR_API_BASE_URL_V2 is not set in the environment.")
if not API_KEY:
    raise EnvironmentError("API_KEY is not set in the environment.")

# Define headers, including authorization if required
HEADERS_V2 = {
    "Content-Type": "application/json",
    "X-api-key": API_KEY,  # Adjust based on your authentication method
    "Accept": "application/json",
    "Authorization": f"Basic {CREDS}",
}


def chunk_list(lst, chunk_size=50):
    """
    Yield successive chunks of specified size from the list.

    Args:
        lst (list): The list to be chunked.
        chunk_size (int, optional): The maximum size of each chunk. Defaults to 50.

    Yields:
        list: A chunk of the original list.
    """
    for i in range(0, len(lst), chunk_size):
        yield lst[i : i + chunk_size]


def get_baq(baq_name, select=None, retry_method=None, **filters):
    """
    Retrieve data from the specified Epicor BAQ service with flexible filtering and selection.
    Automatically chunks any filter lists in batches of 50 if there are more than 50 items.
    Tracks and identifies any filter values that did not return any data.

    Args:
        baq_name (str): The name of the BAQ to query.
        select (str or list, optional): Fields to select. Can be a single field as a string or a list of fields.
        retry_method (str, optional): If an HTTPError occurs, specify an alternative method to retry with.
        **filters: Arbitrary keyword arguments representing filter conditions.
                   Keys are field names, and values are either a single value or a list of values.

    Returns:
        pd.DataFrame: A DataFrame containing the retrieved data.
    """
    # Identify filters that are lists and need to be chunked
    list_filters = {k: v for k, v in filters.items() if isinstance(v, list)}
    single_filters = {k: v for k, v in filters.items() if not isinstance(v, list)}

    # Initialize a dictionary to track found values for each list filter
    found_values = {k: set() for k in list_filters.keys()}

    # If no list filters or all list filters have <=50 items, proceed normally
    if not list_filters or all(len(v) <= 50 for v in list_filters.values()):
        df = _get_baq_single_request(
            baq_name, select, retry_method, single_filters, list_filters, found_values
        )
        _save_missing_items(_identify_missing_filters(list_filters, found_values))
        return df.drop_duplicates()

    # Otherwise, need to chunk list filters and make multiple requests
    # First, split each list filter into chunks of 50
    chunked_filters = {}
    for key, value in list_filters.items():
        chunked_filters[key] = list(chunk_list(value, 50))

    # Prepare all combinations of chunks across different list filters
    # This can lead to a large number of requests if multiple list filters are present
    # To manage this, we'll use itertools.product to iterate through all possible chunk combinations
    # Note: Be cautious with the number of combinations to avoid excessive API calls

    # Create a list of lists where each sublist contains the chunks for a specific filter
    filter_chunks = [chunked_filters[key] for key in chunked_filters]
    filter_keys = list(chunked_filters.keys())

    all_combinations = list(product(*filter_chunks))  # Cartesian product of all chunks

    # Initialize an empty DataFrame to collect all results
    result_df = pd.DataFrame()

    for combination in all_combinations:
        # Build the current set of list filters
        current_list_filters = {}
        for idx, chunk in enumerate(combination):
            current_list_filters[filter_keys[idx]] = chunk

        # Merge with single filters
        combined_filters = {**single_filters, **current_list_filters}

        # Make the API request for the current combination of filters
        df = _get_baq_single_request(
            baq_name,
            select,
            retry_method,
            single_filters,
            current_list_filters,
            found_values,
        )

        # Append the result to the final DataFrame
        result_df = pd.concat([result_df, df], ignore_index=True)

    # After all requests, identify missing filter values and save them
    missing_items = _identify_missing_filters(list_filters, found_values)
    _save_missing_items(missing_items)

    return result_df.drop_duplicates()


def _get_baq_single_request(
    baq_name, select, retry_method, single_filters, list_filters, found_values
):
    """
    Helper function to make a single API request with given filters.
    Updates the found_values dictionary with values that returned data.

    Args:
        baq_name (str): The name of the BAQ to query.
        select (str or list, optional): Fields to select.
        retry_method (str, optional): Method to retry with on HTTPError.
        single_filters (dict): Filters that are single values.
        list_filters (dict): Filters that are lists (already chunked if necessary).
        found_values (dict): Dictionary to accumulate found filter values.

    Returns:
        pd.DataFrame: DataFrame with the results of the API call.
    """
    # Construct the dynamic URL using the mandatory baq_name
    url = f"{EPICOR_API_BASE_URL_V2}/BaqSvc/{quote_plus(baq_name)}/Data"
    params = {}

    # Handle $filter parameter
    filter_clauses = []

    # Process single filters
    for key, value in single_filters.items():
        clause = (
            f"{key} eq '{value}'" if isinstance(value, str) else f"{key} eq {value}"
        )
        filter_clauses.append(clause)

    # Process list filters (treated as OR clauses within each list filter)
    for key, values in list_filters.items():
        if isinstance(values, list):
            or_clauses = [
                f"{key} eq '{v}'" if isinstance(v, str) else f"{key} eq {v}"
                for v in values
            ]
            clause = "(" + " or ".join(or_clauses) + ")"
            filter_clauses.append(clause)
        else:
            clause = (
                f"{key} eq '{values}'"
                if isinstance(values, str)
                else f"{key} eq {values}"
            )
            filter_clauses.append(clause)

    if filter_clauses:
        params["$filter"] = " and ".join(filter_clauses)

    # Handle $select parameter
    if select:
        if isinstance(select, list):
            params["$select"] = ",".join(select)
        elif isinstance(select, str):
            params["$select"] = select
        else:
            raise ValueError("`select` must be a string or a list of strings.")

    try:
        response = requests.get(url, params=params, headers=HEADERS_V2, verify=False)
        response.raise_for_status()
        data = response.json()

        # Check if data contains results
        if "value" in data and data["value"]:
            df = pd.DataFrame(data["value"])

            # Update found_values based on the response data
            _update_found_values(df, list_filters, found_values)

            return df
        else:
            print("No data found for the given filters.")
            return pd.DataFrame()

    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        # Retry logic if a retry_method is specified
        if retry_method:
            print(f"Retrying with method: {retry_method}")
            retry_filters = {**single_filters, "method": retry_method}
            return _get_baq_single_request(
                baq_name=baq_name,
                select=select,
                retry_method=None,
                single_filters=single_filters,
                list_filters=retry_filters,
                found_values=found_values,
            )
        else:
            return pd.DataFrame()

    except requests.exceptions.RequestException as e:
        print(f"Request error occurred: {e}")
        return pd.DataFrame()

    except ValueError as ve:
        print(f"Value error: {ve}")
        return pd.DataFrame()


def _update_found_values(df, list_filters, found_values):
    """
    Updates the found_values dictionary based on the DataFrame and list_filters.

    Args:
        df (pd.DataFrame): The DataFrame containing API response data.
        list_filters (dict): The current list filters used in the API request.
        found_values (dict): Dictionary to accumulate found filter values.
    """
    for key in list_filters.keys():
        if key in df.columns:
            # Extract unique values from the DataFrame for the filter key
            found = set(df[key].astype(str).dropna().unique())
            found_values[key].update(found)
        else:
            print(
                f"Warning: The filter key '{key}' was not found in the API response data."
            )


def _identify_missing_filters(list_filters, found_values):
    """
    Identifies missing filter values by comparing list_filters with found_values.

    Args:
        list_filters (dict): Original list filters with all filter values.
        found_values (dict): Dictionary containing found filter values.

    Returns:
        dict: A dictionary mapping filter keys to lists of missing values.
    """
    missing_items = {}
    for key, values in list_filters.items():
        original_set = set(map(str, values))  # Ensure all values are strings
        found_set = set(map(str, found_values.get(key, [])))
        missing = original_set - found_set
        if missing:
            missing_items[key] = sorted(list(missing))
    return missing_items


def _save_missing_items(missing_items, filename="missing_items.xlsx"):
    """
    Saves the missing filter items to an Excel file.

    Args:
        missing_items (dict): Dictionary mapping filter keys to lists of missing values.
        filename (str, optional): The filename for the Excel file. Defaults to "missing_items.xlsx".
    """
    if not missing_items:
        print("No missing items to save.")
        return

    # Convert the missing_items dictionary to a DataFrame for better formatting
    missing_data = []
    for key, values in missing_items.items():
        for value in values:
            missing_data.append({"Filter_Key": key, "Missing_Value": value})

    missing_df = pd.DataFrame(missing_data)

    # Save to Excel
    missing_df.to_excel(filename, index=False)
    print(f"Missing items have been written to '{filename}'")
    print(f"Number of unique missing items: {missing_df['Missing_Value'].nunique()}")



"""
    # Example Usage Scenarios:

    # Example 1: Using specific filters and selecting certain fields
    BAQ_NAME = "OrderTrackerWebsiteQuery"
    df1 = get_baq(
        baq_name=BAQ_NAME,
        select=["OrderHed_OrderNum", "OrderHed_PONum", "ShipHead_TrackingNumber"],
        OrderHed_PONum=["68202", "35425", "4230331364", "65165", "64818", "4230269969", "47871"],
    )
    print("Example 1: Specific Filters and Selected Fields")
    print(df1)

    # Example 2: Using a single filter
    df2 = get_baq(baq_name=BAQ_NAME, OrderHed_OrderNum="1001")
    print("\nExample 2: Single Filter")
    print(df2)

    # Example 3: Using select without any filters
    df3 = get_baq(
        baq_name=BAQ_NAME,
        select=["OrderHed_OrderNum", "OrderHed_PONum"]
    )
    print("\nExample 3: Select Without Filters")
    print(df3)

    # Example 4: Calling without filters or select to retrieve all data (if API allows)
    df4 = get_baq(baq_name=BAQ_NAME)
    print("\nExample 4: No Filters or Select (All Data)")
    print(df4)

    # Example 5: Passing a list of Order Numbers to filter
    df5 = get_baq(
        baq_name=BAQ_NAME,
        select=["OrderHed_OrderNum", "OrderHed_PONum", "ShipHead_TrackingNumber"],
        OrderHed_OrderNum=["1001", "1002", "1003"]
    )
    print("\nExample 5: List of Order Numbers")
    print(df5)

    # Example 6: Passing multiple lists for different filters
    df6 = get_baq(
        baq_name=BAQ_NAME,
        select=["OrderHed_OrderNum", "OrderHed_PONum", "ShipHead_TrackingNumber"],
        OrderHed_OrderNum=["1001", "1002", "1003"],
        ShipHead_TrackingNumber=["TRACK7890", "TRACK7891"]
    )
    print("\nExample 6: Multiple Lists for Different Filters")
    print(df6)
    """


'\n    # Example Usage Scenarios:\n\n    # Example 1: Using specific filters and selecting certain fields\n    BAQ_NAME = "OrderTrackerWebsiteQuery"\n    df1 = get_baq(\n        baq_name=BAQ_NAME,\n        select=["OrderHed_OrderNum", "OrderHed_PONum", "ShipHead_TrackingNumber"],\n        OrderHed_PONum=["68202", "35425", "4230331364", "65165", "64818", "4230269969", "47871"],\n    )\n    print("Example 1: Specific Filters and Selected Fields")\n    print(df1)\n\n    # Example 2: Using a single filter\n    df2 = get_baq(baq_name=BAQ_NAME, OrderHed_OrderNum="1001")\n    print("\nExample 2: Single Filter")\n    print(df2)\n\n    # Example 3: Using select without any filters\n    df3 = get_baq(\n        baq_name=BAQ_NAME,\n        select=["OrderHed_OrderNum", "OrderHed_PONum"]\n    )\n    print("\nExample 3: Select Without Filters")\n    print(df3)\n\n    # Example 4: Calling without filters or select to retrieve all data (if API allows)\n    df4 = get_baq(baq_name=BAQ_NAME)\n    print("\

In [22]:
def define_tool_property(arg_name: str, arg_type, description: str) -> dict:
    """
    Define a single property for a tool package.

    Parameters:
    - arg_name (str): Name of the argument.
    - arg_type (str or list): Type of the argument (e.g., "string", ["string", "array"]).
    - description (str): Description of the argument.

    Returns:
    - dict: A dictionary defining the tool property.
    """
    return {arg_name: {"type": arg_type, "description": description}}



def create_tool_package(
    tool_name: str, tool_description: str, properties: dict
) -> dict:
    """
    Create a tool package for use with LLM integrations.

    Parameters:
    - tool_name (str): Name of the tool.
    - tool_description (str): Description of what the tool does.
    - properties (dict): Dictionary of properties describing the tool's parameters.

    Returns:
    - dict: A dictionary containing the tool package.
    """
    return {
        "name": tool_name,
        "description": tool_description,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": ["baq_name"],  # Only 'baq_name' is required
            "additionalProperties": {
                "type": ["string", "array"],  # Allow additional properties as string or array
                "items": {"type": "string"},   # If array, items must be strings
            },
        },
    }


def create_one_of_tool_package(
    tool_name: str, tool_description: str, properties: dict
) -> dict:
    """
    Create a tool package for use with LLM integrations.

    Parameters:
    - tool_name (str): Name of the tool.
    - tool_description (str): Description of what the tool does.
    - properties (dict): Dictionary of properties describing the tool's parameters.

    Returns:
    - dict: A dictionary containing the tool package.
    """
    return {
        "name": tool_name,
        "description": tool_description,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": list(properties.keys()),
            "additionalProperties": False,
        },
    }


In [23]:
order_tracker_package = {
    'name': 'query_order_tracker',
    'description': 'Track order information by either Purchase Order number or Sales Order number. Only one of the two parameters (po_number or order_number) should be provided. if a tracking number is provded, use the number with the order_number method',
    'parameters': {
        'type': 'object',
        'properties': {
            'po_number': {
                'type': 'string',
                'description': 'Purchase Order number (optional, but required if order_number is not provided)'
            },
            'order_number': {
                'type': 'string',
                'description': 'Sales Order number (optional, but required if po_number is not provided)'
            }
        },
        'required': [],  # No required fields at schema level
        'additionalProperties': False
    }
}


In [24]:
baq_svc_properties = {
    'baq_name': define_tool_property('baq_name', 'string', 'Name of the BAQ to run'),
    'select': define_tool_property(
        'select', 
        {
            "anyOf": [
                {"type": "string"},
                {
                    "type": "array",
                    "items": {"type": "string"}
                }
            ]
        }, 
        'Fields to select (string or list of strings)'
    ),
    'retry_method': define_tool_property(
        'retry_method', 
        'string', 
        'Method to retry with on HTTPError'
    ),
    # Removed '**filters' as a separate property
}

baq_svc_package = create_tool_package(
    "get_baq", 
    "Get data from an Epicor BAQ", 
    baq_svc_properties
)


In [25]:
def query_order_tracker(po_number=None, order_number=None):
    """
    Send a POST request to the Flask app's /search endpoint.

    Args:
        base_url (str): The base URL of the Flask app (e.g., "http://localhost:5000").
        po_number (str): The Purchase Order number (optional).
        order_number (str): The Order number (optional).

    Returns:
        dict: The JSON response from the Flask app.
    """
    base_url = "http://localhost:5000"  
    # Endpoint URL
    url = f"{base_url}/search"

    # Request headers
    headers = {
        "Content-Type": "application/json"
    }

    # JSON payload
    data = {}
    if po_number:
        data["po_number"] = po_number
    if order_number:
        data["order_number"] = order_number

    try:
        # Send POST request
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()  # Raise exception for HTTP errors

        # Parse and return JSON response
        return response.json()

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return None

In [26]:

tools = [
    {"type": "function", "function": order_tracker_package},
    {"type": "function", "function": baq_svc_package},
]

In [27]:
function_mapping = {
    "query_order_tracker": query_order_tracker, 
    "get_baq": get_baq, 
}


In [28]:
def handle_tool_call(message):
    """
    Processes all tool calls in the message and returns a list of responses.
    """
    responses = []
    item = None

    for tool_call in message.tool_calls:
        func_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)

        print("The chosen function is:", func_name)
        print("The arguments are:", arguments)

        if func_name in function_mapping:
            # Dynamically call the function from function_mapping
            func_response = function_mapping[func_name](**arguments)

        else:
            raise ValueError(f"Function '{func_name}' is not defined in function_mapping.")

        print("func response: ", func_response)

        # Prepare the response for the tool call
        response = {
            "role": "tool",
            "content": json.dumps(func_response),
            "tool_call_id": tool_call.id
        }
        responses.append(response)

    # Return the responses and item if an item was processed
    return responses

In [29]:
def call_gpt(history):
    """
    Handles the conversation flow and processes tool calls when invoked.
    """
    # Add the user's message to the conversation
    messages = [{'role': 'system', 'content': system_message}] + history
    response = openai.chat.completions.create(model=gpt_model, messages=messages, tools=tools)

    while response.choices[0].finish_reason == 'tool_calls':
        tool_message = response.choices[0].message  # Extract tool call request
        tool_responses= handle_tool_call(tool_message)  # Process tool calls and get responses
        print(f'Tool responses: {tool_responses}')
        # Add all tool responses to the conversation history
        messages.append(tool_message)  # Add tool call request to history
        messages.extend(tool_responses)  # Add all tool responses to history
        # Re-query GPT with the updated conversation history
        response = openai.chat.completions.create(model=gpt_model, messages=messages)

    reply = response.choices[0].message.content
    
    history += [{"role": "assistant", "content": reply}]
    



    # Return the final response, image (if any), and audio file
    return history


In [30]:
# add ?__theme=dark to the end of the url to force dark mode


In [32]:
# More involved Gradio code as we're not using the preset Chat interface!
# Passing in inbrowser=True in the last line will cause a Gradio window to pop up immediately.

with gr.Blocks() as ui:
    with gr.Row():
        chatbot = gr.Chatbot(height=250, type="messages")
    with gr.Row():
        entry = gr.Textbox(label="Chat with our AI Assistant:")
    with gr.Row():
        clear = gr.Button("Clear")

    def do_entry(message, history):
        history += [{"role":"user", "content":message}]
        return "", history

    entry.submit(do_entry, inputs=[entry, chatbot], outputs=[entry, chatbot]).then(
        call_gpt, inputs=chatbot, outputs=[chatbot]
    )
    clear.click(lambda: None, inputs=None, outputs=chatbot, queue=False)

# ui.launch(inbrowser=True)
ui.launch()


* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




The chosen function is: get_baq
The arguments are: {'baq_name': 'OrderTrackerWebsiteQuery', 'select': ['*'], 'retry_method': 'none'}




No missing items to save.
func response:         OrderHed_OrderNum         OrderHed_OrderDate Customer_CustID  \
0                 408605  2023-12-28T00:00:00-08:00        *WILL020   
1                 408605  2023-12-28T00:00:00-08:00        *WILL020   
2                 408605  2023-12-28T00:00:00-08:00        *WILL020   
3                 408605  2023-12-28T00:00:00-08:00        *WILL020   
4                 408605  2023-12-28T00:00:00-08:00        *WILL020   
...                  ...                        ...             ...   
65481             534985  2025-01-08T00:00:00-08:00         GETI001   
65482             534986  2025-01-08T00:00:00-08:00         FRES005   
65483             534986  2025-01-08T00:00:00-08:00         FRES005   
65484             534986  2025-01-08T00:00:00-08:00         FRES005   
65485             534987  2025-01-08T00:00:00-08:00         MEDL001   

                Customer_Name OrderHed_EntryPerson OrderHed_PONum  \
0       WILLIAMSON EYE CENTER       

Traceback (most recent call last):
  File "c:\Users\Ben.Ball\Documents\work_projects\llm_engineering\venv\Lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Ben.Ball\Documents\work_projects\llm_engineering\venv\Lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Ben.Ball\Documents\work_projects\llm_engineering\venv\Lib\site-packages\gradio\blocks.py", line 2047, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Ben.Ball\Documents\work_projects\llm_engineering\venv\Lib\site-packages\gradio\blocks.py", line 1594, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File