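# In [None]:  (Jupyter cell marker left over from the notebook export; kept as a comment so the file parses as Python)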
# -*- coding: utf-8 -*-
"""
SISTEMA DE AUDITORÍA Y TRAZABILIDAD - PIPELINE BIG DATA
Universidad Politécnica Metropolitana de Hidalgo
Maestría en Inteligencia Artificial - Big Data

Autores: Sánchez Ríos José Luis, Santos Martínez Víctor Manuel
Fecha: 17 de enero de 2026
Profesor: Dr. Jaime Aguilar Ortiz

Este módulo implementa un sistema robusto de auditoría con trazabilidad completa
para garantizar integridad, repetibilidad y conformidad con estándares de data engineering.
"""

import os
import re
import json
import hashlib
import logging
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Any
from enum import Enum
from io import StringIO

import pandas as pd
import requests
from bs4 import BeautifulSoup

# Logging configuration: every record is written both to a log file and to the
# console stream.

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the messages logged by this module contain accented
        # characters, and without `encoding` the file would be written in the
        # platform's locale encoding.
        logging.FileHandler('audit_pipeline.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)


# Enumeraciones para tipos y constantes comunes

class SourceType(Enum):
    """Data source formats known to the pipeline.

    Each member's value is the lowercase name of the format.
    """

    TXT = 'txt'
    CSV = 'csv'
    HTML = 'html'
    # NOTE(review): JSON and XML are declared here, but no code in this file
    # references them.
    JSON = 'json'
    XML = 'xml'


class AuditStatus(Enum):
    """Possible outcomes of an audit run.

    Each member's value is the lowercase string stored in the audit record.
    """

    SUCCESS = 'success'
    FAILURE = 'failure'
    WARNING = 'warning'
    # NOTE(review): PENDING is declared but not referenced elsewhere in this file.
    PENDING = 'pending'


# Remote locations of the inputs to audit, keyed by source id.
DATA_SOURCES = dict(
    TXT="https://raw.githubusercontent.com/VManuelSM/Actividad_1_Big_Data/refs/heads/main/logs_sistema.txt",
    CSV="https://github.com/VManuelSM/Actividad_1_Big_Data/raw/refs/heads/main/registros_eventos.csv",
    HTML="https://github.com/VManuelSM/Actividad_1_Big_Data/raw/refs/heads/main/reporte_incidencias.html",
)

# Working directories, relative to the current working directory.
OUTPUT_DIR, AUDIT_DIR = "outputs", "audit_logs"

# Make sure both directories exist as soon as the module is loaded.
for _required_dir in (OUTPUT_DIR, AUDIT_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# Clases de datos para metadatos y registros de auditoría:

@dataclass
class HashMetadata:
    """Digests computed for one piece of content, plus when they were taken."""

    sha256: str     # SHA-256 hex digest
    md5: str        # MD5 hex digest
    timestamp: str  # when the digests were computed (string form)
    algorithm_version: str = "SHA-256, MD5"  # label of the algorithms used

    def to_dict(self) -> Dict[str, str]:
        """Return every field as a plain ``{name: value}`` dictionary."""
        serialized = asdict(self)
        return serialized


@dataclass
class DataQualityMetrics:
    """Quality indicators measured on a tabular source."""

    total_records: int          # number of records
    null_count: int             # number of null values
    duplicate_count: int        # number of duplicates
    data_types: Dict[str, str]  # type name per column
    completeness_ratio: float   # ratio used by the audit to flag low completeness

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a plain ``{name: value}`` dictionary."""
        serialized = asdict(self)
        return serialized


@dataclass
class SourceMetadata:
    """Everything the audit records about one processed data source."""

    source_id: str
    source_name: str
    source_type: str
    source_url: str
    size_bytes: int
    encoding: str
    processing_timestamp: str
    hash_metadata: HashMetadata
    # Only set for sources where quality metrics were computed.
    quality_metrics: Optional[DataQualityMetrics] = None
    # Free-form, format-specific details.
    additional_info: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a dictionary, with nested records serialized
        through their own ``to_dict`` methods."""
        payload = asdict(self)
        payload.update(hash_metadata=self.hash_metadata.to_dict())
        if self.quality_metrics:
            payload.update(quality_metrics=self.quality_metrics.to_dict())
        return payload


@dataclass
class AuditRecord:
    """Complete result of one audit run."""

    audit_id: str
    pipeline_version: str
    execution_timestamp: str
    sources_processed: List[SourceMetadata]
    audit_status: str
    total_sources: int
    successful_sources: int
    failed_sources: int
    warnings: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    environment_info: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a dictionary, serializing each processed
        source through its own ``to_dict`` method."""
        payload = asdict(self)
        serialized_sources = []
        for source in self.sources_processed:
            serialized_sources.append(source.to_dict())
        payload['sources_processed'] = serialized_sources
        return payload

    def save_to_json(self, filepath: str) -> None:
        """Write the record to ``filepath`` as indented UTF-8 JSON.

        Non-ASCII characters are written as-is rather than escaped. The
        destination path is logged once the file has been written.
        """
        with open(filepath, 'w', encoding='utf-8') as handle:
            payload = self.to_dict()
            json.dump(payload, handle, indent=2, ensure_ascii=False)
        logger.info(f"Auditoría guardada en: {filepath}")


# ============================================================================
# UTILIDADES CRIPTOGRÁFICAS
# ============================================================================

class IntegrityValidator:
    """Stateless helpers to compute and check content digests.

    Both helpers hash the UTF-8 encoding of the text they receive.
    """

    @staticmethod
    def compute_hashes(data: str) -> "HashMetadata":
        """
        Compute the SHA-256 and MD5 digests of ``data``.

        Args:
            data: Text to hash; it is encoded as UTF-8 first.

        Returns:
            HashMetadata holding both hex digests and the ISO-8601 local time
            at which they were computed.
        """
        data_bytes = data.encode('utf-8')

        return HashMetadata(
            sha256=hashlib.sha256(data_bytes).hexdigest(),
            md5=hashlib.md5(data_bytes).hexdigest(),
            timestamp=datetime.now().isoformat()
        )

    @staticmethod
    def verify_integrity(data: str, expected_sha256: str) -> bool:
        """
        Check ``data`` against an expected SHA-256 hex digest.

        The comparison ignores letter case and surrounding whitespace in
        ``expected_sha256``: hex digests are case-insensitive, and some tools
        print them in upper case or followed by a newline.

        Args:
            data: Text to verify; it is encoded as UTF-8 before hashing.
            expected_sha256: Expected SHA-256 hex digest.

        Returns:
            True if the digest of ``data`` matches ``expected_sha256``;
            False otherwise, including when ``expected_sha256`` is not a string.
        """
        if not isinstance(expected_sha256, str):
            return False

        current_hash = hashlib.sha256(data.encode('utf-8')).hexdigest()
        # hexdigest() is always lower case, so normalize the expected value to match.
        return current_hash == expected_sha256.strip().lower()


# ============================================================================
# PROCESADORES DE FUENTES
# ============================================================================

class DataSourceProcessor:
    """Download remote sources and describe each one as a ``SourceMetadata``.

    A single ``requests.Session`` is shared by all downloads made through an
    instance. Call :meth:`close`, or use the instance as a context manager,
    to close it.

    NOTE(review): digests and ``size_bytes`` are computed from the text that
    ``requests`` decoded, re-encoded as UTF-8, not from the raw response body.
    They equal the served file's digest and size only if the response was
    valid UTF-8 and decoded as such; confirm this is what the audit needs.
    """

    def __init__(self, timeout: int = 30):
        """
        Args:
            timeout: Value passed as ``timeout`` to every HTTP GET.
        """
        self.timeout = timeout
        self.session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "DataSourceProcessor":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def download_source(self, url: str) -> str:
        """
        Download the content of ``url`` as text.

        Args:
            url: Location of the source.

        Returns:
            The response body decoded to text by ``requests``.

        Raises:
            requests.exceptions.RequestException: On connection errors,
                timeouts or HTTP error statuses. The error is logged and
                then re-raised.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error descargando {url}: {e}")
            raise

        logger.info(f"Descarga exitosa: {url}")
        return response.text

    @staticmethod
    def _source_name(url: str) -> str:
        """Return the file name in ``url``, without query string or fragment."""
        from urllib.parse import urlparse

        name = urlparse(url).path.rsplit('/', 1)[-1]
        # Fall back to the last '/'-separated piece when the URL has no path.
        return name or url.split('/')[-1]

    @staticmethod
    def _line_stats(text: str) -> Dict[str, int]:
        """Count lines, non-blank lines and characters of ``text``."""
        # splitlines() does not yield a phantom empty last line for text that
        # ends with a newline, and it also understands '\r\n'.
        lines = text.splitlines()
        return {
            'total_lines': len(lines),
            'non_empty_lines': sum(1 for line in lines if line.strip()),
            'total_characters': len(text)
        }

    @staticmethod
    def _completeness_ratio(null_count: int, total_cells: int) -> float:
        """Return the fraction of non-null cells.

        A table without cells is reported as 1.0 (nothing is missing) so the
        ratio never becomes NaN, which would not be valid JSON.
        """
        if total_cells == 0:
            return 1.0
        return float(1 - (null_count / total_cells))

    def _build_metadata(
        self,
        url: str,
        source_id: str,
        source_type: "SourceType",
        raw_content: str,
        additional_info: Dict[str, Any],
        quality_metrics: Optional["DataQualityMetrics"] = None,
    ) -> "SourceMetadata":
        """Assemble the metadata fields shared by every source type."""
        hash_meta = IntegrityValidator.compute_hashes(raw_content)

        return SourceMetadata(
            source_id=source_id,
            source_name=self._source_name(url),
            source_type=source_type.value,
            source_url=url,
            size_bytes=len(raw_content.encode('utf-8')),
            encoding='utf-8',
            processing_timestamp=datetime.now().isoformat(),
            hash_metadata=hash_meta,
            quality_metrics=quality_metrics,
            additional_info=additional_info
        )

    def process_txt(self, url: str, source_id: str) -> "SourceMetadata":
        """Download a text file and record its digests and line statistics.

        Args:
            url: Location of the text file.
            source_id: Identifier stored in the resulting metadata.

        Returns:
            SourceMetadata whose ``additional_info`` holds ``total_lines``,
            ``non_empty_lines`` and ``total_characters``.

        Raises:
            requests.exceptions.RequestException: If the download fails.
        """
        logger.info(f"Procesando TXT: {source_id}")

        raw_content = self.download_source(url)

        return self._build_metadata(
            url,
            source_id,
            SourceType.TXT,
            raw_content,
            additional_info=self._line_stats(raw_content)
        )

    def process_csv(self, url: str, source_id: str) -> "SourceMetadata":
        """Download a CSV file and record its digests and quality metrics.

        Args:
            url: Location of the CSV file.
            source_id: Identifier stored in the resulting metadata.

        Returns:
            SourceMetadata with ``quality_metrics`` filled in and
            ``additional_info`` holding ``rows``, ``columns`` and
            ``column_names``.

        Raises:
            requests.exceptions.RequestException: If the download fails.
            Exception: Whatever ``pandas.read_csv`` raises for content it
                cannot parse.
        """
        logger.info(f"Procesando CSV: {source_id}")

        raw_content = self.download_source(url)

        # Load into a DataFrame for analysis.
        df = pd.read_csv(StringIO(raw_content))

        # Computed once; it feeds both the null count and the ratio.
        null_count = int(df.isnull().sum().sum())

        quality_metrics = DataQualityMetrics(
            total_records=len(df),
            null_count=null_count,
            duplicate_count=int(df.duplicated().sum()),
            data_types={col: str(dtype) for col, dtype in df.dtypes.items()},
            completeness_ratio=self._completeness_ratio(null_count, int(df.size))
        )

        return self._build_metadata(
            url,
            source_id,
            SourceType.CSV,
            raw_content,
            additional_info={
                'rows': len(df),
                'columns': len(df.columns),
                'column_names': list(df.columns)
            },
            quality_metrics=quality_metrics
        )

    def process_html(self, url: str, source_id: str) -> "SourceMetadata":
        """Download an HTML document and record its digests and basic structure.

        Args:
            url: Location of the HTML document.
            source_id: Identifier stored in the resulting metadata.

        Returns:
            SourceMetadata whose ``additional_info`` holds
            ``html_tags_count``, ``text_length``, ``has_title`` and ``title``.

        Raises:
            requests.exceptions.RequestException: If the download fails.
        """
        logger.info(f"Procesando HTML: {source_id}")

        raw_content = self.download_source(url)

        # Parse the HTML.
        soup = BeautifulSoup(raw_content, 'lxml')
        text_content = soup.get_text(separator=' ', strip=True)

        title_tag = soup.title
        title_text = None
        if title_tag is not None and title_tag.string is not None:
            # Store a plain str: `.string` is a NavigableString that keeps the
            # whole parse tree reachable, and the metadata is later deep-copied
            # by dataclasses.asdict().
            title_text = str(title_tag.string)

        return self._build_metadata(
            url,
            source_id,
            SourceType.HTML,
            raw_content,
            additional_info={
                'html_tags_count': len(soup.find_all()),
                'text_length': len(text_content),
                'has_title': title_tag is not None,
                'title': title_text
            }
        )


# ============================================================================
# MOTOR DE AUDITORÍA
# ============================================================================

class AuditEngine:
    """Main audit and traceability engine.

    Runs every configured source through a ``DataSourceProcessor``, collects
    the resulting metadata, warnings and errors into an ``AuditRecord``,
    saves that record as JSON under ``AUDIT_DIR`` and prints a summary.

    NOTE(review): ``audit_id`` is generated once per engine, so calling
    :meth:`run_audit` twice on the same instance overwrites the first JSON
    file. Confirm whether that is intended.
    """

    # File extension (lower case) -> name of the DataSourceProcessor method
    # that handles it.
    _PROCESSOR_BY_EXTENSION = {
        'txt': 'process_txt',
        'csv': 'process_csv',
        'html': 'process_html',
        'htm': 'process_html',
    }

    # Below this completeness ratio a source gets a low-completeness warning.
    _MIN_COMPLETENESS = 0.95

    def __init__(self, pipeline_version: str = "1.0.0"):
        """
        Args:
            pipeline_version: Version label stored in every audit record.
        """
        self.pipeline_version = pipeline_version
        self.processor = DataSourceProcessor()
        self.audit_id = self._generate_audit_id()

    def _generate_audit_id(self) -> str:
        """Return an id of the form ``AUDIT_<YYYYMMDD>_<HHMMSS>_<8 hex chars>``."""
        import uuid

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # A random suffix: hashing the clock is not random, and two ids
        # generated at the same instant would collide.
        random_suffix = uuid.uuid4().hex[:8]
        return f"AUDIT_{timestamp}_{random_suffix}"

    def _get_environment_info(self) -> Dict[str, str]:
        """Capture interpreter, platform and user information for the record."""
        import sys
        import platform

        return {
            'python_version': sys.version,
            'platform': platform.platform(),
            'machine': platform.machine(),
            'processor': platform.processor(),
            # USER is the POSIX variable, USERNAME the Windows one.
            'execution_user': os.getenv('USER') or os.getenv('USERNAME') or 'unknown'
        }

    @staticmethod
    def _detect_extension(url: str) -> str:
        """Return the lower-cased file extension of ``url`` ('' if none).

        Only the path component is inspected, so a query string or fragment
        (for example ``file.csv?raw=true``) does not hide the extension.
        """
        from urllib.parse import urlparse

        filename = urlparse(url).path.rsplit('/', 1)[-1]
        _, dot, extension = filename.rpartition('.')
        return extension.lower() if dot else ''

    @staticmethod
    def _resolve_status(successful: int, failed: int, has_warnings: bool) -> str:
        """Map the run's counters to an ``AuditStatus`` value.

        FAILURE: at least one source failed and none succeeded.
        WARNING: some source failed, or warnings were raised (a skipped
        source or low completeness).
        SUCCESS: no failures and no warnings.
        """
        if failed and not successful:
            return AuditStatus.FAILURE.value
        if failed or has_warnings:
            return AuditStatus.WARNING.value
        return AuditStatus.SUCCESS.value

    def run_audit(self, sources: Dict[str, str]) -> "AuditRecord":
        """
        Run the complete audit over all sources.

        A source that raises while being processed is counted as failed and
        its error is recorded; the remaining sources are still processed.
        A source with an unsupported extension is skipped with a warning and
        counts as neither successful nor failed.

        Args:
            sources: Dictionary {source_id: url}

        Returns:
            AuditRecord with the complete results. It is also saved to
            ``<AUDIT_DIR>/<audit_id>.json`` and summarized on the console.
        """
        logger.info(f"Iniciando auditoría: {self.audit_id}")

        processed_sources = []
        successful = 0
        failed = 0
        warnings = []
        errors = []

        for source_id, url in sources.items():
            try:
                # Pick the processor from the file extension.
                extension = self._detect_extension(url)
                handler_name = self._PROCESSOR_BY_EXTENSION.get(extension)

                if handler_name is None:
                    warnings.append(f"Tipo no soportado para {source_id}: {extension}")
                    continue

                metadata = getattr(self.processor, handler_name)(url, source_id)
            except Exception as e:
                # Boundary of the per-source work: record the failure and
                # carry on with the next source.
                failed += 1
                error_msg = f"Error procesando {source_id}: {str(e)}"
                errors.append(error_msg)
                logger.exception(error_msg)
                continue

            processed_sources.append(metadata)
            successful += 1

            # Check data quality when metrics were computed.
            quality = metadata.quality_metrics
            if quality and quality.completeness_ratio < self._MIN_COMPLETENESS:
                warnings.append(
                    f"{source_id}: Completitud baja ({quality.completeness_ratio:.2%})"
                )

        status = self._resolve_status(successful, failed, bool(warnings))

        # Build the audit record.
        audit_record = AuditRecord(
            audit_id=self.audit_id,
            pipeline_version=self.pipeline_version,
            execution_timestamp=datetime.now().isoformat(),
            sources_processed=processed_sources,
            audit_status=status,
            total_sources=len(sources),
            successful_sources=successful,
            failed_sources=failed,
            warnings=warnings,
            errors=errors,
            environment_info=self._get_environment_info()
        )

        # Persist the audit.
        audit_filepath = os.path.join(AUDIT_DIR, f"{self.audit_id}.json")
        audit_record.save_to_json(audit_filepath)

        # Report on the console.
        self._print_audit_summary(audit_record)

        return audit_record

    def _print_audit_summary(self, audit: "AuditRecord") -> None:
        """Print the counters of ``audit`` and one entry per processed source."""
        print(f"RESUMEN DE AUDITORÍA: {audit.audit_id}")
        print("="*80)
        print(f"Estado: {audit.audit_status.upper()}")
        print(f"Fuentes totales: {audit.total_sources}")
        print(f"Exitosas: {audit.successful_sources}")
        print(f"Fallidas: {audit.failed_sources}")
        print(f"Advertencias: {len(audit.warnings)}")
        print(f"Errores: {len(audit.errors)}")
        print("\nFuentes procesadas:")
        for source in audit.sources_processed:
            print(f"  ✓ {source.source_id} ({source.source_type})")
            print(f"    SHA-256: {source.hash_metadata.sha256}")
            print(f"    Tamaño: {source.size_bytes:,} bytes")
        print("="*80 + "\n")

# Script entry point.
def main():
    """Run the audit over ``DATA_SOURCES`` and print a per-source summary.

    Returns:
        The ``AuditRecord`` produced by ``AuditEngine.run_audit``.
    """
    print("Iniciando Sistema de Auditoría y Trazabilidad")
    print("Universidad Politécnica Metropolitana de Hidalgo\n")

    # Build the engine and run the whole audit in one go.
    engine = AuditEngine(pipeline_version="1.0.0")
    audit_result = engine.run_audit(DATA_SOURCES)

    # One summary row per processed source; the digest is shortened to its
    # first 16 characters for display.
    summary_rows = [
        {
            'Source ID': source.source_id,
            'Type': source.source_type,
            'Size (bytes)': source.size_bytes,
            'SHA-256': source.hash_metadata.sha256[:16] + '...',
            'Timestamp': source.processing_timestamp
        }
        for source in audit_result.sources_processed
    ]

    df_summary = pd.DataFrame(summary_rows)
    print("\nResumen de Fuentes Procesadas:")
    print(df_summary.to_string(index=False))

    return audit_result


if __name__ == "__main__":
    audit_result = main()