In [28]:
from fastapi import FastAPI
from sqlalchemy import create_engine, inspect
from langchain_community.utilities import SQLDatabase
import sqlite3
import pandas as pd
app = FastAPI()

from langchain_core.tools import tool

engine = create_engine("sqlite:///claims.db")
print(inspect(engine).get_table_names()) 
db = SQLDatabase.from_uri("sqlite:///claims.db")  # Replace with your actual DB URI


@tool
def get_users_by_user_id(user_id: str) -> list[dict]:
    """
    Retrieve rows from the 'users' table filtered by 'user_id'.
    Useful for understanding the users for a given user_id.
    
    Args:
        user_id: The user ID to search for
        
    Returns:
        List of dictionaries containing user information (user_id, name, dob, 
        health_card, email, phone, provider_id) for matching users
    """
    query = """
    SELECT user_id, name, dob, health_card, email, phone, provider_id  FROM users  WHERE user_id = :user_id
    """
    try:
        results = db.run(query, parameters={"user_id": user_id})
        return results 
    except Exception as e:
        print(f"Database error: {str(e)}")


print(get_users_by_user_id('User1'))

['auth_users', 'claim_audit_logs', 'claim_documents', 'claims', 'communications_log', 'coverage_limits', 'dental_details', 'drug_details', 'hospital_visits', 'insurance_providers', 'policies', 'pre_authorizations', 'premium_payments', 'provider_plans', 'user_preferences', 'users', 'vision_claims']
[('User1', 'User_name1', '1989-12-02', 'HC1000', 'user1@example.com', '555-0100', 'prov3')]


In [2]:
from langchain.tools import tool
from typing import List, Dict

# User-related tools
@tool
def get_user_by_id(user_id: str) -> List[Dict]:
    """Retrieve a single user's details by their UUID"""
    query = """
    SELECT user_id, name, dob, health_card, email, phone, provider_id 
    FROM users 
    WHERE user_id = :user_id
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

@tool
def get_users_by_provider(provider_id: str) -> List[Dict]:
    """Find all users associated with a specific insurance provider"""
    query = """
    SELECT user_id, name, email, phone 
    FROM users 
    WHERE provider_id = :provider_id
    """
    try:
        return db.run(query, parameters={"provider_id": provider_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Policy tools
@tool
def get_policies_by_user(user_id: str) -> List[Dict]:
    """Retrieve all insurance policies for a specific user"""
    query = """
    SELECT p.policy_id, p.policy_number, p.plan_type, 
           p.coverage_start, p.coverage_end, p.monthly_premium,
           ip.name as provider_name
    FROM policies p
    JOIN insurance_providers ip ON p.provider_id = ip.provider_id
    WHERE p.user_id = :user_id AND p.active = TRUE
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

@tool
def get_active_policies() -> List[Dict]:
    """List all currently active insurance policies"""
    query = """
    SELECT p.policy_id, u.name as user_name, ip.name as provider_name,
           p.policy_number, p.coverage_end, p.monthly_premium
    FROM policies p
    JOIN users u ON p.user_id = u.user_id
    JOIN insurance_providers ip ON p.provider_id = ip.provider_id
    WHERE p.active = TRUE
    """
    try:
        return db.run(query)
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Claim tools
@tool
def get_claims_by_user(user_id: str) -> List[Dict]:
    """Retrieve all claims submitted by a specific user"""
    try:
        query= """
                SELECT c.claim_id, c.service_date, c.claim_type, 
                       c.amount_claimed, c.amount_approved, c.status,
                       p.policy_number
                FROM claims c
                JOIN policies p ON c.policy_id = p.policy_id
                WHERE c.user_id = ?
                ORDER BY c.service_date DESC
            """
        with sqlite3.connect("claims.db") as conn:
            cursor = conn.cursor()
            cursor.execute(query, (user_id,))
            
            # Convert to list of dictionaries
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
            
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

@tool
def get_claim_details(claim_id: str) -> List[Dict]:
    """Get detailed information about a specific claim"""
    query = """
    SELECT c.claim_id,
    c.user_id,
    c.provider_id,
    c.policy_id,
    c.service_date,
    c.claim_type,
    c.service_code,
    c.description,
    c.amount_claimed,
    c.amount_approved,
    c.status,
    c.submitted_at,
    , u.name as user_name, p.policy_number,
           ip.name as provider_name
    FROM claims c
    JOIN users u ON c.user_id = u.user_id
    JOIN policies p ON c.policy_id = p.policy_id
    JOIN insurance_providers ip ON c.provider_id = ip.provider_id
    WHERE c.claim_id = :claim_id
    """
    try:
        return db.run(query, parameters={"claim_id": claim_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Provider tools
@tool
def get_provider_details(provider_id: str) -> List[Dict]:
    """Get information about an insurance provider"""
    query = """
    SELECT provider_id, name, description
    FROM insurance_providers
    WHERE provider_id = :provider_id
    """
    try:
        return db.run(query, parameters={"provider_id": provider_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

@tool
def get_provider_plans(provider_id: str) -> List[Dict]:
    """List all available plans from a specific insurance provider"""
    query = """
    SELECT plan_id, name, description, base_premium,
           drug_limit, dental_limit, vision_limit
    FROM provider_plans
    WHERE provider_id = :provider_id
    """
    try:
        return db.run(query, parameters={"provider_id": provider_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Payment tools
@tool
def get_payments_by_policy(policy_id: str) -> List[Dict]:
    """Retrieve payment history for a specific policy"""
    query = """
    SELECT payment_id, due_date, paid_date, 
           amount_due, amount_paid, payment_status
    FROM premium_payments
    WHERE policy_id = :policy_id
    ORDER BY due_date DESC
    """
    try:
        return db.run(query, parameters={"policy_id": policy_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Coverage tools
@tool
def get_coverage_limits(user_id: str) -> List[Dict]:
    """Get coverage limits and usage for a specific user"""
    query = """
    SELECT claim_type, year, max_coverage, used_coverage
    FROM coverage_limits
    WHERE user_id = :user_id
    ORDER BY year DESC, claim_type
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Pre-authorization tools
@tool
def get_pre_authorizations(user_id: str) -> List[Dict]:
    """Retrieve pre-authorization requests for a user"""
    query = """
    SELECT auth_id, service_requested, estimated_cost,
           request_date, approved_date, status
    FROM pre_authorizations
    WHERE user_id = :user_id
    ORDER BY request_date DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

In [40]:
from datetime import datetime, timedelta
import sqlite3
conn = sqlite3.connect('claims.db')
cursor = conn.cursor()

# Create table if not exists
cursor.execute("""
CREATE TABLE IF NOT EXISTS auth_users (
    user_id TEXT PRIMARY KEY,
    username TEXT UNIQUE,
    password_hash TEXT,
    role TEXT CHECK (role IN ('user', 'admin', 'agent')),
    last_login TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
);
""")

from hashlib import sha256

SECRET = "super-secret-key"

def hash_password(password):
    return sha256(password.encode()).hexdigest()

# Insert test records
test_users = [
    {
        'user_id': 'User1',
        'username': 'admin_john',
        'password_hash': hash_password('User1'),  # In real apps, use proper hashing
        'role': 'admin',
        'last_login': datetime.now() - timedelta(days=1),
        'is_active': True
    },
    {
        'user_id': 'User2',
        'username': 'agent_sarah',
        'password_hash': hash_password('User2'),
        'role': 'agent',
        'last_login': datetime.now() - timedelta(hours=3),
        'is_active': True
    },
    {
        'user_id': 'User3',
        'username': 'user_mike',
        'password_hash': hash_password('User3'),
        'role': 'user',
        'last_login': datetime.now() - timedelta(days=7),
        'is_active': True
    },
    {
        'user_id': 'User4',
        'username': 'inactive_tom',
        'password_hash': hash_password('User4'),
        'role': 'user',
        'last_login': datetime.now() - timedelta(days=30),
        'is_active': False
    }
]

# Insert each user
for user in test_users:
    try:
        cursor.execute("""
        INSERT INTO auth_users 
        (user_id, username, password_hash, role, last_login, is_active)
        VALUES (?, ?, ?, ?, ?, ?)
        """, (
            user['user_id'],
            user['username'],
            user['password_hash'],
            user['role'],
            user['last_login'].isoformat(),
            user['is_active']
        ))
    except sqlite3.IntegrityError:
        print(f"User {user['username']} already exists, skipping")

# Commit changes
conn.commit()

# Verify inserted data
print("\nCurrent auth_users:")
cursor.execute("SELECT user_id, username, role, last_login, is_active FROM auth_users")
for row in cursor.fetchall():
    print(row)

# Close connection
conn.close()

User admin_john already exists, skipping
User agent_sarah already exists, skipping
User user_mike already exists, skipping
User inactive_tom already exists, skipping

Current auth_users:
('User1', 'user1', 'user', '2025-07-25 14:04:22', 1)
('User2', 'user2', 'user', '2025-07-25 14:04:22', 1)
('User3', 'user3', 'user', '2025-07-25 14:04:22', 1)
('User4', 'user4', 'user', '2025-07-25 14:04:22', 1)
('User5', 'user5', 'user', '2025-07-25 14:04:22', 1)
('4f086a55-9d83-4127-9c89-076e75395af0', 'admin', 'admin', '2025-07-25 14:04:22', 1)


In [3]:
print(get_user_by_id('User1'))
print(get_users_by_provider('prov1'))
print(get_policies_by_user('User1'))
print(get_active_policies(''))
print(get_claims_by_user('User1')) 
print(get_provider_details('prov1'))
print(get_provider_plans('prov3')) 
print(get_payments_by_policy('b7423d52-9749-481f-b527-3b90ee8b5992'))
print(get_coverage_limits('User1'))
print(get_pre_authorizations('User5'))



[('User1', 'User_name1', '1989-12-02', 'HC1000', 'user1@example.com', '555-0100', 'prov3')]
[('User3', 'User_name3', 'user3@example.com', '555-0102'), ('User4', 'User_name4', 'user4@example.com', '555-0103')]
[('d59253b0-99e1-4111-983e-a1ad731420b9', 'POL6422', 'Premium', '2025-03-12', '2026-03-12', 408.01, 'TrueCare Insurance')]
[('d59253b0-99e1-4111-983e-a1ad731420b9', 'User_name1', 'TrueCare Insurance', 'POL6422', '2026-03-12', 408.01), ('1f0b3874-1773-43a8-9caf-08e95bbf1f41', 'User_name2', 'Maple Health', 'POL4516', '2026-03-22', 404.41), ('4a4d9d3d-ef68-413d-b934-8d20323c49db', 'User_name3', 'Maple Health', 'POL2808', '2025-12-03', 327.91), ('9ef85bbb-cc91-4641-b34d-e142890f577c', 'User_name4', 'TrueCare Insurance', 'POL9868', '2025-11-29', 333.18), ('ebefbbc0-82cf-453a-8246-bfefb8811d8a', 'User_name5', 'WellSpring', 'POL9494', '2025-08-08', 128.44)]
[]
[('prov1', 'Maple Health', 'Leading national provider')]
[('b5511141-7af7-4e91-beca-aa1b560e18c0', 'Plan_794', 'Standard insuranc

In [39]:
from langchain.tools import tool
from typing import List, Dict

# Dental Claims Tools
@tool
def get_dental_claims_by_user(user_id: str) -> List[Dict]:
    """Retrieve all dental claims for a specific user with procedure details"""
    query = """
    SELECT c.claim_id, c.service_date, c.status, 
           d.category, d.tooth_code, d.procedure_code
    FROM claims c
    JOIN dental_details d ON c.claim_id = d.claim_id
    WHERE c.user_id = :user_id
    ORDER BY c.service_date DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Drug Claims Tools  
@tool
def get_drug_claims_by_user(user_id: str) -> List[Dict]:
    """Get all prescription drug claims for a user with medication details"""
    query = """
    SELECT c.claim_id, c.service_date, c.status,
           d.drug_name, d.DIN_code, d.quantity, d.dosage
    FROM claims c
    JOIN drug_details d ON c.claim_id = d.claim_id
    WHERE c.user_id = :user_id
    ORDER BY c.service_date DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Hospital Visits Tools
@tool
def get_hospital_visits_by_user(user_id: str) -> List[Dict]:
    """Retrieve all hospital visits for a user with stay details"""
    query = """
    SELECT c.claim_id, c.service_date, c.status,
           h.room_type, h.admission_date, h.discharge_date
    FROM claims c
    JOIN hospital_visits h ON c.claim_id = h.claim_id
    WHERE c.user_id = :user_id
    ORDER BY h.admission_date DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Vision Claims Tools
@tool
def get_vision_claims_by_user(user_id: str) -> List[Dict]:
    """Get all vision care claims for a user with product details"""
    query = """
    SELECT c.claim_id, c.service_date, c.status,
           v.product_type, v.coverage_limit, v.eligibility_date
    FROM claims c
    JOIN vision_claims v ON c.claim_id = v.claim_id
    WHERE c.user_id = :user_id
    ORDER BY c.service_date DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Coverage Limits Tools
@tool
def get_user_coverage_limits(user_id: str) -> List[Dict]:
    """Retrieve all coverage limits and usage for a specific user"""
    query = """
    SELECT claim_type, year, max_coverage, used_coverage,
           (max_coverage - used_coverage) as remaining_coverage
    FROM coverage_limits
    WHERE user_id = :user_id
    ORDER BY year DESC, claim_type
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Claim Audit Tools
@tool
def get_claim_audit_logs(user_id: str) -> List[Dict]:
    """Get audit history for all claims belonging to a user"""
    query = """
    SELECT a.audit_id, a.event_time, a.event_type,
           a.performed_by, c.claim_id, c.claim_type
    FROM claim_audit_logs a
    JOIN claims c ON a.claim_id = c.claim_id
    WHERE c.user_id = :user_id
    ORDER BY a.event_time DESC
    LIMIT 50
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Claim Documents Tools
@tool
def get_user_claim_documents(user_id: str) -> List[Dict]:
    """Retrieve all documents associated with a user's claims"""
    query = """
    SELECT d.document_id, d.file_name, d.uploaded_at,
           d.document_type, c.claim_id, c.claim_type
    FROM claim_documents d
    JOIN claims c ON d.claim_id = c.claim_id
    WHERE c.user_id = :user_id
    ORDER BY d.uploaded_at DESC
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# User Preferences Tools
@tool
def get_user_preferences(user_id: str) -> List[Dict]:
    """Get communication preferences and settings for a user"""
    query = """
    SELECT communication_opt_in, consent_to_share_data,
           language_preference, timezone
    FROM user_preferences
    WHERE user_id = :user_id
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

# Communications Log Tools
@tool
def get_user_communications(user_id: str) -> List[Dict]:
    """Retrieve all communications sent to/from a user"""
    query = """
    SELECT log_id, type, subject, sent_at, status
    FROM communications_log
    WHERE user_id = :user_id
    ORDER BY sent_at DESC
    LIMIT 50
    """
    try:
        return db.run(query, parameters={"user_id": user_id})
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

In [5]:
print(get_dental_claims_by_user('User5'))
print(get_drug_claims_by_user('User1'))
print(get_hospital_visits_by_user('User1'))
print(get_vision_claims_by_user('User5')) ## Setup 
print(get_user_coverage_limits('User1'))
print(get_claim_audit_logs('User1'))
print(get_user_claim_documents('User1'))
print(get_user_preferences('User1'))
print(get_user_communications('User4')) ## setup


[('a7e86d90-d1eb-4b7b-8c1c-97eb6891af00', '2023-03-13', 'Approved', 'Ibuprofen', 'D97119', 27, '951mg')]


[('dental', 2024, 1000.0, 460.23045603745703, 539.7695439625429)]
[('e4e93729-6394-4d4f-b112-0ba0c846eb65', '2025-07-18 14:49:29.638692', 'Submitted', 'system', 'a7e86d90-d1eb-4b7b-8c1c-97eb6891af00', 'hospital')]
[('57650d04-a856-48c3-ad8e-460c258843ea', 'receipt.pdf', '2025-07-18 14:49:29.638763', 'Receipt', 'a7e86d90-d1eb-4b7b-8c1c-97eb6891af00', 'hospital')]
[(0, 0, 'en', 'America/Toronto')]



In [25]:
import sqlite3
import pandas as pd

In [26]:
print(get_claims_by_user('User2')) ## check or setup data

[]


In [32]:
conn = sqlite3.connect("claims.db")
cursor = conn.cursor()
df_providers = pd.read_sql_query("SELECT * FROM vision_claims", conn)
print(df_providers.head)
conn.close()

<bound method NDFrame.head of Empty DataFrame
Columns: [claim_id, product_type, coverage_limit, eligibility_date]
Index: []>


In [41]:
conn = sqlite3.connect("claims.db")
#cursor = conn.cursor()
#cursor.execute("Update claims set claim_id='29da09f9-411b-4098-8519-6e40b92a842d' where user_id='User1'")
#conn.commit()
df_providers = pd.read_sql_query("SELECT * FROM auth_users", conn)
print(df_providers.head())
conn.close()


  user_id username   password_hash  role           last_login  is_active
0   User1    user1  hashedpassword  user  2025-07-25 14:04:22          1
1   User2    user2  hashedpassword  user  2025-07-25 14:04:22          1
2   User3    user3  hashedpassword  user  2025-07-25 14:04:22          1
3   User4    user4  hashedpassword  user  2025-07-25 14:04:22          1
4   User5    user5  hashedpassword  user  2025-07-25 14:04:22          1


In [10]:
conn = sqlite3.connect("claims.db")
df_providers = pd.read_sql_query("""
                                 
    SELECT c.claim_id, 
            c.status, 
            c.policy_id,
            p.policy_number,
            pr.provider_id,
            dd.procedure_code,
            dr.drug_name,
                 d.document_type
    FROM claims c JOIN policies AS p ON c.policy_id = p.policy_id 
        JOIN provider_plans AS pr ON p.provider_id = pr.provider_id 
         LEFT JOIN dental_details AS dd ON c.claim_id = dd.claim_id 
        LEFT JOIN drug_details AS dr ON c.claim_id = dr.claim_id 
         LEFT JOIN claim_documents AS d ON c.claim_id = d.claim_id 
        where c.user_id='User1' and c.status = 'Approved'
                                 
""", conn)
print(df_providers.head())
conn.close()


Empty DataFrame
Columns: [claim_id, status, policy_id, policy_number, provider_id, procedure_code, drug_name, document_type]
Index: []


In [11]:
conn = sqlite3.connect("claims.db")
query = """SELECT c.claim_id, 
            c.status, 
            c.policy_id,
            p.policy_number,
            pr.provider_id,
            dd.procedure_code,
            dr.drug_name,
                 d.document_type
    FROM claims c JOIN policies AS p ON c.policy_id = p.policy_id 
        JOIN provider_plans AS pr ON p.provider_id = pr.provider_id 
         LEFT JOIN dental_details AS dd ON c.claim_id = dd.claim_id 
        LEFT JOIN drug_details AS dr ON c.claim_id = dr.claim_id 
         LEFT JOIN claim_documents AS d ON c.claim_id = d.claim_id 
        where c.user_id='User1' and c.status = 'Approved'"""
query2=""" SELECT c.claim_id, c.status, c.policy_id, p.policy_number, pr.provider_id, dd.procedure_code, dr.drug_name, d.document_type FROM claims AS c JOIN policies AS p ON c.policy_id = p.policy_id JOIN provider_plans AS pr ON p.provider_id = pr.provider_id LEFT JOIN dental_details AS dd ON c.claim_id = dd.claim_id LEFT JOIN drug_details AS dr ON c.claim_id = dr.claim_id LEFT JOIN claim_documents AS d ON c.claim_id = d.claim_id WHERE c.user_id = 'User1' AND c.status = 'Approved'"""
df_providers = pd.read_sql_query(query2,conn)
print(df_providers.head)
conn.close()

#print(get_claims_by_user('User1')) ## check or setup data

<bound method NDFrame.head of Empty DataFrame
Columns: [claim_id, status, policy_id, policy_number, provider_id, procedure_code, drug_name, document_type]
Index: []>


In [12]:
conn = sqlite3.connect("claims.db")
cursor = conn.cursor()
df_providers = pd.read_sql_query("""
    SELECT c.claim_id, c.service_date, c.claim_type, 
           c.amount_claimed, c.amount_approved, c.status,
           p.policy_number
    FROM claims c
    JOIN policies p ON c.policy_id = p.policy_id
    WHERE c.user_id = 'User1'
    ORDER BY c.service_date DESC
                                 
                                 """, conn)
print(df_providers.head)
conn.close()

<bound method NDFrame.head of Empty DataFrame
Columns: [claim_id, service_date, claim_type, amount_claimed, amount_approved, status, policy_number]
Index: []>


In [13]:
from langchain.tools import tool
from typing import List, Dict
import sqlite3
import pandas as pd

@tool
def get_claims_by_user(user_id: str) -> List[Dict]:
    """
    Retrieve all claims submitted by a specific user.
    Returns a list of dictionaries with claim details.
    
    Args:
        user_id: The user ID to search for (e.g., 'User1')
        
    Returns:
        List of dictionaries containing:
        - claim_id
        - service_date
        - claim_type
        - amount_claimed
        - amount_approved
        - status
        - policy_number
    """
    # Implementation 1: Direct SQLite connection (working version)
    try:
        with sqlite3.connect("claims.db") as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT c.claim_id, c.service_date, c.claim_type, 
                       c.amount_claimed, c.amount_approved, c.status,
                       p.policy_number
                FROM claims c
                JOIN policies p ON c.policy_id = p.policy_id
                WHERE c.user_id = ?
                ORDER BY c.service_date DESC
            """, (user_id,))
            
            # Convert to list of dictionaries
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
            
    except Exception as e:
        print(f"Database error: {str(e)}")
        return []

    # Alternative: If you want to keep trying with LangChain SQLDatabase
    # try:
    #     query = """
    #     SELECT c.claim_id, c.service_date, c.claim_type, 
    #            c.amount_claimed, c.amount_approved, c.status,
    #            p.policy_number
    #     FROM claims c
    #     JOIN policies p ON c.policy_id = p.policy_id
    #     WHERE c.user_id = :user_id
    #     ORDER BY c.service_date DESC
    #     """
    #     results = db.run(query, parameters={"user_id": user_id})
    #     # You may need to parse the string results if db.run returns a string
    #     if isinstance(results, str):
    #         return parse_sql_results(results)
    #     return results
    # except Exception as e:
    #     print(f"Database error: {str(e)}")
    #     return []

print(get_claims_by_user('User1'))

[]


In [14]:
conn = sqlite3.connect("claims.db")
cursor = conn.cursor()
cursor.execute("Update claims set status='approved' where user_id='User1'")
conn.commit()
df_providers = pd.read_sql_query("SELECT * FROM claims", conn)
print(df_providers.head)
conn.close()

<bound method NDFrame.head of                                claim_id user_id provider_id  \
0  a7e86d90-d1eb-4b7b-8c1c-97eb6891af00   User1       prov3   
1  69f8c1f7-beda-4f8b-98f1-a7788f0b45bc   User2       prov2   
2  ff55dc26-a64b-481b-b9c2-965641588e4a   User3       prov1   
3  02825dce-9fd5-4062-91dd-e78819c31c23   User4       prov1   
4  c100f813-9898-407b-8789-8aa048eac91e   User5       prov2   

                              policy_id service_date claim_type service_code  \
0  5e20a488-ab7e-45d5-bdaf-8a102e623d27   2023-03-13   hospital       SVC174   
1  5f26b3bf-d4b3-4fab-b58c-4e4a9a295e44   2023-10-02     vision       SVC885   
2  051010d4-f6aa-4a62-9fab-7d5c3c3b51fa   2023-02-07       drug       SVC345   
3  729ffcc8-7b53-4ce7-a96b-d4985f2efb24   2023-07-03     vision       SVC188   
4  d5a9ac50-fc30-4651-be78-d9289aa58961   2024-03-28   hospital       SVC833   

                     description  amount_claimed  amount_approved    status  \
0  Routine check or prescriptio

In [15]:
## Claim Summary with All Services and Provider Details ##

conn = sqlite3.connect("claims.db")
df_providers = pd.read_sql_query("""
SELECT 
    u.user_id,
    u.name,
    c.claim_id,
    c.status,
    c.service_date,
    p.policy_number,
    pr.name,
    dd.procedure_code,
    dr.drug_name,
    vc.product_type
FROM users u
JOIN claims c ON u.user_id = c.user_id
JOIN policies p ON c.policy_id = p.policy_id
JOIN provider_plans pr ON p.provider_id = pr.provider_id
LEFT JOIN dental_details dd ON c.claim_id = dd.claim_id
LEFT JOIN drug_details dr ON c.claim_id = dr.claim_id
LEFT JOIN hospital_visits hv ON c.claim_id = hv.claim_id
LEFT JOIN vision_claims vc ON c.claim_id = vc.claim_id
WHERE c.status = 'approved';

""", conn)
print(df_providers.head())
conn.close()

Empty DataFrame
Columns: [user_id, name, claim_id, status, service_date, policy_number, name, procedure_code, drug_name, product_type]
Index: []


In [16]:
## Claims with Missing Documents and Pending Pre-Authorizations ##

conn = sqlite3.connect("claims.db")
df_providers = pd.read_sql_query("""
SELECT 
    c.claim_id,
    c.user_id,
    c.status,
    doc.document_id,
    pa.status
FROM claims c
LEFT JOIN claim_documents doc ON c.claim_id = doc.claim_id
LEFT JOIN pre_authorizations pa ON c.user_id = pa.user_id
WHERE c.user_id='User5' and doc.document_id is NULL ;

""", conn)
print(df_providers.head())
conn.close()

Empty DataFrame
Columns: [claim_id, user_id, status, document_id, status]
Index: []


In [17]:
from fastapi import FastAPI
from sqlalchemy import create_engine, inspect
from langchain_community.utilities import SQLDatabase
import sqlite3
import pandas as pd
app = FastAPI()

from langchain_core.tools import tool

engine = create_engine("sqlite:///claims.db")
print(inspect(engine).get_table_names()) 
db = SQLDatabase.from_uri("sqlite:///claims.db")  # Replace with your actual DB URI


@tool
def get_users_by_user_id(user_id: str) -> list[dict]:
    """
    Retrieve rows from the 'users' table filtered by 'user_id'.
    Useful for understanding the users for a given user_id.
    
    Args:
        user_id: The user ID to search for
        
    Returns:
        List of dictionaries containing user information (user_id, name, dob, 
        health_card, email, phone, provider_id) for matching users
    """
    query = """
    SELECT user_id, name, dob, health_card, email, phone, provider_id  FROM users  WHERE user_id = :user_id
    """
    try:
        results = db.run(query, parameters={"user_id": user_id})
        return results 
    except Exception as e:
        print(f"Database error: {str(e)}")


print(get_users_by_user_id('User1'))

['auth_users', 'claim_audit_logs', 'claim_documents', 'claims', 'communications_log', 'coverage_limits', 'dental_details', 'drug_details', 'hospital_visits', 'insurance_providers', 'policies', 'pre_authorizations', 'premium_payments', 'provider_plans', 'user_preferences', 'users', 'vision_claims']
[('User1', 'User_name1', '1989-12-02', 'HC1000', 'user1@example.com', '555-0100', 'prov3')]


In [18]:
## Audit Logs for Claims with Denied Drug Claims ##

conn = sqlite3.connect("claims.db")
df_providers = pd.read_sql_query("""
SELECT 
    ca.claim_id,
    ca.notes,
    ca.event_type,
    ca.event_time,
    dr.drug_name,
    c.status
FROM claim_audit_logs ca
JOIN claims c ON ca.claim_id = c.claim_id
JOIN drug_details dr ON c.claim_id = dr.claim_id
WHERE c.status = 'Pending'
""", conn)
print(df_providers.head())
conn.close()

                               claim_id               notes event_type  \
0  c100f813-9898-407b-8789-8aa048eac91e  Initial submission  Submitted   
1  ff55dc26-a64b-481b-b9c2-965641588e4a  Initial submission  Submitted   

                   event_time     drug_name   status  
0  2025-07-18 14:49:29.638701  Atorvastatin  Pending  
1  2025-07-18 14:49:29.638708     Ibuprofen  Pending  


In [19]:
##  Total Approved Claim Amount by Provider and Type ##

conn = sqlite3.connect("claims.db")
df_providers = pd.read_sql_query("""
SELECT 
    pr.name,
    c.claim_type,
    SUM(c.amount_approved) AS total_approved
FROM claims c
JOIN provider_plans pr ON c.provider_id = pr.provider_id
WHERE c.status = 'Approved'
GROUP BY pr.name, c.claim_type 
""", conn)
print(df_providers.head())
conn.close()

       name claim_type  total_approved
0  Plan_458     vision          109.41


In [20]:
import sqlglot
from sqlglot.expressions import Subquery, Table, Column


def get_known_tables_and_columns(db_uri: str) -> dict:
    engine = create_engine(db_uri)
    inspector = inspect(engine)

    known = {}
    for table in inspector.get_table_names():
        columns = {col["name"].lower() for col in inspector.get_columns(table)}
        known[table.lower()] = columns
    return known

# Example usage
KNOWN_TABLES = get_known_tables_and_columns("sqlite:///claims.db")

def validate_sql_query(query: str) -> str | None:
    print("🔍 Query in validation:", query)

    try:
        parsed = sqlglot.parse_one(query)
        print("✅ SQL parsed into AST")

        def check_expr(expr, known_tables):
            for node in expr.walk():
                if isinstance(node, Table):
                    table_name = node.name.lower()
                    if table_name not in known_tables:
                        raise ValueError(f"❌ Unknown table: '{table_name}'")
                elif isinstance(node, Column):
                    col_name = node.name.lower()
                    table_name = node.table.lower() if node.table else None
                    if table_name:
                        if table_name not in known_tables:
                            raise ValueError(f"❌ Unknown table (in column scope): '{table_name}'")
                        if col_name not in known_tables[table_name]:
                            raise ValueError(f"❌ Unknown column '{col_name}' in table '{table_name}'")
                    else:
                        # Check if column exists in any table
                        if not any(col_name in cols for cols in known_tables.values()):
                            raise ValueError(f"❌ Unknown column: '{col_name}'")

        check_expr(parsed, KNOWN_TABLES)

    except Exception as e:
        print(f"⚠️ Validation failed: {e}")
        

validate_sql_query("SELECT c.claim_id, c.status, p.policy_id, p.policy_number, pr.provider_id, dd.procedure_code, dr.username_name, d.document_type FROM claims AS c JOIN policies AS p ON c.policy_id = p.policy_id JOIN provider_plans AS pr ON p.provider_id = pr.provider_id LEFT JOIN dental_details AS dd ON c.claim_id = dd.claim_id LEFT JOIN drug_details AS dr ON c.claim_id = dr.claim_id LEFT JOIN claim_documents AS d ON c.claim_id = d.claim_id WHERE c.user_id = (SELECT user_id FROM users WHERE username = 'User1') AND c.status = 'approved'") 

🔍 Query in validation: SELECT c.claim_id, c.status, p.policy_id, p.policy_number, pr.provider_id, dd.procedure_code, dr.username_name, d.document_type FROM claims AS c JOIN policies AS p ON c.policy_id = p.policy_id JOIN provider_plans AS pr ON p.provider_id = pr.provider_id LEFT JOIN dental_details AS dd ON c.claim_id = dd.claim_id LEFT JOIN drug_details AS dr ON c.claim_id = dr.claim_id LEFT JOIN claim_documents AS d ON c.claim_id = d.claim_id WHERE c.user_id = (SELECT user_id FROM users WHERE username = 'User1') AND c.status = 'approved'
✅ SQL parsed into AST
⚠️ Validation failed: ❌ Unknown table (in column scope): 'c'


In [21]:
from difflib import get_close_matches
original_col= 'timestamp'
possible_cols = (
                KNOWN_TABLES.get("claim_audit_logs", set()) if "claim_audit_logs" else set().union(*KNOWN_TABLES.values())
            )
print(possible_cols)
best_col = get_close_matches(original_col, possible_cols, n=1, cutoff=0.5)
print(best_col)

{'event_time', 'event_type', 'notes', 'performed_by', 'claim_id', 'audit_id'}
[]


In [22]:
column_metadata = {
    "audit_id": "UUID",
    "claim_id": "UUID",
    "event_time": "TIMESTAMP",
    "event_type": "VARCHAR(50)",
    "performed_by": "VARCHAR(50)",
    "notes": "TEXT"
}

semantic_aliases = {
    "timestamp": ["event_time", "created_at", "logged_at"],
    "user": ["performed_by", "created_by"],
    "notes": ["description", "comments"],
}
from difflib import get_close_matches

def resolve_column_with_fallback(user_input_col: str, column_names: list[str], column_metadata: dict[str, str], semantic_aliases: dict[str, list[str]]):
    # 1. Fuzzy match
    match = get_close_matches(user_input_col, column_names, n=1)
    if match:
        return match[0], "fuzzy"

    # 2. Semantic alias match
    for canonical, aliases in semantic_aliases.items():
        if user_input_col.lower() == canonical.lower():
            for alias in aliases:
                if alias in column_names:
                    return alias, "semantic-alias"

    # 3. Type-based fallback (e.g. 'timestamp' → TIMESTAMP)
    for col, col_type in column_metadata.items():
        base_type = col_type.split("(")[0].upper()
        if user_input_col.upper() == base_type:
            return col, "type-fallback"

    return None, "not-found"


In [23]:
from sqlalchemy import create_engine, inspect

def get_column_metadata_by_table(db_uri: str) -> dict[str, dict[str, str]]:
    """
    Returns a dict mapping table_name -> {column_name: column_type}
    """
    engine = create_engine(db_uri)
    inspector = inspect(engine)

    table_metadata = {}
    for table in inspector.get_table_names():
        column_info = inspector.get_columns(table)
        column_types = {
            col["name"].lower(): str(col["type"]).upper()
            for col in column_info
        }
        table_metadata[table.lower()] = column_types
    return table_metadata

metadata = get_column_metadata_by_table("sqlite:///claims.db")
table_meta = metadata["claims"]
col_names = list(table_meta.keys())

resolved_col, method = resolve_column_with_fallback(
    user_input_col="type",
    column_names=col_names,
    column_metadata=table_meta,
    semantic_aliases=semantic_aliases
)

print(resolved_col)  # event_time
print(method)        # type-fallback


None
not-found


In [24]:
KNOWN_TABLES

{'auth_users': {'is_active',
  'last_login',
  'password_hash',
  'role',
  'user_id',
  'username'},
 'claim_audit_logs': {'audit_id',
  'claim_id',
  'event_time',
  'event_type',
  'notes',
  'performed_by'},
 'claim_documents': {'claim_id',
  'document_id',
  'document_type',
  'file_name',
  'secure_url',
  'uploaded_at'},
 'claims': {'amount_approved',
  'amount_claimed',
  'claim_id',
  'claim_type',
  'description',
  'policy_id',
  'provider_id',
  'service_code',
  'service_date',
  'status',
  'submitted_at',
  'user_id'},
 'communications_log': {'content',
  'log_id',
  'sent_at',
  'status',
  'subject',
  'type',
  'user_id'},
 'coverage_limits': {'claim_type',
  'max_coverage',
  'used_coverage',
  'user_id',
  'year'},
 'dental_details': {'category', 'claim_id', 'procedure_code', 'tooth_code'},
 'drug_details': {'claim_id', 'din_code', 'dosage', 'drug_name', 'quantity'},
 'hospital_visits': {'admission_date',
  'claim_id',
  'discharge_date',
  'room_type'},
 'insurance