# CREATE A SQL AGENT USING CREWAI AND GROQ


[Medium article](https://medium.com/the-ai-forum/create-a-sql-agent-using-crewai-and-groq-005895ba31b3)

## SETUP


In [85]:
import json
import os
import sqlite3
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any, Dict, List, Tuple, Union

import pandas as pd
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from langchain.schema import AgentFinish
from langchain.schema.output import LLMResult
from langchain_community.tools.sql_database.tool import (
    InfoSQLDatabaseTool,
    ListSQLDatabaseTool,
    QuerySQLCheckerTool,
    # QuerySQLDataBaseTool,
)
from langchain_community.tools import QuerySQLDatabaseTool
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# from google.colab import userdata

In [86]:
# Load the OpenAI API key from a .env file (if one is found) or from the
# process environment, and fail early with a readable message when it is absent.
import os
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Assigning None to os.environ raises an opaque
    # "TypeError: str expected, not NoneType"; say what is actually wrong.
    raise RuntimeError(
        "OPENAI_API_KEY is not set; add it to a .env file or export it "
        "before running this notebook."
    )
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

## DATA

- [dataset of ds_salaries](https://github.com/YuluDuan/Hypothesis-Testing-Data-Science-salary-comparison-in-different-location/blob/main/ds_salaries.csv)

In [87]:
# load the data
# The path is relative to the notebook's working directory so the notebook is
# not tied to one machine; it mirrors the "../data/salaries.db" location used
# for the SQLite file.
# NOTE(review): assumes ds_salaries.csv sits in that same ../data folder — confirm.
df = pd.read_csv("../data/ds_salaries.csv", index_col=0)

df.head()

Unnamed: 0,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L


In [88]:
# Store the DataFrame as the `salaries` table of a local SQLite file.
connection = sqlite3.connect("../data/salaries.db")

try:
    df.to_sql(name="salaries", con=connection)
except ValueError:
    # pandas raises ValueError when the table already exists (the default
    # if_exists="fail"). Other failures (bad path, locked file, ...) are no
    # longer mislabelled as "already exist" and now surface normally.
    print("db already exist")
finally:
    # The database is read later through SQLDatabase.from_uri, so this raw
    # connection is not needed once the table has been written.
    connection.close()

db already exist


In [89]:
# connect to database
# Open the SQLite file through LangChain's SQLDatabase wrapper; the tools
# defined in this notebook all receive this `db` object.
db = SQLDatabase.from_uri("sqlite:///../data/salaries.db")
# Bare expression: the notebook echoes the dialect name as the cell output.
db.dialect

'sqlite'

## SETUP LOGGER

In [90]:
@dataclass
class Event:
    """One log record, serialised with ``asdict`` and written as a JSON line."""

    event: str  # event name; the handler in this file uses "llm_start" / "llm_end"
    timestamp: str  # time string; the handler fills it from _current_time()
    text: str  # the prompt sent to the LLM or the generation it returned


def _current_time() -> str:
    """Return the current time in UTC, formatted as an ISO-8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()

In [91]:
class LLMCallbackHandler(BaseCallbackHandler):
    """Append every LLM prompt and generation to a JSON Lines log file.

    Each record is an ``Event`` converted with ``asdict`` and written with
    ``json.dumps`` on its own line.

    Args:
        log_path: File that events are appended to. Its parent directory is
            created on first write if it does not exist.
    """

    def __init__(self, log_path: Path):
        super().__init__()
        self.log_path = log_path

    def _append_event(self, name: str, text: Any) -> None:
        """Append a single ``Event`` named *name* as one JSON line."""
        event = Event(event=name, timestamp=_current_time(), text=text)
        # A missing log directory would otherwise make every write fail.
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        with self.log_path.open("a", encoding="utf-8") as file:
            file.write(json.dumps(asdict(event)) + "\n")

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Run when LLM starts running; logs one ``llm_start`` event per prompt."""
        # Previously `assert len(prompts) == 1`: asserts are stripped under -O
        # and abort the callback when several prompts arrive. Logging each
        # prompt keeps the single-prompt output identical.
        for prompt in prompts:
            self._append_event("llm_start", prompt)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running; logs the last generation as ``llm_end``."""
        if not response.generations or not response.generations[-1]:
            # Nothing was generated, so there is nothing to index or log.
            return
        generation = response.generations[-1][-1]
        # Only chat generations carry `.message`; plain generations expose the
        # generated string as `.text`.
        message = getattr(generation, "message", None)
        text = message.content if message is not None else generation.text
        self._append_event("llm_end", text)

## SETUP THE LLM

In [92]:
# Chat model shared by every agent and by the check_sql tool. Prompts and
# generations are appended to a JSON Lines file by LLMCallbackHandler.
# The log path is relative to the notebook's working directory (same layout as
# the "../data" folder) instead of a machine-specific absolute path.
# NOTE(review): assumes logs live in ../logs next to ../data — confirm.
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-4o-mini",
    callbacks=[LLMCallbackHandler(Path("../logs/prompts.jsonl"))],
)

# llm.invoke("what is 2+2")

## CREATE TOOLS

In [93]:
# tool1 : list all the tables in the database
# Registered as "list_tables" (was "list_table") so the tool name matches the
# name used in the sql_dev backstory and in the tables_schema docstring.
@tool("list_tables")
def list_tables() -> str:
    """List the available tables in the database"""
    return ListSQLDatabaseTool(db=db).invoke("")


# Smoke test: run the tool once and echo its result as the cell output.
list_tables.run()

Using Tool: list_table


'salaries'

In [94]:
# tool2: return the schema and sample rows
@tool("tables_schema")
def tables_schema(tables: str) -> str:
    """
    Input is a comma-separated list of tables, output is the schema and sample rows
    for those tables. Be sure that the tables actually exist by calling `list_tables` first!
    Example Input: table1, table2, table3
    """
    # NOTE(review): the docstring is presumably used as the tool description
    # shown to the agent, so its text is left unchanged — confirm.
    return InfoSQLDatabaseTool(db=db).invoke(tables)


# Smoke test: print the schema and sample rows of the `salaries` table.
print(tables_schema.run("salaries"))

Using Tool: tables_schema

CREATE TABLE salaries (
	"index" INTEGER, 
	work_year INTEGER, 
	experience_level TEXT, 
	employment_type TEXT, 
	job_title TEXT, 
	salary INTEGER, 
	salary_currency TEXT, 
	salary_in_usd INTEGER, 
	employee_residence TEXT, 
	remote_ratio INTEGER, 
	company_location TEXT, 
	company_size TEXT
)

/*
3 rows from salaries table:
index	work_year	experience_level	employment_type	job_title	salary	salary_currency	salary_in_usd	employee_residence	remote_ratio	company_location	company_size
0	2020	MI	FT	Data Scientist	70000	EUR	79833	DE	0	DE	L
1	2020	SE	FT	Machine Learning Scientist	260000	USD	260000	JP	0	JP	S
2	2020	SE	FT	Big Data Engineer	85000	GBP	109024	GB	50	GB	M
*/


In [95]:
# tool3: executes a given sql query
@tool("execute_sql")
def execute_sql(sql_query: str) -> str:
    """Execute a SQL query against the database. Returns the result"""
    # Use the imported class name `QuerySQLDatabaseTool`; the old spelling
    # `QuerySQLDataBaseTool` is commented out in the imports and raises
    # NameError on a fresh kernel.
    return QuerySQLDatabaseTool(db=db).invoke(sql_query)


# Smoke test: fetch the first five rows of the `salaries` table.
execute_sql.run("SELECT * FROM salaries LIMIT 5")

Using Tool: execute_sql


"[(0, 2020, 'MI', 'FT', 'Data Scientist', 70000, 'EUR', 79833, 'DE', 0, 'DE', 'L'), (1, 2020, 'SE', 'FT', 'Machine Learning Scientist', 260000, 'USD', 260000, 'JP', 0, 'JP', 'S'), (2, 2020, 'SE', 'FT', 'Big Data Engineer', 85000, 'GBP', 109024, 'GB', 50, 'GB', 'M'), (3, 2020, 'MI', 'FT', 'Product Data Analyst', 20000, 'USD', 20000, 'HN', 0, 'HN', 'S'), (4, 2020, 'SE', 'FT', 'Machine Learning Engineer', 150000, 'USD', 150000, 'US', 50, 'US', 'L')]"

In [103]:
# tool4: check the sql query before executing
@tool("check_sql")
def check_sql(sql_query: str) -> str:
    """
    Use this tool to double check if your query is correct before executing it. Always use this tool before executing a query with  'execute_sql'
    """
    # The checker is LLM-backed: it receives the shared `llm` and the query
    # wrapped in a {"query": ...} payload.
    checker = QuerySQLCheckerTool(db=db, llm=llm)
    payload = {"query": sql_query}
    return checker.invoke(payload)


# Smoke test: hand the checker a deliberately malformed query.
check_sql.run("SELECT * WHERE salary > 10000 LIMIT 5 table = salaries")

Using Tool: check_sql


'```sql\nSELECT * FROM salaries WHERE salary > 10000 LIMIT 5;\n```'

## AGENTS

In [108]:
# agent1 : database developer
# Builds and runs the SQL for a request using the four tools defined above.
sql_dev = Agent(
    role="Senior Database Developer",
    goal="Construct and execute SQL queries based on a request",
    # The backstory tells the agent what each tool is for, so the pairing must
    # match the tools: `check_sql` validates a query and `execute_sql` runs it
    # (the two descriptions were previously swapped).
    backstory=dedent(
        """
        You are an experienced database engineer who is master at creating efficient and complex SQL queries.
        You have a deep understanding of how different databases work and how to optimize queries.
        Use the `list_tables` to find available tables.
        Use the `tables_schema` to understand the metadata for the tables.
        Use the `check_sql` to check your queries for correctness.
        Use the `execute_sql` to execute queries against the database.
    """
    ),
    llm=llm,
    tools=[list_tables, tables_schema, execute_sql, check_sql],
    allow_delegation=False,
)

In [112]:
# agent2: data analyst
# Takes no tools: it only works with the text handed over from the
# extraction task.
data_analyst = Agent(
    llm=llm,
    allow_delegation=False,
    role="Senior Data Analyst",
    goal="You receive data from the database developer and analyze it",
    backstory=dedent(
        """
        You have deep experience with analyzing datasets using Python.
        Your work is always based on the provided data and is clear,
        easy-to-understand and to the point. You have attention
        to detail and always produce very detailed work (as long as you need).
    """
    ),
)

In [114]:
# agent3: Senior report writer
# Condenses the analyst's output into a short executive summary.
report_writer = Agent(
    llm=llm,
    allow_delegation=False,
    role="Senior Report Editor",
    goal="Write an Executive Summary type of report based on the work of the Data Analyst",
    backstory=(
        """You writing skill is well known for clear and effective communication. You always summarize long texts into bullet points that contain the most important details"""
    ),
)

## TASKS

In [116]:
# task1: extract data
# `{query}` is a placeholder for the "query" key of the kickoff inputs.
extract_data = Task(
    agent=sql_dev,
    description="Extract data that is required for the query {query}.",
    expected_output="Database result for the query",
)

In [117]:
# task2: analyze data
# Receives the output of `extract_data` as context.
analyze_data = Task(
    agent=data_analyst,
    context=[extract_data],
    description="Analyze the data from the database and write an analysis for {query}.",
    expected_output="Detailed analysis text",
)

In [118]:
# task3: write report
# Receives the output of `analyze_data` as context. The description is a
# single line with no leading indentation, so it is passed as a plain string
# (dedent would return it unchanged).
write_report = Task(
    agent=report_writer,
    context=[analyze_data],
    description="Write an executive summary of the report from the analysis. The report must be less than 100 words.",
    expected_output="Markdown Report",
)

## CREW

In [126]:
# Assemble the three agents and their tasks into one sequential pipeline:
# extract -> analyze -> report. The crew log is written to "crew.log".
crew = Crew(
    process=Process.sequential,
    agents=[sql_dev, data_analyst, report_writer],
    tasks=[extract_data, analyze_data, write_report],
    output_log_file="crew.log",
    memory=False,
    verbose=False,
)

Overriding of current TracerProvider is not allowed


In [127]:
# The "query" key fills the `{query}` placeholder in the task descriptions.
inputs = dict(
    query="Effects on salary (in US Dollar) based on company location, size and employee experience",
)

# Run the whole crew once with these inputs.
result = crew.kickoff(inputs=inputs)

In [129]:
# Render the crew's raw output (`result.raw`) as Markdown in the notebook.
from IPython.display import Markdown, display

display(Markdown(result.raw))

### Executive Summary

This report analyzes the impact of company location, size, and employee experience on average salaries. Key findings include:

- **Location**: The US offers the highest salaries, with mid-level salaries at $69,298.33 and entry-level roles reaching $416,000. Canada and Australia also show competitive wages, while Brazil and Vietnam have significantly lower averages. 
- **Company Size**: Larger companies pay the highest salaries. For example, mid-level roles in Germany average $80,982.4, whereas small companies, particularly in Brazil, offer lower salaries.
- **Employee Experience**: Higher experience leads to significantly greater salaries, exemplified by the US's entry-level at $21,453.5 and experienced roles at $416,000. 

Overall, this data highlights substantial salary variations driven by these factors, guiding both job seekers and employers.