In [None]:
import os
import json
import mysql.connector
import decimal
import datetime
from typing import TypedDict, Dict, Any, Optional
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
from langchain_groq import ChatGroq
import time
import httpx
import re
from IPython.display import display, Image

In [None]:
# Load environment variables
load_dotenv()

# Groq LLM
# NOTE(review): verify=False disables TLS certificate verification for every
# request sent through this client. If it is only here to get past a local
# proxy, prefer passing the proxy's CA bundle to httpx instead.
llm = ChatGroq(model="gemma2-9b-it", api_key=os.getenv("GROQ_API_KEY"), http_client=httpx.Client(verify=False))
# Report only whether the key is present; printing the key itself would store
# the secret in the notebook's saved output.
print("GROQ_API_KEY loaded:", bool(os.getenv("GROQ_API_KEY")))

# Database connection management
def get_db_connection():
    """Open a MySQL connection configured from the DB_* environment variables.

    Reads DB_HOST, DB_USER, DB_PASSWORD, DB_NAME and DB_PORT (defaulting to
    "3306") and connects with autocommit enabled and a 30 second timeout.

    Returns:
        The object returned by ``mysql.connector.connect``.

    Raises:
        Exception: any error raised while reading the settings or connecting
            is printed and then re-raised unchanged.
    """
    try:
        # The settings are assembled inside the try block so that a malformed
        # DB_PORT is reported through the same path as a failed connect.
        settings = dict(
            host=os.getenv("DB_HOST"),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            database=os.getenv("DB_NAME"),
            port=int(os.getenv("DB_PORT", "3306")),
            autocommit=True,
            connection_timeout=30,
            # NOTE(review): pool_reset_session is a pooling option; passing it
            # may make the connector hand back a pooled connection rather than
            # a brand-new one -- confirm against the connector version in use.
            pool_reset_session=True,
        )
        return mysql.connector.connect(**settings)
    except Exception as err:
        print(f"Database connection error: {err}")
        raise

def get_db_cursor():
    """Open a database connection together with a dictionary cursor on it.

    Returns:
        tuple: ``(connection, cursor)``. The caller owns both objects and is
        responsible for closing the cursor and then the connection.

    Raises:
        Exception: whatever ``get_db_connection`` or ``connection.cursor``
            raises. If cursor creation fails the connection is closed first.
    """
    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
    except Exception:
        # The caller never receives the connection in this case, so it has to
        # be released here or it would leak.
        connection.close()
        raise
    return connection, cursor

print("Database connection functions created")

In [None]:
# LangGraph state
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the LangGraph workflow.

    A TypedDict body may only contain annotations: an assigned value such as
    ``retry_count: int = 0`` is not a default and is never placed into the
    state dict. Code that reads ``retry_count`` must therefore either receive
    it in the graph input or fall back with ``state.get("retry_count", 0)``.
    """
    emp_id: int                 # employee id used for the database lookups
    goal: str                   # "roadmap" selects the roadmap prompt; any other value selects course recommendations
    retry_count: int            # regeneration attempts made after a failed validation
    user_data: Dict[str, Any]   # employee record assembled by collect_user_data
    llm_analysis: str           # raw LLM reply stored by analyze_user_data
    output: str                 # raw LLM reply holding the generated roadmap / recommendations
    is_valid: bool              # verdict stored by validate_output
    validation_summary: str     # raw LLM reply from the validation step

In [None]:
# Serialize data for JSON (handles date and decimal)
def serialize(obj):
    """Recursively convert a value into a form ``json.dumps`` can encode.

    Dicts and lists are rebuilt with their contents converted, dates and
    datetimes become ISO-8601 strings, and ``Decimal`` values become floats.
    Any other value is returned unchanged.
    """
    if isinstance(obj, dict):
        converted = {}
        for key, value in obj.items():
            converted[key] = serialize(value)
        return converted
    if isinstance(obj, list):
        return list(map(serialize, obj))
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    # datetime.datetime is a subclass of datetime.date; both expose isoformat().
    if isinstance(obj, (datetime.date, datetime.datetime)):
        return obj.isoformat()
    return obj

In [None]:
# Function to extract JSON block from text
def extract_json_block(text):
    """Extract and parse the first valid JSON payload found in ``text``.

    Candidates are tried in order: every fenced code block (with or without a
    ``json`` language tag), then the whole text. Trying the whole text covers
    replies that follow a "return only JSON" instruction and so have no fence.

    Args:
        text: Text that may contain JSON. Non-string values are rejected.

    Returns:
        The parsed JSON value of the first candidate that parses, or ``None``
        when ``text`` is not a string or no candidate is valid JSON.
    """
    if not isinstance(text, str):
        return None

    candidates = re.findall(r"```(?:json)?(.*?)```", text, re.DOTALL | re.IGNORECASE)
    candidates.append(text)

    for candidate in candidates:
        try:
            return json.loads(candidate.strip())
        except json.JSONDecodeError:
            # Not JSON (for example a fence in another language); keep looking.
            continue
    return None

In [None]:
# Step 1: Collect user data
def collect_user_data(state: AgentState) -> AgentState:
    """Load the employee record and its related rows into ``state["user_data"]``.

    Reads the employee from ``m_emp`` and then attaches the matching
    ``m_roles`` row as ``role_details`` and the rows from ``t_emp_kpi``,
    ``t_emp_projects`` and ``t_course_completion`` as ``kpis``, ``projects``
    and ``courses``.

    Args:
        state: Workflow state; ``state["emp_id"]`` selects the employee.

    Returns:
        The same state object with ``user_data`` populated.

    Raises:
        ValueError: if no ``m_emp`` row exists for the given ``emp_id``.
    """
    emp_id = state["emp_id"]

    # Get fresh database connection
    db, cursor = get_db_cursor()

    try:
        cursor.execute("SELECT * FROM m_emp WHERE emp_id = %s", (emp_id,))
        emp = cursor.fetchone()
        if emp is None:
            # Without this guard the role lookup below would fail with an
            # opaque "'NoneType' object is not subscriptable".
            raise ValueError(f"No employee found in m_emp for emp_id={emp_id!r}")

        cursor.execute("SELECT * FROM m_roles WHERE role_id = %s", (emp["role_id"],))
        emp["role_details"] = cursor.fetchone()

        # The remaining lookups differ only in table name and destination key.
        related_tables = (
            ("kpis", "SELECT * FROM t_emp_kpi WHERE emp_id = %s"),
            ("projects", "SELECT * FROM t_emp_projects WHERE emp_id = %s"),
            ("courses", "SELECT * FROM t_course_completion WHERE emp_id = %s"),
        )
        for key, query in related_tables:
            cursor.execute(query, (emp_id,))
            emp[key] = cursor.fetchall()

        state["user_data"] = emp
        print("User data collected:", emp)
        return state
    finally:
        cursor.close()
        db.close()

In [None]:
# Step 2: Analyze user
def analyze_user_data(state: AgentState) -> AgentState:
    """Ask the LLM to analyze the employee and store its reply in the state.

    The serialized ``state["user_data"]`` is embedded in a prompt requesting
    behavior traits, learning preferences and skill gaps. The raw reply text
    is written to ``state["llm_analysis"]`` and printed.

    Args:
        state: Workflow state containing ``user_data``.

    Returns:
        The same state object with ``llm_analysis`` set.
    """
    profile_json = json.dumps(serialize(state["user_data"]), indent=2)

    prompt = f"""
Analyze the following employee using:
- General info
- Skills
- Completed courses
- Completed projects
- Role expectations
- KPIs

Return JSON with:
1. behavior_traits
2. learning_preferences
3. skill_gaps

Employee:
{profile_json}
"""
    reply = llm.invoke([HumanMessage(content=prompt)])
    analysis_text = reply.content
    state["llm_analysis"] = analysis_text
    print("LLM Analysis:", analysis_text)
    return state

In [None]:
# Step 3: Generate output
def generate_output(state: AgentState) -> AgentState:
    """Generate a roadmap or course recommendations from the prior analysis.

    The course catalog is read from ``m_courses`` and embedded in the prompt
    together with the employee profile and ``state["llm_analysis"]``. A goal
    of ``"roadmap"`` selects the roadmap prompt; any other goal selects the
    course-recommendation prompt. The raw reply is stored in
    ``state["output"]`` and printed.

    Args:
        state: Workflow state containing ``user_data``, ``llm_analysis`` and
            ``goal``.

    Returns:
        The same state object with ``output`` set.
    """
    employee_json = json.dumps(serialize(state["user_data"]), indent=2)
    analysis = state["llm_analysis"]
    goal = state["goal"]

    # Hold the database connection only for the catalog query; it is released
    # before the (slow) LLM request is made.
    db, cursor = get_db_cursor()
    try:
        cursor.execute("SELECT * FROM m_courses")
        courses = cursor.fetchall()
    finally:
        cursor.close()
        db.close()

    catalog_json = json.dumps(serialize(courses), indent=2)

    if goal == "roadmap":
        prompt = f"""
Using the employee profile and analysis below, generate a skill development roadmap using the available courses.

Analysis:
{analysis}

Employee:
{employee_json}

Courses:
{catalog_json}

Return JSON:
[
  {{
    "c_name": "...",
    "desc": "...",
    "start_date": "YYYY-MM-DD",
    "duration": months,
    "end_date": "YYYY-MM-DD"
  }},
  ...
]
"""
    else:
        prompt = f"""
Based on the employee profile and analysis, recommend a list of best-fit courses to bridge skill gaps.

Analysis:
{analysis}

Employee:
{employee_json}

Courses:
{catalog_json}

Return JSON:
[
  {{
    "c_name": "...",
    "desc": "..."
  }},
  ...
]
"""

    result = llm.invoke([HumanMessage(content=prompt)])
    state["output"] = result.content
    print("Generated Output:", state["output"])
    return state

In [None]:
# Step 2: Combined Analyze and Generate
def analyze_and_generate(state: AgentState) -> AgentState:
    """Analyze the employee and generate the requested output in one LLM call.

    The course catalog is read from ``m_courses`` and embedded in the prompt
    together with the employee profile. A goal of ``"roadmap"`` selects the
    roadmap prompt; any other goal selects the course-recommendation prompt.
    The raw reply is stored in ``state["output"]`` and printed.

    Args:
        state: Workflow state containing ``user_data`` and ``goal``.

    Returns:
        The same state object with ``output`` set.
    """
    employee_json = json.dumps(serialize(state["user_data"]), indent=2)
    goal = state["goal"]

    # Hold the database connection only for the catalog query; it is released
    # before the (slow) LLM request is made.
    db, cursor = get_db_cursor()
    try:
        cursor.execute("SELECT * FROM m_courses")
        courses = cursor.fetchall()
    finally:
        cursor.close()
        db.close()

    catalog_json = json.dumps(serialize(courses), indent=2)

    if goal == "roadmap":
        prompt = f"""
Role:
You are an expert HR Learning & Development specialist responsible for creating personalized skill development roadmaps for employees.

Action:
Analyze the given employee profile and create a comprehensive skill development roadmap using the available courses. First, conduct a thorough analysis of the employee's current state, then generate a strategic learning pathway.

Analysis Steps:
1. Evaluate general info, skills, completed courses, completed projects, role expectations, and KPIs
2. Identify behavior_traits, learning_preferences, and skill_gaps
3. Map skill gaps to available courses
4. Create a timeline-based roadmap with logical progression

Guardrails/Guidelines:
- Only recommend courses from the provided course catalog
- Ensure logical skill progression (foundational before advanced)
- Consider the employee's current role and career trajectory
- Account for realistic timeframes and workload
- Prioritize skill gaps that align with role expectations and KPIs
- Include start dates, duration, and end dates for each course

Output format:
Return only a JSON array with the following structure:
[
  {{
    "c_name": "Course Name",
    "desc": "Brief description of why this course fits the employee",
    "start_date": "YYYY-MM-DD",
    "duration": number_of_months,
    "end_date": "YYYY-MM-DD"
  }}
]

Employee Profile:
{employee_json}

Available Courses:
{catalog_json}
"""
    else:
        prompt = f"""
Role:
You are an expert HR Learning & Development specialist responsible for recommending the most suitable courses for employees based on their profiles and skill gaps.

Action:
Analyze the given employee profile and recommend the best-fit courses to bridge identified skill gaps. First, conduct a comprehensive analysis of the employee's current state, then provide targeted course recommendations.

Analysis Steps:
1. Evaluate general info, skills, completed courses, completed projects, role expectations, and KPIs
2. Identify behavior_traits, learning_preferences, and skill_gaps
3. Match skill gaps with the most relevant available courses
4. Prioritize courses based on impact and relevance

Guardrails/Guidelines:
- Only recommend courses from the provided course catalog
- Focus on courses that directly address identified skill gaps
- Consider the employee's learning preferences and past course completions
- Prioritize courses that align with role expectations and KPI improvements
- Provide clear rationale for each course recommendation
- Limit recommendations to the most impactful courses (typically 3-7 courses)

Output format:
Return only a JSON array with the following structure:
[
  {{
    "c_name": "Course Name",
    "desc": "Brief description of why this course fits the employee and addresses specific skill gaps"
  }}
]

Employee Profile:
{employee_json}

Available Courses:
{catalog_json}
"""

    result = llm.invoke([HumanMessage(content=prompt)])
    state["output"] = result.content
    print("Combined Analysis and Generated Output:", state["output"])
    return state

In [None]:
# Step 4: Validate output
def validate_output(state: AgentState) -> AgentState:
    """Ask the LLM to validate the generated output and record the verdict.

    The prompt contains the course catalog from ``m_courses``, the employee
    profile and ``state["output"]``. The reply is expected to carry a JSON
    object with a ``valid`` field. The raw reply is stored in
    ``state["validation_summary"]`` and the verdict in ``state["is_valid"]``.

    The check deliberately fails open: when the reply cannot be parsed, or has
    no ``valid`` field, the output is treated as valid and a notice is printed.

    Args:
        state: Workflow state containing ``user_data``, ``output`` and ``goal``.

    Returns:
        The same state object with ``is_valid`` and ``validation_summary`` set.
    """
    employee_json = json.dumps(serialize(state["user_data"]), indent=2)
    output = state["output"]
    goal = state["goal"]

    # Hold the database connection only for the catalog query; it is released
    # before the (slow) LLM request is made.
    db, cursor = get_db_cursor()
    try:
        cursor.execute("SELECT * FROM m_courses")
        courses = cursor.fetchall()
    finally:
        cursor.close()
        db.close()

    # Serialize the catalog the same way the generation prompts do, so the
    # validator sees JSON instead of Python reprs of Decimal/date objects.
    catalog_json = json.dumps(serialize(courses), indent=2)

    prompt = f"""
Role:
You are an expert Quality Assurance specialist responsible for validating AI-generated learning recommendations and skill development roadmaps for employees.

Action:
Validate the following {goal} output by checking its accuracy, relevance, and adherence to guidelines. Evaluate whether the recommendations are appropriate for the given employee profile and utilize only the available courses from the catalog.

Validation Steps:
1. Verify all recommended courses exist in the provided course catalog
2. Check if recommendations align with the employee's skill gaps and role requirements
3. Ensure the output format matches expected JSON structure
4. Validate that course selections are logical and progressive (for roadmaps)
5. Confirm recommendations consider the employee's background and completed courses

Guardrails/Guidelines:
- All recommended courses must exist in the course catalog
- Recommendations should directly address identified skill gaps
- Output must be in proper JSON format
- For roadmaps: dates should be realistic and sequential
- Recommendations should align with employee's role and KPIs
- Avoid recommending courses the employee has already completed

Output format:
Return only a JSON object with the following structure:
{{
  "valid": true or false,
  "reason": "Detailed explanation of validation result, including specific issues if invalid"
}}

Available Courses:
{catalog_json}

Employee Profile:
{employee_json}

Generated {goal} Output to Validate:
{output}
"""
    result = llm.invoke([HumanMessage(content=prompt)])

    try:
        check = extract_json_block(result.content)
    except (TypeError, ValueError):
        # Reply content was not text that could be searched or parsed.
        check = None
    print("Validation Check:", check)

    if isinstance(check, dict) and "valid" in check:
        verdict = check["valid"]
        if isinstance(verdict, str):
            # A quoted "false" is a non-empty string and would count as True.
            verdict = verdict.strip().lower() == "true"
        state["is_valid"] = bool(verdict)
    else:
        print("Validation reply could not be parsed; assuming the output is valid.")
        state["is_valid"] = True  # Assume valid if parsing fails

    state["validation_summary"] = result.content
    print("Validation Summary:", state["validation_summary"])
    print("Validation Result:", state["is_valid"])
    return state

In [None]:
# Step 5: Final output
def final_output(state: AgentState) -> AgentState:
    """Print the validated result for the requested goal.

    Prints the JSON extracted from ``state["output"]`` when extraction
    succeeds, otherwise a failure notice followed by the raw output.

    Args:
        state: Workflow state containing ``output`` and ``goal``.

    Returns:
        The state, unchanged.
    """
    extracted_json = extract_json_block(state["output"])
    print(f"\n✅ FINAL {state['goal'].upper()}")
    # Compare against None: an empty list or object is still a successful
    # extraction and must not be reported as a failure.
    if extracted_json is not None:
        print(extracted_json)
    else:
        print("❌ Failed to extract JSON from output.")
        print(state["output"])
    return state

def fallback_output(state: AgentState) -> AgentState:
    """Print the last generated output after validation failed on retry.

    Prints the JSON extracted from ``state["output"]`` when extraction
    succeeds, otherwise a failure notice followed by the raw output.

    Args:
        state: Workflow state containing ``output``.

    Returns:
        The state, unchanged.
    """
    print("\n❌ Output failed validation even after retry.")
    print("🔁 Please review manually:")
    extracted_json = extract_json_block(state["output"])
    # Compare against None: an empty list or object is still a successful
    # extraction and must not be reported as a failure.
    if extracted_json is not None:
        print(extracted_json)
    else:
        print("❌ Failed to extract JSON from output.")
        print(state["output"])
    return state

In [None]:
# LangGraph setup
def build_graph(max_retries: int = 1):
    """Build the Collect -> Analyze -> Generate -> Validate workflow.

    After validation the graph goes to ``FinalOutput`` when the output is
    valid. Otherwise it goes through ``Retry`` back to ``Analyze`` while
    retries remain, and to ``FallbackOutput`` once they are used up.

    Args:
        max_retries: How many times a failed validation may trigger another
            analyze/generate pass. Defaults to 1.

    Returns:
        The compiled LangGraph graph.
    """
    builder = StateGraph(AgentState)

    builder.add_node("Collect", collect_user_data)
    builder.add_node("Analyze", analyze_user_data)
    builder.add_node("Generate", generate_output)
    builder.add_node("Validate", validate_output)
    builder.add_node("FinalOutput", final_output)
    builder.add_node("FallbackOutput", fallback_output)

    def register_retry(state: AgentState) -> dict:
        # State changes are only persisted when a node returns them; writes
        # made inside a routing function are discarded. The retry counter is
        # therefore advanced here rather than in output_selector.
        return {"retry_count": state.get("retry_count", 0) + 1}

    builder.add_node("Retry", register_retry)

    builder.set_entry_point("Collect")
    builder.add_edge("Collect", "Analyze")
    builder.add_edge("Analyze", "Generate")
    builder.add_edge("Generate", "Validate")
    builder.add_edge("Retry", "Analyze")

    def output_selector(state: AgentState) -> str:
        if state["is_valid"]:
            return "FinalOutput"
        # retry_count may be absent when the caller did not seed it.
        if state.get("retry_count", 0) < max_retries:
            return "Retry"
        return "FallbackOutput"

    builder.add_conditional_edges("Validate", output_selector, {
        "FinalOutput": "FinalOutput",
        "Retry": "Retry",
        "FallbackOutput": "FallbackOutput"
    })
    builder.add_edge("FinalOutput", END)
    builder.add_edge("FallbackOutput", END)

    return builder.compile()

In [None]:
# Updated LangGraph setup with combined function
def build_graph_combined(max_retries: int = 1):
    """Build the Collect -> AnalyzeAndGenerate -> Validate workflow.

    After validation the graph goes to ``FinalOutput`` when the output is
    valid. Otherwise it goes through ``Retry`` back to ``AnalyzeAndGenerate``
    while retries remain, and to ``FallbackOutput`` once they are used up.

    Args:
        max_retries: How many times a failed validation may trigger another
            generation pass. Defaults to 1.

    Returns:
        The compiled LangGraph graph.
    """
    builder = StateGraph(AgentState)

    builder.add_node("Collect", collect_user_data)
    builder.add_node("AnalyzeAndGenerate", analyze_and_generate)
    builder.add_node("Validate", validate_output)
    builder.add_node("FinalOutput", final_output)
    builder.add_node("FallbackOutput", fallback_output)

    def register_retry(state: AgentState) -> dict:
        # State changes are only persisted when a node returns them; writes
        # made inside a routing function are discarded. The retry counter is
        # therefore advanced here rather than in output_selector.
        return {"retry_count": state.get("retry_count", 0) + 1}

    builder.add_node("Retry", register_retry)

    builder.set_entry_point("Collect")
    builder.add_edge("Collect", "AnalyzeAndGenerate")
    builder.add_edge("AnalyzeAndGenerate", "Validate")
    builder.add_edge("Retry", "AnalyzeAndGenerate")

    def output_selector(state: AgentState) -> str:
        if state["is_valid"]:
            return "FinalOutput"
        # retry_count may be absent when the caller did not seed it.
        if state.get("retry_count", 0) < max_retries:
            return "Retry"
        return "FallbackOutput"

    builder.add_conditional_edges("Validate", output_selector, {
        "FinalOutput": "FinalOutput",
        "Retry": "Retry",
        "FallbackOutput": "FallbackOutput"
    })
    builder.add_edge("FinalOutput", END)
    builder.add_edge("FallbackOutput", END)

    return builder.compile()

In [None]:
emp_id = 120            # Change as needed
goal = "roadmap"        # Use "roadmap" or "courses"

graph = build_graph_combined()
# print_ascii() writes the diagram itself and returns None, so wrapping it in
# print() would append a stray "None" line to the output.
graph.get_graph().print_ascii()

In [None]:
# retry_count is seeded explicitly: the routing step after validation reads it,
# and a TypedDict annotation does not supply a default value for it.
result = graph.invoke({
    "emp_id": emp_id,
    "goal": goal,
    "retry_count": 0,
})

In [None]:
from pprint import pprint

# Show the state dictionary returned by graph.invoke, preceded by a blank line.
print()
print("Final Agent State:")
pprint(result)
