# Auditoría de Integridad y Métricas — DataSecure Lab

Este notebook lee las auditorías generadas por los scripts del pipeline (`30_fsck_audit.sh`) y construye tablas resumen con métricas de integridad y recomendaciones.

## 1) Configuración de rutas

En el entorno del aula, el NameNode monta un volumen (por ejemplo `./notebooks` → `/media/notebooks`).
La idea es que el script `30_fsck_audit.sh` deje copias de auditoría en una ruta que Jupyter pueda leer.


In [None]:
from pathlib import Path
import pandas as pd
import re

# Ruta al directorio de auditorías (montado desde docker-compose)
AUDIT_DIR = Path('/media/notebooks/audit/fsck')
INVENTORY_DIR = Path('/media/notebooks/audit/inventory')
METRICS_DIR = Path('/media/notebooks/audit/metrics')

print('AUDIT_DIR exists:', AUDIT_DIR.exists())
print('INVENTORY_DIR exists:', INVENTORY_DIR.exists())
print('METRICS_DIR exists:', METRICS_DIR.exists())

if AUDIT_DIR.exists():
    print('Contenido de AUDIT_DIR:', sorted(AUDIT_DIR.glob('*'))[:10])

## 2) Parseo básico de fsck

Contabilizamos palabras clave típicas:
- `CORRUPT`
- `MISSING`
- `Under replicated`


In [None]:
def parse_fsck_text(text: str) -> dict:
    """Extrae métricas clave de una salida de hdfs fsck."""
    metrics = {
        'CORRUPT': len(re.findall(r'\bCORRUPT\b', text, flags=re.IGNORECASE)),
        'MISSING': len(re.findall(r'\bMISSING\b', text, flags=re.IGNORECASE)),
        'UNDER_REPLICATED': len(re.findall(r'Under replicated', text, flags=re.IGNORECASE)),
    }
    
    # Extraer total size
    size_match = re.search(r'Total size:\s+(\d+)', text)
    metrics['total_size_bytes'] = int(size_match.group(1)) if size_match else 0
    
    # Extraer total files
    files_match = re.search(r'Total files:\s+(\d+)', text)
    metrics['total_files'] = int(files_match.group(1)) if files_match else 0
    
    # Extraer total blocks
    blocks_match = re.search(r'Total blocks[^:]*:\s+(\d+)', text)
    metrics['total_blocks'] = int(blocks_match.group(1)) if blocks_match else 0
    
    # Estado general
    if 'HEALTHY' in text:
        metrics['status'] = 'HEALTHY'
    elif 'CORRUPT' in text.upper():
        metrics['status'] = 'CORRUPT'
    else:
        metrics['status'] = 'UNKNOWN'
    
    return metrics

# Recopilar auditorías de todos los días disponibles
rows = []
for dt_dir in sorted(AUDIT_DIR.glob('*')):
    if not dt_dir.is_dir():
        continue
    # Buscar todos los ficheros fsck en el directorio
    for fsck_file in sorted(dt_dir.glob('fsck_*.txt')):
        text = fsck_file.read_text(encoding='utf-8', errors='ignore')
        m = parse_fsck_text(text)
        m['date'] = dt_dir.name
        m['audit_type'] = fsck_file.stem  # e.g. fsck_data, fsck_pre_incident, etc.
        rows.append(m)

df_audit = pd.DataFrame(rows) if rows else pd.DataFrame(
    columns=['date', 'audit_type', 'CORRUPT', 'MISSING', 'UNDER_REPLICATED', 
             'total_size_bytes', 'total_files', 'total_blocks', 'status']
)

print(f"Total de auditorías encontradas: {len(df_audit)}")
df_audit

## 3) Análisis del incidente

Comparación antes/después del incidente (caída de DataNode).

In [None]:
# Filtrar auditorías pre y post incidente
incident_types = ['fsck_pre_incident', 'fsck_post_incident', 'fsck_recovery', 'fsck_final']
df_incident = df_audit[df_audit['audit_type'].isin(incident_types)].copy()

if not df_incident.empty:
    print("=== Evolución del incidente ===")
    print(df_incident[['audit_type', 'CORRUPT', 'MISSING', 'UNDER_REPLICATED', 'status']].to_string(index=False))
else:
    print("No se encontraron auditorías de incidente. Ejecuta 70_incident_simulation.sh y 80_recovery_restore.sh primero.")

## 4) Tabla de métricas de rendimiento

Completar con los tiempos obtenidos durante la ejecución del pipeline.

In [None]:
# Tabla de métricas — completar con valores reales tras la ejecución
metrics_data = {
    'Operacion': [
        'Generacion de datos',
        'Ingesta logs a HDFS',
        'Ingesta IoT a HDFS',
        'Copia backup logs',
        'Copia backup IoT',
        'Auditoria fsck',
        'Comparacion inventario',
    ],
    'Tiempo_seg': [
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
        0,  # Completar con tiempo real
    ],
    'Tamano_MB': [
        0,  # Completar
        0,  # Completar
        0,  # Completar
        0,  # Completar
        0,  # Completar
        0,  # N/A
        0,  # N/A
    ]
}

df_metrics = pd.DataFrame(metrics_data)
print("=== Métricas de rendimiento ===")
df_metrics

## 5) Impacto de replicación

Comparación del coste de almacenamiento según factor de replicación.

In [None]:
# Impacto de replicación — leer datos reales del script 60_replication_metrics.sh
# Si no hay datos reales, se muestran valores placeholder

df_replication = None

# Intentar leer CSV generado por el script
for dt_dir in sorted(METRICS_DIR.glob('*')) if METRICS_DIR.exists() else []:
    rep_csv = dt_dir / 'replication_metrics.csv'
    if rep_csv.exists():
        df_replication = pd.read_csv(rep_csv)
        df_replication['tolerancia_fallos'] = df_replication['factor'].map({
            1: 'Sin tolerancia',
            2: 'Tolera 1 fallo',
            3: 'Tolera 1 fallo + re-replicación segura'
        })
        print(f"Datos de replicación leídos desde: {rep_csv}")
        break

# Si no hay datos reales, crear tabla placeholder
if df_replication is None:
    replication_data = {
        'factor': [1, 2, 3],
        'logical_size_bytes': [0, 0, 0],
        'physical_size_bytes': [0, 0, 0],
        'physical_size_mb': [0, 0, 0],
        'time_setrep_sec': [0, 0, 0],
        'tolerancia_fallos': [
            'Sin tolerancia',
            'Tolera 1 fallo',
            'Tolera 1 fallo + re-replicación segura'
        ]
    }
    df_replication = pd.DataFrame(replication_data)
    print("Sin datos reales. Ejecuta 60_replication_metrics.sh primero.")

print("\n=== Impacto de replicación ===")
df_replication

## 6) Conclusiones y recomendaciones

Basándonos en las métricas recogidas:

**Factor de replicación recomendado: 3**
- Con 3 DataNodes y replicación 3, cada bloque tiene copia en todos los nodos.
- Ante la caída de un nodo, HDFS re-replica automáticamente desde las 2 réplicas restantes.
- El coste es 3x en disco, pero el clúster educativo tiene capacidad suficiente.

**Frecuencia de auditoría recomendada: diaria**
- La auditoría con `hdfs fsck` es una operación de solo lectura (metadata) y no impacta el rendimiento.
- Una auditoría diaria permite detectar bloques UNDER_REPLICATED o CORRUPT antes de que se acumulen.
- En producción con volúmenes mayores, una frecuencia semanal podría ser más adecuada, complementada con alertas automáticas del NameNode.

**Estrategia de prevención:**
- Mantener siempre replicación >= 2 para tolerancia a fallos.
- Backup periódico a `/backup` con validación por inventario.
- Monitorizar el NameNode UI para detectar Dead Nodes o Under-replicated blocks.

In [None]:
# Exportar todas las tablas a CSV
out_dir = Path('.')

df_audit.to_csv(out_dir / 'audit_summary.csv', index=False)
print(f"Exportado: {out_dir / 'audit_summary.csv'}")

df_metrics.to_csv(out_dir / 'metrics_summary.csv', index=False)
print(f"Exportado: {out_dir / 'metrics_summary.csv'}")

df_replication.to_csv(out_dir / 'replication_comparison.csv', index=False)
print(f"Exportado: {out_dir / 'replication_comparison.csv'}")