# üß™ Auth Utils Tests

## ‚ö†Ô∏è SQLite for Local Testing

These tests use **SQLite** instead of Azure PostgreSQL to avoid connection pool exhaustion.

### Problem Encountered (Jan 2026)
When running `nbdev_prepare` with Azure PostgreSQL, we hit:
```
FATAL: remaining connection slots are reserved for roles with 
privileges of the "pg_use_reserved_connections" role
```

**Root Cause**: Each test notebook creates database connections that weren't being properly closed. Azure PostgreSQL has limited connection slots (~25 for Basic tier), and zombie connections from failed test runs exhausted the pool.

**Solution**: 
1. Use `DB_TYPE=SQLITE` for local testing (no connection limits)
2. Added `HostDatabase.close()` and proper `engine.dispose()` calls
3. Updated `get_or_create_tenant_db()` to close internal host_db connection
4. Updated `provision_new_user()` to close tenant_db after use

### To Run with Azure PostgreSQL
Set these environment variables (from `.env`):
```
DB_TYPE=POSTGRESQL
DB_HOST=your-server.postgres.database.azure.com
DB_USER=your_user
DB_PASS=your_password
DB_NAME=app_host
```

In [None]:
#| hide
# ============================================
# USE SQLITE FOR LOCAL TESTING
# ============================================
import os
os.environ['DB_TYPE'] = 'SQLITE'
os.environ['DB_NAME'] = 'test_auth_host'

# Test imports
from fh_saas.utils_auth import (
    generate_oauth_state, verify_oauth_state,
    create_or_get_global_user, get_user_membership, verify_membership,
    provision_new_user, create_user_session, get_current_user, clear_session,
    auth_redirect, route_user_after_login, require_tenant_access,
    handle_login_request, handle_oauth_callback, handle_logout
)
from fh_saas.db_host import HostDatabase, GlobalUser, Membership
from fh_saas.db_tenant import get_or_create_tenant_db
from sqlalchemy import text
from unittest.mock import Mock, patch

# Reset singleton to use SQLite
HostDatabase.reset_instance()
print("‚úÖ Using SQLite for testing (no Azure connection needed)")

In [None]:
#| hide

print("üß™ Running OAuth Tests...\n")

# Initialize HostDatabase singleton (uses SQLite from env set above)
host_db = HostDatabase.from_env()

# ==========================================
# CLEANUP: Make tests idempotent
# ==========================================

def cleanup_oauth_test_data():
    """Remove test data created during tests"""
    host_db.rollback()
    
    # For SQLite, just delete rows - no DROP DATABASE needed
    # Tables may not exist on first run, so wrap in try/except
    try:
        host_db.db.conn.execute(text(
            "DELETE FROM sys_audit_logs WHERE event_type='tenant_provisioned' "
            "AND details LIKE '%test_oauth%'"
        ))
    except Exception:
        pass
    
    try:
        host_db.db.conn.execute(text(
            "DELETE FROM core_memberships WHERE user_id IN "
            "(SELECT id FROM core_users WHERE email LIKE '%@test_oauth.com')"
        ))
    except Exception:
        pass
    
    try:
        host_db.db.conn.execute(text(
            "DELETE FROM core_tenants WHERE name LIKE '%test_oauth%' OR name LIKE '%newuser%'"
        ))
    except Exception:
        pass
    
    try:
        host_db.db.conn.execute(text(
            "DELETE FROM core_users WHERE email LIKE '%@test_oauth.com'"
        ))
    except Exception:
        pass
    
    host_db.commit()
    
    # For SQLite tenant DBs: delete all database files and artifacts
    import glob
    db_patterns = ["t_*_db.db", "t_*_db.db-shm", "t_*_db.db-wal", "t_*_db.db-journal",
                   "*.db", "*.db-shm", "*.db-wal", "*.db-journal"]
    for pattern in db_patterns:
        for f in glob.glob(pattern):
            try:
                os.remove(f)
                print(f"   üóëÔ∏è Removed: {f}")
            except Exception:
                pass

cleanup_oauth_test_data()
print("üßπ Cleaned up previous test data\n")

In [None]:
#| hide

# ==========================================
# TEST 1: CSRF State Generation & Validation
# ==========================================

print("1Ô∏è‚É£ Testing CSRF State Protection...")

# Generate state
state1 = generate_oauth_state()
state2 = generate_oauth_state()
assert len(state1) == 32, "State should be 32-char UUID hex"
assert state1 != state2, "Each state should be unique"
print("   ‚úÖ State generation works")

# Valid state verification
mock_session = {'oauth_state': state1}
try:
    verify_oauth_state(mock_session, state1)
    print("   ‚úÖ Valid state verification works")
except ValueError:
    raise AssertionError("Valid state should not raise error")

# State should be cleared after verification
assert 'oauth_state' not in mock_session, "State should be cleared after use"
print("   ‚úÖ State cleared after verification")

# Invalid state (mismatch)
mock_session = {'oauth_state': state1}
try:
    verify_oauth_state(mock_session, state2)
    raise AssertionError("State mismatch should raise error")
except ValueError as e:
    assert "CSRF validation failed" in str(e)
    print("   ‚úÖ State mismatch detected")

# Missing state in session
mock_session = {}
try:
    verify_oauth_state(mock_session, state1)
    raise AssertionError("Missing state should raise error")
except ValueError as e:
    assert "No state in session" in str(e)
    print("   ‚úÖ Missing state detected")

In [None]:
#| hide

# ==========================================
# TEST 2: New User Auto-Provisioning
# ==========================================

print("\n2Ô∏è‚É£ Testing New User Auto-Provisioning...")

# Simulate OAuth callback for new user
new_user = create_or_get_global_user(
    host_db=host_db,
    oauth_id='google_new_user_123',
    email='newuser@test_oauth.com'
)
host_db.commit()
assert new_user.email == 'newuser@test_oauth.com'
assert new_user.oauth_id == 'google_new_user_123'
print("   ‚úÖ GlobalUser created")

# Check no membership exists yet
membership = get_user_membership(host_db, new_user.id)
assert membership is None, "New user should have no membership"
print("   ‚úÖ No membership found (expected)")

# Auto-provision tenant (commits internally)
tenant_id = provision_new_user(host_db, new_user)
assert tenant_id is not None
print(f"   ‚úÖ Tenant provisioned: {tenant_id}")

# Verify membership created (need fresh read)
host_db.rollback()  # Clear any stale transaction state
membership = get_user_membership(host_db, new_user.id)
assert membership is not None, "Membership should exist after provisioning"
assert membership.tenant_id == tenant_id
assert membership.role == 'owner', "First user should be owner"
print("   ‚úÖ Membership created with 'owner' role")

# Verify tenant catalog entry
all_tenants = host_db.tenant_catalogs()
tenant = [t for t in all_tenants if t.id == tenant_id]
assert len(tenant) == 1, "Tenant should be registered"
assert "newuser's Workspace" in tenant[0].name
print("   ‚úÖ Tenant registered in catalog")

# Verify TenantUser in tenant database
username = new_user.email.split('@')[0]
tenant_name = f"{username}'s Workspace"
tenant_db = get_or_create_tenant_db(tenant_id, tenant_name)
from fh_saas.db_tenant import init_tenant_core_schema
core_tables = init_tenant_core_schema(tenant_db)
tenant_db.conn.rollback()
tenant_users = core_tables['tenant_users']()
tenant_user = [u for u in tenant_users if u.id == new_user.id]
assert len(tenant_user) == 1, "TenantUser should exist"
assert tenant_user[0].local_role == 'admin'
print("   ‚úÖ TenantUser created in tenant database")

# Verify audit log
all_logs = host_db.audit_logs()
log = [l for l in all_logs if l.target_id == tenant_id]
assert len(log) == 1, "Provisioning should be logged"
assert log[0].event_type == 'tenant_provisioned'
print("   ‚úÖ Audit log created")

In [None]:
#| hide

# ==========================================
# TEST 3: Returning User Login
# ==========================================

print("\n3Ô∏è‚É£ Testing Returning User Login...")

# Simulate OAuth callback for existing user
returning_user = create_or_get_global_user(
    host_db=host_db,
    oauth_id='google_new_user_123',  # Same oauth_id
    email='newuser@test_oauth.com'
)
host_db.commit()
assert returning_user.id == new_user.id, "Should return same user"
print("   ‚úÖ Existing user retrieved")

# Check membership still exists
membership = get_user_membership(host_db, returning_user.id)
assert membership is not None
assert membership.tenant_id == tenant_id
print("   ‚úÖ Membership found")

# Create session
mock_session = {}
create_user_session(mock_session, returning_user, membership)
assert mock_session['user_id'] == returning_user.id
assert mock_session['tenant_id'] == tenant_id
assert mock_session['tenant_role'] == 'owner'
print("   ‚úÖ Session created")

# Get current user from session
current_user = get_current_user(mock_session)
assert current_user is not None
assert current_user['email'] == 'newuser@test_oauth.com'
print("   ‚úÖ Current user retrieved from session")

In [None]:
#| hide

# ==========================================
# TEST 4: Cross-Tenant Access Prevention (CRITICAL)
# ==========================================

print("\n4Ô∏è‚É£ Testing Cross-Tenant Access Prevention (CRITICAL SECURITY)...")

# Create second user with their own tenant
user2 = create_or_get_global_user(
    host_db=host_db,
    oauth_id='google_user2_456',
    email='user2@test_oauth.com'
)
host_db.commit()
tenant2_id = provision_new_user(host_db, user2)
membership2 = get_user_membership(host_db, user2.id)
print(f"   ‚úÖ Second user and tenant created: {tenant2_id}")

# Verify user1 cannot access tenant2
can_access = verify_membership(host_db, new_user.id, tenant2_id)
assert not can_access, "User1 should NOT have access to tenant2"
print("   ‚úÖ User1 blocked from tenant2")

# Verify user2 cannot access tenant1
can_access = verify_membership(host_db, user2.id, tenant_id)
assert not can_access, "User2 should NOT have access to tenant1"
print("   ‚úÖ User2 blocked from tenant1")

# Try to access tenant2 with user1's session (should fail)
malicious_session = {
    'user_id': new_user.id,
    'tenant_id': tenant2_id,  # Wrong tenant!
    'email': 'newuser@test_oauth.com',
    'tenant_role': 'owner'
}

try:
    tenant_db = require_tenant_access(malicious_session)
    raise AssertionError("Cross-tenant access should be blocked!")
except PermissionError as e:
    assert "Access denied" in str(e)
    print("   ‚úÖ Cross-tenant access blocked by require_tenant_access()")

# Verify each user can only access their own tenant
session1 = {
    'user_id': new_user.id,
    'tenant_id': tenant_id,
    'email': 'newuser@test_oauth.com',
    'tenant_role': 'owner'
}
tenant_db1 = require_tenant_access(session1)
assert tenant_db1 is not None
print("   ‚úÖ User1 can access tenant1")

session2 = {
    'user_id': user2.id,
    'tenant_id': tenant2_id,
    'email': 'user2@test_oauth.com',
    'tenant_role': 'owner'
}
tenant_db2 = require_tenant_access(session2)
assert tenant_db2 is not None
print("   ‚úÖ User2 can access tenant2")

In [None]:
#| hide

# ==========================================
# TEST 5: System Admin Routing
# ==========================================

print("\n5Ô∏è‚É£ Testing System Admin Routing...")

# Create system admin user
from fh_saas.db_host import GlobalUser, timestamp, gen_id
admin_user = GlobalUser(
    id=gen_id(),
    email='admin@test_oauth.com',
    oauth_id='google_admin_789',
    is_sys_admin=True,
    created_at=timestamp(),
    last_login=timestamp()
)
host_db.global_users.insert(admin_user)
host_db.commit()
print("   ‚úÖ System admin user created")

# Admin should route to /admin/dashboard
redirect_url = route_user_after_login(admin_user, None)
assert redirect_url == '/admin/dashboard'
print("   ‚úÖ Admin routed to /admin/dashboard")

# Regular user should route to /dashboard
redirect_url = route_user_after_login(new_user, membership)
assert redirect_url == '/dashboard'
print("   ‚úÖ Regular user routed to /dashboard")

In [None]:
#| hide

# ==========================================
# TEST 6: Session Management
# ==========================================

print("\n6Ô∏è‚É£ Testing Session Management...")

# Test session creation
test_session = {}
create_user_session(test_session, new_user, membership)
assert 'user_id' in test_session
assert 'tenant_id' in test_session
assert 'login_at' in test_session
print("   ‚úÖ Session created with all required keys")

# Test get_current_user
user_info = get_current_user(test_session)
assert user_info is not None
assert user_info['email'] == 'newuser@test_oauth.com'
print("   ‚úÖ get_current_user() works")

# Test session clear
clear_session(test_session)
assert len(test_session) == 0
user_info = get_current_user(test_session)
assert user_info is None
print("   ‚úÖ Session cleared (logout)")

In [None]:
#| hide

# ==========================================
# TEST 7: handle_oauth_callback Function Signature
# ==========================================

print("\n7Ô∏è‚É£ Testing handle_oauth_callback Function Signature...")

# This test ensures that handle_oauth_callback correctly passes host_db
# to all internal functions. Previously there was a bug where oauth_id
# was being passed as host_db, causing "'str' object has no attribute 'rollback'" error.

# Create mock objects for OAuth flow simulation
mock_request = Mock()
mock_request.url = Mock()
mock_request.url.scheme = 'http'
mock_request.url.netloc = 'localhost:8000'

mock_session = {'oauth_state': 'test_state_123'}

# Mock GoogleAppClient to avoid real OAuth calls
mock_client = Mock()
mock_client.id_key = 'sub'
mock_client.retr_info.return_value = {
    'sub': 'google_callback_test_789',
    'email': 'callback_test@test_oauth.com'
}

# Mock provision_new_user to avoid creating real tenant DB connections
# This prevents connection pool exhaustion during testing
mock_tenant_id = 'mock_tenant_id_for_test'

# Mock membership for route_user_after_login
mock_membership = Mock()
mock_membership.tenant_id = mock_tenant_id
mock_membership.role = 'owner'

with patch('fh_saas.utils_auth.get_google_oauth_client', return_value=mock_client):
    with patch('fh_saas.utils_auth.redir_url', return_value='http://localhost:8000/auth/callback'):
        with patch('fh_saas.utils_auth.verify_oauth_state'):
            with patch('fh_saas.utils_auth.provision_new_user', return_value=mock_tenant_id) as mock_provision:
                with patch('fh_saas.utils_auth.get_user_membership', return_value=mock_membership):
                    try:
                        # This should NOT raise "'str' object has no attribute 'rollback'"
                        # or "'HostDatabase' object has no attribute 'close'"
                        result = handle_oauth_callback('test_code', 'test_state_123', mock_request, mock_session)
                        print("   ‚úÖ handle_oauth_callback executed without attribute errors")
                        
                        # Verify the result is a RedirectResponse
                        from starlette.responses import RedirectResponse
                        assert isinstance(result, RedirectResponse), "Should return RedirectResponse"
                        print("   ‚úÖ Returns RedirectResponse as expected")
                        
                        # Verify provision_new_user was called (meaning new user flow worked)
                        if mock_provision.called:
                            print("   ‚úÖ provision_new_user was called for new user")
                        
                    except AttributeError as e:
                        if 'rollback' in str(e) or 'close' in str(e):
                            raise AssertionError(
                                f"BUG DETECTED: host_db parameter issue - {e}\n"
                                "This indicates create_or_get_global_user is receiving wrong arguments."
                            )
                        raise

# Verify the user was actually created in the database
host_db.rollback()
all_users = host_db.global_users()
callback_user = [u for u in all_users if u.oauth_id == 'google_callback_test_789']
assert len(callback_user) == 1, "User should be created by handle_oauth_callback"
print(f"   ‚úÖ User created: {callback_user[0].email}")

# Cleanup test user (no tenant DB was created due to mock)
host_db.db.conn.execute(text(
    "DELETE FROM core_memberships WHERE user_id = :user_id"
), {'user_id': callback_user[0].id})
host_db.db.conn.execute(text(
    "DELETE FROM core_users WHERE oauth_id = 'google_callback_test_789'"
))
host_db.commit()
print("   ‚úÖ Test data cleaned up")

In [None]:
#| hide

# ==========================================
# CLEANUP & SUMMARY
# ==========================================

print("\n" + "="*60)
print("‚úÖ ALL OAUTH TESTS PASSED!")
print("="*60)
print("\nTests Completed:")
print("  1. CSRF state generation and validation ‚úÖ")
print("  2. New user auto-provisioning ‚úÖ")
print("  3. Returning user login ‚úÖ")
print("  4. Cross-tenant access prevention (CRITICAL) ‚úÖ")
print("  5. System admin routing ‚úÖ")
print("  6. Session management ‚úÖ")
print("  7. handle_oauth_callback function signature ‚úÖ")
print("\nüîí Security: Tenant isolation validated")
print("üéØ Ready for production integration")

# Cleanup test data
cleanup_oauth_test_data()
print("\nüßπ Test data cleaned up")

# ==========================================
# CRITICAL: Close all database connections
# ==========================================
print("\nüîå Closing database connections...")

# Close host_db connection
try:
    host_db.db.conn.close()
    host_db.engine.dispose()
    print("   ‚úÖ host_db connection closed")
except Exception as e:
    print(f"   ‚ö†Ô∏è host_db close error: {e}")

# Close any tenant_db connections created during tests
for tdb_name in ['tenant_db', 'tenant_db1', 'tenant_db2']:
    try:
        tdb = globals().get(tdb_name)
        if tdb:
            tdb.conn.close()
            tdb.engine.dispose()
            print(f"   ‚úÖ {tdb_name} connection closed")
    except Exception as e:
        pass  # Already closed or doesn't exist

print("\n‚úÖ All connections disposed")

## NEW: Using HostDatabase Singleton with Transaction Management

The refactored OAuth functions now use a singleton `HostDatabase` class for dependency injection and proper transaction management.

In [None]:
#| hide

# ==========================================
# EXAMPLE: NEW PATTERN WITH HostDatabase
# ==========================================

print("\nüîß NEW PATTERN: Using HostDatabase Singleton")
print("="*60)

# 1. Reset and reinitialize HostDatabase singleton (connection was closed in cleanup)
from fh_saas.db_host import HostDatabase
HostDatabase.reset_instance()
host_db = HostDatabase.from_env()
print("‚úÖ HostDatabase singleton initialized")

# 2. Create or get user (with transaction management)
try:
    new_user = create_or_get_global_user(
        host_db=host_db,
        oauth_id='google_123',
        email='test@example.com'
    )
    host_db.commit()  # Caller commits
    print(f"‚úÖ User created/retrieved: {new_user.email}")
except Exception as e:
    host_db.rollback()  # Auto-rolled back on error
    print(f"‚ùå User creation failed: {e}")

# 3. Get user membership (read-only, no transaction needed)
membership = get_user_membership(
    host_db=host_db,
    user_id=new_user.id
)
print(f"‚úÖ Membership: {membership.tenant_id if membership else 'None'}")

# 4. Provision new tenant (full transaction management inside function)
if not membership:
    try:
        tenant_id = provision_new_user(
            host_db=host_db,
            global_user=new_user
        )
        # No commit needed - provision_new_user commits internally
        print(f"‚úÖ Tenant provisioned: {tenant_id}")
    except Exception as e:
        # No rollback needed - provision_new_user rolls back internally
        print(f"‚ùå Provisioning failed: {e}")

# 5. Verify membership (read-only, security check)
host_db.rollback()  # Clear any transaction state
membership = get_user_membership(host_db=host_db, user_id=new_user.id)
has_access = verify_membership(
    host_db=host_db,
    user_id=new_user.id,
    tenant_id=membership.tenant_id if membership else 'fake_id'
)
print(f"‚úÖ Access verification: {has_access}")

print("\nüí° Key Benefits:")
print("  - Singleton pattern ensures single host DB connection")
print("  - Explicit transaction management (commit/rollback)")
print("  - Integrated logging via module-level logger")
print("  - Read-only operations skip transactions for performance")
print("  - Module is independent of application-level objects")

# Cleanup example user
try:
    host_db.db.conn.execute(text("DELETE FROM core_memberships WHERE user_id IN (SELECT id FROM core_users WHERE email = 'test@example.com')"))
    host_db.db.conn.execute(text("DELETE FROM core_tenants WHERE name LIKE '%test%'"))
    host_db.db.conn.execute(text("DELETE FROM core_users WHERE email = 'test@example.com'"))
    host_db.commit()
except Exception:
    host_db.rollback()

In [None]:
#| hide

# ==========================================
# TEST: schema_init Parameter
# ==========================================

print("\nüß™ Testing schema_init Parameter...")
print("="*60)

from unittest.mock import MagicMock, Mock, patch
from fh_saas.utils_auth import create_auth_beforeware

# Test 1: schema_init=None (default) - backward compatible
print("\n1Ô∏è‚É£ Testing backward compatibility (schema_init=None)...")
bw_no_schema = create_auth_beforeware()
assert hasattr(bw_no_schema, 'skip'), "Should create valid Beforeware"
print("   ‚úÖ Beforeware created without schema_init")

# Test 2: schema_init provided - tables populated
print("\n2Ô∏è‚É£ Testing schema_init callback populates tables...")

class MockState:
    pass

class MockTenantDb:
    """Minimal mock for tenant database"""
    pass

def mock_schema_init(tenant_db):
    """Mock schema initializer returning table dict"""
    return {
        'projects': Mock(name='projects_table'),
        'tasks': Mock(name='tasks_table'),
    }

# Create mock request with state
mock_req = MagicMock()
mock_req.state = MockState()
mock_req.state.tenant_db = MockTenantDb()

# Simulate the check_auth behavior
mock_req.state.tables = mock_schema_init(mock_req.state.tenant_db)

assert 'projects' in mock_req.state.tables, "Should have 'projects' table"
assert 'tasks' in mock_req.state.tables, "Should have 'tasks' table"
print("   ‚úÖ schema_init callback returns table dict")
print(f"   ‚úÖ Tables available: {list(mock_req.state.tables.keys())}")

# Test 3: schema_init with exception - graceful fallback
print("\n3Ô∏è‚É£ Testing schema_init exception handling...")

def failing_schema_init(tenant_db):
    """Schema initializer that fails"""
    raise RuntimeError("Database connection error")

mock_req_fail = MagicMock()
mock_req_fail.state = MockState()
mock_req_fail.state.tenant_db = MockTenantDb()

# Simulate the error handling in check_auth
try:
    mock_req_fail.state.tables = failing_schema_init(mock_req_fail.state.tenant_db)
except Exception:
    mock_req_fail.state.tables = {}  # Fallback behavior

assert mock_req_fail.state.tables == {}, "Should fallback to empty dict on error"
print("   ‚úÖ Exception handled, tables set to empty dict")

# Test 4: schema_init with tenant_db=None - tables set to empty
print("\n4Ô∏è‚É£ Testing schema_init when tenant_db is None...")

mock_req_no_db = MagicMock()
mock_req_no_db.state = MockState()
mock_req_no_db.state.tenant_db = None

# Simulate check_auth behavior when tenant_db is None
if mock_schema_init and mock_req_no_db.state.tenant_db:
    mock_req_no_db.state.tables = mock_schema_init(mock_req_no_db.state.tenant_db)
else:
    mock_req_no_db.state.tables = {}

assert mock_req_no_db.state.tables == {}, "Should be empty dict when tenant_db is None"
print("   ‚úÖ tables is empty dict when tenant_db is None")

# Test 5: Verify create_auth_beforeware accepts schema_init parameter
print("\n5Ô∏è‚É£ Testing create_auth_beforeware accepts schema_init...")

try:
    bw_with_schema = create_auth_beforeware(
        skip=[r'/api/.*'],
        schema_init=mock_schema_init
    )
    assert hasattr(bw_with_schema, 'skip'), "Should create valid Beforeware"
    print("   ‚úÖ Beforeware created with schema_init parameter")
except TypeError as e:
    raise AssertionError(f"create_auth_beforeware should accept schema_init: {e}")

print("\n" + "="*60)
print("‚úÖ All schema_init tests PASSED!")
print("="*60)

## üé≠ Role-Based Access Control Tests

In [None]:
#| hide
# Test: ROLE_HIERARCHY and has_min_role

print("üß™ Testing ROLE_HIERARCHY and has_min_role...")
print("="*60)

from fh_saas.utils_auth import ROLE_HIERARCHY, has_min_role

# Test 1: ROLE_HIERARCHY structure
assert 'admin' in ROLE_HIERARCHY, "Should have admin role"
assert 'editor' in ROLE_HIERARCHY, "Should have editor role"
assert 'viewer' in ROLE_HIERARCHY, "Should have viewer role"
assert ROLE_HIERARCHY['admin'] > ROLE_HIERARCHY['editor'] > ROLE_HIERARCHY['viewer'], "Hierarchy should be admin > editor > viewer"
print("   ‚úÖ ROLE_HIERARCHY has correct structure")

# Test 2: has_min_role with admin user
admin_user = {'role': 'admin'}
assert has_min_role(admin_user, 'admin') == True, "Admin should meet admin requirement"
assert has_min_role(admin_user, 'editor') == True, "Admin should meet editor requirement"
assert has_min_role(admin_user, 'viewer') == True, "Admin should meet viewer requirement"
print("   ‚úÖ Admin user passes all role checks")

# Test 3: has_min_role with editor user
editor_user = {'role': 'editor'}
assert has_min_role(editor_user, 'admin') == False, "Editor should NOT meet admin requirement"
assert has_min_role(editor_user, 'editor') == True, "Editor should meet editor requirement"
assert has_min_role(editor_user, 'viewer') == True, "Editor should meet viewer requirement"
print("   ‚úÖ Editor user passes correct role checks")

# Test 4: has_min_role with viewer user
viewer_user = {'role': 'viewer'}
assert has_min_role(viewer_user, 'admin') == False, "Viewer should NOT meet admin requirement"
assert has_min_role(viewer_user, 'editor') == False, "Viewer should NOT meet editor requirement"
assert has_min_role(viewer_user, 'viewer') == True, "Viewer should meet viewer requirement"
print("   ‚úÖ Viewer user passes correct role checks")

# Test 5: has_min_role with missing role (defaults to viewer)
no_role_user = {}
assert has_min_role(no_role_user, 'viewer') == True, "No role should default to viewer and meet viewer requirement"
assert has_min_role(no_role_user, 'editor') == False, "No role should NOT meet editor requirement"
print("   ‚úÖ Missing role defaults to viewer correctly")

# Test 6: has_min_role with unknown role
unknown_role_user = {'role': 'unknown'}
assert has_min_role(unknown_role_user, 'viewer') == False, "Unknown role should have level 0"
print("   ‚úÖ Unknown role handled correctly")

print("\n" + "="*60)
print("‚úÖ All ROLE_HIERARCHY and has_min_role tests PASSED!")
print("="*60)

In [None]:
#| hide
# Test: get_user_role

print("üß™ Testing get_user_role...")
print("="*60)

from fh_saas.utils_auth import get_user_role

# Test 1: Owner always gets admin role
owner_session = {'tenant_role': 'owner', 'user_id': 'usr_123'}
assert get_user_role(owner_session) == 'admin', "Owner should always get admin role"
print("   ‚úÖ Owner automatically gets admin role")

# Test 2: System admin always gets admin role
sysadmin_session = {'is_sys_admin': True, 'user_id': 'usr_456'}
assert get_user_role(sysadmin_session) == 'admin', "System admin should always get admin role"
print("   ‚úÖ System admin automatically gets admin role")

# Test 3: Regular user without tenant_db returns None
regular_session = {'tenant_role': 'member', 'user_id': 'usr_789'}
assert get_user_role(regular_session) == None, "Regular user without tenant_db should return None"
print("   ‚úÖ Regular user without tenant_db returns None")

# Test 4: Owner takes precedence over system admin
owner_and_sysadmin = {'tenant_role': 'owner', 'is_sys_admin': True, 'user_id': 'usr_101'}
assert get_user_role(owner_and_sysadmin) == 'admin', "Owner+sysadmin should still be admin"
print("   ‚úÖ Owner flag is checked first")

print("\n" + "="*60)
print("‚úÖ All get_user_role tests PASSED!")
print("="*60)

In [None]:
#| hide
# Test: require_role decorator

print("üß™ Testing require_role decorator...")
print("="*60)

from fh_saas.utils_auth import require_role
from unittest.mock import MagicMock

# Create mock request objects
class MockState:
    def __init__(self, user=None):
        self.user = user

class MockRequest:
    def __init__(self, user=None):
        self.state = MockState(user)

# Test 1: require_role accepts minimum role parameter
@require_role('admin')
async def admin_only(request):
    return "admin content"

assert callable(admin_only), "Decorated function should be callable"
print("   ‚úÖ require_role decorator creates callable function")

# Test 2: Admin user passes admin check
admin_request = MockRequest(user={'role': 'admin', 'user_id': 'usr_1'})
result = await admin_only(admin_request)
assert result == "admin content", "Admin should access admin route"
print("   ‚úÖ Admin user accesses admin route")

# Test 3: Editor user fails admin check
@require_role('admin')
async def admin_route(request):
    return "admin content"

editor_request = MockRequest(user={'role': 'editor', 'user_id': 'usr_2'})
result = await admin_route(editor_request)
assert result.status_code == 403, "Editor should get 403 for admin route"
print("   ‚úÖ Editor user gets 403 for admin route")

# Test 4: Viewer passes viewer check
@require_role('viewer')
async def viewer_route(request):
    return "viewer content"

viewer_request = MockRequest(user={'role': 'viewer', 'user_id': 'usr_3'})
result = await viewer_route(viewer_request)
assert result == "viewer content", "Viewer should access viewer route"
print("   ‚úÖ Viewer user accesses viewer route")

# Test 5: No user returns 401
@require_role('viewer')
async def any_route(request):
    return "content"

no_user_request = MockRequest(user=None)
result = await any_route(no_user_request)
assert result.status_code == 401, "No user should get 401"
print("   ‚úÖ No user returns 401 Unauthorized")

# Test 6: User with no role assigned returns 403
@require_role('viewer')
async def role_required_route(request):
    return "content"

no_role_request = MockRequest(user={'user_id': 'usr_4', 'role': None})
result = await role_required_route(no_role_request)
assert result.status_code == 403, "User with no role should get 403"
print("   ‚úÖ User without role assignment gets 403")

print("\n" + "="*60)
print("‚úÖ All require_role decorator tests PASSED!")
print("="*60)

## ‚ö° Session Caching Tests

In [None]:
#| hide
# Test: Session Caching

print("üß™ Testing session caching...")
print("="*60)

from fh_saas.utils_auth import (
    invalidate_auth_cache, _get_cached_auth, _set_auth_cache,
    _AUTH_CACHE_KEY, create_auth_beforeware
)
import time

# Test 1: _set_auth_cache stores data correctly
session = {}
user = {'user_id': 'usr_123', 'email': 'test@test.com', 'role': 'admin'}
_set_auth_cache(session, user, 'tnt_456')
assert _AUTH_CACHE_KEY in session, "Should store cache in session"
assert session[_AUTH_CACHE_KEY]['user']['user_id'] == 'usr_123'
assert session[_AUTH_CACHE_KEY]['tenant_id'] == 'tnt_456'
assert 'cached_at' in session[_AUTH_CACHE_KEY]
print("   ‚úÖ _set_auth_cache stores data correctly")

# Test 2: _get_cached_auth returns valid cache
cached = _get_cached_auth(session, 300)
assert cached is not None, "Should return cached data"
assert cached['user']['user_id'] == 'usr_123'
print("   ‚úÖ _get_cached_auth returns valid cache")

# Test 3: _get_cached_auth returns None for expired cache
session[_AUTH_CACHE_KEY]['cached_at'] = time.time() - 400  # Expired
cached = _get_cached_auth(session, 300)
assert cached is None, "Should return None for expired cache"
print("   ‚úÖ _get_cached_auth returns None for expired cache")

# Test 4: invalidate_auth_cache clears cache
session = {}
_set_auth_cache(session, user, 'tnt_456')
assert _AUTH_CACHE_KEY in session
invalidate_auth_cache(session)
assert _AUTH_CACHE_KEY not in session, "Should clear cache"
print("   ‚úÖ invalidate_auth_cache clears cache")

# Test 5: create_auth_beforeware accepts cache parameters
bw_cached = create_auth_beforeware(session_cache=True, session_cache_ttl=600)
assert hasattr(bw_cached, 'skip'), "Should create valid Beforeware"
print("   ‚úÖ create_auth_beforeware accepts session_cache parameters")

# Test 6: Cache is not used when session_cache=False (default)
bw_no_cache = create_auth_beforeware(session_cache=False)
assert hasattr(bw_no_cache, 'skip'), "Should create valid Beforeware"
print("   ‚úÖ session_cache=False (default) works")

# Test 7: _set_auth_cache creates a copy of user dict
original_user = {'user_id': 'usr_789', 'role': 'editor'}
session = {}
_set_auth_cache(session, original_user, 'tnt_999')
original_user['role'] = 'admin'  # Modify original
assert session[_AUTH_CACHE_KEY]['user']['role'] == 'editor', "Cache should be independent copy"
print("   ‚úÖ Cache stores independent copy of user dict")

print("\n" + "="*60)
print("‚úÖ All session caching tests PASSED!")
print("="*60)

## üö¶ auth_redirect Tests

### Why `auth_redirect` Exists

When HTMX makes a partial request and receives a standard redirect (302/303), it follows the redirect and swaps the response into the target element. This causes the **login page to appear inside the partial content area** instead of navigating the full page.

### When to Use `auth_redirect`

Use it in your route handlers when you need to redirect unauthenticated users to login:

```python
from fh_saas.utils_auth import auth_redirect, get_current_user

# ‚ùå OLD WAY - causes HTMX partial load issue
@app.get('/dashboard')
def dashboard(request):
    if not get_current_user(request.session):
        return RedirectResponse('/login', status_code=303)  # Breaks HTMX!
    return render_dashboard()

# ‚úÖ NEW WAY - HTMX-aware
@app.get('/dashboard')
def dashboard(request):
    if not get_current_user(request.session):
        return auth_redirect(request)  # Works with HTMX!
    return render_dashboard()
```

### How It Works

| Request Type | Behavior |
|--------------|----------|
| **HTMX request** (`HX-Request` header present) | Returns 200 with `HX-Redirect` header ‚Üí full page navigation |
| **Standard request** | Returns 303 `RedirectResponse` ‚Üí normal redirect |

### Integration Summary

| Component | Automatic? | Your Action |
|-----------|------------|-------------|
| `create_auth_beforeware` | ‚úÖ Auto | Sets up `request.state.user` |
| `auth_redirect` | ‚ùå Manual | Call it in routes when redirecting to login |

In [None]:
#| hide
# Test: auth_redirect HTMX-aware authentication redirects

print("üß™ Testing auth_redirect...")
print("="*60)

from fh_saas.utils_auth import auth_redirect
from starlette.responses import RedirectResponse, Response
from unittest.mock import Mock

# Test 1: HTMX request returns HX-Redirect header
mock_htmx_request = Mock()
mock_htmx_request.headers = {'HX-Request': 'true'}
response = auth_redirect(mock_htmx_request)
assert isinstance(response, Response), "Should return Response"
assert response.status_code == 200, "Should return 200 for HTMX"
assert response.headers.get('HX-Redirect') == '/login', "Should have HX-Redirect header"
print("   ‚úÖ HTMX request returns 200 with HX-Redirect header")

# Test 2: HTMX request with custom redirect URL
response = auth_redirect(mock_htmx_request, redirect_url='/auth/signin')
assert response.headers.get('HX-Redirect') == '/auth/signin', "Should use custom URL"
print("   ‚úÖ HTMX request uses custom redirect URL")

# Test 3: Non-HTMX request returns standard RedirectResponse
mock_standard_request = Mock()
mock_standard_request.headers = {}  # No HX-Request header
response = auth_redirect(mock_standard_request)
assert isinstance(response, RedirectResponse), "Should return RedirectResponse"
assert response.status_code == 303, "Should return 303 See Other"
print("   ‚úÖ Non-HTMX request returns standard 303 redirect")

# Test 4: Non-HTMX request with custom redirect URL
response = auth_redirect(mock_standard_request, redirect_url='/custom/login')
assert response.headers.get('location') == '/custom/login', "Should redirect to custom URL"
print("   ‚úÖ Non-HTMX request uses custom redirect URL")

# Test 5: Request with other headers but no HX-Request
mock_other_headers = Mock()
mock_other_headers.headers = {'Accept': 'text/html', 'User-Agent': 'Mozilla/5.0'}
response = auth_redirect(mock_other_headers)
assert isinstance(response, RedirectResponse), "Should treat as standard request"
print("   ‚úÖ Request with other headers (no HX-Request) uses standard redirect")

# Test 6: HTMX request with additional headers
mock_htmx_with_extras = Mock()
mock_htmx_with_extras.headers = {
    'HX-Request': 'true',
    'HX-Trigger': 'load-content',
    'HX-Target': '#main-content'
}
response = auth_redirect(mock_htmx_with_extras)
assert response.status_code == 200, "Should handle HTMX with extra headers"
assert 'HX-Redirect' in response.headers, "Should set HX-Redirect"
print("   ‚úÖ HTMX request with additional headers handled correctly")

print("\n" + "="*60)
print("‚úÖ All auth_redirect tests PASSED!")
print("="*60)

## üîÑ HTMX-Aware Beforeware Tests

These tests verify that `create_auth_beforeware` uses HTMX-aware redirects.

### Problem (Fixed)
When using `hx_boost="true"`, clicking on protected routes would show the login page 
**inside** the partial content area instead of doing a full-page redirect.

### Solution
`create_auth_beforeware` now uses `auth_redirect()` instead of `RedirectResponse()`.

In [None]:
#| hide
# Test: HTMX-aware redirects in create_auth_beforeware

print("üß™ Testing HTMX-aware redirects in create_auth_beforeware...")
print("="*60)

from fh_saas.utils_auth import create_auth_beforeware, auth_redirect
from starlette.responses import RedirectResponse, Response
from unittest.mock import Mock, MagicMock

# Create beforeware with minimal config
beforeware = create_auth_beforeware(
    redirect_path="/login",
    skip=[r"/public/.*"],
    setup_tenant_db=False  # Disable DB setup for unit test
)

# Get the check_auth function from beforeware
check_auth = beforeware.f

# Test 1: HTMX request without session returns HX-Redirect
print("\n1Ô∏è‚É£ Testing HTMX request without session...")
mock_htmx_req = MagicMock()
mock_htmx_req.headers = {"HX-Request": "true"}
mock_htmx_req.state = MagicMock()
mock_empty_session = {}

result = check_auth(mock_htmx_req, mock_empty_session)
assert result is not None, "Should return response for unauthenticated"
assert result.status_code == 200, f"HTMX should get 200, got {result.status_code}"
assert "HX-Redirect" in result.headers, "Should have HX-Redirect header"
assert result.headers["HX-Redirect"] == "/login", f"Should redirect to /login"
print("   ‚úÖ HTMX request gets 200 + HX-Redirect header")

# Test 2: Standard request without session returns 303 redirect
print("\n2Ô∏è‚É£ Testing standard request without session...")
mock_std_req = MagicMock()
mock_std_req.headers = {}
mock_std_req.state = MagicMock()

result = check_auth(mock_std_req, mock_empty_session)
assert result is not None, "Should return response for unauthenticated"
assert isinstance(result, RedirectResponse), f"Standard should get RedirectResponse"
assert result.status_code == 303, f"Standard should get 303, got {result.status_code}"
print("   ‚úÖ Standard request gets 303 RedirectResponse")

# Test 3: HX-Boosted request also gets HTMX treatment
print("\n3Ô∏è‚É£ Testing HX-Boosted request without session...")
mock_boosted_req = MagicMock()
mock_boosted_req.headers = {"HX-Request": "true", "HX-Boosted": "true"}
mock_boosted_req.state = MagicMock()

result = check_auth(mock_boosted_req, mock_empty_session)
assert result.status_code == 200, "HX-Boosted should get 200"
assert "HX-Redirect" in result.headers, "Should have HX-Redirect header"
print("   ‚úÖ HX-Boosted request gets 200 + HX-Redirect header")

# Test 4: Authenticated user passes through
print("\n4Ô∏è‚É£ Testing authenticated user passes through...")
mock_auth_session = {"user_id": "usr_123"}
mock_auth_req = MagicMock()
mock_auth_req.headers = {"HX-Request": "true"}
mock_auth_req.state = MagicMock()

result = check_auth(mock_auth_req, mock_auth_session)
assert result is None, "Authenticated user should pass through (None)"
print("   ‚úÖ Authenticated user passes through")

# Test 5: Custom redirect path works with HTMX
print("\n5Ô∏è‚É£ Testing custom redirect path with HTMX...")
custom_beforeware = create_auth_beforeware(
    redirect_path="/auth/signin",
    setup_tenant_db=False
)
custom_check = custom_beforeware.f

result = custom_check(mock_htmx_req, mock_empty_session)
assert result.headers["HX-Redirect"] == "/auth/signin", "Should use custom path"
print("   ‚úÖ Custom redirect path works with HTMX")

print("\n" + "="*60)
print("‚úÖ All HTMX-aware beforeware tests PASSED!")

In [None]:
#| hide

print("="*60)
print("üß™ SLIDING SESSION TESTS")
print("="*60)

from fh_saas.utils_auth import SessionConfig, SlidingSessionMiddleware, create_session_middleware
import time

# Test 1: SessionConfig defaults
print("\n1Ô∏è‚É£ Testing SessionConfig defaults...")
config = SessionConfig()
assert config.max_age == 3600, f"Default max_age should be 3600, got {config.max_age}"
assert config.sliding == True, "Default sliding should be True"
assert config.absolute_max is None, "Default absolute_max should be None"
assert config.secure == True, "Default secure should be True"
assert config.same_site == 'lax', f"Default same_site should be 'lax', got {config.same_site}"
print("   ‚úÖ SessionConfig defaults are correct")

# Test 2: SessionConfig.default() factory
print("\n2Ô∏è‚É£ Testing SessionConfig.default() factory...")
default_config = SessionConfig.default()
assert default_config.max_age == 3600, "default() should return 1 hour max_age"
assert default_config.sliding == True, "default() should have sliding=True"
print("   ‚úÖ SessionConfig.default() works")

# Test 3: SessionConfig.strict() factory
print("\n3Ô∏è‚É£ Testing SessionConfig.strict() factory...")
strict_config = SessionConfig.strict()
assert strict_config.max_age == 1800, f"strict() should have 30 min max_age, got {strict_config.max_age}"
assert strict_config.absolute_max == 28800, f"strict() should have 8h absolute_max, got {strict_config.absolute_max}"
print("   ‚úÖ SessionConfig.strict() works")

# Test 4: SessionConfig.relaxed() factory
print("\n4Ô∏è‚É£ Testing SessionConfig.relaxed() factory...")
relaxed_config = SessionConfig.relaxed()
assert relaxed_config.max_age == 28800, f"relaxed() should have 8h max_age, got {relaxed_config.max_age}"
assert relaxed_config.absolute_max == 604800, f"relaxed() should have 7d absolute_max, got {relaxed_config.absolute_max}"
print("   ‚úÖ SessionConfig.relaxed() works")

# Test 5: Custom SessionConfig
print("\n5Ô∏è‚É£ Testing custom SessionConfig...")
custom_config = SessionConfig(max_age=7200, absolute_max=86400, secure=False)
assert custom_config.max_age == 7200, "Custom max_age should work"
assert custom_config.absolute_max == 86400, "Custom absolute_max should work"
assert custom_config.secure == False, "Custom secure should work"
print("   ‚úÖ Custom SessionConfig works")

# Test 6: create_session_middleware factory
print("\n6Ô∏è‚É£ Testing create_session_middleware factory...")
middleware_factory = create_session_middleware('test-secret-key', SessionConfig(max_age=1800))
assert callable(middleware_factory), "create_session_middleware should return a callable"

# Simulate wrapping an app
mock_app = lambda scope, receive, send: None
wrapped = middleware_factory(mock_app)
assert isinstance(wrapped, SlidingSessionMiddleware), "Should return SlidingSessionMiddleware instance"
assert wrapped._config.max_age == 1800, "Middleware should use provided config"
print("   ‚úÖ create_session_middleware factory works")

# Test 7: create_user_session includes session_started_at
print("\n7Ô∏è‚É£ Testing create_user_session includes session_started_at...")
from fh_saas.utils_auth import create_user_session
from fh_saas.db_host import GlobalUser, Membership

mock_session = {}
mock_user = GlobalUser(
    id='usr_test',
    email='test@example.com',
    oauth_id='oauth_123',
    is_sys_admin=False,
    created_at='2024-01-01'
)
mock_membership = Membership(
    id='mem_test',
    user_id='usr_test',
    tenant_id='tnt_test',
    role='owner',
    created_at='2024-01-01'
)

before_time = time.time()
create_user_session(mock_session, mock_user, mock_membership)
after_time = time.time()

assert 'session_started_at' in mock_session, "Session should include session_started_at"
assert before_time <= mock_session['session_started_at'] <= after_time, "session_started_at should be current time"
assert mock_session['user_id'] == 'usr_test', "user_id should be set"
assert mock_session['tenant_id'] == 'tnt_test', "tenant_id should be set"
print("   ‚úÖ create_user_session includes session_started_at")

# Test 8: create_auth_beforeware with session_config absolute_max check
print("\n8Ô∏è‚É£ Testing create_auth_beforeware absolute_max enforcement...")
from fh_saas.utils_auth import create_auth_beforeware, clear_session
from unittest.mock import MagicMock
from starlette.responses import Response

# Create session that is past absolute_max
expired_session = {
    'user_id': 'usr_test',
    'session_started_at': time.time() - 100  # 100 seconds ago
}

# Config with 50 second absolute_max (so session is expired)
test_config = SessionConfig(max_age=3600, absolute_max=50)
beforeware = create_auth_beforeware(
    setup_tenant_db=False,
    session_config=test_config
)
check_fn = beforeware.f

mock_req = MagicMock()
mock_req.headers = {}
mock_req.state = MagicMock()

# Session should be cleared and user redirected
result = check_fn(mock_req, expired_session)
assert result is not None, "Should return response when absolute_max exceeded"
# Session should be cleared
assert 'user_id' not in expired_session, "Session should be cleared due to absolute_max"
print("   ‚úÖ create_auth_beforeware enforces absolute_max")

# Test 9: Valid session passes through with session_config
print("\n9Ô∏è‚É£ Testing valid session passes through with session_config...")
valid_session = {
    'user_id': 'usr_test',
    'session_started_at': time.time() - 10  # 10 seconds ago, well within limit
}
test_config_2 = SessionConfig(max_age=3600, absolute_max=3600)  # 1 hour absolute max
beforeware_2 = create_auth_beforeware(
    setup_tenant_db=False,
    session_config=test_config_2
)
check_fn_2 = beforeware_2.f

mock_req_2 = MagicMock()
mock_req_2.headers = {}
mock_req_2.state = MagicMock()

result_2 = check_fn_2(mock_req_2, valid_session)
assert result_2 is None, "Valid session should pass through"
assert 'user_id' in valid_session, "Session should NOT be cleared for valid session"
print("   ‚úÖ Valid session passes through with session_config")

print("\n" + "="*60)
print("‚úÖ All SLIDING SESSION tests PASSED!")
print("="*60)
print("="*60)
