In [1]:
import os
from dotenv import load_dotenv
load_dotenv('key.env')  

key_string = os.getenv('open_ai_API_Key')

In [2]:
from langchain import hub
from langchain.agents import AgentExecutor, create_tool_calling_agent, create_structured_chat_agent
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.tools import StructuredTool, Tool
from langchain_openai import ChatOpenAI

from langchain.tools import tool


For example, replace imports like: `from langchain.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)


In [3]:
class Extract_toolArgs(BaseModel):
    a: str = Field(description="Company name")
    b: str = Field(description="item")
    c: str = Field(description="Year_1")
    d: str = Field(description="Year_2")


In [4]:
years  = [2014,2015,2016,2017,2018, 2019, 2020, 2021, 2022, 2023, 2024]

In [15]:
# class GrowthToolArgs(BaseModel):
#     sales_data : list = Field(description="list of dictionaries with keys as year and net sales data point as values")
from typing import List, Dict

class GrowthToolArgs(BaseModel):
    sales_data: list[dict[int, int]] = Field(description="List of dictionaries with year as key and net sales as value")



In [25]:
import json
@tool(args_schema=Extract_toolArgs)
def Extract_Tool(a: str, b: str, c:str, d:str) -> list[dict[int, int]]:
    """extracts specific data for a company from Year_1 to Year_2."""
    with open("data_soo.json","r") as json_file:
        data = json.load(json_file)
    
    a = a.upper()
    b = b.lower()
    json_data = next(item for item in data[a] if item["Item"] == b)
    
    cash_data = []
    
    # Ensure c and d represent years in the correct order
    if int(c) > int(d):
        c, d = d, c
    
    for year in range(int(c), int(d) + 1):
        value = json_data.get(str(year))
        if value is not None:
            cash_data.append({year: value})
    
    if len(cash_data) == 0:
        raise ValueError(f"Data for the years {c} to {d} is not available.")
    
    return cash_data




In [8]:
@tool(args_schema=GrowthToolArgs)
def Growth_Tool(sales_data:list[str:int]) -> list:
    """
    Calculates year-on-year (YoY) growth for 'Net Sales' for a specific company. This function receives a list of dictionaries as data,
    where the key is year and value is the data point. 
    
    The tool automatically fetches data points of corresponding year and computes the
    percentage growth for that year compared to the previous year.
    
    
    Arguments:
    - sales_data: [{<year1>: <net sales1>, <year2>: <net sales2>... }]

    """
    # Hardcode the item to "Net Sales"
    # item = "Net sales"
    
    # # Call Extract_Tool to get sales data
    # try:
    #     sales_data = Extract_Tool(a=company, b=item, c=year_1, d=year_2)
    # except ValueError as e:
    #     return str(e)
    
    # Ensure there is sufficient data for year-on-year calculation
    if len(sales_data) < 2:
        return "Not enough data to calculate year-on-year sales growth."
    
    # Calculate YoY growth
    yoy_growth = []
    for i in sales_data:
        # Get the year and current year's sales from the dictionary
        year = list(i.keys())[0]  # Assuming each dictionary has one key-value pair
        year = int(year)  # Convert year to an integer
        current_year_sales = i[year]

        previous_year = str(int(year) - 1)
        
        try:
            # Find the previous year's data in the sales data
            previous_year_sales = next(d[previous_year] for d in sales_data if previous_year in d)
            growth = ((current_year_sales - previous_year_sales) / previous_year_sales) * 100
            yoy_growth.append(f"Year {year}: {growth:.2f}%")
        except StopIteration:
            print(f"No data for previous year {previous_year}, skipping year {year}")
            continue


    # for i in sales_data:
    #     current_year_sales = i.value()
    #     previous_year = int(i.key() - 1)
    #     try:
    #         previous_year_sales = previous_year.value()
    #         growth = ((current_year_sales - previous_year_sales) / previous_year_sales) * 100
    #         yoy_growth.append(f"Year {i}: {growth:.2f}%")

    #     except KeyError:
    #         print('here')
    #         continue
        
    # for i in range(1, len(sales_data)):
    #     previous_year_sales = sales_data[i - 1]
    #     current_year_sales = sales_data[i]
    #     if previous_year_sales is None or current_year_sales is None:
    #         yoy_growth.append(f"Year {start_year + i}: Data missing")
    #     else:
    #         growth = ((current_year_sales - previous_year_sales) / previous_year_sales) * 100
    #         yoy_growth.append(f"Year {start_year + i}: {growth:.2f}%")
    
    return "\n".join(yoy_growth)


In [17]:
def Growth_Tool(sales_data: list[dict[int, float]]) -> list:
    """
    Calculates year-on-year (YoY) growth for 'Net Sales' for a specific company. This function receives a list of dictionaries as data,
    where the key is year and value is the net sales data point. 
    
    The tool automatically fetches data points of the corresponding year and computes the
    percentage growth for that year compared to the previous year.

    Arguments:
    - sales_data: [{<year1>: <net sales1>, <year2>: <net sales2>, ...}]
    """
    if len(sales_data) < 2:
        return "Not enough data to calculate year-on-year sales growth."
    
    # Sort the data by year to ensure correct order for calculations
    sales_data_sorted = sorted(sales_data, key=lambda x: list(x.keys())[0])
    
    yoy_growth = []
    
    for i in range(1, len(sales_data_sorted)):  # Start from the second year
        current_year_data = sales_data_sorted[i]
        previous_year_data = sales_data_sorted[i - 1]
        
        # Extract the year and sales data
        current_year = list(current_year_data.keys())[0]
        previous_year = list(previous_year_data.keys())[0]
        
        current_year_sales = current_year_data[current_year]
        previous_year_sales = previous_year_data[previous_year]
        
        # Calculate the YoY growth
        growth = ((current_year_sales - previous_year_sales) / previous_year_sales) * 100
        yoy_growth.append(f"Year {current_year}: {growth:.2f}%")
    
    return yoy_growth

In [18]:
tools = [Extract_Tool,Growth_Tool]

In [24]:
# class CalculateKpIs(BaseModel):
#     a: str = Field(description="Company name")
#     b: str = Field(description="item")
#     c: list = Field(description="list of revenue data points")

# @tool(args_schema=CalculateKpIs)
# def CalculateKpIs(a: str, b: str, c:list) -> list:
#     """calculates kpis according to company and item name"""

In [19]:
llm = ChatOpenAI(model="gpt-4o", openai_api_key=key_string)

In [20]:
prompt = hub.pull("hwchase17/openai-tools-agent")



In [21]:
# from langchain.prompts import PromptTemplate

# prompt = PromptTemplate(
#     template="""
# You are an AI assistant skilled in calling tools to perform calculations and answer questions.
# Here are the tools you can use:
# {tool_descriptions}

# When asked a question, decide which tool to use and call it appropriately.
# Always respond with your reasoning and the final answer.

# Use the following format:
# Thought: I should use [Tool Name]
# Action: [Tool Name] <arguments>
# Observation: <output of the tool>

# If no tools are needed, provide a direct answer.

# {agent_scratchpad}
# """,
#     input_variables=["tool_descriptions", "agent_scratchpad"],
# )


In [22]:
from pprint import pprint
pprint(prompt)

ChatPromptTemplate(input_variables=['agent_scratchpad', 'input'], optional_variables=['chat_history'], input_types={'chat_history': list[typing.Annotated[typing.Union[typing.Annotated[langchain_core.messages.ai.AIMessage, Tag(tag='ai')], typing.Annotated[langchain_core.messages.human.HumanMessage, Tag(tag='human')], typing.Annotated[langchain_core.messages.chat.ChatMessage, Tag(tag='chat')], typing.Annotated[langchain_core.messages.system.SystemMessage, Tag(tag='system')], typing.Annotated[langchain_core.messages.function.FunctionMessage, Tag(tag='function')], typing.Annotated[langchain_core.messages.tool.ToolMessage, Tag(tag='tool')], typing.Annotated[langchain_core.messages.ai.AIMessageChunk, Tag(tag='AIMessageChunk')], typing.Annotated[langchain_core.messages.human.HumanMessageChunk, Tag(tag='HumanMessageChunk')], typing.Annotated[langchain_core.messages.chat.ChatMessageChunk, Tag(tag='ChatMessageChunk')], typing.Annotated[langchain_core.messages.system.SystemMessageChunk, Tag(tag='

In [23]:
agent = create_tool_calling_agent(
    llm=llm,  # Language model to use
    tools=tools,  # List of tools available to the agent
    prompt=prompt,  # Prompt template to guide the agent's responses
)

In [24]:
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,  # The agent to execute
    tools=tools,  # List of tools available to the agent
    verbose=True,  # Enable verbose logging
    handle_parsing_errors=True,  # Handle parsing errors gracefully
)

AttributeError: 'function' object has no attribute 'get'

In [None]:
response = agent_executor.invoke({"input": " extract net sales of Apple 2019 to 2023 "})

NameError: name 'agent_executor' is not defined

In [None]:
print(response)

{'input': ' extract net sales of APPLE from 2019 to 2023', 'output': 'The net sales for Apple from 2019 to 2023 are as follows (in million USD):\n\n- 2019: $260,174 million\n- 2020: $274,515 million\n- 2021: $365,817 million\n- 2022: $394,328 million\n- 2023: $383,285 million'}
