# Auditoria de integridad HDFS (fsck + metricas)

Este notebook esta preparado para trabajar con Docker + HDFS como en los scripts del proyecto (especialmente `scripts/30_fsck_audit.sh`).

Flujo:
1. Extraer auditorias fsck desde HDFS del contenedor `namenode`.
2. Guardarlas en carpeta local del notebook.
3. Leer y resumir esas auditorias.
4. Construir tabla de metricas con `docker stats` y tiempos por fase.


## 1) Configuracion

Las ejecuciones se hacen contra el contenedor Docker que tiene cliente HDFS (`namenode`).
La descarga de auditorias se hace desde `/audit/fsck` (HDFS) hacia una carpeta local junto al notebook.


In [None]:
# Instalar librerías necesarias
!pip install pandas numpy

In [6]:
from pathlib import Path
import os
import re
import shutil
import subprocess
import pandas as pd
import numpy as np

pd.set_option('display.max_colwidth', 120)

NN_CONTAINER = os.environ.get("NN_CONTAINER", "namenode")
WORKDIR = Path.cwd()
LOCAL_AUDIT_DIR = WORKDIR / "audit" / "fsck"
LOCAL_AUDIT_DIR.mkdir(parents=True, exist_ok=True)

print("NN_CONTAINER:", NN_CONTAINER)
print("WORKDIR:", WORKDIR)
print("LOCAL_AUDIT_DIR:", LOCAL_AUDIT_DIR)


NN_CONTAINER: namenode
WORKDIR: c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks
LOCAL_AUDIT_DIR: c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks\audit\fsck


## 2) Extraer auditorias desde HDFS del contenedor

Si no hay auditorias locales, se ejecuta dentro del contenedor:
- `hdfs dfs -test -d /audit/fsck`
- `hdfs dfs -get -f /audit/fsck/* /tmp/...`

Y luego se copia del contenedor al directorio local del notebook con `docker cp`.


In [9]:
def run_cmd(cmd):
    return subprocess.run(cmd, shell=True, capture_output=True, text=True)

def has_local_fsck_files(local_dir: Path) -> bool:
    return any(local_dir.glob("*/fsck_data.txt"))

def pull_fsck_from_container(nn_container: str, local_dir: Path) -> bool:
    if shutil.which("docker") is None:
        print("docker no esta disponible en este entorno.")
        return False

    remote_tmp = f"/tmp/audit_fsck_export_{os.getpid()}"

    test_hdfs = run_cmd(
        f'docker exec -i {nn_container} bash -lc "hdfs dfs -test -d /audit/fsck"'
    )
    if test_hdfs.returncode != 0:
        print("No existe /audit/fsck en HDFS dentro del contenedor.")
        return False

    prep_and_get = run_cmd(
        f'docker exec -i {nn_container} bash -lc "rm -rf {remote_tmp} && mkdir -p {remote_tmp} && hdfs dfs -get -f /audit/fsck/* {remote_tmp}/"'
    )
    if prep_and_get.returncode != 0:
        print("Fallo extrayendo auditorias con hdfs dfs -get desde el contenedor.")
        print(prep_and_get.stderr[:400])
        return False

    local_dir.mkdir(parents=True, exist_ok=True)
    cp_out = run_cmd(f'docker cp {nn_container}:{remote_tmp}/. "{str(local_dir)}"')
    print("docker cp output:", cp_out)
    cleanup = run_cmd(f'docker exec -i {nn_container} bash -lc "rm -rf {remote_tmp}"')

    if cp_out.returncode != 0:
        print("Fallo copiando archivos del contenedor al host con docker cp.")
        print(cp_out.stderr[:400])
        return False

    if cleanup.returncode != 0:
        print("Aviso: no se pudo limpiar carpeta temporal remota.")

    return True

if not has_local_fsck_files(LOCAL_AUDIT_DIR):
    print("No hay auditorias locales. Intentando extraer desde HDFS en contenedor...")
    ok = pull_fsck_from_container(NN_CONTAINER, LOCAL_AUDIT_DIR)
    print("Extraccion completada:", ok)
else:
    print("Ya existen auditorias locales en", LOCAL_AUDIT_DIR)

print("Fechas locales detectadas:", sorted([p.name for p in LOCAL_AUDIT_DIR.glob('*') if p.is_dir()]))


No hay auditorias locales. Intentando extraer desde HDFS en contenedor...
docker cp output: CompletedProcess(args='docker cp namenode:/tmp/audit_fsck_export_33360/. "c:\\Users\\ferna\\Documents\\Proyectos\\CursoIA\\HDFSTarea\\data-integrity-hdfs-lab\\notebooks\\audit\\fsck"', returncode=0, stdout='', stderr='')
Extraccion completada: True
Fechas locales detectadas: ['2026-02-13']


## 3) Tabla de lectura de auditorias fsck

Se leen los archivos extraidos (`fsck_data.txt`) y se resumen contadores de integridad.


In [14]:
def parse_fsck_text(text: str) -> dict:
    return {
        "CORRUPT": len(re.findall(r"\bCORRUPT\b", text, flags=re.IGNORECASE)),
        "MISSING": len(re.findall(r"\bMISSING\b", text, flags=re.IGNORECASE)),
        "UNDER_REPLICATED": len(re.findall(r"Under replicated", text, flags=re.IGNORECASE)),
        "HEALTHY": bool(re.search(r"Status:\s*HEALTHY", text, flags=re.IGNORECASE)),
    }

rows = []
for dt_dir in sorted([p for p in LOCAL_AUDIT_DIR.glob("*") if p.is_dir()]):
    fsck_file = dt_dir / "fsck_data.txt"
    if not fsck_file.exists():
        continue
    txt = fsck_file.read_text(encoding="utf-8", errors="ignore")
    m = parse_fsck_text(txt)
    m["dt"] = dt_dir.name
    m["source_file"] = str(fsck_file)
    rows.append(m)

df_fsck = pd.DataFrame(rows)
if not df_fsck.empty:
    df_fsck = df_fsck[["dt", "CORRUPT", "MISSING", "UNDER_REPLICATED", "HEALTHY", "source_file"]].sort_values("dt")
else:
    df_fsck = pd.DataFrame(columns=["dt", "CORRUPT", "MISSING", "UNDER_REPLICATED", "HEALTHY", "source_file"])

print("Tabla de lectura de auditorias fsck:")
df_fsck


Tabla de lectura de auditorias fsck:


Unnamed: 0,dt,CORRUPT,MISSING,UNDER_REPLICATED,HEALTHY,source_file
0,2026-02-13,2,4,0,True,c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks\audit\fsck\2026-02-13\fsck_da...


## 4) Tabla de metricas (tiempos/recursos)

Recursos: snapshot de `docker stats`.
Tiempos: se cargan desde `notebooks/metrics/phase_times.csv` si existe; si no, se crea plantilla para rellenar.


In [18]:
def parse_size_to_mib(value: str) -> float:
    value = value.strip()
    m = re.match(r"([0-9]+(?:\.[0-9]+)?)\s*([KMG]i?)?B", value)
    if not m:
        return np.nan
    num = float(m.group(1))
    unit = (m.group(2) or "").upper()
    factors = {
        "": 1 / (1024 * 1024),
        "K": 1 / 1024,
        "KI": 1 / 1024,
        "M": 1,
        "MI": 1,
        "G": 1024,
        "GI": 1024,
    }
    return num * factors.get(unit, np.nan)

def parse_net_to_mib(net_value: str):
    parts = [p.strip() for p in net_value.split("/")]
    if len(parts) != 2:
        return (np.nan, np.nan)
    return parse_size_to_mib(parts[0]), parse_size_to_mib(parts[1])

stats_cmd = "docker stats --no-stream --format '{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}}'"
res = subprocess.run(stats_cmd, shell=True, capture_output=True, text=True)

stats_rows = []
if res.returncode == 0 and res.stdout.strip():
    for line in res.stdout.strip().splitlines():
        name, cpu, mem_usage, net_io = [x.strip() for x in line.split(",", 3)]
        mem_parts = [p.strip() for p in mem_usage.split("/")]
        mem_used_mib = parse_size_to_mib(mem_parts[0]) if mem_parts else np.nan
        net_in_mib, net_out_mib = parse_net_to_mib(net_io)
        cpu_pct = float(cpu.replace("%", "").strip()) if cpu.replace("%", "").strip() else np.nan
        stats_rows.append({
            "container": name,
            "cpu_pct": cpu_pct,
            "mem_used_mib": mem_used_mib,
            "net_in_mib": net_in_mib,
            "net_out_mib": net_out_mib,
        })

stats_df = pd.DataFrame(stats_rows)
print("Snapshot docker stats:")
stats_df


Snapshot docker stats:


Unnamed: 0,container,cpu_pct,mem_used_mib,net_in_mib,net_out_mib
0,'clustera-dnnm-1,0.6,902.1,4454.4,3348.48
1,'clustera-dnnm-3,0.52,921.2,4454.4,3624.96
2,'clustera-dnnm-2,0.58,764.0,,1.31
3,'namenode,0.14,490.8,2242.56,4454.4
4,'resourcemanager,0.51,600.8,3.14,


### 4.1 Generar `phase_times.csv` con medicion en tiempo real

Esta celda ejecuta fases del pipeline, toma muestras de `docker stats --no-stream` cada cierto intervalo y guarda `notebooks/metrics/phase_times.csv` con:
- `fase`
- `duracion_seg`
- `cpu_promedio_pct`
- `mem_promedio_mib`


In [24]:
import time
import subprocess
from pathlib import Path

# Configuracion
SAMPLE_INTERVAL_SEC = 2
CONTAINER_FILTERS = ("namenode", "resourcemanager", "dnnm", "clustera-dnnm")
PROJECT_ROOT = WORKDIR if (WORKDIR / "scripts").exists() else WORKDIR.parent
METRICS_DIR = WORKDIR / "metrics"
METRICS_DIR.mkdir(parents=True, exist_ok=True)
PHASE_TIMES_CSV = METRICS_DIR / "phase_times.csv"

PHASE_COMMANDS = [
    ("ingesta", "bash scripts/20_ingest_hdfs.sh"),
    ("auditoria_fsck", "bash scripts/30_fsck_audit.sh"),
    ("backup_copy", "bash scripts/40_backup_copy.sh"),
    ("incidente_recuperacion", "bash scripts/70_incident_simulation.sh && bash scripts/80_recovery_restore.sh"),
]

def _sample_stats_once():
    cmd = "docker stats --no-stream --format '{{.Name}},{{.CPUPerc}},{{.MemUsage}}'"
    res = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    rows = []
    if res.returncode != 0 or not res.stdout.strip():
        return rows

    for line in res.stdout.strip().splitlines():
        name, cpu, mem_usage = [x.strip() for x in line.split(',', 2)]
        if not any(k in name for k in CONTAINER_FILTERS):
            continue
        mem_used = mem_usage.split('/')[0].strip()
        cpu_pct = float(cpu.replace('%', '').strip()) if cpu.replace('%', '').strip() else np.nan
        rows.append({
            "container": name,
            "cpu_pct": cpu_pct,
            "mem_used_mib": parse_size_to_mib(mem_used),
        })
    return rows

def _run_phase_with_sampling(label, command):
    samples = []
    start = time.time()
    proc = subprocess.Popen(command, shell=True, cwd=str(PROJECT_ROOT))

    while proc.poll() is None:
        samples.extend(_sample_stats_once())
        time.sleep(SAMPLE_INTERVAL_SEC)

    # Muestra final al terminar
    samples.extend(_sample_stats_once())

    end = time.time()
    duration = round(end - start, 2)
    rc = proc.returncode

    if samples:
        s_df = pd.DataFrame(samples)
        cpu_avg = round(float(s_df["cpu_pct"].mean()), 3)
        mem_avg = round(float(s_df["mem_used_mib"].mean()), 3)
    else:
        cpu_avg = np.nan
        mem_avg = np.nan

    return {
        "fase": label,
        "duracion_seg": duration,
        "cpu_promedio_pct": cpu_avg,
        "mem_promedio_mib": mem_avg,
        "return_code": rc,
        "muestras": len(samples),
    }

rows = []
for phase, cmd in PHASE_COMMANDS:
    print(f"[metrics] Ejecutando fase: {phase}")
    out = _run_phase_with_sampling(phase, cmd)
    rows.append(out)
    print(f"[metrics] {phase}: duracion={out['duracion_seg']}s cpu_avg={out['cpu_promedio_pct']} mem_avg={out['mem_promedio_mib']} rc={out['return_code']}")

phase_times_df = pd.DataFrame(rows)
phase_times_df.to_csv(PHASE_TIMES_CSV, index=False)
print("CSV generado:", PHASE_TIMES_CSV)
phase_times_df


[metrics] Ejecutando fase: ingesta
[metrics] ingesta: duracion=6.13s cpu_avg=0.559 mem_avg=760.63 rc=1
[metrics] Ejecutando fase: auditoria_fsck
[metrics] auditoria_fsck: duracion=6.1s cpu_avg=0.859 mem_avg=761.05 rc=1
[metrics] Ejecutando fase: backup_copy
[metrics] backup_copy: duracion=6.1s cpu_avg=1.1 mem_avg=760.97 rc=1
[metrics] Ejecutando fase: incidente_recuperacion
[metrics] incidente_recuperacion: duracion=6.11s cpu_avg=0.74 mem_avg=761.29 rc=1
CSV generado: c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks\metrics\phase_times.csv


Unnamed: 0,fase,duracion_seg,cpu_promedio_pct,mem_promedio_mib,return_code,muestras
0,ingesta,6.13,0.559,760.63,1,10
1,auditoria_fsck,6.1,0.859,761.05,1,10
2,backup_copy,6.1,1.1,760.97,1,10
3,incidente_recuperacion,6.11,0.74,761.29,1,10


## 5) Conclusiones y recomendaciones


In [26]:
conclusiones = []

if df_fsck.empty:
    conclusiones.append("No se encontraron auditorias fsck locales. Ejecuta scripts/30_fsck_audit.sh y vuelve a correr la extraccion.")
else:
    ult = df_fsck.sort_values("dt").iloc[-1]
    if int(ult["CORRUPT"]) == 0 and int(ult["MISSING"]) == 0:
        conclusiones.append(f"La ultima auditoria ({ult['dt']}) no muestra CORRUPT ni MISSING.")
    else:
        conclusiones.append(f"La ultima auditoria ({ult['dt']}) tiene incidencias: CORRUPT={int(ult['CORRUPT'])}, MISSING={int(ult['MISSING'])}.")

    if int(ult["UNDER_REPLICATED"]) > 0:
        conclusiones.append("Hay bloques under_replicated: revisar DataNodes vivos y factor de replicacion.")

if phase_times_df["duracion_seg"].notna().any():
    fase_lenta = phase_times_df.loc[phase_times_df["duracion_seg"].idxmax(), "fase"]
    conclusiones.append(f"La fase mas lenta registrada es: {fase_lenta}.")
else:
    conclusiones.append("Faltan tiempos por fase: completa notebooks/metrics/phase_times.csv para cerrar analisis de coste.")

recomendaciones = [
    "Mantener ejecucion periodica de scripts/30_fsck_audit.sh y guardar evidencia por fecha.",
    "Usar docker stats durante ingesta/backup para medir coste real de integridad.",
    "Definir frecuencia de auditoria segun criticidad de datos y coste operativo.",
]

print("Conclusiones:")
for i, c in enumerate(conclusiones, 1):
    print(f"{i}. {c}")

print("\nRecomendaciones:")
for i, r in enumerate(recomendaciones, 1):
    print(f"{i}. {r}")


Conclusiones:
1. La ultima auditoria (2026-02-13) tiene incidencias: CORRUPT=2, MISSING=4.
2. La fase mas lenta registrada es: ingesta.

Recomendaciones:
1. Mantener ejecucion periodica de scripts/30_fsck_audit.sh y guardar evidencia por fecha.
2. Usar docker stats durante ingesta/backup para medir coste real de integridad.
3. Definir frecuencia de auditoria segun criticidad de datos y coste operativo.


## 6) Exportación

In [27]:
out_fsck = WORKDIR / "fsck_resumen_notebook.csv"
out_metrics = WORKDIR / "metricas_notebook.csv"
df_fsck.to_csv(out_fsck, index=False)
phase_times_df.to_csv(out_metrics, index=False)

print("Exportado:")
print("-", out_fsck)
print("-", out_metrics)


Exportado:
- c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks\fsck_resumen_notebook.csv
- c:\Users\ferna\Documents\Proyectos\CursoIA\HDFSTarea\data-integrity-hdfs-lab\notebooks\metricas_notebook.csv
