# **Policy Framework Analysis**

# In [2]:
"""
Policy Framework Analysis
Maps policies to responsible AI frameworks and generates comparative analysis
"""

import pandas as pd
import numpy as np
import json
import re
from typing import List, Dict, Tuple
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import logging

class PolicyFrameworkMapper:
    """Map policy documents onto responsible-AI principles via keyword matching.

    A document is a plain dict. The keys read by this class are
    ``content_preview``, ``title``, ``url``, ``country`` and ``document_type``;
    all of them are optional.

    A principle score is the number of keyword hits per word of text, scaled
    by 100 and capped at 1.0, so a text with at least one hit per hundred
    words already reaches the maximum score.
    """

    # Scores strictly below this value are reported as coverage gaps; scores at
    # or above it count as "covered". Using one constant keeps the gap list and
    # the per-principle document counts in agreement at the boundary.
    GAP_THRESHOLD = 0.1
    # Mean score (across all documents) below which a principle is called weak.
    WEAK_PRINCIPLE_THRESHOLD = 0.3
    # Mean score (within one country) below which a principle is called weak.
    WEAK_COUNTRY_THRESHOLD = 0.2
    # Mean overall coverage below which a document type is called weak.
    WEAK_DOCUMENT_TYPE_THRESHOLD = 0.3

    def __init__(self):
        """Initialize the policy framework mapper"""
        self.logger = logging.getLogger(__name__)

        # Core responsible AI principles with associated keywords.
        # Some keywords are spelled identically in several languages (for
        # example 'discrimination'); duplicates are collapsed at scoring time.
        self.ai_principles = {
            'fairness': {
                'keywords': ['fairness', 'bias', 'discrimination', 'equity', 'equal', 'impartial',
                           'équité', 'biais', 'discrimination', 'égal',  # French
                           'عدالة', 'تحيز', 'تمييز', 'منصف'],  # Arabic
                'weight': 1.0
            },
            'transparency': {
                'keywords': ['transparency', 'explainable', 'interpretable', 'understandable',
                           'clear', 'open', 'transparent', 'explicable', 'interprétable',  # French
                           'شفافية', 'واضح', 'مفهوم'],  # Arabic
                'weight': 1.0
            },
            'accountability': {
                'keywords': ['accountability', 'responsible', 'liability', 'oversight',
                           'governance', 'responsabilité', 'gouvernance',  # French
                           'مساءلة', 'مسؤولية', 'حوكمة'],  # Arabic
                'weight': 1.0
            },
            'privacy': {
                'keywords': ['privacy', 'data protection', 'confidentiality', 'personal data',
                           'vie privée', 'protection des données', 'confidentialité',  # French
                           'خصوصية', 'حماية البيانات', 'سرية'],  # Arabic
                'weight': 1.0
            },
            'human_oversight': {
                'keywords': ['human oversight', 'human control', 'human intervention',
                           'human-in-the-loop', 'supervision', 'contrôle humain',  # French
                           'إشراف بشري', 'تحكم بشري', 'تدخل بشري'],  # Arabic
                'weight': 1.0
            },
            'robustness': {
                'keywords': ['robustness', 'reliability', 'safety', 'security', 'resilience',
                           'robustesse', 'fiabilité', 'sécurité',  # French
                           'قوة', 'موثوقية', 'أمان', 'أمن'],  # Arabic
                'weight': 1.0
            },
            'non_maleficence': {
                'keywords': ['harm', 'risk', 'safety', 'protection', 'prevent', 'avoid',
                           'dommage', 'risque', 'prévenir', 'éviter',  # French
                           'ضرر', 'خطر', 'حماية', 'منع', 'تجنب'],  # Arabic
                'weight': 1.0
            }
        }

        # Policy document classification patterns
        self.policy_types = {
            'law': ['act', 'law', 'regulation', 'statute', 'legal framework'],
            'strategy': ['strategy', 'plan', 'roadmap', 'framework', 'initiative'],
            'guideline': ['guideline', 'guidance', 'recommendation', 'best practice'],
            'report': ['report', 'study', 'analysis', 'assessment', 'evaluation']
        }

    def analyze_policy_document(self, document: Dict) -> Dict:
        """Analyze a single policy document for AI principles coverage.

        Args:
            document: Document dict. ``content_preview`` and ``title`` are
                concatenated and scored; ``url``, ``country`` and
                ``document_type`` are copied into the result.

        Returns:
            Dict with the document metadata, ``principle_scores`` (one score
            per principle), ``overall_coverage`` (mean of those scores),
            ``top_principles``, ``coverage_gaps`` and ``analysis_date``.
        """
        # `or ''` also covers keys that are present but set to None, which
        # would otherwise break the string concatenation.
        preview = document.get('content_preview') or ''
        title = document.get('title') or ''
        content = f"{preview} {title}"

        principle_scores = {
            principle: self._calculate_principle_score(content, config['keywords'])
            for principle, config in self.ai_principles.items()
        }

        return {
            'document_id': document.get('url', 'unknown'),
            'title': document.get('title', 'Unknown'),
            'country': document.get('country', 'unknown'),
            'document_type': document.get('document_type', 'unknown'),
            'principle_scores': principle_scores,
            # Built-in float so the value prints and serializes plainly.
            'overall_coverage': float(np.mean(list(principle_scores.values()))),
            'top_principles': self._get_top_principles(principle_scores, top_n=3),
            'coverage_gaps': self._identify_gaps(principle_scores),
            'analysis_date': datetime.now().isoformat()
        }

    def _calculate_principle_score(self, text: str, keywords: List[str]) -> float:
        """Calculate how well a text covers a specific AI principle.

        Keywords are matched as case-insensitive substrings, so 'risk' also
        matches 'risks'. Each distinct keyword is counted once per occurrence
        even if it is listed several times.

        Args:
            text: Text to score.
            keywords: Keywords associated with the principle.

        Returns:
            ``hits / word_count * 100`` capped at 1.0 and rounded to three
            decimals; 0.0 for empty or whitespace-only text.
        """
        if not text:
            return 0.0

        total_words = len(text.split())
        if total_words == 0:
            return 0.0

        text_lower = text.lower()
        # De-duplicate so a keyword shared by two languages is not counted
        # twice; drop empty strings because ''.count('') matches everywhere.
        unique_keywords = {keyword.lower() for keyword in keywords if keyword}
        keyword_count = sum(text_lower.count(keyword) for keyword in unique_keywords)

        score = min(keyword_count / total_words * 100, 1.0)  # Cap at 1.0
        return round(score, 3)

    def _get_top_principles(self, principle_scores: Dict, top_n: int = 3) -> List[Tuple[str, float]]:
        """Return the ``top_n`` (principle, score) pairs, highest score first."""
        ranked = sorted(principle_scores.items(), key=lambda item: item[1], reverse=True)
        return ranked[:top_n]

    def _identify_gaps(self, principle_scores: Dict, threshold: float = GAP_THRESHOLD) -> List[str]:
        """Return the principles whose score is strictly below ``threshold``."""
        return [principle for principle, score in principle_scores.items() if score < threshold]

    def analyze_document_collection(self, documents: List[Dict]) -> Dict:
        """Analyze a collection of policy documents.

        Args:
            documents: Document dicts as accepted by ``analyze_policy_document``.

        Returns:
            Dict with ``total_documents``, the per-document analyses under
            ``documents``, and the aggregated ``country_coverage``,
            ``principle_coverage_summary``, ``document_type_distribution``,
            ``gap_analysis`` and ``recommendations``.
        """
        analyses = [self.analyze_policy_document(doc) for doc in documents]

        return {
            'total_documents': len(documents),
            'documents': analyses,
            'country_coverage': self._analyze_country_coverage(analyses),
            'principle_coverage_summary': self._summarize_principle_coverage(analyses),
            'document_type_distribution': self._analyze_document_types(analyses),
            'gap_analysis': self._perform_gap_analysis(analyses),
            'recommendations': self._generate_recommendations(analyses)
        }

    def _analyze_country_coverage(self, analyses: List[Dict]) -> Dict:
        """Return ``{country: {principle: mean score}}`` for the given analyses."""
        country_coverage = defaultdict(lambda: defaultdict(list))

        for analysis in analyses:
            country = analysis['country']
            for principle, score in analysis['principle_scores'].items():
                country_coverage[country][principle].append(score)

        return {
            country: {
                principle: float(np.mean(scores)) for principle, scores in principles.items()
            }
            for country, principles in country_coverage.items()
        }

    def _summarize_principle_coverage(self, analyses: List[Dict]) -> Dict:
        """Summarize each principle's scores across all documents.

        Returns:
            ``{principle: {'mean', 'std', 'min', 'max',
            'documents_with_coverage'}}`` where ``documents_with_coverage``
            counts the documents whose score is not a gap, i.e. is at least
            ``GAP_THRESHOLD``.
        """
        principle_scores = defaultdict(list)

        for analysis in analyses:
            for principle, score in analysis['principle_scores'].items():
                principle_scores[principle].append(score)

        summary = {}
        for principle, scores in principle_scores.items():
            summary[principle] = {
                'mean': float(np.mean(scores)),
                'std': float(np.std(scores)),
                'min': float(np.min(scores)),
                'max': float(np.max(scores)),
                # Complement of the gap test (score < GAP_THRESHOLD).
                'documents_with_coverage': sum(
                    1 for score in scores if score >= self.GAP_THRESHOLD
                )
            }

        return summary

    def _analyze_document_types(self, analyses: List[Dict]) -> Dict:
        """Return count, mean overall coverage and percentage share per document type."""
        type_scores = defaultdict(list)
        for analysis in analyses:
            type_scores[analysis['document_type']].append(analysis['overall_coverage'])

        total = len(analyses)
        # Every key in type_scores has at least one score, so total > 0 here.
        return {
            doc_type: {
                'count': len(scores),
                'average_coverage': float(np.mean(scores)),
                'percentage': (len(scores) / total) * 100
            }
            for doc_type, scores in type_scores.items()
        }

    def _perform_gap_analysis(self, analyses: List[Dict]) -> Dict:
        """Count how often each principle is a coverage gap, overall and per country.

        Returns:
            Dict with ``most_common_gaps`` (principle -> count, most frequent
            first), ``gaps_by_country`` (country -> principle -> count) and
            ``total_documents_with_gaps``.
        """
        all_gaps = defaultdict(int)
        country_gaps = defaultdict(lambda: defaultdict(int))

        for analysis in analyses:
            country = analysis['country']
            for gap in analysis['coverage_gaps']:
                all_gaps[gap] += 1
                country_gaps[country][gap] += 1

        return {
            'most_common_gaps': dict(sorted(all_gaps.items(), key=lambda item: item[1], reverse=True)),
            'gaps_by_country': {country: dict(gaps) for country, gaps in country_gaps.items()},
            'total_documents_with_gaps': sum(1 for analysis in analyses if analysis['coverage_gaps'])
        }

    def _generate_recommendations(self, analyses: List[Dict]) -> List[str]:
        """Generate textual recommendations from weak principles, countries and document types.

        A recommendation is emitted for principles whose mean score over all
        documents is below ``WEAK_PRINCIPLE_THRESHOLD``, for each country with
        principles below ``WEAK_COUNTRY_THRESHOLD``, and for document types
        whose mean overall coverage is below ``WEAK_DOCUMENT_TYPE_THRESHOLD``.
        """
        recommendations = []

        # Globally weak principles
        principle_summary = self._summarize_principle_coverage(analyses)
        weak_principles = [
            principle for principle, stats in principle_summary.items()
            if stats['mean'] < self.WEAK_PRINCIPLE_THRESHOLD
        ]
        if weak_principles:
            recommendations.append(f"Strengthen coverage of: {', '.join(weak_principles)}")

        # Country-specific recommendations
        country_coverage = self._analyze_country_coverage(analyses)
        for country, principles in country_coverage.items():
            weak_country_principles = [
                principle for principle, score in principles.items()
                if score < self.WEAK_COUNTRY_THRESHOLD
            ]
            if weak_country_principles:
                recommendations.append(f"{country}: Focus on {', '.join(weak_country_principles)}")

        # Document type recommendations
        type_analysis = self._analyze_document_types(analyses)
        low_coverage_types = [
            doc_type for doc_type, data in type_analysis.items()
            if data['average_coverage'] < self.WEAK_DOCUMENT_TYPE_THRESHOLD
        ]
        if low_coverage_types:
            # str() because a document type may be a non-string value such as None.
            names = ', '.join(str(doc_type) for doc_type in low_coverage_types)
            recommendations.append(f"Improve AI principle integration in: {names}")

        return recommendations

    def generate_coverage_matrix(self, analyses: List[Dict]) -> pd.DataFrame:
        """Generate a country-by-principle matrix of mean scores.

        Args:
            analyses: Per-document analyses as returned by
                ``analyze_policy_document``.

        Returns:
            DataFrame indexed by ``country`` (sorted, so row order is stable
            between runs) with one column per principle. An empty input
            yields an empty DataFrame that still carries those columns.
        """
        principles = list(self.ai_principles.keys())

        scores_by_country = defaultdict(list)
        for analysis in analyses:
            scores_by_country[analysis['country']].append(analysis['principle_scores'])

        rows = []
        # key=str keeps the sort well-defined if a country is None.
        for country in sorted(scores_by_country, key=str):
            row = {'country': country}
            for principle in principles:
                row[principle] = np.mean(
                    [scores[principle] for scores in scores_by_country[country]]
                )
            rows.append(row)

        # Passing the columns explicitly keeps 'country' available for
        # set_index even when there are no rows.
        df = pd.DataFrame(rows, columns=['country'] + principles)
        return df.set_index('country')

    def visualize_coverage_matrix(self, coverage_matrix: pd.DataFrame, save_path: str = None):
        """Create a heatmap visualization of the coverage matrix.

        Args:
            coverage_matrix: Matrix as returned by ``generate_coverage_matrix``.
            save_path: If given, the figure is also written to this path.
        """
        if coverage_matrix.empty:
            # A heatmap needs at least one cell to draw.
            self.logger.warning("Coverage matrix is empty; nothing to visualize")
            return

        plt.figure(figsize=(12, 8))

        # Create heatmap
        sns.heatmap(coverage_matrix,
                   annot=True,
                   cmap='RdYlGn',
                   vmin=0,
                   vmax=1,
                   fmt='.3f',
                   cbar_kws={'label': 'Coverage Score'})

        plt.title('AI Principle Coverage by Country')
        plt.xlabel('AI Principles')
        plt.ylabel('Countries')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        # Save before show(): the figure may be gone once it has been shown.
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')

        plt.show()

    @staticmethod
    def _json_default(value):
        """Convert NumPy scalars and arrays to built-in types for ``json.dump``."""
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def export_analysis(self, analysis: Dict, output_path: str):
        """Export analysis results to a UTF-8 JSON file.

        Non-ASCII text is written as-is. NumPy scalars and arrays are
        converted to built-in types.

        Args:
            analysis: Analysis dict to serialize.
            output_path: Destination file path; an existing file is overwritten.

        Raises:
            TypeError: If ``analysis`` contains a value that is neither JSON
                serializable nor a NumPy scalar/array.
        """
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(analysis, f, indent=2, ensure_ascii=False, default=self._json_default)

        self.logger.info("Analysis exported to %s", output_path)

    def generate_summary_report(self, analysis: Dict) -> str:
        """Generate a human-readable summary report.

        Args:
            analysis: Collection analysis as returned by
                ``analyze_document_collection``.

        Returns:
            The report as a single newline-joined string.
        """
        report = []
        report.append("AI POLICY FRAMEWORK ANALYSIS REPORT")
        report.append("=" * 50)
        report.append(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Total Documents Analyzed: {analysis['total_documents']}")
        report.append("")

        # Country coverage summary
        report.append("COUNTRY COVERAGE SUMMARY")
        report.append("-" * 30)
        for country, principles in analysis['country_coverage'].items():
            avg_coverage = np.mean(list(principles.values()))
            # str() because a country may be a non-string value such as None.
            report.append(f"{str(country).upper()}: {avg_coverage:.3f} average coverage")
        report.append("")

        # Principles globally, best covered first
        report.append("GLOBAL PRINCIPLE COVERAGE")
        report.append("-" * 30)
        principle_summary = analysis['principle_coverage_summary']
        for principle, stats in sorted(principle_summary.items(),
                                     key=lambda item: item[1]['mean'], reverse=True):
            report.append(f"{principle.replace('_', ' ').title()}: {stats['mean']:.3f} "
                         f"(in {stats['documents_with_coverage']} documents)")
        report.append("")

        # Gap analysis
        report.append("GAP ANALYSIS")
        report.append("-" * 15)
        gaps = analysis['gap_analysis']['most_common_gaps']
        if gaps:
            report.append("Most common gaps:")
            for gap, count in gaps.items():
                report.append(f"- {gap.replace('_', ' ').title()}: {count} documents")
        else:
            report.append("No significant gaps identified.")
        report.append("")

        # Recommendations
        report.append("RECOMMENDATIONS")
        report.append("-" * 15)
        for i, rec in enumerate(analysis['recommendations'], 1):
            report.append(f"{i}. {rec}")

        return "\n".join(report)

