In [137]:
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import pandas as pd
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser, CommaSeparatedListOutputParser, JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import MoveFileTool, format_tool_to_openai_function
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage, AIMessage, ChatMessage, FunctionMessage
from langchain.tools import MoveFileTool, format_tool_to_openai_function
from langchain.tools import BaseTool
from typing import Optional, Type
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain_openai import ChatOpenAI
import json
import requests
import pandas as pd
import numpy


# Load environment variables from the .env file
load_dotenv()

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None) 


In [28]:
pip show langchain


Name: langchainNote: you may need to restart the kernel to use updated packages.

Version: 0.2.5
Summary: Building applications with LLMs through composability
Home-page: https://github.com/langchain-ai/langchain
Author: 
Author-email: 
License: MIT
Location: C:\Users\starm\AppData\Roaming\Python\Python311\site-packages
Requires: aiohttp, langchain-core, langchain-text-splitters, langsmith, numpy, pydantic, PyYAML, requests, SQLAlchemy, tenacity
Required-by: langchain-community


In [2]:
TEST_MAIL = 'admin@platform.com'

## Token

In [139]:
base_url = 'https://dashboard.colorines.paitesting.com/api'

# Set the URL for the token API
url_token = base_url+"/v1/token"

# Set the data for the POST request
data = {
    "email": "sale@platform.com",
    "password": os.getenv('CONSULTANT_PASSWORD'),
    "device_name": "colorina_2",
    "role": "consultant"
}

# Send a POST request
response = requests.post(url_token, json=data)

#print(response)

# Extract the token from the response
token = response.json().get('data').get('token')

#print("Token:", token)


## Components

In [4]:
#TODO: API need to add a filter by 'is_active'

def GetProperties(token: str):
    headers = {"Authorization": f"Bearer {token}"}
    url = "https://dashboard.colorines.paitesting.com/api/v1/properties"
    params = {'include': 'step,project,development', 'filter[status]': 'available'} 
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()  # Raise an exception for HTTP errors
        
        # Check if the response contains any content
        if response.content:
            data = response.json()
        else:
            return pd.DataFrame({"error": ["Empty response from the server"]})
        
    except requests.exceptions.RequestException as e:
        return pd.DataFrame({"error": [str(e)]})
    except ValueError as ve:
        return pd.DataFrame({"error": ["Error parsing JSON response"]})
    
    # Check if the 'data' key exists and contains a list of dictionaries
    if 'data' in data:
        if isinstance(data['data'], list):
            return pd.DataFrame(data['data'])
        else:
            return pd.DataFrame({"error": ["'data' key does not contain a list"]})
    else:
        return pd.DataFrame({"error": ["'data' key not found in the response"]})
    
#PREPROCESS projects
def PreprocessProjects(df):
    df['development_name'] = df['development'].apply(lambda x: x.get('name'))
    
    def filter_active_steps(steps):
        return [step['step_number'] for step in steps if step['is_active']]

    # Applying the function to create the new column
    df['active_steps'] = df['steps'].apply(filter_active_steps)

    return df

#PREPROCESS Properties
def PreprocessProperties(df):
    df['project_name'] = df['project'].apply(lambda x: x.get('name'))
    df['development_name'] = df['development'].apply(lambda x: x.get('name'))
    df['step_number'] = df['step'].apply(lambda x: x.get('step_number')) #numero de etapa

    def extract_total_amount(prices, currency_code):
        total_amount = None  # Initialize to None or any default value as needed
        for entry in prices:
            if entry['type'] == 'property' and entry['currency_code'] == currency_code:
                total_amount = entry['total_amount']
                break  # Exit loop once found
        return total_amount

    # Extracting total_amount_mxn and total_amount_usd
    df['total_amount_mxn'] = df['prices'].apply(lambda x: extract_total_amount(x, 'MXN'))
    df['total_amount_usd'] = df['prices'].apply(lambda x: extract_total_amount(x, 'USD'))

    return df


# Example usage

#df_properties = GetProperties(token)

def GetProjects(token: str):
    headers = {"Authorization": f"Bearer {token}"}
    url = "https://dashboard.colorines.paitesting.com/api/v1/projects"
    params = {'include': 'steps,development', 'is_active': 'True'} 
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except requests.exceptions.RequestException as e:
        return pd.DataFrame({"error": [str(e)]})
    
    # Check if the 'data' key exists and contains a list of dictionaries
    if 'data' in data:
        if isinstance(data['data'], list):
            return pd.DataFrame(data['data'])
        else:
            return pd.DataFrame({"error": ["'data' key does not contain a list"]})
    else:
        return pd.DataFrame({"error": ["'data' key not found in the response"]})

# Example usage

df_projects = GetProjects(token)

def GetDevelopments(token: str):
    headers = {"Authorization": f"Bearer {token}"}
    url = "https://dashboard.colorines.paitesting.com/api/v1/developments"
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except requests.exceptions.RequestException as e:
        return pd.DataFrame({"error": [str(e)]})
    
    # Check if the 'data' key exists and contains a list of dictionaries
    if 'data' in data:
        if isinstance(data['data'], list):
            return pd.DataFrame(data['data'])
        else:
            return pd.DataFrame({"error": ["'data' key does not contain a list"]})
    else:
        return pd.DataFrame({"error": ["'data' key not found in the response"]})

# Example usage

#df_devs = GetDevelopments(token)



### Sales

In [5]:
def GetSales(token: str, status: str = None, customer_id: int = None):
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"  # Ensures the server knows you expect JSON
    }
    url = "https://dashboard.colorines.paitesting.com/api/v1/sales"
    params={}
    if status:
        params.update({"filter[status]": status})
        
    if customer_id:
        params.update({"customer_id": customer_id})
    
    try:
        response = requests.get(url, headers=headers, params=params)
        #print("Status Code:", response.status_code)  # Log the status code
        #print("Response Body:", response.text)   
        response.raise_for_status()  # Raise an exception for HTTP errors
        
        # Check if the response contains any content
        if response.content:
            data = response.json()
        else:
            return pd.DataFrame({"error": ["Empty response from the server"]})
        
    except requests.exceptions.RequestException as e:
        return pd.DataFrame({"error": [str(e)]})
    except ValueError as ve:
        return pd.DataFrame({"error": ["Error parsing JSON response"]})
    
    # Check if the 'data' key exists and contains a list of dictionaries
    if 'data' in data:
        if isinstance(data['data'], list):
            return pd.DataFrame(data['data'])
        else:
            return pd.DataFrame({"error": ["'data' key does not contain a list"]})
    else:
        return pd.DataFrame({"error": ["'data' key not found in the response"]})

def PreprocessSales(df):
    df['property_name'] = df['property'].apply(lambda x: x.get('alias'))

    return df

### Customers

In [6]:
def GetCustomers(token: str):
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"  # Ensures the server knows you expect JSON
    }
    url = "https://dashboard.colorines.paitesting.com/api/v1/customers"

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # Raise an exception for HTTP errors
        
        # Check if the response contains any content
        if response.content:
            data = response.json()
        else:
            return pd.DataFrame({"error": ["Empty response from the server"]})
        
    except requests.exceptions.RequestException as e:
        return pd.DataFrame({"error": [str(e)]})
    except ValueError as ve:
        return pd.DataFrame({"error": ["Error parsing JSON response"]})
    
    # Check if the 'data' key exists and contains a list of dictionaries
    if 'data' in data:
        if isinstance(data['data'], list):
            return pd.DataFrame(data['data'])
        else:
            return pd.DataFrame({"error": ["'data' key does not contain a list"]})
    else:
        return pd.DataFrame({"error": ["'data' key not found in the response"]})

def GetCustomerID(df, email):
    df_filtered = df[df['email'] == email]
    if (len(df_filtered)>0):
        return df_filtered.iloc[0].id
    else:
        return None

## Clean functions

In [260]:
def CleanProjects(df):
    df_clean = df[['name', 'development_name', 'active_steps']]
    df_clean = df_clean.rename(columns={'name': 'nombre_proyecto', 'development_name': 'nombre_desarrollo', 'active_steps': 'etapa'})
    return df_clean.to_json(orient='records', force_ascii=False)

def CleanProperties(df):
    df_clean = df[['alias', 'status', 'project_name', 'development_name', 'step_number', 'total_amount_mxn', 'total_amount_usd']]
    
    df_clean = df_clean.rename(columns={'alias': 'nombre_propiedad',
                                        'project_name': 'nombre_proyecto',
                                        'development_name': 'nombre_desarrollo',
                                        'step_number': 'etapa',
                                        'total_amount_mxn': 'precio_MXN',
                                        'total_amount_usd': 'precio_USD'
                                       })
    return df_clean.to_json(orient='records', force_ascii=False)

def CleanSalesByCustomer(df):
    df_clean = df[['property_name', 'currency', 'reserve_amount', 'property_amount', 'property_discount_amount', 'down_payment_amount', 'down_payment_percent', 'number_installments', 'installments_amount', 'status', 'amount_total', 'amount_paid', 'amount_remaining', 'next_payment_date', 'advance_percent', 'overdue_installments_count', 'overdue_installments_amount']]
    return df_clean.to_json(orient='records', force_ascii=False)

## Tools

In [41]:
@tool
def get_active_properties(token: str) -> str:
    """
    Uses the token to fetch the active properties from colorines API.
    
    Args:
        token (str): The authorization token required to access the API.

    Returns:
        str: The JSON response from the API as a string.
    """
    properties_df = GetProperties(token)
    properties_df = properties_df[properties_df['is_active'] == True]
    

    response = requests.get(url, headers=headers, params=params)
    # Check if the response was successful
    if response.status_code == 200:
        try:
            return json.dumps(response.json())  # Convert JSON response to string
        except ValueError:  # Catch JSON errors
            return json.dumps({"error": "Error parsing JSON"})
    else:
        return json.dumps({"error": f"Received status code {response.status_code}"})



@tool
def get_last_sale(token: str) -> str:
    """
    Uses the token to fetch the last sale from colorines API.
    
    Args:
        token (str): The authorization token required to access the API.

    Returns:
        str: The JSON response from the API as a string.
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = "https://dashboard.colorines.paitesting.com/api/v1/sales"
    params = {"per_page": 1, "sort": "-created_at"}
    
    try:
        response = requests.get(url, headers=headers, params=params)
        # Check if the response was successful
        if response.status_code == 200:
            try:
                return json.dumps(response.json())  # Convert JSON response to string
            except ValueError:  # Catch JSON errors
                return json.dumps({"error": "Error parsing JSON"})
        else:
            return json.dumps({"error": f"Received status code {response.status_code}"})
    except requests.exceptions.RequestException as e:
        return json.dumps({"error": str(e)})



### Properties, developments and Steps

In [42]:
@tool
def get_active_projects(token: str) -> str:   
    """
    Uses the token to fetch the active (available) projects from colorines API.
    
    Args:
        token (str): The authorization token required to access the API.

    Returns:
        str: The JSON response from the API as a string.
    """
    df = GetProjects(token)
    pre_df = PreprocessProjects(df)
    return CleanProjects(pre_df)
    
@tool
def get_active_properties(token: str)-> str:
    """
    Uses the token to fetch the active (available) properties from colorines API.
    
    Args:
        token (str): The authorization token required to access the API.

    Returns:
        str: The JSON response from the API as a string.
    """
    
    df = GetProperties(token)
    pre_df = PreprocessProperties(df)
    return CleanProperties(pre_df)



### Sales

In [43]:
@tool
def count_sales_by_status(token: str, status: str)-> int:
    """
    Count the number of sales the user has with a specific status.
    
    
    Args:
        token (str): The authorization token required to access the API.
        status (str): The status to filter the API request. Acceptable status by the API: reserved, documents, contract, active, canceled, overdue, finalized, deeded.

    Returns:
        int: The number of sales with the requested status.
    """
    
    df = GetSales(token=token, status=status)
    count = len(df)
    return count

@tool
def get_sales_by_customer(token: str, email: str) -> str:
    """
    Retrieves and returns a list of sales associated with a customer identified by their email.

    This function performs the following steps:
    1. Uses the provided email to search for the customer ID.
    2. Retrieves the sales attached to that customer ID.
    3. Returns relevant sales information in JSON format.

    Args:
        token (str): The authorization token required to access the API.
        email (str): The email address used to identify the customer.

    Returns:
        str: A JSON response containing information about sales made to the customer.
             The JSON includes details about each sale that will be listed to the user by the agent.
    """
    # Function implementation here
    customers = GetCustomers(token)
    customer_id = GetCustomerID(customers, email=email)
    if customer_id:
        df = GetSales(token=token, customer_id=customer_id)
        df = PreprocessSales(df)
        return CleanSalesByCustomer(df)
    else:
        return 'No se encontró ningún cliente con ese correo electrónico, pregunta al cliente si el correo es correcto'




In [None]:
def call_json_output_parser():
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Extract information from the Colorines API using the {token}.\nFormatting Instructions: {format_instructions}"),
        ("human", "{input}")
    ])

    class Data(BaseModel):
        data: str = Field(description="JSON with the response containing all the relevant information for the user")
        
    parser = JsonOutputParser(pydantic_object=Data)

    chain = prompt | model | parser
    
    return chain.invoke({
        "phrase": "The ingredients for a Margherita pizza are tomatoes, onions, cheese, basil",
        "format_instructions": parser.get_format_instructions()
    })

#### Sales remake

In [236]:
# Count sales
class CountSalesInput(BaseModel):
    """Input to grant access to Colorines API."""

    token: str = Field(..., description="token to authenticate the user to consume the API")
    status: str = Field(..., description="Status to filter the sales to retrieve from the API. Acceptable status by the API: reserved, documents, contract, active, canceled, overdue, finalized, deeded")
    
class CountSalesTool(BaseTool):
    name = "count_sales_by_status"
    description = "Count the number of sales the user has with a specific status. You must input the auth token"

    def _run(self, token: str, status: str):
        df = GetSales(token=token, status=status)
        count = len(df)
        return json.dumps({"count": count})  # Directly return JSON string


    def _arun(self, token: str, status: str):
        raise NotImplementedError("This tool does not support async")

    args_schema: Optional[Type[BaseModel]] = CountSalesInput
    
#list sales by email
# Input schema for fetching sales by customer
class GetSalesByCustomerInput(BaseModel):
    """Input to fetch sales by customer."""

    token: str = Field(..., description="Authorization token to access the API.")
    email: str = Field(..., description="Email address of the customer.")

# Tool class to retrieve sales by customer
class GetSalesByCustomerTool(BaseTool):
    name = "get_sales_by_customer"
    description = "Retrieve sales associated with a customer identified by email."

    def _run(self, token: str, email: str):
        """
        Retrieves and returns sales associated with a customer identified by email.

        Args:
            token (str): Authorization token to access the API.
            email (str): Email address of the customer.

        Returns:
            str: JSON response containing information about sales made to the customer.
        """
        # Replace these with your actual functions to fetch data
        customers = GetCustomers(token)
        customer_id = GetCustomerID(customers, email=email)
        
        if customer_id:
            df = GetSales(token=token, customer_id=customer_id)
            df = PreprocessSales(df)
            return json.dumps(CleanSalesByCustomer(df))
        else:
            return json.dumps({
                "message": "No se encontró ningún cliente con ese correo electrónico. "
                           "Verifica el correo proporcionado e intenta nuevamente."
            })

    def _arun(self, token: str, email: str):
        raise NotImplementedError("This tool does not support async.")

    args_schema: Optional[Type[BaseModel]] = GetSalesByCustomerInput


## Half agent use, just to select Tools

In [262]:
#ChatGpt model to select tools
model = ChatOpenAI(model="gpt-3.5-turbo", api_key=api_key)

#The tools classes
tool_names = [CountSalesTool().name, GetSalesByCustomerTool().name]
tools = [CountSalesTool(), GetSalesByCustomerTool()]

#Format Tools for being usable by the model
functions = [format_tool_to_openai_function(t) for t in tools]

#Token append to input string
token_str = f"mi auth token: {token}"

#Uses the model to predict which tool use, but we return the tool response directly
def smart_tool_selector(input_str, token_str, model, functions, tool_names):
    """
    Uses a machine learning model to predict which tool to use based on input,
    then directly returns the response from the selected tool.

    Args:
    - input_str (str): The input string to be processed.
    - token_str (str): The token string to be included in the input.
    - model: The machine learning model used to predict the tool.
    - functions (list): List of functions or tools available for selection.

    Returns:
    - tool_result: The result returned by the selected tool.
    - _args (dict): Arguments parsed from the AI model's function call prediction.
    - ai_message: Message object containing details of the AI model's prediction.
    """
    # Prepare input data for the model prediction
    input_data = {
        "input": f"{input_str} {token_str}"
    }
    
    # Predict the tool to use based on input using the machine learning model
    ai_message = model.predict_messages([HumanMessage(content=input_data['input'])], functions=functions)
    
    # Extract arguments for the tool from the AI model's function call prediction
    _args = json.loads(ai_message.additional_kwargs['function_call'].get('arguments'))
    
    tool_name = ai_message.additional_kwargs['function_call']['name']
    tool_index = tool_names.index(tool_name)
    # Execute the selected tool and capture its result
    tool_result = tools[tool_index](_args)  # Pass _args as keyword arguments
    
    # Return the tool's result, parsed arguments, and AI message
    return tool_result, _args, ai_message



In [258]:
input_str = 'cuantas ventas en el proceso de documentación tengo?'
token_str = "mi auth token: {token}"  # Make sure `{token}` is replaced by an actual token value.
result, args, msg = smart_tool_selector(input_str, token_str, model, functions, tool_names)
print(result)


count_sales_by_status 0
{"count": 1}


In [None]:
input_str='muéstrame las ventas del cliente admin@platform.com'
result, args, msg = smart_tool_selector(input_str, token_str, model, functions, tool_names)
result

In [158]:
TEST_MAIL

'admin@platform.com'

## Stable old agent

In [75]:
    
# Define the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "you're a helpful assistant who will use the {token} to use the tools to bring information to the user"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

# Example of integrating with langchain agent framework
api_key = os.getenv('OPENAI_API_KEY')
llm = ChatOpenAI(model="gpt-3.5-turbo", api_key=api_key)

# Add the `get_sales` function to the list of tools
tools_api = [get_last_sale, get_active_projects, count_sales_by_status, get_sales_by_customer] 

# Create the agent
agent = create_tool_calling_agent(llm, tools_api, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools_api, verbose=True)

# Example usage:
# token = 'your_actual_token_here'
# base_url = 'https://api.yourdomain.com'


In [76]:
TEST_MAIL

'admin@platform.com'

In [78]:
input_data = {
    "input": "cuantas ventas en la etapa de documentos tengo?",
    "token": token
}

# Invoke the agent
agent_executor.invoke(input_data)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `count_sales_by_status` with `{'token': '2134|lFXht7lxyqFZ4N25J08O0ffzUHxf5gtJLIzzzJ4a6a41aaf3', 'status': 'documents'}`


[0m[38;5;200m[1;3m11[0m[32;1m[1;3mTienes un total de 11 ventas en la etapa de documentos.[0m

[1m> Finished chain.[0m


{'input': 'cuantas ventas en la etapa de documentos tengo?',
 'token': '2134|lFXht7lxyqFZ4N25J08O0ffzUHxf5gtJLIzzzJ4a6a41aaf3',
 'output': 'Tienes un total de 11 ventas en la etapa de documentos.'}