# Prueba del DataLoader

**Objetivo:** 
* Instanciar y probar la clase `PrefetchingDataLoader` importada desde `src/prefetchingDataloader.py`.
* Verificar que el prefetching funciona y comparar opcionalmente el rendimiento con y sin workers paralelos.

In [18]:
import sys
import os
import time
from sklearn.datasets import fetch_20newsgroups

In [19]:
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    print(f"Añadido '{module_path}' a sys.path")
else:
    print(f"'{module_path}' ya está en sys.path")

'c:\fespa-dev\nlp-curso\nlp-proyecto01' ya está en sys.path


In [20]:
from src.prefetchingDataloader import PrefetchingDataLoader

In [21]:
print("\nCargando datos de 20 Newsgroups (subset 'train')...")
docs_all = fetch_20newsgroups(subset='all',
                                remove=('headers', 'footers', 'quotes'),
                                data_home='./data/20newsgroups_cache'
                                ).data
docs_subset = docs_all[:5000]
print(f"Se usarán {len(docs_subset)} documentos para la prueba.")


Cargando datos de 20 Newsgroups (subset 'train')...
Se usarán 5000 documentos para la prueba.


In [22]:
def run_loader_test(loader_instance, test_name="Test"):
    """Itera sobre un loader, mide tiempo e imprime resultados."""
    if not loader_instance:
        print(f"{test_name}: Instancia de loader no válida.")
        return

    print(f"\n--- {test_name} ---")
    print(f"Iniciando consumo de ~{len(loader_instance)} lotes...")

    start_time = time.time()
    batch_count = 0
    doc_count = 0
    consumption_errors = 0

    try:
        for i, batch in enumerate(loader_instance):
            batch_count += 1
            # batch es list[list[list[str]]]
            doc_count_in_batch = len(batch)
            doc_count += doc_count_in_batch
            # Imprimir progreso de forma dinámica
            print(f"  Consumido Lote {i+1}/{len(loader_instance)}. "
                  f"Docs: {doc_count_in_batch}. "
                  f"Buffer: ~{loader_instance.buffer.qsize()}   ", end='\r')

            # Simular trabajo del consumidor (ej. enviar a modelo)
            # Un sleep más largo simula un consumidor más lento
            time.sleep(0.1) # Ajusta este valor

    except RuntimeError as e:
        print(f"\nERROR durante la iteración (posible error del productor): {e}")
        consumption_errors += 1
    except Exception as e:
        print(f"\nERROR inesperado durante la iteración: {e}")
        consumption_errors += 1

    end_time = time.time()
    print("\n" + "-"*len(test_name) + "---")
    print(f"{test_name}: Iteración finalizada.")
    total_time = end_time - start_time
    print(f"  Tiempo total: {total_time:.2f} seg")
    print(f"  Lotes recibidos: {batch_count}")
    print(f"  Documentos procesados: {doc_count}")
    if consumption_errors > 0:
        print(f"  Errores durante consumo: {consumption_errors}")
    print("-"*(len(test_name)+4))
    return total_time

In [23]:
# Parámetros
BATCH_SIZE = 128
BUFFER_SIZE = 8

# Prueba 1: Secuencial
print("\n" + "="*40)
print(" INICIANDO PRUEBA SECUENCIAL (0 Workers)")
print("="*40)

loader_seq = PrefetchingDataLoader(
    documents=docs_subset,
    batch_size=BATCH_SIZE,
    buffer_size=BUFFER_SIZE,
    num_workers=0
)
time_seq = run_loader_test(loader_seq, "Prueba Secuencial")

# Pequeña pausa entre pruebas
print("\npequeña pausa\n")
time.sleep(2)

# Prueba 2: Paralelo
print("\n" + "="*40)
print(" INICIANDO PRUEBA PARALELA")
print("="*40)
NUM_WORKERS_PARALLEL = 10
loader_par = PrefetchingDataLoader(
    documents=docs_subset,
    batch_size=BATCH_SIZE,
    buffer_size=BUFFER_SIZE,
    num_workers=NUM_WORKERS_PARALLEL
)
time_par = run_loader_test(loader_par, f"Prueba Paralela ({NUM_WORKERS_PARALLEL} Workers)")

#Comparación
if time_seq is not None and time_par is not None and time_par > 0:
    speedup = time_seq / time_par
    print("\n" + "="*40)
    print("        COMPARACIÓN DE TIEMPOS")
    print("="*40)
    print(f"Tiempo Secuencial : {time_seq:.2f} seg")
    print(f"Tiempo Paralelo   : {time_par:.2f} seg ({NUM_WORKERS_PARALLEL} workers)")
    print(f"Speedup (Seq/Par): {speedup:.2f}x")
    print("="*40)
elif time_seq is not None and time_par is not None:
        print("\nNo se puede calcular speedup (tiempo paralelo fue cero).")


 INICIANDO PRUEBA SECUENCIAL (0 Workers)

--- Prueba Secuencial ---
Iniciando consumo de ~40 lotes...
[Productor]: Iniciado (Workers=0).
[Productor]: Lote procesado (0.55s).
[Productor]: Lote procesado (0.40s).ffer: ~0   
[Productor]: Lote procesado (0.40s).ffer: ~0   
[Productor]: Lote procesado (0.56s).ffer: ~0   
[Productor]: Lote procesado (0.50s).ffer: ~0   
[Productor]: Lote procesado (0.69s).ffer: ~0   
[Productor]: Lote procesado (0.50s).ffer: ~0   
[Productor]: Lote procesado (0.40s).ffer: ~0   
[Productor]: Lote procesado (0.41s).ffer: ~0   
[Productor]: Lote procesado (0.55s).ffer: ~0   
[Productor]: Lote procesado (0.72s).uffer: ~0   
[Productor]: Lote procesado (0.46s).uffer: ~0   
[Productor]: Lote procesado (0.45s).uffer: ~0   
[Productor]: Lote procesado (0.39s).uffer: ~0   
[Productor]: Lote procesado (0.96s).uffer: ~0   
[Productor]: Lote procesado (0.68s).uffer: ~0   
[Productor]: Lote procesado (0.54s).uffer: ~0   
[Productor]: Lote procesado (0.65s).uffer: ~0   
[