In [1]:
from __future__ import annotations

import argparse
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple

In [None]:
def _rotr(x: int, n: int) -> int:
    return ((x >> n) | (x << (32 - n))) & 0xFFFFFFFF

def _ch(x: int, y: int, z: int) -> int:
    return (x & y) ^ (~x & z)

def _maj(x: int, y: int, z: int) -> int:
    return (x & y) ^ (x & z) ^ (y & z)

def _big_sigma0(x: int) -> int:
    """XOR of ``x`` rotated right by 2, 13 and 22 bits (feeds ``t2`` in the round loop)."""
    r2, r13, r22 = _rotr(x, 2), _rotr(x, 13), _rotr(x, 22)
    return r2 ^ r13 ^ r22

def _big_sigma1(x: int) -> int:
    """XOR of ``x`` rotated right by 6, 11 and 25 bits (feeds ``t1`` in the round loop)."""
    r6, r11, r25 = _rotr(x, 6), _rotr(x, 11), _rotr(x, 25)
    return r6 ^ r11 ^ r25

def _small_sigma0(x: int) -> int:
    """XOR of ``x`` rotated right by 7 and 18 bits and shifted right by 3 (message schedule)."""
    rotated = _rotr(x, 7) ^ _rotr(x, 18)
    shifted = x >> 3
    return rotated ^ shifted

def _small_sigma1(x: int) -> int:
    """XOR of ``x`` rotated right by 17 and 19 bits and shifted right by 10 (message schedule)."""
    rotated = _rotr(x, 17) ^ _rotr(x, 19)
    shifted = x >> 10
    return rotated ^ shifted


# SHA-256 round constants: 64 32-bit words, one per compression round.
# SHA256._compress adds _K[i] into t1 on round i, so the order of the
# entries is significant and the table must not be reordered or truncated.
_K = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
]


class SHA256:
    """Incremental SHA-256 hasher implemented in pure Python.

    Feed data with :meth:`update` (calls can be chained) and read the result
    with :meth:`digest` or :meth:`hexdigest`.  Reading the digest never
    modifies the hasher: finalization runs on a private copy of the chaining
    state, so more data may be appended afterwards and an interruption while
    computing the digest cannot leave the object in a half-finalized state.
    """

    __slots__ = ("_h", "_buffer", "_bit_len")

    def __init__(self) -> None:
        # Initial chaining value (IV).
        self._h = [
            0x6a09e667,
            0xbb67ae85,
            0x3c6ef372,
            0xa54ff53a,
            0x510e527f,
            0x9b05688c,
            0x1f83d9ab,
            0x5be0cd19,
        ]
        # Bytes received but not yet compressed; shorter than 64 after update().
        self._buffer = bytearray()
        # Total message length in bits; appended to the padding in digest().
        self._bit_len = 0

    def update(self, data: bytes) -> "SHA256":
        """Append ``data`` to the message and compress every complete 64-byte block.

        Returns ``self`` so calls can be chained.  Empty input is a no-op.
        """
        if not data:
            return self

        self._buffer.extend(data)
        self._bit_len += len(data) * 8

        # Consume full 64-byte blocks; the remainder stays buffered.
        while len(self._buffer) >= 64:
            block = bytes(self._buffer[:64])
            del self._buffer[:64]
            self._compress(block)

        return self

    def digest(self) -> bytes:
        """Return the 32-byte digest of all data fed so far.

        The hasher itself is left untouched: padding and the final
        compression rounds are applied to copies of the buffer and state.
        """
        state = list(self._h)

        # Padding: 0x80, zeros up to 56 mod 64, then the 64-bit big-endian bit length.
        padded = bytearray(self._buffer)
        padded.append(0x80)
        padded.extend(b"\x00" * ((56 - len(padded)) % 64))
        padded.extend(self._bit_len.to_bytes(8, "big"))

        for offset in range(0, len(padded), 64):
            self._compress_state(state, bytes(padded[offset:offset + 64]))

        return b"".join(word.to_bytes(4, "big") for word in state)

    def hexdigest(self) -> str:
        """Return :meth:`digest` as a lowercase hexadecimal string."""
        return self.digest().hex()

    def _compress(self, block: bytes) -> None:
        """Fold one 64-byte ``block`` into this hasher's chaining state."""
        self._compress_state(self._h, block)

    @staticmethod
    def _compress_state(state: list, block: bytes) -> None:
        """Apply the compression function for ``block`` to ``state`` in place.

        ``state`` is a list of eight 32-bit words; ``block`` is expected to be
        exactly 64 bytes long.
        """
        mask = 0xFFFFFFFF

        # Message schedule W[0..63]: 16 big-endian words from the block,
        # the remaining 48 derived from earlier entries.
        w = [0] * 64
        for i in range(16):
            w[i] = int.from_bytes(block[i * 4:(i + 1) * 4], "big")
        for i in range(16, 64):
            w[i] = (_small_sigma1(w[i - 2]) + w[i - 7] + _small_sigma0(w[i - 15]) + w[i - 16]) & mask

        a, b, c, d, e, f, g, h = state

        for k_i, w_i in zip(_K, w):
            t1 = (h + _big_sigma1(e) + _ch(e, f, g) + k_i + w_i) & mask
            t2 = (_big_sigma0(a) + _maj(a, b, c)) & mask
            # Rotate the working variables by one position.
            a, b, c, d, e, f, g, h = (t1 + t2) & mask, a, b, c, (d + t1) & mask, e, f, g

        # Add the working variables back into the chaining state (mod 2**32).
        state[:] = [(s + v) & mask for s, v in zip(state, (a, b, c, d, e, f, g, h))]


def sha256_bytes(data: bytes) -> str:
    """Hash ``data`` in one shot and return the hexadecimal SHA-256 digest."""
    hasher = SHA256()
    hasher.update(data)
    return hasher.hexdigest()


def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> Tuple[str, int, float]:
    """Hash a file in chunks and return its digest together with basic metadata.

    Args:
        path: File to read (opened in binary mode).
        chunk_size: Number of bytes requested per read.  Must not be 0; a
            negative value makes ``read`` return the whole file in one call.

    Returns:
        ``(hexdigest, size_bytes, mtime_epoch)``.  Size and mtime are taken
        from the already-open file handle, so they describe the same file
        whose bytes were hashed even if ``path`` is replaced concurrently.

    Raises:
        ValueError: if ``chunk_size`` is 0.
        OSError: if the file cannot be opened or read.
    """
    if chunk_size == 0:
        # read(0) always returns b"", which would silently yield the
        # empty-message digest for every file.
        raise ValueError("chunk_size must not be 0")

    hasher = SHA256()
    with path.open("rb") as f:
        # fstat on the descriptor avoids the stat-then-open race of path.stat().
        st = os.fstat(f.fileno())
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest(), st.st_size, st.st_mtime


# -----------------------------
# Manifest (registro) de hashes
# -----------------------------

@dataclass
class ManifestEntry:
    """Approved baseline recorded in the manifest for a single file."""

    # Hexadecimal SHA-256 digest of the file contents at approval time.
    sha256: str
    # File size in bytes at approval time (0 when absent from a loaded manifest).
    size_bytes: int
    # Modification time as a POSIX timestamp (0.0 when absent from a loaded manifest).
    mtime_epoch: float
    # ISO-8601 UTC timestamp of when the baseline was taken ("" when absent).
    approved_at_utc: str


class IntegrityManifest:
    """JSON-backed registry of approved SHA-256 baselines, keyed by resolved file path.

    The manifest file has the shape
    ``{"algo": ..., "created_at_utc": ..., "files": {path: {...entry...}}}``.
    It is read once on construction and written back only by :meth:`save`.
    """

    def __init__(self, manifest_path: Path) -> None:
        self.manifest_path = manifest_path
        self.data: Dict[str, ManifestEntry] = {}
        self._load()

    def _load(self) -> None:
        """Populate ``self.data`` from the manifest file; a missing file means an empty manifest.

        Raises:
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if an entry lacks the mandatory ``sha256`` field.
        """
        if not self.manifest_path.exists():
            self.data = {}
            return
        raw = json.loads(self.manifest_path.read_text(encoding="utf-8"))
        files = raw.get("files", {})
        out: Dict[str, ManifestEntry] = {}
        for p, v in files.items():
            out[p] = ManifestEntry(
                sha256=v["sha256"],
                size_bytes=int(v.get("size_bytes", 0)),
                mtime_epoch=float(v.get("mtime_epoch", 0.0)),
                approved_at_utc=v.get("approved_at_utc", ""),
            )
        self.data = out

    def save(self) -> None:
        """Write the manifest atomically (temp file + ``os.replace``).

        If writing or replacing fails, the temporary file is removed so no
        stale ``*.tmp`` artifact is left next to the manifest, and the
        original exception is re-raised.
        """
        payload = {
            "algo": "SHA-256",
            "created_at_utc": datetime.now(timezone.utc).isoformat(),
            "files": {
                p: {
                    "sha256": e.sha256,
                    "size_bytes": e.size_bytes,
                    "mtime_epoch": e.mtime_epoch,
                    "approved_at_utc": e.approved_at_utc,
                }
                for p, e in sorted(self.data.items())
            },
        }
        self.manifest_path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self.manifest_path.with_suffix(self.manifest_path.suffix + ".tmp")
        try:
            tmp.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
            os.replace(tmp, self.manifest_path)
        except BaseException:
            # Best-effort cleanup; the original failure is what matters.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise

    def baseline(self, file_path: Path) -> ManifestEntry:
        """Hash ``file_path`` and record it as the approved baseline (in memory only).

        The entry is stored under the resolved absolute path.  Call
        :meth:`save` to persist it.
        """
        h, size, mtime = sha256_file(file_path)
        entry = ManifestEntry(
            sha256=h,
            size_bytes=size,
            mtime_epoch=mtime,
            approved_at_utc=datetime.now(timezone.utc).isoformat(),
        )
        self.data[str(file_path.resolve())] = entry
        return entry

    def verify(self, file_path: Path) -> Tuple[bool, Optional[ManifestEntry], Optional[str]]:
        """Compare the current hash of ``file_path`` with its baseline.

        Returns:
            ``(True, entry, None)`` when the hashes match;
            ``(False, None, "NO_BASELINE")`` when the path has no entry;
            ``(False, entry, details)`` on mismatch, where ``details`` is a
            ``MISMATCH ...`` string with reference and current hash, size
            and mtime.

        The recorded hash is compared case-insensitively and ignoring
        surrounding whitespace, so a manifest containing an uppercase hex
        digest does not trigger a false alarm.
        """
        key = str(file_path.resolve())
        ref = self.data.get(key)
        if ref is None:
            return False, None, "NO_BASELINE"

        current_hash, size, mtime = sha256_file(file_path)
        if current_hash == str(ref.sha256).strip().lower():
            return True, ref, None

        # Mismatch: return the details needed for an alert.
        details = (
            f"MISMATCH sha256_ref={ref.sha256} sha256_cur={current_hash} "
            f"size_ref={ref.size_bytes} size_cur={size} "
            f"mtime_ref={ref.mtime_epoch} mtime_cur={mtime}"
        )
        return False, ref, details


# -----------------------------
# CLI
# -----------------------------

def _self_test() -> None:
    """Check the SHA-256 implementation against known vectors; raise RuntimeError on failure."""
    # (message, expected hex digest, error text raised on mismatch)
    known_vectors = (
        (
            b"",
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
            "Self-test SHA256 falló para mensaje vacío",
        ),
        (
            b"abc",
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad",
            "Self-test SHA256 falló para 'abc'",
        ),
    )
    for message, expected_hex, failure_text in known_vectors:
        if sha256_bytes(message) != expected_hex:
            raise RuntimeError(failure_text)


def cmd_baseline(args: argparse.Namespace) -> int:
    """Handle ``baseline``: record the hash of ``args.file`` in the manifest.

    Returns 0 on success, 2 when the target is not an existing regular file.
    """
    manifest = IntegrityManifest(Path(args.manifest))
    target = Path(args.file)
    if not (target.exists() and target.is_file()):
        print(f"ERROR: archivo no existe: {target}", file=sys.stderr)
        return 2

    entry = manifest.baseline(target)
    manifest.save()

    report_lines = [
        "BASELINE OK",
        f"file={target.resolve()}",
        f"sha256={entry.sha256}",
        f"size_bytes={entry.size_bytes}",
        f"approved_at_utc={entry.approved_at_utc}",
    ]
    print("\n".join(report_lines))
    return 0


def cmd_verify(args: argparse.Namespace) -> int:
    """Handle ``verify``: check one file, or every manifest entry with ``--all``.

    Matches are printed to stdout, alerts to stderr.  Returns 0 when every
    target verifies, 1 when any alert was raised, and 2 when ``--all`` is
    used with an empty manifest.
    """
    manifest = IntegrityManifest(Path(args.manifest))

    if args.all:
        targets = [Path(key) for key in manifest.data]
        if not targets:
            print("ERROR: manifest vacío (no hay baseline).", file=sys.stderr)
            return 2
    else:
        targets = [Path(args.file)]

    alert_count = 0
    for target in targets:
        if not (target.exists() and target.is_file()):
            alert_count += 1
            print(f"ALERT NOFILE file={target.resolve()}", file=sys.stderr)
            continue

        matched, ref, err = manifest.verify(target)
        if matched:
            print(f"OK file={target.resolve()} sha256={ref.sha256}")
            continue

        alert_count += 1
        alert = (
            f"ALERT NO_BASELINE file={target.resolve()}"
            if err == "NO_BASELINE"
            else f"ALERT INTEGRITY_FAIL file={target.resolve()} {err}"
        )
        print(alert, file=sys.stderr)

    return 1 if alert_count else 0


def cmd_scan_dir(args: argparse.Namespace) -> int:
    """Handle ``scan-dir``: baseline or verify every file under a directory.

    Useful for periodic verification.  ``args.mode`` selects ``baseline`` or
    ``verify``; ``args.ext`` optionally restricts the scan to the given
    extensions, matched case-insensitively, with or without a leading dot
    (``pdf`` and ``.pdf`` are equivalent).

    The manifest file itself is never scanned: it is rewritten on every
    save, so a baseline of it would always mismatch afterwards.

    In ``verify`` mode, manifest entries located under the directory (and
    matching the extension filter) whose file no longer exists are reported
    as ``ALERT NOFILE``, so deletions do not go unnoticed.

    Returns 0 on success, 1 if any alert was raised, 2 if the directory is
    missing.
    """
    manifest = IntegrityManifest(Path(args.manifest))
    base_dir = Path(args.dir).resolve()
    if not base_dir.exists() or not base_dir.is_dir():
        print(f"ERROR: directorio no existe: {base_dir}", file=sys.stderr)
        return 2

    # Path.suffix always carries the leading dot, so normalize user input to
    # that form; otherwise "--ext pdf" would match nothing and a verify run
    # would "pass" having checked zero files.
    exts = tuple(
        e if e.startswith(".") else "." + e
        for e in (x.strip().lower() for x in (args.ext or []))
        if e
    )

    manifest_file = manifest.manifest_path.resolve()

    files = []
    for root, _, fnames in os.walk(base_dir):
        for fn in fnames:
            p = Path(root) / fn
            if exts and p.suffix.lower() not in exts:
                continue
            if p.resolve() == manifest_file:
                continue
            files.append(p)
    files.sort()  # deterministic processing and output order

    if args.mode == "baseline":
        for p in files:
            manifest.baseline(p)
        manifest.save()
        print(f"BASELINE DIR OK dir={base_dir} files={len(files)} manifest={Path(args.manifest).resolve()}")
        return 0

    # verify
    ok_all = True
    for p in files:
        ok, _, err = manifest.verify(p)
        if ok:
            print(f"OK file={p.resolve()}")
        else:
            ok_all = False
            if err == "NO_BASELINE":
                print(f"ALERT NO_BASELINE file={p.resolve()}", file=sys.stderr)
            else:
                print(f"ALERT INTEGRITY_FAIL file={p.resolve()} {err}", file=sys.stderr)

    # Baselined files that have disappeared from the directory.
    seen = {str(p.resolve()) for p in files}
    for key in sorted(manifest.data):
        if key in seen:
            continue
        recorded = Path(key)
        if base_dir not in recorded.parents:
            continue  # entry belongs to another directory tree
        if exts and recorded.suffix.lower() not in exts:
            continue
        if recorded.is_file():
            continue  # still present, just not reached by this walk
        ok_all = False
        print(f"ALERT NOFILE file={recorded}", file=sys.stderr)

    return 0 if ok_all else 1


def build_parser() -> argparse.ArgumentParser:
    """Build the ``integrity_guard`` CLI parser with its three subcommands.

    Each subparser stores its handler in ``func`` so the caller can dispatch
    with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(
        prog="integrity_guard",
        description="Control de integridad de planos/documentos con SHA-256 (implementación desde 0).",
    )
    parser.add_argument(
        "--manifest",
        default="integrity_manifest.json",
        help="Ruta del manifest JSON (default: integrity_manifest.json)",
    )

    commands = parser.add_subparsers(dest="cmd", required=True)

    # baseline <file>
    baseline_cmd = commands.add_parser(
        "baseline",
        help="Registrar hash SHA-256 de un archivo aprobado (baseline)",
    )
    baseline_cmd.add_argument("file", help="Ruta del archivo aprobado")
    baseline_cmd.set_defaults(func=cmd_baseline)

    # verify (--all | --file PATH)
    verify_cmd = commands.add_parser(
        "verify",
        help="Verificar integridad comparando contra el baseline",
    )
    target_group = verify_cmd.add_mutually_exclusive_group(required=True)
    target_group.add_argument(
        "--all",
        action="store_true",
        help="Verificar todos los archivos presentes en el manifest",
    )
    target_group.add_argument(
        "--file",
        help="Verificar un archivo específico (misma ruta que el baseline)",
    )
    verify_cmd.set_defaults(func=cmd_verify)

    # scan-dir {baseline,verify} <dir> [--ext ...]
    scan_cmd = commands.add_parser(
        "scan-dir",
        help="Baseline o verificación masiva de un directorio",
    )
    scan_cmd.add_argument(
        "mode",
        choices=["baseline", "verify"],
        help="baseline: registra hashes; verify: compara contra manifest",
    )
    scan_cmd.add_argument("dir", help="Directorio raíz a escanear")
    scan_cmd.add_argument(
        "--ext",
        nargs="*",
        default=[],
        help="Filtrar por extensiones (ej: .pdf .dwg .dxf)",
    )
    scan_cmd.set_defaults(func=cmd_scan_dir)

    return parser

def main(argv: Optional[list[str]] = None) -> int:
    """Run the CLI and return the process exit code.

    Args:
        argv: Argument list to parse, without the program name.  ``None``
            (the default) makes argparse read ``sys.argv[1:]``.  Passing an
            explicit list allows calling the tool from a notebook cell or a
            test, where ``sys.argv`` belongs to the host process, e.g.
            ``main(["verify", "--all"])``.

    Returns:
        The integer returned by the selected subcommand handler.

    Raises:
        RuntimeError: if the SHA-256 self-test fails.
        SystemExit: raised by argparse on invalid arguments or ``--help``.
    """
    # Refuse to do anything if the hash implementation is broken.
    _self_test()
    ap = build_parser()
    args = ap.parse_args(argv)
    return int(args.func(args))


if __name__ == "__main__":
    # Propagate the subcommand's return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)