In [1]:
import { useState, useEffect } from 'react';
import { supabase } from '../lib/supabase';
import permissionCache from '../lib/permission-cache';

/** Raw counters and latency samples collected by the monitoring service. */
interface PermissionMetrics {
  // Cache metrics
  cacheHits: number;
  cacheMisses: number;
  cacheSize: number;
  cacheExpirations: number;
  cacheInvalidations: number;

  // Permission check metrics
  permissionChecks: number;
  permissionChecksByResource: Record<string, number>;
  permissionChecksByAction: Record<string, number>;
  permissionGranted: number;
  permissionDenied: number;
  permissionErrors: number;
  permissionCheckLatency: number[];

  // Role management metrics
  roleAssignments: number;
  roleRemovals: number;

  // Security events
  unauthorizedAccessAttempts: number;

  // Edge function metrics
  edgeFunctionCalls: number;
  edgeFunctionErrors: number;
  edgeFunctionLatency: number[];
}

/** One entry of the in-memory event log. */
interface PermissionMonitoringEvent {
  timestamp: Date;
  eventType: string;
  // Event payloads are free-form objects supplied by the individual hooks.
  details: any;
}

/**
 * Permission Monitoring Service
 *
 * Singleton that captures metrics about the permission system by wrapping
 * the methods of `permissionCache` and the functions exported by
 * `../lib/permission`. Counters are kept in memory; denied checks and error
 * events are additionally written to the audit-log table through Supabase.
 */
export class PermissionMonitoringService {
  private static instance: PermissionMonitoringService;

  /** Maximum number of entries retained in the in-memory event log. */
  private static readonly MAX_EVENT_LOG_SIZE = 1000;

  /**
   * Maximum number of samples retained per latency series. Without a bound
   * the arrays grow by one element per call for the lifetime of the process.
   */
  private static readonly MAX_LATENCY_SAMPLES = 1000;

  private metrics: PermissionMetrics;

  private eventLog: PermissionMonitoringEvent[] = [];

  private constructor() {
    this.metrics = PermissionMonitoringService.createEmptyMetrics();

    this.monkeyPatchPermissionCache();
    this.monkeyPatchPermissionFunctions();
  }

  /**
   * Get the singleton instance, creating it (and installing the monitoring
   * hooks) on first use.
   */
  public static getInstance(): PermissionMonitoringService {
    if (!PermissionMonitoringService.instance) {
      PermissionMonitoringService.instance = new PermissionMonitoringService();
    }
    return PermissionMonitoringService.instance;
  }

  /**
   * Build a zeroed metrics object.
   *
   * @param cacheSize Value to seed `cacheSize` with; lets a reset keep the
   *   last observed cache size.
   */
  private static createEmptyMetrics(cacheSize = 0): PermissionMetrics {
    return {
      // Cache metrics
      cacheHits: 0,
      cacheMisses: 0,
      cacheSize,
      cacheExpirations: 0,
      cacheInvalidations: 0,

      // Permission check metrics
      permissionChecks: 0,
      permissionChecksByResource: {},
      permissionChecksByAction: {},
      permissionGranted: 0,
      permissionDenied: 0,
      permissionErrors: 0,
      permissionCheckLatency: [],

      // Role management metrics
      roleAssignments: 0,
      roleRemovals: 0,

      // Security events
      unauthorizedAccessAttempts: 0,

      // Edge function metrics
      edgeFunctionCalls: 0,
      edgeFunctionErrors: 0,
      edgeFunctionLatency: [],
    };
  }

  /**
   * Append a latency sample, dropping the oldest samples once the series
   * exceeds `MAX_LATENCY_SAMPLES`. Averages are therefore computed over the
   * most recent window rather than over all calls since start-up.
   */
  private static pushLatencySample(samples: number[], value: number): void {
    samples.push(value);
    const overflow = samples.length - PermissionMonitoringService.MAX_LATENCY_SAMPLES;
    if (overflow > 0) {
      samples.splice(0, overflow);
    }
  }

  /**
   * Return a JSON-friendly copy of an event payload.
   *
   * `Error` instances have no enumerable own properties, so JSON
   * serialisation turns them into `{}`. When the payload carries an `Error`
   * under `error`, it is replaced by its `name` and `message` so the audit
   * row keeps the failure reason. Any other payload is returned unchanged.
   */
  private static toSerializableDetails(details: any): any {
    if (details == null || typeof details !== 'object' || !(details.error instanceof Error)) {
      return details;
    }
    const { name, message } = details.error as Error;
    return { ...details, error: { name, message } };
  }

  /**
   * Wrap the `get`, `set`, `delete` and `invalidateUserPermissions` methods
   * of `permissionCache` so that every call updates the cache metrics and
   * the event log before/after delegating to the original method.
   */
  private monkeyPatchPermissionCache(): void {
    const originalGet = permissionCache.get;
    const originalSet = permissionCache.set;
    const originalDelete = permissionCache.delete;
    const originalInvalidateUserPermissions = permissionCache.invalidateUserPermissions;
    const self = this;

    // A lookup that yields `undefined` counts as a miss, anything else as a hit.
    permissionCache.get = function<T>(key: string): T | undefined {
      const value = originalGet.call(this, key);

      if (value === undefined) {
        self.metrics.cacheMisses++;
        self.logEvent('CACHE_MISS', { key });
      } else {
        self.metrics.cacheHits++;
        self.logEvent('CACHE_HIT', { key });
      }

      self.updateCacheSize();
      return value;
    };

    // Track cache writes.
    permissionCache.set = function<T>(key: string, value: T, ttl?: number): void {
      originalSet.call(this, key, value, ttl);
      self.updateCacheSize();
      self.logEvent('CACHE_SET', { key });
    };

    // Track cache deletions.
    permissionCache.delete = function(key: string): void {
      originalDelete.call(this, key);
      self.updateCacheSize();
      self.logEvent('CACHE_DELETE', { key });
    };

    // Track per-user invalidations.
    permissionCache.invalidateUserPermissions = function(userId: string): void {
      originalInvalidateUserPermissions.call(this, userId);
      self.metrics.cacheInvalidations++;
      self.updateCacheSize();
      self.logEvent('CACHE_INVALIDATE', { userId });
    };
  }

  /**
   * Refresh `metrics.cacheSize` from the cache's backing store.
   *
   * Reads the `cache` property of `permissionCache` and, when it is a `Map`,
   * records its `size`. If the property is missing or not a `Map`, the
   * previous value is kept.
   */
  private updateCacheSize(): void {
    // @ts-ignore - Accessing private property
    if (permissionCache.cache && permissionCache.cache instanceof Map) {
      // @ts-ignore - Accessing private property
      this.metrics.cacheSize = permissionCache.cache.size;
    }
  }

  /**
   * Wrap `checkPermission`, `assignRoleToUser` and `removeRoleFromUser` from
   * `../lib/permission` so that calls update the permission/role metrics.
   *
   * The module is imported dynamically to avoid circular dependencies, so
   * the hooks are installed asynchronously: calls made before the import
   * resolves are not tracked.
   *
   * NOTE(review): the hooks are installed by assigning to properties of the
   * imported module object. Native ES module namespace objects do not allow
   * such assignments, so this presumably relies on the build emitting a
   * mutable exports object - confirm. If the assignment throws, the failure
   * is now reported by the `.catch` below instead of becoming an unhandled
   * promise rejection.
   */
  private monkeyPatchPermissionFunctions(): void {
    import('../lib/permission')
      .then((permissionModule) => {
        const originalCheckPermission = permissionModule.checkPermission;
        const originalAssignRoleToUser = permissionModule.assignRoleToUser;
        const originalRemoveRoleFromUser = permissionModule.removeRoleFromUser;
        const self = this;

        // Count every check, bucket it by resource and action, time the
        // underlying call and record whether it was granted, denied or failed.
        permissionModule.checkPermission = async function(userId: string, resource: string, action: string): Promise<boolean> {
          self.metrics.permissionChecks++;

          self.metrics.permissionChecksByResource[resource] =
            (self.metrics.permissionChecksByResource[resource] || 0) + 1;

          self.metrics.permissionChecksByAction[action] =
            (self.metrics.permissionChecksByAction[action] || 0) + 1;

          const startTime = performance.now();
          try {
            const result = await originalCheckPermission.call(this, userId, resource, action);
            const endTime = performance.now();

            // Latency is only sampled for calls that completed without throwing.
            PermissionMonitoringService.pushLatencySample(
              self.metrics.permissionCheckLatency,
              endTime - startTime
            );

            if (result) {
              self.metrics.permissionGranted++;
              self.logEvent('PERMISSION_GRANTED', { userId, resource, action });
            } else {
              self.metrics.permissionDenied++;
              self.logEvent('PERMISSION_DENIED', { userId, resource, action });
            }

            return result;
          } catch (error) {
            self.metrics.permissionErrors++;
            self.logEvent('PERMISSION_ERROR', { userId, resource, action, error });
            throw error;
          }
        };

        // Count successful role assignments; failures are logged and rethrown.
        permissionModule.assignRoleToUser = async function(userId: string, roleName: string): Promise<boolean> {
          try {
            const result = await originalAssignRoleToUser.call(this, userId, roleName);
            if (result) {
              self.metrics.roleAssignments++;
              self.logEvent('ROLE_ASSIGNED', { userId, roleName });
            }
            return result;
          } catch (error) {
            self.logEvent('ROLE_ASSIGNMENT_ERROR', { userId, roleName, error });
            throw error;
          }
        };

        // Count successful role removals; failures are logged and rethrown.
        permissionModule.removeRoleFromUser = async function(userId: string, roleName: string): Promise<boolean> {
          try {
            const result = await originalRemoveRoleFromUser.call(this, userId, roleName);
            if (result) {
              self.metrics.roleRemovals++;
              self.logEvent('ROLE_REMOVED', { userId, roleName });
            }
            return result;
          } catch (error) {
            self.logEvent('ROLE_REMOVAL_ERROR', { userId, roleName, error });
            throw error;
          }
        };
      })
      .catch((error: unknown) => {
        console.error('Failed to install permission monitoring hooks:', error);
      });
  }

  /**
   * Append an event to the in-memory log (bounded to `MAX_EVENT_LOG_SIZE`
   * entries) and forward denied/unauthorized/error events to the audit table.
   *
   * @param eventType Event name, e.g. `CACHE_HIT` or `PERMISSION_DENIED`.
   * @param details Free-form payload stored with the event.
   */
  private logEvent(eventType: string, details: any): void {
    this.eventLog.push({
      timestamp: new Date(),
      eventType,
      details
    });

    // Keep event log at a reasonable size
    if (this.eventLog.length > PermissionMonitoringService.MAX_EVENT_LOG_SIZE) {
      this.eventLog.shift();
    }

    // For serious security events, send to audit logs in the database
    if (
      eventType === 'PERMISSION_DENIED' ||
      eventType === 'UNAUTHORIZED_ACCESS' ||
      eventType.includes('ERROR')
    ) {
      // Fire-and-forget: logToAuditDatabase handles its own failures.
      void this.logToAuditDatabase(eventType, details);
    }
  }

  /**
   * Insert a critical event into the audit-log table.
   *
   * Best effort: failures are reported on the console and never propagate to
   * the caller. The Supabase client reports query failures through the
   * returned `error` field rather than by throwing, so that field is checked
   * in addition to the surrounding `try`/`catch`.
   */
  private async logToAuditDatabase(eventType: string, details: any): Promise<void> {
    try {
      const userId = details?.userId || 'system';
      const { error } = await supabase.from('app_0be8fb8541_audit_logs').insert({
        user_id: userId,
        action_type: eventType,
        resource_type: 'PERMISSION',
        resource_id: details?.resource || null,
        details: PermissionMonitoringService.toSerializableDetails(details)
      });
      if (error) {
        console.error('Failed to log to audit database:', error);
      }
    } catch (error) {
      console.error('Failed to log to audit database:', error);
    }
  }

  /**
   * Get a snapshot of the current metrics plus derived values (cache hit
   * rate, average latencies) and the 50 most recent events.
   *
   * Arrays and per-key records are copied so that callers cannot mutate the
   * service's internal state through the returned object.
   */
  public getMetrics() {
    return {
      ...this.metrics,
      permissionChecksByResource: { ...this.metrics.permissionChecksByResource },
      permissionChecksByAction: { ...this.metrics.permissionChecksByAction },
      permissionCheckLatency: [...this.metrics.permissionCheckLatency],
      edgeFunctionLatency: [...this.metrics.edgeFunctionLatency],

      // Calculate derived metrics
      cacheHitRate: this.calculateCacheHitRate(),
      averagePermissionCheckLatency: this.calculateAverageLatency(this.metrics.permissionCheckLatency),
      averageEdgeFunctionLatency: this.calculateAverageLatency(this.metrics.edgeFunctionLatency),
      eventLog: this.getRecentEvents(50)
    };
  }

  /**
   * Calculate the cache hit rate as hits / (hits + misses); 0 when no
   * lookups have been recorded.
   */
  private calculateCacheHitRate(): number {
    const total = this.metrics.cacheHits + this.metrics.cacheMisses;
    return total > 0 ? this.metrics.cacheHits / total : 0;
  }

  /**
   * Calculate the arithmetic mean of a latency series; 0 for an empty series.
   */
  private calculateAverageLatency(latencyArray: number[]): number {
    if (latencyArray.length === 0) return 0;
    const sum = latencyArray.reduce((acc, val) => acc + val, 0);
    return sum / latencyArray.length;
  }

  /**
   * Get a copy of the last `count` entries of the event log.
   */
  private getRecentEvents(count: number): PermissionMonitoringEvent[] {
    return this.eventLog.slice(-count);
  }

  /**
   * Reset all counters and samples (for testing or time-based resets).
   * The last observed cache size is kept; the event log is not cleared.
   */
  public resetMetrics(): void {
    this.metrics = PermissionMonitoringService.createEmptyMetrics(this.metrics.cacheSize);
  }
}

// Export singleton instance.
// Evaluating this module calls getInstance(), which constructs the service;
// the constructor installs the permission-cache and permission-function
// monkey patches, so importing this file has side effects.
export default PermissionMonitoringService.getInstance();

SyntaxError: invalid syntax (2445020747.py, line 1)

In [2]:
from metagpt.tools.libs.terminal import Terminal
terminal = Terminal()
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import re
from datetime import datetime, timedelta
import time

class PermissionMonitoringSystem:
    """Python implementation of a permission monitoring system for AIvue v2.

    Keeps in-memory counters for cache activity, permission checks, role
    changes, security events and edge-function calls, a bounded event log,
    and per-tick history series used for trend plots. Activity is fed in via
    the ``record_*`` methods (``simulate_permission_activity`` generates
    random activity for demonstration purposes).
    """

    def __init__(self):
        """Initialize the monitoring system with zeroed metrics storage."""
        # Cache metrics
        self.cache_metrics = {
            'hits': 0,
            'misses': 0,
            'size': 0,
            'expirations': 0,
            'invalidations': 0,
            'hit_rate_history': [],
            'timestamps': []
        }

        # Permission check metrics
        self.permission_metrics = {
            'total_checks': 0,
            'checks_by_resource': {},
            'checks_by_action': {},
            'granted': 0,
            'denied': 0,
            'errors': 0,
            'latency': [],
            'avg_latency_history': []
        }

        # Role management metrics
        self.role_metrics = {
            'assignments': 0,
            'removals': 0,
            'users_per_role': {},  # role name -> set of user ids
            'permission_changes': 0
        }

        # Security events
        self.security_metrics = {
            'unauthorized_attempts': 0,
            'permission_denials_by_resource': {},
            'audit_logs_count': 0
        }

        # Edge function metrics
        self.edge_metrics = {
            'calls': 0,
            'errors': 0,
            'latency': [],
            'avg_latency_history': []
        }

        # Event log (bounded to the 1000 most recent entries by log_event)
        self.event_log = []

    @staticmethod
    def _mean(values):
        """Return the arithmetic mean of ``values`` as a plain float, or 0 when empty."""
        if not values:
            return 0
        return float(sum(values) / len(values))

    @staticmethod
    def _draw_pie(ax, labels, values, title):
        """Draw a two-slice green/red pie chart, or a placeholder when there is no data.

        A pie chart normalises the wedge sizes by their sum, which is
        undefined when every value is zero (e.g. on a freshly created
        system), so that case gets a text placeholder instead.
        """
        if sum(values) > 0:
            ax.pie(values, labels=labels, autopct='%1.1f%%', startangle=90, colors=['#4CAF50', '#F44336'])
        else:
            ax.text(0.5, 0.5, 'No data available', horizontalalignment='center', verticalalignment='center')
        ax.set_title(title)

    def simulate_permission_activity(self, duration_seconds=60, interval=1):
        """Simulate permission system activity for testing.

        Runs for roughly ``duration_seconds`` of wall-clock time. Each tick
        records a random batch of cache lookups, permission checks and,
        occasionally, role changes and edge-function calls, then updates the
        history series and sleeps for ``interval`` seconds.
        """
        print(f"Simulating permission system activity for {duration_seconds} seconds...")

        resources = ['outlets', 'reviews', 'campaigns', 'users', 'roles', 'permissions']
        actions = ['read', 'create', 'update', 'delete']
        users = ['user1', 'user2', 'user3', 'admin1', 'admin2']
        roles = ['merchant', 'staff', 'admin', 'super_admin']

        start_time = time.time()
        end_time = start_time + duration_seconds

        while time.time() < end_time:
            # Simulate cache activity
            for _ in range(np.random.randint(1, 5)):
                if np.random.random() < 0.7:  # 70% hit rate
                    self.record_cache_hit()
                else:
                    self.record_cache_miss()

            # Simulate permission checks
            for _ in range(np.random.randint(1, 10)):
                resource = np.random.choice(resources)
                action = np.random.choice(actions)
                user = np.random.choice(users)
                latency = np.random.uniform(5, 100)  # ms

                granted = np.random.random() < 0.85  # 85% success rate
                self.record_permission_check(user, resource, action, granted, latency)

            # Simulate role changes (less frequent)
            if np.random.random() < 0.1:  # 10% chance
                user = np.random.choice(users)
                role = np.random.choice(roles)
                if np.random.random() < 0.7:
                    self.record_role_assignment(user, role)
                else:
                    self.record_role_removal(user, role)

            # Simulate edge function calls
            if np.random.random() < 0.3:  # 30% chance
                function_name = np.random.choice([
                    'app_0be8fb8541_check_permission',
                    'app_0be8fb8541_manage_user_role'
                ])
                success = np.random.random() < 0.95  # 95% success rate
                latency = np.random.uniform(50, 500)  # ms
                self.record_edge_function_call(function_name, success, latency)

            # Update history at regular intervals
            self.update_history()

            # Sleep for the interval
            time.sleep(interval)

        print("Simulation completed!")

    def record_cache_hit(self):
        """Record a cache hit."""
        self.cache_metrics['hits'] += 1
        self.log_event('CACHE_HIT', {'timestamp': datetime.now().isoformat()})

    def record_cache_miss(self):
        """Record a cache miss."""
        self.cache_metrics['misses'] += 1
        self.log_event('CACHE_MISS', {'timestamp': datetime.now().isoformat()})

    def record_cache_invalidation(self, user_id):
        """Record a cache invalidation for a user."""
        self.cache_metrics['invalidations'] += 1
        self.log_event('CACHE_INVALIDATION', {
            'user_id': user_id,
            'timestamp': datetime.now().isoformat()
        })

    def record_permission_check(self, user_id, resource, action, granted, latency):
        """Record a permission check.

        Updates the total/by-resource/by-action counters, the granted/denied
        counters and the latency samples, and logs the event. A denial is
        also counted per resource; a denial on ``users``, ``roles`` or
        ``permissions`` additionally counts as an unauthorized attempt.
        """
        self.permission_metrics['total_checks'] += 1

        # Track by resource
        by_resource = self.permission_metrics['checks_by_resource']
        by_resource[resource] = by_resource.get(resource, 0) + 1

        # Track by action
        by_action = self.permission_metrics['checks_by_action']
        by_action[action] = by_action.get(action, 0) + 1

        # Track result
        if granted:
            self.permission_metrics['granted'] += 1
        else:
            self.permission_metrics['denied'] += 1
            # Track denied permissions by resource
            denials = self.security_metrics['permission_denials_by_resource']
            denials[resource] = denials.get(resource, 0) + 1

            # Track as security event
            if resource in ['users', 'roles', 'permissions']:
                self.security_metrics['unauthorized_attempts'] += 1

        # Track latency
        self.permission_metrics['latency'].append(latency)

        # Log the event
        event_type = 'PERMISSION_GRANTED' if granted else 'PERMISSION_DENIED'
        self.log_event(event_type, {
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        })

    def record_role_assignment(self, user_id, role_name):
        """Record a role assignment and add the user to that role's member set."""
        self.role_metrics['assignments'] += 1

        # Track users per role
        self.role_metrics['users_per_role'].setdefault(role_name, set()).add(user_id)

        # Log the event
        self.log_event('ROLE_ASSIGNED', {
            'user_id': user_id,
            'role_name': role_name,
            'timestamp': datetime.now().isoformat()
        })

        # Log to audit database (simulated)
        self.security_metrics['audit_logs_count'] += 1

    def record_role_removal(self, user_id, role_name):
        """Record a role removal.

        The removal is counted even when the user was not recorded as a
        member of the role; in that case the member sets are left unchanged.
        """
        self.role_metrics['removals'] += 1

        # Update users per role (discard is a no-op for non-members)
        members = self.role_metrics['users_per_role'].get(role_name)
        if members is not None:
            members.discard(user_id)

        # Log the event
        self.log_event('ROLE_REMOVED', {
            'user_id': user_id,
            'role_name': role_name,
            'timestamp': datetime.now().isoformat()
        })

        # Log to audit database (simulated)
        self.security_metrics['audit_logs_count'] += 1

    def record_edge_function_call(self, function_name, success, latency):
        """Record an edge function call, its outcome and its latency."""
        self.edge_metrics['calls'] += 1

        if not success:
            self.edge_metrics['errors'] += 1

        # Track latency
        self.edge_metrics['latency'].append(latency)

        # Log the event
        event_type = 'EDGE_FUNCTION_CALL' if success else 'EDGE_FUNCTION_ERROR'
        self.log_event(event_type, {
            'function': function_name,
            'success': success,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        })

    def log_event(self, event_type, details):
        """Append an event to the event log, keeping only the 1000 most recent."""
        self.event_log.append({
            'timestamp': datetime.now(),
            'event_type': event_type,
            'details': details
        })

        # Keep event log at a reasonable size
        if len(self.event_log) > 1000:
            self.event_log = self.event_log[-1000:]

    def update_history(self):
        """Update historical metrics for trending.

        Appends the current cumulative cache hit rate (with a timestamp) and,
        when samples exist, the current average permission-check and
        edge-function latencies. Each history is capped at 1000 points.
        """
        now = datetime.now()

        # Update cache hit rate history
        total_cache_attempts = self.cache_metrics['hits'] + self.cache_metrics['misses']
        hit_rate = self.cache_metrics['hits'] / total_cache_attempts if total_cache_attempts > 0 else 0
        self.cache_metrics['hit_rate_history'].append(hit_rate)
        self.cache_metrics['timestamps'].append(now)

        # Update average latency history (skipped until a sample exists)
        if self.permission_metrics['latency']:
            self.permission_metrics['avg_latency_history'].append(
                self._mean(self.permission_metrics['latency']))

        # Update edge function latency history (skipped until a sample exists)
        if self.edge_metrics['latency']:
            self.edge_metrics['avg_latency_history'].append(
                self._mean(self.edge_metrics['latency']))

        # Keep histories at a reasonable size
        max_history = 1000
        if len(self.cache_metrics['hit_rate_history']) > max_history:
            self.cache_metrics['hit_rate_history'] = self.cache_metrics['hit_rate_history'][-max_history:]
            self.cache_metrics['timestamps'] = self.cache_metrics['timestamps'][-max_history:]

        if len(self.permission_metrics['avg_latency_history']) > max_history:
            self.permission_metrics['avg_latency_history'] = self.permission_metrics['avg_latency_history'][-max_history:]

        if len(self.edge_metrics['avg_latency_history']) > max_history:
            self.edge_metrics['avg_latency_history'] = self.edge_metrics['avg_latency_history'][-max_history:]

    def get_metrics_summary(self):
        """Get a summary of all metrics.

        Returns a nested dict with ``cache``, ``permissions``, ``roles``,
        ``security`` and ``edge_functions`` sections. Rates and averages are
        0 when there is no underlying data; ``top_resources`` holds the five
        most-checked resources in descending order of check count.
        """
        # Calculate derived metrics
        total_cache_attempts = self.cache_metrics['hits'] + self.cache_metrics['misses']
        cache_hit_rate = self.cache_metrics['hits'] / total_cache_attempts if total_cache_attempts > 0 else 0

        # Same averaging helper as update_history, so both agree on the value.
        avg_permission_latency = self._mean(self.permission_metrics['latency'])
        avg_edge_latency = self._mean(self.edge_metrics['latency'])

        total_checks = self.permission_metrics['total_checks']
        edge_calls = self.edge_metrics['calls']

        return {
            'cache': {
                'hit_rate': cache_hit_rate,
                'hits': self.cache_metrics['hits'],
                'misses': self.cache_metrics['misses'],
                'invalidations': self.cache_metrics['invalidations']
            },
            'permissions': {
                'total_checks': total_checks,
                'granted_rate': self.permission_metrics['granted'] / total_checks
                                if total_checks > 0 else 0,
                'avg_latency_ms': avg_permission_latency,
                'top_resources': dict(sorted(self.permission_metrics['checks_by_resource'].items(),
                                             key=lambda x: x[1], reverse=True)[:5])
            },
            'roles': {
                'assignments': self.role_metrics['assignments'],
                'removals': self.role_metrics['removals'],
                'users_by_role': {role: len(users) for role, users in self.role_metrics['users_per_role'].items()}
            },
            'security': {
                'unauthorized_attempts': self.security_metrics['unauthorized_attempts'],
                'permission_denials': sum(self.security_metrics['permission_denials_by_resource'].values()),
                'audit_logs': self.security_metrics['audit_logs_count']
            },
            'edge_functions': {
                'calls': edge_calls,
                'error_rate': self.edge_metrics['errors'] / edge_calls
                              if edge_calls > 0 else 0,
                'avg_latency_ms': avg_edge_latency
            }
        }

    def visualize_metrics(self):
        """Visualize key metrics with matplotlib.

        Renders a 2x3 dashboard (cache performance, top resources, check
        results, recent cache hit rate, users per role, security counters),
        saves it as ``permission_monitoring_dashboard.png`` in the current
        working directory and returns a confirmation message. Panels without
        data show a text placeholder instead of an empty chart.
        """
        metrics = self.get_metrics_summary()

        # Set up the figure with subplots
        fig = plt.figure(figsize=(15, 10))

        # 1. Cache Performance
        ax1 = fig.add_subplot(2, 3, 1)
        self._draw_pie(
            ax1,
            ['Hits', 'Misses'],
            [self.cache_metrics['hits'], self.cache_metrics['misses']],
            'Cache Performance',
        )

        # 2. Permission Checks by Resource
        ax2 = fig.add_subplot(2, 3, 2)
        resources = list(metrics['permissions']['top_resources'].keys())
        checks = list(metrics['permissions']['top_resources'].values())
        ax2.bar(resources, checks, color='#2196F3')
        ax2.set_title('Top Resources Accessed')
        ax2.set_ylabel('Number of Checks')
        plt.setp(ax2.get_xticklabels(), rotation=45, ha='right')

        # 3. Permission Results
        ax3 = fig.add_subplot(2, 3, 3)
        self._draw_pie(
            ax3,
            ['Granted', 'Denied'],
            [self.permission_metrics['granted'], self.permission_metrics['denied']],
            'Permission Check Results',
        )

        # 4. Cache Hit Rate Over Time (last 30 history points)
        ax4 = fig.add_subplot(2, 3, 4)
        if len(self.cache_metrics['hit_rate_history']) > 1:
            ax4.plot(self.cache_metrics['timestamps'][-30:], self.cache_metrics['hit_rate_history'][-30:], 'g-')
            ax4.set_title('Cache Hit Rate (Recent)')
            ax4.set_ylabel('Hit Rate')
            ax4.set_ylim(0, 1)
            plt.setp(ax4.get_xticklabels(), rotation=45, ha='right')
        else:
            ax4.text(0.5, 0.5, 'Insufficient data', horizontalalignment='center', verticalalignment='center')
            ax4.set_title('Cache Hit Rate (Recent)')

        # 5. Users Per Role
        ax5 = fig.add_subplot(2, 3, 5)
        roles = list(metrics['roles']['users_by_role'].keys())
        user_counts = list(metrics['roles']['users_by_role'].values())
        if roles:
            ax5.bar(roles, user_counts, color='#9C27B0')
            ax5.set_title('Users Per Role')
            ax5.set_ylabel('Number of Users')
            plt.setp(ax5.get_xticklabels(), rotation=45, ha='right')
        else:
            ax5.text(0.5, 0.5, 'No role data available', horizontalalignment='center', verticalalignment='center')
            ax5.set_title('Users Per Role')

        # 6. Security Metrics
        ax6 = fig.add_subplot(2, 3, 6)
        security_labels = ['Unauthorized\nAttempts', 'Permission\nDenials', 'Audit\nLogs']
        security_values = [
            metrics['security']['unauthorized_attempts'],
            metrics['security']['permission_denials'],
            metrics['security']['audit_logs']
        ]
        ax6.bar(security_labels, security_values, color='#FF9800')
        ax6.set_title('Security Metrics')
        ax6.set_ylabel('Count')

        plt.tight_layout()
        plt.savefig('permission_monitoring_dashboard.png')
        plt.close()

        return "Permission monitoring visualizations created and saved as 'permission_monitoring_dashboard.png'"

# Demo driver: build a monitor and feed it five seconds of simulated traffic.
monitoring_system = PermissionMonitoringSystem()
monitoring_system.simulate_permission_activity(5)

# Render the dashboard image and report where it was written.
visualization_result = monitoring_system.visualize_metrics()
print(visualization_result)

# Dump the aggregated metrics as indented JSON under a heading
# (default=str stringifies any value json cannot encode natively).
metrics_summary = monitoring_system.get_metrics_summary()
print(
    "\nPermission System Metrics Summary:",
    json.dumps(metrics_summary, indent=2, default=str),
    sep="\n",
)

2025-07-27 09:26:55.664 | INFO     | metagpt.const:get_metagpt_root:33 - PROJECT_ROOT set from environment variable to /


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [3]:
from metagpt.tools.libs.terminal import Terminal
terminal = Terminal()
import json
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

class PermissionMonitoringSystem:
    """Python implementation of a permission monitoring system for AIvue v2.

    Collects in-memory counters for cache activity, permission checks, role
    changes, security events and edge-function calls, keeps a bounded event
    log and bounded trend histories, and can render the collected data as a
    summary dict, a text report, or a simple PNG dashboard.
    """

    # Upper bounds for the in-memory collections, so a long-running monitor
    # cannot grow without limit.
    MAX_EVENT_LOG = 1000
    MAX_HISTORY = 1000
    MAX_LATENCY_SAMPLES = 1000

    # A denied check against one of these resources is additionally counted
    # as an unauthorized access attempt.
    SENSITIVE_RESOURCES = ('users', 'roles', 'permissions')

    def __init__(self):
        """Initialize the monitoring system with empty metrics storage."""
        # Cache metrics
        self.cache_metrics = {
            'hits': 0,
            'misses': 0,
            'size': 0,
            'expirations': 0,
            'invalidations': 0,
            'hit_rate_history': [],
            'timestamps': []
        }

        # Permission check metrics. 'latency' holds only the most recent
        # samples; 'latency_sum'/'latency_count' carry the lifetime totals
        # used for averages.
        self.permission_metrics = {
            'total_checks': 0,
            'checks_by_resource': {},
            'checks_by_action': {},
            'granted': 0,
            'denied': 0,
            'errors': 0,
            'latency': [],
            'latency_sum': 0.0,
            'latency_count': 0,
            'avg_latency_history': []
        }

        # Role management metrics ('users_per_role' maps role -> set of user ids)
        self.role_metrics = {
            'assignments': 0,
            'removals': 0,
            'users_per_role': {},
            'permission_changes': 0
        }

        # Security events
        self.security_metrics = {
            'unauthorized_attempts': 0,
            'permission_denials_by_resource': {},
            'audit_logs_count': 0
        }

        # Edge function metrics (same latency layout as permission_metrics)
        self.edge_metrics = {
            'calls': 0,
            'errors': 0,
            'latency': [],
            'latency_sum': 0.0,
            'latency_count': 0,
            'avg_latency_history': []
        }

        # Event log (list of dicts with 'timestamp', 'event_type', 'details')
        self.event_log = []

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _trim(items, limit):
        """Drop the oldest entries of ``items`` in place so at most ``limit`` remain."""
        excess = len(items) - limit
        if excess > 0:
            del items[:excess]

    @staticmethod
    def _ratio(numerator, denominator):
        """Return ``numerator / denominator``, or 0 when the denominator is not positive."""
        return numerator / denominator if denominator > 0 else 0

    def _record_latency(self, bucket, latency):
        """Add one latency sample to ``bucket`` and update its running totals."""
        bucket['latency'].append(latency)
        bucket['latency_sum'] += latency
        bucket['latency_count'] += 1
        self._trim(bucket['latency'], self.MAX_LATENCY_SAMPLES)

    def _average_latency(self, bucket):
        """Return the lifetime mean latency of ``bucket`` (0 when it has no samples)."""
        return self._ratio(bucket['latency_sum'], bucket['latency_count'])

    # ------------------------------------------------------------------
    # Simulation
    # ------------------------------------------------------------------

    def simulate_permission_activity(self, duration_seconds=60, interval=1):
        """Simulate permission system activity for testing.

        Args:
            duration_seconds: How long to keep generating random activity.
            interval: Pause in seconds between simulation ticks. The pause is
                shortened so the run never sleeps past ``duration_seconds``.
        """
        import random  # Using standard library random instead of numpy

        print(f"Simulating permission system activity for {duration_seconds} seconds...")

        resources = ['outlets', 'reviews', 'campaigns', 'users', 'roles', 'permissions']
        actions = ['read', 'create', 'update', 'delete']
        users = ['user1', 'user2', 'user3', 'admin1', 'admin2']
        roles = ['merchant', 'staff', 'admin', 'super_admin']
        edge_functions = [
            'app_0be8fb8541_check_permission',
            'app_0be8fb8541_manage_user_role'
        ]

        # A monotonic clock keeps the run length correct even if the wall
        # clock is adjusted while the simulation is running.
        deadline = time.monotonic() + duration_seconds

        while time.monotonic() < deadline:
            # Simulate cache activity
            for _ in range(random.randint(1, 5)):
                if random.random() < 0.7:  # 70% hit rate
                    self.record_cache_hit()
                else:
                    self.record_cache_miss()

            # Simulate permission checks
            for _ in range(random.randint(1, 10)):
                resource = random.choice(resources)
                action = random.choice(actions)
                user = random.choice(users)
                latency = random.uniform(5, 100)  # ms

                granted = random.random() < 0.85  # 85% success rate
                self.record_permission_check(user, resource, action, granted, latency)

            # Simulate role changes (less frequent)
            if random.random() < 0.1:  # 10% chance
                user = random.choice(users)
                role = random.choice(roles)
                if random.random() < 0.7:
                    self.record_role_assignment(user, role)
                else:
                    self.record_role_removal(user, role)

            # Simulate edge function calls
            if random.random() < 0.3:  # 30% chance
                function_name = random.choice(edge_functions)
                success = random.random() < 0.95  # 95% success rate
                latency = random.uniform(50, 500)  # ms
                self.record_edge_function_call(function_name, success, latency)

            # Update history once per tick
            self.update_history()

            # Sleep for the interval, but never beyond the deadline and never
            # for a negative amount of time.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(max(0, min(interval, remaining)))

        print("Simulation completed!")

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_cache_hit(self):
        """Record a cache hit."""
        self.cache_metrics['hits'] += 1
        self.log_event('CACHE_HIT', {'timestamp': datetime.now().isoformat()})

    def record_cache_miss(self):
        """Record a cache miss."""
        self.cache_metrics['misses'] += 1
        self.log_event('CACHE_MISS', {'timestamp': datetime.now().isoformat()})

    def record_cache_invalidation(self, user_id):
        """Record a cache invalidation for a user."""
        self.cache_metrics['invalidations'] += 1
        self.log_event('CACHE_INVALIDATION', {
            'user_id': user_id,
            'timestamp': datetime.now().isoformat()
        })

    def record_permission_check(self, user_id, resource, action, granted, latency):
        """Record a permission check.

        Args:
            user_id: Identifier of the user the check was made for.
            resource: Resource name the check targeted.
            action: Action name that was checked.
            granted: Truthy when the permission was granted.
            latency: Duration of the check (the simulation passes milliseconds).
        """
        stats = self.permission_metrics
        stats['total_checks'] += 1

        # Track by resource and by action
        by_resource = stats['checks_by_resource']
        by_resource[resource] = by_resource.get(resource, 0) + 1
        by_action = stats['checks_by_action']
        by_action[action] = by_action.get(action, 0) + 1

        # Track result
        if granted:
            stats['granted'] += 1
        else:
            stats['denied'] += 1
            # Track denied permissions by resource
            denials = self.security_metrics['permission_denials_by_resource']
            denials[resource] = denials.get(resource, 0) + 1

            # Denials on sensitive resources also count as security events
            if resource in self.SENSITIVE_RESOURCES:
                self.security_metrics['unauthorized_attempts'] += 1

        # Track latency
        self._record_latency(stats, latency)

        # Log the event
        event_type = 'PERMISSION_GRANTED' if granted else 'PERMISSION_DENIED'
        self.log_event(event_type, {
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        })

    def record_role_assignment(self, user_id, role_name):
        """Record a role assignment and add the user to the role's member set."""
        self.role_metrics['assignments'] += 1

        # Track users per role
        self.role_metrics['users_per_role'].setdefault(role_name, set()).add(user_id)

        # Log the event
        self.log_event('ROLE_ASSIGNED', {
            'user_id': user_id,
            'role_name': role_name,
            'timestamp': datetime.now().isoformat()
        })

        # Log to audit database (simulated)
        self.security_metrics['audit_logs_count'] += 1

    def record_role_removal(self, user_id, role_name):
        """Record a role removal; unknown roles or non-members are tolerated."""
        self.role_metrics['removals'] += 1

        # Update users per role (the role entry is kept even when it becomes empty)
        members = self.role_metrics['users_per_role'].get(role_name)
        if members is not None:
            members.discard(user_id)

        # Log the event
        self.log_event('ROLE_REMOVED', {
            'user_id': user_id,
            'role_name': role_name,
            'timestamp': datetime.now().isoformat()
        })

        # Log to audit database (simulated)
        self.security_metrics['audit_logs_count'] += 1

    def record_edge_function_call(self, function_name, success, latency):
        """Record an edge function call, counting it as an error when ``success`` is falsy."""
        self.edge_metrics['calls'] += 1

        if not success:
            self.edge_metrics['errors'] += 1

        # Track latency
        self._record_latency(self.edge_metrics, latency)

        # Log the event
        event_type = 'EDGE_FUNCTION_CALL' if success else 'EDGE_FUNCTION_ERROR'
        self.log_event(event_type, {
            'function': function_name,
            'success': success,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        })

    def log_event(self, event_type, details):
        """Append an event to the event log, keeping only the newest entries."""
        self.event_log.append({
            'timestamp': datetime.now(),
            'event_type': event_type,
            'details': details
        })

        # Keep event log at a reasonable size
        self._trim(self.event_log, self.MAX_EVENT_LOG)

    def update_history(self):
        """Append the current hit rate and average latencies to the trend histories."""
        now = datetime.now()

        # Update cache hit rate history
        cache = self.cache_metrics
        hit_rate = self._ratio(cache['hits'], cache['hits'] + cache['misses'])
        cache['hit_rate_history'].append(hit_rate)
        cache['timestamps'].append(now)

        # Update average latency histories; nothing is appended until at
        # least one sample has been recorded for the respective bucket.
        for bucket in (self.permission_metrics, self.edge_metrics):
            if bucket['latency_count']:
                bucket['avg_latency_history'].append(self._average_latency(bucket))

        # Keep histories at a reasonable size
        self._trim(cache['hit_rate_history'], self.MAX_HISTORY)
        self._trim(cache['timestamps'], self.MAX_HISTORY)
        self._trim(self.permission_metrics['avg_latency_history'], self.MAX_HISTORY)
        self._trim(self.edge_metrics['avg_latency_history'], self.MAX_HISTORY)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_metrics_summary(self):
        """Get a summary of all metrics.

        Returns:
            A dict with the sections 'cache', 'permissions', 'roles',
            'security' and 'edge_functions'. Rates and averages are 0 when
            there is no underlying data.
        """
        cache = self.cache_metrics
        permissions = self.permission_metrics
        edge = self.edge_metrics

        # The five most frequently checked resources, busiest first
        sorted_resources = sorted(
            permissions['checks_by_resource'].items(),
            key=lambda item: item[1],
            reverse=True
        )
        top_resources = dict(sorted_resources[:5])

        return {
            'cache': {
                'hit_rate': self._ratio(cache['hits'], cache['hits'] + cache['misses']),
                'hits': cache['hits'],
                'misses': cache['misses'],
                'invalidations': cache['invalidations']
            },
            'permissions': {
                'total_checks': permissions['total_checks'],
                'granted_rate': self._ratio(permissions['granted'], permissions['total_checks']),
                'avg_latency_ms': self._average_latency(permissions),
                'top_resources': top_resources
            },
            'roles': {
                'assignments': self.role_metrics['assignments'],
                'removals': self.role_metrics['removals'],
                'users_by_role': {role: len(users) for role, users in self.role_metrics['users_per_role'].items()}
            },
            'security': {
                'unauthorized_attempts': self.security_metrics['unauthorized_attempts'],
                'permission_denials': sum(self.security_metrics['permission_denials_by_resource'].values()),
                'audit_logs': self.security_metrics['audit_logs_count']
            },
            'edge_functions': {
                'calls': edge['calls'],
                'error_rate': self._ratio(edge['errors'], edge['calls']),
                'avg_latency_ms': self._average_latency(edge)
            }
        }

    def generate_text_report(self):
        """Generate a text-based report of the monitoring metrics.

        Returns:
            A multi-line string covering every summary section followed by
            the ten most recent events.
        """
        metrics = self.get_metrics_summary()

        report = []
        report.append("=" * 60)
        report.append("PERMISSION SYSTEM MONITORING REPORT")
        report.append(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)

        # Cache metrics
        report.append("\nCACHE PERFORMANCE")
        report.append("-" * 30)
        report.append(f"Hit Rate: {metrics['cache']['hit_rate']:.2%}")
        report.append(f"Hits: {metrics['cache']['hits']}")
        report.append(f"Misses: {metrics['cache']['misses']}")
        report.append(f"Invalidations: {metrics['cache']['invalidations']}")

        # Permission metrics
        report.append("\nPERMISSION CHECKS")
        report.append("-" * 30)
        report.append(f"Total Checks: {metrics['permissions']['total_checks']}")
        report.append(f"Grant Rate: {metrics['permissions']['granted_rate']:.2%}")
        report.append(f"Avg Latency: {metrics['permissions']['avg_latency_ms']:.2f} ms")

        report.append("\nTop Resources:")
        for resource, count in metrics['permissions']['top_resources'].items():
            report.append(f"  - {resource}: {count} checks")

        # Role metrics
        report.append("\nROLE MANAGEMENT")
        report.append("-" * 30)
        report.append(f"Role Assignments: {metrics['roles']['assignments']}")
        report.append(f"Role Removals: {metrics['roles']['removals']}")

        report.append("\nUsers per Role:")
        for role, count in metrics['roles']['users_by_role'].items():
            report.append(f"  - {role}: {count} users")

        # Security metrics
        report.append("\nSECURITY METRICS")
        report.append("-" * 30)
        report.append(f"Unauthorized Access Attempts: {metrics['security']['unauthorized_attempts']}")
        report.append(f"Permission Denials: {metrics['security']['permission_denials']}")
        report.append(f"Audit Log Entries: {metrics['security']['audit_logs']}")

        # Edge function metrics
        report.append("\nEDGE FUNCTION PERFORMANCE")
        report.append("-" * 30)
        report.append(f"Total Calls: {metrics['edge_functions']['calls']}")
        report.append(f"Error Rate: {metrics['edge_functions']['error_rate']:.2%}")
        report.append(f"Avg Latency: {metrics['edge_functions']['avg_latency_ms']:.2f} ms")

        # Recent events (a negative slice already copes with short logs)
        report.append("\nRECENT EVENTS")
        report.append("-" * 30)
        for event in self.event_log[-10:]:
            timestamp = event['timestamp'].strftime('%H:%M:%S')
            report.append(f"{timestamp} - {event['event_type']}")

        report.append("\n" + "=" * 60)

        return "\n".join(report)

    def create_dashboard(self):
        """
        Create a simple console-based dashboard.
        This will be used when matplotlib is not available.
        """
        return self.generate_text_report()

    def try_create_visualization(self):
        """Try to create a PNG dashboard with matplotlib, falling back to text.

        Returns:
            A status message naming the saved PNG on success; otherwise the
            text report from ``generate_text_report``. Any exception raised
            while importing matplotlib or drawing is printed, not propagated.
        """
        try:
            # Imported inside the try block so that a missing or broken
            # matplotlib installation leads to the text fallback as well.
            import matplotlib.pyplot as plt

            metrics = self.get_metrics_summary()
            generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            # (x, y, text, fontsize) for every line drawn on the figure
            lines = [
                (0.1, 0.9, "Permission System Monitoring Dashboard", 16),
                (0.1, 0.8, f"Generated: {generated_at}", 12),
                # Cache and permission metrics
                (0.1, 0.7, f"Cache Hit Rate: {metrics['cache']['hit_rate']:.2%}", 12),
                (0.1, 0.65, f"Permission Checks: {metrics['permissions']['total_checks']}", 12),
                (0.1, 0.6, f"Permission Grant Rate: {metrics['permissions']['granted_rate']:.2%}", 12),
                # Security metrics
                (0.1, 0.5, f"Unauthorized Access Attempts: {metrics['security']['unauthorized_attempts']}", 12),
                (0.1, 0.45, f"Permission Denials: {metrics['security']['permission_denials']}", 12),
                # Role metrics
                (0.5, 0.7, f"Role Assignments: {metrics['roles']['assignments']}", 12),
                (0.5, 0.65, f"Role Removals: {metrics['roles']['removals']}", 12),
                # Edge function metrics
                (0.5, 0.5, f"Edge Function Calls: {metrics['edge_functions']['calls']}", 12),
                (0.5, 0.45, f"Edge Function Error Rate: {metrics['edge_functions']['error_rate']:.2%}", 12),
                # Notes
                (0.1, 0.3, "Note: This is a simplified visualization dashboard.", 10),
                (0.1, 0.25, "For detailed metrics, please view the text report.", 10),
            ]

            fig = plt.figure(figsize=(15, 10))
            try:
                for x, y, text, fontsize in lines:
                    plt.text(x, y, text, fontsize=fontsize)

                plt.axis('off')  # Hide axes
                plt.savefig('permission_monitoring_dashboard_simple.png')
            finally:
                # Release the figure even when drawing or saving fails.
                plt.close(fig)

            return "Permission monitoring visualization saved as 'permission_monitoring_dashboard_simple.png'"
        except Exception as e:
            print(f"Visualization failed: {e}. Falling back to text report.")
            return self.generate_text_report()

# Create a monitoring system and simulate some activity
monitoring_system = PermissionMonitoringSystem()
monitoring_system.simulate_permission_activity(duration_seconds=5)

# Generate visualization or text report based on what's available.
# try_create_visualization() handles its own failures and returns the text
# report instead of raising, so success has to be detected from the message
# it returns rather than from the absence of an exception.
try:
    visualization_result = monitoring_system.try_create_visualization()
    if visualization_result.startswith("Permission monitoring visualization saved"):
        print("Visualization created successfully.")
    else:
        print("Visualization unavailable; showing text report instead.")
except Exception as e:
    # Defensive: only reached if the fallback inside the method itself fails.
    print(f"Could not create visualization: {e}")
    print("Falling back to text report...")
    visualization_result = monitoring_system.create_dashboard()

# Display report
print("\n" + visualization_result)

# Display metrics summary (default=str stringifies non-JSON-native values)
metrics_summary = monitoring_system.get_metrics_summary()
print("\nPermission System Metrics Summary (JSON format):")
print(json.dumps(metrics_summary, indent=2, default=str))

Simulating permission system activity for 5 seconds...
Simulation completed!
Visualization created successfully.

Permission monitoring visualization saved as 'permission_monitoring_dashboard_simple.png'

Permission System Metrics Summary (JSON format):
{
  "cache": {
    "hit_rate": 0.5714285714285714,
    "hits": 8,
    "misses": 6,
    "invalidations": 0
  },
  "permissions": {
    "total_checks": 26,
    "granted_rate": 0.9615384615384616,
    "avg_latency_ms": 46.12750349504604,
    "top_resources": {
      "outlets": 6,
      "permissions": 5,
      "reviews": 5,
      "campaigns": 4,
      "roles": 3
    }
  },
  "roles": {
    "assignments": 0,
    "removals": 0,
    "users_by_role": {}
  },
  "security": {
    "unauthorized_attempts": 1,
    "permission_denials": 1,
    "audit_logs": 0
  },
  "edge_functions": {
    "calls": 1,
    "error_rate": 0.0,
    "avg_latency_ms": 446.6331732063254
  }
}
