In [1]:
# warnings
import json
import logging
import os
import re
import warnings
from typing import List, Dict, Any

# Silence warnings before the heavy third-party imports below are loaded.
warnings.filterwarnings("ignore")

from huggingface_hub import login
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import OllamaLLM
from langgraph.graph import StateGraph, END
from transformers import AutoTokenizer
from typing_extensions import TypedDict

from db_create import CargaDeArchivos
from Tools.Logger import setup_logger

#tools
from Tools.Tool import run_sql_workflow, run_think_task, remove_think_tags



In [2]:
# === Logger instantiation ===
# setup_logger() is the project helper from Tools.Logger; what it configures
# (handlers, levels, destinations) is not visible in this notebook.
setup_logger()
# Module-level logger used by every node/router below for error and info messages.
logger = logging.getLogger(__name__)

In [3]:
# === Tokenizer setup ===
# SECURITY: the Hugging Face access token is read from the HF_TOKEN environment
# variable instead of being hard-coded in the notebook. Any token that was ever
# committed in this cell must be treated as leaked and revoked.
try:
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)
    else:
        # No explicit token: fall back to whatever credentials are already cached
        # locally rather than triggering an interactive login prompt.
        logger.warning("HF_TOKEN is not set; skipping Hugging Face login and relying on cached credentials.")
    tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.1")
except Exception as e:
    logger.error(f"An error occurred during tokenizer setup: {e}", exc_info=True)
    raise

# === Database population and connection ===
# `db_conn` is later injected into the graph state by planner_node.
try:
    db_manager = CargaDeArchivos()
    db_manager.run()
    db_conn = db_manager.conn
    print('Exitoso')
except Exception as e:
    logger.error(f"An error occurred during database population and connection: {e}", exc_info=True)
    raise


Exitoso


In [4]:
# == Orchestrator state ==
class AgentState(TypedDict):
    """State dictionary shared by the planner and task-execution nodes of the graph."""
    # Natural-language question supplied by the caller.
    question: str
    # Parsed plan: one dict per task with "id", "type", "description", "reason", "steps".
    plan: List[dict]
    # 0-based index of the next plan entry to execute.
    current_step: int
    # Answer produced by each executed step, keyed by the step index as a string.
    results: Dict[str, Any]
    # Raw result of each executed step, in execution order.
    query_results: List[str]
    # Live database connection handed to the SQL workflow (was annotated `None`).
    db_conn: Any
    # Tokenizer handed to the SQL workflow (was annotated with the builtin `any`).
    tokenizer: Any
    # Key into prompts_sql_generation ("0" or "1").
    use_case: str

In [5]:
# == SQL prompts per use case ==
# Generation (system) prompt for use case "0" (process mining): first entry of
# prompts_sql_generation["0"].
# The "Error Examples" section now closes its ```sql fences so that "*Correct:*" and
# the output rules are no longer swallowed by an unterminated code block.
# NOTE(review): the "Common Pitfall Corrections" example filters on `approved` and
# aggregates `value`, neither of which is listed in the `cases` schema above — confirm
# against the real tables before relying on it.
p1_p = """ /no_think
    You are an SQL assistant specialized in DuckDB. Your task is to generate accurate SQL queries based on natural language questions, following the schema and rules below.
    
    ### Schema (Aliased)
    
    - **cases**  (one row per process instance)
        - id (VARCHAR): Unique identifier for each case
        - order_date (TIMESTAMP_NS): Date when the order was placed
        - employee_id (VARCHAR): ID of the employee handling the case
        - branch (VARCHAR): Branch where the order originated
        - supplier (VARCHAR): Supplier involved in the case
        - avg_time (DOUBLE): Total duration of the case in time units
        - estimated_delivery (TIMESTAMP_NS): Expected delivery date
        - delivery (TIMESTAMP_NS): Actual delivery date
        - on_time (BOOLEAN): Whether the delivery met the deadline
        - in_full (BOOLEAN): Whether the order was delivered in full
        - number_of_items (INTEGER): Total items in the case
        - ft_items (INTEGER): Number of full/complete items delivered
        - total_price (DOUBLE): Total price of the order
        - total_activities (INTEGER): Number of activities in the case
        - rework_activities (INTEGER): Count of repeated/rework activities
        - automatic_activities (INTEGER): Count of system-generated activities
    
    - **activities**  (one row per activity within a process)
        - id (INTEGER): Unique identifier for the activity
        - timestamp (TIMESTAMP): When the activity occurred
        - name (VARCHAR): Name of the activity
        - tpt (DOUBLE): Time passed since the previous activity
        - user (VARCHAR): Person who performed the activity
        - user_type (VARCHAR): Role of the user (e.g., employee, system)
        - automatic (BOOLEAN): Whether the activity was system-generated
        - rework (BOOLEAN): Whether the activity was a rework/repeat
        - case_index (INTEGER): Position of the activity within the case
        - case_id (VARCHAR): ID of the associated case
        - case_order_date (TIMESTAMP): Order date for the case
        - case_employee_id (VARCHAR): Employee ID for the case
        - case_branch (VARCHAR): Branch handling the case
        - case_supplier (VARCHAR): Supplier involved in the case
        - case_avg_time (DOUBLE): Total duration of the case
        - case_estimated_delivery (TIMESTAMP): Expected delivery date
        - case_delivery (TIMESTAMP): Actual delivery date
        - case_on_time (BOOLEAN): Whether the case was delivered on time
        - case_in_full (BOOLEAN): Whether the order was complete
        - case_number_of_items (INTEGER): Total items in the case
        - case_ft_items (INTEGER): Number of full/complete items
        - case_total_price (DOUBLE): Total price of the case
    
    - **variants**  
      - id (BIGINT): Variant ID (PK for path)  
      - activities (VARCHAR[]): Ordered activity names for this path  
      - cases (VARCHAR[]): IDs of cases that followed this path (→ cases.id)  
      - number_cases (BIGINT): Total cases following this variant  
      - percentage (DOUBLE): Percentage of total cases  
      - avg_time (DOUBLE): Avg duration (sec) across cases in this variant
    
    ### Query Guidelines
    
    1. Always reference columns with aliases (e.g., c.id, a.case_id).
    2. Use UNNEST() in the FROM clause to access list fields like v.activities or v.cases. Do not use UNNEST() inside expressions like = ANY(...).
    3. When comparing list values (e.g., activity names), first UNNEST the list in a subquery or CTE, then use direct comparison with TRIM(...).
    4. Use TRIM() when comparing activity names (e.g., TRIM(a.name) = TRIM(...)).
    5. Avoid unnecessary joins or full scans when possible.
    6. Convert time differences with EXTRACT(EPOCH FROM ...).
    7. Include all non-aggregated columns in GROUP BY.
    
    ### Variant Comparison Rules
    
    - **Most Frequent Path:**  
      SELECT * FROM variants WHERE number_cases = (SELECT MAX(number_cases) FROM variants)
    
    - **Variant Durations:**  
      Use avg_time from variants. Do not recompute durations from activities unless explicitly requested.
    
    - **Deviations:**  
      Variants with id different from the most frequent one are deviations.  
      To detect deviation points, compare activities with the most frequent variant.
    
    - **Activity Durations Along Most Frequent Path:**  
      1. Extract activities using UNNEST(v.activities) AS activity.  
      2. Join with activities table using TRIM(v_activity) = TRIM(a.name).  
      3. Group by activity name and compute average tpt.
    
    ### Common Pitfall Corrections
    
    - Never use UNNEST() inside = ANY(...). Use UNNEST in a FROM clause or CTE, then join or filter.
    - Avoid > ALL(...). Prefer ORDER BY ... LIMIT 1 or = (SELECT MAX(...)).
    - Use subqueries for filtered aggregations, like:
    
      SELECT branch  
      FROM cases  
      WHERE approved = TRUE  
      GROUP BY branch  
      ORDER BY AVG(value) DESC  
      LIMIT 1
    
    - When aggregating on top branches, use subqueries or IN with preselected sets.
    - If no data matches a filter, return NULL instead of error.
    - To detect repeated activities on the same day:
    
      SELECT a.case_id, DATE_TRUNC('day', a.timestamp), COUNT(*)  
      FROM activities AS a  
      GROUP BY a.case_id, DATE_TRUNC('day', a.timestamp)  
      HAVING COUNT(*) > 1
    
      (Avoid GENERATE_SERIES here.)
    
    ### Error Examples
    
    *Incorrect:*
    
    ```sql
    SELECT branch FROM activities;
    -- Error: 'branch' does not exist in 'activities'

    SELECT case.id, name FROM grouped;
    -- Error: 'case' is a nested object, use json_extract or UNNEST first

    SELECT a.name, c.total_price FROM activities AS a, cases AS c;
    -- Error: Cartesian join without ON condition
    ```

    *Correct:*

    ```sql
    SELECT a.name, c.total_price
    FROM activities AS a
    JOIN cases AS c ON a.case_id = c.id;
    ```

    ### Output
    Return only the SQL query. No markdown, no tags, no explanation.
    Never guess values. Infer only from schema and question.
    """

# Second prompt for use case "0" (process mining): stored as prompts_sql_generation["0"][1]
# and unpacked as `repair_prompt` in execute_task_node. It carries the schema only.
# NOTE(review): the `cases` columns listed here (type, ramo, broker, value, approved,
# case_* fields) differ from the `cases` schema in p1_p, and `case_index` is described
# as "Alias of id" here but as the position within the case there — confirm which
# description matches the actual tables.
p2_p= """/no_think 
      ### Database Schema

                - **cases**  
        - id (VARCHAR): Case identifier (PK)  
        - avg_time (DOUBLE): Total duration (sec) from start to closure  
        - type, branch, ramo, broker, state, client, creator (VARCHAR): Case metadata  
        - value (BIGINT): Insurance amount  
        - approved (BOOLEAN): Approval status  
        - case_order_date, case_estimated_delivery, case_delivery (TIMESTAMP): Case timestamps  
        - case_employee_id, case_branch, case_supplier (VARCHAR): Case-specific information  
        - case_number_of_items, case_ft_items (INTEGER): Case item details  
        - case_total_price (DOUBLE): Case total price

        - **activities**  
        - id (BIGINT): Activity identifier (PK)  
        - case_id (VARCHAR): Case ID (FK → cases.id)  
        - timestamp (TIMESTAMP): Activity timestamp  
        - name (VARCHAR): Activity name  
        - case_index (BIGINT): Alias of id  
        - tpt (DOUBLE): Duration of the activity in seconds  
        - user, user_type (VARCHAR): User-related info  
        - automatic, rework (BOOLEAN): Activity flags  
        - case_order_date (TIMESTAMP), case_employee_id (VARCHAR), case_branch (VARCHAR), case_supplier (VARCHAR): Case-related data  
        - case_avg_time (DOUBLE): Average time for the case  
        - case_on_time, case_in_full (BOOLEAN): Delivery status flags  
        - case_number_of_items, case_ft_items (INTEGER): Case item counts  
        - case_total_price (DOUBLE): Case total price  
        - case_estimated_delivery, case_delivery (TIMESTAMP): Delivery-related timestamps

        - **variants**  
        - id (BIGINT): Variant ID (PK for path)  
        - activities (VARCHAR[]): Ordered activity names for this path  
        - cases (VARCHAR[]): IDs of cases that followed this path (→ cases.id)  
        - number_cases (BIGINT): Total cases following this variant  
        - percentage (DOUBLE): Percentage of total cases  
        - avg_time (DOUBLE): Avg duration (sec) across cases in this variant

            **Relations:**
            - "variants"."cases" references "cases"."id", meaning each variant is followed by multiple cases.
            - "variants"."activities" corresponds to the ordered "activities"."name" values for those cases.
            """
# Generation (system) prompt for use case "1" (invoice analysis): stored as
# prompts_sql_generation["1"][0]. Contains the schema, query guidelines and output rules.
# NOTE(review): item-level confidence examples are lowercase ("high", "medium", "low")
# while the group/invoice examples and guideline 4 use 'High', 'Medium', 'Low' —
# confirm the real casing of the nested values.
p1_i= """ /no_think
        You are an SQL assistant specialized in DuckDB. Your task is to generate accurate SQL queries based on natural language questions, following the schema and rules below.

        ### Schema (Aliased)

            - **grouped (g)**  
            - group_id (VARCHAR): Unique identifier for each group (PK)  
            - amount_overpaid (BIGINT): Total overpaid amount for the group  
            - itemCount (BIGINT): Number of items in the group  
            - date (VARCHAR): Date of the group  
            - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'
            - open (BOOLEAN): Status of the group (open or closed)  
            - confidence (VARCHAR): Confidence level for detecting the pattern (e.g., "High", "Medium", "Low")  
            - items (STRUCT[]): Array of items within the group, each containing:
                - **id (INTEGER)**: Item identifier (FK → invoices.id)
                - **case (STRUCT)**: Contains case details, such as:
                    - id (VARCHAR): Case identifier  
                    - order_date (VARCHAR): Order date for the case  
                    - employee_id (VARCHAR): Employee ID handling the case  
                    - branch (VARCHAR): Branch handling the case  
                    - supplier (VARCHAR): Supplier associated with the case  
                    - avg_time (DOUBLE): Average time for the case  
                    - estimated_delivery (VARCHAR): Estimated delivery date for the case  
                    - delivery (VARCHAR): Actual delivery date for the case  
                    - on_time (BOOLEAN): Whether the case was delivered on time  
                    - in_full (BOOLEAN): Whether the case was delivered in full  
                    - number_of_items (INTEGER): Number of items in the case  
                    - ft_items (INTEGER): Number of full-time items in the case  
                    - total_price (INTEGER): Total price of the case  
                - date (VARCHAR): Date of the item  
                - unit_price (VARCHAR): Unit price of the item  
                - quantity (INTEGER): Quantity of the item  
                - value (VARCHAR): Value of the item  
                - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'  
                - open (BOOLEAN): Status of the item (open or closed)  
                - group_id (VARCHAR): Group identifier (FK → grouped.group_id)  
                - confidence (VARCHAR): Confidence level for the item’s pattern (e.g., "high", "medium", "low")  
                - description (VARCHAR): Description of the item  
                - payment_method (VARCHAR): Payment method used for the item  
                - pay_date (VARCHAR): Payment date of the item  
                - special_instructions (VARCHAR): Special instructions for the item  
                - accuracy (INTEGER): Accuracy of the item’s data matching

            - **invoices (i)**  
            - id (BIGINT): Invoice identifier (PK)  
            - date (TIMESTAMP_NS): Date and time the invoice was issued  
            - unit_price (VARCHAR): Unit price of the item in the invoice  
            - quantity (BIGINT): Number of items in the invoice  
            - value (VARCHAR): Total value of the invoice  
            - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'
            - open (BOOLEAN): Status of the invoice (open or closed)  
            - group_id (VARCHAR): Group identifier (FK → grouped.group_id)  
            - confidence (VARCHAR): Confidence level for the invoice's pattern (e.g., "High", "Medium", "Low")  
            - description (VARCHAR): Description of the invoice  
            - payment_method (VARCHAR): Method used for payment  
            - pay_date (TIMESTAMP_NS): Date and time the invoice was paid  
            - special_instructions (VARCHAR): Any special instructions for the invoice  
            - accuracy (BIGINT): Accuracy of the invoice's data matching  
            - case_id (VARCHAR): Case identifier associated with the invoice  
            - case_order_date (TIMESTAMP_NS): Order date of the case  
            - case_employee_id (VARCHAR): Employee associated with the case  
            - case_branch (VARCHAR): Branch where the case was handled  
            - case_supplier (VARCHAR): Supplier associated with the case  
            - case_avg_time (DOUBLE): Average time for the case  
            - case_estimated_delivery (TIMESTAMP_NS): Estimated delivery date for the case  
            - case_delivery (TIMESTAMP_NS): Actual delivery date for the case  
            - case_on_time (BOOLEAN): Whether the case was delivered on time  
            - case_in_full (BOOLEAN): Whether the case was delivered in full  
            - case_number_of_items (BIGINT): Number of items in the case  
            - case_ft_items (BIGINT): Number of full-time items in the case  
            - case_total_price (BIGINT): Total price of the case

        ### Query Guidelines

        1. **Prefer Direct Tables**:  
        Use `grouped (g)` or `invoices (i)` directly unless item-level fields are explicitly needed.

        2. **UNNEST Only When Necessary**:
        - Only use `UNNEST(g.items) AS item` when accessing nested fields (e.g., `item.case.supplier`, `item.unit_price`, etc.)
        - After unnesting, access fields as `item.field` or `item.case.supplier`, **not** `item.unnest.field`.

        3. **Nesting and Access Rules**:
        - To access supplier from `grouped`, unnest items and use:  
            ```sql
            FROM grouped g, UNNEST(g.items) AS item
            WHERE item.case.supplier = 'Example'
            ```
        - Avoid referencing nested fields without unnesting first.

        4. **Case Sensitivity**:
        - Use exact case for values:
            - Confidence: 'High', 'Medium', 'Low'
            - Pattern: 'Similar Value', 'Similar Reference', 'Exact Match', 'Similar Date', 'Similar Vendor', 'Multiple'

        5. **Use Table Aliases**:
        - Always use `g.` for `grouped`, `i.` for `invoices`, and `item.` after unnesting.

        6. **Use TRIM() for Comparisons**:
        - For text comparisons like pattern or supplier, wrap with `TRIM()`.  
            Example: `TRIM(item.case.supplier) = 'VendorName'`

        7. **Use IN / = ANY for Multiple Matches**:
        - Use `pattern = ANY (['Value1', 'Value2'])` or `IN (...)` instead of OR chains.

        8. **GROUP BY Nested Fields**:
        - If grouping by nested fields like supplier, first unnest, then group by `item.case.supplier`.

        9. **Aggregation and Filtering**:
        - Use `ORDER BY ... LIMIT 1` instead of `> ALL(...)`
        - Filter early with WHERE clauses to improve performance.

        10. **Alternative Access**:
        - Use `invoices` for simpler flat queries (e.g., `i.case_supplier`).

        ---

        ### Output Rules

        - ❌ Do NOT explain the query.
        - ✅ Only return the SQL query (no markdown, no comments, no formatting).
        - ❌ Do NOT guess field names.
        - ✅ Always respect the provided schema and capitalization.
        """

# Second prompt for use case "1" (invoice analysis): stored as prompts_sql_generation["1"][1]
# and unpacked as `repair_prompt` in execute_task_node. It lists the same `grouped` and
# `invoices` schema as p1_i, without the query guidelines or output rules.
p2_i= """ /no_think
    ### Schema (Aliased)

    - **grouped (g)**  
    - group_id (VARCHAR): Unique identifier for each group (PK)  
    - amount_overpaid (BIGINT): Total overpaid amount for the group  
    - itemCount (BIGINT): Number of items in the group  
    - date (VARCHAR): Date of the group  
    - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'
    - open (BOOLEAN): Status of the group (open or closed)  
    - confidence (VARCHAR): Confidence level for detecting the pattern (e.g., "High", "Medium", "Low")  
    - items (STRUCT[]): Array of items within the group, each containing:
        - **id (INTEGER)**: Item identifier (FK → invoices.id)
        - **case (STRUCT)**: Contains case details, such as:
            - id (VARCHAR): Case identifier  
            - order_date (VARCHAR): Order date for the case  
            - employee_id (VARCHAR): Employee ID handling the case  
            - branch (VARCHAR): Branch handling the case  
            - supplier (VARCHAR): Supplier associated with the case  
            - avg_time (DOUBLE): Average time for the case  
            - estimated_delivery (VARCHAR): Estimated delivery date for the case  
            - delivery (VARCHAR): Actual delivery date for the case  
            - on_time (BOOLEAN): Whether the case was delivered on time  
            - in_full (BOOLEAN): Whether the case was delivered in full  
            - number_of_items (INTEGER): Number of items in the case  
            - ft_items (INTEGER): Number of full-time items in the case  
            - total_price (INTEGER): Total price of the case  
        - date (VARCHAR): Date of the item  
        - unit_price (VARCHAR): Unit price of the item  
        - quantity (INTEGER): Quantity of the item  
        - value (VARCHAR): Value of the item  
        - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'  
        - open (BOOLEAN): Status of the item (open or closed)  
        - group_id (VARCHAR): Group identifier (FK → grouped.group_id)  
        - confidence (VARCHAR): Confidence level for the item’s pattern (e.g., "high", "medium", "low")  
        - description (VARCHAR): Description of the item  
        - payment_method (VARCHAR): Payment method used for the item  
        - pay_date (VARCHAR): Payment date of the item  
        - special_instructions (VARCHAR): Special instructions for the item  
        - accuracy (INTEGER): Accuracy of the item’s data matching

    - **invoices (i)**  
    - id (BIGINT): Invoice identifier (PK)  
    - date (TIMESTAMP_NS): Date and time the invoice was issued  
    - unit_price (VARCHAR): Unit price of the item in the invoice  
    - quantity (BIGINT): Number of items in the invoice  
    - value (VARCHAR): Total value of the invoice  
    - pattern (VARCHAR): Pattern type for the group 'Similar Value','Similar Reference','Exact Match','Similar Date','Similar Vendor','Multiple'
    - open (BOOLEAN): Status of the invoice (open or closed)  
    - group_id (VARCHAR): Group identifier (FK → grouped.group_id)  
    - confidence (VARCHAR): Confidence level for the invoice's pattern (e.g., "High", "Medium", "Low")  
    - description (VARCHAR): Description of the invoice  
    - payment_method (VARCHAR): Method used for payment  
    - pay_date (TIMESTAMP_NS): Date and time the invoice was paid  
    - special_instructions (VARCHAR): Any special instructions for the invoice  
    - accuracy (BIGINT): Accuracy of the invoice's data matching  
    - case_id (VARCHAR): Case identifier associated with the invoice  
    - case_order_date (TIMESTAMP_NS): Order date of the case  
    - case_employee_id (VARCHAR): Employee associated with the case  
    - case_branch (VARCHAR): Branch where the case was handled  
    - case_supplier (VARCHAR): Supplier associated with the case  
    - case_avg_time (DOUBLE): Average time for the case  
    - case_estimated_delivery (TIMESTAMP_NS): Estimated delivery date for the case  
    - case_delivery (TIMESTAMP_NS): Actual delivery date for the case  
    - case_on_time (BOOLEAN): Whether the case was delivered on time  
    - case_in_full (BOOLEAN): Whether the case was delivered in full  
    - case_number_of_items (BIGINT): Number of items in the case  
    - case_ft_items (BIGINT): Number of full-time items in the case  
    - case_total_price (BIGINT): Total price of the case

"""


# Lookup table: use-case id -> [generation system prompt, repair prompt].
# execute_task_node unpacks each pair in exactly this order.
prompts_sql_generation = {
    "0": [p1_p, p2_p],  # process mining
    "1": [p1_i, p2_i],  # invoice analysis
}


In [6]:
# == Orchestrator nodes ==
# Matches the opening line of one plan entry, e.g. `"ACTIVITY3": {`.
_ACTIVITY_HEADER_RE = re.compile(r'"?(ACTIVITY\d+)"?\s*:\s*{')


def _with_step_defaults(step: dict) -> dict:
    """Fill in the fields every plan step must carry so later nodes can index them safely."""
    step.setdefault("type", "[THINK]")
    step.setdefault("description", "")
    step.setdefault("reason", "")
    step.setdefault("steps", [])
    return step


def _to_zero_based(refs) -> List[int]:
    """Convert 1-based activity references (ints or strings such as "2") to 0-based indices."""
    indices = []
    for ref in refs:
        number = re.search(r"\d+", str(ref))
        # References without any digits cannot be resolved; skip them instead of crashing.
        if number:
            indices.append(int(number.group()) - 1)
    return indices


def _parse_plan_json(raw_plan: str) -> List[dict]:
    """Parse the plan as strict JSON; return [] when the text is not a valid JSON object."""
    start, end = raw_plan.find("{"), raw_plan.rfind("}")
    if start == -1 or end <= start:
        return []
    try:
        payload = json.loads(raw_plan[start:end + 1])
    except ValueError:
        return []
    if not isinstance(payload, dict):
        return []

    steps = []
    for key, spec in payload.items():
        if not (isinstance(spec, dict) and re.fullmatch(r"ACTIVITY\d+", key)):
            continue
        step = {"id": key}
        for field in ("type", "description", "reason"):
            if isinstance(spec.get(field), str):
                step[field] = spec[field]
        if isinstance(spec.get("steps"), list):
            step["steps"] = _to_zero_based(spec["steps"])
        steps.append(_with_step_defaults(step))
    return steps


def _parse_plan_lines(raw_plan: str) -> List[dict]:
    """Tolerant line-by-line parser used when the model output is not valid JSON."""
    steps = []
    current = None
    for line in raw_plan.strip().splitlines():
        header = _ACTIVITY_HEADER_RE.match(line.strip())
        if header:
            if current:
                steps.append(_with_step_defaults(current))
            current = {"id": header.group(1)}
            continue
        if current is None:
            continue

        # The first field name found on the line wins, in this fixed order.
        for field in ("type", "description", "reason"):
            if f'"{field}"' in line:
                found = re.search(rf'"{field}"\s*:\s*"([^"]+)",?', line)
                if found:
                    current[field] = found.group(1)
                break
        else:
            if '"steps"' in line:
                refs = re.search(r'"steps"\s*:\s*\[([^\]]*)\]', line)
                if refs:
                    current["steps"] = _to_zero_based(refs.group(1).split(","))

    if current:
        steps.append(_with_step_defaults(current))
    return steps


def planner_node(state: AgentState) -> AgentState:
    """Ask the LLM for a task plan and initialise the orchestrator state.

    Reads ``state["question"]``, prompts the model for a JSON plan, prints the raw
    plan, and parses it into a list of step dicts (``id``, ``type``, ``description``,
    ``reason``, ``steps`` as 0-based indices of prerequisite steps).

    Returns a full state dict with ``current_step`` set to 0, empty ``results`` and
    ``query_results``, the module-level ``db_conn`` and ``tokenizer``, and
    ``use_case`` set to "1" when any task is tagged ``[SQL,1]``, otherwise "0".

    Raises:
        Exception: any error is logged and re-raised unchanged.
    """
    try:
        user_question = state["question"]

        plan_prompt = """ /no_think

        Generate a numbered list of up to 10 sequential tasks needed to fully answer the user's question.
        
        You have access to three tools:
        - [SQL,0]: Process Mining
        - [SQL,1]: Invoice Analysis
        - [THINK]: Reasoning/Interpretation
        
        Each task should include:
        - A "type" field specifying the tool to use.
        - A "description" of what the task will do.
        - A "reason" explaining why this task is necessary.
        - A "steps" field that lists the numbers of prior activities whose outputs are required to complete this task. If the task does not depend on any previous output, use an empty list.
        
        Format your output as a JSON object like:
        {{
            "ACTIVITY1": {{
                "type": "[SQL,0]",
                "description": "...",
                "reason": "...",
                "steps": []
            }},
            "ACTIVITY2": {{
                "type": "[THINK]",
                "description": "...",
                "reason": "...",
                "steps": [1]
            }},
            ...
        }}
        """

        llm = OllamaLLM(model="qwen3:8b", temperature=0.0, enable_thinking=False)
        planner = ChatPromptTemplate.from_messages([
            ("system", plan_prompt),
            ("human", "user question: {task}"),
        ]) | llm | StrOutputParser()

        raw_plan = planner.invoke({"task": user_question})
        raw_plan = remove_think_tags(raw_plan)
        print(raw_plan)

        # Strict JSON first (handles escaped quotes and single-line output); fall back
        # to the tolerant line parser because the model does emit malformed JSON.
        steps = _parse_plan_json(raw_plan) or _parse_plan_lines(raw_plan)
        if not steps:
            logger.warning("Planner output contained no parsable ACTIVITY entries.")

        # Always bound: previously this was only assigned when a "[SQL,1]" task was
        # seen, so every other plan failed with UnboundLocalError.
        use_case = "1" if any("[SQL,1]" in step["type"] for step in steps) else "0"

        return {
            "plan": steps,
            "current_step": 0, # Start from the first step
            "results": {},
            "query_results": [],
            "db_conn": db_conn,
            "tokenizer": tokenizer,
            "use_case": use_case,
            "question": user_question
        }

    except Exception as e:
        logger.exception(f"Error in planner_node: {e}")
        raise



def execute_task_node(state: AgentState) -> AgentState:
    """Run the plan step at ``state["current_step"]`` and record its result.

    Builds a context string from the results of the steps this task depends on,
    then dispatches to ``run_sql_workflow`` (task type containing "SQL") or
    ``run_think_task`` (anything else).

    Returns the state with the answer stored under ``results[str(current_step)]``,
    the raw result appended to ``query_results`` and ``current_step`` advanced by one.

    Raises:
        Exception: any error is logged and re-raised unchanged.
    """
    try:
        step_index = state["current_step"]
        step = state["plan"][step_index]
        task = step["description"]
        dependencies = step["steps"]
        logger.info(f"Previous steps: {dependencies}")
        task_type = step["type"]

        # Only the outputs of the declared prerequisite steps are passed as context.
        context = "\n".join(
            f"[Step {key}] {state['results'][key]}"
            for key in sorted(state["results"], key=int)
            if int(key) in dependencies
        )
        logger.info(f"Context: {context}")

        print(f"\n[Task {step_index + 1}] {task}")

        if "SQL" in task_type:
            # Use the use case encoded in this task's own "[SQL,<n>]" tag so a plan
            # that mixes [SQL,0] and [SQL,1] tasks gets the right prompts per task;
            # fall back to the plan-wide value when the tag is missing or unknown.
            tagged = re.search(r"\[SQL\s*,\s*(\d+)\]", task_type)
            if tagged and tagged.group(1) in prompts_sql_generation:
                use_case = tagged.group(1)
            else:
                use_case = state["use_case"]
            system_prompt, repair_prompt = prompts_sql_generation[use_case]
            answer, raw_result = run_sql_workflow(
                task, state["db_conn"], use_case, state["tokenizer"], context, system_prompt, repair_prompt
            )
        else:
            answer = run_think_task(task, context)
            raw_result = answer

        return {
            "plan": state["plan"],
            # Keyed by the index of the step just executed, before it is advanced.
            "results": {**state["results"], str(step_index): answer},
            "current_step": step_index + 1,
            "query_results": state["query_results"] + [raw_result],
            "db_conn": state["db_conn"],
            "tokenizer": state["tokenizer"],
            "use_case": state["use_case"],
            "question": state["question"]
        }

    except Exception as e:
        logger.exception(f"Error in execute_task_node: {e}")
        raise



In [7]:
# === Orchestrator routers ===
def node_router(state: AgentState) -> str:
    """Choose the next graph node: "execute_task" while plan steps remain, otherwise END.

    Raises:
        Exception: a malformed state (e.g. missing "plan" or "current_step") is logged
            and re-raised. Previously the error was swallowed and the function then
            failed with UnboundLocalError on the return statement.
    """
    try:
        has_pending_step = state["current_step"] < len(state["plan"])
    except Exception as e:
        logger.exception(f"Error in node_router: {e}")
        raise
    return "execute_task" if has_pending_step else END

In [8]:
# === Orchestrator workflow ===
def build_orchestrator_workflow():
    """Assemble and compile the planner -> execute_task orchestrator graph.

    Both nodes hand control to ``node_router``, so an empty plan ends the run right
    after planning instead of indexing into an empty list, and the task node loops
    until every step has been executed.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.

    Raises:
        Exception: any construction/compilation error is logged and re-raised.
    """
    try:
        # Explicit mapping of router return values to graph targets.
        routes = {"execute_task": "execute_task", END: END}

        graph = StateGraph(AgentState)
        graph.add_node("planner", planner_node)
        graph.add_node("execute_task", execute_task_node)
        graph.set_entry_point("planner")
        graph.add_conditional_edges("planner", node_router, routes)
        # The router already returns END when the plan is exhausted, so no separate
        # unconditional finish edge is needed.
        graph.add_conditional_edges("execute_task", node_router, routes)
        return graph.compile()
    except Exception as e:
        logger.exception(f" Error compiling Orchestrator workflow: {e}")
        raise

In [9]:
# Compile the graph and run it end-to-end on a sample question.
# Only "question" is supplied here; planner_node returns every other state key.
workflow = build_orchestrator_workflow()
output = workflow.invoke({"question": "How many invoices are duplicated?"})

{
    "ACTIVITY1": {
        "type": "[SQL,1]",
        "description": "Extract all invoice data from the database.",
        "reason": "To have access to the full invoice dataset for analysis.",
        "steps": []
    },
    "ACTIVITY2": {
        "type": "[THINK]",
        "description": "Identify potential duplicate invoice criteria (e.g., same invoice number, same customer, same amount, same date).",
        "reason": "To define what constitutes a duplicate invoice for the analysis.",
        "steps": [1]
    },
    "ACTIVITY3": {
        "type": "[SQL,1]",
        "description": "Group invoices by potential duplicate criteria and count occurrences.",
        "reason": "To identify which invoices are duplicated based on the defined criteria.",
        "steps": [1, 2]
    },
    "ACTIVITY4": {
        " "type": "[THINK]",
        "description": "Determine the number of duplicated invoices by analyzing the grouped data.",
        "reason": "To finalize the count of duplicated invoic

In [10]:
# Per-step answers, keyed by the 0-based step index as a string (see execute_task_node).
output["results"]

{'0': 'To extract all invoice data from the database, you would typically query the relevant table(s) that store invoice records. However, the data provided here appears to be **summary statistics** (e.g., mean, median, standard deviation) rather than raw invoice records. If you need the actual invoice data, follow these steps:\n\n---\n\n### **1. Identify the Invoice Table**\nAssuming the database has a table named `invoices` (or similar), the raw data would include fields like:\n- `invoice_id` (e.g., "System" in your data)\n- `invoice_date` (e.g., "Date" in your data)\n- `total_amount` (e.g., "Total Amount")\n- `tax_amount` (e.g., "Tax Amount")\n- `discount_amount` (e.g., "Discount Amount")\n- `net_amount` (e.g., "Net Amount")\n- Other metrics like `quantity`, `customer_id`, etc.\n\n---\n\n### **2. Query the Database**\nUse SQL to retrieve all invoice records:\n```sql\nSELECT * FROM invoices;\n```\nThis will return all columns and rows from the `invoices` table, including individual i

In [11]:
# Full final state returned by the graph. The state dicts built by the nodes also
# carry "db_conn" and "tokenizer", so this display can be large.
output

{'question': 'How many invoices are duplicated?',
 'plan': [{'id': 'ACTIVITY1',
   'type': '[SQL,1]',
   'description': 'Extract all invoice data from the database.',
   'reason': 'To have access to the full invoice dataset for analysis.',
   'steps': []},
  {'id': 'ACTIVITY2',
   'type': '[THINK]',
   'description': 'Identify potential duplicate invoice criteria (e.g., same invoice number, same customer, same amount, same date).',
   'reason': 'To define what constitutes a duplicate invoice for the analysis.',
   'steps': [0]},
  {'id': 'ACTIVITY3',
   'type': '[SQL,1]',
   'description': 'Group invoices by potential duplicate criteria and count occurrences.',
   'reason': 'To identify which invoices are duplicated based on the defined criteria.',
   'steps': [0, 1]},
  {'id': 'ACTIVITY4',
   'type': '[THINK]',
   'description': 'Determine the number of duplicated invoices by analyzing the grouped data.',
   'reason': 'To finalize the count of duplicated invoices based on the grouped 