# Ingesta y distribución de datos

### Librerías

In [5]:
import tensorflow_datasets as tfds
import multiprocessing

### Cargar el dataset *tf_flowers*
  - as_supervised=True devuelve tuplas (imagen, etiqueta)
  - with_info=True trae metadatos como número de ejemplos

In [6]:
ds, ds_info = tfds.load(
    'tf_flowers',
    split='train',
    with_info=True,
    as_supervised=True
)


### Extraer la lista de tensores de imagen (y etiquetas, si las necesitas)
Convierte el tf.data.Dataset a un iterador de numpy

In [7]:
images = []
labels = []
for img, lbl in tfds.as_numpy(ds):
    images.append(img)     # cada img es un ndarray de forma (alto, ancho, 3)
    labels.append(lbl)     # etiquetas

### Dividir la lista entre procesos para balancear carga

In [8]:
n_workers = multiprocessing.cpu_count() - 1

chunking simple: cada worker toma los índices i, i+n_workers, i+2*n_workers, …

In [9]:
image_chunks = [images[i::n_workers] for i in range(n_workers)]
label_chunks = [labels[i::n_workers] for i in range(n_workers)]

for i, chunk in enumerate(image_chunks):
    print(f"Worker {i}: procesará {len(chunk)} imágenes")

Worker 0: procesará 245 imágenes
Worker 1: procesará 245 imágenes
Worker 2: procesará 245 imágenes
Worker 3: procesará 245 imágenes
Worker 4: procesará 245 imágenes
Worker 5: procesará 245 imágenes
Worker 6: procesará 245 imágenes
Worker 7: procesará 245 imágenes
Worker 8: procesará 245 imágenes
Worker 9: procesará 245 imágenes
Worker 10: procesará 244 imágenes
Worker 11: procesará 244 imágenes
Worker 12: procesará 244 imágenes
Worker 13: procesará 244 imágenes
Worker 14: procesará 244 imágenes


### Descargar las imagenes en una ruta local

In [None]:
import tensorflow_datasets as tfds
import os
from PIL import Image

#ruta de destino local
destino = r"C:\Users\dtellezn2200\OneDrive - Instituto Politecnico Nacional\Documents\Diego\Computo paralelo\Preprocesamiento paralelo VGG\dataset"

#descargar el dataset
ds, info = tfds.load("tf_flowers", split="train", with_info=True, as_supervised=True)

#guardar imágenes organizadas por clase
for i, (img, label) in enumerate(tfds.as_numpy(ds)):
    clase = info.features['label'].int2str(label)
    carpeta_clase = os.path.join(destino, clase)
    os.makedirs(carpeta_clase, exist_ok=True)

    ruta_img = os.path.join(carpeta_clase, f"{i:05d}.jpg")
    Image.fromarray(img).save(ruta_img)

    if i % 500 == 0:
        print(f"{i} imágenes guardadas...")

print("✅ Dataset guardado en:", destino)

0 imágenes guardadas...
500 imágenes guardadas...
1000 imágenes guardadas...
1500 imágenes guardadas...
2000 imágenes guardadas...
2500 imágenes guardadas...
3000 imágenes guardadas...
3500 imágenes guardadas...
✅ Dataset guardado en: C:\Users\dtellezn2200\OneDrive - Instituto Politecnico Nacional\Documents\Diego\Computo paralelo\Preprocesamiento paralelo VGG\dataset


In [None]:
import glob

#ruta base
dataset_path = r"C:\Users\dtellezn2200\OneDrive - Instituto Politecnico Nacional\Documents\Diego\Computo paralelo\Preprocesamiento paralelo VGG\dataset"

#buscar imágenes
image_paths = glob.glob(fr"{dataset_path}\**\*.jpg", recursive=True)

print(f"Total de imágenes encontradas: {len(image_paths)}")
print("Primeras 5 rutas:", image_paths[:5])


Total de imágenes encontradas: 3670
Primeras 5 rutas: ['C:\\Users\\dtellezn2200\\OneDrive - Instituto Politecnico Nacional\\Documents\\Diego\\Computo paralelo\\Preprocesamiento paralelo VGG\\dataset\\daisy\\00009.jpg', 'C:\\Users\\dtellezn2200\\OneDrive - Instituto Politecnico Nacional\\Documents\\Diego\\Computo paralelo\\Preprocesamiento paralelo VGG\\dataset\\daisy\\00013.jpg', 'C:\\Users\\dtellezn2200\\OneDrive - Instituto Politecnico Nacional\\Documents\\Diego\\Computo paralelo\\Preprocesamiento paralelo VGG\\dataset\\daisy\\00015.jpg', 'C:\\Users\\dtellezn2200\\OneDrive - Instituto Politecnico Nacional\\Documents\\Diego\\Computo paralelo\\Preprocesamiento paralelo VGG\\dataset\\daisy\\00016.jpg', 'C:\\Users\\dtellezn2200\\OneDrive - Instituto Politecnico Nacional\\Documents\\Diego\\Computo paralelo\\Preprocesamiento paralelo VGG\\dataset\\daisy\\00024.jpg']


# Preprocesamiento de imagenes en paralelo

### Librerías

In [12]:
import os
import numpy as np
from PIL import Image
from pathlib import Path
from multiprocessing import Pool, cpu_count

### Parámetros

In [13]:
TARGET_SIZE = (224, 224)
MEAN = np.array([123.68, 116.779, 103.939])
INPUT_DIR = "dataset/"
OUTPUT_DIR = "output/"
os.makedirs(OUTPUT_DIR, exist_ok=True)

### Función de preprocesamiento para una imagen

In [14]:
def preprocess_image(img_path):
    try:
        img = Image.open(img_path).convert('RGB')
        img = img.resize(TARGET_SIZE)
        img_array = np.array(img).astype(np.float32)
        img_array -= MEAN
        return img_array
    except Exception as e:
        print(f"Error con {img_path}: {e}")
        return None

### Versión paralela

In [15]:
def preprocess_and_pair(img_path):
    arr = preprocess_image(img_path)
    return (arr, img_path) if arr is not None else None

def process_parallel(image_paths):
    with Pool(cpu_count()) as pool:
        results = pool.map(preprocess_and_pair, image_paths)
    return [r for r in results if r is not None]

### Función para guardar

In [16]:
def save_processed_images(processed_list):
    for arr, original_path in processed_list:
        restored = np.clip(arr + MEAN, 0, 255).astype(np.uint8)
        Image.fromarray(restored).save(os.path.join(OUTPUT_DIR, os.path.basename(original_path)))


### Cargar rutas

In [17]:
def load_image_paths(directory):
    return list(Path(directory).rglob("*.jpg"))

### Función principal

In [None]:
from tqdm import tqdm

if __name__ == "__main__":
    image_paths = load_image_paths(INPUT_DIR)
    
    processed = list(tqdm(process_parallel(image_paths), total=len(image_paths), desc="Progreso"))
    
    save_processed_images(processed)

# Medición y análisis comparativo

### Librerías

In [None]:
import time
import psutil
import multiprocessing
import matplotlib.pyplot as plt

### Función secuencial

In [None]:
def funcion_secuencial(imagenes, funcion_preprocesamiento):
    inicio = time.time()
    for img in imágenes:
        funcion_preprocesamiento(img)
    fin = time.time()
    return fin - inicio

### Función paralela

In [None]:
def funcion_paralela(imagenes, funcion_preprocesamiento, num_procesos):
    inicio = time.time()
    with multiprocessing.Pool(num_procesos) as pool:
        pool.map(funcion_preprocesamiento, imágenes)
    fin = time.time()
    return fin - inicio

### Tiempo y medición de CPU secuencial

In [None]:
tiempo_seq = funcion_secuencial(imagenes, preprocess_image)
uso_cpu_seq = medir_uso_cpu(tiempo_seq)

### Tiempo y medición de CPU en paralelo

In [None]:
tiempo_par = funcion_paralela(imagenes, preprocess_image, num_procesos=multiprocessing.cpu_count())
uso_cpu_par = medir_uso_cpu(tiempo_par)

### Medir uso de CPU

In [None]:
def medir_uso_cpu(duracion, intervalo=0.1):
    uso = []
    for _ in range(int(duracion / intervalo)):
        uso.append(psutil.cpu_percent(interval=intervalo))
    return uso

### Speedup

In [None]:
speedup = tiempo_seq / tiempo_par
print(f"Speedup: {speedup:.2f}")

### Visualización de resultados

In [None]:
plt.figure(figsize=(12, 5))

Uso de CPU

In [None]:
plt.subplot(1, 2, 1)
plt.plot(uso_cpu_seq, label="Secuencial")
plt.plot(uso_cpu_par, label="Paralelo")
plt.title("Uso de CPU durante ejecución")
plt.xlabel("Tiempo (intervalos)")
plt.ylabel("Uso de CPU (%)")
plt.legend()

Tiempo total

In [None]:
plt.subplot(1, 2, 2)
plt.bar(["Secuencial", "Paralelo"], [tiempo_seq, tiempo_par])
plt.title("Comparación de tiempo total")
plt.ylabel("Tiempo (s)")

plt.tight_layout()
plt.show()