# Informe de Integridad y Métricas del Clúster

Este notebook procesa las evidencias generadas por los scripts de automatización (`30_fsck_data_audit.py` y `60_fsck_backup_audit.py`) y presenta el resumen de métricas de rendimiento.

**Objetivos:**
1. Parsear logs de `fsck` para `/data` y `/backup`.
2. Generar CSVs resumen de integridad.
3. Presentar tabla de tiempos de ingestión y replicación (R7).
4. Conclusiones finales.

## 1) Imports y configuración

In [None]:
# Instalamos librerías necesarias (solo hace falta ejecutarlo una vez)
!pip install pandas --break-system-packages

Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
  Downloading pandas-3.0.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (79 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.5/79.5 kB[0m [31m454.1 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting numpy>=1.26.0 (from pandas)
  Downloading numpy-2.4.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (6.6 kB)
Collecting python-dateutil>=2.8.2 (from pandas)
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting six>=1.5 (from python-dateutil>=2.8.2->pandas)
  Downloading six-1.17.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading pandas-3.0.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (10.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.9/10.9 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading numpy-2.4.2

In [None]:
import sys
import os

# Añadimos la ruta de usuario donde pip instaló pandas
# (Basado en el log anterior que mostraba /opt/bd/.local)
ruta_librerias = os.path.expanduser("~/.local/lib/python3.12/site-packages")
sys.path.append(ruta_librerias)

print(f"Ruta añadida: {ruta_librerias}")
print("Ahora intenta importar pandas en la siguiente celda.")

In [3]:
import pandas as pd
import re
from pathlib import Path

# CONFIGURACIÓN DE RUTAS
# Los scripts 30 y 60 dejan los txt en 'raw_audits' dentro de la carpeta notebooks
AUDIT_DIR = Path("raw_audits")

RESUMEN_DIR = Path("resumen_audits")

# Verificamos que existe el directorio
if not AUDIT_DIR.exists():
    print(f"ATENCIÓN: No encuentro la carpeta {AUDIT_DIR}. Asegúrate de haber ejecutado los scripts 30 y 60.")
    AUDIT_DIR.mkdir(exist_ok=True)
else:
    print(f"Directorio de auditorías encontrado: {AUDIT_DIR.resolve()}")
    print("Archivos disponibles:", [f.name for f in AUDIT_DIR.glob('*.txt')])

if not RESUMEN_DIR.exists():
    print(f"ATENCIÓN: No encuentro la carpeta {RESUMEN_DIR}. Procediendo a su creación")
    RESUMEN_DIR.mkdir(exist_ok=True)
else:
    print(f"Directorio de resumen de auditorías encontrado: {RESUMEN_DIR.resolve()}")

Directorio de auditorías encontrado: /media/notebooks/raw_audits
Archivos disponibles: ['fsck_backup_dt=2026-02-10.txt', 'fsck_data_dt=2026-02-10.txt']
Directorio de resumen de auditorías encontrado: /media/notebooks/resumen_audits


## 2) Función de Parseo y Procesamiento

Contabilizamos palabras clave típicas:
- `CORRUPT`
- `MISSING`
- `Under replicated`

In [4]:
def parse_fsck_report(text: str):
    """
    Extrae los VALORES numéricos de las métricas clave del reporte fsck.
    Busca patrones como 'Missing blocks: 0' y extrae el 0.
    """
    
    # Función auxiliar para extraer y sumar números encontrados tras un patrón
    def get_count(pattern):
        # Buscamos el número (\d+) que va después de la etiqueta y dos puntos
        matches = re.findall(pattern, text, flags=re.IGNORECASE)
        # Convertimos a entero y sumamos (por si aparece en varias secciones)
        return sum(int(m) for m in matches) if matches else 0

    return {
        # Busca "Corrupt blocks: X"
        'CORRUPT': get_count(r'Corrupt blocks:\s+(\d+)'),
        
        # Busca "Missing blocks: X"
        'MISSING': get_count(r'Missing blocks:\s+(\d+)'),
        
        # Busca "Under-replicated blocks: X" (ojo al guion o espacio)
        'UNDER_REPLICATED': get_count(r'Under[- ]replicated blocks:\s+(\d+)'),
        
        # Busca explícitamente la frase final de éxito
        'HEALTHY': 1 if "is HEALTHY" in text else 0
    }

data_rows = []
backup_rows = []

# Iteramos sobre todos los archivos txt en la carpeta raw_audits
for log_file in sorted(AUDIT_DIR.glob('*.txt')):
    try:
        text = log_file.read_text(encoding='utf-8', errors='ignore')
        metrics = parse_fsck_report(text)
        
        # Extraemos la fecha del nombre del archivo (ej: fsck_data_2026-02-04.txt)
        fecha = log_file.stem.split('_')[-1]
        metrics['fecha'] = fecha
        metrics['archivo'] = log_file.name

        # Clasificamos si es de /data o de /backup
        if 'fsck_data' in log_file.name:
            data_rows.append(metrics)
        elif 'fsck_backup' in log_file.name:
            backup_rows.append(metrics)
            
    except Exception as e:
        print(f"Error leyendo {log_file.name}: {e}")

# Creamos DataFrames
df_data = pd.DataFrame(data_rows).sort_values('fecha') if data_rows else pd.DataFrame()
df_backup = pd.DataFrame(backup_rows).sort_values('fecha') if backup_rows else pd.DataFrame()


## 3) Resultados /data

In [None]:
import subprocess
from datetime import datetime
from pathlib import Path

# --- FUNCIÓN AUXILIAR ---
def ejecutar_cmd(comando):
    # Ejecuta el comando directamente en el sistema
    print(f"Ejecutando: {comando}")
    subprocess.run(comando, shell=True, check=True)

print("Resultados para /data:")
display(df_data)

if not df_backup.empty:
    # --- 1. PREPARACIÓN ---
    # Calculamos la fecha primero para usarla en el nombre
    dt_hoy = datetime.now().strftime('%Y-%m-%d')
    
    # Creamos el nombre con la fecha: "resumen_auditoria_backup_2026-02-05.csv"
    nombre_archivo = f'resumen_auditoria_data_{dt_hoy}.csv'
    
    # Definimos la ruta local completa
    ruta_csv_data = RESUMEN_DIR / nombre_archivo

    # --- 2. GUARDADO LOCAL ---
    df_data.to_csv(ruta_csv_data, index=False)
    print(f"CSV generado en disco local: {ruta_csv_data}")

    # --- 3. SUBIDA A HDFS ---
    try:
        # Ruta carpeta destino
        ruta_hdfs_dir = f"/audit/fsck/dt={dt_hoy}"
        
        # Ruta absoluta del archivo local que acabamos de crear
        ruta_local_absoluta = ruta_csv_data.resolve()
        
        print(f"Subiendo a HDFS: {ruta_hdfs_dir} ...")

        # PASO A: Subir archivo
        # Al poner la carpeta destino con barra al final, el archivo conserva su nombre (con fecha)
        ejecutar_cmd(f'hdfs dfs -put -f "{ruta_local_absoluta}" {ruta_hdfs_dir}/')

        print(f"Éxito: Archivo disponible en HDFS: {ruta_hdfs_dir}/{nombre_archivo}")

    except subprocess.CalledProcessError as e:
        print(f"Error al ejecutar comando HDFS: {e}")
    except Exception as e:
        print(f"Error general: {e}")

Resultados para /data:


Unnamed: 0,CORRUPT,MISSING,UNDER_REPLICATED,HEALTHY,fecha,archivo
0,0,0,0,1,dt=2026-02-10,fsck_data_dt=2026-02-10.txt


CSV generado en disco local: resumen_audits/resumen_auditoria_data_2026-02-10.csv
Subiendo a HDFS: /audit/fsck/dt=2026-02-10 ...
Ejecutando: hdfs dfs -put -f "/media/notebooks/resumen_audits/resumen_auditoria_data_2026-02-10.csv" /audit/fsck/dt=2026-02-10/


2026-02-10 20:07:37,724 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073741859_1035
java.io.IOException: Got error, status=ERROR, status message , ack with firstBadLink as 172.18.0.7:9866
	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:128)
	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:104)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1827)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1728)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:713)
2026-02-10 20:07:37,726 WARN hdfs.DataStreamer: Abandoning BP-610021310-172.18.0.3-1770753826894:blk_1073741859_1035
2026-02-10 20:07:37,766 WARN hdfs.DataStreamer: Excluding datanode DatanodeInfoWithStorage[172.18.0.7:9866,DS-877c5857-6151-437f-80e8-d0d244412889,DISK]
2026-02-10 20:07:52,056 WARN hdfs.Da

Éxito: Archivo disponible en HDFS: /audit/fsck/dt=2026-02-10/resumen_auditoria_data_2026-02-10.csv


## 4) Resultados /backup

In [6]:
import subprocess
from datetime import datetime
from pathlib import Path

# --- FUNCIÓN AUXILIAR ---
def ejecutar_cmd(comando):
    # Ejecuta el comando directamente en el sistema
    print(f"Ejecutando: {comando}")
    subprocess.run(comando, shell=True, check=True)

print("Resultados para /backup:")
display(df_backup) # Solo funciona en Jupyter

if not df_backup.empty:
    # --- 1. PREPARACIÓN (Cambio Clave) ---
    # Calculamos la fecha primero para usarla en el nombre
    dt_hoy = datetime.now().strftime('%Y-%m-%d')
    
    # Creamos el nombre con la fecha: "resumen_auditoria_backup_2026-02-05.csv"
    nombre_archivo = f'resumen_auditoria_backup_{dt_hoy}.csv'
    
    # Definimos la ruta local completa
    ruta_csv_backup = AUDIT_DIR / nombre_archivo

    # --- 2. GUARDADO LOCAL ---
    df_backup.to_csv(ruta_csv_backup, index=False)
    print(f"CSV generado en disco local: {ruta_csv_backup}")

    # --- 3. SUBIDA A HDFS ---
    try:
        # Ruta carpeta destino
        ruta_hdfs_dir = f"/audit/fsck/dt={dt_hoy}"
        
        # Ruta absoluta del archivo local que acabamos de crear
        ruta_local_absoluta = ruta_csv_backup.resolve()
        
        print(f"Subiendo a HDFS: {ruta_hdfs_dir} ...")

        # PASO A: Subir archivo
        # Al poner la carpeta destino con barra al final, el archivo conserva su nombre (con fecha)
        ejecutar_cmd(f'hdfs dfs -put -f "{ruta_local_absoluta}" {ruta_hdfs_dir}/')

        print(f"Éxito: Archivo disponible en HDFS: {ruta_hdfs_dir}/{nombre_archivo}")

    except subprocess.CalledProcessError as e:
        print(f"Error al ejecutar comando HDFS: {e}")
    except Exception as e:
        print(f"Error general: {e}")

Resultados para /backup:


Unnamed: 0,CORRUPT,MISSING,UNDER_REPLICATED,HEALTHY,fecha,archivo
0,0,0,0,1,dt=2026-02-10,fsck_backup_dt=2026-02-10.txt


CSV generado en disco local: raw_audits/resumen_auditoria_backup_2026-02-10.csv
Subiendo a HDFS: /audit/fsck/dt=2026-02-10 ...
Ejecutando: hdfs dfs -put -f "/media/notebooks/raw_audits/resumen_auditoria_backup_2026-02-10.csv" /audit/fsck/dt=2026-02-10/


2026-02-10 20:07:57,144 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073741862_1038
java.net.NoRouteToHostException: No route to host
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:205)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:600)
	at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1774)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1728)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:713)
2026-02-10 20:07:57,146 WARN hdfs.DataStreamer: Abandoning BP-610021310-172.18.0.3-1770753826894:blk_1073741862_1038
2026-02-10 20:07:57,193 WARN hdfs.DataStreamer: Excluding datanode DatanodeInfoWithStorage[172.18.0.7:9866,DS-877c5857

Éxito: Archivo disponible en HDFS: /audit/fsck/dt=2026-02-10/resumen_auditoria_backup_2026-02-10.csv


## 5) Métricas de Rendimiento

### 5.1) Tabla de Tiempos
| Actividad | Factor de replicación | Tiempo |
| :--- | :--- | :--- |
| **Ingesta de datos (Script 20)** | Block replication = 1 | `26.98 s` |
| **Copia/Replicación (Script 40)** | Block replication = 1  | `10.62 s`|
| **Ingesta de datos (Script 20)** | Block replication = 2 | `33.85 s`|
| **Copia/Replicación (Script 40)** | Block replication = 2 | `11.01 s` |
| **Ingesta de datos (Script 20)** | Block replication = 3 | `35.41 s` |
| **Copia/Replicación (Script 40)** | Block replication = 3 | `14.55 s` |
| **Ingesta de datos (Script 20)** | Block replication = 4 | `38.54 s` |
| **Copia/Replicación (Script 40)** | Block replication = 4 | `16.17 s` |

### 5.2) Evidencias de Uso de Recursos
### 5.2.1) Uso de Recursos Proceso de Ingestión
- Factor de replicación 1 :
![Captura Subida a HDFS (Ingesta) con Factor de Replicación = 1](./img/stats_ingestion_fr1.png)

- Factor de replicación 2 :
![Captura Subida a HDFS (Ingesta) con Factor de Replicación = 2](./img/stats_ingestion_fr2.png)

- Factor de replicación 3 :
![Captura Subida a HDFS (Ingesta) con Factor de Replicación = 3](./img/stats_ingestion_fr3.png)

- Factor de replicación 4 :
![Captura Subida a HDFS (Ingesta) con Factor de Replicación = 4](./img/stats_ingestion_fr4.png)

### 5.2.2) Uso de Recursos Proceso de Backup
- Factor de replicación 1 :
![Captura Replicación/copia (Backup) en HDFS con Factor de Replicación = 1](./img/stats_backup_fr1.png)

- Factor de replicación 2 :
![Captura Replicación/copia (Backup) en HDFS con Factor de Replicación = 2](./img/stats_backup_fr2.png)

- Factor de replicación 3 :
![Captura Replicación/copia (Backup) en HDFS con Factor de Replicación = 3](./img/stats_backup_fr3.png)

- Factor de replicación 4 :
![Captura Replicación/copia (Backup) en HDFS con Factor de Replicación = 4](./img/stats_backup_fr4.png)

### 5.3) Análisis de Impacto en el Almacenamiento

| Replicación | DFS Used (Aprox.) | Comportamiento | Impacto Operativo |
| :--- | :--- | :--- | :--- |
| **Factor 1** | `~1.00 GB` | **Sin redundancia.** 1 copia única por bloque. | **Crítico:** La caída de 1 nodo implica pérdida de datos inmediata. |
| **Factor 2** | `~2.01 GB` | **Duplicación.** 2 copias por bloque. | **Bajo:** Soporta la caída de **1 nodo** sin interrumpir el servicio. |
| **Factor 3** | `~3.01 GB` | **Triplicación (Default).** Estándar de Hadoop. | **Óptimo:** Balance coste/seguridad. Soporta la caída de **2 nodos**. |
| **Factor 4** | `~4.01 GB` | **Copia Total.** Todos los nodos tienen todo. | **Máximo:** Seguridad total, pero con el coste de almacenamiento más alto (x4). |



## 6) Conclusiones y Recomendaciones
A partir del análisis de las métricas de rendimiento y almacenamiento obtenidas en la simulación, se presentan las siguientes conclusiones técnicas y las recomendaciones arquitectónicas derivadas.

### 6.1) Conclusiones
- **Escalabilidad No Lineal en la Escritura:** El incremento del Factor de Replicación de 1 a 4 resultó en un aumento del tiempo de ingesta de solo un ~42%. Esto evidencia que el cuello de botella principal no es la replicación interna entre los nodos del clúster (que ocurre en paralelo), sino la transferencia de datos desde el cliente externo hacia la infraestructura.

- **Eficiencia del Tráfico Intra-Clúster:** Las operaciones de respaldo interno (Backup) demostraron ser 2.5 veces más rápidas que la ingesta inicial. Este dato valida la eficiencia de la red interna de Docker frente al tráfico externo, confirmando que la gestión de copias de seguridad debe realizarse dentro del propio ecosistema del clúster.

- **Saturación por Simetría (RF=4):** La configuración de replicación total (RF=4 en 4 nodos) fuerza una ocupación idéntica en todos los DataNodes. Esta rigidez anula la capacidad del sistema para gestionar el balanceo de carga o reasignar bloques, eliminando la flexibilidad operativa necesaria en entornos de producción.

### 6.2) Recomendaciones

- **Arquitectura de Almacenamiento (RF=3):** Se recomienda establecer el Factor de Replicación 3 como estándar. Esta configuración ofrece el equilibrio óptimo entre integridad (tolerancia a fallo simultáneo de 2 nodos) y eficiencia, conservando un 25% de capacidad de almacenamiento libre para operaciones temporales y balanceo de carga.

- **Estrategia de Integridad Cíclica:** Para garantizar la calidad del dato sin comprometer el rendimiento del NameNode, se debe implementar el siguiente flujo operativo secuencial: **Ingesta → Auditoría (`fsck`) → Backup Diario.** Esto asegura que nunca se respalden datos corruptos.

- **Política de Recuperación:** Se aconseja mantener la estrategia de **"Full Snapshot" diario** en el directorio `/backup`.
    * *Justificación*: Dado el volumen actual del piloto (~1GB), el coste temporal es marginal (~14s), lo que proporciona una capa de seguridad imprescindible contra errores lógicos.
    
    * *Nota de Escalabilidad*: Aunque en entornos productivos masivos (TB/PB) sería obligatorio optar por estrategias incrementales (Incremental Backups) debido a los tiempos de transferencia, para el volumen actual la copia total ofrece la mejor relación entre simplicidad de restauración y seguridad.
