# 🤖 Project Overview: RecoMind

This notebook is a proof-of-concept for the core functionality of RecoMind, an AI-powered tool that converts natural language requests into actionable insights from a SQL database. The agent's workflow is structured into a LangGraph pipeline with three main nodes.

---

### The Three Nodes of the Agent

This project is built around a three-step pipeline to handle user requests from start to finish.

#### 1. Node 1: User Key 🗣️
This is the starting point where the user inputs their request in plain, natural language. The agent receives this input and prepares it for the next step.

#### 2. Node 2: SQL Query Generation 🧠
The core of the agent. An LLM (Large Language Model) processes the user's natural language request and the database's schema to generate a correct and efficient SQL query.

#### 3. Node 3: Database Query Execution ⚙️
The final step where the generated SQL query is executed on the live database (in our case, the Azure SQL Database) to retrieve the requested data. The agent then returns the results to the user.

## Environment setup and database connection

In [1]:
# Import core libraries
import os
import pyodbc
from dotenv import load_dotenv

# Import LangChain components for LLM, SQL, and Agents
from langchain_community.chat_models import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain.chains import create_sql_query_chain
from langchain.agents.agent_types import AgentType

# Import LangGraph components
from langgraph.graph import StateGraph, END
from typing import TypedDict

# Load environment variables from the .env file
load_dotenv()

# Get the API key from the environment variables
openrouter_key = os.getenv("OPENROUTER_API_KEY")

# Set the API key in the environment for the LLM
os.environ["OPENROUTER_API_KEY"] = openrouter_key

print("Environment and libraries loaded successfully.")

Environment and libraries loaded successfully.


Database Connectition Testing

In [None]:
# Get database credentials from environment variables
db_password = os.getenv("DB_PASSWORD")

# Connection info
server = "RecoMind.mssql.somee.com"
database = "AdventureWorks2022"
username = "RecoMind_SQLLogin_1"
password = db_password
driver = '{ODBC Driver 17 for SQL Server}'

# Connection string
conn_str = f"""
    DRIVER={driver};
    SERVER={server};
    DATABASE={database};
    UID={username};
    PWD={password};
    TrustServerCertificate=Yes;
    Packet Size=4096;
"""

try:
    # Establish connection
    print("Attempting to connect to the database...")
    conn = pyodbc.connect(conn_str)
    print("Connection successful! ✅")
    
except Exception as e:
    print(f"Connection failed! ❌")
    print(f"Error details: {e}")
    
finally:
    # Close connection
    if 'conn' in locals() and conn:
        conn.close()
        print("Connection closed.")

Attempting to connect to the database...
Connection successful! ✅
Connection closed.


# 🧠 Define the Agent Graph
This section defines the core components and the nodes of our LangGraph pipeline.
## Define Graph State and Components
We'll define a shared state object that holds all the data as it moves through the graph. We also initialize the database connection and the LLM.

In [None]:
# --- Define the LLM ---
model_name = "meta-llama/llama-3.3-70b-instruct:free"
llm = ChatOpenAI(
    model=model_name,
    openai_api_base="https://openrouter.ai/api/v1",
    openai_api_key=os.environ["OPENROUTER_API_KEY"]
)

# 🛠️ Build and Run the Graph
This final section builds the nodes' logic, compiles the graph, and runs the entire process with a sample user input.
## Build the Nodes
Each function below represents a node in our graph, handling a specific task.

In [None]:
# Define the state schema for the graph
class RecoMindState(TypedDict):
    """
    Represents the state of the graph.
    """
    user_key: str
    cleaned_input: str
    db_schema: str
    selected_schema: str  
    generated_sql: str
    query_results: str

In [None]:
#
# --- Node 1: User Input & Manual Table Loading ---
#
def user_input_node(state: RecoMindState):
    """
    Takes the user key, loads all database table names, and stores them in the state.
    """
    user_key = state.get("user_key", "")
    
    db_schema = "Error: Database connection or schema loading failed."
    try:
        db_password = os.getenv("DB_PASSWORD")
        server = "recomind.database.windows.net"
        database = "AdventureWorks2014"
        username = "adminhossam"
        
        conn_string = (
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
        )
        
        cnxn = pyodbc.connect(conn_string)
        cursor = cnxn.cursor()
        
        print("Connecting to the database and loading all table names...")
        cursor.execute("SELECT name FROM sys.tables")
        tables = cursor.fetchall()
        tables_list = [table[0] for table in tables]
        db_schema = ", ".join(tables_list)
        print("Database schema (table names) loaded successfully.")
    
    except Exception as e:
        print(f"Error: Database connection or schema loading failed. {e}")
        
    prompt_map = {
        "sales": "show me the total sales data and group it by product",
        "employees": "get a list of all employees and their job titles",
        "customers": "list all customers and their contact information"
    }
    
    cleaned_prompt = prompt_map.get(user_key.lower().strip(), user_key)
    
    print(f"User key received: '{user_key}'. Cleaned prompt generated: '{cleaned_prompt}'")
    
    return {
        **state,
        "cleaned_input": cleaned_prompt,
        "db_schema": db_schema
    }

In [None]:
#
# --- Node 2: Column Discovery ---
#
def column_discovery_node(state: RecoMindState):
    """
    Identifies relevant tables based on the user's key and loads their full schema (columns).
    """
    user_key = state["user_key"]
    
    # Mapping user key to relevant tables, with schema names included
    table_map = {
        "sales": ["Sales.SalesOrderHeader", "Sales.SalesOrderDetail", "Production.Product", "Sales.Customer"],
        "employees": ["HumanResources.Employee", "Person.Person", "HumanResources.JobCandidate"],
        "customers": ["Sales.Customer", "Person.Person", "Person.Address"]
    }
    
    relevant_tables = table_map.get(user_key, [])
    
    selected_schema = ""
    try:
        print("\nConnecting to the database to get schema for selected tables...")
        db_password = os.getenv("DB_PASSWORD")
        server = "recomind.database.windows.net"
        database = "AdventureWorks2014"
        username = "adminhossam"
        connection_string = f"mssql+pyodbc:///?odbc_connect=DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
        
        # We must create a new SQLDatabase instance with only the selected tables
        db = SQLDatabase.from_uri(connection_string, include_tables=relevant_tables)
        
        # Get the schema for these selected tables
        selected_schema = db.get_context()
        print("Schema for selected tables loaded successfully.")
    except Exception as e:
        print(f"Error loading selected schema: {e}")
        selected_schema = "Error: Failed to load selected schema."
        
    return {
        **state,
        "selected_schema": selected_schema
    }

In [None]:
#
# --- Node 3: SQL Query Generator & Executor ---
#
def sql_agent_node(state: RecoMindState):
    """
    Uses the LLM to generate a SQL query based on the detailed schema, then executes it.
    """
    # Connect to the DB instance for query execution
    db_password = os.getenv("DB_PASSWORD")
    server = "recomind.database.windows.net"
    database = "AdventureWorks2014"
    username = "adminhossam"
    connection_string = f"mssql+pyodbc:///?odbc_connect=DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
    db = SQLDatabase.from_uri(connection_string)

    cleaned_input = state["cleaned_input"]
    selected_schema = state["selected_schema"]

    # Use the LLM to generate a SQL query
    sql_query_chain = create_sql_query_chain(llm, db)

    print("\nGenerating SQL query based on the selected schema...")
    
    # The LLM will now use the more detailed schema to generate a correct query
    generated_sql = sql_query_chain.invoke({"question": cleaned_input, "schema": selected_schema})

    # Execute the generated SQL query
    try:
        print("\n" + "#" * 50)
        print("THE GENERATED SQL QUERY:")
        print(generated_sql)
        print("#" * 50 + "\n")
        query_results = db.run(generated_sql)
    except Exception as e:
        print(f"Error executing query: {e}")
        query_results = f"An error occurred during query execution. The query was: {generated_sql}"
        
    print("Query execution complete. Results received.")
    
    return {**state, "generated_sql": generated_sql, "query_results": str(query_results)}

## Compile and Run the Graph
This cell brings everything together to run the full workflow.

In [None]:
#
# --- This cell builds and runs the LangGraph ---
#

# Build the graph
workflow = StateGraph(RecoMindState)

# Add nodes to the graph
workflow.add_node("user_input", user_input_node)
workflow.add_node("sql_agent", sql_agent_node)

# Define the graph's entry point
workflow.set_entry_point("user_input")

# Define the graph's edges (connections between nodes)
workflow.add_edge("user_input", "sql_agent")
workflow.add_edge("sql_agent", END)

# Compile the graph
app = workflow.compile()

# Run the graph with a sample key
print("Starting the RecoMind Agent Graph...")
final_state = app.invoke({"user_key": "sales"})

print("\n--- Final Graph State ---")
print(final_state)

Starting the RecoMind Agent Graph...


Connecting to the database and loading all table names...
Database schema (table names) loaded successfully.
User key received: 'sales'. Cleaned prompt generated: 'show me the total sales data and group it by product'


  self._metadata.reflect(
  self._metadata.reflect(
  self._metadata.reflect(
  self._metadata.reflect(
  self._metadata.reflect(


KeyError: 'selected_schema'

In [None]:
#
# --- This cell handles all initial setup ---
#

# Import core libraries
import os
import pyodbc
from dotenv import load_dotenv
from typing import TypedDict
from langchain_community.chat_models import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
from langgraph.graph import StateGraph, END
from langchain_core.prompts import PromptTemplate

# Load environment variables from the .env file
load_dotenv()

# Get the API key from the environment variables
openrouter_key = os.getenv("OPENROUTER_API_KEY")

# Set the API key in the environment for the LLM
os.environ["OPENROUTER_API_KEY"] = openrouter_key

print("Environment and libraries loaded successfully.")

# Define the state schema for the graph
class RecoMindState(TypedDict):
    """
    Represents the state of the graph.
    """
    user_key: str
    cleaned_input: str
    db_schema: str
    selected_tables: str
    detailed_schema: str
    generated_sql: str
    query_results: str

# Define the LLM
model_name = "mistralai/mixtral-8x7b-instruct"
llm = ChatOpenAI(
    model=model_name,
    openai_api_base="https://openrouter.ai/api/v1",
    openai_api_key=os.environ["OPENROUTER_API_KEY"]
)

Environment and libraries loaded successfully.


In [None]:
#
# --- Node 1: User Input & Manual Table Loading ---
#
def user_input_node(state: RecoMindState):
    """
    Takes the user key, loads all database table names, and stores them in the state.
    """
    user_key = state.get("user_key", "")
    
    db_schema = "Error: Database connection or schema loading failed."
    try:
        db_password = os.getenv("DB_PASSWORD")
        server = "recomind.database.windows.net"
        database = "AdventureWorks2014"
        username = "adminhossam"
        
        conn_string = (
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
        )
        
        cnxn = pyodbc.connect(conn_string)
        cursor = cnxn.cursor()
        
        print("Connecting to the database and loading all table names...")
        cursor.execute("SELECT name FROM sys.tables")
        tables = cursor.fetchall()
        tables_list = [table[0] for table in tables]
        db_schema = ", ".join(tables_list)
        print("Database schema (table names) loaded successfully.")
    
    except Exception as e:
        print(f"Error: Database connection or schema loading failed. {e}")
        
    print(f"User key received: '{user_key}'.")
    
    return {
        **state,
        "cleaned_input": user_key,
        "db_schema": db_schema
    }


#
# --- Node 2: LLM Table Selector (Corrected) ---
#
def llm_table_selector_node(state: RecoMindState):
    """
    Uses the LLM to select the top 3 relevant tables from the full schema.
    """
    user_key = state["user_key"]
    all_tables_string = state["db_schema"]
    
    # Updated prompt to be more specific and request a limited number of tables
    prompt_template = """
    Based on the user's request, select the top 3 most relevant table names from the following comma-separated list.
    
    List of all tables: {all_tables_string}
    
    User's request key: {user_key}
    
    Respond with a comma-separated list of only the table names, with no extra text or explanations.
    """
    
    prompt = PromptTemplate.from_template(prompt_template)
    chain = prompt | llm
    
    print("\nUsing LLM to select relevant tables from the full schema...")
    
    llm_response = chain.invoke({"all_tables_string": all_tables_string, "user_key": user_key})
    selected_tables_string = llm_response.content
    
    print(f"LLM selected these tables: {selected_tables_string}")
    
    # We will clean the output string here to be safe
    selected_tables_string_cleaned = selected_tables_string.strip().replace("\n", "").replace(" ", "")
    
    return {**state, "selected_tables": selected_tables_string_cleaned}


#
# --- Node 3: Column Discovery ---
#
def column_discovery_node(state: RecoMindState):
    """
    Fetches the detailed schema (columns) for the selected tables.
    """
    selected_tables_string = state["selected_tables"]
    if not selected_tables_string:
        return {**state, "detailed_schema": "Error: No tables were selected."}

    selected_tables_list = [table.strip() for table in selected_tables_string.split(",")]
    
    # Add schema prefixes to the table names
    prefix_map = {
        "sales": "Sales",
        "employees": "HumanResources",
        "customers": "Sales"
    }
    prefix = prefix_map.get(state["user_key"], "dbo")
    
    prefixed_tables = [f"{prefix}.{t}" for t in selected_tables_list]

    detailed_schema = "Error: Failed to load detailed schema."
    try:
        print("\nConnecting to the database to get detailed schema for selected tables...")
        db_password = os.getenv("DB_PASSWORD")
        server = "recomind.database.windows.net"
        database = "AdventureWorks2014"
        username = "adminhossam"
        connection_string = f"mssql+pyodbc:///?odbc_connect=DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
        
        db = SQLDatabase.from_uri(connection_string, include_tables=prefixed_tables)
        detailed_schema = db.get_context()
        
        print("Detailed schema loaded successfully.")
        
    except Exception as e:
        print(f"Error loading detailed schema: {e}")
        detailed_schema = f"Error: Failed to load detailed schema. Details: {e}"
        
    return {**state, "detailed_schema": detailed_schema}


#
# --- Node 4: SQL Agent ---
#
def sql_agent_node(state: RecoMindState):
    """
    Generates and executes the final query based on the detailed schema.
    """
    cleaned_input = state["cleaned_input"]
    detailed_schema = state["detailed_schema"]

    if detailed_schema.startswith("Error"):
        generated_sql = ""
        query_results = detailed_schema
    else:
        try:
            db_password = os.getenv("DB_PASSWORD")
            server = "recomind.database.windows.net"
            database = "AdventureWorks2014"
            username = "adminhossam"
            connection_string = f"mssql+pyodbc:///?odbc_connect=DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
            db = SQLDatabase.from_uri(connection_string)

            sql_query_chain = create_sql_query_chain(llm, db)
            generated_sql = sql_query_chain.invoke({"question": cleaned_input, "schema": detailed_schema})
            
            print("\n" + "#" * 50)
            print("THE GENERATED SQL QUERY:")
            print(generated_sql)
            print("#" * 50 + "\n")
            
            query_results = db.run(generated_sql)
            print("Query execution complete. Results received.")
        except Exception as e:
            print(f"Error executing query: {e}")
            generated_sql = ""
            query_results = f"An error occurred during query execution. The query was: {generated_sql}"
        
    return {**state, "generated_sql": generated_sql, "query_results": str(query_results)}

In [None]:
#
# --- This cell builds and runs the LangGraph ---
#

# Build the graph
workflow = StateGraph(RecoMindState)

# Add nodes to the graph
workflow.add_node("user_input", user_input_node)
workflow.add_node("llm_table_selector", llm_table_selector_node)
workflow.add_node("column_discovery", column_discovery_node)
workflow.add_node("sql_agent", sql_agent_node)

# Define the graph's entry point
workflow.set_entry_point("user_input")

# Define the graph's edges (connections between nodes)
workflow.add_edge("user_input", "llm_table_selector")
workflow.add_edge("llm_table_selector", "column_discovery")
workflow.add_edge("column_discovery", "sql_agent")
workflow.add_edge("sql_agent", END)

# Compile the graph
app = workflow.compile()

# Run the graph with a sample key
print("Starting the RecoMind Agent Graph...")
final_state = app.invoke({"user_key": "sales"})

print("\n--- Final Graph State ---")
print(final_state)

Starting the RecoMind Agent Graph...
Connecting to the database and loading all table names...
Database schema (table names) loaded successfully.
User key received: 'sales'.

Using LLM to select relevant tables from the full schema...
LLM selected these tables:  SalesOrderHeader, SalesOrderDetail, SalesPerson, SalesTerritory, SalesOrderHeaderSalesReason, SalesReason, SalesTaxRate, SpecialOffer, SpecialOfferProduct, SalesPersonQuotaHistory

Connecting to the database to get detailed schema for selected tables...
Error loading detailed schema: include_tables {'Sales.SalesPersonQuotaHistory', 'Sales.SalesTaxRate', 'Sales.SalesTerritory', 'Sales.SpecialOffer', 'Sales.SalesPerson', 'Sales.SalesOrderDetail', 'Sales.SalesReason', 'Sales.SalesOrderHeaderSalesReason', 'Sales.SalesOrderHeader', 'Sales.SpecialOfferProduct'} not found in database

--- Final Graph State ---
{'user_key': 'sales', 'cleaned_input': 'sales', 'db_schema': 'AddressType, BusinessEntity, BusinessEntityAddress, BusinessEnti

In [1]:
#
# --- This script contains the full CrewAI application ---
#

# Core CrewAI imports
import os
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from crewai.tools import BaseTool

# Database connection libraries
import pyodbc
from sqlalchemy import create_engine
import pandas as pd
from typing import List, Any, Dict
from pydantic import BaseModel, Field

# Load environment variables from .env file
load_dotenv()

# Get the API key and database credentials from environment variables
openrouter_key = os.getenv("OPENROUTER_API_KEY")
os.environ["OPENROUTER_API_KEY"] = openrouter_key

db_password = os.getenv("DB_PASSWORD")
server = "recomind.database.windows.net"
database = "AdventureWorks2014"
username = "adminhossam"

# Initialize the LLM with OpenRouter
llm_model = ChatOpenAI(
    model="openrouter/z-ai/glm-4.5-air:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    openai_api_base="https://openrouter.ai/api/v1"
)

# --- Define schemas for tool inputs ---

class GetAllTablesInput(BaseModel):
    """Input for GetAllTablesTool."""
    pass

class GetTableSchemaInput(BaseModel):
    """Input for GetTableSchemaTool."""
    table_names: str = Field(description="Comma-separated list of fully qualified table names (e.g., 'Schema.Table')")

class ExecuteSQLQueryInput(BaseModel):
    """Input for ExecuteSQLQueryTool."""
    query: str = Field(description="SQL query to execute")

# --- Define the Tools using BaseTool from crewai.tools ---

class GetAllTablesTool(BaseTool):
    name: str = "get_all_tables"
    description: str = (
        "Fetches a comma-separated list of all fully qualified table names (Schema.Table) from the database. "
        "This tool is useful for getting an overview of the database structure."
    )
    args_schema: type[BaseModel] = GetAllTablesInput

    def _run(self) -> str:
        try:
            conn_string = (
                f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                f"SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
            )
            cnxn = pyodbc.connect(conn_string)
            cursor = cnxn.cursor()
            cursor.execute("SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
            tables = [f"{row[0]}.{row[1]}" for row in cursor.fetchall()]
            return ", ".join(tables)
        except Exception as e:
            return f"Error connecting to database or fetching tables: {e}"

class GetTableSchemaTool(BaseTool):
    name: str = "get_table_schema"
    description: str = (
        "Fetches the schema (columns and data types) for a list of tables. "
        "The input must be a comma-separated string of fully qualified table names (e.g., 'Schema.Table')."
    )
    args_schema: type[BaseModel] = GetTableSchemaInput

    def _run(self, table_names: str) -> str:
        try:
            connection_string = (
                f"mssql+pyodbc:///?odbc_connect=DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};"
                f"DATABASE={database};UID={username};PWD={db_password}"
            )
            engine = create_engine(connection_string)
            schema_info = []
            tables_list = [t.strip() for t in table_names.split(',')]

            for full_table_name in tables_list:
                try:
                    schema, table = full_table_name.split('.')
                except ValueError:
                    return f"Error: Invalid table name format '{full_table_name}'. Expected 'Schema.Table'."
                
                query = f"""
                SELECT COLUMN_NAME, DATA_TYPE
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}'
                """
                result = pd.read_sql_query(query, engine)
                if not result.empty:
                    schema_info.append(f"Table {full_table_name}:\n" + result.to_string(index=False))
            return "\n\n".join(schema_info) if schema_info else "No schema found for the provided tables."
        except Exception as e:
            return f"Error fetching table schema: {e}"

class ExecuteSQLQueryTool(BaseTool):
    name: str = "execute_sql_query"
    description: str = (
        "Executes a SQL query against the database and returns the results. "
        "The input must be a valid SQL query string."
    )
    args_schema: type[BaseModel] = ExecuteSQLQueryInput

    def _run(self, query: str, **kwargs) -> str:
        try:
            # Get the user_request from the kwargs dictionary if it's available
            user_request_key = kwargs.get('user_request', 'default_data') # Use a default key if not provided
            
            conn_string = (
                f"DRIVER={{ODBC Driver 17 for SQL Server}};"
                f"SERVER={server};DATABASE={database};UID={username};PWD={db_password}"
            )
            cnxn = pyodbc.connect(conn_string)
            df = pd.read_sql(query, cnxn)
            
            # Save all data to a CSV file using the user's request key
            csv_filename = os.path.join(os.getcwd(), f"{user_request_key}_dataa.csv")
            df.to_csv(csv_filename, index=False)
            
            # Verify that the file was written
            if os.path.exists(csv_filename):
                row_count = len(df)
                return f"Data saved to {csv_filename} ({row_count} rows)\nPreview (first 10 rows):\n{df.head(10).to_string(index=False)}"
            else:
                return f"Error: Failed to save CSV file to {csv_filename}"
        except Exception as e:
            return f"Error executing query or saving CSV: {e}"

# --- Define the Agents ---
table_selector_agent = Agent(
    role='SQL Table Selector',
    goal='Identify the 3 most relevant fully qualified table names (Schema.Table) based on a user request key without any prior knowledge of the database.',
    backstory=(
        "You are an expert SQL database analyst. You have no prior knowledge of the database structure. "
        "Your task is to use tools to fetch all table names with their schemas and then select the 3 most relevant ones based on the user request. "
        "Your final answer MUST be a comma-separated list of EXACT fully qualified table names ONLY, with no extra text or explanation."
    ),
    verbose=True,
    allow_delegation=False,
    llm=llm_model,
    tools=[GetAllTablesTool()]
)

data_analyst_agent = Agent(
    role='Data Analyst and Column Selector',
    goal='Analyze table schemas and identify the most relevant columns for data analysis based on the user request, ensuring no duplicate column names are selected.',
    backstory=(
        "You are an experienced data analyst. Your job is to review the schema of a set of tables and select all columns "
        "that are most useful for analytical purposes. You must eliminate redundant or duplicate column names across the tables "
        "by selecting the column from the primary or most relevant table. Your selection must exclude sensitive data, irrelevant technical IDs, "
        "and primary keys that are not needed for joining or direct analysis. "
        "The final answer must be a comma-separated list of EXACT fully qualified column names (e.g., 'Schema.Table.ColumnA, Schema.Table.ColumnB') with no duplicates."
    ),
    verbose=True,
    allow_delegation=False,
    llm=llm_model,
    tools=[GetTableSchemaTool()]
)

query_generator_agent = Agent(
    role='SQL Query Generator',
    goal='Generate a correct SQL query that joins the provided tables and selects the specified columns for analysis, ensuring column aliases are used to prevent duplicates.',
    backstory=(
        "You are a master SQL query writer with no prior knowledge of the database. You receive a list of fully qualified columns "
        "and must generate a query that joins the relevant tables on their primary/foreign keys. "
        "To prevent duplicate column names in the final result, you must use column aliases like 'TableName.ColumnName AS UniqueName' when two or more columns share the same name. "
        "Fetch ALL rows (no limits). The query must be valid SQL syntax, using aliases for tables and columns (e.g., 'SELECT T1.Col1, T2.Col2 FROM Schema.Table1 T1 JOIN Schema.Table2 T2 ON...')."
    ),
    verbose=True,
    allow_delegation=False,
    llm=llm_model
)

query_executor_agent = Agent(
    role='SQL Query Executor',
    goal='Execute a given SQL query, save all results to CSV, and return the result or error.',
    backstory=(
        "You are a professional database administrator. You receive a SQL query, execute it, save the full results to a CSV file, "
        "and return the data preview or an error if it fails."
    ),
    verbose=True,
    allow_delegation=False,
    llm=llm_model,
    tools=[ExecuteSQLQueryTool()]
)

# --- Define the Tasks ---
task1_select_tables = Task(
    description=(
        "For the user's key '{user_request}', first, internally translate this key into a brief analytical summary "
        "focusing on specific data needs for analysis (e.g., 'customer performance metrics, profile data, and key relationships'). "
        "Based on this summary, identify the top 3 most relevant fully qualified table names (Schema.Table) from the full list. "
        "Use the 'get_all_tables' tool to get the list of available tables. "
        "Return a comma-separated list of EXACT fully qualified table names ONLY, with no extra text or explanation."
    ),
    expected_output="A comma-separated string of exactly 3 fully qualified table names (e.g., 'Sales.SalesOrderHeader, Sales.SalesOrderDetail, Person.Person').",
    agent=table_selector_agent,
    human_input=True
)

task2_analyze_schema = Task(
    description=(
        "For the user's request '{user_request}', first, internally translate this into a brief analytical summary "
        "focusing on specific data needs for analysis (e.g., 'customer performance metrics, profile data, and key relationships'). "
        "Using the selected tables from the previous task, get their schemas using the 'get_table_schema' tool. "
        "Analyze the schemas to select columns relevant to the analytical summary and suitable for analysis "
        " (e.g., numerical, dates, IDs). The final output must be a comma-separated list of fully qualified column names (e.g., 'Schema.Table.ColumnA, Schema.Table.ColumnB')."
        "This agent will receive the output from the 'get_table_schema' tool and must parse it to select relevant columns."
    ),
    expected_output="A single comma-separated string of fully qualified column names (e.g., 'Sales.SalesOrderHeader.OrderID, Sales.SalesOrderDetail.ProductID').",
    agent=data_analyst_agent,
    context=[task1_select_tables]
)

task3_generate_query = Task(
    description=(
        "Using the selected columns from the previous task, generate a correct SQL query that joins the tables and selects ONLY those columns."
        "Join tables on their primary/foreign keys and fetch ALL rows (no limits). The query must be valid SQL syntax."
    ),
    expected_output="A single valid SQL query (e.g., 'SELECT T1.col1, T2.col2 FROM Schema.Table1 T1 JOIN Schema.Table2 T2 ON...').",
    agent=query_generator_agent,
    context=[task2_analyze_schema]
)

# --- Define the Tasks ---
task4_execute_query = Task(
    description=(
        "Take the generated SQL query and execute it using the 'execute_sql_query' tool."
        "You must pass the original user request '{user_request}' to the tool to be used for the CSV file name."
        "Save the full results to a CSV file and return the data preview or any error message if the query fails."
    ),
    expected_output="The result of the SQL query as a string, including CSV save confirmation.",
    agent=query_executor_agent,
    context=[task3_generate_query]
)

# --- Create the Crew and Kick off the Process ---
# Dynamic user request
user_request = input("Enter your key (e.g., 'sales'): ")

recomind_crew = Crew(
    agents=[table_selector_agent, data_analyst_agent, query_generator_agent, query_executor_agent],
    tasks=[task1_select_tables, task2_analyze_schema, task3_generate_query, task4_execute_query],
    verbose=True,
    process=Process.sequential
)

print("Starting the CrewAI process for the user request: " + user_request)

result = recomind_crew.kickoff(inputs={'user_request': user_request})

print("\n--- Final Result from the Crew ---")
print(result)

Starting the CrewAI process for the user request: Products Sales


[1m[95m ## Final Result:[00m [92mSales.SalesOrderHeader,Sales.SalesOrderDetail,Production.Product[00m
[1m[93m 

=====
## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.
Please follow these guidelines:
 - If you are happy with the result, simply hit Enter without typing anything.
 - Otherwise, provide specific improvement requests.
 - You can provide multiple rounds of feedback until satisfied.
=====
[00m



--- Final Result from the Crew ---
Data saved to /mnt/c/Users/Elzahbia/OneDrive/Desktop/IEEE - ReoMind/default_data_dataa.csv (121317 rows)
Preview (first 10 rows):
 SalesOrderID  OrderDate   ShipDate  OnlineOrderFlag  CustomerID  SalesPersonID   SubTotal    TaxAmt  Freight   TotalDue  SalesOrderDetailID  OrderQty  SalesOrderDetail_ProductID  UnitPrice  UnitPriceDiscount  LineTotal  ProductID                        Name ProductNumber  MakeFlag  FinishedGoodsFlag  Color  StandardCost  ListPrice  DaysToManufacture ProductLine Style SellStartDate DiscontinuedDate
        43659 2011-05-31 2011-06-07            False       29825          279.0 20565.6206 1971.5149 616.0984 23153.2339                   1         1                         776  2024.9940                0.0  2024.9940        776      Mountain-100 Black, 42    BK-M82B-42      True               True  Black     1898.0944    3374.99                  4          M     U     2011-05-31             None
        43659 2011-05-31 2011-

In [13]:
# Import necessary libraries
import pandas as pd
import os
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END

# Define the LLM model to be used for classification
llm_model = ChatOpenAI(
    model="mistralai/mistral-7b-instruct:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    openai_api_base="https://openrouter.ai/api/v1"
)

# Define the state of the graph
class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'hr', 'logistics', 'support', or 'unknown'."
    )
    
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    
    return response.content.strip().lower()

# The actual LangGraph node function
def data_loader_and_identifier(state: GraphState):
    """
    Loads data.csv and uses an LLM to identify its type.
    """
    print("---LOADING AND IDENTIFYING DATA---")
    
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_preparation', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

# --- Build and Run the Simple LangGraph ---

# Define a new graph
workflow = StateGraph(GraphState)

# Add the first node to the graph
workflow.add_node("loader", data_loader_and_identifier)

# Set the entry point of the graph to be the "loader" node
workflow.set_entry_point("loader")

# Set the exit point. This simple graph will end after the first node.
workflow.add_edge("loader", END)

# Compile the graph
app = workflow.compile()

# --- Run the Graph and Test the Output ---

# The initial state is empty, as the first node will populate it.
initial_state = {"data_type": None, "dataframe": None}

# Run the graph and get the final state
final_state = app.invoke(initial_state)

# Print the final result
print("\n--- Final Graph State ---")
print(f"Data type: {final_state['data_type']}")
print(f"DataFrame: {type(final_state['dataframe'])}")
# Add a check to prevent the AttributeError
if final_state['dataframe'] is not None:
    print(f"DataFrame shape: {final_state['dataframe'].shape}")

---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'sales'.

--- Final Graph State ---
Data type: sales
DataFrame: <class 'pandas.core.frame.DataFrame'>
DataFrame shape: (121317, 30)


In [None]:
# Import necessary libraries
import pandas as pd
import os
import json
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END

# Define a single, unified LLM model for both tasks
llm_model = ChatOpenAI(
    model="deepseek/deepseek-r1-0528:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    openai_api_base="https://openrouter.ai/api/v1"
)

# Define the state of the graph
class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'hr', 'logistics', 'support', or 'unknown'."
    )
    
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    # Use the unified llm_model here
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    
    return response.content.strip().lower()

# The first node in the graph: Load and Identify
def data_loader_and_identifier(state: GraphState):
    """
    Loads data.csv and uses an LLM to identify its type.
    """
    print("---LOADING AND IDENTIFYING DATA---")
    
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_preparation', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

# The second node in the graph: Data Cleaning
def data_cleaning(state: GraphState):
    """
    Cleans the DataFrame using an LLM based on a predefined set of rules.
    """
    print("---CLEANING DATA---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping cleaning.")
        return {"dataframe": None}
    
    prompt = f"""
    You are a Data Cleaning Agent.
    Your task is to clean and standardize a dataset according to the following rules:

    1. Duplicates
        - If the duplicate is natural and valid -> keep it.
        - If the duplicate is invalid or incorrect -> remove it.

    2. Missing Values
        - If the column is a key/unique identifier -> drop the entire row if the value is missing.
        - If the value can be inferred from other related columns -> infer and fill it.
        - If the value cannot be inferred -> fill it with an appropriate method (statistical or contextual) or drop it if non-essential.

    3. Negative Values
        - If negative values are logically impossible -> treat them as errors and correct/remove them.
        - If negative values are logically possible -> keep them.

    4. Text Standardization
        - Unify text format (e.g., consistent casing).
        - Remove strange or unwanted characters.
        - Expand abbreviations or acronyms into their full standardized form.
        - Unify converted terms to a consistent representation.

    5. Dates
        - Standardize all date and time values into the format: YYYY-MM-DD HH:MM:SS.

    6. Outliers
        - If outliers represent clear errors -> handle or remove them.
        - If outliers are valid values -> keep them.

    7. General Consistency
        - Ensure data types are consistent with their logical meaning.
        - Detect hidden nulls (e.g., placeholders like “NA”, “-”, or spaces) and treat them as missing values.
        - Remove or merge duplicate columns if they represent the same data.
        - Verify logical relationships between columns to maintain internal consistency.
    
    Output Rules:
    Return ONLY a valid JSON object.

    The JSON must be a list of rows (objects).
    Each row should be represented as key–value pairs, where keys are the column names and values are the cell values.

    Do not return explanations or text outside the JSON.
    The JSON must be directly loadable into a Pandas DataFrame in Python without errors.
    If some rows are missing values for certain columns, fill them with null.


    Data (first 20 rows):
    {df.head(20).to_dict()}
    """
    
    # Use the unified llm_model here
    response = llm_model.invoke(prompt)
    try:
        cleaned_data = json.loads(response.content)
        cleaned_df = pd.DataFrame(cleaned_data)
        print("✅ Data cleaning completed.")
        return {"dataframe": cleaned_df}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error: {e}")
        print(f"LLM response: {response.content}")
        return {"dataframe": None}

# --- Build the LangGraph ---

workflow = StateGraph(GraphState)

# Add both nodes to the graph
workflow.add_node("loader", data_loader_and_identifier)
workflow.add_node("cleaner", data_cleaning)

# Define the flow: loader -> cleaner -> END
workflow.add_edge("loader", "cleaner")
workflow.add_edge("cleaner", END)

# Set the entry point
workflow.set_entry_point("loader")

# Compile the graph
app = workflow.compile()

# --- Run the Graph and Test the Output ---

initial_state = {"data_type": None, "dataframe": None}
final_state = app.invoke(initial_state)

# Print the final result
print("\n--- Final Graph State ---")
print(f"Data type: {final_state.get('data_type')}")
print(f"DataFrame: {type(final_state.get('dataframe'))}")
if final_state.get('dataframe') is not None:
    print(f"Cleaned DataFrame shape: {final_state['dataframe'].shape}")
    print("\nFirst 5 rows of the cleaned DataFrame:")
    print(final_state['dataframe'].head())

---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'sales'.
---CLEANING DATA---


KeyboardInterrupt: 

In [16]:
# Import necessary libraries
import pandas as pd
import numpy as np
import os
import json
import re
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict
from langgraph.graph import StateGraph, END

# Define a single, unified LLM model
llm_model = ChatOpenAI(
    model="deepseek/deepseek-r1-0528:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    openai_api_base="https://openrouter.ai/api/v1"
)

# Define the state of the graph
class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
        cleaning_plan: The JSON plan from the LLM.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]
    cleaning_plan: Optional[List[Dict]]

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'logistics', 'support', 'unknown'."
    )
    
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    
    return response.content.strip().lower()

def data_loader_and_identifier(state: GraphState):
    """Loads data.csv and uses an LLM to identify its type."""
    print("---LOADING AND IDENTIFYING DATA---")
    
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_preparation', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

def data_cleaning_advisor(state: GraphState):
    """
    Asks the LLM to generate a JSON cleaning plan based on a data sample.
    """
    print("---GENERATING CLEANING PLAN---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping advisor.")
        return {"cleaning_plan": None}
    
    # Take a random sample of 100 rows, or the entire DataFrame if it has fewer than 100 rows.
    sample_size = min(100, len(df))
    sample_df = df.sample(n=sample_size, random_state=42)
    
    # New prompt to handle ID columns and sequence of operations
    prompt = f"""
    You are a data cleaning expert. Your task is to analyze a sample of a DataFrame and recommend a list of cleaning actions.
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a cleaning action.

    Each action object must have two keys:
    - "action": A string representing the type of cleaning operation. Possible values are: "remove_duplicates", "drop_column", "handle_ids", "unify_format", "standardize_text", "standardize_text_complex", "handle_dates", "handle_numeric_values", "validate_relationships", "impute_missing_values".
    - "details": A string providing the name of the column(s) and any specific instructions for the action.

    Do not recommend dropping columns that contain constant values or have a high percentage of nulls; those are handled separately. Focus on logical cleaning tasks.
    
    Example JSON response:
    [
        {{
            "action": "drop_column",
            "details": "Drop redundant columns like 'SalesOrderID.1' and 'ModifiedDate.1'."
        }},
        {{
            "action": "handle_ids",
            "details": "Ensure 'SalesOrderNumber' and 'ProductID' are treated as unique identifiers by removing non-numeric characters and converting to string."
        }},
        {{
            "action": "handle_dates",
            "details": "Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects."
        }},
        {{
            "action": "validate_relationships",
            "details": "Ensure 'OrderDate' is before 'ShipDate' to maintain logical consistency."
        }},
        {{
            "action": "impute_missing_values",
            "details": "Fill missing values in numeric columns like 'OrganizationLevel' with the median."
        }}
    ]

    Do not return any text or explanations outside the JSON object.
    Here is a sample of the DataFrame ({sample_size} random rows):
    {sample_df.to_dict()}
    """
    
    response = llm_model.invoke(prompt)
    
    try:
        cleaning_plan = json.loads(response.content)
        print("✅ Cleaning plan generated successfully.")
        return {"cleaning_plan": cleaning_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error: {e}")
        print(f"LLM response: {response.content}")
        return {"cleaning_plan": None}

def data_cleaning_executor(state: GraphState):
    """
    Executes the cleaning plan on the entire DataFrame.
    """
    print("---EXECUTING CLEANING PLAN---")
    df = state.get("dataframe")
    cleaning_plan = state.get("cleaning_plan")

    if df is None or cleaning_plan is None:
        print("❌ Missing DataFrame or cleaning plan. Skipping executor.")
        return {"dataframe": None}
    
    cleaned_df = df.copy()

    # Automatic cleaning based on predefined rules
    print("---Executing Automatic Cleaning Rules---")
    cols_to_drop = []
    for col in cleaned_df.columns:
        # Check for columns with a single unique value (constant)
        if cleaned_df[col].nunique(dropna=False) <= 1:
            print(f"  - Dropping column '{col}' due to constant values.")
            cols_to_drop.append(col)
        # Check for columns with more than 40% missing values
        elif cleaned_df[col].isnull().mean() > 0.40:
            print(f"  - Dropping column '{col}' due to high percentage of missing values (>40%).")
            cols_to_drop.append(col)
    
    if cols_to_drop:
        cleaned_df.drop(columns=cols_to_drop, inplace=True)

    # Pre-defined mapping for complex text standardization
    TEXT_MAPPING = {
        'ny': 'new york',
        'alex': 'alexandria',
        # Add more mappings here as needed
    }

    try:
        # We'll execute the plan in a more structured order to prevent cascading errors
        for action in cleaning_plan:
            action_type = action.get("action")
            details = action.get("details")

            if action_type == "drop_column":
                print(f"  - Dropping column: {details}")
                # Use a robust regex to find all column names enclosed in quotes (' or ")
                columns_to_drop = re.findall(r"['\"](.*?)['\"]", details)
                for col in columns_to_drop:
                    if col in cleaned_df.columns:
                        cleaned_df.drop(col, axis=1, inplace=True)
            
            elif action_type == "handle_ids":
                print(f"  - Handling ID columns: {details}")
                # A more robust regex to extract numbers and keep as string
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = cleaned_df[col].astype(str).str.extract('(\d+)').astype(str)

            elif action_type == "unify_format":
                print(f"  - Unifying format and handling hidden nulls: {details}")
                cleaned_df.replace(['-', 'NA', '', ' '], np.nan, inplace=True)
            
            elif action_type == "standardize_text":
                print(f"  - Standardizing text: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.strip()

            elif action_type == "standardize_text_complex":
                print(f"  - Complex text standardization: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.replace(' ', '').map(TEXT_MAPPING).fillna(cleaned_df[col])

            elif action_type == "impute_missing_values":
                print(f"  - Imputing missing values: {details}")
                # A better way to handle imputation based on data type
                for col in cleaned_df.columns:
                    if cleaned_df[col].isnull().any():
                        if pd.api.types.is_numeric_dtype(cleaned_df[col]):
                            cleaned_df[col].fillna(cleaned_df[col].median(), inplace=True)
                        elif pd.api.types.is_object_dtype(cleaned_df[col]):
                            mode_val = cleaned_df[col].mode()
                            if not mode_val.empty:
                                cleaned_df[col].fillna(mode_val[0], inplace=True)

            elif action_type == "handle_dates":
                print(f"  - Handling dates: {details}")
                # Extracting specific columns from details and converting
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
            
            elif action_type == "validate_relationships":
                print(f"  - Validating logical relationships: {details}")
                # This is a general example for dates.
                if 'OrderDate' in cleaned_df.columns and 'ShipDate' in cleaned_df.columns:
                    cleaned_df.drop(cleaned_df[cleaned_df['OrderDate'] > cleaned_df['ShipDate']].index, inplace=True)

            elif action_type == "handle_numeric_values":
                print(f"  - Handling numeric values: {details}")
                # Extract column names from the LLM's details
                numerical_cols_from_plan = [col.strip().replace("'", "") for col in re.findall(r"'([^']*)'", details)]
                for col in numerical_cols_from_plan:
                    if col in cleaned_df.columns:
                        # Convert to numeric first
                        cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                        # Handle negative values
                        cleaned_df.loc[cleaned_df[col] < 0, col] = np.nan
                        # Handle outliers
                        Q1 = cleaned_df[col].quantile(0.25)
                        Q3 = cleaned_df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        cleaned_df.loc[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound), col] = np.nan
            
            elif action_type == "remove_duplicates":
                print(f"  - Removing duplicates: {details}")
                cleaned_df.drop_duplicates(inplace=True)

            elif action_type == "handle_missing_values":
                print(f"  - Handling missing values: {details}")
                # This is still a simple dropna. A better way is to parse the details
                # and decide which columns to drop vs. impute.
                cleaned_df.dropna(subset=['CustomerID', 'SalesOrderID'], inplace=True)
        
        print("✅ Cleaning plan executed successfully.")
        return {"dataframe": cleaned_df}

    except Exception as e:
        print(f"❌ Error during execution: {e}")
        return {"dataframe": None}

# --- Build the LangGraph ---

workflow = StateGraph(GraphState)
workflow.add_node("loader", data_loader_and_identifier)
workflow.add_node("advisor", data_cleaning_advisor)
workflow.add_node("executor", data_cleaning_executor)

workflow.add_edge("loader", "advisor")
workflow.add_edge("advisor", "executor")
workflow.add_edge("executor", END)

workflow.set_entry_point("loader")

app = workflow.compile()

# --- Run the Graph and Test the Output ---

initial_state = {"data_type": None, "dataframe": None, "cleaning_plan": None}
final_state = app.invoke(initial_state)

print("\n--- Final Graph State ---")
print(f"Data type: {final_state.get('data_type')}")
print(f"DataFrame: {type(final_state.get('dataframe'))}")
if final_state.get('dataframe') is not None:
    print(f"Cleaned DataFrame shape: {final_state['dataframe'].shape}")
    print("\nDataFrame Info:")
    final_state['dataframe'].info()

---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'employees'.
---GENERATING CLEANING PLAN---
✅ Cleaning plan generated successfully.
---EXECUTING CLEANING PLAN---
---Executing Automatic Cleaning Rules---
  - Dropping column 'Title' due to high percentage of missing values (>40%).
  - Dropping column 'Suffix' due to high percentage of missing values (>40%).
  - Dropping column 'CurrentFlag' due to constant values.
  - Dropping column 'EndDate' due to high percentage of missing values (>40%).
  - Dropping column 'AddressTypeID' due to constant values.
  - Handling dates: Convert columns 'BirthDate', 'HireDate', 'StartDate', 'EndDate', and 'RateChangeDate' to datetime objects.
  - Imputing missing values: Fill missing values in 'Title', 'MiddleName', and 'Suffix' with empty strings.
  - Complex text standardization: Standardize 'PhoneNumber' by removing non-digit characters and storing as numeric strings (e.g., '913-555-0172' becomes '9135550172').
  - Handling

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  cleaned_df[col].fillna(mode_val[0], inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  cleaned_df[col].fillna(cleaned_df[col].median(), inplace=True)


In [14]:
final_state['dataframe'].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 334 entries, 0 to 333
Data columns (total 27 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   PersonBusinessEntityID  334 non-null    int64         
 1   PersonType              334 non-null    object        
 2   FirstName               334 non-null    object        
 3   MiddleName              317 non-null    object        
 4   LastName                334 non-null    object        
 5   EmailPromotion          334 non-null    int64         
 6   LoginID                 334 non-null    object        
 7   OrganizationLevel       317 non-null    float64       
 8   JobTitle                334 non-null    object        
 9   BirthDate               334 non-null    datetime64[ns]
 10  MaritalStatus           334 non-null    object        
 11  Gender                  334 non-null    object        
 12  HireDate                334 non-null    datetime64

In [None]:
import pandas as pd
import numpy as np
import os
import json
import re
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict, Any
from langgraph.graph import StateGraph, END

# Define a single, unified LLM model
llm_model = ChatOpenAI(
    model="z-ai/glm-4.5-air:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url="https://openrouter.ai/api/v1"  
)

# Define the state of the graph
class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
        cleaning_plan: The JSON plan from the LLM.
        kpi_plan: The JSON plan from the LLM for KPI calculation.
        kpis: A dictionary of calculated key performance indicators.
        analysis_report: The final, generated report text.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]
    cleaning_plan: Optional[List[Dict]]
    kpi_plan: Optional[List[Dict]]
    kpis: Optional[Dict[str, Any]]
    analysis_report: Optional[str]

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'logistics', 'support', 'unknown'."
    )
    
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    
    return response.content.strip().lower()

def data_loader_and_identifier(state: GraphState):
    """Loads data.csv and uses an LLM to identify its type."""
    print("---LOADING AND IDENTIFYING DATA---")
    
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_preparation', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

def data_cleaning_advisor(state: GraphState):
    """
    Asks the LLM to generate a JSON cleaning plan based on a data sample.
    """
    print("---GENERATING CLEANING PLAN---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping advisor.")
        return {"cleaning_plan": None}
    
    # Take a random sample of 100 rows, or the entire DataFrame if it has fewer than 100 rows.
    sample_size = min(100, len(df))
    sample_df = df.sample(n=sample_size, random_state=42)
    
    # New prompt to handle ID columns and sequence of operations
    prompt = f"""
    You are a data cleaning expert. Your task is to analyze a sample of a DataFrame and recommend a list of cleaning actions.
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a cleaning action.

    Each action object must have two keys:
    - "action": A string representing the type of cleaning operation. Possible values are: "remove_duplicates", "drop_column", "handle_ids", "unify_format", "standardize_text", "standardize_text_complex", "handle_dates", "handle_numeric_values", "validate_relationships", "impute_missing_values".
    - "details": A string providing the name of the column(s) and any specific instructions for the action.

    Do not recommend dropping columns that contain constant values or have a high percentage of nulls; those are handled separately. Focus on logical cleaning tasks.
    
    Example JSON response:
    [
        {{
            "action": "drop_column",
            "details": "Drop redundant columns like 'SalesOrderID.1' and 'ModifiedDate.1'."
        }},
        {{
            "action": "handle_ids",
            "details": "Ensure 'SalesOrderNumber' and 'ProductID' are treated as unique identifiers by removing non-numeric characters and converting to string."
        }},
        {{
            "action": "handle_dates",
            "details": "Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects."
        }},
        {{
            "action": "validate_relationships",
            "details": "Ensure 'OrderDate' is before 'ShipDate' to maintain logical consistency."
        }},
        {{
            "action": "impute_missing_values",
            "details": "Fill missing values in numeric columns like 'OrganizationLevel' with the median."
        }}
    ]

    Do not return any text or explanations outside the JSON object.
    Here is a sample of the DataFrame ({sample_size} random rows):
    {sample_df.to_dict()}
    """
    
    response = llm_model.invoke(prompt)

    # Sanitize the LLM response to robustly extract the JSON array
    sanitized_content = response.content.strip()
    match = re.search(r'\[.*\]', sanitized_content, re.DOTALL)
    if match:
        json_content = match.group(0)
    else:
        json_content = sanitized_content
    
    try:
        cleaning_plan = json.loads(json_content)
        print("✅ Cleaning plan generated successfully.")
        return {"cleaning_plan": cleaning_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error: {e}")
        print(f"LLM response: {response.content}")
        return {"cleaning_plan": None}

def data_cleaning_executor(state: GraphState):
    """
    Executes the cleaning plan on the entire DataFrame.
    """
    print("---EXECUTING CLEANING PLAN---")
    df = state.get("dataframe")
    cleaning_plan = state.get("cleaning_plan")

    if df is None or cleaning_plan is None:
        print("❌ Missing DataFrame or cleaning plan. Skipping executor.")
        return {"dataframe": None}
    
    cleaned_df = df.copy()

    # Automatic cleaning based on predefined rules
    print("---Executing Automatic Cleaning Rules---")
    cols_to_drop = []
    for col in cleaned_df.columns:
        # Check for columns with a single unique value (constant)
        if cleaned_df[col].nunique(dropna=False) <= 1:
            print(f"  - Dropping column '{col}' due to constant values.")
            cols_to_drop.append(col)
        # Check for columns with more than 40% missing values
        elif cleaned_df[col].isnull().mean() > 0.40:
            print(f"  - Dropping column '{col}' due to high percentage of missing values (>40%).")
            cols_to_drop.append(col)
    
    if cols_to_drop:
        cleaned_df.drop(columns=cols_to_drop, inplace=True)

    # Pre-defined mapping for complex text standardization
    TEXT_MAPPING = {
        'ny': 'new york',
        'alex': 'alexandria',
    }

    try:
        # We'll execute the plan in a more structured order to prevent cascading errors
        for action in cleaning_plan:
            action_type = action.get("action")
            details = action.get("details")

            if action_type == "drop_column":
                print(f"  - Dropping column: {details}")
                # Use a robust regex to find all column names enclosed in quotes (' or ")
                columns_to_drop = re.findall(r"['\"](.*?)['\"]", details)
                for col in columns_to_drop:
                    if col in cleaned_df.columns:
                        cleaned_df.drop(col, axis=1, inplace=True)
            
            elif action_type == "handle_ids":
                print(f"  - Handling ID columns: {details}")
                # A more robust regex to extract numbers and keep as string
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = cleaned_df[col].astype(str).str.extract('(\d+)').astype(str)

            elif action_type == "unify_format":
                print(f"  - Unifying format and handling hidden nulls: {details}")
                cleaned_df.replace(['-', 'NA', '', ' '], np.nan, inplace=True)
            
            elif action_type == "standardize_text":
                print(f"  - Standardizing text: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.strip()

            elif action_type == "standardize_text_complex":
                print(f"  - Complex text standardization: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.replace(' ', '').map(TEXT_MAPPING).fillna(cleaned_df[col])

            elif action_type == "impute_missing_values":
                print(f"  - Imputing missing values: {details}")
                # A better way to handle imputation based on data type
                for col in cleaned_df.columns:
                    if cleaned_df[col].isnull().any():
                        if pd.api.types.is_numeric_dtype(cleaned_df[col]):
                            cleaned_df[col].fillna(cleaned_df[col].median(), inplace=True)
                        elif pd.api.types.is_object_dtype(cleaned_df[col]):
                            mode_val = cleaned_df[col].mode()
                            if not mode_val.empty:
                                cleaned_df[col].fillna(mode_val[0], inplace=True)

            elif action_type == "handle_dates":
                print(f"  - Handling dates: {details}")
                # Extracting specific columns from details and converting
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
            
            elif action_type == "validate_relationships":
                print(f"  - Validating logical relationships: {details}")
                # This is a general example for dates.
                if 'OrderDate' in cleaned_df.columns and 'ShipDate' in cleaned_df.columns:
                    cleaned_df.drop(cleaned_df[cleaned_df['OrderDate'] > cleaned_df['ShipDate']].index, inplace=True)

            elif action_type == "handle_numeric_values":
                print(f"  - Handling numeric values: {details}")
                # Extract column names from the LLM's details
                numerical_cols_from_plan = [col.strip().replace("'", "") for col in re.findall(r"'([^']*)'", details)]
                for col in numerical_cols_from_plan:
                    if col in cleaned_df.columns:
                        # Convert to numeric first
                        cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                        # Handle negative values
                        cleaned_df.loc[cleaned_df[col] < 0, col] = np.nan
                        # Handle outliers
                        Q1 = cleaned_df[col].quantile(0.25)
                        Q3 = cleaned_df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        cleaned_df.loc[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound), col] = np.nan
            
            elif action_type == "remove_duplicates":
                print(f"  - Removing duplicates: {details}")
                cleaned_df.drop_duplicates(inplace=True)

            elif action_type == "handle_missing_values":
                print(f"  - Handling missing values: {details}")
                cleaned_df.dropna(subset=['CustomerID', 'SalesOrderID'], inplace=True)
        
        print("✅ Cleaning plan executed successfully.")
        return {"dataframe": cleaned_df}

    except Exception as e:
        print(f"❌ Error during execution: {e}")
        return {"dataframe": None}

# --- Build the first part of the LangGraph workflow (Cleaning) ---
workflow1 = StateGraph(GraphState)
workflow1.add_node("loader", data_loader_and_identifier)
workflow1.add_node("advisor", data_cleaning_advisor)
workflow1.add_node("executor", data_cleaning_executor)

workflow1.add_edge("loader", "advisor")
workflow1.add_edge("advisor", "executor")
workflow1.set_entry_point("loader")
workflow1.add_edge("executor", END)

app1 = workflow1.compile()

# --- Run the Cleaning Workflow and get the cleaned DataFrame ---
initial_state = {"data_type": None, "dataframe": None, "cleaning_plan": None, "kpi_plan": None, "kpis": None, "analysis_report": None}
final_state_after_cleaning = app1.invoke(initial_state)

cleaned_df = final_state_after_cleaning.get('dataframe')
print("\n--- Cleaning Complete ---")
if cleaned_df is not None:
    print(f"✅ Data cleaned and ready. Shape: {cleaned_df.shape}")
    print(f"Calculated KPIs: {final_state_after_cleaning.get('kpis')}")
else:
    print("❌ Cleaning process failed. Cannot proceed to analysis.")

---LOADING AND IDENTIFYING DATA---


RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit exceeded: free-models-per-day. Add 10 credits to unlock 1000 free model requests per day', 'code': 429, 'metadata': {'headers': {'X-RateLimit-Limit': '50', 'X-RateLimit-Remaining': '0', 'X-RateLimit-Reset': '1757030400000'}, 'provider_name': None}}, 'user_id': 'user_326hxVSI4GxaHKITRQQcOHjRfj8'}

In [10]:
import pandas as pd
import numpy as np
import os
import json
import re
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict, Any
from langgraph.graph import StateGraph, END

# --- 1. Define LLM and Graph State ---
# Define a single, unified LLM model for all tasks
llm_model = ChatOpenAI(
    model="z-ai/glm-4.5-air:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url="https://openrouter.ai/api/v1"
)

class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
        cleaning_plan: The JSON plan from the LLM.
        kpi_plan: The JSON plan from the LLM for KPI calculation.
        kpis: A dictionary of calculated key performance indicators.
        analysis_report: The final, generated report text.
        recommendations_report: The generated recommendations report text.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]
    cleaning_plan: Optional[List[Dict]]
    kpi_plan: Optional[List[Dict]]
    kpis: Optional[Dict[str, Any]]
    analysis_report: Optional[str]
    recommendations_report: Optional[str]

# --- 2. Define Graph Nodes (Functions) ---

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'logistics', 'support', 'unknown'."
    )
    
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    
    return response.content.strip().lower()

def data_loader_and_identifier(state: GraphState):
    """Loads data.csv and uses an LLM to identify its type."""
    print("---LOADING AND IDENTIFYING DATA---")
    
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_preparation', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

def data_cleaning_advisor(state: GraphState):
    """
    Asks the LLM to generate a JSON cleaning plan based on a data sample.
    """
    print("---GENERATING CLEANING PLAN---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping advisor.")
        return {"cleaning_plan": None}
    
    sample_size = min(100, len(df))
    sample_df = df.sample(n=sample_size, random_state=42)
    
    prompt = f"""
    You are a data cleaning expert. Your task is to analyze a sample of a DataFrame and recommend a list of cleaning actions.
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a cleaning action.
    
    Each action object must have two keys:
    - "action": A string representing the type of cleaning operation. Possible values are: "remove_duplicates", "drop_column", "handle_ids", "unify_format", "standardize_text", "standardize_text_complex", "handle_dates", "handle_numeric_values", "validate_relationships", "impute_missing_values".
    - "details": A string providing the name of the column(s) and any specific instructions for the action.

    Do not recommend dropping columns that contain constant values or have a high percentage of nulls; those are handled separately. Focus on logical cleaning tasks.
    
    Example JSON response:
    [
        {{
            "action": "drop_column",
            "details": "Drop redundant columns like 'SalesOrderID.1' and 'ModifiedDate.1'."
        }},
        {{
            "action": "handle_ids",
            "details": "Ensure 'SalesOrderNumber' and 'ProductID' are treated as unique identifiers by removing non-numeric characters and converting to string."
        }},
        {{
            "action": "handle_dates",
            "details": "Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects."
        }},
        {{
            "action": "validate_relationships",
            "details": "Ensure 'OrderDate' is before 'ShipDate' to maintain logical consistency."
        }},
        {{
            "action": "impute_missing_values",
            "details": "Fill missing values in numeric columns like 'OrganizationLevel' with the median."
        }}
    ]

    Do not return any text or explanations outside the JSON object.
    Here is a sample of the DataFrame ({sample_size} random rows):
    {sample_df.to_dict()}
    """
    
    response = llm_model.invoke(prompt)
    sanitized_content = response.content.strip()
    match = re.search(r'\[.*\]', sanitized_content, re.DOTALL)
    if match:
        json_content = match.group(0)
    else:
        json_content = sanitized_content
    
    try:
        cleaning_plan = json.loads(json_content)
        print("✅ Cleaning plan generated successfully.")
        return {"cleaning_plan": cleaning_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error: {e}")
        print(f"LLM response: {response.content}")
        return {"cleaning_plan": None}

def data_cleaning_executor(state: GraphState):
    """
    Executes the cleaning plan on the entire DataFrame.
    """
    print("---EXECUTING CLEANING PLAN---")
    df = state.get("dataframe")
    cleaning_plan = state.get("cleaning_plan")

    if df is None or cleaning_plan is None:
        print("❌ Missing DataFrame or cleaning plan. Skipping executor.")
        return {"dataframe": None}
    
    cleaned_df = df.copy()

    print("---Executing Automatic Cleaning Rules---")
    cols_to_drop = []
    for col in cleaned_df.columns:
        if cleaned_df[col].nunique(dropna=False) <= 1:
            print(f"    - Dropping column '{col}' due to constant values.")
            cols_to_drop.append(col)
        elif cleaned_df[col].isnull().mean() > 0.40:
            print(f"    - Dropping column '{col}' due to high percentage of missing values (>40%).")
            cols_to_drop.append(col)
    
    if cols_to_drop:
        cleaned_df.drop(columns=cols_to_drop, inplace=True)

    TEXT_MAPPING = {
        'ny': 'new york',
        'alex': 'alexandria',
    }

    try:
        for action in cleaning_plan:
            action_type = action.get("action")
            details = action.get("details")

            if action_type == "drop_column":
                print(f"    - Dropping column: {details}")
                columns_to_drop = re.findall(r"['\"](.*?)['\"]", details)
                for col in columns_to_drop:
                    if col in cleaned_df.columns:
                        cleaned_df.drop(col, axis=1, inplace=True)
            
            elif action_type == "handle_ids":
                print(f"    - Handling ID columns: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = cleaned_df[col].astype(str).str.extract('(\d+)').astype(str)

            elif action_type == "unify_format":
                print(f"    - Unifying format and handling hidden nulls: {details}")
                cleaned_df.replace(['-', 'NA', '', ' '], np.nan, inplace=True)
            
            elif action_type == "standardize_text":
                print(f"    - Standardizing text: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.strip()

            elif action_type == "standardize_text_complex":
                print(f"    - Complex text standardization: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.replace(' ', '').map(TEXT_MAPPING).fillna(cleaned_df[col])

            elif action_type == "impute_missing_values":
                print(f"    - Imputing missing values: {details}")
                for col in cleaned_df.columns:
                    if cleaned_df[col].isnull().any():
                        if pd.api.types.is_numeric_dtype(cleaned_df[col]):
                            cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
                        elif pd.api.types.is_object_dtype(cleaned_df[col]):
                            mode_val = cleaned_df[col].mode()
                            if not mode_val.empty:
                                cleaned_df[col] = cleaned_df[col].fillna(mode_val[0])

            elif action_type == "handle_dates":
                print(f"    - Handling dates: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
            
            elif action_type == "validate_relationships":
                print(f"    - Validating logical relationships: {details}")
                if 'OrderDate' in cleaned_df.columns and 'ShipDate' in cleaned_df.columns:
                    cleaned_df.drop(cleaned_df[cleaned_df['OrderDate'] > cleaned_df['ShipDate']].index, inplace=True)

            elif action_type == "handle_numeric_values":
                print(f"    - Handling numeric values: {details}")
                numerical_cols_from_plan = [col.strip().replace("'", "") for col in re.findall(r"'([^']*)'", details)]
                for col in numerical_cols_from_plan:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                        cleaned_df.loc[cleaned_df[col] < 0, col] = np.nan
                        Q1 = cleaned_df[col].quantile(0.25)
                        Q3 = cleaned_df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        cleaned_df.loc[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound), col] = np.nan
            
            elif action_type == "remove_duplicates":
                print(f"    - Removing duplicates: {details}")
                cleaned_df.drop_duplicates(inplace=True)

            elif action_type == "handle_missing_values":
                print(f"    - Handling missing values: {details}")
                cleaned_df.dropna(subset=['CustomerID', 'SalesOrderID'], inplace=True)
      
        print("✅ Cleaning plan executed successfully.")
        return {"dataframe": cleaned_df}

    except Exception as e:
        print(f"❌ Error during execution: {e}")
        return {"dataframe": None}

def kpi_advisor(state: GraphState):
    """
    Uses an LLM to generate a JSON plan for KPI calculation and trends based on cleaned data.
    """
    print("---GENERATING KPI CALCULATION PLAN---")
    df = state.get("dataframe")
    data_type = state.get("data_type")

    if df is None:
        print("❌ No DataFrame found in state. Skipping KPI advisor.")
        return {"kpi_plan": None}

    columns = df.columns.tolist()

    prompt = f"""
    You are a data analyst expert. Your task is to analyze the columns of a cleaned DataFrame and provide a JSON plan to calculate Key Performance Indicators (KPIs) and identify key trends. The data has been identified as '{data_type}'.
    
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a KPI or trend to be calculated.
    
    Each object must have two keys:
    - "kpi_name": A descriptive name for the KPI or trend (e.g., "Total Revenue", "Top 5 Selling Products").
    - "calculation_details": A detailed description of the columns to use and the mathematical/analytical operation to perform, in natural language. This will be given to a Pandas Agent.

    If you cannot find suitable columns for a specific KPI, do not include it.
    
    Return ONLY a valid JSON object, with no extra text, explanation, or punctuation.
    
    Here are the available columns in the cleaned DataFrame: {columns}.
    
    Example:
    [
        {{
            "kpi_name": "Total Revenue",
            "calculation_details": "Calculate the sum of the 'TotalDue' column."
        }},
        {{
            "kpi_name": "Average Order Value",
            "calculation_details": "Calculate the average of the 'TotalDue' column, grouped by a unique order identifier if available. Otherwise, calculate the overall average."
        }},
        {{
            "kpi_name": "Top 5 Selling Products",
            "calculation_details": "Identify the top 5 products by quantity sold and return a dictionary of product IDs and their total sales quantity."
        }}
    ]
    """
    
    response = llm_model.invoke(prompt)
    sanitized_content = response.content.strip()
    match = re.search(r'\[.*\]', sanitized_content, re.DOTALL)
    if match:
        json_content = match.group(0)
    else:
        json_content = sanitized_content
    
    try:
        cleaning_plan = json.loads(json_content)
        print("✅ KPI calculation plan generated successfully.")
        return {"kpi_plan": cleaning_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error from KPI plan: {e}")
        print(f"LLM response: {response.content}")
        return {"kpi_plan": None}

def kpi_executor(state: GraphState):
    """
    Generates and executes Python code to calculate KPIs.
    """
    print("---GENERATING AND EXECUTING KPI CALCULATION CODE---")
    df = state.get("dataframe")
    kpi_plan = state.get("kpi_plan")
    kpis = {}

    if df is None or kpi_plan is None:
        print("❌ Missing DataFrame or KPI plan. Skipping executor.")
        return {"kpis": None}

    # Generate Python code to calculate all KPIs in one block
    code_generation_prompt = f"""
    You are an expert Python data analyst. Your task is to write Python code to calculate a list of Key Performance Indicators (KPIs) based on a pandas DataFrame named `df`.
    The code should calculate the KPIs and store the results in a dictionary named `results`.

    Here is the list of KPIs to calculate:
    {json.dumps(kpi_plan, indent=2)}

    Your response must be ONLY the Python code block. Do NOT include any explanations, Markdown, or surrounding text.
    
    Example output format:
    ```python
    results = {{}}
    # Calculate Total Revenue
    results['Total Revenue'] = df['TotalDue'].sum()
    # Calculate Top Selling Products
    top_products = df.groupby('ProductID')['OrderQty'].sum().nlargest(5)
    results['Top 5 Selling Products'] = top_products.to_dict()
    ```
    """

    try:
        code_response = llm_model.invoke(code_generation_prompt)
        code_block = re.search(r'```python(.*?)```', code_response.content, re.DOTALL)
        if code_block:
            code_to_execute = code_block.group(1).strip()
        else:
            code_to_execute = code_response.content.strip()

        # Sanitize and prepare the execution environment
        safe_globals = {'pd': pd, 'np': np, 'df': df}
        safe_locals = {'results': {}}

        # Execute the generated code safely
        exec(code_to_execute, safe_globals, safe_locals)
        kpis = safe_locals.get('results', {})
        
        # --- NEW CODE: Sanitize KPI dictionary for JSON compatibility ---
        def sanitize_value(value):
            if isinstance(value, (dict, list)):
                return sanitize_dict_or_list(value)
            elif isinstance(value, (pd.Series, pd.Index, np.ndarray)):
                return value.tolist()
            else:
                return str(value)
        
        def sanitize_dict_or_list(obj):
            if isinstance(obj, dict):
                return {sanitize_value(k): sanitize_value(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [sanitize_value(item) for item in obj]
            else:
                return str(obj)

        sanitized_kpis = sanitize_dict_or_list(kpis)

        print("✅ KPIs calculated successfully.")
        return {"kpis": sanitized_kpis}

    except Exception as e:
        print(f"❌ An error occurred during KPI calculation: {e}")
        return {"kpis": {"error": f"An error occurred during KPI calculation: {e}"}}


def analysis_and_recommendations_generator(state: GraphState):
    """
    Generates a single, comprehensive report including both analysis and recommendations.
    """
    print("---GENERATING COMPREHENSIVE REPORT---")
    kpis = state.get("kpis")
    df = state.get("dataframe")

    if not kpis or 'error' in kpis or df is None:
        print("❌ Missing KPIs or DataFrame. Skipping report generation.")
        return {"analysis_report": "Unable to generate a comprehensive report due to missing or invalid data."}
    
    kpis_text = json.dumps(kpis, indent=2)
    sample_df = df.sample(n=min(100, len(df)), random_state=42)

    prompt = f"""
    You are a professional and excellent data analyst and a highly skilled Sales Recommendation Agent.
    
    Your task is to generate a single, detailed, and actionable sales report based on the provided data. The report must be structured in two main parts:
    
    **Part 1: Sales Analysis Report**
    1.  A brief introduction summarizing the main findings.
    2.  An analysis of each key performance indicator with an explanation of its importance.
    3.  A "Key Insights" section that draws deeper conclusions from the numbers and trends, such as sales trends, regional performance, or top-selling products.
    4.  A conclusion that provides a high-level summary.
    
    **Part 2: Actionable Recommendations**
    This part must be structured exactly as follows, using the insights from the analysis.
    
    1. Short-Term Plan (0-3 months)
    - Goal: Increase total orders by a data-driven percentage.
    - Analysis:
      - Current monthly orders.
      - Best performing region(s) and underperforming region(s).
      - Key trends or anomalies.
    - Recommendations / Actions:
      - Digital marketing campaigns: budget %, target regions, expected impact.
      - Incentive programs for sales reps: bonus per extra order, criteria.
      - Training programs: tailored training per sales rep based on performance.
      - Reasoning for each action.
    - Scenarios:
      - Best Case: projected orders and revenue.
      - Moderate Case: realistic projection.
      - Worst Case: flat growth and mitigation actions.
      - Risk Management: how to reallocate resources if results are below expectations.
    
    2. Mid-Term Plan (3-6 months)
    - Goal: Increase Average Order Value (AOV) and improve margins.
    - Analysis:
      - Current AOV.
      - Top-selling products.
      - High-margin products.
    - Recommendations / Actions:
      - Cross-selling and bundling strategies.
      - Pricing adjustments for high-margin products.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected AOV and revenue impact.
    - Risk Management: adjustments if adoption is lower than expected.
    
    3. Long-Term Plan (6+ months)
    - Goal: Increase annual sales growth and expand market share.
    - Analysis:
      - Current annual growth rate.
      - Untapped regions and top-performing channels.
    - Recommendations / Actions:
      - Geographic expansion and investment amount.
      - Channel development budget and strategies.
      - Social selling: train reps to engage on LinkedIn/Twitter.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected growth percentages.
      - Risk Management: how to reduce investment or redirect resources in case of failure.
    
    Output Format:
    - Use clear markdown headings for both parts and sub-sections.
    - Do not just list the numbers; provide real, insightful analysis and connect the dots between the different metrics.
    - Here are the KPIs for your analysis. Use these numbers directly in your report.
    {kpis_text}
    
    Here is a sample of the raw data for additional context:
    {sample_df.to_dict()}
    """

    try:
        report_response = llm_model.invoke(prompt)
        report_text = report_response.content
        print("✅ Comprehensive report generated successfully.")
    except Exception as e:
        print(f"❌ Error generating comprehensive report with LLM: {e}")
        report_text = "An error occurred while generating the comprehensive report. Please try again."

    return {"analysis_report": report_text}


def save_outputs(state: GraphState):
    """
    Saves the cleaned DataFrame and generated reports to files.
    """
    print("---SAVING OUTPUTS---")
    output_dir = "final_output"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Save cleaned DataFrame
    df = state.get('dataframe')
    if df is not None:
        df.to_csv(os.path.join(output_dir, 'cleaned_data.csv'), index=False)
        print(f"✅ Cleaned DataFrame saved to '{os.path.join(output_dir, 'cleaned_data.csv')}'")

    # Save comprehensive report
    analysis_report = state.get('analysis_report')
    if analysis_report is not None:
        with open(os.path.join(output_dir, 'sales_analysis_and_recommendations_report.txt'), 'w', encoding='utf-8') as f:
            f.write(analysis_report)
        print(f"✅ Comprehensive report saved to '{os.path.join(output_dir, 'sales_analysis_and_recommendations_report.txt')}'")

    print("---OUTPUTS SAVED SUCCESSFULLY---")
    return {}

def final_output_viewer(state: GraphState):
    """
    Displays the final outputs for user review before manual saving.
    """
    print("--- ALL PROCESSES COMPLETE ---")
    print("\n--- Final Graph State ---")
    print(f"Data type: {state.get('data_type')}")
    print(f"DataFrame shape after cleaning: {state.get('dataframe').shape if state.get('dataframe') is not None else 'None'}")
    
    kpis = state.get('kpis')
    print("Calculated KPIs:")
    if kpis and 'error' in kpis:
        print(f"❌ Error during KPI calculation: {kpis['error']}")
    else:
        print(json.dumps(kpis, indent=2))
    
    print("\n--- Generated Sales Analysis and Recommendations Report ---")
    print(f"Report: {state.get('analysis_report')}")

    print("\n✅ The process has finished. You can now check the 'final_output' directory for the saved files.")
    return {}

# --- 3. Build the LangGraph ---
workflow = StateGraph(GraphState)
workflow.add_node("loader", data_loader_and_identifier)
workflow.add_node("advisor", data_cleaning_advisor)
workflow.add_node("executor", data_cleaning_executor)
workflow.add_node("kpi_advisor", kpi_advisor)
workflow.add_node("kpi_executor", kpi_executor)
workflow.add_node("analysis_and_recommendations_generator", analysis_and_recommendations_generator)
workflow.add_node("save_outputs", save_outputs)
workflow.add_node("final_output_viewer", final_output_viewer)

# Connect the nodes to form the graph
workflow.add_edge("loader", "advisor")
workflow.add_edge("advisor", "executor")
workflow.add_edge("executor", "kpi_advisor")
workflow.add_edge("kpi_advisor", "kpi_executor")
workflow.add_edge("kpi_executor", "analysis_and_recommendations_generator")
workflow.add_edge("analysis_and_recommendations_generator", "save_outputs")
workflow.add_edge("save_outputs", "final_output_viewer")
workflow.set_entry_point("loader")
workflow.add_edge("final_output_viewer", END)
app = workflow.compile()

# --- 4. Run the Graph and Test the Output ---
initial_state = {"data_type": None, "dataframe": None, "cleaning_plan": None, "kpi_plan": None, "kpis": None, "analysis_report": None, "recommendations_report": None}
final_state = app.invoke(initial_state)

---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'sales'.
---GENERATING CLEANING PLAN---
✅ Cleaning plan generated successfully.
---EXECUTING CLEANING PLAN---
---Executing Automatic Cleaning Rules---
    - Dropping column 'SalesPersonID_Order' due to high percentage of missing values (>40%).
    - Dropping column 'SalesPersonID_SalesPerson' due to high percentage of missing values (>40%).
    - Dropping column 'SalesPersonSalesYTD' due to high percentage of missing values (>40%).
    - Dropping column 'SalesPersonSalesLastYear' due to high percentage of missing values (>40%).
    - Handling dates: Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects.
    - Validating logical relationships: Ensure that for every row: OrderDate <= ShipDate and OrderDate <= DueDate.
    - Validating logical relationships: Validate that TotalDue = SubTotal + TaxAmt + Freight (with a tolerance of 0.01).
    - Imputing missing values: Fill missing values in 'Sale

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


✅ KPIs calculated successfully.
---GENERATING COMPREHENSIVE REPORT---
✅ Comprehensive report generated successfully.
---SAVING OUTPUTS---
✅ Cleaned DataFrame saved to 'final_output/cleaned_data.csv'
✅ Comprehensive report saved to 'final_output/sales_analysis_and_recommendations_report.txt'
---OUTPUTS SAVED SUCCESSFULLY---
--- ALL PROCESSES COMPLETE ---

--- Final Graph State ---
Data type: sales
DataFrame shape after cleaning: (121317, 22)
Calculated KPIs:
{
  "Total Revenue": "111932704.03459999",
  "Average Order Value": "3568.031112639061",
  "Top 5 Selling Products": {
    "870": "5306.0",
    "712": "5120.0",
    "711": "4560.0",
    "707": "4357.0",
    "708": "4313.0"
  },
  "Sales by Territory": {
    "Australia": "11814376.0952",
    "Canada": "16887945.6892",
    "Central": "7708465.7107",
    "France": "6755450.3541",
    "Germany": "5247188.8305",
    "Northeast": "7196295.5635",
    "Northwest": "16684851.7099",
    "Southeast": "7820178.1171",
    "Southwest": "23810894.

In [None]:
import pandas as pd
import numpy as np
import os
import json
import re
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict, Any
from langgraph.graph import StateGraph, END

# --- 1. Define LLM and Graph State ---
llm_model = ChatOpenAI(
    model="z-ai/glm-4.5-air:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url="https://openrouter.ai/api/v1"
)

class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
        cleaning_plan: The JSON plan from the LLM.
        kpi_plan: The JSON plan from the LLM for KPI calculation.
        kpis: A dictionary of calculated key performance indicators.
        analysis_report: The final, generated report text.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]
    cleaning_plan: Optional[List[Dict]]
    kpi_plan: Optional[List[Dict]]
    kpis: Optional[Dict[str, Any]]
    analysis_report: Optional[str]

# --- 2. Define Graph Nodes (Functions) ---

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'logistics', 'support', 'unknown'."
    )
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    return response.content.strip().lower()

def data_loader_and_identifier(state: GraphState):
    """Loads data.csv and uses an LLM to identify its type."""
    print("---LOADING AND IDENTIFYING DATA---")
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_collection', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

def data_cleaning_advisor(state: GraphState):
    """
    Asks the LLM to generate a JSON cleaning plan based on a data sample.
    """
    print("---GENERATING CLEANING PLAN---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping advisor.")
        return {"cleaning_plan": None}
    
    sample_size = min(100, len(df))
    sample_df = df.sample(n=sample_size, random_state=42)
    
    prompt = f"""
    You are a data cleaning expert. Your task is to analyze a sample of a DataFrame and recommend a list of cleaning actions.
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a cleaning action.
    
    Each action object must have two keys:
    - "action": A string representing the type of cleaning operation. Possible values are: "remove_duplicates", "drop_column", "handle_ids", "unify_format", "standardize_text", "standardize_text_complex", "handle_dates", "handle_numeric_values", "validate_relationships", "impute_missing_values".
    - "details": A string providing the name of the column(s) and any specific instructions for the action.

    Do not recommend dropping columns that contain constant values or have a high percentage of nulls; those are handled separately. Focus on logical cleaning tasks.
    
    Example JSON response:
    [
        {{
            "action": "drop_column",
            "details": "Drop redundant columns like 'SalesOrderID.1' and 'ModifiedDate.1'."
        }},
        {{
            "action": "handle_ids",
            "details": "Ensure 'SalesOrderNumber' and 'ProductID' are treated as unique identifiers by removing non-numeric characters and converting to string."
        }},
        {{
            "action": "handle_dates",
            "details": "Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects."
        }},
        {{
            "action": "validate_relationships",
            "details": "Ensure 'OrderDate' is before 'ShipDate' to maintain logical consistency."
        }},
        {{
            "action": "impute_missing_values",
            "details": "Fill missing values in numeric columns like 'OrganizationLevel' with the median."
        }}
    ]

    Do not return any text or explanations outside the JSON object.
    Here is a sample of the DataFrame ({sample_size} random rows):
    {sample_df.to_dict()}
    """
    
    response = llm_model.invoke(prompt)
    sanitized_content = response.content.strip()
    match = re.search(r'\[.*\]', sanitized_content, re.DOTALL)
    if match:
        json_content = match.group(0)
    else:
        json_content = sanitized_content
    
    try:
        cleaning_plan = json.loads(json_content)
        print("✅ Cleaning plan generated successfully.")
        return {"cleaning_plan": cleaning_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error: {e}")
        print(f"LLM response: {response.content}")
        return {"cleaning_plan": None}

def data_cleaning_executor(state: GraphState):
    """
    Executes the cleaning plan on the entire DataFrame.
    """
    print("---EXECUTING CLEANING PLAN---")
    df = state.get("dataframe")
    cleaning_plan = state.get("cleaning_plan")

    if df is None or cleaning_plan is None:
        print("❌ Missing DataFrame or cleaning plan. Skipping executor.")
        return {"dataframe": None}
    
    cleaned_df = df.copy()

    print("---Executing Automatic Cleaning Rules---")
    cols_to_drop = []
    for col in cleaned_df.columns:
        if cleaned_df[col].nunique(dropna=False) <= 1:
            print(f"    - Dropping column '{col}' due to constant values.")
            cols_to_drop.append(col)
        elif cleaned_df[col].isnull().mean() > 0.40:
            print(f"    - Dropping column '{col}' due to high percentage of missing values (>40%).")
            cols_to_drop.append(col)
    
    if cols_to_drop:
        cleaned_df.drop(columns=cols_to_drop, inplace=True)

    TEXT_MAPPING = {
        'ny': 'new york',
        'alex': 'alexandria',
    }

    try:
        for action in cleaning_plan:
            action_type = action.get("action")
            details = action.get("details")

            if action_type == "drop_column":
                print(f"    - Dropping column: {details}")
                columns_to_drop = re.findall(r"['\"](.*?)['\"]", details)
                for col in columns_to_drop:
                    if col in cleaned_df.columns:
                        cleaned_df.drop(col, axis=1, inplace=True)
            
            elif action_type == "handle_ids":
                print(f"    - Handling ID columns: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = cleaned_df[col].astype(str).str.extract('(\d+)').astype(str)

            elif action_type == "unify_format":
                print(f"    - Unifying format and handling hidden nulls: {details}")
                cleaned_df.replace(['-', 'NA', '', ' '], np.nan, inplace=True)
            
            elif action_type == "standardize_text":
                print(f"    - Standardizing text: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.strip()

            elif action_type == "standardize_text_complex":
                print(f"    - Complex text standardization: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.replace(' ', '').map(TEXT_MAPPING).fillna(cleaned_df[col])

            elif action_type == "impute_missing_values":
                print(f"    - Imputing missing values: {details}")
                for col in cleaned_df.columns:
                    if cleaned_df[col].isnull().any():
                        if pd.api.types.is_numeric_dtype(cleaned_df[col]):
                            cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
                        elif pd.api.types.is_object_dtype(cleaned_df[col]):
                            mode_val = cleaned_df[col].mode()
                            if not mode_val.empty:
                                cleaned_df[col] = cleaned_df[col].fillna(mode_val[0])

            elif action_type == "handle_dates":
                print(f"    - Handling dates: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
            
            elif action_type == "validate_relationships":
                print(f"    - Validating logical relationships: {details}")
                if 'OrderDate' in cleaned_df.columns and 'ShipDate' in cleaned_df.columns:
                    cleaned_df.drop(cleaned_df[cleaned_df['OrderDate'] > cleaned_df['ShipDate']].index, inplace=True)

            elif action_type == "handle_numeric_values":
                print(f"    - Handling numeric values: {details}")
                numerical_cols_from_plan = [col.strip().replace("'", "") for col in re.findall(r"'([^']*)'", details)]
                for col in numerical_cols_from_plan:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                        cleaned_df.loc[cleaned_df[col] < 0, col] = np.nan
                        Q1 = cleaned_df[col].quantile(0.25)
                        Q3 = cleaned_df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        cleaned_df.loc[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound), col] = np.nan
            
            elif action_type == "remove_duplicates":
                print(f"    - Removing duplicates: {details}")
                cleaned_df.drop_duplicates(inplace=True)

            elif action_type == "handle_missing_values":
                print(f"    - Handling missing values: {details}")
                cleaned_df.dropna(subset=['CustomerID', 'SalesOrderID'], inplace=True)
            
        print("✅ Cleaning plan executed successfully.")
        return {"dataframe": cleaned_df}

    except Exception as e:
        print(f"❌ Error during execution: {e}")
        return {"dataframe": None}

def kpi_advisor(state: GraphState):
    """
    Uses an LLM to generate a JSON plan for KPI calculation and trends based on cleaned data.
    """
    print("---GENERATING KPI CALCULATION PLAN---")
    df = state.get("dataframe")
    data_type = state.get("data_type")

    if df is None:
        print("❌ No DataFrame found in state. Skipping KPI advisor.")
        return {"kpi_plan": None}

    columns = df.columns.tolist()

    prompt = f"""
    You are a data analyst expert. Your task is to analyze the columns of a cleaned DataFrame and provide a JSON plan to calculate Key Performance Indicators (KPIs) and identify key trends. The data has been identified as '{data_type}'.
    
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a KPI or trend to be calculated.
    
    Each object must have two keys:
    - "kpi_name": A descriptive name for the KPI or trend (e.g., "Total Revenue", "Top 5 Selling Products").
    - "calculation_details": A detailed description of the columns to use and the mathematical/analytical operation to perform, in natural language. This will be given to a Pandas Agent.

    If you cannot find suitable columns for a specific KPI, do not include it.
    
    Return ONLY a valid JSON object, with no extra text, explanation, or punctuation.
    
    Here are the available columns in the cleaned DataFrame: {columns}.
    
    Example for 'employees' data:
    [
        {{
            "kpi_name": "Average Employee Salary",
            "calculation_details": "Calculate the mean of the 'Salary' or 'Rate' column."
        }},
        {{
            "kpi_name": "Gender Distribution",
            "calculation_details": "Count the occurrences of each gender in the 'Gender' column and return a dictionary."
        }}
    ]

    Example for 'sales' data:
    [
        {{
            "kpi_name": "Total Revenue",
            "calculation_details": "Calculate the sum of the 'TotalDue' column."
        }},
        {{
            "kpi_name": "Average Order Value",
            "calculation_details": "Calculate the average of the 'TotalDue' column, grouped by a unique order identifier if available. Otherwise, calculate the overall average."
        }}
    ]
    """
    
    response = llm_model.invoke(prompt)
    sanitized_content = response.content.strip()
    match = re.search(r'\[.*\]', sanitized_content, re.DOTALL)
    if match:
        json_content = match.group(0)
    else:
        json_content = sanitized_content
    
    try:
        kpi_plan = json.loads(json_content)
        print("✅ KPI calculation plan generated successfully.")
        return {"kpi_plan": kpi_plan}
    except json.JSONDecodeError as e:
        print(f"❌ JSON decoding error from KPI plan: {e}")
        print(f"LLM response: {response.content}")
        return {"kpi_plan": None}

def kpi_executor(state: GraphState):
    """
    Generates and executes Python code to calculate KPIs.
    """
    print("---GENERATING AND EXECUTING KPI CALCULATION CODE---")
    df = state.get("dataframe")
    kpi_plan = state.get("kpi_plan")
    kpis = {}

    if df is None or kpi_plan is None:
        print("❌ Missing DataFrame or KPI plan. Skipping executor.")
        return {"kpis": None}

    code_generation_prompt = f"""
    You are an expert Python data analyst. Your task is to write Python code to calculate a list of Key Performance Indicators (KPIs) based on a pandas DataFrame named `df`.
    The code should calculate the KPIs and store the results in a dictionary named `results`.

    Here is the list of KPIs to calculate:
    {json.dumps(kpi_plan, indent=2)}

    Your response must be ONLY the Python code block. Do NOT include any explanations, Markdown, or surrounding text.
    """

    try:
        code_response = llm_model.invoke(code_generation_prompt)
        code_block = re.search(r'```python(.*?)```', code_response.content, re.DOTALL)
        if code_block:
            code_to_execute = code_block.group(1).strip()
        else:
            code_to_execute = code_response.content.strip()

        safe_globals = {'pd': pd, 'np': np, 'df': df}
        safe_locals = {'results': {}}

        exec(code_to_execute, safe_globals, safe_locals)
        kpis = safe_locals.get('results', {})
        
        def sanitize_value(value):
            if isinstance(value, (dict, list)):
                return sanitize_dict_or_list(value)
            elif isinstance(value, (pd.Series, pd.Index, np.ndarray)):
                return value.tolist()
            elif isinstance(value, (float, np.float64)):
                return round(float(value), 2)
            else:
                return str(value)
        
        def sanitize_dict_or_list(obj):
            if isinstance(obj, dict):
                return {sanitize_value(k): sanitize_value(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [sanitize_value(item) for item in obj]
            else:
                return str(obj)

        sanitized_kpis = sanitize_dict_or_list(kpis)

        print("✅ KPIs calculated successfully.")
        return {"kpis": sanitized_kpis}

    except Exception as e:
        print(f"❌ An error occurred during KPI calculation: {e}")
        return {"kpis": {"error": f"An error occurred during KPI calculation: {e}"}}

def sales_analysis_and_recommendations_generator(state: GraphState):
    """
    Generates a comprehensive sales report including both analysis and recommendations, using a powerful, specific prompt.
    """
    print("---GENERATING SALES REPORT---")
    kpis = state.get("kpis")
    df = state.get("dataframe")

    if not kpis or 'error' in kpis or df is None:
        print("❌ Missing KPIs or DataFrame. Skipping report generation.")
        return {"analysis_report": "Unable to generate a comprehensive sales report due to missing or invalid data."}
    
    kpis_text = json.dumps(kpis, indent=2)
    sample_df = df.sample(n=min(100, len(df)), random_state=42)

    prompt = f"""
    You are a professional and excellent data analyst and a highly skilled Sales Recommendation Agent.
    
    Your task is to generate a single, detailed, and actionable sales report based on the provided data. The report must be structured in two main parts:
    
    **Part 1: Sales Analysis Report**
    1.  A brief introduction summarizing the main findings.
    2.  An analysis of each key performance indicator with an explanation of its importance.
    3.  A "Key Insights" section that draws deeper conclusions from the numbers and trends, such as sales trends, regional performance, or top-selling products.
    4.  A conclusion that provides a high-level summary.
    
    **Part 2: Actionable Recommendations**
    This part must be structured exactly as follows, using the insights from the analysis.
    
    1. Short-Term Plan (0-3 months)
    - Goal: Increase total orders by a data-driven percentage.
    - Analysis:
      - Current monthly orders.
      - Best performing region(s) and underperforming region(s).
      - Key trends or anomalies.
    - Recommendations / Actions:
      - Digital marketing campaigns: budget %, target regions, expected impact.
      - Incentive programs for sales reps: bonus per extra order, criteria.
      - Training programs: tailored training per sales rep based on performance.
      - Reasoning for each action.
    - Scenarios:
      - Best Case: projected orders and revenue.
      - Moderate Case: realistic projection.
      - Worst Case: flat growth and mitigation actions.
      - Risk Management: how to reallocate resources if results are below expectations.
    
    2. Mid-Term Plan (3-6 months)
    - Goal: Increase Average Order Value (AOV) and improve margins.
    - Analysis:
      - Current AOV.
      - Top-selling products.
      - High-margin products.
    - Recommendations / Actions:
      - Cross-selling and bundling strategies.
      - Pricing adjustments for high-margin products.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected AOV and revenue impact.
    - Risk Management: adjustments if adoption is lower than expected.
    
    3. Long-Term Plan (6+ months)
    - Goal: Increase annual sales growth and expand market share.
    - Analysis:
      - Current annual growth rate.
      - Untapped regions and top-performing channels.
    - Recommendations / Actions:
      - Geographic expansion and investment amount.
      - Channel development budget and strategies.
      - Social selling: train reps to engage on LinkedIn/Twitter.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected growth percentages.
      - Risk Management: how to reduce investment or redirect resources in case of failure.
    
    Output Format:
    - Use clear markdown headings for both parts and sub-sections.
    - Do not just list the numbers; provide real, insightful analysis and connect the dots between the different metrics.
    - Here are the KPIs for your analysis. Use these numbers directly in your report.
    {kpis_text}
    
    Here is a sample of the raw data for additional context:
    {sample_df.to_dict()}
    """
    try:
        report_response = llm_model.invoke(prompt)
        report_text = report_response.content
        print("✅ Sales report generated successfully.")
    except Exception as e:
        print(f"❌ Error generating sales report with LLM: {e}")
        report_text = "An error occurred while generating the sales report. Please try again."

    return {"analysis_report": report_text}

def employee_analysis_and_recommendations_generator(state: GraphState):
    """
    Generates a comprehensive employee report with analysis and recommendations.
    """
    print("---GENERATING EMPLOYEES REPORT---")
    kpis = state.get("kpis")
    df = state.get("dataframe")

    if not kpis or 'error' in kpis or df is None:
        print("❌ Missing KPIs or DataFrame. Skipping report generation.")
        return {"analysis_report": "Unable to generate a comprehensive employee report due to missing or invalid data."}

    kpis_text = json.dumps(kpis, indent=2)
    sample_df = df.sample(n=min(100, len(df)), random_state=42)

    prompt = f"""
    You are a professional and excellent HR Data Analyst and a highly skilled Employee Performance and Retention Advisor.
    
    Your task is to generate a single, detailed, and actionable report based on the provided employee data. The report must be structured in two main parts:
    
    **Part 1: Employee Analysis Report**
    1.  A brief introduction summarizing the main findings.
    2.  An analysis of each key performance indicator (KPI), explaining its significance (e.g., average salary, department-wise distribution, retention rate).
    3.  A "Key Insights" section that draws deeper conclusions from the numbers and trends, such as factors influencing low performance, salary disparities, or departments with high turnover.
    4.  A conclusion that provides a high-level summary.
    
    **Part 2: Actionable Recommendations**
    This part must be structured exactly as follows, using the insights from the analysis.
    
    1.  Short-Term Plan (0-3 months)
    -   **Goal:** Address immediate performance and morale issues.
    -   **Analysis:** Current performance metrics (if available), salary distribution analysis, recent turnover trends.
    -   **Recommendations / Actions:** Implement performance improvement plans for underperforming employees. Conduct anonymous surveys to gauge morale. Review and adjust entry-level salaries in underpaid departments.
    -   **Reasoning:** Link each action to a specific insight from your analysis (e.g., "Adjusting salaries in 'Sales' is crucial due to high turnover rates shown in the data.").

    2.  Mid-Term Plan (3-6 months)
    -   **Goal:** Improve employee retention and engagement.
    -   **Analysis:** Analyze historical data to identify trends in employee departures. Examine training participation rates.
    -   **Recommendations / Actions:** Launch a professional development budget for each employee. Create a mentorship program to boost internal promotions.
    -   **Reasoning:** Justify why these actions will lead to higher retention (e.g., "Investing in development shows commitment and reduces the likelihood of top talent leaving for better opportunities.").

    3.  Long-Term Plan (6+ months)
    -   **Goal:** Foster a strong company culture and build a robust talent pipeline.
    -   **Analysis:** Review long-term growth and skill gaps within the company.
    -   **Recommendations / Actions:** Design a comprehensive leadership training program. Establish a formal succession planning process for key roles. Develop a plan for long-term hiring and market share expansion.
    -   **Reasoning:** Explain how these actions will benefit the company's long-term health (e.g., "A strong internal pipeline ensures business continuity and reduces hiring costs.").
    
    Output Format:
    - Use clear markdown headings for both parts and sub-sections.
    - Do not just list the numbers; provide real, insightful analysis and connect the dots between the different metrics.
    - Here are the KPIs for your analysis. Use these numbers directly in your report.
    {kpis_text}
    
    Here is a sample of the raw data for additional context:
    {sample_df.to_dict()}
    """
    try:
        report_response = llm_model.invoke(prompt)
        report_text = report_response.content
        print("✅ Employee report generated successfully.")
    except Exception as e:
        print(f"❌ Error generating employee report with LLM: {e}")
        report_text = "An error occurred while generating the employee report. Please try again."

    return {"analysis_report": report_text}

def save_outputs(state: GraphState):
    """
    Saves the cleaned DataFrame and generated reports to files.
    """
    print("---SAVING OUTPUTS---")
    output_dir = "final_output"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    df = state.get('dataframe')
    if df is not None:
        df.to_csv(os.path.join(output_dir, 'cleaned_data.csv'), index=False)
        print(f"✅ Cleaned DataFrame saved to '{os.path.join(output_dir, 'cleaned_data.csv')}'")

    analysis_report = state.get('analysis_report')
    if analysis_report is not None:
        data_type = state.get('data_type', 'report')
        report_filename = f"{data_type}_analysis_and_recommendations_report.txt"
        with open(os.path.join(output_dir, report_filename), 'w', encoding='utf-8') as f:
            f.write(analysis_report)
        print(f"✅ Comprehensive report saved to '{os.path.join(output_dir, report_filename)}'")

    print("---OUTPUTS SAVED SUCCESSFULLY---")
    return {}

def final_output_viewer(state: GraphState):
    """
    Displays the final outputs for user review before manual saving.
    """
    print("--- ALL PROCESSES COMPLETE ---")
    print("\n--- Final Graph State ---")
    print(f"Data type: {state.get('data_type')}")
    print(f"DataFrame shape after cleaning: {state.get('dataframe').shape if state.get('dataframe') is not None else 'None'}")
    
    kpis = state.get('kpis')
    print("Calculated KPIs:")
    if kpis and 'error' in kpis:
        print(f"❌ Error during KPI calculation: {kpis['error']}")
    else:
        print(json.dumps(kpis, indent=2))
    
    print("\n--- Generated Report ---")
    print(f"Report: {state.get('analysis_report')}")

    print("\n✅ The process has finished. You can now check the 'final_output' directory for the saved files.")
    return {}

# --- 3. Build the LangGraph ---
workflow = StateGraph(GraphState)

# Add all nodes
workflow.add_node("loader", data_loader_and_identifier)
workflow.add_node("advisor", data_cleaning_advisor)
workflow.add_node("executor", data_cleaning_executor)
workflow.add_node("kpi_advisor", kpi_advisor)
workflow.add_node("kpi_executor", kpi_executor)
workflow.add_node("sales_analysis_agent", sales_analysis_and_recommendations_generator)
workflow.add_node("employee_analysis_agent", employee_analysis_and_recommendations_generator)
workflow.add_node("save_outputs", save_outputs)
workflow.add_node("final_output_viewer", final_output_viewer)

# Conditional Nodes (Routers)
def check_cleaning_plan(state):
    return "skip_cleaning" if state.get("cleaning_plan") is None else "continue"

def route_to_analysis_agent(state):
    data_type = state.get("data_type")
    if data_type == "employees":
        return "employee_analysis_agent"
    elif data_type == "sales":
        return "sales_analysis_agent"
    else:
        return "final_output_viewer"

# Define the graph flow
workflow.set_entry_point("loader")
workflow.add_edge("loader", "advisor")

# The alternative path logic
workflow.add_conditional_edges(
    "advisor",
    check_cleaning_plan,
    {"continue": "executor", "skip_cleaning": "kpi_advisor"}
)

# Connect the main path
workflow.add_edge("executor", "kpi_advisor")
workflow.add_edge("kpi_advisor", "kpi_executor")

# Route to the correct analysis agent based on data type
workflow.add_conditional_edges(
    "kpi_executor",
    route_to_analysis_agent,
    {
        "sales_analysis_agent": "sales_analysis_agent",
        "employee_analysis_agent": "employee_analysis_agent",
        "final_output_viewer": "final_output_viewer"
    }
)

# After analysis, all paths converge to saving outputs
workflow.add_edge("sales_analysis_agent", "save_outputs")
workflow.add_edge("employee_analysis_agent", "save_outputs")

workflow.add_edge("save_outputs", "final_output_viewer")

# End the graph
workflow.add_edge("final_output_viewer", END)

app = workflow.compile()

# --- 4. Run the Graph and Test the Output ---
if __name__ == "__main__":
    initial_state = {"data_type": None, "dataframe": None, "cleaning_plan": None, "kpi_plan": None, "kpis": None, "analysis_report": None}
    print("--- Starting the LangGraph workflow ---")
    final_state = app.invoke(initial_state)

--- Starting the LangGraph workflow ---
---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'sales'.
---GENERATING CLEANING PLAN---
❌ JSON decoding failed: Extra data: line 1 column 77 (char 76)
--- LLM Response that caused the error ---

To create a pandas DataFrame from the provided dictionary, we need to ensure the dictionary is properly structured and then convert it using `pd.DataFrame()`. Here's the step-by-step solution:

```python
import pandas as pd

# Provided dictionary (keys are column names, values are lists of data)
data = {
    'SalesOrderID': [72924, 204656, 282769, 29472, 57033, 225558, 142542, 27719, 110217, 296229],
    'SalesOrderDetailID': [23384, 73992, 107137, 9407, 18310, 82987, 47848, 8834, 34921, 112680],
    'OrderQty': [2, 1, 1, 2, 2, 5, 1, 1, 2, 1],
    'ProductID': [722, 707, 707, 712, 765, 977, 935, 748, 722, 905],
    'SpecialOfferID': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
    'UnitPrice': [183.9382, 34.99, 34.99, 5.1865, 469.794, 323.

KeyboardInterrupt: 

In [1]:
import pandas as pd
import numpy as np
import os
import json
import re
from langchain_openai import ChatOpenAI
from typing import TypedDict, Optional, List, Dict, Any
from langgraph.graph import StateGraph, END

# --- 1. Define LLM and Graph State ---
llm_model = ChatOpenAI(
    model="z-ai/glm-4.5-air:free",
    openai_api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url="https://openrouter.ai/api/v1"
)

class GraphState(TypedDict):
    """
    Represents the state of the graph.
    Attributes:
        data_type: The type of data identified (e.g., 'employees').
        dataframe: The loaded pandas DataFrame.
        cleaning_plan: The JSON plan from the LLM.
        kpi_plan: The JSON plan from the LLM for KPI calculation.
        kpis: A dictionary of calculated key performance indicators.
        analysis_report: The final, generated report text.
    """
    data_type: Optional[str]
    dataframe: Optional[pd.DataFrame]
    cleaning_plan: Optional[List[Dict]]
    kpi_plan: Optional[List[Dict]]
    kpis: Optional[Dict[str, Any]]
    analysis_report: Optional[str]

# --- Helper Function for Robust JSON Parsing ---
def extract_and_parse_json(llm_content: str) -> Optional[List[Dict]]:
    """
    Extracts a JSON list from the LLM's response, handling markdown code blocks.
    """
    # 1. First, try to find a JSON block inside ```json ... ```
    match = re.search(r'```json\s*(\[.*?\])\s*```', llm_content, re.DOTALL)
    if match:
        json_str = match.group(1)
    else:
        # 2. If not found, try to find any list [...] in the content
        match = re.search(r'(\[.*\])', llm_content, re.DOTALL)
        if match:
            json_str = match.group(0)
        else:
            # 3. If still no list found, the content itself might be the JSON
            json_str = llm_content

    try:
        # 4. Try to parse the extracted string
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        # 5. If parsing fails, print the error and the problematic content
        print(f"❌ JSON decoding failed: {e}")
        print(f"--- LLM Response that caused the error ---")
        print(llm_content)
        print("-----------------------------------------")
        return None

# --- 2. Define Graph Nodes (Functions) ---

def classify_data_with_llm(columns: list) -> str:
    """
    Uses an LLM to classify the data type based on a list of column names.
    """
    system_prompt = (
        "You are an expert data classifier. Your task is to analyze a list of CSV "
        "column names and determine the primary topic of the dataset. "
        "You must respond with a single, lowercase word from the following list: "
        "'employees', 'sales', 'customers', 'products', 'marketing', 'finance', 'logistics', 'support', 'unknown'."
    )
    user_prompt = f"""
    Based on the following list of column names, what is the main topic of the data?
    
    Column Names: {', '.join(columns)}
    
    Return ONLY one word from the allowed list, with no extra text, explanation, or punctuation.
    """
    response = llm_model.invoke(f"{system_prompt}\n\n{user_prompt}")
    return response.content.strip().lower()

def data_loader_and_identifier(state: GraphState):
    """Loads data.csv and uses an LLM to identify its type."""
    print("---LOADING AND IDENTIFYING DATA---")
    notebook_dir = os.path.dirname(os.path.abspath('__file__'))
    file_path = os.path.join(notebook_dir, 'data_collection', 'data.csv')
    
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return {"data_type": "error", "dataframe": None}
    
    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        return {"data_type": "error", "dataframe": None}
    
    data_type = classify_data_with_llm(df.columns.tolist())
    
    print(f"✅ Data loaded successfully. Identified as '{data_type}'.")
    return {"data_type": data_type, "dataframe": df}

def data_cleaning_advisor(state: GraphState):
    """
    Asks the LLM to generate a JSON cleaning plan based on a data sample.
    """
    print("---GENERATING CLEANING PLAN---")
    df = state.get("dataframe")

    if df is None:
        print("❌ No DataFrame found in state. Skipping advisor.")
        return {"cleaning_plan": None}
    
    sample_size = min(100, len(df))
    sample_df = df.sample(n=sample_size, random_state=42)
    
    prompt = f"""
    You are a data cleaning expert. Your task is to analyze a sample of a DataFrame and recommend a list of cleaning actions.
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a cleaning action.
    
    Each action object must have two keys:
    - "action": A string representing the type of cleaning operation. Possible values are: "remove_duplicates", "drop_column", "handle_ids", "unify_format", "standardize_text", "standardize_text_complex", "handle_dates", "handle_numeric_values", "validate_relationships", "impute_missing_values".
    - "details": A string providing the name of the column(s) and any specific instructions for the action.

    Do not recommend dropping columns that contain constant values or have a high percentage of nulls; those are handled separately. Focus on logical cleaning tasks.
    
    Example JSON response:
    [
        {{
            "action": "drop_column",
            "details": "Drop redundant columns like 'SalesOrderID.1' and 'ModifiedDate.1'."
        }},
        {{
            "action": "handle_ids",
            "details": "Ensure 'SalesOrderNumber' and 'ProductID' are treated as unique identifiers by removing non-numeric characters and converting to string."
        }},
        {{
            "action": "handle_dates",
            "details": "Convert columns 'OrderDate', 'DueDate', 'ShipDate' to datetime objects."
        }},
        {{
            "action": "validate_relationships",
            "details": "Ensure 'OrderDate' is before 'ShipDate' to maintain logical consistency."
        }},
        {{
            "action": "impute_missing_values",
            "details": "Fill missing values in numeric columns like 'OrganizationLevel' with the median."
        }}
    ]

    Do not return any text or explanations outside the JSON object.
    Here is a sample of the DataFrame in CSV format ({sample_size} random rows):
    {sample_df.to_csv(index=False)}
    """
    
    response = llm_model.invoke(prompt)
    
    cleaning_plan = extract_and_parse_json(response.content)
    
    if cleaning_plan:
        print("✅ Cleaning plan generated successfully.")
        return {"cleaning_plan": cleaning_plan}
    else:
        print("❌ Failed to generate or parse the cleaning plan.")
        return {"cleaning_plan": None}

def data_cleaning_executor(state: GraphState):
    """
    Executes the cleaning plan on the entire DataFrame.
    """
    print("---EXECUTING CLEANING PLAN---")
    df = state.get("dataframe")
    cleaning_plan = state.get("cleaning_plan")

    if df is None or cleaning_plan is None:
        print("❌ Missing DataFrame or cleaning plan. Skipping executor.")
        return {"dataframe": None}
    
    cleaned_df = df.copy()

    print("---Executing Automatic Cleaning Rules---")
    cols_to_drop = []
    for col in cleaned_df.columns:
        if cleaned_df[col].nunique(dropna=False) <= 1:
            print(f"     - Dropping column '{col}' due to constant values.")
            cols_to_drop.append(col)
        elif cleaned_df[col].isnull().mean() > 0.40:
            print(f"     - Dropping column '{col}' due to high percentage of missing values (>40%).")
            cols_to_drop.append(col)
    
    if cols_to_drop:
        cleaned_df.drop(columns=cols_to_drop, inplace=True)

    TEXT_MAPPING = {
        'ny': 'new york',
        'alex': 'alexandria',
    }

    try:
        for action in cleaning_plan:
            action_type = action.get("action")
            details = action.get("details")

            if action_type == "drop_column":
                print(f"     - Dropping column: {details}")
                columns_to_drop = re.findall(r"['\"](.*?)['\"]", details)
                for col in columns_to_drop:
                    if col in cleaned_df.columns:
                        cleaned_df.drop(col, axis=1, inplace=True)
            
            elif action_type == "handle_ids":
                print(f"     - Handling ID columns: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = cleaned_df[col].astype(str).str.extract('(\d+)').astype(str)

            elif action_type == "unify_format":
                print(f"     - Unifying format and handling hidden nulls: {details}")
                cleaned_df.replace(['-', 'NA', '', ' '], np.nan, inplace=True)
            
            elif action_type == "standardize_text":
                print(f"     - Standardizing text: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.strip()

            elif action_type == "standardize_text_complex":
                print(f"     - Complex text standardization: {details}")
                for col in cleaned_df.columns:
                    if pd.api.types.is_object_dtype(cleaned_df[col]):
                        cleaned_df[col] = cleaned_df[col].astype(str).str.lower().str.replace(' ', '').map(TEXT_MAPPING).fillna(cleaned_df[col])

            elif action_type == "impute_missing_values":
                print(f"     - Imputing missing values: {details}")
                for col in cleaned_df.columns:
                    if cleaned_df[col].isnull().any():
                        if pd.api.types.is_numeric_dtype(cleaned_df[col]):
                            cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
                        elif pd.api.types.is_object_dtype(cleaned_df[col]):
                            mode_val = cleaned_df[col].mode()
                            if not mode_val.empty:
                                cleaned_df[col] = cleaned_df[col].fillna(mode_val[0])

            elif action_type == "handle_dates":
                print(f"     - Handling dates: {details}")
                for col in [c.strip().replace("'", "") for c in re.findall(r"'([^']*)'", details)]:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
            
            elif action_type == "validate_relationships":
                print(f"     - Validating logical relationships: {details}")
                if 'OrderDate' in cleaned_df.columns and 'ShipDate' in cleaned_df.columns:
                    cleaned_df.drop(cleaned_df[cleaned_df['OrderDate'] > cleaned_df['ShipDate']].index, inplace=True)

            elif action_type == "handle_numeric_values":
                print(f"     - Handling numeric values: {details}")
                numerical_cols_from_plan = [col.strip().replace("'", "") for col in re.findall(r"'([^']*)'", details)]
                for col in numerical_cols_from_plan:
                    if col in cleaned_df.columns:
                        cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                        cleaned_df.loc[cleaned_df[col] < 0, col] = np.nan
                        Q1 = cleaned_df[col].quantile(0.25)
                        Q3 = cleaned_df[col].quantile(0.75)
                        IQR = Q3 - Q1
                        lower_bound = Q1 - 1.5 * IQR
                        upper_bound = Q3 + 1.5 * IQR
                        cleaned_df.loc[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound), col] = np.nan
            
            elif action_type == "remove_duplicates":
                print(f"     - Removing duplicates: {details}")
                cleaned_df.drop_duplicates(inplace=True)

            elif action_type == "handle_missing_values":
                print(f"     - Handling missing values: {details}")
                cleaned_df.dropna(subset=['CustomerID', 'SalesOrderID'], inplace=True)
            
        print("✅ Cleaning plan executed successfully.")
        return {"dataframe": cleaned_df}

    except Exception as e:
        print(f"❌ Error during execution: {e}")
        return {"dataframe": None}

def kpi_advisor(state: GraphState):
    """
    Uses an LLM to generate a JSON plan for KPI calculation and trends based on cleaned data.
    """
    print("---GENERATING KPI CALCULATION PLAN---")
    df = state.get("dataframe")
    data_type = state.get("data_type")

    if df is None:
        print("❌ No DataFrame found in state. Skipping KPI advisor.")
        return {"kpi_plan": None}

    columns = df.columns.tolist()

    prompt = f"""
    You are a data analyst expert. Your task is to analyze the columns of a cleaned DataFrame and provide a JSON plan to calculate Key Performance Indicators (KPIs) and identify key trends. The data has been identified as '{data_type}'.
    
    Your response must be ONLY a valid JSON object. The JSON should be a list of objects, where each object represents a KPI or trend to be calculated.
    
    Each object must have two keys:
    - "kpi_name": A descriptive name for the KPI or trend (e.g., "Total Revenue", "Top 5 Selling Products").
    - "calculation_details": A detailed description of the columns to use and the mathematical/analytical operation to perform, in natural language. This will be given to a Pandas Agent.

    If you cannot find suitable columns for a specific KPI, do not include it.
    
    Return ONLY a valid JSON object, with no extra text, explanation, or punctuation.
    
    Here are the available columns in the cleaned DataFrame: {columns}.
    
    Example for 'employees' data:
    [
        {{
            "kpi_name": "Average Employee Salary",
            "calculation_details": "Calculate the mean of the 'Salary' or 'Rate' column."
        }},
        {{
            "kpi_name": "Gender Distribution",
            "calculation_details": "Count the occurrences of each gender in the 'Gender' column and return a dictionary."
        }}
    ]

    Example for 'sales' data:
    [
        {{
            "kpi_name": "Total Revenue",
            "calculation_details": "Calculate the sum of the 'TotalDue' column."
        }},
        {{
            "kpi_name": "Average Order Value",
            "calculation_details": "Calculate the average of the 'TotalDue' column, grouped by a unique order identifier if available. Otherwise, calculate the overall average."
        }}
    ]
    """
    
    response = llm_model.invoke(prompt)
    
    kpi_plan = extract_and_parse_json(response.content)
    
    if kpi_plan:
        print("✅ KPI calculation plan generated successfully.")
        return {"kpi_plan": kpi_plan}
    else:
        print("❌ Failed to generate or parse the KPI plan.")
        return {"kpi_plan": None}


def kpi_executor(state: GraphState):
    """
    Generates and executes Python code to calculate KPIs.
    """
    print("---GENERATING AND EXECUTING KPI CALCULATION CODE---")
    df = state.get("dataframe")
    kpi_plan = state.get("kpi_plan")
    kpis = {}

    if df is None or kpi_plan is None:
        print("❌ Missing DataFrame or KPI plan. Skipping executor.")
        return {"kpis": None}

    code_generation_prompt = f"""
    You are an expert Python data analyst. Your task is to write Python code to calculate a list of Key Performance Indicators (KPIs) based on a pandas DataFrame named `df`.
    The code should calculate the KPIs and store the results in a dictionary named `results`.

    Here is the list of KPIs to calculate:
    {json.dumps(kpi_plan, indent=2)}

    Your response must be ONLY the Python code block. Do NOT include any explanations, Markdown, or surrounding text.
    """

    try:
        code_response = llm_model.invoke(code_generation_prompt)
        code_block = re.search(r'```python(.*?)```', code_response.content, re.DOTALL)
        if code_block:
            code_to_execute = code_block.group(1).strip()
        else:
            code_to_execute = code_response.content.strip()

        safe_globals = {'pd': pd, 'np': np, 'df': df}
        safe_locals = {'results': {}}

        exec(code_to_execute, safe_globals, safe_locals)
        kpis = safe_locals.get('results', {})
        
        def sanitize_value(value):
            if isinstance(value, (dict, list)):
                return sanitize_dict_or_list(value)
            elif isinstance(value, (pd.Series, pd.Index, np.ndarray)):
                return value.tolist()
            elif isinstance(value, (float, np.float64)):
                return round(float(value), 2)
            else:
                return str(value)
        
        def sanitize_dict_or_list(obj):
            if isinstance(obj, dict):
                return {sanitize_value(k): sanitize_value(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [sanitize_value(item) for item in obj]
            else:
                return str(obj)

        sanitized_kpis = sanitize_dict_or_list(kpis)

        print("✅ KPIs calculated successfully.")
        return {"kpis": sanitized_kpis}

    except Exception as e:
        print(f"❌ An error occurred during KPI calculation: {e}")
        return {"kpis": {"error": f"An error occurred during KPI calculation: {e}"}}

def sales_analysis_and_recommendations_generator(state: GraphState):
    """
    Generates a comprehensive sales report including both analysis and recommendations, using a powerful, specific prompt.
    """
    print("---GENERATING SALES REPORT---")
    kpis = state.get("kpis")
    df = state.get("dataframe")

    if not kpis or 'error' in kpis or df is None:
        print("❌ Missing KPIs or DataFrame. Skipping report generation.")
        return {"analysis_report": "Unable to generate a comprehensive sales report due to missing or invalid data."}
    
    kpis_text = json.dumps(kpis, indent=2)
    sample_df = df.sample(n=min(100, len(df)), random_state=42)

    prompt = f"""
    You are a professional and excellent data analyst and a highly skilled Sales Recommendation Agent.
    
    Your task is to generate a single, detailed, and actionable sales report based on the provided data. The report must be structured in two main parts:
    
    **Part 1: Sales Analysis Report**
    1.  A brief introduction summarizing the main findings.
    2.  An analysis of each key performance indicator with an explanation of its importance.
    3.  A "Key Insights" section that draws deeper conclusions from the numbers and trends, such as sales trends, regional performance, or top-selling products.
    4.  A conclusion that provides a high-level summary.
    
    **Part 2: Actionable Recommendations**
    This part must be structured exactly as follows, using the insights from the analysis.
    
    1. Short-Term Plan (0-3 months)
    - Goal: Increase total orders by a data-driven percentage.
    - Analysis:
      - Current monthly orders.
      - Best performing region(s) and underperforming region(s).
      - Key trends or anomalies.
    - Recommendations / Actions:
      - Digital marketing campaigns: budget %, target regions, expected impact.
      - Incentive programs for sales reps: bonus per extra order, criteria.
      - Training programs: tailored training per sales rep based on performance.
      - Reasoning for each action.
    - Scenarios:
      - Best Case: projected orders and revenue.
      - Moderate Case: realistic projection.
      - Worst Case: flat growth and mitigation actions.
      - Risk Management: how to reallocate resources if results are below expectations.
    
    2. Mid-Term Plan (3-6 months)
    - Goal: Increase Average Order Value (AOV) and improve margins.
    - Analysis:
      - Current AOV.
      - Top-selling products.
      - High-margin products.
    - Recommendations / Actions:
      - Cross-selling and bundling strategies.
      - Pricing adjustments for high-margin products.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected AOV and revenue impact.
    - Risk Management: adjustments if adoption is lower than expected.
    
    3. Long-Term Plan (6+ months)
    - Goal: Increase annual sales growth and expand market share.
    - Analysis:
      - Current annual growth rate.
      - Untapped regions and top-performing channels.
    - Recommendations / Actions:
      - Geographic expansion and investment amount.
      - Channel development budget and strategies.
      - Social selling: train reps to engage on LinkedIn/Twitter.
      - Reasoning for each action.
    - Scenarios:
      - Best / Moderate / Worst case with expected growth percentages.
      - Risk Management: how to reduce investment or redirect resources in case of failure.
    
    Output Format:
    - Use clear markdown headings for both parts and sub-sections.
    - Do not just list the numbers; provide real, insightful analysis and connect the dots between the different metrics.
    - Here are the KPIs for your analysis. Use these numbers directly in your report.
    {kpis_text}
    
    Here is a sample of the raw data for additional context:
    {sample_df.to_dict()}
    """
    try:
        report_response = llm_model.invoke(prompt)
        report_text = report_response.content
        print("✅ Sales report generated successfully.")
    except Exception as e:
        print(f"❌ Error generating sales report with LLM: {e}")
        report_text = "An error occurred while generating the sales report. Please try again."

    return {"analysis_report": report_text}

def employee_analysis_and_recommendations_generator(state: GraphState):
    """
    Generates a comprehensive employee report with analysis and recommendations.
    """
    print("---GENERATING EMPLOYEES REPORT---")
    kpis = state.get("kpis")
    df = state.get("dataframe")

    if not kpis or 'error' in kpis or df is None:
        print("❌ Missing KPIs or DataFrame. Skipping report generation.")
        return {"analysis_report": "Unable to generate a comprehensive employee report due to missing or invalid data."}

    kpis_text = json.dumps(kpis, indent=2)
    sample_df = df.sample(n=min(100, len(df)), random_state=42)

    prompt = f"""
    You are a professional and excellent HR Data Analyst and a highly skilled Employee Performance and Retention Advisor.
    
    Your task is to generate a single, detailed, and actionable report based on the provided employee data. The report must be structured in two main parts:
    
    **Part 1: Employee Analysis Report**
    1.  A brief introduction summarizing the main findings.
    2.  An analysis of each key performance indicator (KPI), explaining its significance (e.g., average salary, department-wise distribution, retention rate).
    3.  A "Key Insights" section that draws deeper conclusions from the numbers and trends, such as factors influencing low performance, salary disparities, or departments with high turnover.
    4.  A conclusion that provides a high-level summary.
    
    **Part 2: Actionable Recommendations**
    This part must be structured exactly as follows, using the insights from the analysis.
    
    1.  Short-Term Plan (0-3 months)
    -   **Goal:** Address immediate performance and morale issues.
    -   **Analysis:** Current performance metrics (if available), salary distribution analysis, recent turnover trends.
    -   **Recommendations / Actions:** Implement performance improvement plans for underperforming employees. Conduct anonymous surveys to gauge morale. Review and adjust entry-level salaries in underpaid departments.
    -   **Reasoning:** Link each action to a specific insight from your analysis (e.g., "Adjusting salaries in 'Sales' is crucial due to high turnover rates shown in the data.").

    2.  Mid-Term Plan (3-6 months)
    -   **Goal:** Improve employee retention and engagement.
    -   **Analysis:** Analyze historical data to identify trends in employee departures. Examine training participation rates.
    -   **Recommendations / Actions:** Launch a professional development budget for each employee. Create a mentorship program to boost internal promotions.
    -   **Reasoning:** Justify why these actions will lead to higher retention (e.g., "Investing in development shows commitment and reduces the likelihood of top talent leaving for better opportunities.").

    3.  Long-Term Plan (6+ months)
    -   **Goal:** Foster a strong company culture and build a robust talent pipeline.
    -   **Analysis:** Review long-term growth and skill gaps within the company.
    -   **Recommendations / Actions:** Design a comprehensive leadership training program. Establish a formal succession planning process for key roles. Develop a plan for long-term hiring and market share expansion.
    -   **Reasoning:** Explain how these actions will benefit the company's long-term health (e.g., "A strong internal pipeline ensures business continuity and reduces hiring costs.").
    
    Output Format:
    - Use clear markdown headings for both parts and sub-sections.
    - Do not just list the numbers; provide real, insightful analysis and connect the dots between the different metrics.
    - Here are the KPIs for your analysis. Use these numbers directly in your report.
    {kpis_text}
    
    Here is a sample of the raw data for additional context:
    {sample_df.to_dict()}
    """
    try:
        report_response = llm_model.invoke(prompt)
        report_text = report_response.content
        print("✅ Employee report generated successfully.")
    except Exception as e:
        print(f"❌ Error generating employee report with LLM: {e}")
        report_text = "An error occurred while generating the employee report. Please try again."

    return {"analysis_report": report_text}

def save_outputs(state: GraphState):
    """
    Saves the cleaned DataFrame and generated reports to files.
    """
    print("---SAVING OUTPUTS---")
    output_dir = "final_output"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    df = state.get('dataframe')
    if df is not None:
        df.to_csv(os.path.join(output_dir, 'cleaned_data.csv'), index=False)
        print(f"✅ Cleaned DataFrame saved to '{os.path.join(output_dir, 'cleaned_data.csv')}'")

    analysis_report = state.get('analysis_report')
    if analysis_report is not None:
        data_type = state.get('data_type', 'report')
        report_filename = f"{data_type}_analysis_and_recommendations_report.txt"
        with open(os.path.join(output_dir, report_filename), 'w', encoding='utf-8') as f:
            f.write(analysis_report)
        print(f"✅ Comprehensive report saved to '{os.path.join(output_dir, report_filename)}'")

    print("---OUTPUTS SAVED SUCCESSFULLY---")
    return {}

def final_output_viewer(state: GraphState):
    """
    Displays the final outputs for user review before manual saving.
    """
    print("--- ALL PROCESSES COMPLETE ---")
    print("\n--- Final Graph State ---")
    print(f"Data type: {state.get('data_type')}")
    print(f"DataFrame shape after cleaning: {state.get('dataframe').shape if state.get('dataframe') is not None else 'None'}")
    
    kpis = state.get('kpis')
    print("Calculated KPIs:")
    if kpis and 'error' in kpis:
        print(f"❌ Error during KPI calculation: {kpis['error']}")
    else:
        print(json.dumps(kpis, indent=2))
    
    print("\n--- Generated Report ---")
    print(f"Report: {state.get('analysis_report')}")

    print("\n✅ The process has finished. You can now check the 'final_output' directory for the saved files.")
    return {}

# --- 3. Build the LangGraph ---
workflow = StateGraph(GraphState)

# Add all nodes
workflow.add_node("loader", data_loader_and_identifier)
workflow.add_node("advisor", data_cleaning_advisor)
workflow.add_node("executor", data_cleaning_executor)
workflow.add_node("kpi_advisor", kpi_advisor)
workflow.add_node("kpi_executor", kpi_executor)
workflow.add_node("sales_analysis_agent", sales_analysis_and_recommendations_generator)
workflow.add_node("employee_analysis_agent", employee_analysis_and_recommendations_generator)
workflow.add_node("save_outputs", save_outputs)
workflow.add_node("final_output_viewer", final_output_viewer)

# Conditional Nodes (Routers)
def check_cleaning_plan(state):
    return "skip_cleaning" if state.get("cleaning_plan") is None else "continue"

def route_to_analysis_agent(state):
    data_type = state.get("data_type")
    if data_type == "employees":
        return "employee_analysis_agent"
    elif data_type == "sales":
        return "sales_analysis_agent"
    else:
        return "final_output_viewer"

# Define the graph flow
workflow.set_entry_point("loader")
workflow.add_edge("loader", "advisor")

# The alternative path logic
workflow.add_conditional_edges(
    "advisor",
    check_cleaning_plan,
    {"continue": "executor", "skip_cleaning": "kpi_advisor"}
)

# Connect the main path
workflow.add_edge("executor", "kpi_advisor")
workflow.add_edge("kpi_advisor", "kpi_executor")

# Route to the correct analysis agent based on data type
workflow.add_conditional_edges(
    "kpi_executor",
    route_to_analysis_agent,
    {
        "sales_analysis_agent": "sales_analysis_agent",
        "employee_analysis_agent": "employee_analysis_agent",
        "final_output_viewer": "final_output_viewer"
    }
)

# After analysis, all paths converge to saving outputs
workflow.add_edge("sales_analysis_agent", "save_outputs")
workflow.add_edge("employee_analysis_agent", "save_outputs")

workflow.add_edge("save_outputs", "final_output_viewer")

# End the graph
workflow.add_edge("final_output_viewer", END)

app = workflow.compile()

# --- 4. Run the Graph and Test the Output ---
if __name__ == "__main__":
    initial_state = {"data_type": None, "dataframe": None, "cleaning_plan": None, "kpi_plan": None, "kpis": None, "analysis_report": None}
    print("--- Starting the LangGraph workflow ---")
    final_state = app.invoke(initial_state)

  from .autonotebook import tqdm as notebook_tqdm


--- Starting the LangGraph workflow ---
---LOADING AND IDENTIFYING DATA---
✅ Data loaded successfully. Identified as 'sales'.
---GENERATING CLEANING PLAN---
✅ Cleaning plan generated successfully.
---EXECUTING CLEANING PLAN---
---Executing Automatic Cleaning Rules---
     - Dropping column 'Status' due to constant values.
     - Dropping column: Drop duplicate columns: 'SpecialOfferID.1', 'ProductID.1', 'SalesOrderID.1', 'TerritoryID.1', 'TerritoryID.2'.
     - Handling dates: Convert columns 'StartDate', 'EndDate', 'OrderDate', 'DueDate', 'ShipDate' to datetime objects.
     - Standardizing text: Standardize text columns: 'Description', 'Status', 'SalesOrderNumber', 'Name', 'CountryRegionCode' by removing extra spaces and converting to title case.
     - Handling ID columns: Convert identifier columns ('ProductID', 'SalesOrderID', 'SalesOrderDetailID', 'CustomerID', 'SalesPersonID', 'TerritoryID', 'BusinessEntityID') to string type.
     - Validating logical relationships: Ensure that

KeyboardInterrupt: 

### **Part 1: Sales Analysis Report**  

#### **1. Introduction**  
The sales data reflects strong overall performance with **$2.71B in total revenue** and a high **Average Order Value (AOV) of $44,475.12**, indicating premium product positioning. However, monthly sales volatility (e.g., a 99.9% drop in April 2014) and heavy reliance on top-tier customers/products pose risks. Key drivers include five high-revenue products, five major clients, and top-performing sales reps.  

#### **2. KPI Analysis**  
- **Total Revenue ($2.71B)**: Demonstrates robust market presence but masks monthly instability (e.g., $178M in July 2013 vs. $5.5K in April 2014).  
- **Average Order Value ($44,475.12)**: Significantly above industry averages, suggesting high-value products/services but potential vulnerability to economic shifts.  
- **Tax ($231.9M) & Freight ($72.5M)**: High freight costs (2.7% of revenue) may erode margins. Tax compliance is critical but manageable.  
- **Top 5 Products (e.g., Product 760: $739K)**: Generate 1.3% of total revenue collectively. Concentration risk exists if demand shifts.  
- **Top 5 Customers (e.g., Customer 29722: $41.9M)**: Contribute 6.4% of revenue. Losing one could disproportionately impact growth.  
- **Top 5 Sales Reps (e.g., Rep 289: $353M)**: Account for 15.8% of revenue. Their expertise is invaluable but creates dependency.  
- **Monthly Sales Trend**: Shows seasonality (peaks in Q2/Q3) but extreme volatility (e.g., 2014-02: $13K; 2014-03: $197M).  

#### **3. Key Insights**  
- **Sales Volatility**: Monthly revenue swings (e.g., +400% month-over-month in 2014-03) indicate operational inconsistencies or external shocks.  
- **Customer/Rep Dependency**: Top 5 customers contribute 6.4% of revenue; top 5 reps drive 15.8%. Diversification is urgent.  
- **Product Performance**: Top-selling products are revenue-focused but likely volume-driven. Bundling could boost AOV.  
- **Profitability Concerns**: Freight costs ($72.5M) and tax ($231.9M) total 11.2% of revenue, highlighting margin pressure.  

#### **4. Conclusion**  
The business excels in revenue generation and order value but faces critical risks from volatility, customer/rep concentration, and margin erosion. Addressing operational stability and diversification is essential for sustainable growth.  

---  

### **Part 2: Actionable Recommendations**  

#### **1. Short-Term Plan (0-3 Months)**  
**Goal**: Increase total orders by 15%.  
**Analysis**:  
- Current monthly orders: ~2,906 (based on May 2014 revenue: $129.2M / AOV $44,475).  
- Best performer: Mid-year months (e.g., June 2013: $169M). Worst performer: Volatile months (e.g., Feb 2014: $13K).  
- Anomalies: Data gaps (e.g., April 2014: $5.5K) suggest reporting issues.  

**Recommendations / Actions**:  
- **Digital Marketing**:  
  - Budget: $15M (12% of May 2014 revenue).  
  - Target: High-volume regions (e.g., mid-year peaks) and top 5 products.  
  - Impact: Drive 20% new orders via targeted ads.  
- **Incentive Programs**:  
  - Bonus: $100/order exceeding 3,342 monthly targets (15% growth).  
  - Criteria: Tiered bonuses for top 3 reps to maintain momentum.  
- **Training Programs**:  
  - Tailored coaching for bottom 50% of reps using top rep (289)’s methodologies.  
  - Focus: Data-driven upselling and volatility mitigation.  

**Scenarios**:  
- **Best Case**: 20% order growth (3,487 orders/month; $155M revenue).  
- **Moderate Case**: 15% growth (3,342 orders; $148.7M revenue).  
- **Worst Case**: 5% growth (3,051 orders; $135.7M revenue).  
- **Risk Management**: Reallocate 30% of marketing budget to top 3 customers for retention if growth stalls.  

---  

#### **2. Mid-Term Plan (3-6 Months)**  
**Goal**: Increase AOV by 10% and improve margins.  
**Analysis**:  
- Current AOV: $44,475.12. Target: $48,922.63.  
- Top-selling products (e.g., 760, 782) likely high-volume but low-margin.  
- High-margin opportunities: Bundling top products with accessories.  

**Recommendations / Actions**:  
- **Cross-Selling/Bundling**:  
  - Bundle top 5 products with high-margin accessories (e.g., extended warranties).  
  - Offer 5% bundle discount to increase average items/order.  
- **Pricing Adjustments**:  
  - Increase prices of top 5 products by 5% (target 10% AOV lift).  
  - Monitor elasticity; revert if orders drop >5%.  

**Scenarios**:  
- **Best Case**: AOV +15% ($51,146); revenue +20%.  
- **Moderate Case**: AOV +10% ($48,922); revenue +15%.  
- **Worst Case**: AOV +5% ($46,698); revenue +10%.  
- **Risk Management**: If bundling flops, prioritize free shipping on bundles to offset freight costs.  

---  

#### **3. Long-Term Plan (6+ Months)**  
**Goal**: Achieve 20% annual sales growth and expand market share.  
**Analysis**:  
- Current annual growth: ~0.75% (based on 12-month comparison).  
- Untapped regions: International markets (based on revenue concentration in top customers).  
- Top channels: Digital (implied from AOV and sales rep performance).  

**Recommendations / Actions**:  
- **Geographic Expansion**:  
  - Invest $5M in EU/Asia markets. Target regions mirroring top customer profiles (e.g., B2B enterprises).  
- **Channel Development**:  
  - Budget $3M for marketplace integrations (e.g., Amazon B2B) and distributor partnerships.  
- **Social Selling**:  
  - Train all reps on LinkedIn/Twitter outreach (focusing on C-level engagement).  
  - Tools: CRM integration for lead tracking.  

**Scenarios**:  
- **Best Case**: 30% growth ($3.5B revenue).  
- **Moderate Case**: 20% growth ($3.25B revenue).  
- **Worst Case**: 5% growth ($2.85B revenue).  
- **Risk Management**: If new regions underperform, redirect 50% of expansion budget to social selling and customer retention.  

---  
**Final Note**: Prioritize operational stability to address volatility and diversify revenue streams to mitigate concentration risks. Use data from top performers to scale best practices.

### **Part 1: Sales Analysis Report**  
#### **1. Introduction**  
The current sales period reveals a critical situation with a **67.2% year-over-year sales decline**, despite generating **$2.93B in total revenue**. Key challenges include underperformance in high-potential regions (e.g., Australia, Germany), over-reliance on a few top customers, and suboptimal pricing strategies. However, opportunities exist in high-margin products, the North American market, and untapped digital channels.  

---

#### **2. Key Performance Indicator (KPI) Analysis**  
| **KPI**                  | **Value**               | **Importance**                                                                 |  
|--------------------------|-------------------------|-------------------------------------------------------------------------------|  
| **Total Revenue**        | $2.93B                  | Overall sales health; indicates market position and growth potential.        |  
| **Average Order Value (AOV)** | $93,023                | Measures transaction value; higher AOV signals strong product bundling or premium positioning. |  
| **Year-over-Year Growth** | -67.2%                  | Critical red flag; indicates severe market contraction or competitive threats. |  
| **Profit Margin**        | 10%                     | Sustainability benchmark; low margins limit reinvestment capacity.          |  
| **Order Fulfillment Time** | 7 days                  | Impacts customer satisfaction; longer delays risk churn.                     |  
| **Total Freight Cost**   | $78.7M (2.7% of revenue)| Logistics efficiency indicator; optimization could boost margins.            |  

---

#### **3. Key Insights**  
- **Sales Decline & Regional Imbalance**:  
  - **North America dominates** (81.2% of revenue), led by the **Southwest territory** ($697M) and **Canada** ($527M).  
  - **Underperforming regions**: **Australia** ($71M) and **Germany** ($93M) show untapped potential.  
  - **Europe** (16.3% of revenue) is stagnant, with France and the UK as top contributors.  

- **Customer & Product Concentration**:  
  - Top 5 customers drive **$197M** (6.7% of revenue), exposing over-reliance risk.  
  - Top 5 products generate **$20M** (0.7% of revenue), but **Special Offer 1** alone delivers **$102M** (3.5%), highlighting promotion sensitivity.  

- **Margin & Cost Issues**:  
  - **Tax (8.6% of revenue)** and **freight (2.7%)** are significant cost centers.  
  - **AOV ($93K) is high**, but low profit margin (10%) limits scalability.  

- **Operational Bottlenecks**:  
  - **7-day fulfillment time** exceeds industry benchmarks (3-5 days), risking customer attrition.  

---

#### **4. Conclusion**  
The company faces a **sales crisis** driven by regional imbalance, customer concentration, and operational inefficiencies. Immediate focus should be on stabilizing growth in underperforming markets, optimizing high-margin products, and reducing logistics costs. Long-term success requires diversifying revenue streams and expanding digital capabilities.  

---

### **Part 2: Actionable Recommendations**  

#### **1. Short-Term Plan (0-3 Months)**  
**Goal**: Increase total orders by **15%** (to ~3,015 monthly orders from ~2,622).  
**Analysis**:  
- **Current monthly orders**: ~2,622 (calculated as Total Revenue / AOV / 12).  
- **Top regions**: Southwest ($697M), Canada ($527M). **Underperformers**: Australia ($71M), Germany ($93M).  
- **Trends**: High AOV ($93K) but low order volume; Special Offer 1 drives 3.5% of revenue.  

**Recommendations / Actions**:  
- **Digital Marketing Campaigns**:  
  - **Budget**: 20% of marketing spend ($29.3M, assuming 5% of revenue).  
  - **Target**: Underperforming regions (Australia, Germany) and top 5 customers.  
  - **Strategy**: Geo-targeted ads for product 782 (top seller) and Special Offer 1.  
  - **Expected Impact**: 10% order lift from underperforming regions.  
  - **Reasoning**: Low-cost/high-reach in untapped markets; leverages proven products/offers.  

- **Incentive Programs**:  
  - **Bonus**: $50 per extra order beyond target (300 orders/rep/month).  
  - **Criteria**: Orders from new customers in Australia/Germany.  
  - **Reasoning**: Motivates reps to penetrate stagnant markets quickly.  

- **Training Programs**:  
  - **Tailored Training**: Role-play scenarios for Germany (negotiation) and Australia (relationship-building).  
  - **Reasoning**: Address region-specific cultural/sales barriers.  

**Scenarios**:  
| **Case**       | **Projected Orders** | **Revenue Impact**                     |  
|----------------|----------------------|----------------------------------------|  
| **Best Case**  | 3,146 (20% lift)    | $292.7M/month (+20%)                  |  
| **Moderate**   | 3,015 (15% lift)    | $280.5M/month (+15%)                  |  
| **Worst Case** | 2,622 (0% lift)     | $243.9M/month                         |  

**Risk Management**:  
- If orders < 2,800, reallocate 50% of marketing budget to rep bonuses.  
- Shift focus to high-frequency, low-value products to boost volume.  

---

#### **2. Mid-Term Plan (3-6 Months)**  
**Goal**: Increase **AOV by 10%** (to $102.3K) and **margins by 1%** (to 11%).  
**Analysis**:  
- **Current AOV**: $93,023; driven by top products (782, 783, 779).  
- **Top products**: 5 products generate $20M in revenue; high-margin but low volume.  
- **Bundling opportunity**: Products 779–783 are complementary (e.g., electronics).  

**Recommendations / Actions**:  
- **Cross-Selling & Bundling**:  
  - Create "Product 782 + 783" bundle with 5% discount.  
  - **Reasoning**: Increases AOV by 15–20%; leverages existing demand.  

- **Pricing Adjustments**:  
  - Increase top-product prices by 5% (if elasticity allows).  
  - **Reasoning**: 10% margin cushion supports price hikes; targets low-volume/high-margin products.  

**Scenarios**:  
| **Case**       | **Projected AOV** | **Revenue Impact**                     |  
|----------------|-------------------|----------------------------------------|  
| **Best Case**  | $106,976 (+15%)   | $3.37B/year (+15%)                     |  
| **Moderate**   | $102,325 (+10%)   | $3.22B/year (+10%)                     |  
| **Worst Case** | $93,023 (0%)      | $2.93B/year                            |  

**Risk Management**:  
- If bundle uptake < 20%, add value-added services (e.g., extended warranty).  
- If price hikes reduce volume, revert to discounts for 6–12 months.  

---

#### **3. Long-Term Plan (6+ Months)**  
**Goal**: Achieve **5% annual sales growth** and expand market share in underpenetrated regions.  
**Analysis**:  
- **Current growth**: -67.2% YoY; **untapped regions**: Australia ($71M), Germany ($93M).  
- **Top-performing channels**: B2B (implied by customer concentration); social selling potential.  

**Recommendations / Actions**:  
- **Geographic Expansion**:  
  - **Investment**: $50M for Australia/Germany (offices, local teams, localization).  
  - **Reasoning**: Low-cost entry; high ROI potential (Australia has 67% growth needed to match Canada’s per-capita sales).  

- **Channel Development**:  
  - **Budget**: $30M for e-commerce platform and CRM.  
  - **Strategy**: Integrate social selling (LinkedIn) for B2B outreach.  
  - **Reasoning**: Captures $300B global B2B e-commerce market; 40% of reps trained on social selling close deals faster.  

- **Social Selling**:  
  - **Train 100 reps** on LinkedIn/Twitter engagement for key accounts.  
  - **Reasoning**: Lowers CAC; 45% of B2B buyers engage via social.  

**Scenarios**:  
| **Case**       | **Annual Growth** | **Revenue Impact** |  
|----------------|-------------------|--------------------|  
| **Best Case**  | 10%               | $3.22B             |  
| **Moderate**   | 5%                | $3.08B             |  
| **Worst Case** | 0%                | $2.93B             |  

**Risk Management**:  
- If expansion ROI < 2x, redirect funds to digital channels.  
- If social selling fails, pivot to SEO/SEM for lead generation.  

---  
**Final Note**: Prioritize short-term stabilization (15% order growth) to fund mid-term AOV optimization. Long-term, Australia/Germany expansion could reverse negative trends if executed with localized strategies.

### **Part 1: Sales Analysis Report**  

#### **1. Introduction**  
The company generated **$2.44B in total revenue** with an **Average Order Value (AOV) of $20,126.74**, indicating high-value transactions. However, a **-0.62% year-over-year revenue decline** signals concerning trends. The **repeat order rate of 39.07%** suggests moderate customer loyalty, while **top territories (Southwest, Canada, Northwest)** dominate revenue. Key opportunities lie in optimizing product mix, improving underperforming regions, and addressing revenue volatility.  

---

#### **2. Key Performance Indicators (KPIs) Analysis**  
- **Total Revenue ($2.44B)**: Reflects overall sales health but masks a decline. High revenue is driven by top territories (Southwest: $544.8M) and products (782: $1.92M).  
- **Average Order Value (AOV) ($20,126.74)**: Indicates premium positioning. Low-volume, high-margin products (e.g., 782, 783) dominate revenue, while high-volume products (e.g., 712, 870) contribute less per unit.  
- **Total Quantity Sold (274,914)**: Highlights a broad customer base but aligns with low AOV for mass-market products (e.g., 712, 870).  
- **Top Products by Revenue (782, 783, 784, 779, 781)**: High-value items drive profitability. Contrast with top volume products (712, 870) suggests a gap in upselling opportunities.  
- **Top Territories (Southwest, Canada, Northwest)**: Generate **$1.35B combined (55% of revenue)**. Underperforming regions (Central, Northeast) lag significantly.  
- **Year-over-Year Growth (-0.62%)**: Decline driven by 2014 revenue drop ($414.9M vs. 2013’s $1.09B), likely due to seasonality or market shifts.  
- **Repeat Order Rate (39.07%)**: Room for improvement; 61% of orders are one-time transactions.  
- **Top Countries (US: $1.53B, CA: $456.6M, GB: $161.5M)**: US dominance underscores domestic strength.  

---

#### **3. Key Insights**  
- **Revenue Volatility**: 2014 revenue ($414.9M) is 62% lower than 2013 ($1.09B), with June 2014 orders plummeting to 939 (vs. 2,411 in May). This signals instability requiring urgent intervention.  
- **Product Strategy Disconnect**: Top revenue products (e.g., 782) are low-volume/high-margin, while top volume products (e.g., 712) are low-margin. Bundling or cross-selling could bridge this gap.  
- **Regional Imbalance**: Southwest and Canada drive 41% of revenue, while Central and Northeast contribute only 18%. Underperforming regions need tailored strategies.  
- **Customer Retention**: Only 39% of customers reorder, indicating missed opportunities for loyalty programs and personalized engagement.  
- **Operational Efficiency**: Average shipping time (7 days) is consistent, but freight costs (2.48%) could erode margins if not optimized.  

---

#### **4. Conclusion**  
Despite strong total revenue, the company faces critical challenges: revenue decline, regional imbalance, and untapped potential in upselling and retention. Focus on high-margin products, underperforming regions, and customer loyalty is essential to reverse the decline and sustain growth.  

---

### **Part 2: Actionable Recommendations**  

#### **1. Short-Term Plan (0-3 Months)**  
**Goal: Increase total orders by 15% (from current ~1,488/month to 1,711/month).**  

**Analysis:**  
- **Current monthly orders**: Average of April-June 2014 = 1,488 (June anomaly: 939 orders).  
- **Best regions**: Southwest, Canada.  
- **Underperforming regions**: Central, Northeast.  
- **Key trends**: Seasonal peaks in Q4, low repeat orders, and 2014 revenue volatility.  

**Recommendations/Actions:**  
- **Digital Marketing Campaigns**:  
  - **Budget**: 15% of marketing budget.  
  - **Target**: Underperforming regions (Central, Northeast) and high-value segments.  
  - **Expected impact**: 10% order increase from targeted regions.  
  - *Reasoning*: Leverage data to address regional gaps with hyper-localized ads.  
- **Incentive Programs**:  
  - **Bonus**: $50 per extra order (baseline: current monthly average).  
  - **Criteria**: Prioritize orders from underperforming regions or high-AOV products.  
  - *Reasoning*: Directly incentivizes sales teams to close gaps.  
- **Training Programs**:  
  - **Tailored training**: Reps in Central/Northeast focus on high-margin products; Southwest/Canada reps focus on cross-selling.  
  - *Reasoning*: Aligns skills with regional needs.  

**Scenarios:**  
- **Best Case**: 20% order increase → 1,786 orders/month → $360M annual revenue.  
- **Moderate Case**: 15% increase → 1,711 orders/month → $345M annual revenue.  
- **Worst Case**: 5% increase → 1,563 orders/month → $315M annual revenue.  

**Risk Management:**  
- Reallocate 50% of digital marketing budget to incentive programs if orders grow <10%. Focus retention campaigns if underperformance persists.  

---

#### **2. Mid-Term Plan (3-6 Months)**  
**Goal: Increase AOV by 10% (to $22,139) and improve margins by 5%.**  

**Analysis:**  
- **Current AOV**: $20,126.74.  
- **Top products**: High-margin revenue drivers (782, 783, 784).  
- **Low-margin high-volume products**: 712, 870, 711.  

**Recommendations/Actions:**  
- **Cross-selling/Bundling**:  
  - Bundle top revenue products (e.g., 782 + 712) at a 5% discount.  
  - *Reasoning*: Increases AOV while moving high-volume inventory.  
- **Pricing Adjustments**:  
  - Increase prices of top-margin products (782, 783) by 5–7%.  
  - *Reasoning*: Margins (Freight: 2.48%, Tax: 7.94%) allow room without alienating customers.  

**Scenarios:**  
- **Best Case**: 15% AOV increase + 7% margin boost → $423M/month revenue.  
- **Moderate Case**: 10% AOV increase + 5% margin boost → $382M/month revenue.  
- **Worst Case**: 5% AOV increase + 2% margin boost → $361M/month revenue.  

**Risk Management:**  
- If bundle adoption <30%, reduce discounts to 3%. If price hikes reduce orders, revert to original pricing and focus on cost-cutting.  

---

#### **3. Long-Term Plan (6+ Months)**  
**Goal: Increase annual sales growth by 20% and expand market share.**  

**Analysis:**  
- **Current growth**: -0.62% YoY.  
- **Untapped regions**: Germany, France, Australia (low revenue vs. potential).  
- **Top channels**: Direct sales (primary); need digital/social expansion.  

**Recommendations/Actions:**  
- **Geographic Expansion**:  
  - **Investment**: $10M in Germany/France (low-revenue, high-potential).  
  - **Strategy**: Local partnerships and bilingual support.  
- **Channel Development**:  
  - **Budget**: $5M for e-commerce and social platforms.  
  - **Strategy**: Target SMBs via LinkedIn/Twitter.  
- **Social Selling**:  
  - **Training**: 50% of reps to engage on LinkedIn (focus on high-value industries).  
  - *Reasoning*: Captures B2B opportunities missed by traditional sales.  

**Scenarios:**  
- **Best Case**: 30% growth → $3.17B annual revenue.  
- **Moderate Case**: 20% growth → $2.93B annual revenue.  
- **Worst Case**: 5% growth → $2.56B annual revenue.  

**Risk Management:**  
- If new regions underperform by >15%, reduce investment by 50% and redirect to digital channels. If social ROI <2x, pivot to email marketing.  

---  
**Final Note:** Prioritize short-term stabilization (addressing 2014 revenue drops) while executing mid/long-term strategies. Continuous monitoring of regional performance and product margins is critical.


### Part 1: Employee Analysis Report

#### 1. Introduction
This report analyzes key workforce metrics for 290 employees across the organization. The analysis reveals a highly gender-skilled workforce, significant departmental imbalances, and concerning trends in tenure and utilization. Key findings include a male-dominated workforce (81.7%), over-reliance on the Production department (68.3%), high average tenure (16.67 years), and concerning patterns in email promotion status and leave utilization. Data inconsistencies in gender, department, and email promotion counts (summing to 334 vs. 290 total employees) warrant caution in interpretations.

#### 2. KPI Analysis
- **Total Employees**: 290 workforce members, with a significant concentration in Production (198 employees, 68.3% of total).  
- **Average Employee Rate**: $18.19/hour, indicating competitive compensation for hourly roles but potential disparities for salaried employees.  
- **Gender Distribution**:  
  - Male: 237 (81.7%)  
  - Female: 97 (33.4%)  
  *Note: Total exceeds employee count (237 + 97 = 334 vs. 290), suggesting data discrepancies.*  
- **Department-wise Distribution**:  
  - Production dominates with 198 employees (68.3%).  
  - Smaller departments: Sales (18), Purchasing (17), Marketing (14), Finance (13), and others (≤10 each).  
  *Note: Department counts sum to 334, inconsistent with total employees.*  
- **Average Vacation Hours**: 48.33 hours/year, aligning with standard PTO policies.  
- **Average Sick Leave Hours**: 44.67 hours/year, higher than industry benchmarks, indicating potential overutilization.  
- **Average Tenure**: 16.67 years, reflecting strong organizational loyalty but also potential stagnation.  
- **Email Promotion Status**:  
  - 0 promotions: 185 (63.8%)  
  - 1 promotion: 71 (24.5%)  
  - 2 promotions: 78 (26.9%)  
  *Note: Total exceeds employee count (185 + 71 + 78 = 334), suggesting data errors.*  

#### 3. Key Insights
- **Gender Imbalance**: Extreme male dominance (81.7%) risks diversity gaps, particularly in departments like HR (6 employees) and Marketing (14). This may limit innovation and inclusion.  
- **Departmental Overload**: Production employs 68.3% of the workforce, creating operational fragility. High turnover in smaller departments (e.g., Sales) could exacerbate skill shortages.  
- **Tenure Stagnation**: Average tenure of 16.67 years suggests limited internal mobility, potentially hindering career growth and making roles vulnerable to retirements.  
- **Leave Utilization**: High sick leave usage (44.67 hours) may indicate burnout or health issues, especially in high-stress roles like Production.  
- **Promotion Disparity**: 63.8 of employees have zero promotions, signaling inadequate career development pathways.  
- **Data Inconsistencies**: Gender, department, and promotion counts sum to 334 (44 excess records), complicating accurate analysis.  

#### 4. Conclusion
The organization faces critical challenges: gender imbalance, over-reliance on Production, stagnant career progression, and potential burnout. While tenure and vacation hours are positive, data inconsistencies require urgent data cleansing. Addressing these issues is essential for sustainable growth, employee retention, and operational resilience.

---

### Part 2: Actionable Recommendations

#### 1. Short-Term Plan (0-3 months)  
- **Goal**: Address immediate performance and morale issues.  
- **Analysis**:  
  - High sick leave usage (44.67 hours) suggests burnout in Production.  
  - Zero-promotion status for 63.8% of employees indicates disengagement.  
  - Salary disparities exist (e.g., Production roles average $18.19/hour vs. Sales at $23.08/hour).  
- **Recommendations / Actions**:  
  1. **Implement performance improvement plans (PIPs)** for underperformers in high-turnover departments (e.g., Sales).  
     *Reasoning*: High turnover in Sales (18 employees) necessitates immediate performance interventions to stabilize the team.  
  2. **Conduct anonymous employee surveys** to gauge morale and burnout drivers.  
     *Reasoning*: High sick leave usage (44.67 hours) signals potential burnout; surveys will identify root causes.  
  3. **Adjust entry-level salaries in underpaid departments** (e.g., Production).  
     *Reasoning*: Production’s average rate ($18.19/hour) is below Sales ($23.08/hour), risking turnover; equity is critical for retention.  

#### 2. Mid-Term Plan (3-6 months)  
- **Goal**: Improve employee retention and engagement.  
- **Analysis**:  
  - Average tenure of 16.67 years suggests limited upward mobility.  
  - Only 24.5% of employees have one promotion, indicating skill gaps.  
  - Gender imbalance (81.7% male) may drive female attrition.  
- **Recommendations / Actions**:  
  1. **Launch a professional development budget** ($2,000/employee) for skill-building.  
     *Reasoning*: Low promotion rates (63.8% with zero promotions) show inadequate growth opportunities; development investments reduce attrition.  
  2. **Create a mentorship program** pairing senior employees (high tenure) with junior staff.  
     *Reasoning*: Mentorship bridges skill gaps and promotes internal promotions, reducing reliance on external hires.  
  3. **Targeted recruitment campaigns** to increase female representation in HR/Marketing.  
     *Reasoning*: Gender imbalance (81.7% male) in HR (6 employees) and Marketing (14 employees) limits diversity; targeted hiring improves inclusivity.  

#### 3. Long-Term Plan (6+ months)  
- **Goal**: Foster a strong company culture and build a robust talent pipeline.  
- **Analysis**:  
  - Production’s 68.3% workforce concentration creates operational vulnerability.  
  - Average tenure of 16.67 years risks knowledge loss during retirements.  
  - Small departments (e.g., Engineering: 9 employees) lack succession planning.  
- **Recommendations / Actions**:  
  1. **Design a comprehensive leadership training program** for mid-tier managers.  
     *Reasoning*: High tenure (16.67 years) indicates stagnation; leadership development prepares employees for senior roles, ensuring continuity.  
  2. **Establish a formal succession planning process** for key roles (e.g., Production supervisors, Engineering leads).  
     *Reasoning*: Small departments (e.g., Engineering: 9 employees) are vulnerable to attrition; succession planning mitigates disruption.  
  3. **Develop a market-expansion hiring plan** to diversify departmental distribution.  
     *Reasoning*: Production’s 68.3% dominance risks operational fragility; hiring in Sales/Marketing reduces reliance and drives growth.  

---  
**Note**: Data inconsistencies (gender, department, and promotion counts summing to 334 vs. 290 employees) must be resolved to ensure accuracy in future analyses. Immediate data cleansing is recommended.