In [1]:
from langchain_community.utilities import SQLDatabase
from nodes.query_writer import QueryWriter

db =SQLDatabase.from_uri(f"sqlite:///iam_risks.db")
q = QueryWriter(db)
#res = q.process_query("get me all the users names and emails of users with that are partialy offboarded?")


In [2]:
from datetime import datetime
# List of 20 natural‐language queries to translate & execute
queries = [
    "Find all users with no MFA enabled",
    "Show inactive users in the Engineering Team",
    "List service accounts with Admin role permissions",
    "Get users with weak MFA in the Financial System",
    "Count all users by risk topic",
    "Find users who never logged in",
    "List recently joined users in the Sales Team",
    "Show local accounts in the Marketing Team",
    "Get users with MFAStatus 'none'",
    "Find users in the CRM System with weak MFA",
    "Show inactive users in the Finance department",
    "List users with risk_topic = 'NO_MFA_USERS'",
    "Find service accounts with Editor permissions",
    "Show users in the Project Management app with no MFA",
    "Count users where risk_topic IS NOT NULL",
    "List users in IT department with weak MFA",
    "Find users with risk_topic = 'INACTIVE_USERS'",
    "Show all users in the Payroll System with no MFA",
    "Get users in Engineering Team with risk_topic 'WEAK_MFA_USERS'",
    "List service accounts across all applications"
]

# Loop through each query, translate to SQL, execute, and print results
for query in queries:
    t = datetime.time()
    res = q.process_query(query)
    t = datetime.time() - t
    print(f"time tool:     {t}")
    sql_res = db.run(res.content)
    print(f"query:    {query}")
    print(f"sql:      {res.content}")
    print(f"sql_res:  {sql_res}")
    print("-" * 80)


time tool:     <module 'time' (built-in)>
query:    Find all users with no MFA enabled
sql:      SELECT u.UserID, u.Name, u.Email
   FROM Users u
   WHERE u.risk_topic = 'NO_MFA_USERS';
sql_res:  [('U001', 'Alice Johnson', 'alice@example.com'), ('U008', 'Henry Chen', 'henry.chen@example.com'), ('U014', 'Noah Garcia', 'noah.garcia@example.com'), ('U022', 'Victor Rodriguez', 'victor.rodriguez@example.com'), ('U025', 'Yasmine Ali', 'yasmine.ali@example.com')]
--------------------------------------------------------------------------------
time tool:     <module 'time' (built-in)>
query:    Show inactive users in the Engineering Team
sql:      SELECT u.UserID, u.Name, u.Email
   FROM Users u
   JOIN UserGroups ug ON u.UserID = ug.UserID
   JOIN Groups g ON ug.GroupID = g.GroupID
   WHERE u.Status = 'inactive'
   AND g.GroupName = 'Engineering Team'
   AND u.risk_topic IS NOT NULL;
sql_res:  [('U013', 'Mia Thompson', 'mia.thompson@example.com'), ('U027', 'Amy Nguyen', 'amy.nguyen@example.co

OperationalError: (sqlite3.OperationalError) no such column: r.Permission
[SQL: SELECT u.UserID, r.RoleName
   FROM Users u
   JOIN UserRoles ur ON u.UserID = ur.UserID
   JOIN Roles r ON ur.RoleID = r.RoleID
   WHERE u.AccountType = 'service' AND r.Permissions IN ('Create', 'Read', 'Update', 'Delete', 'Approve') AND r.Permission = 'Admin-only';]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
# Import necessary packages
from langchain_community.utilities import SQLDatabase
from nodes.query_writer import QueryWriter

# Connect to the SQLite database
db_path = "iam_risks.db"  # Adjust path if needed
db = SQLDatabase.from_uri(f"sqlite:///{db_path}")

# Create the QueryWriter
query_writer = QueryWriter(db)

# Function to run a query and show results
def run_and_show_results(natural_language_query):
    print(f"User Query: {natural_language_query}")
    
    # Generate SQL from natural language
    sql_message = query_writer.process_query(natural_language_query)
    generated_sql = sql_message.content
    print(f"\nGenerated SQL: \n{generated_sql}")
    
    # Execute the SQL
    try:
        result = db.run(generated_sql)
        print(f"\nSQL Result: \n{result}")
        return result
    except Exception as e:
        error_message = f"Error executing SQL: {str(e)}"
        print(f"\n{error_message}")
        return error_message

# Test with a few example queries
examples = [
    "which users have no MFA?",
    "list inactive users in the Engineering department",
    "show me all service accounts with weak MFA"
]

# Run the examples
for example in examples:
    print("\n" + "="*50)
    run_and_show_results(example)
    print("="*50)

# Interactive query
user_query = input("\nEnter your own query: ")
run_and_show_results(user_query)


User Query: which users have no MFA?

Generated SQL: 
SELECT UserID, Name, Email
   FROM Users
   WHERE risk_topic = 'NO_MFA_USERS';

SQL Result: 
[('U001', 'Alice Johnson', 'alice@example.com'), ('U008', 'Henry Chen', 'henry.chen@example.com'), ('U014', 'Noah Garcia', 'noah.garcia@example.com'), ('U022', 'Victor Rodriguez', 'victor.rodriguez@example.com'), ('U025', 'Yasmine Ali', 'yasmine.ali@example.com')]

User Query: list inactive users in the Engineering department

Generated SQL: 
SELECT u.UserID, u.Name, u.Email
   FROM Users u
   JOIN UsersDepartment ud ON u.UserID = ud.UserID
   WHERE u.Status = 'inactive'
   AND ud.Department = 'Engineering';

Error executing SQL: (sqlite3.OperationalError) no such table: UsersDepartment
[SQL: SELECT u.UserID, u.Name, u.Email
   FROM Users u
   JOIN UsersDepartment ud ON u.UserID = ud.UserID
   WHERE u.Status = 'inactive'
   AND ud.Department = 'Engineering';]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

User Query: show me 

In [11]:
db.run("""SELECT Users.UserID, Users.Name, Users.Email, Users.Department 
FROM Users 
WHERE risk_topic = 'INACTIVE_USERS';""")

"[('U003', 'Charlie Davis', 'charlie@example.com', 'Finance'), ('U017', 'Quinn Brown', 'quinn.brown@example.com', 'Marketing'), ('U019', 'Sophia Lewis', 'sophia.lewis@example.com', 'Engineering'), ('U027', 'Amy Nguyen', 'amy.nguyen@example.com', 'Engineering'), ('U028', 'Bradley Cooper', 'bradley.cooper@example.com', 'IT')]"

In [4]:
from langchain_community.utilities import SQLDatabase
from loader import JsonFileLoader
db =SQLDatabase.from_uri(f"sqlite:///iam_risks.db")
print(db.get_usable_table_names())
print(db.get_table_info())


['Applications', 'Groups', 'ResourceRoles', 'Resources', 'Roles', 'UserApplications', 'UserGroups', 'UserRoles', 'Users']

CREATE TABLE "Applications" (
	"ApplicationID" TEXT, 
	"ApplicationName" TEXT NOT NULL, 
	"Description" TEXT NOT NULL, 
	PRIMARY KEY ("ApplicationID")
)

/*
3 rows from Applications table:
ApplicationID	ApplicationName	Description
A001	Payroll System	Handles employee payroll
A002	CRM System	Customer relationship management
A003	Project Management	Team project tracking
*/


CREATE TABLE "Groups" (
	"GroupID" TEXT, 
	"GroupName" TEXT NOT NULL, 
	"Description" TEXT NOT NULL, 
	PRIMARY KEY ("GroupID")
)

/*
3 rows from Groups table:
GroupID	GroupName	Description
G001	Engineering Team	Group for all engineering staff
G002	Sales Team	Group for sales department
G003	Marketing Team	Group for marketing department
*/


CREATE TABLE "ResourceRoles" (
	"ResourceID" TEXT, 
	"RoleID" TEXT, 
	PRIMARY KEY ("ResourceID", "RoleID"), 
	FOREIGN KEY("ResourceID") REFERENCES "Resources" 

In [7]:
from langchain_community.utilities import SQLDatabase
from loader import JsonFileLoader
db =SQLDatabase.from_uri(f"sqlite:///iam_risks.db")
print(db.get_usable_table_names())
print(db.get_table_info())

['Applications', 'Groups', 'ResourceRoles', 'Resources', 'Roles', 'UserApplications', 'UserGroups', 'UserRoles', 'Users']

CREATE TABLE "Applications" (
	"ApplicationID" TEXT, 
	"ApplicationName" TEXT NOT NULL, 
	"Description" TEXT NOT NULL, 
	PRIMARY KEY ("ApplicationID")
)

/*
3 rows from Applications table:
ApplicationID	ApplicationName	Description
A001	Payroll System	Handles employee payroll
A002	CRM System	Customer relationship management
A003	Project Management	Team project tracking
*/


CREATE TABLE "Groups" (
	"GroupID" TEXT, 
	"GroupName" TEXT NOT NULL, 
	"Description" TEXT NOT NULL, 
	PRIMARY KEY ("GroupID")
)

/*
3 rows from Groups table:
GroupID	GroupName	Description
G001	Engineering Team	Group for all engineering staff
G002	Sales Team	Group for sales department
G003	Marketing Team	Group for marketing department
*/


CREATE TABLE "ResourceRoles" (
	"ResourceID" TEXT, 
	"RoleID" TEXT, 
	PRIMARY KEY ("ResourceID", "RoleID"), 
	FOREIGN KEY("ResourceID") REFERENCES "Resources" 

In [6]:
db.run("select * from Users where risk_topic = 'INACTIVE_USERS'")

OperationalError: (sqlite3.OperationalError) no such column: risk_topic
[SQL: select * from Users where risk_topic = 'INACTIVE_USERS']
(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [3]:

from pprint import pprint
pprint(q)


<src.query_writer.QueryWriter object at 0x107db7910>


"[('U001', 'John Smith', 'john.smith@example.com', 'IT', 'Developer', '2020-01-15', '2023-05-10', 'Weak', 'regular', 'active'), ('U002', 'Jane Doe', 'jane.doe@example.com', 'HR', 'Manager', '2019-03-22', '2023-06-01', 'Weak', 'regular', 'active'), ('U003', 'Bob Johnson', 'bob.johnson@example.com', 'Finance', 'Analyst', '2021-02-10', '2023-06-15', 'Strong', 'regular', 'active'), ('U004', 'Alice Brown', 'alice.brown@example.com', 'Marketing', 'Director', '2018-11-05', '2023-05-28', 'Strong', 'regular', 'active'), ('U005', 'Charlie Davis', 'charlie.davis@example.com', 'IT', 'Admin', '2022-01-10', None, 'Strong', 'regular', 'active'), ('U006', 'Eva Wilson', 'eva.wilson@example.com', 'Sales', 'Representative', '2020-08-15', '2022-12-01', 'Strong', 'regular', 'active'), ('U007', 'Frank Miller', 'frank.miller@example.com', 'IT', 'DevOps', '2019-05-20', '2023-06-10', 'Strong', 'regular', 'active'), ('U008', 'Grace Lee', 'grace.lee@example.com', 'Finance', 'Controller', '2021-03-15', '2023-06-0

In [None]:

from pprint import pprint
def text_query_result(free_language_query):
    res = q.process_query(free_language_query)
    
    pprint(res.content)
    ret= db.run(res.content)
    print(ret)
    return ret

ret= text_query_result("which users are inactive?")


('SELECT u.* FROM Users u JOIN UserRiskTypes urt ON u.id = urt.user_id JOIN '
 "UserRoles ur ON u.id = ur.role_id WHERE urt.risk_topic IN ('Inactivity') AND "
 "ur.role NOT IN ('Admin')")


OperationalError: (sqlite3.OperationalError) no such column: ur.role
[SQL: SELECT u.* FROM Users u JOIN UserRiskTypes urt ON u.id = urt.user_id JOIN UserRoles ur ON u.id = ur.role_id WHERE urt.risk_topic IN ('Inactivity') AND ur.role NOT IN ('Admin')]
(Background on this error at: https://sqlalche.me/e/20/e3q8)