Pre-Procesamiento de Imagenes:

Se importan librerías para remover los fondos de las imágenes, así como la redimensión de las mismas y colocar las imágenes en blanco y negro para su posterior normalización y etiquetado.

Se recorre la carpeta de Raw que contiene los datos crudos de las imágenes hasta recorrer las tres carpetas correspondientes (rock, paper,scissors). Primero remueve el fondo de la imagen, luego la convierte en escala de grises, la redimensiona a 30x20 px y finalmente guarda el resultado en la carpeta correspondiente, cambiando el nombre a processed_XXX (donde XXX es el nombre original del archivo).

In [None]:
import os
from PIL import Image
from rembg import remove

# Ruta a la carpeta data/raw
raw_folder = "../data/raw"

# Recorre cada subcarpeta dentro de data/raw
for subfolder in os.listdir(raw_folder):
    subfolder_path = os.path.join(raw_folder, subfolder)
    
    # Verifica si es una carpeta
    if os.path.isdir(subfolder_path):
        print(f"Procesando carpeta: {subfolder}")
        
        # Recorre cada archivo en la subcarpeta
        for file_name in os.listdir(subfolder_path):
            file_path = os.path.join(subfolder_path, file_name)
            
            # Verifica si es una imagen
            if file_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                print(f"Procesando imagen: {file_name}")
                
                # Abre la imagen
                with Image.open(file_path) as img:
                    # Remueve el fondo
                    img_no_bg = remove(img)
                    
                    # Convierte a escala de grises
                    img_gray = img_no_bg.convert("L")
                    
                    # Redimensiona la imagen
                    img_resized = img_gray.resize((30, 20))
                    
                    # Guarda la imagen procesada
                    output_path = os.path.join(subfolder_path, f"processed_{file_name}")
                    img_resized.save(output_path)
                    print(f"Imagen procesada guardada en: {output_path}")




Normalización de los datos

In [None]:
import os
import numpy as np
from PIL import Image
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time  # Para medir el tiempo de ejecución

# Función para normalizar un lote de imágenes
def normalize_images(batch):
    for file_path in batch:
        try:
            # Abre la imagen
            with Image.open(file_path) as img:
                # Convierte la imagen a un array numpy y normaliza
                img_array = np.array(img) / 255.0
                
                # Convierte de nuevo a imagen y guarda
                normalized_img = Image.fromarray((img_array * 255).astype(np.uint8))
                normalized_img.save(file_path)
        except Exception as e:
            print(f"Error procesando {file_path}: {e}")

# Función para dividir una lista en lotes
def split_into_batches(file_list, batch_size):
    for i in range(0, len(file_list), batch_size):
        yield file_list[i:i + batch_size]

# Función para procesar una carpeta
def process_folder(subfolder):
    subfolder_path = os.path.join(processed_folder, subfolder)
    
    # Verifica si es una carpeta
    if os.path.isdir(subfolder_path):
        print(f"Procesando carpeta: {subfolder}")
        
        # Lista de imágenes en la carpeta
        file_list = [
            os.path.join(subfolder_path, file_name)
            for file_name in os.listdir(subfolder_path)
            if file_name.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        
        # Divide las imágenes en lotes de 100
        batches = list(split_into_batches(file_list, 100))
        
        # Normaliza cada lote
        for batch in tqdm(batches, desc=f"Normalizando imágenes en {subfolder}", unit="lote"):
            normalize_images(batch)

# Usa ThreadPoolExecutor para procesar varias carpetas al mismo tiempo
if __name__ == "__main__":
    # Ruta a la carpeta data/processed
    processed_folder = "../data/processed"

    # Temporizador para medir el tiempo de ejecución
    start_time = time.time()

    # Lista de subcarpetas en data/processed
    subfolders = [subfolder for subfolder in os.listdir(processed_folder) if os.path.isdir(os.path.join(processed_folder, subfolder))]
    print(f"Inicialización completada en {time.time() - start_time:.2f} segundos")

    # Usa ThreadPoolExecutor para procesar carpetas en paralelo
    with ThreadPoolExecutor(max_workers=3) as executor:  # Cambia max_workers según el número de hilos que quieras usar
        list(tqdm(executor.map(process_folder, subfolders), total=len(subfolders), desc="Procesando carpetas"))

    print(f"Todas las carpetas procesadas en {time.time() - start_time:.2f} segundos")

División y Etiquetado de Imágenes en Conjuntos de Entrenamiento y Testeo

In [None]:
import os
import shutil
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

def dividir_y_etiquetar(processed_folder, train_folder, test_folder, train_csv_path, test_csv_path, test_size=0.3):
    """
    Divide las imágenes directamente desde las carpetas de processed en entrenamiento y testeo,
    las mueve a sus respectivas carpetas y genera los CSVs correspondientes.
    """
    # Diccionario para almacenar rutas y etiquetas
    data = []

    # Recorre cada subcarpeta dentro de processed_folder
    label_map = {"rock": 0, "paper": 1, "scissors": 2}
    subfolders = [subfolder for subfolder in os.listdir(processed_folder) if os.path.isdir(os.path.join(processed_folder, subfolder))]
    for subfolder in tqdm(subfolders, desc="Procesando subcarpetas"):
        subfolder_path = os.path.join(processed_folder, subfolder)
        label = label_map.get(subfolder.lower(), -1)
        if label == -1:
            print(f"Carpeta desconocida: {subfolder}. Ignorando...")
            continue

        # Recorre cada archivo en la subcarpeta
        file_list = [os.path.join(subfolder_path, file_name) for file_name in os.listdir(subfolder_path) if file_name.lower().endswith(('.png', '.jpg', '.jpeg'))]
        for file_path in file_list:
            data.append({"path": file_path, "label": label})

    # Convierte los datos a un DataFrame
    df = pd.DataFrame(data)

    # Divide los datos en entrenamiento y testeo
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42, stratify=df["label"])

    # Crea las carpetas de destino
    os.makedirs(train_folder, exist_ok=True)
    os.makedirs(test_folder, exist_ok=True)

    # Mueve las imágenes de entrenamiento con barra de progreso
    print("Moviendo imágenes de entrenamiento...")
    for _, row in tqdm(train_df.iterrows(), total=len(train_df), desc="Entrenamiento"):
        label_folder = os.path.join(train_folder, str(row["label"]))
        os.makedirs(label_folder, exist_ok=True)
        shutil.copy(row["path"], label_folder)

    # Mueve las imágenes de testeo con barra de progreso
    print("Moviendo imágenes de testeo...")
    for _, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Testeo"):
        label_folder = os.path.join(test_folder, str(row["label"]))
        os.makedirs(label_folder, exist_ok=True)
        shutil.copy(row["path"], label_folder)

    # Guardar los conjuntos en archivos CSV separados
    train_df.to_csv(train_csv_path, index=False)
    test_df.to_csv(test_csv_path, index=False)

    print(f"Conjunto de entrenamiento guardado en: {train_csv_path}")
    print(f"Conjunto de testeo guardado en: {test_csv_path}")

# Rutas de las carpetas y archivos
processed_folder = "../data/processed"
train_folder = "../data/training"
test_folder = "../data/test"
train_csv_path = "../data/training_labels.csv"
test_csv_path = "../data/test_labels.csv"

# Divide y etiqueta directamente
dividir_y_etiquetar(processed_folder, train_folder, test_folder, train_csv_path, test_csv_path)