# Compliance Analyzer System Documentation

## Overview
This system automates compliance analysis: it compares internal security policy documents against regulatory requirements, using natural language processing (spaCy) to extract requirements and sentence-embedding similarity to measure how well each requirement is covered.


### 1. DocumentProcessor
Extracts text content from various document formats for analysis.

**Features**:
- Supports multiple file formats (.pdf, .docx, .txt, .html)
- Reports and skips corrupted or unreadable files instead of aborting the run
- Metadata extraction including word count and file paths
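
As a rough illustration of the record produced for each processed file, the sketch below walks a folder and builds the same kind of dictionary for plain-text files only. It relies on the standard library alone; `collect_text_records` is a hypothetical helper name rather than part of the class that follows, and PDF, DOCX, and HTML handling is deliberately left out.

```python
from pathlib import Path
from typing import Dict, List


def collect_text_records(folder: str, doc_type: str = "policy") -> List[Dict]:
    """Hypothetical helper: build one metadata record per readable .txt file."""
    records = []
    for path in sorted(Path(folder).rglob("*.txt")):
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            # Report unreadable files and keep going
            print(f"Skipping {path}: {exc}")
            continue
        if not text.strip():
            continue  # skip files with no extractable text
        records.append({
            "filename": path.name,
            "path": str(path),
            "text": text,
            "type": doc_type,
            "word_count": len(text.split()),  # whitespace-delimited word count
        })
    return records
```

Each record keeps the raw text next to its metadata, so later stages can trace a chunk or requirement back to its source file.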



In [None]:
import os
import pandas as pd
from pathlib import Path
import PyPDF2
import docx
from bs4 import BeautifulSoup
from collections import defaultdict
import spacy
import re
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import pickle
from typing import List, Dict, Optional
import torch
from sklearn.metrics.pairwise import cosine_similarity
import json
from datetime import datetime

In [None]:
class DocumentProcessor:
    def __init__(self):
        self.supported_formats = ['.pdf', '.docx', '.txt', '.html']

    def extract_text_from_pdf(self, file_path: Path) -> str:
        """Extract text from PDF files"""
        text = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    # extract_text() can return None for image-only pages in some PyPDF2 versions
                    text += (page.extract_text() or "") + "\n"
        except Exception as e:
            print(f"Error extracting text from PDF {file_path}: {e}")
        return text

    def extract_text_from_docx(self, file_path: Path) -> str:
        """Extract text from DOCX files"""
        text = ""
        try:
            doc = docx.Document(file_path)
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
        except Exception as e:
            print(f"Error extracting text from DOCX {file_path}: {e}")
        return text

    def process_documents(self, folder_path: str, doc_type: str = "policy") -> List[Dict]:
        """Process all documents in a folder"""
        documents = []
        for file_path in Path(folder_path).rglob("*"):
            if file_path.suffix.lower() in self.supported_formats:
                try:
                    text = ""
                    if file_path.suffix.lower() == '.pdf':
                        text = self.extract_text_from_pdf(file_path)
                    elif file_path.suffix.lower() == '.docx':
                        text = self.extract_text_from_docx(file_path)
                    elif file_path.suffix.lower() == '.html':
                        with open(file_path, 'r', encoding='utf-8') as f:
                            soup = BeautifulSoup(f.read(), 'html.parser')
                            text = soup.get_text()
                    else:  # .txt (other suffixes were already filtered out above)
                        text = file_path.read_text(encoding='utf-8')

                    if text.strip(): # Only add documents with extracted text
                        documents.append({
                            'filename': file_path.name,
                            'path': str(file_path),
                            'text': text,
                            'type': doc_type,
                            'word_count': len(text.split())
                        })
                    else:
                        print(f"Warning: No text extracted from {file_path.name}.")
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")

        return documents

### 2. TextProcessor
Processes and analyzes extracted text to prepare it for similarity comparison.

**Features**:
- Uses spaCy (`en_core_web_sm`) for sentence segmentation
- Splits documents into overlapping word-based chunks (500 words with a 50-word overlap by default) to preserve context across chunk boundaries
- Pattern-based requirement extraction using regex for compliance language
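
To make the overlapping-chunk idea concrete, here is a minimal standalone sketch with deliberately small numbers so the overlap is visible. `chunk_words` is a hypothetical name, and the sizes (8 words per chunk, 2 shared words) are chosen only for illustration.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 8, overlap: int = 2) -> List[str]:
    """Split text into word windows that share `overlap` words with the previous one."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap  # how far the window advances each time
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), step)]


sample = " ".join(f"w{n}" for n in range(1, 15))  # "w1 w2 ... w14"
for chunk in chunk_words(sample):
    print(chunk)
```

The second window starts at `w7`, so it repeats the last two words of the first. The final window here is a short tail (`w13 w14`) already contained in the previous one; the class in the next cell drops such fragments with a minimum-length check.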


In [None]:
class TextProcessor:
    def __init__(self):
        # Ensure 'en_core_web_sm' is downloaded: python -m spacy download en_core_web_sm
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            print("Downloading spacy model 'en_core_web_sm'...")
            spacy.cli.download("en_core_web_sm")
            self.nlp = spacy.load("en_core_web_sm")
        # NOTE: this model is loaded but not used by TextProcessor itself;
        # embeddings for the analysis come from EmbeddingGenerator.
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

    def clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        # Remove extra whitespace and special characters
        text = re.sub(r'\s+', ' ', text)
        # Keep basic punctuation (periods, commas, semicolons, colons, exclamation marks, question marks)
        text = re.sub(r'[^\w\s\.\,\;\:\!\?]', '', text)
        return text.strip()

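    def chunk_document(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Split document into overlapping chunks"""
        # A non-positive step would make range() fail or never advance
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")

        words = text.split()
        chunks = []

        if not words:
            return []

        for i in range(0, len(words), chunk_size - overlap):
            window = words[i:i + chunk_size]
            # Drop short trailing fragments, but always keep the first chunk
            # so that documents of 50 words or fewer are not discarded entirely
            if i == 0 or len(window) > 50:
                chunks.append(' '.join(window))

        return chunks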

    def extract_requirements(self, text: str) -> List[str]:
        """Extract requirement-like sentences using NLP"""
        doc = self.nlp(text)
        requirements = []

        # Look for sentences with modal verbs (shall, must, should) or compliance-related terms
        requirement_patterns = [
            r'\b(shall|must|should|required|mandatory|obligated|need to)\b',
            r'\b(compliance|conform|adhere|abide by|in accordance with)\b',
            r'\b(prohibited|forbidden|not permitted|unlawful)\b',
            r'\b(ensure that|it is essential that)\b'
        ]

        for sent in doc.sents:
            sent_text = sent.text.strip()
            # Ensure the sentence is not too short to be meaningful
            if len(sent_text.split()) > 5 and any(re.search(pattern, sent_text, re.IGNORECASE)
                                                   for pattern in requirement_patterns):
                requirements.append(sent_text)

        return requirements
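
The pattern matching in `extract_requirements` can be tried without loading a spaCy model. The sketch below is an approximation under two assumptions: it uses a naive regex sentence splitter instead of spaCy's `doc.sents`, and only a subset of the patterns above. `find_requirements` and `REQUIREMENT_RE` are hypothetical names.

```python
import re
from typing import List

# Subset of the requirement patterns, compiled once
REQUIREMENT_RE = re.compile(
    r"\b(shall|must|should|required|mandatory|prohibited|not permitted)\b",
    re.IGNORECASE,
)


def find_requirements(text: str, min_words: int = 6) -> List[str]:
    """Return sentences that contain requirement language (naive splitter)."""
    # Assumption: split after ., ! or ? followed by whitespace instead of using spaCy
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    return [
        s for s in sentences
        if len(s.split()) >= min_words and REQUIREMENT_RE.search(s)
    ]


text = (
    "This section describes access control. "
    "All administrators must enable multi-factor authentication on privileged accounts. "
    "Sharing of credentials between employees is prohibited. "
    "Must comply."
)
for sentence in find_requirements(text):
    print(sentence)
```

Only the second and third sentences are kept: the first has no requirement keyword, and the last is below the minimum length.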


### 3. EmbeddingGenerator
Converts text into numerical vector representations for semantic similarity analysis.

**Features**:
- Uses a sentence-transformer model (`all-mpnet-base-v2` by default, configurable via `model_name`)
- GPU acceleration when available
- Batch processing for efficiency
- Flat inner-product FAISS index (`IndexFlatIP`) with K-means clusters layered on top for organization
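
The embeddings are L2-normalized before they are added to an inner-product index. The reason this works is that, for unit-length vectors, the inner product equals cosine similarity. The sketch below checks this with NumPy only; the random vectors are a stand-in for real model output.

```python
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.normal(size=(4, 8)).astype(np.float32)  # stand-in for model output

# L2-normalize each row so every vector has length 1
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)

# Inner product of the unit vectors
inner = unit @ unit.T

# Cosine similarity computed directly from the raw vectors
norms = np.linalg.norm(vectors, axis=1)
cosine = (vectors @ vectors.T) / np.outer(norms, norms)

print(np.allclose(inner, cosine, atol=1e-5))
```

Because the two matrices agree, an inner-product search over normalized embeddings ranks results exactly as a cosine-similarity search would.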


In [None]:
class EmbeddingGenerator:
    def __init__(self, model_name: str = 'all-mpnet-base-v2'):
        """
        Initialize with a more powerful sentence transformer model
        Options: 'all-mpnet-base-v2', 'all-distilroberta-v1', 'multi-qa-mpnet-base-dot-v1'
        """
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model.to(self.device)

    def generate_contextual_embeddings(self, texts: List[str],
                                       batch_size: int = 32) -> np.ndarray:
        """Generate embeddings with better batching and normalization"""
        if not texts:
            return np.array([])

        # Filter empty texts
        valid_texts = [text for text in texts if text and text.strip()]
        if not valid_texts:
            return np.array([])

        # Generate embeddings in batches
        embeddings = self.model.encode(
            valid_texts,
            batch_size=batch_size,
            convert_to_tensor=False,
            normalize_embeddings=True,  # L2 normalization for better cosine similarity
            show_progress_bar=len(valid_texts) > 100
        )

        return embeddings.astype(np.float32)

    def create_hierarchical_index(self, embeddings: np.ndarray,
                                  text_metadata: List[Dict]) -> Dict:
        """Create hierarchical FAISS index with metadata"""
        if embeddings.size == 0:
            return {'index': None, 'embeddings': np.array([]), 'metadata': [], 'clusters': [], 'dimension': self.dimension}

        # Create main index
        index = faiss.IndexFlatIP(self.dimension)
        index.add(embeddings)

        # Create clustering for better organization (requires sklearn)
        try:
            clusters = self._create_semantic_clusters(embeddings, text_metadata)
        except ImportError:
            print("Scikit-learn not found. Skipping semantic clustering.")
            clusters = []

        return {
            'index': index,
            'embeddings': embeddings,
            'metadata': text_metadata,
            'clusters': clusters,
            'dimension': self.dimension
        }

    def _create_semantic_clusters(self, embeddings: np.ndarray,
                                  metadata: List[Dict], n_clusters: Optional[int] = None) -> List[Dict]:
        """Create semantic clusters for better organization"""
        try:
            from sklearn.cluster import KMeans
        except ImportError:
            raise ImportError("scikit-learn is not installed. Please install it to use clustering.")

        if len(embeddings) < 2:  # Need at least 2 items to cluster meaningfully
            return [{'cluster_id': 0, 'items': list(range(len(embeddings))), 'centroid': None, 'size': len(embeddings)}]

        if n_clusters is None:
            # Adaptive clustering: roughly 10% of the items, clamped to [2, 20]
            n_clusters = min(max(2, len(embeddings) // 10), 20)

        # KMeans requires n_clusters <= n_samples; this also covers caller-supplied values
        n_clusters = min(n_clusters, len(embeddings))

        # Fall back to a single cluster if an invalid cluster count was requested
        if n_clusters < 1:
            return [{'cluster_id': 0, 'items': list(range(len(embeddings))), 'centroid': None, 'size': len(embeddings)}]

        # Ensure n_init is appropriate for the number of samples
        n_init_val = min(10, max(1, n_clusters))

        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=n_init_val)
        cluster_labels = kmeans.fit_predict(embeddings)

        clusters = []
        for i in range(n_clusters):
            cluster_items = [idx for idx, label in enumerate(cluster_labels) if label == i]
            if cluster_items:  # Only add non-empty clusters
                clusters.append({
                    'cluster_id': i,
                    'items': cluster_items,
                    'centroid': kmeans.cluster_centers_[i].tolist(), # Convert to list for JSON serialization
                    'size': len(cluster_items)
                })
        return clusters
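
The adaptive cluster count used in `_create_semantic_clusters` is easy to misread, so here is the same heuristic isolated as a small function. `adaptive_cluster_count` is a hypothetical name; treating fewer than two items as a single cluster mirrors the early return in the method above.

```python
def adaptive_cluster_count(n_items: int, lower: int = 2, upper: int = 20) -> int:
    """Roughly one cluster per ten items, clamped to [lower, upper] and to n_items."""
    if n_items < 2:
        return 1  # too few items to cluster meaningfully
    return min(max(lower, n_items // 10), upper, n_items)


for n in (1, 2, 15, 57, 200, 1000):
    print(n, adaptive_cluster_count(n))
```

Small collections always get two clusters, mid-sized ones scale with the item count, and anything above 200 items is capped at 20 clusters.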

### 4. SimilarityAnalyzer
Performs comprehensive similarity analysis between policy and regulatory content.


**Analysis Features**:
- **Multi-threshold Analysis**: Uses different similarity thresholds (exact match: 0.85, strong coverage: 0.70, partial coverage: 0.50, weak coverage: 0.30)
- **Coverage Analysis**: Calculates what percentage of regulatory requirements are covered by policies
- **Gap Identification**: Categorizes gaps as critical, significant, or minor based on similarity scores
- **Compliance Scoring**: Generates overall compliance scores with letter grades (A-F)
- **Semantic Clustering**: Groups related requirements and policies for better organization
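
The threshold-based coverage and gap logic can be illustrated on a toy similarity matrix. The numbers below are invented for the example; the thresholds are the ones listed above, and `gap_bucket` is a hypothetical helper that mirrors the critical/significant/minor split.

```python
import numpy as np

THRESHOLDS = {
    "exact_match": 0.85,
    "strong_coverage": 0.70,
    "partial_coverage": 0.50,
    "weak_coverage": 0.30,
}

# Toy matrix: 4 regulatory requirements (rows) x 3 policy chunks (columns)
similarity = np.array([
    [0.91, 0.40, 0.12],
    [0.55, 0.72, 0.30],
    [0.20, 0.45, 0.10],
    [0.05, 0.18, 0.25],
])

best = similarity.max(axis=1)  # best policy match per requirement

# Count how many requirements reach each threshold
coverage_counts = {
    level: int((best >= threshold).sum()) for level, threshold in THRESHOLDS.items()
}
for level, covered in coverage_counts.items():
    print(f"{level}: {covered}/{len(best)} ({covered / len(best):.0%})")


def gap_bucket(score: float):
    """Return the gap category for a best-match score, or None if well covered."""
    if score < THRESHOLDS["weak_coverage"]:
        return "critical"
    if score < THRESHOLDS["partial_coverage"]:
        return "significant"
    if score < THRESHOLDS["strong_coverage"]:
        return "minor"
    return None


gaps = [gap_bucket(float(s)) for s in best]
print(gaps)
```

Coverage is cumulative (a requirement that reaches `exact_match` also counts toward every lower level), while each requirement falls into at most one gap bucket.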


In [None]:
class SimilarityAnalyzer:
    def __init__(self, embedding_generator: EmbeddingGenerator):
        self.embedding_gen = embedding_generator
        # Multiple thresholds for different analysis levels
        self.thresholds = {
            'exact_match': 0.85,       # Very high similarity
            'strong_coverage': 0.70,   # Good coverage
            'partial_coverage': 0.50,  # Some coverage
            'weak_coverage': 0.30      # Minimal coverage
        }

    def comprehensive_similarity_analysis(self,
                                          policy_data: Dict,
                                          regulatory_data: Dict) -> Dict:
        """Perform comprehensive similarity analysis"""

        # Extract embeddings and metadata
        policy_embeddings = policy_data.get('embeddings', np.array([]))
        regulatory_embeddings = regulatory_data.get('embeddings', np.array([]))
        policy_metadata = policy_data.get('metadata', [])
        regulatory_metadata = regulatory_data.get('metadata', [])

        if regulatory_embeddings.size == 0 or policy_embeddings.size == 0 or \
           len(regulatory_metadata) == 0 or len(policy_metadata) == 0:
            print("Warning: Missing or empty embeddings/metadata for similarity analysis.")
            return {
                'similarity_matrix': np.array([[]]),
                'detailed_matches': [],
                'coverage_analysis': {},
                'gap_analysis': {
                    'critical_gaps': [], 'significant_gaps': [], 'minor_gaps': [],
                    'gap_categories': defaultdict(list), 'recommendations': []
                },
                'semantic_clusters': {},
                'compliance_score': {'overall_score': 0.0, 'grade': 'F', 'metrics': {}}
            }

        # Calculate similarity matrices
        similarity_matrix = cosine_similarity(regulatory_embeddings, policy_embeddings)

        # Perform multi-level analysis
        results = {
            'similarity_matrix': similarity_matrix,
            'detailed_matches': self._find_detailed_matches(
                regulatory_metadata, policy_metadata, similarity_matrix
            ),
            'coverage_analysis': self._analyze_coverage(
                regulatory_metadata, policy_metadata, similarity_matrix
            ),
            'gap_analysis': self._perform_gap_analysis(
                regulatory_metadata, policy_metadata, similarity_matrix
            ),
            'semantic_clusters': self._analyze_semantic_clusters(
                regulatory_data.get('clusters', []),
                policy_data.get('clusters', []),
                similarity_matrix
            ),
            'compliance_score': self._calculate_overall_compliance_score(similarity_matrix)
        }

        return results

    def _find_detailed_matches(self, regulatory_metadata: List[Dict],
                               policy_metadata: List[Dict],
                               similarity_matrix: np.ndarray) -> List[Dict]:
        """Find detailed matches with context and confidence"""
        matches = []

        if len(regulatory_metadata) == 0 or similarity_matrix.size == 0:
            return matches

        for reg_idx, reg_item in enumerate(regulatory_metadata):
            reg_scores = similarity_matrix[reg_idx]

            # Find top matches above minimum threshold
            if len(policy_metadata) == 0:
                best_score = 0.0
                valid_matches = []
            else:
                valid_matches = [(idx, score) for idx, score in enumerate(reg_scores)
                                 if score >= self.thresholds['weak_coverage']]
                valid_matches.sort(key=lambda x: x[1], reverse=True)
                best_score = valid_matches[0][1] if valid_matches else 0.0

            match_details = {
                'regulatory_item': reg_item, # Contains 'text', 'source', 'type'
                'regulatory_index': reg_idx,
                'regulatory_source_doc': reg_item.get('source', 'N/A'), # Added source for clarity
                'matches': [],
                'best_score': float(best_score), # Ensure float for JSON serialization
                'coverage_level': self._determine_coverage_level(best_score)
            }

            # Add top 5 matches (or fewer if not enough valid_matches)
            for pol_idx, score in valid_matches[:5]:
                policy_item = policy_metadata[pol_idx]

                match_details['matches'].append({
                    'policy_item': policy_item, # Contains 'text', 'source', 'type'
                    'policy_index': pol_idx,
                    'policy_source_doc': policy_item.get('source', 'N/A'), # Added source for clarity
                    'similarity_score': float(score),
                    'coverage_level': self._determine_coverage_level(score),
                    'semantic_overlap': self._calculate_semantic_overlap(
                        reg_item, policy_item # These should contain the actual text/metadata for term extraction if implemented
                    )
                })

            matches.append(match_details)

        return matches

    def _analyze_coverage(self, regulatory_metadata: List[Dict],
                          policy_metadata: List[Dict],
                          similarity_matrix: np.ndarray) -> Dict:
        """Analyze overall coverage statistics"""
        if similarity_matrix.size == 0 or len(regulatory_metadata) == 0:
            return {'total_requirements': 0, 'total_policies': 0, 'coverage_stats': {}, 'average_best_match_score': 0.0, 'coverage_distribution': {}}

        max_scores = np.max(similarity_matrix, axis=1)

        coverage_stats = {}
        for level, threshold in self.thresholds.items():
            covered_count = np.sum(max_scores >= threshold)
            coverage_stats[level] = {
                'count': int(covered_count),
                'percentage': float(covered_count / len(regulatory_metadata) * 100) if regulatory_metadata else 0.0
            }

        return {
            'total_requirements': len(regulatory_metadata),
            'total_policies': len(policy_metadata),
            'coverage_stats': coverage_stats,
            'average_best_match_score': float(np.mean(max_scores)) if len(max_scores) > 0 else 0.0,
            'coverage_distribution': self._analyze_coverage_distribution(max_scores)
        }

    def _perform_gap_analysis(self, regulatory_metadata: List[Dict],
                              policy_metadata: List[Dict],
                              similarity_matrix: np.ndarray) -> Dict:
        """Perform detailed gap analysis"""
        gaps = {
            'critical_gaps': [],
            'significant_gaps': [],
            'minor_gaps': [],
            'gap_categories': defaultdict(list),
            'recommendations': []
        }

        if similarity_matrix.size == 0 or len(regulatory_metadata) == 0:
            return gaps

        for reg_idx, reg_item in enumerate(regulatory_metadata):
            best_score = np.max(similarity_matrix[reg_idx]) if similarity_matrix.shape[1] > 0 else 0.0

            gap_info = {
                'regulatory_requirement': reg_item, # Contains 'text', 'source', 'type'
                'regulatory_source_doc': reg_item.get('source', 'N/A'), # Added source for clarity
                'best_match_score': float(best_score),
                'gap_severity': self._calculate_gap_severity(best_score),
                'requirement_type': reg_item.get('type', 'unknown'),
                'key_terms': reg_item.get('key_terms', []), # Assuming key_terms can be extracted or added
                'recommendations': self._generate_targeted_recommendations(reg_item, best_score)
            }

            # Categorize gaps
            if best_score < self.thresholds['weak_coverage']:
                gaps['critical_gaps'].append(gap_info)
            elif best_score < self.thresholds['partial_coverage']:
                gaps['significant_gaps'].append(gap_info)
            elif best_score < self.thresholds['strong_coverage']:
                gaps['minor_gaps'].append(gap_info)

            # Categorize by requirement type (if available in metadata)
            req_type = reg_item.get('type', 'unknown')
            if best_score < self.thresholds['partial_coverage']: # Consider gaps below 'partial coverage' for category analysis
                gaps['gap_categories'][req_type].append(gap_info)

        # Generate high-level recommendations
        gaps['recommendations'] = self._generate_strategic_recommendations(gaps)

        return gaps

    def _determine_coverage_level(self, score: float) -> str:
        """Determine coverage level based on similarity score"""
        if score >= self.thresholds['exact_match']:
            return 'excellent'
        elif score >= self.thresholds['strong_coverage']:
            return 'good'
        elif score >= self.thresholds['partial_coverage']:
            return 'partial'
        elif score >= self.thresholds['weak_coverage']:
            return 'weak'
        else:
            return 'none'

    def _calculate_semantic_overlap(self, reg_item: Dict, policy_item: Dict) -> Dict:
        """Calculate semantic overlap between items
        NOTE: This currently uses placeholder for key term extraction.
        To make this fully functional, key_terms need to be extracted and added
        to the metadata of each chunk/requirement in TextProcessor.
        """
        # Placeholder - needs actual key term extraction from TextProcessor or here
        reg_terms = set(reg_item.get('key_terms', []))
        pol_terms = set(policy_item.get('key_terms', []))

        if not reg_terms and not pol_terms:
            return {'overlap_score': 0.0, 'common_terms': [], 'missing_terms': []}

        common_terms = list(reg_terms.intersection(pol_terms))
        overlap_score = len(common_terms) / len(reg_terms) if reg_terms else 0.0
        missing_terms = list(reg_terms - pol_terms) # Terms in regulation not found in policy

        return {
            'overlap_score': float(overlap_score), # Ensure float for JSON
            'common_terms': common_terms,
            'missing_terms': missing_terms
        }

    def _calculate_gap_severity(self, score: float) -> str:
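        """Map a similarity score to a severity label.

        The cutoffs here are fixed and independent of self.thresholds, so the
        severity label can differ from the critical/significant/minor gap bucket.
        """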
        if score < 0.20:
            return "Critical"
        elif score < 0.35:
            return "High"
        elif score < 0.50:
            return "Medium"
        elif score < 0.70:
            return "Low"
        else:
            return "Minimal"

    def _generate_targeted_recommendations(self, reg_item: Dict, score: float) -> List[str]:
        """Generate targeted recommendations based on requirement analysis"""
        recommendations = []
        # Access the 'text' key safely and truncate for display
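        reg_text = reg_item.get('text', 'N/A')
        # Only append an ellipsis when the text was actually truncated
        reg_item_text_excerpt = reg_text[:50] + ('...' if len(reg_text) > 50 else '')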

        key_terms = reg_item.get('key_terms', []) # This would need to be extracted

        if score < self.thresholds['weak_coverage']:
            recommendations.append(f"Create new policy section addressing the regulatory requirement: '{reg_item_text_excerpt}' (Source: {reg_item.get('source', 'N/A')}).")
            if key_terms:
                recommendations.append(f"Focus on key areas: {', '.join(key_terms[:5])}")
        elif score < self.thresholds['partial_coverage']:
            recommendations.append(f"Enhance existing policy to better address the regulatory requirement: '{reg_item_text_excerpt}' (Source: {reg_item.get('source', 'N/A')}).")
            recommendations.append("Review policy language for alignment with regulatory terminology.")
        elif score < self.thresholds['strong_coverage']:
            recommendations.append(f"Minor policy adjustments needed for full compliance with this requirement: '{reg_item_text_excerpt}' (Source: {reg_item.get('source', 'N/A')}).")

        recommendations.append("Engage legal/compliance team for detailed review.")
        return recommendations

    def _analyze_coverage_distribution(self, scores: np.ndarray) -> Dict:
        """Analyze the distribution of coverage scores"""
        if len(scores) == 0:
            return {}

        return {
            'min_score': float(np.min(scores)),
            'max_score': float(np.max(scores)),
            'median_score': float(np.median(scores)),
            'std_dev': float(np.std(scores)),
            'quartiles': {
                'q1': float(np.percentile(scores, 25)),
                'q2': float(np.percentile(scores, 50)),
                'q3': float(np.percentile(scores, 75))
            }
        }

    def _analyze_semantic_clusters(self, reg_clusters: List[Dict],
                                   pol_clusters: List[Dict],
                                   similarity_matrix: np.ndarray) -> Dict:
        """Analyze semantic clusters for better insights"""
        cluster_analysis = {
            'regulatory_clusters': len(reg_clusters),
            'policy_clusters': len(pol_clusters),
            'cluster_coverage': []
        }

        if similarity_matrix.size == 0 or similarity_matrix.shape[1] == 0:
            return cluster_analysis

        for reg_cluster in reg_clusters:
            cluster_items = reg_cluster['items']
            if not cluster_items:
                continue

            # Calculate coverage for this cluster
            # Ensure indices are within bounds of similarity_matrix
            valid_indices = [idx for idx in cluster_items if idx < similarity_matrix.shape[0]]
            if not valid_indices:
                continue

            cluster_scores = similarity_matrix[valid_indices]
            # Max along axis=1 means for each regulatory item, find its best match in policy
            avg_coverage = np.mean(np.max(cluster_scores, axis=1)) if cluster_scores.size > 0 else 0.0

            cluster_analysis['cluster_coverage'].append({
                'cluster_id': reg_cluster['cluster_id'],
                'size': reg_cluster['size'],
                'average_coverage': float(avg_coverage),
                'coverage_level': self._determine_coverage_level(avg_coverage)
            })

        return cluster_analysis

    def _calculate_overall_compliance_score(self, similarity_matrix: np.ndarray) -> Dict:
        """Calculate overall compliance score with multiple metrics"""
        if similarity_matrix.size == 0 or similarity_matrix.shape[0] == 0:
            return {'overall_score': 0.0, 'grade': 'F', 'metrics': {}}

        max_scores = np.max(similarity_matrix, axis=1)

        # Weighted scoring system
        weights = {
            'excellent': 1.0,
            'good': 0.8,
            'partial': 0.5,
            'weak': 0.2,
            'none': 0.0
        }

        weighted_score = 0.0
        for score in max_scores:
            level = self._determine_coverage_level(score)
            weighted_score += weights[level]

        overall_score = weighted_score / len(max_scores) if len(max_scores) > 0 else 0.0

        # Assign letter grade
        if overall_score >= 0.9:
            grade = 'A'
        elif overall_score >= 0.8:
            grade = 'B'
        elif overall_score >= 0.7:
            grade = 'C'
        elif overall_score >= 0.6:
            grade = 'D'
        else:
            grade = 'F'

        return {
            'overall_score': float(overall_score), # Ensure float
            'grade': grade,
            'metrics': {
                'average_similarity': float(np.mean(max_scores)),
                'median_similarity': float(np.median(max_scores)),
                'requirements_above_70': int(np.sum(max_scores >= 0.7)),
                'requirements_below_30': int(np.sum(max_scores < 0.3))
            }
        }

    def _generate_strategic_recommendations(self, gaps: Dict) -> List[str]:
        """Generate strategic recommendations based on gap analysis"""
        recommendations = []

        critical_count = len(gaps['critical_gaps'])
        significant_count = len(gaps['significant_gaps'])
        minor_count = len(gaps['minor_gaps'])

        if critical_count > 0:
            recommendations.append(f"PRIORITY: Address {critical_count} critical compliance gaps immediately to mitigate high legal and operational risks.")

        if significant_count > 0:
            recommendations.append(f"Address {significant_count} significant gaps within the next review cycle to improve compliance posture.")

        # Category-specific recommendations based on 'gap_categories'
        gap_categories = gaps['gap_categories']
        # Note: 'obligations' and 'prohibitions' types would need explicit extraction in TextProcessor
        # For now, we'll check for their existence.
        if 'obligations' in gap_categories and len(gap_categories['obligations']) > 0:
            recommendations.append(f"Focus on mandatory obligation compliance for {len(gap_categories['obligations'])} items – these carry the highest legal risk.")

        if 'prohibitions' in gap_categories and len(gap_categories['prohibitions']) > 0:
            recommendations.append(f"Review prohibition compliance for {len(gap_categories['prohibitions'])} items – ensure clear policy restrictions are in place.")

        if minor_count > 0:
            recommendations.append(f"Monitor {minor_count} minor compliance gaps proactively; while less severe, they can accumulate.")


        # General recommendations when no critical or significant gaps were found
        if (critical_count + significant_count) == 0:
            recommendations.append("Implement continuous monitoring system for regulatory changes and regular policy reviews.")
            recommendations.append("Consider cross-functional workshops involving legal, compliance, and operational teams to align policies with regulatory intent.")

        # Closing note when no gaps of any severity were found
        # (the original fallback was unreachable because the block above always ran first)
        if (critical_count + significant_count + minor_count) == 0:
            recommendations.append("Compliance appears to be strong based on current analysis. Maintain regular reviews and proactive monitoring for future changes.")

        return recommendations
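
The weighted scoring in `_calculate_overall_compliance_score` can be traced by hand on a few scores. The sketch below re-implements the level and grade mapping as free functions (`coverage_level` and `letter_grade` are hypothetical names) and applies them to four invented best-match scores.

```python
WEIGHTS = {"excellent": 1.0, "good": 0.8, "partial": 0.5, "weak": 0.2, "none": 0.0}


def coverage_level(score: float) -> str:
    """Map a best-match similarity score to a coverage level."""
    if score >= 0.85:
        return "excellent"
    if score >= 0.70:
        return "good"
    if score >= 0.50:
        return "partial"
    if score >= 0.30:
        return "weak"
    return "none"


def letter_grade(overall: float) -> str:
    """Map an overall score in [0, 1] to a letter grade."""
    for cutoff, grade in ((0.9, "A"), (0.8, "B"), (0.7, "C"), (0.6, "D")):
        if overall >= cutoff:
            return grade
    return "F"


best_scores = [0.91, 0.72, 0.45, 0.25]  # invented best-match scores
levels = [coverage_level(s) for s in best_scores]
overall = sum(WEIGHTS[level] for level in levels) / len(levels)
print(levels, round(overall, 3), letter_grade(overall))
```

Here the levels are excellent, good, weak, and none, so the weights sum to 2.0 over four requirements and the overall score is 0.5, which maps to grade F. A single uncovered requirement therefore pulls the grade down noticeably in a small set.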



### 5. ComplianceAnalyzer
Main orchestrator that coordinates the entire compliance analysis pipeline.
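
The orchestrator hands the similarity analyzer two dictionaries in which row `i` of `embeddings` corresponds to entry `i` of `metadata`. The sketch below shows that alignment with random unit vectors in place of real model output; the file names and texts are invented, and `fake_embeddings` is a hypothetical stand-in for the embedding step.

```python
import numpy as np

rng = np.random.default_rng(42)


def fake_embeddings(n_rows: int, dim: int = 16) -> np.ndarray:
    """Stand-in for model output: random unit-length float32 vectors."""
    vecs = rng.normal(size=(n_rows, dim)).astype(np.float32)
    return vecs / np.linalg.norm(vecs, axis=1, keepdims=True)


# Invented metadata, shaped like the records built by the pipeline
policy_chunks = [
    {"text": "Passwords are rotated every 90 days.", "source": "access_policy.txt", "type": "policy"},
    {"text": "Backups are encrypted at rest.", "source": "backup_policy.txt", "type": "policy"},
    {"text": "Visitors are escorted at all times.", "source": "physical_policy.txt", "type": "policy"},
]
regulatory_requirements = [
    {"text": "Stored data must be encrypted.", "source": "regulation_a.txt", "type": "regulatory"},
    {"text": "Credentials shall be changed periodically.", "source": "regulation_b.txt", "type": "regulatory"},
]

policy_data = {
    "embeddings": fake_embeddings(len(policy_chunks)),
    "metadata": policy_chunks,
    "clusters": [],
}
regulatory_data = {
    "embeddings": fake_embeddings(len(regulatory_requirements)),
    "metadata": regulatory_requirements,
    "clusters": [],
}

# Unit vectors, so the dot product is the cosine similarity.
# Rows follow regulatory requirements, columns follow policy chunks.
similarity = regulatory_data["embeddings"] @ policy_data["embeddings"].T

for reg_idx, req in enumerate(regulatory_requirements):
    pol_idx = int(similarity[reg_idx].argmax())
    print(f"{req['source']} -> {policy_chunks[pol_idx]['source']} ({similarity[reg_idx, pol_idx]:.2f})")
```

With random vectors the matches are meaningless; the point is only that indices into the similarity matrix can be used directly to look up the corresponding metadata entries.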



In [None]:
class ComplianceAnalyzer:
    def __init__(self):
        self.doc_processor = DocumentProcessor()
        self.text_processor = TextProcessor()
        self.embedding_gen = EmbeddingGenerator()
        self.similarity_analyzer = SimilarityAnalyzer(self.embedding_gen)

    def analyze_compliance(self, policy_folder: str, regulatory_folder: str, output_path: str):
        """Main compliance analysis pipeline"""
        print("Step 1: Processing documents...")

        # Process policy documents
        policy_docs = self.doc_processor.process_documents(
            policy_folder, doc_type="policy"
        )

        # Process regulatory documents
        regulatory_docs = self.doc_processor.process_documents(
            regulatory_folder, doc_type="regulatory"
        )

        print(f"Found {len(policy_docs)} policy documents and {len(regulatory_docs)} regulatory documents")

        print("Step 2: Extracting requirements...")

        # Extract and chunk policy content
        policy_chunks = []
        for doc in policy_docs:
            chunks = self.text_processor.chunk_document(doc['text'])
            for chunk in chunks:
                policy_chunks.append({
                    'text': chunk,
                    'source': doc['filename'], # Source file of the policy
                    'type': 'policy'
                })

        # Extract regulatory requirements
        regulatory_requirements = []
        for doc in regulatory_docs:
            requirements = self.text_processor.extract_requirements(doc['text'])
            for req in requirements:
                # Add 'source' to regulatory_requirements for detailed match reporting
                regulatory_requirements.append({
                    'text': req,
                    'source': doc['filename'], # Source file of the regulation
                    'type': 'regulatory'
                })

        print(f"Extracted {len(policy_chunks)} policy chunks and {len(regulatory_requirements)} regulatory requirements")

        print("Step 3: Generating embeddings...")

        # Generate embeddings
        policy_texts = [chunk['text'] for chunk in policy_chunks]
        regulatory_texts = [req['text'] for req in regulatory_requirements]

        # Note: empty text lists are passed through unchanged here, so
        # generate_contextual_embeddings is expected to tolerate empty input
        policy_embeddings = self.embedding_gen.generate_contextual_embeddings(policy_texts)
        regulatory_embeddings = self.embedding_gen.generate_contextual_embeddings(regulatory_texts)

        print("Step 4: Analyzing similarities...")

        # Prepare data for SimilarityAnalyzer
        policy_data = {
            'embeddings': policy_embeddings,
            'metadata': policy_chunks,
            'clusters': self.embedding_gen.create_hierarchical_index(policy_embeddings, policy_chunks).get('clusters', [])
        }
        regulatory_data = {
            'embeddings': regulatory_embeddings,
            'metadata': regulatory_requirements,
            'clusters': self.embedding_gen.create_hierarchical_index(regulatory_embeddings, regulatory_requirements).get('clusters', [])
        }

        # Let SimilarityAnalyzer handle similarity and analysis
        results = self.similarity_analyzer.comprehensive_similarity_analysis(policy_data, regulatory_data)

        # Gaps are flattened for reporting inside generate_report(), which reads
        # the categories produced by the analyzer's gap analysis

        print("Step 5: Generating report...")

        # The report is built from the full analysis results and written to disk
        report = self.generate_report(results, policy_docs, regulatory_docs)
        self.save_results(report, output_path)

        return report, results

    def generate_report(self, analysis_results: Dict, policy_docs: List[Dict], regulatory_docs: List[Dict]) -> Dict:
        """Generate comprehensive compliance report"""
        matches = analysis_results.get('detailed_matches', [])
        gaps = []
        for gap_type in ['critical_gaps', 'significant_gaps', 'minor_gaps']:
            gaps.extend(analysis_results['gap_analysis'].get(gap_type, []))

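        # Prefer the original requirement count when the analyzer returns it;
        # otherwise fall back to the number of match records
        if 'regulatory_data' in analysis_results:
            total_requirements = len(analysis_results['regulatory_data']['metadata'])
        else:
            total_requirements = len(matches)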

        # Count requirements whose best match score reaches the 'strong_coverage' threshold
        covered_requirements = sum(
            1 for match in matches
            if match.get('best_score', 0.0) >= self.similarity_analyzer.thresholds['strong_coverage']
        )
        coverage_percentage = (covered_requirements / total_requirements) * 100 if total_requirements else 0.0

        report = {
            'analysis_date': datetime.now().isoformat(),
            'summary': {
                'total_policy_documents': len(policy_docs),
                'total_regulatory_documents': len(regulatory_docs),
                'total_requirements_analyzed': total_requirements,
                'requirements_covered_by_good_or_excellent_match': covered_requirements,
                'coverage_percentage': float(coverage_percentage), # Ensure float
                'total_gaps_identified': len(gaps),
                'overall_compliance_score': analysis_results['compliance_score'].get('overall_score', 0.0),
                'compliance_grade': analysis_results['compliance_score'].get('grade', 'F')
            },
            'gap_severity_breakdown': self.get_gap_severity_breakdown(gaps),
            'high_level_recommendations': analysis_results['gap_analysis'].get('recommendations', []), # Use recommendations from analyzer
            'detailed_analysis': {
                'matches': matches, # Already includes regulatory_item and details
                'gaps': gaps
            }
        }
        return report

    def get_gap_severity_breakdown(self, gaps: List[Dict]) -> Dict:
        """Return a breakdown of gap severities."""
        from collections import Counter
        severities = [gap.get('gap_severity', 'Unknown') for gap in gaps]
        return dict(Counter(severities))

    def get_high_level_recommendations(self, gaps: List[Dict]) -> List[str]:
        """Deprecated: recommendations now come from SimilarityAnalyzer._generate_strategic_recommendations."""
        # Kept only for backward compatibility with callers that may still invoke it.
        # generate_report() reads analysis_results['gap_analysis']['recommendations'] instead.
        return []

    def save_results(self, report: Dict, output_path: str):
        """Save analysis results to files"""
        # Ensure output directory exists
        os.makedirs(output_path, exist_ok=True)

        # Save JSON report
        report_filename = os.path.join(output_path, "compliance_report.json")
        with open(report_filename, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"JSON report saved to {report_filename}")

        # Save Excel summary
        excel_filename = os.path.join(output_path, "compliance_analysis.xlsx")
        self.create_excel_report(report, excel_filename)
        print(f"Excel report saved to {excel_filename}")

    def create_excel_report(self, report: Dict, filename: str):
        """Create Excel report with multiple sheets"""
        with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
            # Summary sheet
            summary_data = []
            for key, value in report['summary'].items():
                summary_data.append({'Metric': key.replace('_', ' ').title(), 'Value': value})

            pd.DataFrame(summary_data).to_excel(
                writer, sheet_name='Summary', index=False
            )

            # Gaps sheet
            gaps_data = []
            for gap in report['detailed_analysis']['gaps']:
                reg_req_text = gap['regulatory_requirement'].get('text', 'N/A')
                gaps_data.append({
                    'Regulatory Requirement (Excerpt)': reg_req_text[:200] + '...' if len(reg_req_text) > 200 else reg_req_text,
                    'Regulatory Source Document': gap.get('regulatory_source_doc', 'N/A'),
                    'Best Match Score': gap.get('best_match_score', 0.0),
                    'Gap Severity': gap.get('gap_severity', 'Unknown'),
                    'Recommendations': "; ".join(gap.get('recommendations', []))
                })
            pd.DataFrame(gaps_data).to_excel(
                writer, sheet_name='Gaps', index=False
            )

            # Matches sheet
            matches_data = []
            for match_detail in report['detailed_analysis']['matches']:
                # The regulatory requirement this record describes
                reg_item_text = match_detail['regulatory_item'].get('text', 'N/A')
                reg_item_source = match_detail['regulatory_item'].get('source', 'N/A')

                # One row per requirement, paired with its best policy match if any
                if match_detail['matches']:
                    best_pol_match = match_detail['matches'][0]  # First entry is treated as the top match
                    pol_item_text = best_pol_match['policy_item'].get('text', 'N/A')
                    pol_item_source = best_pol_match['policy_item'].get('source', 'N/A')
                    matches_data.append({
                        'Regulatory Requirement (Excerpt)': reg_item_text[:200] + '...' if len(reg_item_text) > 200 else reg_item_text,
                        'Regulatory Source Document': reg_item_source,
                        'Best Policy Match (Excerpt)': pol_item_text[:200] + '...' if len(pol_item_text) > 200 else pol_item_text,
                        'Policy Source Document': pol_item_source,
                        'Best Similarity Score': best_pol_match.get('similarity_score', 0.0),
                        'Coverage Level': best_pol_match.get('coverage_level', 'none')
                    })
                else:
                    # No policy chunk matched this requirement
                    matches_data.append({
                        'Regulatory Requirement (Excerpt)': reg_item_text[:200] + '...' if len(reg_item_text) > 200 else reg_item_text,
                        'Regulatory Source Document': reg_item_source,
                        'Best Policy Match (Excerpt)': 'No direct policy match',
                        'Policy Source Document': 'N/A',
                        'Best Similarity Score': 0.0,
                        'Coverage Level': 'none'
                    })
            pd.DataFrame(matches_data).to_excel(
                writer, sheet_name='Matches', index=False
            )

            # Recommendations sheet
            recommendations_data = [{'Recommendation': rec} for rec in report['high_level_recommendations']]
            pd.DataFrame(recommendations_data).to_excel(
                writer, sheet_name='High-Level Recommendations', index=False
            )


In [None]:
if __name__ == "__main__":
    analyzer = ComplianceAnalyzer()
    # analyze_compliance returns the report plus the raw analysis results
    report, results = analyzer.analyze_compliance(
        policy_folder="./policies",
        regulatory_folder="./regulations",
        output_path="./results"
    )
    print(f"Analysis complete! Coverage: {report['summary']['coverage_percentage']:.1f}%")
    print(f"Gaps identified: {report['summary']['total_gaps_identified']}")



## Technical Features

### Similarity Thresholds
- **Exact Match (0.85)**: Very high similarity indicating strong alignment
- **Strong Coverage (0.70)**: Good coverage with minor gaps
- **Partial Coverage (0.50)**: Some coverage but significant improvements needed
- **Weak Coverage (0.30)**: Minimal coverage requiring attention
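
The thresholds above can be read as a ladder: a score is assigned the highest level whose threshold it reaches. The sketch below is illustrative only. Of the dictionary keys, only `strong_coverage` appears in the notebook code; the other key names, the `classify_coverage` helper, and the inclusive `>=` boundaries are assumptions.

```python
# Threshold values from the list above. Only 'strong_coverage' is a key
# confirmed by the notebook code; the other key names are assumed.
THRESHOLDS = {
    'exact_match': 0.85,
    'strong_coverage': 0.70,
    'partial_coverage': 0.50,
    'weak_coverage': 0.30,
}


def classify_coverage(score: float) -> str:
    """Return the name of the highest threshold the score reaches (hypothetical helper)."""
    # Dicts preserve insertion order, so thresholds are checked from highest to lowest
    for level, minimum in THRESHOLDS.items():
        if score >= minimum:
            return level
    return 'none'  # below every threshold


for score in (0.91, 0.72, 0.55, 0.31, 0.12):
    print(f"{score:.2f} -> {classify_coverage(score)}")
```

Keeping the values in one dictionary means a threshold can be tuned in a single place, which is presumably why the analyzer exposes them as `thresholds`.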

### Gap Analysis Categories
- **Critical Gaps**: Requirements whose best similarity score is below 0.30
- **Significant Gaps**: Requirements whose best similarity score is between 0.30 and 0.50
- **Minor Gaps**: Requirements whose best similarity score is between 0.50 and 0.70
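
A minimal sketch of this categorization follows. The category keys (`critical_gaps`, `significant_gaps`, `minor_gaps`) match those used in the notebook code, but the `categorize_gaps` helper, the requirement IDs, and the half-open boundaries (a score of exactly 0.50 counts as minor, not significant) are assumptions, since the ranges above do not state which side each boundary falls on.

```python
from typing import Dict, List


def categorize_gaps(best_scores: Dict[str, float]) -> Dict[str, List[str]]:
    """Group requirement IDs into gap categories by best similarity score."""
    gaps: Dict[str, List[str]] = {
        'critical_gaps': [],
        'significant_gaps': [],
        'minor_gaps': [],
    }
    for requirement_id, score in best_scores.items():
        if score < 0.30:
            gaps['critical_gaps'].append(requirement_id)
        elif score < 0.50:
            gaps['significant_gaps'].append(requirement_id)
        elif score < 0.70:
            gaps['minor_gaps'].append(requirement_id)
        # Scores of 0.70 or higher count as covered and are not gaps
    return gaps


# Hypothetical requirement IDs and scores, for illustration only
example_scores = {'REQ-1': 0.12, 'REQ-2': 0.44, 'REQ-3': 0.65, 'REQ-4': 0.88}
print(categorize_gaps(example_scores))
```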

### Machine Learning Components
- **Sentence Transformers**: For generating contextual embeddings
- **FAISS**: For efficient similarity search and indexing
- **K-means Clustering**: For semantic grouping of content
- **spaCy**: For natural language processing and requirement extraction
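
The notebook imports `cosine_similarity` from scikit-learn alongside FAISS, but this section does not show how `SimilarityAnalyzer` combines them. As a dependency-free sketch of the underlying idea, the code below scores one requirement vector against several policy vectors with cosine similarity and picks the best match. The helper names and the toy three-dimensional vectors are assumptions; real embeddings from Sentence Transformers have hundreds of dimensions.

```python
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def best_policy_match(requirement_vec: Sequence[float],
                      policy_vecs: List[Sequence[float]]) -> Tuple[int, float]:
    """Return (index, score) of the policy vector most similar to the requirement."""
    scores = [cosine(requirement_vec, vec) for vec in policy_vecs]
    best_index = max(range(len(scores)), key=scores.__getitem__)
    return best_index, scores[best_index]


# Toy vectors standing in for real embeddings
policy_vectors = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.6, 0.8, 0.0]]
requirement_vector = [0.0, 2.0, 0.0]
index, score = best_policy_match(requirement_vector, policy_vectors)
print(f"Best match: policy chunk {index} with score {score:.2f}")
```

Cosine similarity ignores vector length, so the requirement vector of length 2 still scores as a perfect match against the unit vector pointing the same way. A library such as FAISS serves the same purpose at scale by avoiding the full pairwise scan.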

### Output Capabilities
- Detailed similarity matrices
- Coverage statistics and distributions
- Categorized gap analysis with recommendations
- Compliance scoring with letter grades
- Semantic cluster analysis
- Strategic recommendations for improvement
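
The coverage statistic in the report summary follows the calculation in `generate_report`: a requirement counts as covered when its `best_score` reaches the strong-coverage threshold. The standalone sketch below mirrors that logic. The `coverage_summary` helper, its return keys, and the sample scores are hypothetical.

```python
from typing import Dict, List


def coverage_summary(matches: List[Dict], total_requirements: int,
                     strong_threshold: float = 0.70) -> Dict[str, float]:
    """Count requirements at or above the threshold and express them as a percentage."""
    covered = sum(
        1 for match in matches
        if match.get('best_score', 0.0) >= strong_threshold
    )
    # Guard against division by zero when no requirements were extracted
    percentage = (covered / total_requirements) * 100 if total_requirements else 0.0
    return {
        'covered': covered,
        'total': total_requirements,
        'coverage_percentage': percentage,
    }


# Sample match records; the last one has no score and defaults to 0.0
sample_matches = [{'best_score': 0.91}, {'best_score': 0.74}, {'best_score': 0.42}, {}]
print(coverage_summary(sample_matches, total_requirements=len(sample_matches)))
```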

## Use Cases
- **Regulatory Compliance Audits**: Compare internal policies against regulatory frameworks
- **Policy Gap Analysis**: Identify areas where organizational policies don't meet regulatory requirements
- **Compliance Monitoring**: Track compliance coverage over time
- **Risk Assessment**: Prioritize compliance efforts based on gap severity
- **Policy Development**: Guide creation of new policies to address identified gaps