# UD3 ‚Äì Actividad 3.2: Control de calidad de ficheros CSV con firma

**Nombre y apellidos del alumno:**  
*(Completa este apartado antes de comenzar la actividad)*

## Introducci√≥n

La empresa **RetailCorp** opera en varios pa√≠ses europeos y gestiona su inventario de productos de forma distribuida.

Cada d√≠a, los distintos almacenes env√≠an un *snapshot* del estado de su inventario en formato CSV a un sistema centralizado de anal√≠tica y planificaci√≥n.

Durante un incidente reciente, se detect√≥ que algunos ficheros conten√≠an inconsistencias en su contenido y errores de integridad de archivo (hash incorrecto).

El √°rea de **Data Engineering** ha decidido implementar un **control de calidad de entrada en el pipeline**, de forma que solo los ficheros correctos puedan avanzar hacia las etapas posteriores.

El objetivo de esta pr√°ctica es **dise√±ar e implementar un control de calidad de ficheros CSV con verificaci√≥n de firma (SHA256/MD5)** de forma reproducible y automatizable.

## Dataset de trabajo

Se proporciona un fichero comprimido `RetailCorp-dataset-sample.tar.gz` que contiene una muestra de ficheros CSV de inventario junto con sus firmas.

### Contenido
- CSV: snapshots de inventario con campos `product_id`, `warehouse_id`, `snapshot_date`, `stock_units`, `reserved_units`.
- Firma: archivos `.sha256` o `.md5` correspondientes a cada CSV.

El alumno deber√° extraer los ficheros y procesarlos siguiendo los pasos de la actividad.

# **Tareas a realizar**

### Tarea 1 ‚Äì Preparaci√≥n del entorno

Crear los directorios de trabajo (`incoming_raw`, `input`, `rejected`, `archive`) y extraer los ficheros del dataset proporcionado.
Se proporciona una funci√≥n de preparaci√≥n del entorno, no hay que modificar nada  

Aqu√≠ solo hay que analizar el c√≥digo para familiarizarse con la estructura de directorios


In [1]:
import os
import subprocess

def prepare_environment(base_dir="data_pipeline",
                        dataset="RetailCorp-dataset-sample.tar.gz"):
    """
    Prepara la estructura de directorios y extrae el dataset inicial.
    """

    os.makedirs(f"{base_dir}/incoming_raw", exist_ok=True)
    os.makedirs(f"{base_dir}/input", exist_ok=True)
    os.makedirs(f"{base_dir}/rejected", exist_ok=True)
    os.makedirs(f"{base_dir}/archive", exist_ok=True)
    os.makedirs(f"{base_dir}/logs", exist_ok=True)

    subprocess.run(
        ["tar", "-xzf", dataset, "-C", f"{base_dir}/incoming_raw"],
        check=True
    )


def reset_data_pipeline(base_dir="data_pipeline"):
    """
    Elimina completamente el directorio de trabajo del pipeline
    para permitir una ejecuci√≥n limpia del proceso.
    """
    if os.path.exists(base_dir):
        subprocess.run(
            ["rm", "-rf", base_dir],
            check=True
        )

reset_data_pipeline()
prepare_environment()

### Tarea 2 ‚Äì Implementar la funcionalidad de gesti√≥n de la integridad en los ficheros de entrada

Se recomienda ir implement√°ndolo en varias funciones que permitan dividir y estructurar el proceso.  
Puede realizarse en varias celdas.  

Registrar en un fichero de log la informaci√≥n de cada fichero procesado, indicando:
- Nombre de fichero
- Estado (`OK`, `REJECTED`, `ERROR`)
- Mensaje asociado

Mover los ficheros correctos al directorio `input`, los rechazados a `rejected` y archivar las firmas en `archive`.

In [2]:
import hashlib
import binascii

def calculate_hash(file_path, algorithm='sha256', chunk_size=4096):
    if algorithm == 'sha256':
        hash_func = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                hash_func.update(chunk)
        return hash_func.hexdigest()

    elif algorithm == 'md5':
        hash_func = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                hash_func.update(chunk)
        return hash_func.hexdigest()

    else:
        raise ValueError("Algoritmo no soportado.")

# Funci√≥n copiada del tutorial, mediante la cual calculamos el hash del fichero y detectar cambios o fallos. 

In [3]:
def read_signature(sig_path):

    with open(sig_path, 'r') as f:
        content = f.read().strip()
    # El hash es la primera palabra de la l√≠nea
    expected_hash = content.split()[0]
    
    if sig_path.endswith('.sha256'):
        algorithm = 'sha256'
    elif sig_path.endswith('.md5'):
        algorithm = 'md5'
    else:
        raise ValueError(f"Extensi√≥n de firma no reconocida: {sig_path}")
    
    return expected_hash, algorithm

# Funci√≥n para leer el fichero asociado al csv y extraer el hash para comprobarlo

In [4]:
import datetime

LOG_FILE = "data_pipeline/logs/integrity_check.log"

def log_event(filename, status, message):

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"{timestamp} | {filename} | {status} | {message}\n"
    with open(LOG_FILE, 'a') as f:
        f.write(line)
    print(line.strip())

# Funci√≥n para procesar el fichero y registrar el estado y usar un timestamp

In [5]:
import shutil
import glob

def process_files(base_dir="data_pipeline"):
    incoming = f"{base_dir}/incoming_raw"
    input_dir = f"{base_dir}/input"
    rejected_dir = f"{base_dir}/rejected"
    archive_dir = f"{base_dir}/archive"

    # Buscamos todos los ficheros CSV en incoming_raw (sin duplicados)
    csv_files = list(set(
        glob.glob(f"{incoming}/**/*.csv", recursive=True) +
        glob.glob(f"{incoming}/*.csv")
    ))

    if not csv_files:
        print("No se encontraron ficheros CSV en incoming_raw.")
        return

    for csv_path in csv_files:
        filename = os.path.basename(csv_path)

        sig_path = None
        for ext in ['.sha256', '.md5']:
            candidate = csv_path + ext
            if os.path.exists(candidate):
                sig_path = candidate
                break

        if sig_path is None:
            shutil.move(csv_path, os.path.join(rejected_dir, filename))
            log_event(filename, "REJECTED", "No se encontr√≥ fichero de firma asociado")
            continue

        try:
            expected_hash, algorithm = read_signature(sig_path)
            calculated_hash = calculate_hash(csv_path, algorithm=algorithm)
            sig_filename = os.path.basename(sig_path)

            if calculated_hash == expected_hash:
                shutil.move(csv_path, os.path.join(input_dir, filename))
                shutil.move(sig_path, os.path.join(archive_dir, sig_filename))
                log_event(filename, "OK", f"Integridad verificada ({algorithm}): {calculated_hash}")
            else:
                shutil.move(csv_path, os.path.join(rejected_dir, filename))
                shutil.move(sig_path, os.path.join(archive_dir, sig_filename))
                log_event(filename, "REJECTED",
                          f"Hash incorrecto ({algorithm}). Esperado: {expected_hash} | Calculado: {calculated_hash}")

        except Exception as e:
            log_event(filename, "ERROR", str(e))

In [6]:
process_files()

2026-02-18 16:48:36 | stock_snapshot_ES_WH_02_20260129.csv | REJECTED | Hash incorrecto (sha256). Esperado: 31b69a91ae0a42b48373b69279573b9d2612eb4dc9f7c142e1deb81f8e8d80ab | Calculado: 451aba4bcdcff581344f2bba735e703a94b15f63c7f5049deb3e36559afb2120
2026-02-18 16:48:36 | stock_snapshot_ES_WH_06_20260127.csv | REJECTED | Hash incorrecto (md5). Esperado: 854e5d63a7eaf0f7f11e10cb39b653cb | Calculado: 3570c6d4dd3028300334fa474fd6e209
2026-02-18 16:48:36 | stock_snapshot_FR_WH_02_20260129.csv | REJECTED | Hash incorrecto (sha256). Esperado: a7039cf19f7555f9f11b600adc18b7b2c215b7ecd89122db08df90dea845abcd | Calculado: 59271123818f76ead8b8d1ceec36f5825027003e9ac500d3654432dcc05a36f2
2026-02-18 16:48:36 | stock_snapshot_ES_WH_04_20260128.csv | OK | Integridad verificada (sha256): 3b8d3ed4e7c293e25ad0b8ab7fe1738c34f603530eec800b3624d869f62fa25f
2026-02-18 16:48:36 | stock_snapshot_ES_WH_01_20260129.csv | OK | Integridad verificada (sha256): 6a4f07384d1e2c459e9204b88462996fb3bc4d18be60d127d1b27

### Tarea 3 ‚Äì Conteo de ficheros correctos e incorrectos

Mostrar el n√∫mero total de ficheros procesados correctamente y los rechazados, para validar la actividad.  

Puede usarse cat y grep sobre el fichero de log generado por el proceso para obtener esta parte

In [7]:
import subprocess

result = subprocess.run(
    ["grep", "-c", "| OK |", LOG_FILE],
    capture_output=True, text=True
)
ok_count = int(result.stdout.strip()) if result.returncode == 0 else 0

result = subprocess.run(
    ["grep", "-c", "| REJECTED |", LOG_FILE],
    capture_output=True, text=True
)
rejected_count = int(result.stdout.strip()) if result.returncode == 0 else 0

print(f"‚úÖ Ficheros aceptados (OK):       {ok_count}")
print(f"‚ùå Ficheros rechazados (REJECTED): {rejected_count}")
print(f"üì¶ Total procesados:              {ok_count + rejected_count}")

‚úÖ Ficheros aceptados (OK):       5
‚ùå Ficheros rechazados (REJECTED): 4
üì¶ Total procesados:              9


reset_data_pipeline()
prepare_environment()

## Declaraci√≥n de autor√≠a y uso de Inteligencia Artificial Generativa (IA)

Durante la realizaci√≥n de esta actividad, declaro que (sustituye el cuadro por una X donde proceda):

‚òê **No he utilizado herramientas de IA** durante la realizaci√≥n de esta actividad.  

‚òê **He utilizado herramientas de IA como apoyo al aprendizaje**, para la consulta puntual de conceptos, aclaraci√≥n de dudas o comprensi√≥n de la documentaci√≥n, elaborando de forma aut√≥noma el contenido entregado.  

‚òê **Parte del contenido de la actividad ha sido generado con herramientas de IA**, siendo dicho contenido revisado, comprendido y adaptado cr√≠ticamente antes de su entrega final.  

‚òê **La mayor parte del contenido de la actividad ha sido generado con herramientas de IA**, existiendo una aportaci√≥n propia limitada.  

La selecci√≥n de esta opci√≥n implica asumir la responsabilidad sobre la autor√≠a, comprensi√≥n y calidad del trabajo presentado.