# People Analytics Agent

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/DougTrajano/agentic-ai-workshop/blob/main/people_analytics_agent.ipynb)

In this notebook, we will create a People Analytics Agent that can interact with a HR database.

## Set up Environment

In [1]:
# %pip install -U pydantic pydantic-settings datasets

# chainlit==2.8.1
# langchain_community==0.3.30
# langchain_core==0.3.78
# langchain_openai==0.3.34
# langgraph==0.6.8
# numexpr==2.11.0
# pandas==2.3.3
# plotly==6.3.0
# pydantic==2.11.10
# pydantic_settings==2.11.0
# SQLAlchemy==2.0.43


In [2]:
import mlflow


mlflow.openai.autolog()
mlflow.langchain.autolog()

## Load Parameters

In [3]:
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """People Analytics Agent Settings."""

    HF_TOKEN: str = Field(
        ...,
        description="Hugging Face API token for accessing private resources.",
    )

    HF_DATASET_NAME: str = Field(
        default="dougtrajano/hr-synthetic-database",
        description="The name of the Hugging Face dataset to load.",
    )

settings = Settings()

## Load HR Synthetic Database

In this section, we will load the [dougtrajano/hr-synthetic-database](https://huggingface.co/dougtrajano/hr-synthetic-database) dataset from Hugging Face Datasets. This dataset contains synthetic HR data including business units, departments, jobs, employees, and compensations.

We can use the [load_dataset()](https://huggingface.co/docs/datasets/v4.3.0/en/package_reference/loading_methods#datasets.load_dataset) function from the [datasets](https://huggingface.co/docs/datasets/) library to load the dataset.

In [4]:
from datasets import load_dataset


business_units = load_dataset(settings.HF_DATASET_NAME, 'business_units')
departments = load_dataset(settings.HF_DATASET_NAME, 'departments')
jobs = load_dataset(settings.HF_DATASET_NAME, 'jobs')
employees = load_dataset(settings.HF_DATASET_NAME, 'employees')
compensations = load_dataset(settings.HF_DATASET_NAME, 'compensations')

### Convert to Pandas DataFrames

In [5]:
import numpy as np
import pandas as pd


def clean_huggingface_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean Hugging Face dataset string representations of NULL values.

    Hugging Face datasets may contain string 'None' or empty strings instead
    of actual NULL values. This function replaces them with np.nan for proper
    NULL handling in PostgreSQL.

    Parameters:
    df (pd.DataFrame): Input DataFrame with potential string NULL values.

    Returns:
    pd.DataFrame: Cleaned DataFrame with proper NULL values.
    """
    df_clean = df.copy()
    for col in df_clean.columns:
        df_clean[col] = df_clean[col].replace(['None', ''], np.nan)
    return df_clean

In [6]:
# Convert Hugging Face datasets to Pandas DataFrames
df_business_units = clean_huggingface_nulls(
    business_units['train'].to_pandas()
)
df_departments = clean_huggingface_nulls(
    departments['train'].to_pandas()
)
df_jobs = clean_huggingface_nulls(
    jobs['train'].to_pandas()
)
df_employees = clean_huggingface_nulls(
    employees['train'].to_pandas()
)
df_compensations = clean_huggingface_nulls(
    compensations['train'].to_pandas()
)


print(f"Business Units: {df_business_units.shape}")
print(f"Departments: {df_departments.shape}")
print(f"Jobs: {df_jobs.shape}")
print(f"Employees: {df_employees.shape}")
print(f"Compensations: {df_compensations.shape}")

Business Units: (4, 4)
Departments: (17, 5)
Jobs: (94, 7)
Employees: (15700, 12)
Compensations: (15700, 7)


### Load DataFrames into SQLite

In [7]:
import sqlite3

from sqlalchemy import create_engine


# Create an in-memory SQLite connection
conn = sqlite3.connect(':memory:', check_same_thread=False)

# Create SQLAlchemy engine for the SQL toolkit
db_engine = create_engine('sqlite:///:memory:', creator=lambda: conn)

# Create tables in SQLite from the DataFrames
df_business_units.to_sql('business_units', conn, index=False, if_exists='replace')
df_departments.to_sql('departments', conn, index=False, if_exists='replace')
df_jobs.to_sql('jobs', conn, index=False, if_exists='replace')
df_employees.to_sql('employees', conn, index=False, if_exists='replace')
df_compensations.to_sql('compensations', conn, index=False, if_exists='replace')

# Verify the tables were created
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print("Tables in SQLite:")
print(cursor.fetchall())

Tables in SQLite:
[('business_units',), ('departments',), ('jobs',), ('employees',), ('compensations',)]


## Compile People Analytics Agent

### Load the LLM

In [8]:
from langchain_openai import ChatOpenAI


llm = ChatOpenAI(model='gpt-4o')

### Define Agent Tools

In [9]:
import datetime
import math

import numexpr


def calculator(expression: str) -> str:
    """Calculate expression using Python's numexpr library.

    Expression should be a single line mathematical expression
    that solves the problem.

    Examples:
        "37593 * 67" for "37593 times 67"
        "37593**(1/5)" for "37593^(1/5)"
    """
    local_dict = {'pi': math.pi, 'e': math.e}
    return str(
        numexpr.evaluate(
            expression.strip(),
            global_dict={},  # restrict access to globals
            local_dict=local_dict,  # add common mathematical functions
        )
    )

def get_today_date() -> str:
    """Get today's date in YYYY-MM-DD format."""
    return datetime.date.today().isoformat()

In [10]:
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase


db = SQLDatabase(engine=db_engine)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
sql_tools = toolkit.get_tools()

print(f"Available tables: {db.get_usable_table_names()}")

Available tables: ['business_units', 'compensations', 'departments', 'employees', 'jobs']


In [11]:
tools = [calculator, get_today_date] + sql_tools

### Define System Prompt

In [12]:
def get_system_prompt(dialect: str, top_k: int = 5) -> str:
    """Get the system prompt for the agent.

    Args:
        dialect (str): The SQL dialect of the database.
        top_k (int): The maximum number of results to return.

    Returns:
        str: The system prompt.
    """

    system_prompt = """
    You are an agent designed to interact with a SQL database.
    Given an input question, create a syntactically correct {dialect} query to run,
    then look at the results of the query and return the answer. Unless the user
    specifies a specific number of examples they wish to obtain, always limit your
    query to at most {top_k} results.

    You can order the results by a relevant column to return the most interesting
    examples in the database. Never query for all the columns from a specific table,
    only ask for the relevant columns given the question.

    You MUST double check your query before executing it. If you get an error while
    executing a query, rewrite the query and try again.

    DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the
    database.

    To start you should ALWAYS look at the tables in the database to see what you
    can query. Do NOT skip this step.

    Then you should query the schema of the most relevant tables.
    """

    return system_prompt.format(
        dialect=dialect,
        top_k=top_k,
    )


system_prompt = get_system_prompt(dialect=db.dialect, top_k=5)

### Define Response Format

In [None]:
import sqlparse


class AgentOutput(BaseModel):
    """Agent response containing analysis summary, SQL query, dataset, and an optional Plotly JSON Chart."""

    summary: str = Field(
        ...,
        description=(
            "Natural language answer with key findings and specific details. "
            "Example: 'The Sales department has 150 employees.'"
        ),
    )

    sql_query: str | None = Field(
        default=None,
        description=(
            "SQL query executed to retrieve data. "
            "Example: 'SELECT AVG(Salary), MIN(Salary), MAX(Salary) FROM employees;'"
        ),
    )

    dataset: str | None = Field(
        default=None,
        description=(
            "JSON representation of query results, pandas-compatible format. "
            'Example: \'{"data": [[120000, 70000, 210000]], "columns": ["avg_salary", "min_salary", "max_salary"]}\''
        ),
    )

    plotly_json_fig: str | None = Field(
        default=None,
        description=(
            'A Plotly JSON figure representation for visualizing the dataset. '
            'Also include this field when a graphical representation of the data is helpful. '
            "IMPORTANT: Always include data labels on the chart by setting 'text' in the trace "
            "and 'textposition' to display values on the bars/points. "
            "For bar charts, use 'textposition': 'auto' or 'outside'. "
            "For scatter plots, use 'mode': 'markers+text'. "
            'Example: \'{"data": [{"type": "bar", "x": ["A", "B"], "y": [10, 20], '
            '"text": [10, 20], "textposition": "auto"}], '
            '"layout": {"title": "Sample Bar Chart"}}\''
        ),
    )

    @field_validator('sql_query')
    @classmethod
    def format_sql_query(cls, v: str | None) -> str | None:
        """Format SQL query by removing common leading whitespace."""
        if v is None:
            return None
        return sqlparse.format(v, reindent=True, keyword_case='upper', indent_width=4).strip()

    def get_message(self) -> str:
        """Get a user-friendly message summarizing the agent's response."""
        message = self.summary
        if self.sql_query:
            message += f'\n\nSQL Query Executed:\n```sql\n{self.sql_query}\n```'
        return message


### Define LangChain Agent

In [14]:
from langchain.agents import create_agent


agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt=system_prompt,
    response_format=AgentOutput,
)

### Test Agent

In [17]:
from langchain_core.runnables import RunnableConfig


inputs = {
    "messages": [
        {
            "role": "user",
            "content": "show the number of employees by business unit",
        }
    ]
}

user_config = RunnableConfig(
    configurable={'thread_id': 'user_thread_id_1993'},
    recursion_limit=40,
)

for step in agent.stream(inputs, config=user_config, stream_mode="values"):
    step["messages"][-1].pretty_print()


show the number of employees by business unit
Tool Calls:
  sql_db_list_tables (call_pqQ2DIPqWr2IN5MxOJ7RL205)
 Call ID: call_pqQ2DIPqWr2IN5MxOJ7RL205
  Args:
Name: sql_db_list_tables

business_units, compensations, departments, employees, jobs
Tool Calls:
  sql_db_schema (call_bCdGeISy7zDSe8eGMiIqNqFQ)
 Call ID: call_bCdGeISy7zDSe8eGMiIqNqFQ
  Args:
    table_names: business_units, employees
Name: sql_db_schema


CREATE TABLE business_units (
	id TEXT, 
	name TEXT, 
	description TEXT, 
	director_job_id TEXT
)

/*
3 rows from business_units table:
id	name	description	director_job_id
b1a0e1f2-1111-4c1a-9a10-0001d0b00101	Domestic Retail Division	Oversees all retail formats in the home market, including supercenters, supermarkets, and discount s	d1b2c3d4-0001-4e01-9000-000000000001
b1a0e1f2-2222-4c1a-9a10-0001d0b00102	International Markets Division	Manages international retail formats and regional marketplaces in EMEA, APAC, and LATAM.	d1b2c3d4-0002-4e01-9000-000000000002
ad21e882-b9e3-4

In [18]:
import plotly.io as pio


if "structured_response" in step:
    response = AgentOutput.model_validate(step["structured_response"])

    print(f"Agent Response:\n{response.get_message()}")

    if response.plotly_json_fig:
        fig = pio.from_json(response.plotly_json_fig, skip_invalid=True)
        fig.show()

Agent Response:
The number of employees by business unit is as follows:
- Membership-Based Wholesale Club Division: 93 employees
- Domestic Retail Division: 10,795 employees
- International Markets Division: 4,746 employees
- Shared Services: 66 employees

These figures provide a view of how the workforce is distributed among different business sectors, with the Domestic Retail Division having the largest number of employees.

SQL Query Executed:
```sql
SELECT b.name,
       COUNT(e.id) AS employee_count
FROM business_units b
LEFT JOIN employees e ON b.id = e.business_unit_id
GROUP BY b.id
LIMIT 5;
```


## Chainlit (Conversational AI UI)