In [657]:
import os 
import json
from langgraph.graph import StateGraph,START,END
from typing import TypedDict,Dict,List,Any
import pymongo
from langchain_openai import AzureChatOpenAI
from langchain.prompts import PromptTemplate

from dotenv import load_dotenv
from datetime import datetime
import re
from config import ConfigData

In [658]:
load_dotenv()

True

In [659]:
class MongoDBAgent:
    def __init__(self, uri: str, db_name: str, default_collection: str):
        import pymongo
        self.client = pymongo.MongoClient(uri)
        self.db = self.client[db_name]
        self.default_collection_name = default_collection

    def list_collections(self):
        return self.db.list_collection_names()

    def get_collection(self, name: str):
        return self.db[name]

    def validate_collection(self, name: str):
        return name in self.list_collections()

    def get_default_collection(self):
        """Return the default collection as pymongo Collection object"""
        return self.db[self.default_collection_name]

MONGO_URI = "mongodb://localhost:27017/mydatabase"
MONGO_DB = "ASD"
DEFAULT_COLLECTION = "2025-05-13"


mongo_agent = MongoDBAgent(MONGO_URI, MONGO_DB, DEFAULT_COLLECTION)
collection = mongo_agent.get_default_collection()



In [660]:
collections=mongo_agent.list_collections()
print("Available collections:", collections)

Available collections: ['2025-08-01', '2025-08-11', '2025-05-13', '2025-08-17', '2025-08-12', '2025-08-18']


In [661]:
llm = AzureChatOpenAI(
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    temperature=0
)

In [662]:



class State(TypedDict, total=False):
    date: str                          # which collection to use
    formula_included: bool             # true if formula/report is detected
    formula_list_included: list[dict]  # list of formula(s) matched from json
    query: str                         # MongoDB query generated
    result: Any                        # query result
    user_question: str 



In [663]:
date_prompt = PromptTemplate(
    template="""
You are an intelligent assistant that selects the correct MongoDB collection name.

Rules:
- If the user mentions "today" or "todays", return today's date in format YYYY-MM-DD.
- If the user mentions a specific date (e.g., 2025-08-01, 1st August 2025, Aug 1 2025), convert it to YYYY-MM-DD format.
- If no date is mentioned, return the default collection: {default_collection}.
- Output ONLY the date string, nothing else.

Today's Date: {today}

User Question: {user_question}
""",
    input_variables=["user_question", "default_collection", "today"]
)

In [664]:
code_prompt = PromptTemplate(
    template="""
You are an expert Python developer working with **MongoDB** data using PyMongo.

The MongoDB collection is available as a Python variable named `collection`.

⚠️ Important Rules:
- The collection name itself already corresponds to the correct date. NEVER add `$match` filters for dates.
- Only query/filter using relevant fields from the schema (NOT the date).
- You are provided with a list of sample formulas. 
  - If a user request matches one or more formulas, you must use those formulas as the basis for the query.
  - If the request requires adjustments, you may adapt the formula logic to fit the user’s intent.
  - If multiple formulas are included, you must combine them and structure the result accordingly.

⚠️ Do NOT return markdown, comments, or explanations. Output **only**:
result = ...
print(result)

Table Schema:
{table_schema}

Sample Formulas (with description and example outputs):
{included_formulas}

User Question:
{user_question}
""",
    input_variables=["user_question", "table_schema", "included_formulas"],
)

In [665]:
def date_agent(state: State) -> State:
    user_q = state.get("user_question", "")
    
    prompt_text = date_prompt.format(
        user_question=user_q,
        default_collection=DEFAULT_COLLECTION,
        today=datetime.now().strftime("%Y-%m-%d"),
    )
    
    raw = llm.invoke(prompt_text).content.strip()
    
    # validate
    # match = re.search(r"\d{4}-\d{2}-\d{2}", raw)
    # date_str = match.group(0) if match else DEFAULT_COLLECTION
    # if not mongo_agent.validate_collection(date_str):
    #     date_str = DEFAULT_COLLECTION
    
    state["date"] = raw
    return state

In [666]:


# Load formulas
with open("formulas.json", "r") as f:
    FORMULAS = json.load(f)

# Prompt
# formula_prompt = PromptTemplate.from_template("""
# You are an expert in parcel sorting systems and PLC-based data analysis. 
# Your task is to detect if the user explicitly requests an analysis formula.

# Available formulas: {formula_keys}

# User Question: {user_question}

# Strict Rules:
# - Return one or more formula names ONLY if the user clearly asks for that report or uses synonyms 
#   that exactly correspond to the available formulas.
# - If the request only partially matches (e.g., similar wording but not an exact match), return "NONE".
# - Multiple formulas may be selected if the user explicitly requests them. Separate with commas.
# - Do NOT invent or guess. Only output names from the provided list.
# - If nothing matches, output exactly "NONE".
# - Output must contain only the formula name(s) or "NONE" (no extra text).
# """)

formula_prompt = PromptTemplate.from_template("""
You are an expert in parcel sorting systems and PLC-based data analysis. 
Your task is to detect if the user explicitly requests an analysis formula.

Available formulas: {formula_keys}

User Question: {user_question}

Rules:
- Carefully interpret the user request in the context of parcel sorting and reporting.
- Select one or more formula names from the list if the request is clearly referring to them,
  even if the wording is slightly different (e.g., "sortation result" = "sort_report").
- Only output formula names that semantically match the user request.
- If none match, output exactly "NONE".
- Output must contain only the formula name(s) or "NONE" (no extra text).
""")


def check_if_formula_matches(state: State) -> State:
    user_q = state.get("user_question", "")
    
    prompt_text = formula_prompt.format(
        formula_keys=list(FORMULAS.keys()),
        user_question=user_q
    )
    
    raw = llm.invoke(prompt_text).content.strip()
    
    if raw.upper() == "NONE":
        state["formula_included"] = False
        state["formula_list_included"] = []
    else:
        matched = [name.strip() for name in raw.split(",") if name.strip() in FORMULAS]
        if matched:
            state["formula_included"] = True
            state["formula_list_included"] = [{m: FORMULAS[m]} for m in matched]
        else:
            state["formula_included"] = False
            state["formula_list_included"] = []
    
    return state


In [667]:
def formula_branch(state: State) -> str:
    """Route execution based on formula detection."""
    if state.get("formula_included", False):
        return "generate_query_from_formula"
    else:
        return "other_branch"   # e.g., general Q&A handler

In [668]:
from config import ConfigData
table_schema = ConfigData.TABLE_SCHEMA
schema_description = ConfigData.SCHEMA_DESCRIPTION

In [669]:
# code_prompt = """
# You are an expert MongoDB assistant.

# User question: {user_question}
# Table schema: {table_schema}
# Available formulas:
# {included_formulas}

# Rules:
# 1. Only return valid Python code.
# 2. Always assign the final query result to a variable called `result`.
# 3. If using `collection.find()`, wrap it with `list()` so that `result` is a list of documents.
# 4. Do NOT include print statements, comments, or markdown.
# 5. Assume `collection` is already a pymongo collection object.
# 6. Only output the Python code, nothing else.
# """

# def generate_query_from_formula(state: State) -> State:
#     user_q = state["user_question"]
#     included_formulas = state.get("formula_list_included", [])

#     # Format formulas nicely for LLM
#     formulas_text = "\n".join([str(f) for f in included_formulas]) if included_formulas else "None"

#     # Use the global code_prompt template
#     prompt_text = code_prompt.format(
#         user_question=user_q,
#         table_schema=table_schema,
#         included_formulas=formulas_text
#     )

#     # Call LLM
#     raw_code = llm.invoke(prompt_text).content.strip()

#     # Save to state
#     state["query"] = raw_code
#     return state
def generate_query_from_formula(state: State) -> State:
    user_q = state["user_question"]
    included_formulas = state.get("formula_list_included", [])

    # Format formulas nicely for LLM
    formulas_text = "\n".join([str(f) for f in included_formulas]) if included_formulas else "None"

    prompt_text = f"""
You are a MongoDB query generator.

User Question:
{user_q}

Table Schema:
{table_schema}

Included Formulas:
{formulas_text}

⚠️ Rules:
1. Only return valid Python code.
2. Do NOT use triple backticks (```).
3. Do NOT use print().
4. Always assign the final query result to a variable called `result`.
5. Assume you already have a variable `collection` which is a pymongo collection object.
6. Only output the Python code, nothing else.
7. Don't consider any date in the query; collection name itself is the date.
8. If using `collection.find()`, wrap it with `list()` so that `result` is a list of documents.


"""

    raw_code = llm.invoke(prompt_text).content.strip()
    raw_code = raw_code.replace("```", "").strip()

    # Ensure any collection.find() is wrapped with list() automatically
    if "collection.find(" in raw_code and not raw_code.strip().startswith("result = list("):
        raw_code = raw_code.replace("collection.find(", "list(collection.find(")
        if not raw_code.strip().startswith("result ="):
            raw_code = f"result = {raw_code}"

    state["query"] = raw_code
    return state


In [670]:
def other_branch(state: State) -> State:
    """
    Fallback branch when no formula is detected.
    For now, just echo the user question or mark it unresolved.
    """
    user_q = state.get("user_question", "")
    
    state["formula_included"] = False
    state["query"] = None
    state["result"] = f"No matching formula found for: '{user_q}'"
    
    return state

In [None]:


def execute_agent(state: State):
    code = state.get("query")
    if not code:
        return {"result": "❌ No query to execute."}

    collection = mongo_agent.get_default_collection()
    local_vars = {"collection": collection}
    try:
        exec(code, {}, local_vars)
        result = local_vars.get("result")
        print("🔎 Executed Result:", result)   # Debug print
        return {"result": result}              # <-- match your State
    except Exception as e:
        return {"result": f"❌ Error executing code: {e}"}




In [672]:


graph = StateGraph(State)


graph.add_node("date_agent", date_agent)
graph.add_node("check_if_formula_matches", check_if_formula_matches)
graph.add_node("generate_query_from_formula", generate_query_from_formula)
graph.add_node("execute_agent", execute_agent)   # new node to run queries
graph.add_node("other_branch", other_branch)     # fallback if no formula

# --- Entry ---
graph.set_entry_point("date_agent")

# --- Flow ---
graph.add_edge("date_agent", "check_if_formula_matches")

# Conditional branching: goes to query generator OR fallback
graph.add_conditional_edges(
    "check_if_formula_matches",
    formula_branch  # routes to "generate_query_from_formula" or "other_branch"
)

# If formula path → execute MongoDB query
graph.add_edge("generate_query_from_formula", "execute_agent")
graph.add_edge("execute_agent", END)

# If no formula → directly end
graph.add_edge("other_branch", END)

# --- Compile ---
app = graph.compile()


In [673]:

app = graph.compile()

In [674]:
result = app.invoke({"user_question": "Share the complete sortation result of the system 13 may ? "})

🔎 Executed Result: [{'_id': ObjectId('68af4735395283916b1d2c39'), 'hostId': '2027756', 'pic': 460, 'date': '2025-05-13', 'registerTS': '07:46:40,304', 'latestTS': '07:48:39,025', 'closedTS': '07:48:39,025', 'status': 'sorted', 'identificationTS': '07:47:30,837', 'identification_location': '1001.0041.0091', 'exitTS': '07:48:39,025', 'exit_location': '1001.0045.0040.B71', 'plc_number': 'PLC-1001', 'Registered_location': '1001.0023.0001.B71', 'customer_location': 'INF08', 'sort_strategy': '1', 'barcode_data': {'barcodes': ['05900056383516'], 'barcode_count': 1, 'barcode_state': 6}, 'barcode_error': False, 'alibi_id': '00000783320250513074918', 'volume_data': {'volume_state': 6, 'length': 300, 'width': 295, 'height': 200, 'box_volume': 17700, 'real_volume': 16745}, 'volume_error': False, 'item_state': None, 'actual_destination': '16', 'destinations': ['016'], 'destination_status': {'16': 1}, 'sort_code': 1, 'entrance_state': '2', 'exit_state': None, 'degister': None, 'events': [{'date': '2

In [675]:
result

{'date': '2025-05-13',
 'formula_included': True,
 'formula_list_included': [{'sort_report': 'result = collection.count_documents({"sort_code": 1})'}],
 'query': 'result = list(collection.find({}))',
 'result': [{'_id': ObjectId('68af4735395283916b1d2c39'),
   'hostId': '2027756',
   'pic': 460,
   'date': '2025-05-13',
   'registerTS': '07:46:40,304',
   'latestTS': '07:48:39,025',
   'closedTS': '07:48:39,025',
   'status': 'sorted',
   'identificationTS': '07:47:30,837',
   'identification_location': '1001.0041.0091',
   'exitTS': '07:48:39,025',
   'exit_location': '1001.0045.0040.B71',
   'plc_number': 'PLC-1001',
   'Registered_location': '1001.0023.0001.B71',
   'customer_location': 'INF08',
   'sort_strategy': '1',
   'barcode_data': {'barcodes': ['05900056383516'],
    'barcode_count': 1,
    'barcode_state': 6},
   'barcode_error': False,
   'alibi_id': '00000783320250513074918',
   'volume_data': {'volume_state': 6,
    'length': 300,
    'width': 295,
    'height': 200,
   