In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List ,  Optional
from langchain_community.vectorstores import FAISS
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.documents import Document 
import pandas as pd
import duckdb
import re
from openai import OpenAI
import os
from sqlglot import parse_one
import json

#  Step 1: Define paths.
#  Each can be overridden with an environment variable; the defaults are the
#  original hard-coded locations.
csv_path = os.environ.get("ORDERS_CSV_PATH", r"D:\RAG Task\Client_Shipment_Orders.csv")
db_path = os.environ.get("ORDERS_DB_PATH", r"D:\RAG Task\orders.duckdb")
faiss_index_path = os.environ.get("ORDERS_FAISS_INDEX_PATH", r"D:\RAG Task\faiss_index")

# The CSV path is spliced into a SQL string literal below, so escape any
# single quotes it contains by doubling them.
_csv_sql_literal = csv_path.replace("'", "''")

#  Step 2: Create the database table (only has an effect on the first run)
with duckdb.connect(db_path) as con:
    # CREATE TABLE IF NOT EXISTS: an existing 'orders' table is left untouched,
    # even if the CSV has changed since it was created.
    con.execute(f"""
        CREATE TABLE IF NOT EXISTS orders AS
        SELECT * FROM read_csv_auto('{_csv_sql_literal}');
    """)
    # Optional: refresh the data after updating the CSV
    # con.execute(f"DELETE FROM orders; INSERT INTO orders SELECT * FROM read_csv_auto('{_csv_sql_literal}');")

#  Step 3: Load the table into a DataFrame for local use
with duckdb.connect(db_path) as con:
    df = con.execute("SELECT * FROM orders").fetchdf()

#  Step 4: Clean / normalize text columns (strip whitespace, title-case).
# Missing values stay missing: astype(str) on its own would turn None/NaN into
# the literal strings "None"/"Nan" once title-cased.
# NOTE(review): title-casing every text column also rewrites Order IDs and
# emails (e.g. "ORD0001" -> "Ord0001"); confirm that is intended.
# NOTE(review): `df` is reassigned later in this script from the raw CSV, so
# this normalized copy is not what downstream code sees.
for col in df.select_dtypes(include=["object"]).columns:
    is_missing = df[col].isna()
    normalized = df[col].astype(str).str.strip().str.title()
    df[col] = normalized.where(~is_missing, df[col])

# System prompt for SQL generation. duckdb_node appends "User question: ...\nSQL:"
# directly after this text, so every markdown fence opened here must also be
# closed here; otherwise the user's question ends up inside the code example.
sql_system_prompt = """
You are a SQL expert helping to query a DuckDB table named `orders`.

----------------------------------
TABLE INFORMATION
----------------------------------
Table name: orders  
Columns and their meanings:
- Order ID: Unique identifier for each order (text)
- Client Name: Name of the customer who placed the order (text)
- Email: Email address of the client (text)
- Contact Number: Client's contact phone number (text)
- Origin: Source location of the shipment (text)
- Destination: Delivery location of the shipment (text)
- Product Name: Name of the purchased product (text)
- Category: Product category (e.g., Furniture, Decor, Appliances)
- Material: Material type of the product (e.g., Wood, Glass, Metal)
- Color: Color of the product (text)
- Quantity: Number of units ordered (integer)
- Unit Price (‚Çπ): Price per unit in INR (numeric)
- Total Price (‚Çπ): Total order price in INR (numeric)
- Order Date: Date when the order was placed (date)
- Delivery Date: Date when the order was delivered (date)
- Status: Order status (e.g., Delivered, Pending, Cancelled)

----------------------------------
SAMPLE DATA
----------------------------------
ORD0001 | Kara Mata | chelsea75@yahoo.com | 038.830.3017x8206 | Port Mariamouth | Cohenmouth | Wall Art | Decor | Glass | Grey | 15 | 29878 | 448170 | 2025-05-13 | 2025-06-02 | Cancelled  
ORD0002 | Jesse Williams | ccasey@barrett.info | (426)505-2355 | Tamaraview | Lake Rickyport | Bed | Furniture | Glass | Brown | 30 | 1507 | 45210 | 2025-10-04 | 2025-11-03 | Cancelled  

----------------------------------
INSTRUCTIONS
----------------------------------
1. Generate SQL queries **only** for structured or numeric filters.
   Examples:
   - Total sales, sum, count, average, quantity, or price-based questions  
   - Filtering by columns such as Status, Category, Material, or Color  
   - Date-based filters (e.g., orders after 2025-05-01)

2. **Do NOT** generate queries based on subjective or descriptive logic
   such as reasons for cancellation, customer feedback, or preferences.
   These are handled separately by a semantic retriever system.

3. Use the correct table name `orders` and column names **exactly as shown**.
   Preserve proper case and special characters (e.g., `"Total Price (‚Çπ)"`).

4. Never hallucinate columns, tables, or calculations that do not exist.

5. Return **only** the SQL query ‚Äî no markdown, comments, or explanations.

6. **SAFETY RULES ‚Äî STRICTLY ENFORCED**
   - Never modify or delete data.
   - Do not use or suggest `UPDATE`, `DELETE`, `INSERT`, `DROP`, `TRUNCATE`, or `ALTER`.
   - Do not create or alter schemas, indexes, or tables.
   - Only allow read-only operations:  
     `SELECT`, `WHERE`, `GROUP BY`, `ORDER BY`, `LIMIT`, and aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`).

7. **Case Handling:**  
   When matching text values (like product or status), use `LOWER()` to make comparisons case-insensitive.  
   Example:  
   `WHERE LOWER("Product Name") = LOWER('Toilet Bowl')`

8. **Special Handling ‚Äî Highest or Maximum Queries:**  
   If the user asks questions like  
   *"Who made the highest purchase?"*,  
   *"Which client has the largest total?"*, or  
   *"Top buyer / maximum purchase amount"*,  
   use this pattern to avoid grouping errors:
   ```sql
   SELECT "Client Name", "Total Price (‚Çπ)"
   FROM orders
   WHERE "Total Price (‚Çπ)" = (
       SELECT MAX("Total Price (‚Çπ)") FROM orders
   );
   ```
"""

#  ROW-WISE LABELLED CHUNK GENERATION
def generate_labelled_chunks(csv_path):
    """Read *csv_path* and render every row as a labelled text block for embedding.

    Returns:
        A ``(frame, chunks)`` tuple:
        - the DataFrame exactly as ``pd.read_csv`` produced it;
        - a list with one string per row, in file order. Each string starts
          with ``Row ID: <index>``, followed by one ``<column>: <value>`` line
          per column, with leading/trailing whitespace removed.
    """
    frame = pd.read_csv(csv_path)
    column_names = list(frame.columns)

    def _render(row_id, row):
        # One "label: value" line per column, preceded by the row index line.
        lines = [f"Row ID: {row_id}"]
        lines.extend(f"{name}: {row[name]}" for name in column_names)
        return "\n".join(lines).strip()

    rendered = [_render(row_id, row) for row_id, row in frame.iterrows()]
    return frame, rendered


# Render each CSV row as a labelled text chunk and wrap it as a LangChain
# Document; the same `documents` list feeds both FAISS and BM25 below.
# NOTE(review): this rebinds `df` to the raw CSV frame, replacing the
# DuckDB-loaded, title-cased frame built earlier; confirm which is intended.
df, labelled_chunks = generate_labelled_chunks(csv_path)

documents = [Document(page_content=chunk_text) for chunk_text in labelled_chunks]

print(f" Generated {len(documents)} labelled chunks for embeddings.")

#  Initialize embeddings (used both to build and to load the FAISS index)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")


def _build_faiss_index(success_message):
    """Embed all labelled row documents, save the index to disk, and return it."""
    print(" Creating new FAISS index...")
    store = FAISS.from_documents(documents, embeddings)
    os.makedirs(faiss_index_path, exist_ok=True)
    store.save_local(faiss_index_path)
    print(success_message)
    return store


def _faiss_index_is_stale():
    """Return True when the saved index is older than the CSV, or cannot be dated.

    BM25 is rebuilt from the current CSV on every run, so a stale FAISS index
    would make the two halves of the hybrid retriever disagree.
    """
    # FAISS.save_local writes "<index_name>.faiss"; index_name defaults to "index".
    index_file = os.path.join(faiss_index_path, "index.faiss")
    try:
        return os.path.getmtime(csv_path) > os.path.getmtime(index_file)
    except OSError:
        # Missing index file: treat the index as unusable and rebuild it.
        return True


#  Create, refresh, or load the FAISS vector store
if not os.path.exists(faiss_index_path):
    vector_store = _build_faiss_index(" FAISS index created and saved!")
elif _faiss_index_is_stale():
    print(" FAISS index is missing or older than the CSV; rebuilding...")
    vector_store = _build_faiss_index(" New FAISS index created and saved!")
else:
    print(" Loading existing FAISS index...")
    try:
        # SECURITY NOTE: load_local deserializes a pickle file alongside the
        # index; only load an index this script wrote itself, never one from
        # an untrusted location.
        vector_store = FAISS.load_local(
            faiss_index_path,
            embeddings,
            allow_dangerous_deserialization=True
        )
        print(" FAISS index loaded successfully!")
    except Exception as e:
        # Best effort: a corrupt/incompatible index is replaced by a fresh build.
        print(f"‚ö†Ô∏è Error loading FAISS index: {e}")
        vector_store = _build_faiss_index(" New FAISS index created and saved!")

#  Set up retrievers.
#  - Dense search over the FAISS store and BM25 keyword matching over the same
#    documents each return up to 15 chunks.
#  - EnsembleRetriever combines both result lists, weighting the dense results
#    0.6 and the keyword results 0.4.
_TOP_K = 15

vector_retriever = vector_store.as_retriever(search_kwargs={"k": _TOP_K})

keyword_retriever = BM25Retriever.from_documents(documents)
keyword_retriever.k = _TOP_K

hybrid_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, keyword_retriever],
    weights=[0.6, 0.4],
)
 
class GraphStateRequired(TypedDict):
    """Keys that every graph invocation must supply."""

    # The raw user question passed to ``app.invoke``.
    question: str


class GraphState(GraphStateRequired, total=False):
    """State dict shared by the LangGraph nodes; all keys below are optional.

    Nodes fill these in as the graph runs:
    - the intent classifier sets ``intent``;
    - the SQL path sets ``sql_query`` and ``validation_error``;
    - each terminal node writes ``answer``.
    """

    intent: str  # routing label produced by the intent node
    context: List[str]  # NOTE(review): nothing visible here populates this field
    answer: str  # final reply shown to the user
    sql_query: str  # cleaned SQL generated for numeric questions
    validation_error: Optional[str]  # None when the SQL passed validation


# Intent labels the classifier may return; these are exactly the routing keys.
_INTENT_LABELS = ("numeric", "semantic", "hybrid", "greet", "ignore")
_INTENT_PATTERN = re.compile(r"\b(" + "|".join(_INTENT_LABELS) + r")\b")


def _normalize_intent(raw_reply):
    """Map a free-form classifier reply onto one of ``_INTENT_LABELS``.

    The model is asked for a single word but may add punctuation, markdown
    (``**numeric**``) or a prefix (``Intent: numeric``). The first known label
    found wins. If none is present, fall back to ``"ignore"`` so the router
    always receives a key it knows.
    """
    match = _INTENT_PATTERN.search((raw_reply or "").lower())
    return match.group(1) if match else "ignore"


def intent_node(state: GraphState, config=None) -> GraphState:
    """Classify the question's intent with the LLM and store it in ``state["intent"]``.

    The stored value is always one of numeric, semantic, hybrid, greet or ignore.
    It is used as the routing key for the next node.
    """
    question = state["question"]
    intent_prompt = f"""
        You are an intent classifier for user questions over an **Orders dataset**.
        The table has the following columns:
        Order ID, Client Name, Email, Contact Number, Origin, Destination,
        Product Name, Category, Material, Color, Quantity, Unit Price (‚Çπ),
        Total Price (‚Çπ), Order Date, Delivery Date, Status.

        RULE (important): If the user asks about *popularity, trends, likelihood, aesthetics, emotional value, suitability, or "best suited"*, treat the question as **semantic** ‚Äî even if the question mentions column names like Category or Product. Those words request interpretation, not a raw SQL lookup.

        Your task is to classify the user's question into one of the following intents:

        ---

        1. **numeric** ‚Üí The question can be answered using structured, factual, or count-based data directly from the dataset.
        Includes lookups, filters, conditions, or measurable aggregations.

         Examples:
        - "How many orders are pending?"
        - "List all clients whose orders are cancelled."
        - "Show orders where quantity > 10."
        - "Who bought d√©cor items?"
        - "List clients who ordered furniture and curtains."
        - "Show total sales amount from Bangalore."
        - "Names of clients who placed multiple orders."
        - "List all clients whose orders are cancelled and who prefer d√©cor items."
        - "What is the origin of Kara Mata?"    <-- entity attribute lookup ‚Üí numeric

         NOTE: Entity attribute lookups (e.g., "what is the product Jesse Williams ordered?") count as **numeric**.

        ---

        2. **semantic** ‚Üí The question requires interpretation, reasoning, or subjective understanding
        that cannot be directly derived from the dataset‚Äôs structured fields.
        These questions involve opinions, trends, likelihood, aesthetics or suitability.

         Examples:
        - "Which customers are likely to be loyal customers?"
        - "Which products seem to be trending this month?"
        - "Which items are best suited for festive seasons?"
        - "Which products have emotional or aesthetic value?"
        - "What categories are most popular among new customers?"
        - "Which products are considered budget-friendly?"
        - "Which customers might recommend our products?"
        - "Which destinations appear to be popular among high-value clients?"

        KEY: If the phrasing contains words like *popular, trending, likely, seem, appear, best suited, emotional, aesthetic, preference, suitability* ‚Äî treat as **semantic**.

        ---

        3. **hybrid** ‚Üí The question contains both numeric and semantic parts.

         Examples:
        - "What is the total sales amount, and which products are most popular in premium homes?"
        - "Count d√©cor orders and explain which clients usually place them."

        ---

        4. **greet** ‚Üí Simple greetings (Hi, Hello, etc.)

        ---

        5. **ignore** ‚Üí Questions unrelated to the dataset.

        Question: {question}

        Return only one word: numeric, semantic, hybrid, greet, or ignore.
        """

    # Normalize the raw reply: "Numeric." or "**hybrid**" must still route correctly.
    intent = _normalize_intent(llm.invoke(intent_prompt).content)
    print(f" Detected Intent: {intent}")
    state["intent"] = intent
    return state


def greet_node(state: GraphState, config=None) -> GraphState:
    """Terminal node for the "greet" intent: reply with a fixed welcome line (no LLM call)."""
    welcome_text = "Hello üëã! How can I assist you with the order data today?"
    state["answer"] = welcome_text
    return state


def ignore_node(state: GraphState, config=None) -> GraphState:
    """Terminal node for off-topic questions: answer with a fixed redirect message."""
    redirect_text = "I'm designed to answer questions about the order dataset. Please ask something related."
    state["answer"] = redirect_text
    return state


# Shared chat model used by every LLM call in this script; temperature=0
# minimises sampling randomness.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def retriever_node(state: GraphState, config=None) -> GraphState:
    """Answer a semantic question from hybrid-retrieved order rows.

    Fetches matching row chunks with ``hybrid_retriever``, places them in a
    short prompt, and stores the model's reply in ``state["answer"]``.
    Failures are reported in the answer text instead of being raised.
    """
    user_question = state["question"]
    try:
        hits = hybrid_retriever.invoke(user_question)
        joined_context = "\n".join(hit.page_content for hit in hits)
        prompt = (
            f"Context:\n{joined_context}\n\n"
            f"Question: {user_question}\n"
            "Answer briefly:"
        )
        reply = llm.invoke(prompt)
        state["answer"] = reply.content.strip()
    except Exception as err:
        state["answer"] = f"Error using retriever: {err}"
    return state


VALID_COLUMNS = [
    "Order ID", "Client Name", "Email", "Contact Number",
    "Origin", "Destination", "Product Name", "Category",
    "Material", "Color", "Quantity", "Unit Price (‚Çπ)",
    "Total Price (‚Çπ)", "Order Date", "Delivery Date", "Status"
]

# Statement keywords that must never appear as SQL code (outside literals).
_FORBIDDEN_KEYWORDS = ["insert", "update", "delete", "drop", "alter", "truncate", "create"]
_FORBIDDEN_PATTERN = re.compile(r"\b(?:" + "|".join(_FORBIDDEN_KEYWORDS) + r")\b", re.IGNORECASE)
# Single-quoted SQL string literal ('' is an escaped quote inside it).
_SQL_STRING_LITERAL = re.compile(r"'(?:[^']|'')*'")
# Double-quoted SQL identifier ("" is an escaped quote inside it).
_SQL_QUOTED_IDENTIFIER = re.compile(r'"(?:[^"]|"")*"')
# DuckDB table functions / scans that read files, plus a string literal used
# directly as a table (DuckDB treats FROM 'path' as a file scan).
_SQL_FILE_ACCESS = re.compile(
    r"\b(?:read_\w+|glob|sniff_csv|\w+_scan)\s*\(|\b(?:from|join)\s+''",
    re.IGNORECASE,
)


def sql_validator_node(state: dict):
    """Validate ``state["sql_query"]`` before it is executed against DuckDB.

    Checks, in order:
    1. the query starts with SELECT;
    2. no data-modifying keyword appears as code;
    3. only one statement is present;
    4. no file-reading table functions are used;
    5. every double-quoted name is a known column, the ``orders`` table, or an ``AS`` alias.

    Keyword checks ignore text inside string literals and quoted identifiers,
    so values like ``'Drop Earrings'`` are not mistaken for statements.

    Returns:
        The same ``state`` dict, with ``"validation_error"`` set to an error
        message, or to None when every check passes.
    """
    sql_query = (state.get("sql_query") or "").strip()
    print(f" Validating SQL query: {sql_query}")

    # 1. Ensure it's a SELECT query
    if not sql_query.lower().lstrip().startswith("select"):
        state["validation_error"] = " Only SELECT queries are allowed."
        return state

    # Blank out literal text so only real SQL code is scanned below.
    without_literals = _SQL_STRING_LITERAL.sub("''", sql_query)
    code_only = _SQL_QUOTED_IDENTIFIER.sub('""', without_literals)

    # 2. Block dangerous operations (whole keywords only, outside literals)
    if _FORBIDDEN_PATTERN.search(code_only):
        state["validation_error"] = (
            f" Unsafe SQL operation detected. "
            f"Keywords like {', '.join(_FORBIDDEN_KEYWORDS)} are not allowed."
        )
        return state

    # 3. Reject stacked statements (a single trailing ';' is fine): otherwise
    #    e.g. "SELECT 1; COPY ..." would slip past the SELECT-prefix check.
    if ";" in code_only.rstrip().rstrip(";"):
        state["validation_error"] = " Only a single SQL statement is allowed."
        return state

    # 4. Block reading arbitrary files from inside a SELECT
    if _SQL_FILE_ACCESS.search(code_only):
        state["validation_error"] = " Reading external files from SQL is not allowed."
        return state

    # 5. Validate quoted column names used as actual columns (ignore aliases produced by AS)
    quoted_names = re.findall(r'"(.*?)"', without_literals)
    for name in quoted_names:
        # Real column, or the table itself written as "orders"
        if name in VALID_COLUMNS or name.lower() == "orders":
            continue

        # The quoted name is introduced as an alias (AS "name"); skip it
        alias_pattern = re.search(
            r'\bAS\s+"?' + re.escape(name) + r'"?', without_literals, flags=re.IGNORECASE
        )
        if alias_pattern:
            continue

        state["validation_error"] = f" Invalid column name used: '{name}'."
        return state

    # Passed all checks
    state["validation_error"] = None
    print(" SQL validation passed.")
    return state


def duckdb_node(state: GraphState, config=None) -> GraphState:
    """Answer a numeric/structured question by generating and running SQL.

    Pipeline:
    1. Ask the LLM for a query over ``orders`` using ``sql_system_prompt``.
    2. Strip markdown / code-fence residue from the reply.
    3. Syntax-check the query with SQLGlot (DuckDB dialect).
    4. Apply the safety rules in ``sql_validator_node``.
    5. Execute the query on a read-only DuckDB connection.
    6. Have the LLM summarise the resulting rows.

    ``state["sql_query"]`` holds the cleaned query once it has been generated.
    Every failure is reported through ``state["answer"]`` rather than raised.
    """
    query = state["question"]

    try:
        #  Step 1: Ask LLM to generate SQL
        sql_prompt = f"{sql_system_prompt}\nUser question: {query}\nSQL:"
        sql_query = llm.invoke(sql_prompt).content.strip()

        #  Step 2: Clean LLM formatting (code fences, backticks, "SQL:" prefix)
        sql_query = (
            sql_query.replace("```sql", "")
                     .replace("```", "")
                     .replace("`", "")
                     .replace("SQL:", "")
                     .strip()
        )

        print(f"\n Generated SQL query:\n{sql_query}")
        state["sql_query"] = sql_query

        #  Step 3: Syntax validation using SQLGlot. Parse with the DuckDB dialect
        #  so DuckDB-specific syntax is judged by the rules it will run under.
        try:
            parse_one(sql_query, read="duckdb")
            print(" SQLGlot syntax check passed.")
        except Exception as parse_err:
            state["answer"] = f"‚ö†Ô∏è SQL syntax error detected: {parse_err}"
            return state

        #  Step 4: Custom SQL safety validation
        validation_state = sql_validator_node(state)
        if validation_state.get("validation_error"):
            state["answer"] = validation_state["validation_error"]
            return state

        #  Step 5: Execute on a read-only connection. Defence in depth: even if
        #  a write slipped past the validator, DuckDB would refuse it.
        try:
            with duckdb.connect(db_path, read_only=True) as con:
                result_df = con.execute(sql_query).fetchdf()
        except Exception as exec_err:
            state["answer"] = f"‚ö†Ô∏è SQL execution failed: {exec_err}"
            return state

        if result_df.empty:
            state["answer"] = "No matching records found."
            return state

        #  Step 6: Convert results to plain text
        result_text = result_df.to_string(index=False)

        #  Step 7: Generate human-readable summary
        summary_prompt = f"""
        The user asked: {query}
        The SQL result is:
        {result_text}

        Write a natural, clear explanation of these results.
        Avoid skipping rows or making assumptions.
        """
        answer = llm.invoke(summary_prompt).content.strip()
        state["answer"] = answer

    except Exception as e:
        state["answer"] = f"Error executing SQL: {str(e)}"

    return state


def _as_subquestion_list(value):
    """Coerce a "numeric_parts"/"semantic_parts" value from the LLM's JSON into a list of non-empty strings."""
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    return [item.strip() for item in value if isinstance(item, str) and item.strip()]


def _run_hybrid_subqueries(parts, node_fn, intent, header):
    """Run each sub-question through ``node_fn`` and collect its answer.

    Returns a list of ``{"subquery": ..., "result": ...}`` dicts, one per part.
    A sub-query that raises is recorded with an error message rather than
    silently dropped, so the combined answer shows which part failed.
    """
    results = []
    if not parts:
        return results
    print(header)
    label = intent.capitalize()
    for i, sub_q in enumerate(parts, 1):
        print(f"\n {label} Sub-query {i}: {sub_q}")
        temp_state = {"question": sub_q, "intent": intent, "context": [], "answer": ""}
        try:
            sub_state = node_fn(temp_state)
            result = sub_state.get("answer", "")
        except Exception as e:
            print(f" Error in {intent} sub-query {i}: {e}")
            result = f"Error: {e}"
        results.append({"subquery": sub_q, "result": result})
    return results


def hybrid_node(state: GraphState, config=None) -> GraphState:
    """Answer a multi-part question that mixes numeric (SQL) and semantic parts.

    1. Ask the LLM to split the question into self-contained numeric and
       semantic sub-questions (JSON).
    2. Run numeric parts through ``duckdb_node`` and semantic parts through
       ``retriever_node``, each independently.
    3. Merge the labelled results into ``state["answer"]``.

    If the split cannot be parsed or yields nothing, the whole question is
    answered as a single semantic sub-question instead of returning nothing.
    """
    question = state["question"]
    print(f"\nüîÄ [Hybrid Node] Received question ‚Üí {question}")

    # 1. SPLITTING PHASE - identify all numeric & semantic sub-questions.
    #    Sub-questions run in isolation, so the prompt requires each one to be
    #    self-contained (no "those"/"them" referring to another part).
    split_prompt = f"""
    You are a professional query decomposition assistant.
    Split the following user question into atomic sub-questions.

    Each sub-question should be labeled as:
      - numeric ‚Üí if it can be answered using SQL filters, counts, or aggregates (explicit SQL lookups or attribute lookups e.g., "what is the product Jesse Williams ordered?")
      - semantic ‚Üí if it requires descriptive, interpretive, or trend-based reasoning (popularity, trending, suitability, emotional/aesthetic value).

    Important rule: If a sub-question contains words like
    ["popular", "trending", "likely", "seem", "appear", "best suited", "emotional", "aesthetic", "budget-friendly", "suitable for", "preference", "preference for", "trend"],
    classify that sub-question as **semantic** (these require interpretation), even if they mention columns like Category or Product.

    Every sub-question is answered on its own, without seeing the others, so each one must be self-contained:
    replace references such as "those", "these", "them" or "it" with what they refer to
    (e.g., "how many products in those colors were sold" -> "How many products were sold in each color").

    Return JSON strictly in this format:
    {{
      "numeric_parts": [ "..." ],
      "semantic_parts": [ "..." ],
      "dependent": true/false
    }}

    Examples:
    1. "List all clients whose orders are cancelled and list the clients who prefer decor items"
    ‚Üí {{
      "numeric_parts": [
        "List all clients whose orders are cancelled",
        "List all clients who prefer decor items"
      ],
      "semantic_parts": [],
      "dependent": false
    }}

    2. "What categories are most popular among new customers and Which products have emotional or aesthetic value?"
    ‚Üí {{
      "numeric_parts": [],
      "semantic_parts": [
        "What categories are most popular among new customers",
        "Which products have emotional or aesthetic value"
      ],
      "dependent": false
    }}

    Question: {question}
    """

    try:
        split_result = llm.invoke(split_prompt).content
        split_result = split_result.replace("```json", "").replace("```", "").strip()
        print(f" Raw Split Result: {split_result}")
        split_data = json.loads(split_result)
        if not isinstance(split_data, dict):
            raise ValueError(f"expected a JSON object, got {type(split_data).__name__}")
    except Exception as e:
        print(f"‚ö†Ô∏è Error parsing split result: {e}")
        split_data = {"numeric_parts": [], "semantic_parts": [], "dependent": False}

    numeric_parts = _as_subquestion_list(split_data.get("numeric_parts", []))
    semantic_parts = _as_subquestion_list(split_data.get("semantic_parts", []))
    dependent = split_data.get("dependent", False)

    if not numeric_parts and not semantic_parts:
        # Nothing usable came back: still try to answer the question as a whole.
        print(" No sub-questions parsed; answering the full question semantically.")
        semantic_parts = [question]

    print(f"‚úÖ Parsed numeric part(s): {numeric_parts}")
    print(f"‚úÖ Parsed semantic part(s): {semantic_parts}")
    print(f"üîó Dependency flag: {dependent}")

    # 2. EXECUTE NUMERIC SUB-QUERIES
    numeric_results = _run_hybrid_subqueries(
        numeric_parts,
        duckdb_node,
        "numeric",
        f"üî¢ Executing {len(numeric_parts)} numeric subquery(ies)...",
    )

    # 3. EXECUTE SEMANTIC SUB-QUERIES
    semantic_results = _run_hybrid_subqueries(
        semantic_parts,
        retriever_node,
        "semantic",
        f" Executing {len(semantic_parts)} semantic subquery(ies)...",
    )

    # 4. FORMAT & MERGE RESULTS (numeric first, then semantic)
    combined_output = [
        f" **Numeric Result {i}:** {entry['subquery']}\n{entry['result']}\n"
        for i, entry in enumerate(numeric_results, 1)
    ]
    combined_output += [
        f" **Semantic Result {i}:** {entry['subquery']}\n{entry['result']}\n"
        for i, entry in enumerate(semantic_results, 1)
    ]

    if not combined_output:
        combined_output = ["‚ö†Ô∏è No valid results found for this query."]

    final_answer = "\n".join(combined_output)
    print(f"\n‚úÖ Final Combined Answer:\n{final_answer}")

    state["answer"] = final_answer
    return state

#  Intent label -> node that handles it. Any other label is routed to "ignore".
_INTENT_ROUTES = {
    "greet": "greet",
    "ignore": "ignore",
    "numeric": "duckdb",
    "semantic": "retriever",
    "hybrid": "hybrid",
}


def _route_by_intent(state: GraphState) -> str:
    """Pick the outgoing edge of the "intent" node.

    The intent comes from an LLM reply, so it may be missing or an unexpected
    string. Such values have no edge in the path map, so they are sent to the
    "ignore" node instead of aborting the run.
    """
    intent = state.get("intent")
    return intent if intent in _INTENT_ROUTES else "ignore"


graph = StateGraph(GraphState)
graph.add_node("intent", intent_node)
graph.add_node("greet", greet_node)
graph.add_node("ignore", ignore_node)
graph.add_node("duckdb", duckdb_node)
graph.add_node("retriever", retriever_node)
graph.add_node("hybrid", hybrid_node)

graph.set_entry_point("intent")

graph.add_conditional_edges("intent", _route_by_intent, _INTENT_ROUTES)

# Every handler node answers in one step and ends the run.
for _handler_node in _INTENT_ROUTES.values():
    graph.add_edge(_handler_node, END)

app = graph.compile()

if __name__ == "__main__":
    print("\n Smart Query Assistant ready! Type 'exit' to quit.\n")

    while True:
        try:
            user_input = input("You: ").strip()
            # Echo the question into the transcript.
            print(f"You: {user_input}")
            if user_input.lower() in ["exit", "quit"]:
                print("Assistant: Goodbye !")
                break
            if not user_input:
                # Nothing was asked; don't spend LLM calls on an empty question.
                continue

            result = app.invoke({"question": user_input})
            print(f"Assistant: {result.get('answer', '')}\n")

        # EOFError (stdin closed / piped input exhausted) must end the loop too.
        # Otherwise the generic handler below catches it on every iteration
        # and prints errors forever.
        except (KeyboardInterrupt, EOFError):
            print("\nAssistant: Goodbye ! ")
            break
        except Exception as e:
            print(f" Error: {str(e)}")
            print("Please try again with a different question.\n")

 Generated 50 labelled chunks for embeddings.
 Loading existing FAISS index...
 FAISS index loaded successfully!

 Smart Query Assistant ready! Type 'exit' to quit.

You: Why are certain colors more popular among buyers and how many products in those colors were sold?
 Detected Intent: hybrid

üîÄ [Hybrid Node] Received question ‚Üí Why are certain colors more popular among buyers and how many products in those colors were sold?
 Raw Split Result: {
  "numeric_parts": [
    "How many products in those colors were sold"
  ],
  "semantic_parts": [
    "Why are certain colors more popular among buyers"
  ],
  "dependent": false
}
‚úÖ Parsed numeric part(s): ['How many products in those colors were sold']
‚úÖ Parsed semantic part(s): ['Why are certain colors more popular among buyers']
üîó Dependency flag: False
üî¢ Executing 1 numeric subquery(ies)...

 Numeric Sub-query 1: How many products in those colors were sold

 Generated SQL query:
SELECT SUM("Quantity") AS total_products_sold
