# Entrega - Auditoria fsck + metricas + conclusiones

Notebook de entrega sin hardcodear metricas: 
1. Lee auditorias fsck desde HDFS
2. Carga tabla de metricas reales desde CSV
3. Genera resumen y conclusiones


In [19]:
from __future__ import annotations

import io
import re
import subprocess
from pathlib import Path

import pandas as pd


## 1) Lectura de auditorias fsck

Busca la ultima fecha disponible en `/audit/fsck` y lee los resumentes CSV.


In [20]:
NN_CONTAINER = "namenode"

def run(cmd: str) -> str:
    return subprocess.check_output(cmd, shell=True, text=True, stderr=subprocess.STDOUT)

def hdfs_cat(path: str) -> str | None:
    cmd = f'docker exec {NN_CONTAINER} bash -lc "hdfs dfs -cat {path}"'
    try:
        return run(cmd)
    except subprocess.CalledProcessError:
        return None

def latest_fsck_dt() -> str | None:
    cmd = f'docker exec {NN_CONTAINER} bash -lc "hdfs dfs -ls /audit/fsck"'
    try:
        out = run(cmd)
    except subprocess.CalledProcessError:
        return None
    dts = []
    for line in out.splitlines():
        parts = line.split()
        if len(parts) >= 8:
            path = parts[-1]
            dt = path.rsplit('/', 1)[-1]
            if re.match(r'^\d{4}-\d{2}-\d{2}$', dt):
                dts.append(dt)
    return sorted(dts)[-1] if dts else None

DT = latest_fsck_dt()
print('DT detectada:', DT)


DT detectada: 2026-02-04


In [21]:
rows = []
if DT is not None:
    for target in ["data", "backup"]:
        csv_path = f"/audit/fsck/{DT}/fsck_{target}_summary.csv"
        raw = hdfs_cat(csv_path)
        if raw:
            tmp = pd.read_csv(io.StringIO(raw))
            metric_map = {r.metric.upper(): int(r.count) for r in tmp.itertuples()}
            rows.append({
                "dt": DT,
                "scope": target,
                "CORRUPT": metric_map.get("CORRUPT", 0),
                "MISSING": metric_map.get("MISSING", 0),
                "UNDER_REPLICATED": metric_map.get("UNDER_REPLICATED", 0),
                "source": csv_path,
            })

fsck_table = pd.DataFrame(rows)
if fsck_table.empty:
    fsck_table = pd.DataFrame(columns=["dt","scope","CORRUPT","MISSING","UNDER_REPLICATED","source"])

fsck_table


Unnamed: 0,dt,scope,CORRUPT,MISSING,UNDER_REPLICATED,source
0,2026-02-04,data,0,0,0,/audit/fsck/2026-02-04/fsck_data_summary.csv
1,2026-02-04,backup,0,0,0,/audit/fsck/2026-02-04/fsck_backup_summary.csv


## 2) Generacion automatica de metricas reales

Estas celdas permiten ejecutar el pipeline y generar `notebooks/output/metrics_raw.csv`
con duracion, CPU promedio y memoria promedio medidas en tiempo real con `docker stats`.


In [22]:
import time

def parse_mem_to_mb(mem_usage: str) -> float:
    left = mem_usage.split('/')[0].strip().replace(' ', '')
    m = re.match(r'([0-9]+(?:\.[0-9]+)?)([KMG]iB)', left)
    if not m:
        return 0.0
    value = float(m.group(1))
    unit = m.group(2)
    if unit == 'GiB':
        return value * 1024.0
    if unit == 'MiB':
        return value
    if unit == 'KiB':
        return value / 1024.0
    return 0.0

def sample_cluster_stats() -> tuple[float, float]:
    cmd = "docker stats --no-stream --format '{{.Name}},{{.CPUPerc}},{{.MemUsage}}'"
    out = run(cmd)
    cpus = []
    mems = []
    for line in out.strip().splitlines():
        parts = line.split(',', 2)
        if len(parts) != 3:
            continue
        name, cpu, mem = parts
        if not (name == 'namenode' or 'dnnm' in name):
            continue
        try:
            cpus.append(float(cpu.replace('%', '').strip()))
            mems.append(parse_mem_to_mb(mem))
        except ValueError:
            continue
    if not cpus:
        return (0.0, 0.0)
    return (sum(cpus) / len(cpus), sum(mems) / len(mems))

def run_phase_metric(fase: str, cmd: str, sample_interval_s: float = 1.0) -> dict:
    print(f'[metric] running {fase}: {cmd}')
    start = time.time()
    proc = subprocess.Popen(cmd, shell=True, executable='/bin/bash')
    cpu_samples = []
    mem_samples = []

    while proc.poll() is None:
        cpu, mem = sample_cluster_stats()
        cpu_samples.append(cpu)
        mem_samples.append(mem)
        time.sleep(sample_interval_s)

    rc = proc.wait()
    duration_s = round(time.time() - start, 2)
    cpu_avg = round(sum(cpu_samples) / len(cpu_samples), 3) if cpu_samples else 0.0
    mem_avg = round(sum(mem_samples) / len(mem_samples), 3) if mem_samples else 0.0
    note = 'OK' if rc == 0 else f'FAILED({rc})'
    return {
        'fase': fase,
        'duracion_s': duration_s,
        'cpu_promedio_pct': cpu_avg,
        'mem_promedio_mb': mem_avg,
        'nota': note,
    }

def run_all_metrics(phases: list[tuple[str, str]], out_csv: Path) -> pd.DataFrame:
    rows = []
    for fase, cmd in phases:
        rows.append(run_phase_metric(fase, cmd))
    df = pd.DataFrame(rows)
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_csv, index=False)
    print(f'[metric] saved: {out_csv}')
    return df


In [23]:
RUN_BENCHMARK = True  # ejecuta y genera metrics_raw.csv
METRICS_OUT = Path('notebooks/output/metrics_raw.csv')

PHASES = [
    ('ingesta_hdfs', 'bash scripts/20_ingest_hdfs.sh'),
    ('fsck_auditoria', 'bash scripts/30_fsck_audit.sh'),
    ('backup_copy', 'bash scripts/40_backup_copy.sh'),
    ('incident_simulation', 'bash scripts/70_incident_simulation.sh'),
    ('recovery_restore', 'bash scripts/80_recovery_restore.sh'),
]

if RUN_BENCHMARK:
    metrics_generated = run_all_metrics(PHASES, METRICS_OUT)
    metrics_generated
else:
    print('Para generar metricas reales: cambia RUN_BENCHMARK a True y ejecuta esta celda.')
    print(f'Se esperaba archivo: {METRICS_OUT}')


[metric] running ingesta_hdfs: bash scripts/20_ingest_hdfs.sh


bash: scripts/20_ingest_hdfs.sh: No existe el fichero o el directorio


[metric] running fsck_auditoria: bash scripts/30_fsck_audit.sh


bash: scripts/30_fsck_audit.sh: No existe el fichero o el directorio


[metric] running backup_copy: bash scripts/40_backup_copy.sh


bash: scripts/40_backup_copy.sh: No existe el fichero o el directorio


[metric] running incident_simulation: bash scripts/70_incident_simulation.sh


bash: scripts/70_incident_simulation.sh: No existe el fichero o el directorio


[metric] running recovery_restore: bash scripts/80_recovery_restore.sh


bash: scripts/80_recovery_restore.sh: No existe el fichero o el directorio


[metric] saved: notebooks/output/metrics_raw.csv


## 3) Tabla de metricas (tiempos/recursos)

Carga `notebooks/output/metrics_raw.csv` generado por la seccion anterior.


In [24]:
metrics_path = Path('notebooks/output/metrics_raw.csv')
required_cols = ["fase", "duracion_s", "cpu_promedio_pct", "mem_promedio_mb"]

if metrics_path.exists():
    metrics = pd.read_csv(metrics_path)
    missing = [c for c in required_cols if c not in metrics.columns]
    if missing:
        raise ValueError(f'Faltan columnas en {metrics_path}: {missing}')
else:
    metrics = pd.DataFrame(columns=required_cols + ["nota"])

metrics


Unnamed: 0,fase,duracion_s,cpu_promedio_pct,mem_promedio_mb,nota
0,ingesta_hdfs,3.03,0.126,774.02,FAILED(127)
1,fsck_auditoria,3.03,0.122,774.02,FAILED(127)
2,backup_copy,3.03,0.132,774.02,FAILED(127)
3,incident_simulation,3.03,0.124,774.0,FAILED(127)
4,recovery_restore,3.03,0.12,774.4,FAILED(127)


In [25]:
if metrics.empty:
    summary = pd.DataFrame(columns=["metrica", "valor"])
else:
    summary = pd.DataFrame([
        {"metrica": "tiempo_total_s", "valor": float(metrics['duracion_s'].sum())},
        {"metrica": "cpu_max_pct", "valor": float(metrics['cpu_promedio_pct'].max())},
        {"metrica": "mem_max_mb", "valor": float(metrics['mem_promedio_mb'].max())},
    ])
summary


Unnamed: 0,metrica,valor
0,tiempo_total_s,15.15
1,cpu_max_pct,0.132
2,mem_max_mb,774.4


## 4) Conclusiones y recomendaciones

Plantilla basada en resultados reales del notebook:

- Integridad: revisar `fsck_table` y confirmar valores de `CORRUPT`, `MISSING`, `UNDER_REPLICATED`.
- Coste: usar `metrics` para identificar fase mas costosa (tiempo) y mayor carga (CPU/MEM).
- Replicacion recomendada: justificar con trade-off coste/riesgo observado.
- Frecuencia de auditoria: diaria/semanal segun criticidad del dato.

Si ya tienes metricas y fsck correctos, puedes dejar una conclusion final breve en una celda markdown adicional.


In [26]:
out_dir = Path('notebooks/output')
out_dir.mkdir(parents=True, exist_ok=True)
if not fsck_table.empty:
    fsck_table.to_csv(out_dir / f'fsck_table_{DT}.csv', index=False)
if not metrics.empty:
    metrics.to_csv(out_dir / f'metrics_table_{DT if DT else "latest"}.csv', index=False)
if not summary.empty:
    summary.to_csv(out_dir / f'metrics_summary_{DT if DT else "latest"}.csv', index=False)
sorted(str(p) for p in out_dir.glob('*.csv'))


['notebooks/output/fsck_table_2026-02-04.csv',
 'notebooks/output/metrics_raw.csv',
 'notebooks/output/metrics_summary_2026-02-04.csv',
 'notebooks/output/metrics_table_2026-02-04.csv']