##### Libraries

In [33]:

import json
import requests
import os
import pandas as pd
import logging
import requests
import ml_anomalydetection 
import lancedb
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import Optional, Literal, Dict, Any
from typing import List, Dict
from dotenv import load_dotenv


##### General parameters and configurations

In [34]:
# Set up logging configuration
# Records are written to application.log; filemode="w" reopens the file in
# write mode on every run, and force=True lets this call reconfigure the root
# logger even if it was configured before (e.g. when the cell is re-run).
# The level is INFO, so logger.debug(...) calls elsewhere in this file are not
# written to the log.
logging.basicConfig(
    filename="application.log",
    filemode="w",
    level=logging.INFO,
    format="\n%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True

)
logger = logging.getLogger(__name__)

# Setup LLM 
# Load environment variables
# NOTE(review): presumably this must run before OpenAI() so the API key read
# from the .env file is visible to the client - confirm.
load_dotenv()
# Initialize OpenAI client
client = OpenAI()
# Default model name used by the structured-output (parse) calls below.
model = "gpt-4o-mini"


##### Functions/Tools called by the Agent

In [35]:
def detect_anomaly_by_dates(start_date, end_date):
    """Run the all-anomalies detector over a date range.

    Returns a ``(json_records, type_anomaly)`` tuple when the detector yields a
    data frame, or a fallback dictionary when it yields ``None``.
    """
    frame, anomaly_kind = ml_anomalydetection.detect_all_anomalies_dates(start_date, end_date)
    if frame is not None:
        return frame.to_json(orient='records'), anomaly_kind
    return {"temperature": 0.0, "response": "No records found in these dates."}

# Detect anomalies in a date range, restricted to one parameter
def detect_anomaly_by_dates_by_parameter(start_date, end_date, parameter):
    """Run the per-parameter anomaly detector over a date range.

    Returns a ``(json_records, type_anomaly)`` tuple when the detector yields a
    data frame, or a fallback dictionary when it yields ``None``.
    """
    frame, anomaly_kind = ml_anomalydetection.detect_all_anomalies_dates_by_parameter(
        start_date, end_date, parameter
    )
    if frame is not None:
        return frame.to_json(orient='records'), anomaly_kind
    return {"temperature": 0.0, "response": "No records found in these dates."}

def detect_anomaly_by_dates_by_id(type_anomaly, start_date, end_date):
    """Run the detector for one anomaly type over a date range.

    Returns a ``(json_records, type_anomaly)`` tuple, where the second item is
    the type reported back by the detector, or a fallback dictionary when the
    detector yields ``None`` instead of a data frame.
    """
    frame, reported_type = ml_anomalydetection.detect_anomalies_dates(type_anomaly, start_date, end_date)
    if frame is not None:
        return frame.to_json(orient='records'), reported_type
    return {"temperature": 0.0, "response": "No records found in these dates."}

def detect_anomaly_by_rows(type_anomaly, start_row, end_row):
    """Run the file-based detector for one anomaly type over a range of rows.

    Returns a ``(json_records, type_anomaly)`` tuple, where the second item is
    the type reported back by the detector, or a fallback dictionary when the
    detector yields ``None`` instead of a data frame.
    """
    frame, reported_type = ml_anomalydetection.detect_anomalies_file(type_anomaly, start_row, end_row)
    if frame is not None:
        return frame.to_json(orient='records'), reported_type
    return {"temperature": 0.0, "response": "No records found in these dates."}

def identify_anomaly_by_id(type_anomaly):
    """Look up the description of an anomaly type in the glossary CSV.

    Args:
        type_anomaly: Value matched against the ``Type of anomaly`` column of
            ``anomalies_glossary.csv``.

    Returns:
        A JSON array string with the matching ``Description`` values, or a
        fallback dictionary when no row matches the requested type.
    """
    glossary = pd.read_csv('anomalies_glossary.csv')
    descriptions = glossary.loc[glossary['Type of anomaly'] == type_anomaly, 'Description']

    # A boolean-mask selection never yields None: an unknown type produces an
    # empty Series, so emptiness is the actual "not found" signal.
    if descriptions.empty:
        return {"temperature": 0.0, "response": "No description found for this type of anomaly."}
    return descriptions.to_json(orient='records')

def send_email_notification(to: str, subject: str, body: str):
    """Simulate sending an email and report what would have been sent.

    No message is delivered: the function only writes a log entry and echoes
    its arguments back to the caller.

    Returns:
        Dictionary with the action label and the ``to``, ``subject`` and
        ``body`` values received.
    """
    # TODO: replace the simulation with a real delivery: build a MIME message
    # (From/To/Subject plus a plain-text body), open a TLS-secured SMTP
    # session, log in, send, and log a failure if the delivery raises.
    logger.info("Email sent successfully")

    summary = dict(action="Email sent", to=to, subject=subject, body=body)
    return summary

def call_API(endpoint: str, method: str, title: str, body: str, userId: int):
    """Simulate an API call and report the request that would have been made.

    No HTTP request is issued: the function only writes a log entry and echoes
    its arguments back to the caller.

    Args:
        endpoint: URL of the API that would be called.
        method: HTTP method that would be used.
        title: Title field of the request.
        body: Body field of the request.
        userId: User identifier field of the request.

    Returns:
        Dictionary with the action label and every argument received.
    """
    # The message used to start with "\A", which is not a valid escape sequence
    # (newer Python versions warn about it and it logged a stray backslash).
    logger.info("API called sucessfully")

    # TODO: replace the simulation with a real request (POST with a JSON
    # payload or GET with query parameters) and return the decoded response.
    return {
        "action": "API called",
        "endpoint": endpoint,
        "method": method,
        "title": title,
        "body": body,
        "userId": userId,
    }


##### Definition of data models

In [36]:
# Structured-output schema used as response_format by
# BlogOrchestrator.route_general_flow_request to classify the user's request.
class AnomalyGeneralFlowRequestType(BaseModel):
    """Router LLM call: Determine the step of the initial request of the anomaly"""

    # Closed set of flow steps the router may answer with.
    request_type: Literal["detect_anomaly", "identify_anomaly_by_id", "check_documentation", "apply_action", "other"] = Field(
        description="Step or process to start the anomaly flow to apply"
    )
    # The 0..1 range is only stated in the description; no validator enforces it here.
    confidence_score: float = Field(description="Confidence score between 0 and 1")
    # Request text as restated by the model.
    description: str = Field(description="Cleaned description of the request")

# Structured-output schema used as response_format by
# BlogOrchestrator.route_anomaly_detection_request.
# NOTE(review): that router's system prompt also mentions detection by a range
# of rows in a file, but no such option exists in request_type - confirm
# whether one is missing.
class AnomalyDetectionRequestType(BaseModel):
    """Router LLM call: Determine the type of anomaly search detection request"""

    # Closed set of detection searches the router may answer with.
    request_type: Literal["detect_anomaly_by_dates", "detect_anomaly_by_dates_by_parameter", "other"] = Field(
        description="Type of detection request being made"
    )
    # The 0..1 range is only stated in the description; no validator enforces it here.
    confidence_score: float = Field(description="Confidence score between 0 and 1")
    # Request text as restated by the model.
    description: str = Field(description="Cleaned description of the request")

# Structured-output schema used as response_format by
# BlogOrchestrator.route_anomaly_detection_action.
class AnomalyDetectionRequestAction(BaseModel):
    """Router LLM call: Determine the action to apply when an anomaly is detected"""

    # The first two values match the names of the call_API and
    # send_email_notification tool functions defined in this file.
    request_type: Literal["call_API", "send_email_notification",  "other"] = Field(
        description="Type of detection request being made"
    )
    # The 0..1 range is only stated in the description; no validator enforces it here.
    confidence_score: float = Field(description="Confidence score between 0 and 1")
    # Request text as restated by the model.
    description: str = Field(description="Cleaned description of the request")

# Structured-output schema used as response_format by
# BlogOrchestrator.handle_detect_anomaly_by_dates after the detection tool ran.
class AnomaliesDatetimeRequestType(BaseModel):
    """Details for calling the function to detect an anomaly in specific range of time"""

    # Numeric anomaly type.
    type_anomaly: int = Field(description="Type of anomaly")
    # Range boundaries are kept as strings; ISO 8601 is requested only through
    # the description text.
    start_date: str = Field(description="Start date and time to verify anomaly (ISO 8601)")
    end_date: str = Field(description="End date and time to verify anomaly (ISO 8601)")

    # NOTE(review): a float named "temperature" whose description talks about
    # the anomalies message - confirm what value is expected here.
    temperature: float = Field(
        description="If the dataset has more True values in anomaly column the message is anomalies, otherwise no anomalies were found."
    )
    # Free-text answer for the user.
    response: str = Field(
        description="A natural language response to the user's question giving details about the anomalies found. Give the total of the rows returned by the tool. Display a table with the total of values True and False in column anomalies'."
    )

# Same fields as AnomaliesDatetimeRequestType plus the parameter to analyse.
class AnomaliesDatetimeParameterRequestType(BaseModel):
    """Details for calling the function to detect an anomaly in specific range of time analyzing an specific parameter"""

    # Numeric anomaly type.
    type_anomaly: int = Field(description="Type of anomaly")
    # Range boundaries are kept as strings; ISO 8601 is requested only through
    # the description text.
    start_date: str = Field(description="Start date and time to verify anomaly (ISO 8601)")
    end_date: str = Field(description="End date and time to verify anomaly (ISO 8601)")
    # Name of the parameter whose deviation is checked.
    parameter: str = Field(description="Parameter to verify anomaly")

    # NOTE(review): a float named "temperature" whose description talks about
    # the anomalies message - confirm what value is expected here.
    temperature: float = Field(
        description="If the dataset has more True values in anomaly column the message is anomalies, otherwise no anomalies were found."
    )
    # Free-text answer for the user.
    response: str = Field(
        description="A natural language response to the user's question giving details about the anomalies found. Give the total of the rows returned by the tool. Display a table with the total of values True and False in column anomalies'."
    )

# Structured-output schema for a row-range detection over a file.
class AnomaliesFileRequestType(BaseModel):
    """Details for calling the function to detect an anomaly in a file and specific range of rows"""

    # Numeric anomaly type.
    type_anomaly: int = Field(description="Type of anomaly")
    # File to analyse (the description previously misspelled the extension as ".cvs").
    file_name: str = Field(description="Name of the .csv file to verify the anomaly")
    # Row boundaries: both are integers, matching start_row and the
    # detect_anomaly_by_rows tool schema (end_row used to be typed as str).
    start_row: int = Field(description="Start row to verify anomaly")
    end_row: int = Field(description="End row verify anomaly")

    # NOTE(review): a float named "temperature" whose description talks about
    # the anomalies message - confirm what value is expected here.
    temperature: float = Field(
        description="If the dataset has more True values in anomaly column the message is anomalies, otherwise no anomalies were found."
    )
    # Free-text answer for the user.
    response: str = Field(
        description="A natural language response to the user's question giving details about the anomalies found. Give the total of the rows returned by the tool. Display a table with the total of values True and False in column anomalies'."
    )

# Fields mirror the parameters of send_email_notification (to, subject, body)
# plus a free-text response for the user.
class AnomaliesSendEmailRequestType(BaseModel):
    """Details for calling the function to send an email notification"""

    to: str = Field(description="Destination email address")
    subject: str = Field(description="Subject of the email")
    body: str =  Field(description="Body of the email")
    # Free-text answer describing the action taken.
    response: str = Field(
        description="A natural language response to the user's question giving details about the action applied to the anomaly'."
    )

# Fields mirror the parameters of call_API (endpoint, method, title, body,
# userId) plus a free-text response for the user.
class AnomaliesCallAPIRequestType(BaseModel):
    """Details for calling the function to call an API"""

    endpoint: str  = Field(description="Endpoint of the API called")
    # Only GET and POST are accepted.
    method: Literal["GET", "POST"] = Field(description="HTTP method used to call the API")
    title: str = Field(description="Title of the API called")
    body: str = Field(description="Body of the API called")
    userId: int = Field(description="User ID of the API called")
    # Free-text answer describing the action taken.
    response: str = Field(
        description="A natural language response to the user's question giving details about the action applied to the anomaly'."
    )

# Used both as the response_format of BlogOrchestrator.handle_extract_anomaly_by_id
# and as the value returned by the detection handlers.
class AnomalyDetectionResponse(BaseModel):
    """Response model for an anomaly detection"""

    # Free-text answer for the user.
    response: str = Field(
        description="A natural language response to the user's question giving details about the anomalies found. Give the total of the rows returned by the tool. Display a table with the total of values True and False in column anomalies'."
    )
    # The handlers in this file set this to True when they build the response.
    success: bool = Field(description="Whether the operation was successful")
    message: str = Field(description="User-friendly response message")
    # Numeric anomaly type.
    type_anomaly: int = Field(description="Type of anomaly")

# Used as the response_format and as the return value of
# BlogOrchestrator.handle_get_documentation.
class AnomalyDocumentationResponse(BaseModel):
    """Response model for a result in a documentation search"""

    # Where the documentation excerpt came from, and its heading.
    source: str = Field(description="Source of information about the anomaly")
    title: str = Field(description="Title of the information")
    # Documentation detail about the anomaly.
    text: str = Field(description="Detail about the anomaly in the documentation")
    # NOTE(review): this description is the same text used by the detection
    # models (row totals, True/False table) - confirm it is intended for a
    # documentation answer.
    response: str = Field(
        description="A natural language response to the user's question giving details about the anomalies found. Give the total of the rows returned by the tool. Display a table with the total of values True and False in column anomalies'."
    )
    success: bool = Field(description="Whether the operation was successful")
    message: str = Field(description="User-friendly response message")

# Used as the response_format and as the return value of
# BlogOrchestrator.handle_identify_anomaly_by_id.
class IdentificationAnomalyResponse(BaseModel):
    """Response model for an anomaly identified in the dictionary"""

    # Numeric anomaly type that was looked up.
    type_anomaly: int = Field(description="Type of anomaly to find")
    # Description associated with that type.
    description_anomaly: str = Field(description="Description of the anomaly detected")
    # Free-text answer for the user.
    response: str = Field(
        description="A natural language response to the user's adding information about the description of the anomaly."
    )

# Result of applying an action (tool) to an anomaly.
class AnomalyActionResponse(BaseModel):
    """Response model for an action applied to an anomaly"""

    # Free-text answer naming the action or tool applied.
    response: str = Field(
        description="A natural language response to the user's indicated the action or tool applied."
    )
    success: bool = Field(description="Whether the operation was successful")
    message: str = Field(description="User-friendly response message")

##### Tools definition

In [37]:
"""Structre with the functions/tool to be used by the agent"""
# Tool (function-calling) schemas offered to the model by BlogOrchestrator.call_tool.
# Each "name" is the identifier BlogOrchestrator.call_function receives, and the
# "properties" are expanded as keyword arguments of the matching function, so
# every schema has to list exactly the arguments that function needs.
tools = [

    {
        "type": "function",
        "function": {
            "name": "detect_anomaly_by_dates_by_id",
            "description": "Detect anomalies in specified dates rang and id of anomaly, convert the date in datetime format. Only call this function if the user message include dates and an number with the type of anomaly.",
            "parameters": {
                "type": "object",
                "properties": {
                    "type_anomaly": {
                        "type": "integer",
                        "description": "The number of the type of the anomaly."
                    },
                    "start_date": {
                        "type": "string",
                        "description": "The start time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T21:21:34Z')."
                    },
                    "end_date": {
                        "type": "string",
                        "description": "The end time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T22:21:34Z')."
                    },
                },
                "required": ["type_anomaly", "start_date", "end_date"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "detect_anomaly_by_rows",
            "description": "Detect anomalies in the specified rows range and file. Only call this function if the user mentions a file name.",
            "parameters": {
                "type": "object",
                "properties": {
                    "type_anomaly": {
                        "type": "integer",
                        "description": "The number of the type of the anomaly')."
                    },
                    "start_row": {
                        "type": "integer",
                        "description": "The start row to check the anomalies in the file."
                    },
                    "end_row": {
                        "type": "integer",
                        "description": "The end row to check the anomalies in the file."
                    },
                    "file_name": {
                        "type": "string",
                        "description": "The file name to check the anomalies."
                    },
                },
                "required": ["type_anomaly", "start_row", "end_row", "file_name"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "detect_anomaly_by_dates",
            "description": "Detect anomalies in specified dates range, convert the date in datetime format. Only call this function if the user message include dates.",
            "parameters": {
                "type": "object",
                "properties": {
                    "start_date": {
                        "type": "string",
                        "description": "The start time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T21:21:34Z')."
                    },
                    "end_date": {
                        "type": "string",
                        "description": "The end time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T22:21:34Z')."
                    },
                },
                "required": ["start_date", "end_date"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "detect_anomaly_by_dates_by_parameter",
            "description": "Detect anomalies in specified dates range and for specific parameter, convert the date in datetime format. Only call this function if the user message include dates and a parameter.",
            "parameters": {
                "type": "object",
                "properties": {
                    "start_date": {
                        "type": "string",
                        "description": "The start time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T21:21:34Z')."
                    },
                    "end_date": {
                        "type": "string",
                        "description": "The end time for anomaly detection in ISO 8601 format (e.g., '2025-02-24T22:21:34Z')."
                    },
                    "parameter": {
                        "type": "string",
                        "description": "The name of the parameter to analize anomaly."
                    },
                },
                "required": ["start_date", "end_date", "parameter"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "identify_anomaly_by_id",
            "description": "Consult anomaly description in the dictionary.",
            "parameters": {
                "type": "object",
                "properties": {
                    "type_anomaly": {
                        "type": "integer",
                        "description": "The number of the type of the anomaly')."
                    },
                },
                "required": ["type_anomaly"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    },
    {
        "type": "function",
        "function": {
            "name": "send_email_notification",
            "description": "Send an email to a specific address",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {
                        "type": "string",
                        "description": "Destiny of the email)."
                    },
                    "subject": {
                        "type": "string",
                        "description": "Subject of the email)."
                    },
                    "body": {
                        "type": "string",
                        "description": "Body of the email)."
                    },
                },
                "required": ["to", "subject", "body"],
                "additionalProperties": False,
            },
            "strict": True,
        }
    },
    {
        "type": "function",
        "function": {
            "name": "call_API",
            "description": "Make a call to a public API",
            "parameters": {
                "type": "object",
                "properties": {
                    "endpoint": {
                        "type": "string",
                        "description": "The endpoint of the API."
                    },
                    "method": {
                        "type": "string",
                        # Same closed set as AnomaliesCallAPIRequestType.method.
                        "enum": ["GET", "POST"],
                        "description": "The method of the API."
                    },
                    "title": {
                        "type": "string",
                        "description": "The title of the API."
                    },
                    "body": {
                        "type": "string",
                        "description": "The body of the API."
                    },
                    "userId": {
                        "type": "integer",
                        "description": "The user ID of the API."
                    },
                },
                # call_API() takes all five arguments without defaults. The list
                # previously named a non-existent "payload" property and left
                # title/body/userId optional, so a tool call could omit them.
                "required": ["endpoint", "method", "title", "body", "userId"],
                "additionalProperties": False,
            },
            # Aligned with the other tools so the arguments follow the schema.
            "strict": True,
        }
    }

]

##### Prompts definition

In [38]:

# System prompt template for the anomaly identification step. It is filled with
# str.format() in BlogOrchestrator.handle_identify_anomaly_by_id and has two
# placeholders: {type_anomaly} and {description_anomaly}. The names in the
# "Return your response" section are plain text, not placeholders.
IDENTIFICATION_PROMPT = """
Response the description of the anomaly based on anomaly type.

Anomaly Type: {type_anomaly}
Anomaly Description: {description_anomaly}

Provide the description of the anomaly based on the type of anomaly detected. User-friendly response message, include more information to complement the anomaly information


Return your response in this format:

# Identification
- Type: type_anomaly
- Description anomaly: description_anomaly
- Additional information: [Your additional information here]

"""

# System prompt template for the documentation step. It is filled with
# str.format() in BlogOrchestrator.handle_get_documentation and has four
# placeholders, each used twice: {description_anomaly}, {source}, {title}
# and {text}.
DOCUMENTATION_PROMPT = """
Apply the anomaly detection:
Description anomaly: {description_anomaly}
Source: {source}
Title: {title}
Text: {text}

Return your response in this format:

# Identification
Description anomaly: {description_anomaly}

# Documentation
- Source: {source}
- Title: {title}
- Text: {text}

"""

##### Orchestrator of the Agent AI 

In [39]:
class BlogOrchestrator:
    """Orchestrator class to handle the routing of the anomaly detection flow and all the functions to be called by the agent"""
    def __init__(
        self,       
    ):        
        self.sections_content = {}

    
# Initialize LanceDB connection
    def init_documentation_db(self):
        """Connect to the LanceDB store at ``data/lancedb`` and open the ``docling`` table."""
        connection = lancedb.connect("data/lancedb")
        documentation_table = connection.open_table("docling")
        return documentation_table

    def call_function(self, name, args):
        """Run the tool function selected by ``name`` with ``args`` as keyword arguments.

        Returns whatever the selected function returns. A name outside the
        supported set selects nothing and the method returns ``None``.
        """
        if name == "detect_anomaly_by_dates_by_id":
            handler = detect_anomaly_by_dates_by_id
        elif name == "detect_anomaly_by_dates":
            handler = detect_anomaly_by_dates
        elif name == "detect_anomaly_by_dates_by_parameter":
            handler = detect_anomaly_by_dates_by_parameter
        elif name == "identify_anomaly_by_id":
            handler = identify_anomaly_by_id
        elif name == "send_email_notification":
            handler = send_email_notification
        elif name == "call_API":
            handler = call_API
        else:
            return None
        return handler(**args)
       

    def call_tool(self, messages: List[Dict]) -> List[Dict]:
        """Let the model pick a tool, run it, and record the exchange in ``messages``.

        The conversation is sent to the model together with the ``tools``
        schemas. The assistant reply is appended to ``messages`` and, for every
        tool call it contains, the tool is executed through ``call_function``
        and its JSON-encoded result is appended as a ``tool`` message.

        Args:
            messages: Conversation so far; the list is extended in place.

        Returns:
            The same ``messages`` list. When the model requested no tool, only
            the assistant reply has been appended.
        """
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
        )
        assistant_message = response.choices[0].message

        # The assistant turn has to come before the tool results answering it.
        messages.append(assistant_message)

        # tool_calls is None when the model answers in plain text; indexing it
        # directly used to raise a TypeError in that case.
        requested_calls = assistant_message.tool_calls or []
        if not requested_calls:
            logger.warning("The model did not request any tool call")
            return messages

        # Answer every requested call, not only the first one: each tool_call id
        # in the assistant message needs its own tool message in the follow-up
        # request.
        for tool_call in requested_calls:
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)

            logger.debug(f"Tool call: {tool_name} with args: {tool_args}")

            tool_result = self.call_function(tool_name, tool_args)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(tool_result)
            })

        return messages
    
    def route_general_flow_request(self, user_input: str) -> AnomalyGeneralFlowRequestType:
        """Classify the user's request into one of the general flow steps.

        Sends the input to the model with ``AnomalyGeneralFlowRequestType`` as
        the structured response format and returns the parsed result.
        """
        logger.info("Routing general flow request")

        system_message = {
            "role": "system",
            "content": "Determine if this is a request to detect an anomaly in specific range of time, identify an anomaly by id, check the documentation of specific anomaly or apply an action.",
        }
        user_message = {"role": "user", "content": user_input}

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=[system_message, user_message],
            response_format=AnomalyGeneralFlowRequestType,
        )
        routed = completion.choices[0].message.parsed
        logger.info(
            f"Request routed as: {routed.request_type} with confidence: {routed.confidence_score}"
        )
        return routed
    
    def route_anomaly_detection_request(self, user_input: str) -> AnomalyDetectionRequestType:
        """Classify which kind of anomaly detection search the user is asking for.

        Sends the input to the model with ``AnomalyDetectionRequestType`` as the
        structured response format and returns the parsed result.
        """
        logger.info("Routing anomaly detection request")

        system_message = {
            "role": "system",
            "content": "Determine if this is a request to detect an anomaly by a range of rows in a file or by a range of dates or by range of dates for specific parameter. In case the request is asking about of an anomaly return the sentence: check documentation.",
        }
        user_message = {"role": "user", "content": user_input}

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=[system_message, user_message],
            response_format=AnomalyDetectionRequestType,
        )
        routed = completion.choices[0].message.parsed
        logger.info(
            f"Request routed as: {routed.request_type} with confidence: {routed.confidence_score}"
        )
        return routed
    
    def route_anomaly_detection_action(self, documentation_result: str) -> AnomalyDetectionRequestAction:
        """Classify which action (email notification or API call) should be applied.

        Sends ``documentation_result`` to the model as the user message with
        ``AnomalyDetectionRequestAction`` as the structured response format and
        returns the parsed result.
        """
        logger.info("Routing anomaly detection action")

        system_message = {
            "role": "system",
            "content": "Determine if this is a request to an email send notification by or call an API.",
        }
        user_message = {"role": "user", "content": documentation_result}

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=[system_message, user_message],
            response_format=AnomalyDetectionRequestAction,
        )
        routed = completion.choices[0].message.parsed
        logger.info(
            f"Request routed as: {routed.request_type} with confidence: {routed.confidence_score}"
        )
        return routed
    
    def get_documentation_context(self, query: str, table, num_results: int = 3) -> str:
        """Search ``table`` for ``query`` and render the hits as one context string.

        Each hit becomes its text followed by a ``Source:`` line (file name and
        page numbers, when present) and an optional ``Title:`` line. Hits are
        separated by a blank line.
        """
        hits = table.search(query).limit(num_results).to_pandas()

        def render(hit):
            # Read the three metadata fields this method relies on.
            meta_file = hit["metadata"]["filename"]
            meta_pages = hit["metadata"]["page_numbers"]
            meta_title = hit["metadata"]["title"]

            origin = [meta_file] if meta_file else []
            if len(meta_pages) > 0:
                origin.append(f"p. {', '.join(str(p) for p in meta_pages)}")

            citation = f"\nSource: {' - '.join(origin)}"
            if meta_title:
                citation += f"\nTitle: {meta_title}"
            return f"{hit['text']}{citation}"

        return "\n\n".join(render(hit) for _, hit in hits.iterrows())


    def get_documentation_response(self, context: str) -> str:
        """Ask the model to answer from the retrieved documentation ``context``.

        The context is embedded in a system prompt, which is the only message
        sent. The call is a regular (non-streaming) completion and the text of
        the first choice is returned.
        """
        instructions = f"""You are a helpful assistant that answers questions about anomalies in oil and gas from documents.
        Use only the information from the context to answer questions. If you're unsure or the context
        doesn't contain the relevant information, say so.
        
        Context:
        {context}
        """

        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": instructions}],
            temperature=0.7
        )

        return completion.choices[0].message.content
    
    def handle_extract_anomaly_by_id(self, user_input: str) -> AnomalyDetectionResponse:
        """Extract the anomaly type id mentioned in the user's message.

        The model parses the input into an ``AnomalyDetectionResponse``; only its
        ``type_anomaly`` is kept, and a new successful response carrying that id
        and a fixed summary text is returned.
        """
        logger.info("Processing event modification request")

        conversation = [
            {
                "role": "system",
                "content": "Extract the id of the anomaly in type_anomaly .",
            },
            {"role": "user", "content": user_input},
        ]
        completion = client.beta.chat.completions.parse(
            model=model,
            messages=conversation,
            response_format=AnomalyDetectionResponse,
        )
        extracted = completion.choices[0].message.parsed

        logger.info(f"Identified anomaly: {extracted.model_dump_json(indent=2)}")

        # The same sentence is used for both text fields of the response.
        summary = f"Extracted the information for identify the anomaly. The type of anomaly is {extracted.type_anomaly}"
        return AnomalyDetectionResponse(
            message=summary,
            response=summary,
            type_anomaly=extracted.type_anomaly,
            success=True,
        )
    

    def handle_identify_anomaly_by_id(self, type_anomaly: str) -> IdentificationAnomalyResponse:
        """Describe an anomaly type using the glossary tool and the model.

        First ``call_tool`` is asked to run ``identify_anomaly_by_id`` for the
        given type; then the tool output is placed in ``IDENTIFICATION_PROMPT``
        and the model produces the final structured answer.

        Args:
            type_anomaly: Identifier of the anomaly type (a string, or any value
                that formats to the id, such as an integer).

        Returns:
            The parsed ``IdentificationAnomalyResponse`` from the model.
        """
        logger.info("Processing anomaly identification by id")
        messages = [
            {
                "role": "system",
                "content": "Consult the description of the anomaly in database. You have to call the function identify_anomaly_by_id.",
            },
            # Formatting instead of "+" keeps this working when the id arrives
            # as an integer (type_anomaly is an int in the response models).
            {"role": "user", "content": f"The type of anomaly is {type_anomaly}"},
        ]

        messages = self.call_tool(messages)

        # Only the tool output belongs in the prompt; interpolating the whole
        # message list also injected the system text and the repr of the
        # assistant message object.
        tool_outputs = [
            entry["content"]
            for entry in messages
            if isinstance(entry, dict) and entry.get("role") == "tool"
        ]
        description_anomaly = tool_outputs[-1] if tool_outputs else ""

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": IDENTIFICATION_PROMPT.format(
                        type_anomaly=type_anomaly,
                        description_anomaly=description_anomaly,
                    ),
                }
            ],
            response_format=IdentificationAnomalyResponse,
        )

        return completion.choices[0].message.parsed
    
    def handle_get_documentation(self, description_anomaly: str) -> AnomalyDocumentationResponse:
        """Retrieve documentation about an anomaly and summarise it with the model.

        The documentation table is searched with a query built from
        ``description_anomaly``; the retrieved context is summarised by
        ``get_documentation_response`` and the summary, together with the source
        and title parsed from the context, is fed into ``DOCUMENTATION_PROMPT``
        to obtain the structured answer.

        Args:
            description_anomaly: Description of the anomaly to look up.

        Returns:
            An ``AnomalyDocumentationResponse`` flagged as successful, carrying
            the model's response, source, title and text.
        """
        # Initialize database connection
        table = self.init_documentation_db()

        # The separating space was missing, which glued the description to "to".
        context = self.get_documentation_context(
            f"Summary the information related to {description_anomaly}", table
        )

        # Chunks are separated by blank lines; inside a chunk the first line is
        # the text and the following "Key: value" lines are its metadata.
        # NOTE(review): as before, only the LAST chunk's metadata is used -
        # confirm whether the first (top-ranked) chunk was intended.
        last_chunk = context.split("\n\n")[-1]
        metadata = {}
        for line in last_chunk.split("\n")[1:]:
            # partition keeps everything after the first ": ", so values that
            # themselves contain ": " are no longer truncated.
            key, separator, value = line.partition(": ")
            if separator:
                metadata[key] = value

        source = metadata.get("Source", "Unknown source")
        title = metadata.get("Title", "Untitled section")

        response = self.get_documentation_response(context)

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": DOCUMENTATION_PROMPT.format(
                        source=source,
                        title=title,
                        text=response,
                        description_anomaly=description_anomaly,
                    ),
                }
            ],
            response_format=AnomalyDocumentationResponse,
        )

        details = completion.choices[0].message.parsed

        logger.info(f"Documentation search: {details.model_dump_json(indent=2)}")

        return AnomalyDocumentationResponse(
            message=f"Information about the anomaly {description_anomaly}",
            response=details.response,
            source=details.source,
            title=details.title,
            text=details.text,
            success=True,
        )
            
    def handle_detect_anomaly_by_dates(self, description: str) -> AnomalyDetectionResponse:
        """Detect anomalies in a date range described by the user prompt.

        A system/user message pair is passed through ``self.call_tool``
        (presumably running the ``detect_anomaly_by_dates`` tool - confirm
        there), and the returned messages are parsed by the LLM into an
        ``AnomaliesDatetimeRequestType``.

        Args:
            description: User text containing the date range to analyse.

        Returns:
            An ``AnomalyDetectionResponse`` with ``success=True`` carrying the
            parsed response and anomaly type.
        """
        logger.info("Processing detection by dates")

        system_prompt = {
            "role": "system",
            "content": "Extract details to detect an anomaly by a range of dates. You have to call the function detect_anomaly_by_dates.",
        }
        user_prompt = {"role": "user", "content": description}

        conversation = self.call_tool([system_prompt, user_prompt])

        parsed_completion = client.beta.chat.completions.parse(
            model=model,
            messages=conversation,
            response_format=AnomaliesDatetimeRequestType,
        )
        details = parsed_completion.choices[0].message.parsed

        logger.info(f"New detection by dates: {details.model_dump_json(indent=2)}")

        summary = (
            f"Detect an anomaly dates {details.start_date} and {details.end_date}. "
            f"The type of anomaly is {details.type_anomaly}"
        )
        return AnomalyDetectionResponse(
            message=summary,
            response=details.response,
            type_anomaly=details.type_anomaly,
            success=True,
        )
    
    def handle_detect_anomaly_by_dates_by_parameter(self, description: str) -> AnomalyDetectionResponse:
        """Detect anomalies in a date range for one specific parameter.

        A system/user message pair is passed through ``self.call_tool``
        (presumably running the ``detect_anomaly_by_dates_by_parameter`` tool -
        confirm there), and the returned messages are parsed by the LLM into an
        ``AnomaliesDatetimeParameterRequestType``.

        Args:
            description: User text containing the date range and parameter name.

        Returns:
            An ``AnomalyDetectionResponse`` with ``success=True`` carrying the
            parsed response and anomaly type.
        """
        # Distinct entry message so this handler can be told apart in the log
        # from the date-only handler (it previously logged the same text).
        logger.info("Processing detection by dates and parameter")
        messages = [
            {
                "role": "system",
                "content": "Extract details to detect an anomaly by a range of dates and specific name of parameter desviation. You have to call the function detect_anomaly_by_dates_by_parameter.",
            },
            {"role": "user", "content": description},
        ]

        messages = self.call_tool(messages)

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=messages,
            response_format=AnomaliesDatetimeParameterRequestType,
        )

        details = completion.choices[0].message.parsed

        logger.info(f"New detection by dates and parameter: {details.model_dump_json(indent=2)}")

        return AnomalyDetectionResponse(
            message=f"Detect an anomaly dates for specific parameter desviation {details.start_date} and {details.end_date}. The type of anomaly is {details.type_anomaly}",
            response=details.response,
            type_anomaly=details.type_anomaly,
            success=True,
        )

    def handle_send_email_notification(
        self,
        description: str,
        recipient: str = "adrianadiazgranados@gmail.com",
    ) -> AnomalyActionResponse:
        """Ask the LLM/tool layer to send an e-mail notification about an anomaly.

        A system/user message pair is passed through ``self.call_tool``
        (presumably running the ``send_email_notification`` tool - confirm
        there), and the returned messages are parsed by the LLM into an
        ``AnomaliesSendEmailRequestType``.

        Args:
            description: Information about the anomaly to summarise in the mail.
            recipient: Address the notification is sent to. Defaults to the
                address that used to be hard-coded, so existing callers behave
                as before.

        Returns:
            An ``AnomalyActionResponse`` with ``success=True`` describing the
            recipient, subject and body reported by the parsed LLM output.
        """
        logger.info("Calling action send email notification")
        messages = [
            {
                "role": "system",
                "content": "Send a notification about the anomaly detected. You have to call the function send_email_notification.",
            },
            {
                "role": "user",
                # A space now separates the fixed instruction from the
                # description; they used to be concatenated with nothing between.
                "content": (
                    f"Please send an email to {recipient} saying that an anomaly was detected "
                    f"with the summary of the below information. {description}"
                ),
            },
        ]

        messages = self.call_tool(messages)

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=messages,
            response_format=AnomaliesSendEmailRequestType,
        )

        details = completion.choices[0].message.parsed

        logger.info(f"Send an email notification: {details.model_dump_json(indent=2)}")

        return AnomalyActionResponse(
            message=f"Email send to {details.to}, subject {details.subject}. Text {details.body}",
            response=details.response,
            success=True,
        )

    def handle_call_API(
        self,
        description: str,
        endpoint: str = "https://jsonplaceholder.typicode.com/posts",
        method: str = "POST",
    ) -> AnomalyActionResponse:
        """Ask the LLM/tool layer to call an HTTP API as the anomaly action.

        A system/user message pair is passed through ``self.call_tool``
        (presumably performing the API call - confirm there), and the returned
        messages are parsed by the LLM into an ``AnomaliesCallAPIRequestType``.

        Args:
            description: Text from which the payload parameters are extracted.
            endpoint: URL to call. Defaults to the placeholder endpoint that
                used to be hard-coded.
            method: HTTP method named in the prompt. Defaults to ``"POST"``,
                the previously hard-coded value.

        Returns:
            An ``AnomalyActionResponse`` with ``success=True`` describing the
            endpoint, method, title, body and user id reported by the parsed
            LLM output.
        """
        logger.info("Calling action call API")
        messages = [
            {
                "role": "system",
                "content": "Call a public API.",
            },
            {
                "role": "user",
                # With the default arguments this is the exact prompt that was
                # hard-coded before.
                "content": f"Call the API {endpoint}, method {method} and payload parameters in the description {description}",
            },
        ]

        messages = self.call_tool(messages)

        completion = client.beta.chat.completions.parse(
            model=model,
            messages=messages,
            response_format=AnomaliesCallAPIRequestType,
        )

        details = completion.choices[0].message.parsed

        logger.info(f"Call and API: {details.model_dump_json(indent=2)}")

        return AnomalyActionResponse(
            message=f"API call {details.endpoint}, method {details.method}. Title {details.title}. Body {details.body}. User ID {details.userId}",
            response=details.response,
            success=True,
        )

    def process_anomaly_detection_request(self, user_input: str) -> Optional[AnomalyDetectionResponse]:
        """Route a detection request to the matching detection handler.

        The request is classified by ``route_anomaly_detection_request``.
        Routes with a confidence score below 0.7, or with a request type that
        has no handler here, yield ``None``; otherwise the user input extended
        with the route description is passed to the selected handler.
        """
        logger.info("Processing anomaly detection request")

        routing = self.route_anomaly_detection_request(user_input)

        # Give up early when the router is not confident enough.
        if routing.confidence_score < 0.7:
            logger.warning(f"Low confidence score: {routing.confidence_score}")
            return None

        enriched_input = user_input + " " + routing.description
        requested = routing.request_type

        if requested == "detect_anomaly_by_dates":
            return self.handle_detect_anomaly_by_dates(enriched_input)
        if requested == "detect_anomaly_by_dates_by_parameter":
            return self.handle_detect_anomaly_by_dates_by_parameter(enriched_input)

        # Any other request type is not handled by this router.
        logger.warning("Validate other request")
        return None
        
    def process_anomaly_detection_action(self, documentation_result: str) -> Optional[AnomalyActionResponse]:
        """Route and execute the action to apply once an anomaly is documented.

        The text is classified by ``route_anomaly_detection_action``. When the
        confidence score is below 0.7 nothing is done; otherwise the text,
        extended with the route description, is handed to an action handler.

        Args:
            documentation_result: Text describing the anomaly and the action
                that should be taken.

        Returns:
            The ``AnomalyActionResponse`` produced by the chosen handler, or
            ``None`` when the route confidence is too low.
        """
        logger.info("Processing anomaly detection action")

        # Route the request
        route_result = self.route_anomaly_detection_action(documentation_result)

        # Check confidence threshold
        if route_result.confidence_score < 0.7:
            logger.warning(f"Low confidence score: {route_result.confidence_score}")
            return None

        action_request = documentation_result + " " + route_result.description

        if route_result.request_type == "call_API":
            return self.handle_call_API(action_request)

        # E-mail is both the explicit "send_email_notification" route and the
        # fallback for every other request type.
        return self.handle_send_email_notification(action_request)

    def process_general_flow(self, user_input: str) -> Optional[Dict]:
        """Run the flexible anomaly workflow starting at the routed request type.

        The request is classified by ``route_general_flow_request``. Starting
        at the stage matching the request type, the flow cascades through
        detection, identification, documentation and action: each later stage
        runs either because it was requested directly or because the previous
        stage produced a usable result. Stage results are printed and logged.

        Args:
            user_input: Free-text request from the user.

        Returns:
            A dict with keys ``"Analysis"``, ``"Identification"``,
            ``"Documentation"`` and ``"Action"`` (stages that did not run are
            ``None``), or ``None`` when routing fails, the confidence score is
            below 0.7, or the request type is ``"other"``.
        """
        analysis_result = None
        identification_result = None
        documentation_result = None
        action_result = None

        logger.info("Processing general flow")

        # Route the request
        route_result = self.route_general_flow_request(user_input)

        # Guard before touching any attribute: this check used to sit at the
        # very end, after route_result had already been dereferenced.
        if route_result is None:
            logger.warning("Validate other request")
            return None

        # Check confidence threshold
        if route_result.confidence_score < 0.7:
            logger.warning(f"Low confidence score: {route_result.confidence_score}")
            return None

        # "other" matches none of the stages below, so bail out immediately.
        if route_result.request_type == "other":
            logger.warning("Validate other request")
            return None

        user_input = user_input + " " + route_result.description

        # Stage 1: detection
        if route_result.request_type == "detect_anomaly":
            analysis_result = self.process_anomaly_detection_request(user_input)
            if analysis_result:
                print("\nANALYSIS RESULT\n")
                print(f"Message: {analysis_result.message}")
                print(f"Response 2: {analysis_result.response}")
                print(f"Response Type Anomaly: {analysis_result.type_anomaly}")

                logger.info(f"Anomaly identification {analysis_result.message}")
                logger.info(f"Anomaly identification {analysis_result.type_anomaly}")

        # Stage 2: identification, requested directly or triggered by a
        # detection that reported an anomaly type greater than zero.
        if route_result.request_type == "identify_anomaly_by_id" or (analysis_result and analysis_result.type_anomaly > 0):

            if not analysis_result:
                analysis_result = self.handle_extract_anomaly_by_id(user_input)

            identification_result = self.handle_identify_anomaly_by_id(str(analysis_result.type_anomaly))

            if identification_result:
                print("\nIDENTIFICATION RESULT\n")
                print(f"Identification response: {identification_result.response}")
                print(f"Type Anomaly: {identification_result.type_anomaly}")
                print(f"Description Anomaly: {identification_result.description_anomaly}")

                logger.info("Anomaly identification result")

        # Stage 3: documentation, requested directly or triggered by an
        # identification that produced a non-empty description.
        if route_result.request_type == "check_documentation" or (identification_result and len(identification_result.description_anomaly) > 0):

            # Key on the object that is actually read; on every path reaching
            # this point this matches the earlier analysis_result check.
            if identification_result:
                description_anomaly = identification_result.description_anomaly
            else:
                description_anomaly = user_input

            documentation_result = self.handle_get_documentation(description_anomaly)

            if documentation_result:
                print("\nDOCUMENTATION RESULT\n")
                print(f"Documentation Source: {documentation_result.source}")
                print(f"Documentation Title: {documentation_result.title}")
                print(f"Documentation Text: {documentation_result.text}")
                print(f"Documentation Response: {documentation_result.response}")
                print(f"Documentation message: {documentation_result.message}")

                logger.info("Anomaly documentation result")

        # Stage 4: action, requested directly or triggered by documentation
        # that returned some text.
        if route_result.request_type == "apply_action" or (documentation_result and len(documentation_result.text) > 0):

            action = "Send an email with the details about the anomaly detected and the summary of the information."

            # NOTE(review): when the flow started at "check_documentation" there
            # is no analysis result, so the raw user input (not the
            # documentation summary) is sent to the action router - confirm
            # this is intended.
            if not analysis_result:
                description_action = user_input
            else:
                description_action = documentation_result.response + action

            action_result = self.process_anomaly_detection_action(description_action)

            if action_result:
                print("\nACTION RESULT\n")
                print(f"Action Response: {action_result.response}")
                print(f"Action Success: {action_result.success}")
                print(f"Action Message: {action_result.message}")

                logger.info("Anomaly action result")

        return {"Analysis": analysis_result, "Identification": identification_result, "Documentation": documentation_result, "Action": action_result}

    def process_fixed_flow(self, request: str) -> Dict:
        """Run the anomaly workflow as a fixed sequence of steps.

        Steps run in order - detection, identification, documentation, action -
        and each step runs only when the previous one produced a result.
        Step results are printed and logged.

        Args:
            request: Free-text detection request from the user.

        Returns:
            A dict with keys ``"Analysis"``, ``"Identification"``,
            ``"Documentation"`` and ``"Action"``; steps that did not run are
            ``None``.
        """
        logger.info(f"Starting anomaly identification: {request}")

        # Pre-bind every later step to None so that a step that is skipped no
        # longer raises UnboundLocalError in the checks and the return below.
        identification_result = None
        documentation_result = None
        action_result = None

        analysis_result = self.process_anomaly_detection_request(request)
        if analysis_result:
            print("\nANALYSIS RESULT\n")
            print(f"Message: {analysis_result.message}")
            print(f"Response 2: {analysis_result.response}")
            print(f"Response Type Anomaly: {analysis_result.type_anomaly}")

            logger.info(f"Anomaly identification {analysis_result.message}")
            logger.info(f"Anomaly identification {analysis_result.type_anomaly}")

            identification_result = self.handle_identify_anomaly_by_id(str(analysis_result.type_anomaly))
            logger.info("Anomaly identification result")

        if identification_result:
            print("\nIDENTIFICATION RESULT\n")
            print(f"Type Anomaly: {identification_result.type_anomaly}")
            print(f"Description Anomaly: {identification_result.description_anomaly}")

            documentation_result = self.handle_get_documentation(identification_result.description_anomaly)
            # Was logged as "Anomaly action result", which mislabelled this step.
            logger.info("Anomaly documentation result")

        if documentation_result:
            print("\nDOCUMENTATION RESULT\n")
            print(f"Documentation Source: {documentation_result.source}")
            print(f"Documentation Title: {documentation_result.title}")
            print(f"Documentation Text: {documentation_result.text}")
            print(f"Documentation Response: {documentation_result.response}")
            print(f"Documentation message: {documentation_result.message}")

            message = documentation_result.response + " Call the API to manage the anomaly with parameters title: description of the anomaly, body: Summary of the anomaly, userId: 123 ."
            action_result = self.process_anomaly_detection_action(message)
            logger.info("Anomaly action result")

        if action_result:
            print("\nACTION RESULT\n")
            print(f"Action Response: {action_result.response}")
            print(f"Action Success: {action_result.success}")
            print(f"Action Message: {action_result.message}")

        return {"Analysis": analysis_result, "Identification": identification_result, "Documentation": documentation_result, "Action": action_result}


##### Main to start the process of the AI Agent

In [40]:

if __name__ == "__main__":
    orchestrator = BlogOrchestrator()

    # Each scenario is (prompt, text printed right after the prompt, whether
    # the value returned by the flow is printed as well).
    # The fixed-step variant can be exercised instead with
    # orchestrator.process_fixed_flow(request=<prompt>).
    scenarios = [
        (
            "Verify if there are anomalies between January 24th 2014 at 7:00:00 PM until 10:00:00 PM",
            " \n",
            False,
        ),
        (
            "Verify if there are anomalies between January 24th 2014 at 7:00:00 PM until 10:00:00 PM for the parameter T-JUS-CKP",
            " \n",
            False,
        ),
        (
            "Give me the information about the anomaly with id 1",
            "\n",
            False,
        ),
        (
            "I need the documentation about issue with thermal energy balance",
            "\n",
            True,
        ),
        (
            "Send an email to notifiy about an anomaly detection in the valve",
            "\n",
            False,
        ),
    ]

    for number, (user_input, tail, show_result) in enumerate(scenarios, start=1):
        print(f"\n\n------------------ TEST {number} ------------------\n")
        print(f"User input: {user_input}{tail}")
        result = orchestrator.process_general_flow(user_input=user_input)

        if show_result:
            print("\nManage anomaly:")
            print(f"Message: {result}")
    





------------------ TEST 1 ------------------

User input: Verify if there are anomalies between January 24th 2014 at 7:00:00 PM until 10:00:00 PM 


CALL FUNCTION TO DETECT ANOMALIES IN MACHINE LEARNING OR LLM MODEL

Function detect_all_anomalies_dates: 2014-01-24T19:00:00Z, 2014-01-24T22:00:00Z
Processing folder: 1
Processing file: WELL-00001_20140124213136.csv


  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))
  data_frame_target.loc[:, 'anomaly']  = df[col].apply(lambda x: True if x != 0 and x != '' else False)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[:, 'target'] = df[common_cols].sum(axis=1, min_count=1)


Data frame actual anomalies
(20, 1)
Data frame anomalies predicted
(20, 7)
Accuracy: 1.0

ANALYSIS RESULT

Message: Detect an anomaly dates 2014-01-24T19:00:00Z and 2014-01-24T22:00:00Z. The type of anomaly is 1
Response 2: Anomalies found during the specified time range. Total rows returned: 30. Anomalies count: True: 30, False: 0.
Response Type Anomaly: 1

IDENTIFICATION RESULT

Identification response: The identified anomaly is related to an issue with the thermal energy balance. This typically suggests that there is a discrepancy in the heat input and output in a system. It could indicate that the system is not regulating temperature properly, which might affect efficiency and performance. Addressing thermal energy balance concerns is crucial in systems such as HVAC, engines, and power plants to ensure optimal operation and prevent potential damage.
Type Anomaly: 1
Description Anomaly: issue with thermal energy balance.

DOCUMENTATION RESULT

Documentation Source: Oil and gas produ

  data_frame_target.loc[:, 'anomaly']  = df[col].apply(lambda x: True if x != 0 and x != '' else False)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[:, 'target'] = df[common_cols].sum(axis=1, min_count=1)


Data frame actual anomalies
(20, 1)
Data frame anomalies predicted
(20, 7)
Accuracy: 1.0

ANALYSIS RESULT

Message: Detect an anomaly dates for specific parameter desviation 2014-01-24T19:00:00Z and 2014-01-24T22:00:00Z. The type of anomaly is 1
Response 2: Anomalies detected for the parameter T-JUS-CKP between January 24th 2014 at 7:00 PM and 10:00 PM. A total of 30 values were analyzed, with 30 anomalies detected and 0 non-anomalies. 

| Anomaly Status | Count |
|----------------|-------|
| True           | 30    |
| False          | 0     |
Response Type Anomaly: 1

IDENTIFICATION RESULT

Identification response: The detected anomaly pertains to a significant issue with thermal energy balance. This could indicate potential problems in systems involved in maintaining or regulating temperature, leading to inefficiencies or failures. It is crucial to investigate the underlying causes, as this could impact overall system performance, safety, and energy consumption. Regular maintenance a