In [None]:
import ast
import json
import os
from pprint import pprint
from typing import Any, Optional

import astor
import dotenv

# Load environment configuration via python-dotenv before any cell reads it;
# `TEST_FILE_PATH` is read with `os.getenv` further down in this notebook.
dotenv.load_dotenv()

## Example Data

In [None]:
# Example record #1: one function schema with a required string parameter
# (`location`) and an optional array parameter (`amenities`) whose items are
# restricted by an enum. The expected call is given twice: as a dict
# (`model_answer_openai`) and as a Python call string (`model_answer_normal`).
function_data_1 = {
    "user_query": "Find a coffee shop near me with free Wi-Fi in San Francisco.",
    "functions": [
        {
            "name": "Coffee Shop Locator",
            "api_call": "coffee_shop.find_nearby",
            "description": "Locate nearby coffee shops based on specific criteria like Wi-Fi availability.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "amenities": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": [
                                "Wi-Fi",
                                "Outdoor Seating",
                                "Bakery",
                                "Vegetarian Options",
                            ],
                        },
                        "description": "Preferred amenities.",
                    },
                },
                "required": ["location"],
            },
        },
    ],
    "model_answer_openai": {
        "api_call": "coffee_shop.find_nearby",
        "parameters": {
            "location": "San Francisco",
            "amenities": ["Wi-Fi"],
        },
    },
    "model_answer_normal": 'coffee_shop.find_nearby(location="San Francisco", amenities=["Wi-Fi"])',
}

In [None]:
# Example record #2: one function schema with two required string parameters
# (`song`, `artist`) and no enums. Same top-level layout as `function_data_1`.
function_data_2 = {
    "user_query": "Find the lyrics of the song 'Shape of You' by Ed Sheeran.",
    "functions": [
        {
            "name": "Lyrics Finder",
            "api_call": "lyrics.find",
            "description": "Retrieve the lyrics of a specific song.",
            "parameters": {
                "type": "object",
                "properties": {
                    "song": {"type": "string", "description": "The name of the song."},
                    "artist": {
                        "type": "string",
                        "description": "The name of the artist.",
                    },
                },
                "required": ["song", "artist"],
            },
        },
    ],
    "model_answer_openai": {
        "api_call": "lyrics.find",
        "parameters": {
            "song": "Shape of You",
            "artist": "Ed Sheeran",
        },
    },
    "model_answer_normal": 'lyrics.find(song="Shape of You", artist="Ed Sheeran")',
}

In [None]:
# Example record #3: one function schema mixing an integer parameter
# (`game_id`) with two enum-restricted string parameters, all required.
# Note that `model_answer_normal` uses single-quoted string arguments here.
function_data_3 = {
    "user_query": "Retrieve the list of available characters in a fighting game.",
    "functions": [
        {
            "name": "Character Catalog",
            "api_call": "game.get_characters",
            "description": "Get a list of characters available in a fighting game.",
            "parameters": {
                "type": "object",
                "properties": {
                    "game_id": {"type": "integer", "description": "ID of the game."},
                    "combat_style": {
                        "type": "string",
                        "enum": ["melee", "ranged", "hybrid"],
                        "description": "Combat style of the characters.",
                    },
                    "tier": {
                        "type": "string",
                        "enum": ["S", "A", "B", "C"],
                        "description": "Tier ranking of the characters.",
                    },
                },
                "required": ["game_id", "combat_style", "tier"],
            },
        },
    ],
    "model_answer_openai": {
        "api_call": "game.get_characters",
        "parameters": {
            "game_id": 654,
            "combat_style": "melee",
            "tier": "A",
        },
    },
    "model_answer_normal": "game.get_characters(game_id=654, combat_style='melee', tier='A')",
}

## Version 1

In [None]:
# user_query = function_data.pop("user_query", None)
# if user_query is None:
#     raise Exception("No user query found")
# if isinstance(user_query, str) is False:
#     raise Exception("User query is not a string")
# else:
#     print("User query:", user_query)

In [None]:
# functions = function_data.get("functions")
# if functions is None:
#     raise Exception("No functions found")
# if isinstance(functions, list) is False:
#     raise Exception("Functions is not a list")
# else:
#     print("Functions:", functions)

In [None]:
# possible_api_calls = []
# parameters = []

In [None]:
# def check_required_field(obj, field_name, feild_location=None, error=True):
#     value = obj.pop(field_name, None)
#     if value is not None:
#         return value
#     if error:
#         raise Exception(f"Element `{field_name}` not found in `{feild_location}`")


# for num, function in enumerate(functions):
#     # `name`, `api_call`, `description`, `parameters`
#     check_required_field(function, "name", f"functions/{num}")

#     possible_api_calls.append(
#         check_required_field(function, "api_call", f"functions/{num}")
#     )

#     check_required_field(function, "description", f"functions/{num}")

#     function_parameters = check_required_field(
#         function, "parameters", f"functions/{num}"
#     )

#     # `type`, `properties`, `required`
#     if check_required_field(function_parameters, "type", "/functions/parameters") != "object":
#         raise Exception("Function parameters type is not an object")

#     function_parameters_properties = check_required_field(
#         function_parameters, "properties", "/functions/parameters"
#     )

#     function_parameters_required = check_required_field(
#         function_parameters, "required", "/functions/parameters"
#     )
#     if isinstance(function_parameters_required, list) is False:
#         raise Exception("Function parameters required is not a list")

#     for required_parameter in function_parameters_required:
#         if isinstance(required_parameter, str) is False:
#             raise Exception("Function parameters required is not a string")

#     # in `function_parameters_properties`, each key is a parameter name
#     # each value is a dict with `type`, `description`, `enum` (optional)
#     for parameter in function_parameters_properties.keys():
#         function_argument = function_parameters_properties[parameter]

#         function_property_type = check_required_field(
#             function_argument, "type", f"/functions/parameters/properties/{parameter}"
#         )

#         parameters.append(
#             {
#                 "name": parameter,
#                 "type": function_property_type,
#             }
#         )

#         check_required_field(
#             function_argument,
#             "description",
#             f"/functions/parameters/properties/{parameter}",
#         )

#         if function_property_type == "array":
#             function_property_items = check_required_field(
#                 function_argument,
#                 "items",
#                 f"/functions/parameters/properties/{parameter}",
#             )

#             function_property_items_type = check_required_field(
#                 function_property_items,
#                 "type",
#                 f"/functions/parameters/properties/{parameter}/items",
#             )
#             if function_property_items_type != "string":
#                 raise Exception("NOT IMPLEMENTED YET")

#             function_property_items_enum = check_required_field(
#                 function_property_items,
#                 "enum",
#                 f"/functions/parameters/properties/{parameter}/items",
#             )

#             if isinstance(function_property_items_enum, list) is False:
#                 raise Exception("Function property items enum is not a list")

#             for enum_option in function_property_items_enum:
#                 if isinstance(enum_option, str) is False:
#                     raise Exception(
#                         "Function property items enum option is not a string"
#                     )

#         elif function_property_type == "string":
#             pass

#         elif function_property_type == "integer":
#             pass

#         else:
#             raise Exception("NOT IMPLEMENTED YET")

#         if len(function_argument) != 0:
#             raise Exception("Function argument has some remaining fields: " + str(function_argument))

#     if len(function_parameters) != 0:
#         raise Exception("Function parameters has some remaining fields: " + str(function_parameters))

#     if len(function_parameters_properties) != 0:
#         raise Exception("Function has some remaining fields: " + str(function))
# # `total_len` should be 0 if there are no remaining fields in any function in `functions`
# total_len = sum([len(fn) for fn in functions])
# if total_len == 0:
#     print("SUCCESS")
# else:
#     raise Exception("Function has some remaining fields")

In [None]:
# model_answer_openai = function_data.pop("model_answer_openai", None)
# if model_answer_openai is None:
#     raise Exception("No model answer found")
# if isinstance(model_answer_openai, dict) is False:
#     raise Exception("Model answer is not a dict")
# else:
#     print("Model answer:", model_answer_openai)

In [None]:
# model_answer_openai_converted_normal = (
#     f"{model_answer_openai['api_call']}("
#     + ", ".join(
#         [f"{key}={value!r}" for key, value in model_answer_openai["parameters"].items()]
#     )
#     + ")"
# )

In [None]:
# model_answer_openai_api_call = model_answer_openai.pop("api_call", None)
# if model_answer_openai_api_call is None or model_answer_openai_api_call == "":
#     raise Exception("No model answer API call found")
# if isinstance(model_answer_openai_api_call, str) is False:
#     raise Exception("Model answer API call is not a string")
# if model_answer_openai_api_call not in possible_api_calls:
#     raise Exception("Model answer API call not in possible API calls")
# else:
#     print(
#         "Model answer API call:",
#         model_answer_openai_api_call,
#         "\nPossible API calls:",
#         possible_api_calls,
#     )

In [None]:
# model_answer_openai_parameters = model_answer_openai.pop("parameters", None)
# if model_answer_openai_parameters is None:
#     raise Exception("No model answer parameters found")
# if isinstance(model_answer_openai_parameters, dict) is False:
#     raise Exception("Model answer parameters is not a dict")
# else:
#     print("Model answer parameters:", model_answer_openai_parameters)

In [None]:
# for original_parm, parm in zip(parameters, model_answer_openai_parameters.keys()):
#     if original_parm["name"] != parm:
#         raise Exception("Parameter names do not match")
#     if original_parm["type"] == "string":
#         if isinstance(model_answer_openai_parameters[parm], str) is False:
#             raise Exception("Parameter type does not match")
#     elif original_parm["type"] == "array":
#         if isinstance(model_answer_openai_parameters[parm], list) is False:
#             raise Exception("Parameter type does not match")
#     elif original_parm["type"] == "integer":
#         if isinstance(model_answer_openai_parameters[parm], int) is False:
#             raise Exception("Parameter type does not match")
#     else:
#         raise Exception("NOT IMPLEMENTED YET")

In [None]:
# if len(model_answer_openai) != 0:
#     raise Exception("Model answer has some remaining fields")
# else:
#     print("SUCCESS")

In [None]:
# model_answer_normal = function_data.pop("model_answer_normal", None)
# if model_answer_normal is None:
#     raise Exception("No model answer found")
# if isinstance(model_answer_normal, str) is False:
#     raise Exception("Model answer is not a string")
# else:
#     print("Model answer:", model_answer_normal)

In [None]:
# def are_expressions_equal(expr1, expr2):
#     ast1 = ast.parse(expr1, mode="eval")
#     ast2 = ast.parse(expr2, mode="eval")

#     return ast.dump(ast1) == ast.dump(ast2)

## Version 2

In [None]:
def check_feild(
    obj: dict,
    feild_name: str,
    feild_type: type,
    feild_location: str,
    pop: bool,
    print_value: bool,
    error_if_not_found: bool = True,
    debug: bool = True,
) -> Optional[Any]:
    """
    Fetch a field from a dict, validating that it exists and has the expected type.

    Args:
        obj (dict): The object to check the field in.
        feild_name (str): The name of the field to check.
        feild_type (type): The expected type of the field (anything accepted by `isinstance`).
        feild_location (str): The location of the field (e.g., /functions/0/required); only used in messages.
        pop (bool): If True the field is removed from `obj`, otherwise `obj` is left untouched.
        print_value (bool): Whether to print the value of the field to stdout (only honoured when `debug` is True).
        error_if_not_found (bool, optional): Whether to raise if the field is missing. Defaults to True.
        debug (bool, optional): Master switch for all printing. Defaults to True.

    Returns:
        The value of the field, or None when it is missing and `error_if_not_found` is False.

    Raises:
        Exception: If the field is missing (or explicitly None) and `error_if_not_found` is True.
        Exception: If the field is present but not an instance of `feild_type`.
    """
    # A missing key and an explicit `None` value are indistinguishable from here on.
    value = obj.pop(feild_name, None) if pop else obj.get(feild_name)

    if value is None:
        if error_if_not_found:
            raise Exception(f"Element `{feild_name}` not found in `{feild_location}`")
    elif not isinstance(value, feild_type):
        # Include the location: the same field name (e.g. `type`) exists at many levels.
        raise Exception(
            f"Element `{feild_name}` in `{feild_location}` is not of type `{feild_type}`"
        )

    if print_value and debug:
        print("\n\n")
        if pop:
            print("--POPPING--")
        print(f"`{feild_name}` at `{feild_location}`: {value}")
    return value

In [None]:
def clean_empty_dicts(
    data: Any,
    prevData: Any,
    count: int = -1,
    debug: bool = True,
) -> Optional[Any]:
    """
    Recursively remove empty dictionaries, empty lists and None values from `data`.

    A container that becomes empty once its children are cleaned is itself
    treated as empty, so a structure made only of nested empty containers
    collapses to None. Other values (including falsy ones such as 0, False
    and "") are kept, because they are real leftover data. The input is not
    modified; new containers are built.

    Args:
        data: The data to be cleaned (typically a dict or list).
        prevData: Unused; kept so existing `clean_empty_dicts(data, data)` calls keep working.
        count (int, optional): Recursion depth, used for debug indentation. Leave at -1 for the top-level call.
        debug (bool, optional): Whether to print debug information. Defaults to True.

    Returns:
        The cleaned data, or None if nothing is left after cleaning.

    Example:
        >>> data = {'a': {}, 'b': [1, None, {}]}
        >>> clean_empty_dicts(data, data, debug=False)
        {'b': [1]}
    """
    if count == -1 and debug:
        print("\n\n Cleaning empty dicts")

    if isinstance(data, dict):
        depth = count + 1
        if debug: print("\n", "\t" * depth, "dataIn{}:", data, end="")
        cleaned_dict = {}
        for key, value in data.items():
            cleaned_value = clean_empty_dicts(value, value, depth, debug)
            if cleaned_value is not None:
                cleaned_dict[key] = cleaned_value
        # An emptied container is reported as None so the parent can drop it too.
        result = cleaned_dict or None
        if debug: print("\n", "\t" * depth, "dataOut{}:", result, end="")
        return result

    if isinstance(data, list):
        depth = count + 1
        if debug: print("\n", "\t" * depth, "dataIn[]:", data, end="")
        cleaned_list = []
        for item in data:
            cleaned_item = clean_empty_dicts(item, item, depth, debug)
            if cleaned_item is not None:
                cleaned_list.append(cleaned_item)
        result = cleaned_list or None
        if debug: print("\n", "\t" * depth, "dataOut[]:", result, end="")
        return result

    # Scalars (and None) pass through unchanged.
    return data

In [None]:
def _dotted_call_name(node: ast.AST) -> str:
    """Rebuild the dotted name (`a.b.c`) of a call target made of names/attributes only."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if not isinstance(node, ast.Name):
        raise ValueError("Function call target is not a plain (dotted) name")
    parts.append(node.id)
    return ".".join(reversed(parts))


def convert_normal_function_call_to_dict(
    function_call: str,
    debug: bool = True,
) -> dict:
    """
    Converts a normal function call to a dictionary representation.

    The string is only parsed, never executed: argument values are read with
    `ast.literal_eval`, so they must be Python literals (strings, numbers,
    booleans, None, lists, dicts, tuples, sets).

    Args:
        function_call (str): The function call to be converted, e.g. `module.func(a=1)`.
        debug (bool, optional): Flag to enable debug mode. Defaults to True.

    Returns:
        dict: A dictionary with the dotted `api_call` name and the keyword `parameters`.

    Raises:
        SyntaxError: If `function_call` is not valid Python.
        ValueError: If it is not a single call expression, uses positional or
            `**` arguments, or an argument value is not a literal.

    Example:
        >>> convert_normal_function_call_to_dict('coffee_shop.find_nearby(location="San Francisco", amenities=["Wi-Fi"])', debug=False)
        {'api_call': 'coffee_shop.find_nearby', 'parameters': {'location': 'San Francisco', 'amenities': ['Wi-Fi']}}
    """
    tree = ast.parse(function_call.strip())
    if debug: print("Function call tree:", ast.dump(tree))

    if (
        len(tree.body) != 1
        or not isinstance(tree.body[0], ast.Expr)
        or not isinstance(tree.body[0].value, ast.Call)
    ):
        raise ValueError(f"Not a single function call: {function_call!r}")
    call = tree.body[0].value

    # The dict form has no slot for positional arguments; dropping them silently
    # would let a mismatching call compare equal.
    if call.args:
        raise ValueError("Positional arguments are not supported in function call")

    parameters = {}
    for keyword in call.keywords:
        if keyword.arg is None:
            raise ValueError("`**` argument unpacking is not supported in function call")
        # literal_eval accepts an AST node and refuses anything that is not a literal,
        # so text coming from a model can never run code here.
        parameters[keyword.arg] = ast.literal_eval(keyword.value)

    return {
        "api_call": _dotted_call_name(call.func),
        "parameters": parameters,
    }

In [None]:
# Schema type name -> Python type a call argument must have.
_SCHEMA_TYPE_TO_PYTHON = {
    "string": str,
    "integer": int,
    "float": float,
    "array": list,
    "boolean": bool,
}


def _matches_schema_type(value, schema_type: str) -> bool:
    """Exact-type check of `value` against a supported schema type name."""
    if schema_type == "float":
        # A whole-number literal is a valid float argument; bool is excluded
        # because the comparison is on the exact type, not isinstance.
        return type(value) in (int, float)
    return type(value) is _SCHEMA_TYPE_TO_PYTHON[schema_type]


def verify_openai_function_call(function_call_dict: dict, functions: list, debug=True):
    """
    Verify that a function call dictionary is valid against one of the given function schemas.

    The first schema whose `api_call` equals the call's `api_call` is used. The
    call must supply every required parameter, must not supply unknown
    parameters, and every supplied value must have the declared type and
    respect `enum` restrictions (for arrays: the `items` type and `items.enum`).

    Args:
        function_call_dict (dict): The call, with keys `api_call` and `parameters`.
        functions (list): Function schemas, each with `api_call` and `parameters`
            (`properties`, optionally `required`).
        debug (bool, optional): Whether to print debug information. Defaults to True.

    Raises:
        Exception: If no schema matches the API call or any check above fails.

    Returns:
        bool: True if the call is valid. Failure is always signalled by an exception.
    """
    call_parameters = function_call_dict["parameters"]

    for function in functions:
        if function_call_dict["api_call"] != function["api_call"]:
            if debug: print(
                f"Skipping function `{function['api_call']}` as it does not match API call `{function_call_dict['api_call']}`"
            )
            continue
        if debug: print("Found matching API call")

        properties = function["parameters"]["properties"]
        required_parameters = function["parameters"].get("required", [])

        for parameter, schema in properties.items():
            if parameter not in call_parameters:
                if parameter in required_parameters:
                    raise Exception(f"Parameter `{parameter}` not found in function call")
                continue  # optional parameter that was simply not supplied

            value = call_parameters[parameter]
            declared_type = schema["type"]
            if declared_type not in _SCHEMA_TYPE_TO_PYTHON:
                raise Exception(
                    f"Parameter `{parameter}` has unsupported type `{declared_type}`"
                )
            if not _matches_schema_type(value, declared_type):
                raise Exception(
                    f"Parameter `{parameter}` is not of type `{declared_type}`"
                )

            # Enum declared directly on the parameter.
            if "enum" in schema and value not in schema["enum"]:
                raise Exception(
                    f"Parameter `{parameter}` is not in enum `{schema['enum']}`"
                )

            # For arrays the restrictions live on `items` and apply to each element.
            items_schema = schema.get("items")
            if declared_type == "array" and isinstance(items_schema, dict):
                item_type = items_schema.get("type")
                for item in value:
                    if item_type in _SCHEMA_TYPE_TO_PYTHON and not _matches_schema_type(item, item_type):
                        raise Exception(
                            f"Parameter `{parameter}` has an item that is not of type `{item_type}`"
                        )
                    if "enum" in items_schema and item not in items_schema["enum"]:
                        raise Exception(
                            f"Parameter `{parameter}` has an item not in enum `{items_schema['enum']}`"
                        )

        # Check if there are any extra parameters in the function call
        for parameter in call_parameters:
            if parameter not in properties:
                raise Exception(f"Extra parameter `{parameter}` found in function call")

        return True

    raise Exception(
        f"No matching API call found for `{function_call_dict['api_call']}`"
    )

In [None]:
def _check_enum_options_are_strings(enum_options, error_message):
    """Raise `Exception(error_message)` if an optional enum list holds a non-string option."""
    if enum_options is None:
        return
    for enum_option in enum_options:
        if not isinstance(enum_option, str):
            raise Exception(error_message)


def _validate_parameter_schema(parameter_desc, parameter_location, debug):
    """Consume (pop) and validate the known fields of one `parameters.properties` entry."""
    parameter_type = check_feild(
        parameter_desc, "type", str, parameter_location, pop=True, print_value=True, debug=debug
    )
    if parameter_type not in ("array", "string", "integer", "float", "boolean"):
        raise Exception("NOT IMPLEMENTED YET")

    # Every supported type carries a mandatory description.
    check_feild(
        parameter_desc, "description", str, parameter_location, pop=True, print_value=True, debug=debug
    )

    if parameter_type == "array":
        # `items` is read without popping so that any field left inside it is
        # still visible to the final remaining-fields check.
        items = check_feild(
            parameter_desc, "items", dict, parameter_location, pop=False, print_value=True, debug=debug
        )
        items_location = parameter_location + "/items"
        items_type = check_feild(
            items, "type", str, items_location, pop=True, print_value=True, debug=debug
        )
        if items_type != "string":
            raise Exception("NOT IMPLEMENTED YET")
        _check_enum_options_are_strings(
            check_feild(
                items, "enum", list, items_location, pop=True, print_value=True,
                error_if_not_found=False, debug=debug,
            ),
            "Function property items enum option is not a string",
        )
    elif parameter_type == "string":
        _check_enum_options_are_strings(
            check_feild(
                parameter_desc, "enum", list, parameter_location, pop=True, print_value=True,
                error_if_not_found=False, debug=debug,
            ),
            "Function property enum option is not a string",
        )
    # integer / float / boolean: only `type` and `description` are expected.


def _validate_function_schema(function, function_location, debug):
    """Consume (pop) and validate the known fields of one entry of `functions`."""
    for field_name in ("name", "api_call", "description"):
        check_feild(
            function, field_name, str, function_location, pop=True, print_value=True, debug=debug
        )
    parameters = check_feild(
        function, "parameters", dict, function_location, pop=False, print_value=True, debug=debug
    )
    parameters_location = function_location + "/parameters"

    parameters_type = check_feild(
        parameters, "type", str, parameters_location, pop=True, print_value=True, debug=debug
    )
    if parameters_type != "object":
        raise Exception("Function parameters type is not an object")

    for required_parameter in check_feild(
        parameters, "required", list, parameters_location, pop=True, print_value=True, debug=debug
    ):
        if not isinstance(required_parameter, str):
            raise Exception("Function parameters required is not a string")

    properties = check_feild(
        parameters, "properties", dict, parameters_location, pop=False, print_value=True, debug=debug
    )
    for parameter_name, parameter_desc in properties.items():
        _validate_parameter_schema(
            parameter_desc, f"{parameters_location}/properties/{parameter_name}", debug
        )


def main(function_data, debug=True):
    """
    Validate one dataset record end to end.

    Expected top-level fields:
        - `user_query` (str)
        - `functions` (list of dict): each with `name`, `api_call`, `description`
          (str) and `parameters` (dict with `type` == "object", `properties`, `required`)
        - `model_answer_openai` (dict)
        - `model_answer_normal` (str)

    Every recognised field is popped from a private copy of the record as it is
    validated; whatever is left at the end is an unexpected field and makes the
    record invalid.

    Args:
        function_data (dict): The record. It must be JSON-serialisable and is not modified.
        debug (bool, optional): Whether to print progress information. Defaults to True.

    Returns:
        bool: True if the record is valid.

    Raises:
        Exception: On the first validation failure (message describes it).
    """
    # JSON round-trip: deep copy, so popping below never touches the caller's data.
    function_data = json.loads(json.dumps(function_data))
    if debug: print("Function data:", function_data)

    # TODO: use `re` to check that `user_query` is a valid query
    check_feild(function_data, "user_query", str, "/", pop=True, print_value=True, debug=debug)
    functions = check_feild(
        function_data, "functions", list, "/", pop=False, print_value=True, debug=debug
    )

    # Validate presence/type of the answers before using them, so a missing
    # field is reported by name and location instead of a bare KeyError.
    openai_answer = check_feild(
        function_data, "model_answer_openai", dict, "/", pop=True, print_value=True, debug=debug
    )
    verify_openai_function_call(
        function_call_dict=openai_answer,
        functions=functions,
        debug=debug,
    )
    normal_answer = check_feild(
        function_data, "model_answer_normal", str, "/", pop=True, print_value=True, debug=debug
    )
    if openai_answer != convert_normal_function_call_to_dict(normal_answer, debug=debug):
        raise Exception("Model answer normal and openai do not match")

    for num, function in enumerate(functions):
        _validate_function_schema(function, f"/functions/{num}", debug)

    # Anything that survived the pops above is an unexpected field.
    remaining_fields = clean_empty_dicts(function_data, function_data, debug=debug)
    if remaining_fields is not None:
        if debug: print("\n\nRemaining fields:", remaining_fields)
        raise Exception(f"Function has some remaining fields: {remaining_fields}")
    if debug: print("\n\n-------SUCCESS-------")
    return True

## Testing

In [None]:
# Run `main` over every record of the cleaned dataset and report the share that validates.
n_valid = 0
total = 0
with open('output_cleaned_better.jsonl', 'r') as f:
    for num, line in enumerate(f):
        if not line.strip():
            continue  # blank lines are not records
        total += 1
        try:
            # Parsing is inside the try so one malformed line is reported
            # like any other invalid record instead of aborting the whole run.
            function_data = json.loads(line)
            if main(function_data, debug=False):
                n_valid += 1
        except Exception as e:
            print("-"*20, f"\nERROR IN {num}: {e}")

if total:
    print(f"Valid: {n_valid} / {total} ({n_valid/total*100:.2f}%)")
else:
    # Avoid dividing by zero when the file holds no records.
    print("Valid: 0 / 0 (no records found)")

In [None]:
# Read temp.jsonl and parse the record at line index 34 into `function_data`;
# the commented lines below are ways to look at it or validate it verbosely.
with open('temp.jsonl', 'r') as f:
    data = list(f)
function_data = json.loads(data[34])
# function_data
# function_data["model_answer_normal"]
# main(function_data, debug=True)


## Diff between cleaned and raw data

In [None]:
# Cleaned records, one JSON document per line.
with open('temp.jsonl', 'r') as f:
    cleaned_data = f.readlines()

# The raw file location comes from the environment; fail with a clear message
# instead of the `open(None)` TypeError when the variable is not set.
test_file_path = os.getenv("TEST_FILE_PATH")
if not test_file_path:
    raise RuntimeError("Environment variable `TEST_FILE_PATH` is not set")
with open(test_file_path, 'r') as f:
    original_data = f.readlines()

In [None]:
# Pairwise comparison, line by line:
# `model_answer_normal` (cleaned_data) vs `model_answer` (original_data).
# Only pairs that differ are printed.
for cleaned_line, original_line in zip(cleaned_data, original_data):
    cleaned_line = json.loads(cleaned_line)
    original_line = json.loads(original_line)
    if cleaned_line["model_answer_normal"] == original_line["model_answer"]:
        continue
    print("\n\n")
    print('Cleaned Response:  ',cleaned_line["model_answer_normal"])
    print('Original Response: ', original_line["model_answer"])

In [None]:
# Prints the single character U+2026 (horizontal ellipsis).
# NOTE(review): presumably a check of how this escape renders in the compared
# responses — confirm, or drop the cell.
print("\u2026")

In [None]:
# Pairwise comparison, line by line: first entry of `functions` (cleaned_data)
# vs `function` (original_data). Only pairs that differ are printed.
for cleaned_line, original_line in zip(cleaned_data, original_data):
    cleaned_line = json.loads(cleaned_line)
    original_line = json.loads(original_line)
    if cleaned_line["functions"][0] == original_line["function"]:
        continue
    print("\n\n")
    print('Cleaned Response:  ',cleaned_line["functions"][0])
    print('\nOriginal Response: ', original_line["function"])