In [10]:
import os
from langchain.chat_models import init_chat_model
import psycopg2
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_community.utilities import SQLDatabase
%reload_ext autoreload
%autoreload 2

In [11]:
model = init_chat_model("google_genai:gemini-2.5-flash-lite", api_key = os.getenv("GOOGLE_API_KEY"))

In [3]:
model.invoke("Where as the first worldcup held in?")

AIMessage(content='The first FIFA World Cup was held in **Uruguay** in **1930**.', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash-lite', 'safety_ratings': [], 'model_provider': 'google_genai'}, id='lc_run--df0bd0d0-b39c-4f08-a665-c418f27eec9e-0', usage_metadata={'input_tokens': 10, 'output_tokens': 20, 'total_tokens': 30, 'input_token_details': {'cache_read': 0}})

In [4]:
import psycopg2

conn = psycopg2.connect(
    dbname = 'company',
    user = 'postgres',
    password = '6520',
    host = 'localhost',
    port = '5432'
)

In [5]:
from configparser import ConfigParser

def load_config(filename='database.ini', section='postgresql'):
    parser = ConfigParser()
    parser.read(filename)
    
    config = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            config[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))
    return config

In [6]:
config = load_config()

In [7]:
config

{'host': 'localhost',
 'database': 'company',
 'user': 'postgres',
 'password': '6520',
 'port': '5432'}

In [8]:
def connect(config):
    try:
        with psycopg2.connect(**config) as conn:
            print("Connection successful")
            return conn
    except (psycopg2.DatabaseError) as error:
        print(error)

In [9]:
connect(config)

Connection successful


<connection object at 0x7f8d5fe2c2c0; dsn: 'user=postgres password=xxx dbname=company host=localhost port=5432', closed: 0>

# DB Conn

In [12]:
db = SQLDatabase.from_uri("postgresql+psycopg2://postgres:6520@localhost:5432/company")

In [28]:
print(db.dialect)
print(db.get_usable_table_names())

postgresql
['employees']


# SQLDatabase wrapper
- to execute SQL queries and fetch result
- wrapper for sql database

In [16]:
from langchain_community.agent_toolkits import SQLDatabaseToolkit

toolkit = SQLDatabaseToolkit(db=db, llm = model)

In [None]:
tools = toolkit.get_tools()

<class 'langchain_community.tools.sql_database.tool.QuerySQLDatabaseTool'>


In [23]:
for tool in tools:
    print(f"{tool.name}: {tool.description}")

sql_db_query: Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.
sql_db_schema: Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3
sql_db_list_tables: Input is an empty string, output is a comma-separated list of tables in the database.
sql_db_query_checker: Use this tool to double check if your query is correct before executing it. Always use this tool before executing a query with sql_db_query!


In [29]:
system_prompt = """
You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct {dialect} query to run,
then look at the results of the query and return the answer. Unless the user
specifies a specific number of examples they wish to obtain, always limit your
query to at most {top_k} results.

You can order the results by a relevant column to return the most interesting
examples in the database. Never query for all the columns from a specific table,
only ask for the relevant columns given the question.

You MUST double check your query before executing it. If you get an error while
executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the
database.

To start you should ALWAYS look at the tables in the database to see what you
can query. Do NOT skip this step.

Then you should query the schema of the most relevant tables.
""".format(
    dialect=db.dialect,
    top_k=5,
)

In [31]:
from langchain.agents import create_agent

agent = create_agent(
    model,
    tools,
    system_prompt=system_prompt,
)

In [33]:
question = "who has the highest salary?"
for step in agent.stream(
    {"messages": [{"role": "user", "content": question}]},
    stream_mode = "values",
):
    step["messages"][-1].pretty_print()


who has the highest salary?
Tool Calls:
  sql_db_list_tables (3d1cb496-251c-4305-b340-8f3f50c0f6dc)
 Call ID: 3d1cb496-251c-4305-b340-8f3f50c0f6dc
  Args:
    tool_input:
Name: sql_db_list_tables

employees
Tool Calls:
  sql_db_schema (f3eba9b5-9af7-4e68-949d-1325c3d1d8b8)
 Call ID: f3eba9b5-9af7-4e68-949d-1325c3d1d8b8
  Args:
    table_names: employees
Name: sql_db_schema


CREATE TABLE employees (
	employee_id SERIAL NOT NULL, 
	first_name VARCHAR(50) NOT NULL, 
	last_name VARCHAR(50) NOT NULL, 
	date_of_birth DATE NOT NULL, 
	position VARCHAR(100) NOT NULL, 
	salary NUMERIC(10, 2) NOT NULL, 
	contact_no VARCHAR(20) NOT NULL, 
	address TEXT NOT NULL, 
	email VARCHAR(100), 
	hire_date DATE DEFAULT CURRENT_DATE, 
	created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, 
	CONSTRAINT employees_pkey PRIMARY KEY (employee_id), 
	CONSTRAINT employees_email_key UNIQUE NULLS DISTINCT (email), 
	CONSTRAINT employees_salary_check CHECK (salary > 0::numeric)
)

/*
3 rows from employee

# Define State

In [21]:
class DatabaseState(TypedDict):
    query: str
    results: str

In [None]:
graph = StateGraph(DatabaseState)

# Nodes
graph.add_node('')