# IBM i Security SQL Statements Test

This notebook tests the SQL statements used in the security_assistant.py file. It uses the same DB2 for i services to check for user profiles exposed to potential security vulnerabilities and to generate corrective measures.

Date: June 6, 2025

In [11]:
from mapepire_python import connect
import os
from textwrap import dedent
from typing import Any, Dict, Optional

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.models.ollama import Ollama
from agno.tools import tool
from dotenv import load_dotenv
from mapepire_python import connect
from pep249 import QueryParameters
from dotenv import load_dotenv

In [12]:
load_dotenv()

credentials = {
    "host": os.getenv("HOST"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("PASSWORD"),
    "port": os.getenv("DB_PORT"),
}

def run_sql_statement(
    sql: str,
    parameters: Optional[QueryParameters] = None,
    creds: Dict[str, Any] = credentials,
) -> Any:
    with connect(creds) as conn:
        with conn.execute(sql, parameters=parameters) as cur:
            if cur.has_results:
                result = cur.fetchall()
                return result["data"]
            else:
                return "SQL executed successfully. No results returned."



In [13]:
# Define the SQL statements from security_assistant.py
sql = {
    "security_metrics": {
        "count_exposed_profiles": {
            "name": "Count Exposed User Profiles",
            "description": "How many *USRPRF's do not have *PUBLIC set to *EXCLUDE?",
            "sql": dedent(
                """
                SELECT COUNT(*)
                FROM qsys2.object_privileges
                WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
                        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
                        '*EXCLUDE'
                """
            ),
        },
        "list_exposed_profiles": {
            "name": "List Exposed User Profiles",
            "description": "Which *USRPRF's do not have *PUBLIC set to *EXCLUDE?",
            "sql": dedent(
                """
                SELECT object_name AS user_name, object_authority
                FROM qsys2.object_privileges
                WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
                        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
                        '*EXCLUDE'
                
                """
            ),
        },
        "fix_exposed_profiles": {
            "name": "Fix Exposed User Profiles",
            "description": "Which *USRPRF's do not have *PUBLIC set to *EXCLUDE? Include a query that corrects the exposure",
            "sql": dedent(
                """
                SELECT object_name AS user_name, object_authority,
                'SELECT qsys2.qcmdexc(''GRTOBJAUT OBJ(QSYS/' || object_name || ') OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)'') FROM sysibm.sysdummy1'
                    AS corrective_query
                FROM qsys2.object_privileges
                WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
                        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
                        '*EXCLUDE'
            
                """
            ),
        },
    }
}

## 1. Test Setup - Create User Profiles for Testing

First, we'll create some test user profiles that have *PUBLIC set to *USE instead of *EXCLUDE to test our queries.

In [14]:
# Create test user profiles that are exposed to attack
test_setup_sql = """
cl: crtusrprf topadmin;
cl: crtusrprf topadmin2;
cl:GRTOBJAUT OBJ(QSYS/TOPADMIN) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*USE);
cl:GRTOBJAUT OBJ(QSYS/TOPADMIN2) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*USE);
"""

# Let's run each command separately
commands = test_setup_sql.strip().split(';')
for cmd in commands:
    if not cmd.strip():
        continue
    print(f"Executing: {cmd}")
    try:
        result = run_sql_statement(f"select qsys2.qcmdexc('{cmd.strip()}') from sysibm.sysdummy1")
        print(f"Result: {result}")
    except Exception as e:
        print(f"Error: {str(e)}")

Executing: cl: crtusrprf topadmin
Result: [{'00001': -1}]
Executing: 
cl: crtusrprf topadmin2
Result: [{'00001': -1}]
Executing: 
cl:GRTOBJAUT OBJ(QSYS/TOPADMIN) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*USE)
Result: [{'00001': 1}]
Executing: 
cl:GRTOBJAUT OBJ(QSYS/TOPADMIN2) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*USE)
Result: [{'00001': 1}]


## 2. Count Exposed Profiles

Now let's run the query to count how many user profiles have *PUBLIC authority not set to *EXCLUDE.

In [15]:
# Count exposed profiles
count_sql = sql["security_metrics"]["count_exposed_profiles"]["sql"]
print(f"Running SQL:\n{count_sql}\n")

result = run_sql_statement(count_sql)
print(f"Number of exposed profiles: {result}")

Running SQL:

SELECT COUNT(*)
FROM qsys2.object_privileges
WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
        '*EXCLUDE'


Number of exposed profiles: [{'00001': 2}]


## 3. List Exposed Profiles

Now let's run the query to list all user profiles that have *PUBLIC authority not set to *EXCLUDE.

In [16]:
# List exposed profiles
list_sql = sql["security_metrics"]["list_exposed_profiles"]["sql"]
print(f"Running SQL:\n{list_sql}\n")

result = run_sql_statement(list_sql)
print("Exposed profiles:")
for profile in result:
    print(f"User: {profile.get('USER_NAME')}, Authority: {profile.get('OBJECT_AUTHORITY')}")

Running SQL:

SELECT object_name AS user_name, object_authority
FROM qsys2.object_privileges
WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
        '*EXCLUDE'



Exposed profiles:
User: TOPADMIN, Authority: *USE
User: TOPADMIN2, Authority: *USE


## 4. Generate Corrective Queries

Now let's run the query to generate corrective measures for user profiles that have *PUBLIC authority not set to *EXCLUDE.

In [17]:
# Generate corrective queries
fix_sql = sql["security_metrics"]["fix_exposed_profiles"]["sql"]
print(f"Running SQL:\n{fix_sql}\n")

result = run_sql_statement(fix_sql)
print("Corrective measures:")
for profile in result:
    print(f"User: {profile.get('USER_NAME')}, Authority: {profile.get('OBJECT_AUTHORITY')}")
    print(f"Corrective query: {profile.get('CORRECTIVE_QUERY')}")
    print("---")

Running SQL:

SELECT object_name AS user_name, object_authority,
'SELECT qsys2.qcmdexc(''GRTOBJAUT OBJ(QSYS/' || object_name || ') OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)'') FROM sysibm.sysdummy1'
    AS corrective_query
FROM qsys2.object_privileges
WHERE system_object_schema = 'QSYS' AND object_type = '*USRPRF' AND object_name NOT IN ('QDBSHR',
        'QDBSHRDO', 'QDOC', 'QTMPLPD') AND user_name = '*PUBLIC' AND object_authority <>
        '*EXCLUDE'



Corrective measures:
User: TOPADMIN, Authority: *USE
Corrective query: SELECT qsys2.qcmdexc('GRTOBJAUT OBJ(QSYS/TOPADMIN) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)') FROM sysibm.sysdummy1
---
User: TOPADMIN2, Authority: *USE
Corrective query: SELECT qsys2.qcmdexc('GRTOBJAUT OBJ(QSYS/TOPADMIN2) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)') FROM sysibm.sysdummy1
---


## 5. Execute Corrective Queries

Now let's run the corrective queries to fix the exposed user profiles.

In [8]:
# Execute corrective queries
result = run_sql_statement(fix_sql)

# Loop through the results and execute each corrective query
for profile in result:
    user_name = profile.get('USER_NAME')
    corrective_query = profile.get('CORRECTIVE_QUERY')
    
    print(f"Fixing user profile: {user_name}")
    print(f"Executing: {corrective_query}")
    
    try:
        fix_result = run_sql_statement(corrective_query)
        print(f"Result: {fix_result}")
    except Exception as e:
        print(f"Error: {str(e)}")
    print("---")

Fixing user profile: TOPADMIN
Executing: SELECT qsys2.qcmdexc('GRTOBJAUT OBJ(QSYS/TOPADMIN) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)') FROM sysibm.sysdummy1
Result: [{'00001': 1}]
---
Fixing user profile: TOPADMIN2
Executing: SELECT qsys2.qcmdexc('GRTOBJAUT OBJ(QSYS/TOPADMIN2) OBJTYPE(*USRPRF) USER(*PUBLIC) AUT(*EXCLUDE)') FROM sysibm.sysdummy1
Result: [{'00001': 1}]
---


In [19]:
def run_corrective_query(profile_name: str = None) -> str:
    """Execute the corrective SQL query to fix an exposed user profile
    
    Args:
        profile_name: The name of the user profile to fix. If not provided, 
                      all exposed profiles will be fixed.
    
    Returns:
        Results of executing the corrective queries
    """
    # First get the list of exposed profiles with their corrective queries
    result = run_sql_statement(dedent(sql["security_metrics"]["fix_exposed_profiles"]["sql"]))
    
    # Parse the result to extract user profiles and their corrective queries
    import json
    import re
    
    try:
        data = result
        results = []
        
        for row in data:
            current_profile = row.get("USER_NAME")
            corrective_query = row.get("CORRECTIVE_QUERY")
            
            # If a specific profile was requested, only fix that one
            if profile_name and profile_name.upper() != current_profile.upper():
                continue
                
            # Execute the corrective query
            query_result = run_sql_statement(corrective_query)
            results.append(f"Fixed profile {current_profile}: {query_result}")
        
        if not results:
            if profile_name:
                return f"No exposed profile found with name '{profile_name}'"
            else:
                return "No exposed profiles found that need fixing"
        
        return "\n".join(results)
    except Exception as e:
        return f"Error executing corrective queries: {str(e)}\nRaw result: {result}"
    
run_corrective_query()

"Fixed profile TOPADMIN: [{'00001': 1}]\nFixed profile TOPADMIN2: [{'00001': 1}]"

## 6. Verify Fixes

Now let's run the count and list queries again to verify that the user profiles have been fixed.

In [10]:
# Verify fixes by running the count query again
count_sql = sql["security_metrics"]["count_exposed_profiles"]["sql"]
result = run_sql_statement(count_sql)
print(f"Number of exposed profiles after fixes: {result}")

# List any remaining exposed profiles
if result:
    list_sql = sql["security_metrics"]["list_exposed_profiles"]["sql"]
    result = run_sql_statement(list_sql)
    print("\nRemaining exposed profiles:")
    for profile in result:
        print(f"User: {profile.get('USER_NAME')}, Authority: {profile.get('OBJECT_AUTHORITY')}")
else:
    print("\nAll user profiles have been fixed!")

Number of exposed profiles after fixes: [{'00001': 0}]

Remaining exposed profiles:


## 7. Clean Up Test Environment

Finally, let's clean up by deleting the test user profiles we created.

In [None]:
# Clean up test user profiles
cleanup_sql = """
cl: dltusrprf topadmin;
cl: dltusrprf topadmin2;
"""

# Let's run each command separately
commands = cleanup_sql.strip().split(';')
for cmd in commands:
    if not cmd.strip():
        continue
    print(f"Executing: {cmd}")
    try:
        result = run_sql_statement(f"select qsys2.qcmdexc('{cmd.strip()}') from sysibm.sysdummy1")
        print(f"Result: {result}")
    except Exception as e:
        print(f"Error: {str(e)}")

## Conclusion

In this notebook, we have demonstrated how to:

1. Create test user profiles with security vulnerabilities
2. Count exposed user profiles
3. List exposed user profiles
4. Generate corrective SQL queries
5. Execute the corrective queries to fix the vulnerabilities
6. Verify that the fixes were applied correctly
7. Clean up the test environment

These same SQL queries and techniques are used in the security_assistant.py agent to provide an interactive way to identify and fix security vulnerabilities in IBM i user profiles.