In [47]:
# =============================================================================
# REQUERIMENTS
# =============================================================================

!pip install azure-ai-documentintelligence azure-ai-textanalytics azure-core Pillow




[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [48]:
# =============================================================================
# CONFIGURATION AND IMPORTS
# =============================================================================

import os
import time
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any

# Azure imports
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential

# Other imports
import fitz
from dotenv import load_dotenv
try:
    from PIL import Image, ImageDraw
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'Pillow'])
    from PIL import Image, ImageDraw

# Load environment variables
load_dotenv('k.env')

# Azure service configuration
AZURE_DOCINT_ENDPOINT = os.getenv('AZURE_DOCINT_ENDPOINT', 'https://SEU-ENDPOINT.cognitiveservices.azure.com/')
AZURE_DOCINT_KEY = os.getenv('AZURE_DOCINT_KEY', 'COLOQUE_SUA_CHAVE')
AZURE_LANGUAGE_ENDPOINT = os.getenv('AZURE_LANGUAGE_ENDPOINT', AZURE_DOCINT_ENDPOINT)
AZURE_LANGUAGE_KEY = os.getenv('AZURE_LANGUAGE_KEY', AZURE_DOCINT_KEY)

if 'COLOQUE_SUA_CHAVE' in (AZURE_DOCINT_KEY, AZURE_LANGUAGE_KEY):
    print('ATEN√á√ÉO: configure suas chaves nas vari√°veis de ambiente.')

# Directory configuration
DATA_DIR = os.getenv('PDF_INPUT_DIR', 'data')
OUT_DIR = os.getenv('PDF_OUTPUT_DIR', 'anonimized')

DATA_PATH = Path(DATA_DIR)
OUT_PATH = Path(OUT_DIR)
OUT_PATH.mkdir(exist_ok=True)

# Feature flags
VERBOSE_OCR = False
REDACT_BARCODES = os.getenv('REDACT_BARCODES', '1') == '1'

# ALLOWLIST CONFIGURATION
ALLOWED_ORGANIZATIONS = [
    'Contoso',
    'CONTOSO LTDA',
    'MICROSOFT'
]

ALLOWED_PERSONS = [
    'Solicitant'
]

# Convert to lowercase for case-insensitive matching
ALLOWED_ORGANIZATIONS_LOWER = [org.lower() for org in ALLOWED_ORGANIZATIONS]
ALLOWED_PERSONS_LOWER = [person.lower() for person in ALLOWED_PERSONS]

# DATE FILTERING CONFIGURATION
PROTECTED_DATE_CATEGORIES = [
    'DateTime',
    'Date', 
    'Time',
    'DateRange',
    'TimeRange'
]

REDACTED_DATE_CATEGORIES = [
    'DateOfBirth'
]

# URL FILTERING CONFIGURATION
REDACT_URL_PATTERNS = [
    r'https://learn.microsoft.com/'
]

# Compile regex patterns for efficiency
REDACT_URL_REGEX = [re.compile(pattern, re.IGNORECASE) for pattern in REDACT_URL_PATTERNS]

# Text processing thresholds
TEXT_SIZE_THRESHOLDS = {
    'single_call_max': 4500,
    'chunk_size': 4000,
    'mini_chunk_size': 2000,
    'emergency_chunk_size': 3000
}

# Initialize execution control
if 'EXECUTION_CONTROL' not in globals():
    EXECUTION_CONTROL = {
        'pipeline_executed': False,
        'inspection_executed': False
    }

In [49]:
# =============================================================================
# DATA CLASSES
# =============================================================================

@dataclass
class WordBox:
    page: int
    text: str
    offset: int          
    length: int
    bbox: Tuple[float, float, float, float]  

@dataclass
class EntityPII:
    category: str
    offset: int          
    length: int
    text: str
    page: int = -1     

@dataclass
class Redaction:
    page: int
    rect: Tuple[float, float, float, float]
    label: str

@dataclass
class TextChunk:
    """Represents a text chunk with offset mapping"""
    text: str
    start_offset: int
    end_offset: int
    chunk_id: str

@dataclass
class ChunkEntity:
    """Entity found in a chunk with local offsets"""
    category: str
    chunk_offset: int
    length: int
    text: str
    global_offset: int
    chunk_id: str

@dataclass
class PageResult:
    """Results from processing a single page"""
    page_num: int
    entities: List[EntityPII]
    redactions: List[Redaction] 
    barcodes: List[Dict[str, Any]]
    success: bool
    error_message: str = ""
    word_count: int = 0
    redacted_image_path: Path = None
    text_analysis: Dict[str, Any] = None  
    allowed_entities: List[EntityPII] = None  

In [50]:
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

def merge_rects(rects: List[Tuple[float,float,float,float]], gap: float = 2.0) -> List[Tuple[float,float,float,float]]:
    """Merge rectangles that are very close to each other"""
    if not rects:
        return []
    rects_sorted = sorted(rects, key=lambda r: (round(r[1]/10), r[0]))
    merged: List[Tuple[float,float,float,float]] = []
    cur = list(rects_sorted[0])
    for r in rects_sorted[1:]:
        same_line = abs(r[1]-cur[1]) < 5 and abs(r[3]-cur[3]) < 10
        touches = r[0] - cur[2] <= gap
        if same_line and touches:
            cur[2] = max(cur[2], r[2])
            cur[3] = max(cur[3], r[3])
            cur[0] = min(cur[0], r[0])
            cur[1] = min(cur[1], r[1])
        else:
            merged.append(tuple(cur))
            cur = list(r)
    merged.append(tuple(cur))
    return merged

def vlog(*a):
    """Verbose logging for OCR operations"""
    if VERBOSE_OCR:
        print('[OCR]', *a)

In [51]:
# =============================================================================
# AZURE CLIENT INITIALIZATION
# =============================================================================

_DOCINT_CLIENT = None
_PII_CLIENT = None

def get_docint_client():
    """Get or create Document Intelligence client"""
    global _DOCINT_CLIENT
    if _DOCINT_CLIENT is None:
        _DOCINT_CLIENT = DocumentIntelligenceClient(
            endpoint=AZURE_DOCINT_ENDPOINT,
            credential=AzureKeyCredential(AZURE_DOCINT_KEY)
        )
    return _DOCINT_CLIENT

def get_pii_client():
    """Get or create Text Analytics client"""
    global _PII_CLIENT
    if _PII_CLIENT is None:
        _PII_CLIENT = TextAnalyticsClient(
            endpoint=AZURE_LANGUAGE_ENDPOINT,
            credential=AzureKeyCredential(AZURE_LANGUAGE_KEY)
        )
    return _PII_CLIENT

In [52]:
# =============================================================================
# TEXT CHUNKING AND ANALYSIS
# =============================================================================

def analyze_text_size(text: str) -> Dict[str, Any]:
    """Analyze text size and determine processing strategy"""
    text_length = len(text)
    
    if text_length <= TEXT_SIZE_THRESHOLDS['single_call_max']:
        strategy = 'single_call'
        chunks_needed = 1
    elif text_length <= TEXT_SIZE_THRESHOLDS['chunk_size'] * 10:
        strategy = 'standard_chunks'
        chunks_needed = (text_length // TEXT_SIZE_THRESHOLDS['chunk_size']) + 1
    else:
        strategy = 'mini_chunks'
        chunks_needed = (text_length // TEXT_SIZE_THRESHOLDS['mini_chunk_size']) + 1
    
    return {
        'length': text_length,
        'strategy': strategy,
        'chunks_needed': chunks_needed,
        'threshold_used': TEXT_SIZE_THRESHOLDS['single_call_max']
    }

def create_text_chunks(text: str, chunk_size: int, overlap: int = 100) -> List[TextChunk]:
    """Create overlapping text chunks with offset tracking"""
    if len(text) <= chunk_size:
        return [TextChunk(
            text=text,
            start_offset=0,
            end_offset=len(text),
            chunk_id="chunk_0"
        )]
    
    chunks = []
    start = 0
    chunk_num = 0
    
    while start < len(text):
        end = min(start + chunk_size, len(text))
        
        if end < len(text):
            break_point = end
            for i in range(end - 50, end):
                if i > start and text[i] in ' \n\t.,;:!?':
                    break_point = i + 1
                    break
            end = break_point
        
        chunk_text = text[start:end]
        chunks.append(TextChunk(
            text=chunk_text,
            start_offset=start,
            end_offset=end,
            chunk_id=f"chunk_{chunk_num}"
        ))
        
        start = max(start + chunk_size - overlap, end)
        chunk_num += 1
    
    return chunks

def _build_page_local_text(words: List[WordBox]) -> Tuple[str, List[Tuple[WordBox,int,int]]]:
    """Build local text from page words with offset mapping"""
    mapping = []
    cur = 0
    sorted_words = sorted(words, key=lambda x: x.offset)
    
    for w in sorted_words:
        token = w.text or ''
        mapping.append((w, cur, len(token)))
        cur += len(token) + 1
    
    page_text = ' '.join(w.text for w in sorted_words)
    return page_text, mapping

In [53]:
# =============================================================================
# ALLOWLIST FILTERING
# =============================================================================

def is_allowed_organization(entity_text: str) -> bool:
    """Check if entity text matches any allowed organization"""
    entity_lower = entity_text.lower().strip()
    
    if entity_lower in ALLOWED_ORGANIZATIONS_LOWER:
        return True
    
    for allowed_org in ALLOWED_ORGANIZATIONS_LOWER:
        if allowed_org in entity_lower or entity_lower in allowed_org:
            return True
    
    return False

def is_allowed_person(entity_text: str) -> bool:
    """Check if entity text matches any allowed person/role"""
    entity_lower = entity_text.lower().strip()
    
    if entity_lower in ALLOWED_PERSONS_LOWER:
        return True
    
    for allowed_person in ALLOWED_PERSONS_LOWER:
        if allowed_person in entity_lower or entity_lower in allowed_person:
            return True
    
    return False

def is_allowed_date(entity_category: str) -> bool:
    """Check if date entity should be allowed (all dates except DateOfBirth)"""
    allowed_date_categories = [
        'DateTime',
        'Date',
        'Time',
        'DateRange',
        'TimeRange'
    ]
    
    if entity_category == 'DateOfBirth':
        return False
    
    return entity_category in allowed_date_categories

def should_redact_url(url_text: str) -> bool:
    """Check if URL should be redacted based on patterns"""
    url_text = url_text.strip()
    
    for regex_pattern in REDACT_URL_REGEX:
        if regex_pattern.match(url_text):
            return True
    
    return False

def filter_allowed_entities(entities: List[EntityPII], full_text: str = "") -> Tuple[List[EntityPII], List[EntityPII]]:
    """Filter entities, separating allowed from to-be-redacted"""
    
    to_redact = []
    allowed = []
    
    for entity in entities:
        should_allow = False
        
        if entity.category == 'Organization' and is_allowed_organization(entity.text):
            should_allow = True
        
        elif entity.category in ['Person', 'PersonType'] and is_allowed_person(entity.text):
            should_allow = True
        
        elif is_allowed_date(entity.category):
            should_allow = True
        
        elif entity.category == 'DateOfBirth':
            should_allow = False
        
        elif entity.category == 'URL' and entity.text.startswith(('http://', 'https://')):
            if not should_redact_url(entity.text):
                should_allow = True
        
        elif entity.category == 'URL' and not entity.text.startswith(('http://', 'https://')):
            should_allow = True
        
        if should_allow:
            allowed.append(entity)
        else:
            to_redact.append(entity)
    
    return to_redact, allowed

In [54]:
# =============================================================================
# IMAGE AND PDF PROCESSING
# =============================================================================

def pdf_to_images(pdf_path: str, out_dir: str, dpi: int = 200) -> List[Path]:
    """Render each PDF page to PNG images"""
    out = Path(out_dir)
    out.mkdir(exist_ok=True)
    pages = []
    with fitz.open(pdf_path) as doc:
        for i, page in enumerate(doc):
            mat = fitz.Matrix(dpi/72.0, dpi/72.0)
            pix = page.get_pixmap(matrix=mat, alpha=False)
            img_path = out / f"page_{i+1:04d}.png"
            pix.save(img_path.as_posix())
            pages.append(img_path)
    return pages

def images_to_pdf(image_paths: List[Path], output_pdf: str):
    """Convert sequence of images back to PDF"""
    doc = fitz.open()
    for img_path in image_paths:
        img = fitz.Pixmap(img_path.as_posix())
        page = doc.new_page(width=img.width, height=img.height)
        page.insert_image(page.rect, filename=img_path.as_posix())
    doc.save(output_pdf, deflate=True)
    doc.close()

def _normalize_polygon(poly: List[float], img_w: int, img_h: int) -> List[float]:
    """Detect if coords are normalized (0-1) and scale to pixels"""
    if not poly:
        return poly
    xs = poly[0::2]; ys = poly[1::2]
    max_x = max(xs); max_y = max(ys)
    if max_x <= 1.2 and max_y <= 1.2:
        scaled = []
        for i in range(0, len(poly), 2):
            scaled.append(poly[i] * img_w)
            scaled.append(poly[i+1] * img_h)
        return scaled
    return poly

def _bbox_from_polygon(poly: List[float]):
    """Convert polygon to bounding box"""
    if not poly:
        return (0,0,0,0)
    xs = poly[0::2]; ys = poly[1::2]
    return (min(xs), min(ys), max(xs), max(ys))

In [55]:
# =============================================================================
# OCR WITH AZURE DOCUMENT INTELLIGENCE
# =============================================================================

def ocr_image_docint(image_path: Path, model_id: str = 'prebuilt-layout') -> Dict[str, Any]:
    """Perform OCR on single image using Document Intelligence"""
    client = get_docint_client()
    with open(image_path, 'rb') as f:
        try:
            poller = client.begin_analyze_document(model_id=model_id, body=f.read(), content_type='image/png', features=['barcodes'])
        except TypeError:
            f.seek(0)
            poller = client.begin_analyze_document(model_id=model_id, body=f.read(), content_type='image/png')
    
    result = poller.result()
    img = Image.open(image_path)
    w, h = img.size
    words = []
    barcodes = []
    content = getattr(result, 'content', '') or ''
    pages = getattr(result, 'pages', None) or []
    
    for page_index, page in enumerate(pages):
        page_words = getattr(page, 'words', None) or []
        for wobj in page_words:
            poly = _normalize_polygon(getattr(wobj, 'polygon', []) or [], w, h)
            bbox = _bbox_from_polygon(poly)
            span = getattr(wobj, 'span', None)
            offset = getattr(span, 'offset', 0) if span else 0
            length = getattr(span, 'length', 0) if span else len(getattr(wobj, 'content', ''))
            
            word_text = getattr(wobj, 'content', '')
            words.append({
                'text': word_text,
                'offset': offset,
                'length': length,
                'bbox': bbox
            })
        
        page_barcodes = getattr(page, 'barcodes', None) or []
        if REDACT_BARCODES and page_barcodes:
            for bc in page_barcodes:
                poly_bc = _normalize_polygon(getattr(bc, 'polygon', []) or [], w, h)
                bbox_bc = _bbox_from_polygon(poly_bc)
                barcode_value = getattr(bc, 'value', '')
                barcodes.append({
                    'value': barcode_value,
                    'kind': getattr(bc, 'kind', 'barcode'),
                    'bbox': bbox_bc,
                    'page': page_index
                })
    
    return {'content': content, 'words': words, 'barcodes': barcodes, 'size': (w,h)}

In [56]:
# =============================================================================
# PII DETECTION WITH AZURE TEXT ANALYTICS
# =============================================================================

def _process_single_text(client, text: str, page_num: int, language: str) -> List[EntityPII]:
    """Process text with single API call"""
    entities = []
    
    resp = client.recognize_pii_entities(
        documents=[{"id": f"page_{page_num}", "text": text, "language": language}],
        model_version="2025-08-01-preview"
    )
    
    for doc in resp:
        if not doc.is_error:
            for ent in doc.entities:
                entities.append(EntityPII(
                    category=str(ent.category),
                    offset=ent.offset,
                    length=ent.length,
                    text=ent.text,
                    page=page_num
                ))
    
    return entities

def _process_with_chunks(client, text: str, page_num: int, language: str, chunk_size: int) -> Tuple[List[EntityPII], Dict[str, Any]]:
    """Process text with chunking strategy"""
    chunks = create_text_chunks(text, chunk_size)
    chunk_entities = []
    
    for chunk in chunks:
        try:
            resp = client.recognize_pii_entities(
                documents=[{"id": f"page_{page_num}_{chunk.chunk_id}", "text": chunk.text, "language": language}],
                model_version="2025-08-01-preview"
            )
            
            for doc in resp:
                if not doc.is_error:
                    for ent in doc.entities:
                        chunk_entities.append(ChunkEntity(
                            category=str(ent.category),
                            chunk_offset=ent.offset,
                            length=ent.length,
                            text=ent.text,
                            global_offset=chunk.start_offset + ent.offset,
                            chunk_id=chunk.chunk_id
                        ))
        except Exception as chunk_error:
            if 'too large' in str(chunk_error).lower():
                mini_chunks = create_text_chunks(chunk.text, TEXT_SIZE_THRESHOLDS['mini_chunk_size'])
                for mini_chunk in mini_chunks:
                    mini_chunk.start_offset += chunk.start_offset
                    mini_chunk.end_offset += chunk.start_offset
                    try:
                        mini_resp = client.recognize_pii_entities(
                            documents=[{"id": f"page_{page_num}_{mini_chunk.chunk_id}", "text": mini_chunk.text, "language": language}],
                            model_version="2025-08-01-preview"
                        )
                        for mini_doc in mini_resp:
                            if not mini_doc.is_error:
                                for ent in mini_doc.entities:
                                    chunk_entities.append(ChunkEntity(
                                        category=str(ent.category),
                                        chunk_offset=ent.offset,
                                        length=ent.length,
                                        text=ent.text,
                                        global_offset=mini_chunk.start_offset + ent.offset,
                                        chunk_id=mini_chunk.chunk_id
                                    ))
                    except:
                        pass
    
    global_entities = []
    for chunk_ent in chunk_entities:
        global_entities.append(EntityPII(
            category=chunk_ent.category,
            offset=chunk_ent.global_offset,
            length=chunk_ent.length,
            text=chunk_ent.text,
            page=page_num
        ))
    
    return global_entities, {
        'chunks_created': len(chunks),
        'chunks_processed': len(chunks),
        'chunk_size_used': chunk_size
    }

def _process_with_emergency_chunks(client, text: str, page_num: int, language: str) -> Tuple[List[EntityPII], Dict[str, Any]]:
    """Emergency fallback chunking with smaller sizes"""
    emergency_size = TEXT_SIZE_THRESHOLDS['emergency_chunk_size']
    chunks = create_text_chunks(text, emergency_size, overlap=50)
    
    entities = []
    successful_chunks = 0
    
    for chunk in chunks:
        try:
            resp = client.recognize_pii_entities(
                documents=[{"id": f"page_{page_num}_emergency_{chunk.chunk_id}", "text": chunk.text, "language": language}],
                model_version="2025-08-01-preview"
            )
            
            for doc in resp:
                if not doc.is_error:
                    for ent in doc.entities:
                        entities.append(EntityPII(
                            category=str(ent.category),
                            offset=chunk.start_offset + ent.offset,
                            length=ent.length,
                            text=ent.text,
                            page=page_num
                        ))
            successful_chunks += 1
        except:
            pass
    
    return entities, {
        'emergency_chunks_created': len(chunks),
        'emergency_chunks_successful': successful_chunks,
        'emergency_chunk_size': emergency_size
    }

def detect_pii_with_chunking(page_words: List[WordBox], page_num: int, language: str) -> Tuple[List[EntityPII], Dict[str, Any], List[EntityPII]]:
    """Enhanced PII detection with text size analysis, chunking, and allowlist filtering"""
    if not page_words:
        return [], {'strategy': 'no_text', 'length': 0}, []
    
    client = get_pii_client()
    page_text = ' '.join(w.text or '' for w in sorted(page_words, key=lambda x: x.offset))
    
    if len(page_text) == 0:
        return [], {'strategy': 'empty_text', 'length': 0}, []
    
    text_analysis = analyze_text_size(page_text)
    
    raw_entities = []
    
    try:
        if text_analysis['strategy'] == 'single_call':
            raw_entities = _process_single_text(client, page_text, page_num, language)
            text_analysis['chunks_processed'] = 1
            
        elif text_analysis['strategy'] == 'standard_chunks':
            raw_entities, chunk_info = _process_with_chunks(
                client, page_text, page_num, language, 
                TEXT_SIZE_THRESHOLDS['chunk_size']
            )
            text_analysis.update(chunk_info)
            
        else:
            raw_entities, chunk_info = _process_with_chunks(
                client, page_text, page_num, language, 
                TEXT_SIZE_THRESHOLDS['mini_chunk_size']
            )
            text_analysis.update(chunk_info)
            
    except Exception as e:
        error_msg = str(e).lower()
        if 'too large' in error_msg or 'request too long' in error_msg:
            try:
                raw_entities, chunk_info = _process_with_emergency_chunks(
                    client, page_text, page_num, language
                )
                text_analysis.update(chunk_info)
                text_analysis['emergency_fallback'] = True
            except Exception as emergency_error:
                text_analysis['final_error'] = str(emergency_error)
        else:
            text_analysis['error'] = str(e)
    
    entities_to_redact, allowed_entities = filter_allowed_entities(raw_entities, page_text)
    
    text_analysis['total_entities_found'] = len(raw_entities)
    text_analysis['entities_to_redact'] = len(entities_to_redact)
    text_analysis['entities_allowed'] = len(allowed_entities)
    
    return entities_to_redact, text_analysis, allowed_entities

In [57]:
# =============================================================================
# REDACTION APPLICATION
# =============================================================================

def map_entities_to_redactions_simple(entities: List[EntityPII], page_words: List[WordBox], page_num: int) -> List[Redaction]:
    """Map PII entities to redaction rectangles"""
    redactions = []
    
    if not entities or not page_words:
        return redactions
    
    page_text, word_mapping = _build_page_local_text(page_words)
    
    for entity in entities:
        entity_rects = []
        
        for word, word_start, word_len in word_mapping:
            word_end = word_start + word_len
            
            entity_start = entity.offset
            entity_end = entity.offset + entity.length
            
            if not (word_end <= entity_start or word_start >= entity_end):
                entity_rects.append(word.bbox)
        
        if entity_rects:
            merged_rects = merge_rects(entity_rects)
            for rect in merged_rects:
                redactions.append(Redaction(
                    page=page_num,
                    rect=rect,
                    label=f"{entity.category}:{entity.text[:20]}"
                ))
        else:
            entity_text_lower = entity.text.lower()
            for word in page_words:
                if word.text and entity_text_lower in word.text.lower():
                    redactions.append(Redaction(
                        page=page_num,
                        rect=word.bbox,
                        label=f"{entity.category}:{entity.text[:20]}"
                    ))
    
    return redactions

def apply_redactions_simple(image_path: Path, redactions: List[Redaction], output_path: Path):
    """Apply redactions to image by drawing black rectangles"""
    try:
        img = Image.open(image_path)
        draw = ImageDraw.Draw(img)
        
        for redaction in redactions:
            x1, y1, x2, y2 = redaction.rect
            draw.rectangle([x1, y1, x2, y2], fill='black')
        
        img.save(output_path)
        
    except Exception as e:
        import shutil
        shutil.copy2(image_path, output_path)

In [58]:
# =============================================================================
# PAGE PROCESSING
# =============================================================================

def process_single_page_simple(
    image_path: Path,
    page_num: int,
    output_dir: Path,
    language: str = 'pt-BR'
) -> PageResult:
    """Process a single page with OCR, PII detection, and redaction"""
    try:
        ocr_result = ocr_image_docint(image_path)
        
        page_words = []
        for w in ocr_result['words']:
            page_words.append(WordBox(
                page=0,
                text=w['text'],
                offset=w['offset'],
                length=w['length'],
                bbox=w['bbox']
            ))
        
        page_entities, text_analysis, allowed_entities = detect_pii_with_chunking(page_words, page_num, language)
        
        page_redactions = map_entities_to_redactions_simple(page_entities, page_words, page_num)
        
        page_barcodes = []
        for bc in ocr_result.get('barcodes', []):
            bc_copy = bc.copy()
            bc_copy['page'] = page_num
            page_barcodes.append(bc_copy)
        
        if page_barcodes and REDACT_BARCODES:
            for bc in page_barcodes:
                bbox = bc.get('bbox', (0,0,0,0))
                page_redactions.append(Redaction(
                    page=page_num,
                    rect=bbox,
                    label=f"BARCODE:{bc.get('kind', 'code')}"
                ))
        
        redacted_path = output_dir / f"redacted_page_{page_num+1:04d}.png"
        apply_redactions_simple(image_path, page_redactions, redacted_path)
        
        result = PageResult(
            page_num=page_num,
            entities=page_entities,
            redactions=page_redactions,
            barcodes=page_barcodes,
            success=True,
            word_count=len(page_words),
            redacted_image_path=redacted_path,
            text_analysis=text_analysis,
            allowed_entities=allowed_entities
        )
        
        return result
        
    except Exception as e:
        return PageResult(
            page_num=page_num,
            entities=[],
            redactions=[],
            barcodes=[],
            success=False,
            error_message=str(e),
            word_count=0,
            text_analysis={'error': str(e)},
            allowed_entities=[]
        )

def process_pages_sequential_simple(images: List[Path], output_dir: Path, language: str = 'pt-BR') -> List[PageResult]:
    """Process pages sequentially one by one"""
    results = []
    
    for i, image_path in enumerate(images):
        result = process_single_page_simple(image_path, i, output_dir, language)
        results.append(result)
        time.sleep(0.1)
    
    return results

In [59]:
# =============================================================================
# MAIN ANONYMIZATION PIPELINE
# =============================================================================

def simple_anonymization_pipeline(pdf_name: str) -> Dict[str, Any]:
    """Enhanced PDF anonymization pipeline with text size analysis and allowlist protection"""
    
    pdf_path = str(Path(DATA_PATH) / pdf_name)
    
    import tempfile
    work_dir = Path(tempfile.mkdtemp(prefix='pdf_processing_'))
    
    images = pdf_to_images(pdf_path, out_dir=str(work_dir / 'original'))
    
    output_dir = work_dir / 'redacted'
    output_dir.mkdir(exist_ok=True)
    
    page_results = process_pages_sequential_simple(images, output_dir)
    
    redacted_images = []
    for result in sorted(page_results, key=lambda r: r.page_num):
        if result.success and result.redacted_image_path and result.redacted_image_path.exists():
            redacted_images.append(result.redacted_image_path)
    
    if redacted_images:
        final_pdf_path = str(Path(OUT_PATH) / f"protected_anon_{pdf_name}")
        images_to_pdf(redacted_images, final_pdf_path)
        success = True
    else:
        final_pdf_path = None
        success = False
    
    import shutil
    try:
        shutil.rmtree(work_dir)
    except:
        pass
    
    successful_results = [r for r in page_results if r.success]
    total_entities = sum(len(r.entities) for r in successful_results)
    total_redactions = sum(len(r.redactions) for r in successful_results)
    total_allowed = sum(len(r.allowed_entities or []) for r in successful_results)
    
    entities_by_category = {}
    for result in successful_results:
        for entity in result.entities:
            entities_by_category.setdefault(entity.category, 0)
            entities_by_category[entity.category] += 1
    
    allowed_by_category = {}
    for result in successful_results:
        if result.allowed_entities:
            for entity in result.allowed_entities:
                allowed_by_category.setdefault(entity.category, 0)
                allowed_by_category[entity.category] += 1
    
    text_stats = {
        'single_call_pages': 0,
        'chunked_pages': 0,
        'emergency_pages': 0,
        'total_chunks_processed': 0
    }
    
    for result in successful_results:
        if result.text_analysis:
            strategy = result.text_analysis.get('strategy', 'unknown')
            if strategy == 'single_call':
                text_stats['single_call_pages'] += 1
            elif strategy in ['standard_chunks', 'mini_chunks']:
                text_stats['chunked_pages'] += 1
                text_stats['total_chunks_processed'] += result.text_analysis.get('chunks_processed', 0)
            
            if result.text_analysis.get('emergency_fallback'):
                text_stats['emergency_pages'] += 1
    
    return {
        'pdf_name': pdf_name,
        'success': success,
        'total_pages': len(images),
        'successful_pages': len(successful_results),
        'total_entities': total_entities,
        'total_redactions': total_redactions,
        'total_allowed_entities': total_allowed,
        'entities_by_category': entities_by_category,
        'allowed_entities_by_category': allowed_by_category,
        'text_processing_stats': text_stats,
        'output_pdf': final_pdf_path
    }

In [60]:
# =============================================================================
# EXECUTION FUNCTION
# =============================================================================

def run_simple_anonymization():
    """Run the enhanced anonymization pipeline with expanded allowlist protection"""
    
    PDF_FILE = 'contoso_documento_profissional_sem_cpf.pdf'
    
    if EXECUTION_CONTROL.get('pipeline_executed', False):
        print("‚ö†Ô∏è Pipeline already executed!")
        print("üí° To run again: set EXECUTION_CONTROL['pipeline_executed'] = False")
        return None
    
    print("üöÄ Starting PDF Anonymization Pipeline...")
    
    try:
        EXECUTION_CONTROL['pipeline_executed'] = True
        
        results = simple_anonymization_pipeline(PDF_FILE)
        
        print(f"\n{'='*70}")
        print(f"üìä RESULTS SUMMARY")
        print(f"{'='*70}")
        print(f"üìÑ Document: {results['pdf_name']}")
        print(f"üìë Pages: {results['successful_pages']}/{results['total_pages']} successful")
        print(f"üîç PII entities found: {results['total_entities']} (to redact)")
        print(f"üõ°Ô∏è Protected entities: {results['total_allowed_entities']} (NOT redacted)")
        print(f"üéØ Redactions applied: {results['total_redactions']}")
        
        text_stats = results.get('text_processing_stats', {})
        print(f"\nüìä TEXT PROCESSING:")
        print(f"   Single API call: {text_stats.get('single_call_pages', 0)} pages")
        print(f"   Chunked processing: {text_stats.get('chunked_pages', 0)} pages")
        print(f"   Emergency fallback: {text_stats.get('emergency_pages', 0)} pages")
        print(f"   Total chunks: {text_stats.get('total_chunks_processed', 0)}")
        
        if results['entities_by_category']:
            print(f"\nüè∑Ô∏è PII CATEGORIES (TO REDACT):")
            for category, count in sorted(results['entities_by_category'].items(), key=lambda x: x[1], reverse=True):
                print(f"   - {category}: {count}")
        
        if results['allowed_entities_by_category']:
            print(f"\nüõ°Ô∏è PROTECTED CATEGORIES (NOT REDACTED):")
            for category, count in sorted(results['allowed_entities_by_category'].items(), key=lambda x: x[1], reverse=True):
                print(f"   - {category}: {count}")
        
        if results['success']:
            print(f"\n‚úÖ SUCCESS!")
            print(f"üìÅ Output: {results['output_pdf']}")
        else:
            print(f"\n‚ùå FAILED!")
        
        return results

    except FileNotFoundError:
        EXECUTION_CONTROL['pipeline_executed'] = False
        print(f"‚ùå Error: PDF file '{PDF_FILE}' not found in '{DATA_PATH}' directory")
        return None
        
    except Exception as e:
        EXECUTION_CONTROL['pipeline_executed'] = False
        print(f"‚ùå Error: {str(e)}")
        return None

print("üí° To run: run_simple_anonymization()")

üí° To run: run_simple_anonymization()


In [61]:
# Reset execution control
EXECUTION_CONTROL['pipeline_executed'] = False

run_simple_anonymization()

üöÄ Starting PDF Anonymization Pipeline...

üìä RESULTS SUMMARY
üìÑ Document: contoso_documento_profissional_sem_cpf.pdf
üìë Pages: 1/1 successful
üîç PII entities found: 4 (to redact)
üõ°Ô∏è Protected entities: 2 (NOT redacted)
üéØ Redactions applied: 7

üìä TEXT PROCESSING:
   Single API call: 1 pages
   Chunked processing: 0 pages
   Emergency fallback: 0 pages
   Total chunks: 0

üè∑Ô∏è PII CATEGORIES (TO REDACT):
   - Person: 1
   - Email: 1
   - PhoneNumber: 1
   - URL: 1

üõ°Ô∏è PROTECTED CATEGORIES (NOT REDACTED):
   - Organization: 1
   - URL: 1

‚úÖ SUCCESS!
üìÅ Output: anonimized\protected_anon_contoso_documento_profissional_sem_cpf.pdf

üìä RESULTS SUMMARY
üìÑ Document: contoso_documento_profissional_sem_cpf.pdf
üìë Pages: 1/1 successful
üîç PII entities found: 4 (to redact)
üõ°Ô∏è Protected entities: 2 (NOT redacted)
üéØ Redactions applied: 7

üìä TEXT PROCESSING:
   Single API call: 1 pages
   Chunked processing: 0 pages
   Emergency fallback: 0 pages
  

{'pdf_name': 'contoso_documento_profissional_sem_cpf.pdf',
 'success': True,
 'total_pages': 1,
 'successful_pages': 1,
 'total_entities': 4,
 'total_redactions': 7,
 'total_allowed_entities': 2,
 'entities_by_category': {'Person': 1, 'Email': 1, 'PhoneNumber': 1, 'URL': 1},
 'allowed_entities_by_category': {'Organization': 1, 'URL': 1},
 'text_processing_stats': {'single_call_pages': 1,
  'chunked_pages': 0,
  'emergency_pages': 0,
  'total_chunks_processed': 0},
 'output_pdf': 'anonimized\\protected_anon_contoso_documento_profissional_sem_cpf.pdf'}