<a href="https://colab.research.google.com/github/cam2149/DistributedProcessing/blob/main/4_procesamiento_en_paralelo_con_dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Installing the required libraries**

In [1]:
!pip install "dask[complete]" dask-ml scikit-learn pandas numpy matplotlib kaggle


In [2]:
!du -hs *

55M	sample_data


In [3]:
# Imports
import dask
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
from dask_ml.datasets import make_classification
from dask_ml.model_selection import train_test_split
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
import pandas as pd

### **Initializing the Dask client**

In [4]:
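# Create a Dask client for parallel processing
# Without a scheduler address, Client starts a LocalCluster automatically (this is what happens in Colab)
# 2 worker processes x 2 threads each, with a 2 GB memory limit per worker
client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client
# The dashboard is available at the link shown in the output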


Client
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster
Dashboard: http://127.0.0.1:8787/status | Workers: 2
Total threads: 4 | Total memory: 3.73 GiB
Status: running | Using processes: True

Scheduler
Comm: tcp://127.0.0.1:42093 | Workers: 0
Dashboard: http://127.0.0.1:8787/status | Total threads: 0
Started: Just now | Total memory: 0 B

Worker: 0
Comm: tcp://127.0.0.1:33107 | Total threads: 2
Dashboard: http://127.0.0.1:32851/status | Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35775
Local directory: /tmp/dask-scratch-space/worker-5ai4l8o1

Worker: 1
Comm: tcp://127.0.0.1:36537 | Total threads: 2
Dashboard: http://127.0.0.1:40681/status | Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:44675
Local directory: /tmp/dask-scratch-space/worker-dqwine8w


The Dask client distributes work across multiple workers (here, 2 worker processes with 2 threads each) and provides a dashboard for monitoring task progress and memory use.
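A minimal sketch of how work reaches the workers. It assumes an in-process cluster (`processes=False`, one worker with 2 threads) so it runs without spawning processes; `square` is a hypothetical task. `client.map` submits one task per input and returns futures, and `client.gather` waits for them and collects the results.

```python
from dask.distributed import Client


def square(x):
    # Hypothetical task: any pure Python function can be sent to the workers
    return x * x


# processes=False runs the scheduler and worker threads inside this process
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    futures = client.map(square, range(10))  # one task per input, scheduled in parallel
    results = client.gather(futures)         # block until every task has finished
    total = sum(results)

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(total)    # 285
```

In the notebook, the same `map`/`gather` calls work on the `client` created above, and each task then appears in the dashboard's task stream.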

### **Generating a large synthetic dataset**

In [5]:
# Create a classification dataset with 1 million samples
n_samples = 1_000_000
n_features = 100
n_classes = 2

X, y = make_classification(
    n_samples=n_samples,
    n_features=n_features,
    n_classes=n_classes,
    n_informative=50,
    n_redundant=20,
    chunks=n_samples // 10,  # Split into 10 chunks (100,000 rows each)
    flip_y=0.1,
    random_state=42
)

print(f"Shape of X: {X.shape}")
print(f"Shape of y: {y.shape}")
print(f"Type: {type(X)}")

Shape of X: (1000000, 100)
Shape of y: (1000000,)
Type: <class 'dask.array.core.Array'>


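This code generates one million samples split into 10 chunks of 100,000 rows. Assuming float64 values, the full matrix is about 800 MB (1,000,000 × 100 × 8 bytes), but each chunk (about 80 MB) can be processed independently, which keeps the memory needed per task small.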
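To see the layout that `chunks=n_samples // 10` produces, here is a sketch that uses `da.ones` as a stand-in for the generated data (an assumption: only the shape, chunking, and float64 dtype matter here). Each block is a regular NumPy array that Dask can load and process on its own.

```python
import dask.array as da

n_samples, n_features = 1_000_000, 100

# Same layout as the notebook: 10 row blocks, all 100 features in each block
X = da.ones((n_samples, n_features), chunks=(n_samples // 10, n_features))

print(X.numblocks)      # (10, 1): 10 blocks along rows, 1 along columns
print(X.chunks[0][:3])  # (100000, 100000, 100000)

block_bytes = X.blocks[0, 0].nbytes  # metadata only, nothing is computed
print(f"{block_bytes / 1e6:.0f} MB per block, {X.nbytes / 1e6:.0f} MB total")  # 80 MB per block, 800 MB total

# Operations run block by block; slicing the first 1,000 rows touches only the first block
first_rows_sum = X[:1_000].sum().compute()
print(first_rows_sum)   # 100000.0
```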

### **Train-test split and in-memory persistence**

In [None]:
# Data split
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)

# Persist the data in distributed memory for fast access
X_train, X_test, y_train, y_test = dask.persist(
    X_train, X_test, y_train, y_test
)

print(f"Train samples: {len(y_train)}")
print(f"Test samples: {len(y_test)}")

# Precompute the classes (partial_fit requires them for classification)
classes = da.unique(y_train).compute()
print(f"Classes: {classes}")


Train samples: 800000
Test samples: 200000
Classes: [0 1]


In [11]:
# Split the data into 70% train, 20% test, 10% eval

# First, split into 70% train and a 30% remainder (test + eval)
X_train, X_rem, y_train, y_rem = train_test_split(
    X, y,
    test_size=0.3,  # 30% for test and eval
    random_state=42
)

# Then split the remaining 30% into 20% test and 10% eval (2/3 test, 1/3 eval)
X_test, X_eval, y_test, y_eval = train_test_split(
    X_rem, y_rem,
    test_size=1/3,  # 10% of the total is 1/3 of the remaining 30%
    random_state=42
)

# Persist the data in distributed memory for fast access
X_train, X_test, X_eval, y_train, y_test, y_eval = dask.persist(
    X_train, X_test, X_eval, y_train, y_test, y_eval
)

print(f"Train samples: {len(y_train)}")
print(f"Test samples: {len(y_test)}")
print(f"Evaluation samples: {len(y_eval)}")

# Precompute the classes (partial_fit requires them for classification)
classes = da.unique(y_train).compute()
print(f"Classes: {classes}")

Train samples: 700000
Test samples: 200000
Evaluation samples: 100000
Classes: [0 1]


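Persisting computes the splits once and keeps the resulting chunks in the workers' distributed memory, so later training iterations and evaluations reuse them instead of recomputing the whole split pipeline each time.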
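A small sketch of what persisting saves, assuming the local synchronous scheduler (so the call count is deterministic) and a hypothetical `expensive_transform` that counts how often it runs. With a distributed `Client`, as in the notebook, `persist` returns immediately and the blocks stay in worker memory; the reuse principle is the same.

```python
import dask
import dask.array as da
import numpy as np

calls = {"n": 0}


def expensive_transform(block):
    # Hypothetical stand-in for costly preprocessing; counts its own executions
    calls["n"] += 1
    return block * 2


x = da.ones((1_000, 10), chunks=(100, 10))  # 10 row blocks
# meta declares the output type so Dask does not call the function to infer it
y = x.map_blocks(expensive_transform, meta=np.array((), dtype=x.dtype))

with dask.config.set(scheduler="synchronous"):
    # Without persist: every compute re-runs the transform on all blocks
    y.sum().compute()
    y.mean().compute()
    calls_without_persist = calls["n"]

    # With persist: the transform runs once per block; later computations reuse the results
    calls["n"] = 0
    y_persisted = y.persist()
    calls_after_persist = calls["n"]
    y_persisted.sum().compute()
    y_persisted.mean().compute()
    calls_after_reuse = calls["n"]

print(calls_without_persist, calls_after_persist, calls_after_reuse)
```

The first count covers two full recomputations; after `persist`, the count stops growing because the two reductions read the stored blocks.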

### **Incremental training with SGDClassifier**

In [12]:
# Create the base scikit-learn estimator
base_estimator = SGDClassifier(
    loss='log_loss',
    penalty='l2',
    max_iter=1000,
    tol=1e-3,
    random_state=42
)

# Wrap it with Dask-ML's Incremental
incremental_model = Incremental(
    estimator=base_estimator,
    scoring='accuracy'
)

# Train the model (one pass over all the data)
incremental_model.fit(X_train, y_train, classes=classes)

# Evaluate
train_score = incremental_model.score(X_train, y_train)
test_score = incremental_model.score(X_test, y_test)

print(f"Train accuracy: {train_score:.4f}")
print(f"Test accuracy: {test_score:.4f}")

Train accuracy: 0.8799
Test accuracy: 0.8803


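The Incremental wrapper passes each block (chunk) of the Dask arrays to the estimator's partial_fit method sequentially, so the model never needs the full dataset in memory at once. Because partial_fit makes a single pass over the data it receives, the max_iter and tol settings of SGDClassifier have no effect here.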

### **Training with multiple epochs**

In [13]:
# Reset the model for training with multiple passes
base_estimator_multi = SGDClassifier(
    loss='log_loss',
    penalty='l2',
    max_iter=1000,
    tol=1e-4,
    random_state=42
)

incremental_model_multi = Incremental(
    estimator=base_estimator_multi,
    scoring='accuracy'
)

# Train for 10 epochs (each partial_fit call is one pass over all chunks)
scores_history = []

for epoch in range(10):
    incremental_model_multi.partial_fit(X_train, y_train, classes=classes)
    score = incremental_model_multi.score(X_test, y_test)
    scores_history.append(score)
    print(f"Epoch {epoch+1}/10 - Test accuracy: {score:.4f}")

print(f"\nBest accuracy: {max(scores_history):.4f}")


Epoch 1/10 - Test accuracy: 0.8784
Epoch 2/10 - Test accuracy: 0.8848
Epoch 3/10 - Test accuracy: 0.8875
Epoch 4/10 - Test accuracy: 0.8890
Epoch 5/10 - Test accuracy: 0.8900
Epoch 6/10 - Test accuracy: 0.8910
Epoch 7/10 - Test accuracy: 0.8902
Epoch 8/10 - Test accuracy: 0.8913
Epoch 9/10 - Test accuracy: 0.8907
Epoch 10/10 - Test accuracy: 0.8915

Best accuracy: 0.8915


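Each call to partial_fit makes one more pass (epoch) over all the training chunks, so the loop controls the number of epochs. Here, test accuracy rises from 0.8784 after the first epoch to about 0.891 by epoch 6 and then levels off (0.8915 after epoch 10).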

### **Predictions and detailed evaluation**

In [16]:
# Make predictions on the evaluation set (lazy computation)
y_pred_eval_lazy = incremental_model_multi.predict(X_eval)

# Compute the actual predictions
y_pred_eval = y_pred_eval_lazy.compute()
y_eval_computed = y_eval.compute()

# Detailed metrics
accuracy_eval = accuracy_score(y_eval_computed, y_pred_eval)
print(f"\nAccuracy on the evaluation set: {accuracy_eval:.4f}")

print("\nClassification Report on the evaluation set:")
print(classification_report(y_eval_computed, y_pred_eval))


Accuracy on the evaluation set: 0.8903

Classification Report on the evaluation set:
              precision    recall  f1-score   support

           0       0.89      0.89      0.89     50035
           1       0.89      0.89      0.89     49965

    accuracy                           0.89    100000
   macro avg       0.89      0.89      0.89    100000
weighted avg       0.89      0.89      0.89    100000



## **Parallel GridSearchCV with the Dask backend**

In [17]:
from sklearn.model_selection import GridSearchCV
import joblib

# Create a smaller dataset for the grid search
X_small, y_small = make_classification(
    n_samples=10_000,
    n_features=20,
    n_classes=2,
    chunks=2000,
    random_state=42
)

X_small_computed = X_small.compute()
y_small_computed = y_small.compute()

# Define the hyperparameter grid
param_grid = {
    'alpha': [0.0001, 0.001, 0.01],
    'penalty': ['l2', 'l1', 'elasticnet'],
    'max_iter': [1000, 2000]
}

base_sgd = SGDClassifier(loss='log_loss', random_state=42)

grid_search = GridSearchCV(
    base_sgd,
    param_grid=param_grid,
    cv=3,
    scoring='accuracy',
    n_jobs=-1
)

# Run the grid search with the Dask joblib backend (uses the active Client)
with joblib.parallel_backend('dask'):
    grid_search.fit(X_small_computed, y_small_computed)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.4f}")

# Show results (first 5 rows in grid order; sort by 'rank_test_score' to list the best ones)
results_df = pd.DataFrame(grid_search.cv_results_)
print("\nTop 5 configurations:")
print(results_df[['params', 'mean_test_score', 'rank_test_score']].head())


Best parameters: {'alpha': 0.01, 'max_iter': 1000, 'penalty': 'l1'}
Best score: 0.6919

Top 5 configurations:
                                              params  mean_test_score  \
0  {'alpha': 0.0001, 'max_iter': 1000, 'penalty':...         0.679101   
1  {'alpha': 0.0001, 'max_iter': 1000, 'penalty':...         0.678700   
2  {'alpha': 0.0001, 'max_iter': 1000, 'penalty':...         0.684800   
3  {'alpha': 0.0001, 'max_iter': 2000, 'penalty':...         0.679101   
4  {'alpha': 0.0001, 'max_iter': 2000, 'penalty':...         0.678700   

   rank_test_score  
0               15  
1               17  
2               11  
3               15  
4               17  


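The Dask backend for joblib sends the individual fits that GridSearchCV schedules through joblib (18 parameter combinations × 3 folds = 54 cross-validation fits) to the cluster workers instead of local processes. It requires an active Client, such as the one created at the start of this notebook.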
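A minimal sketch of both points: counting the fits in this grid with scikit-learn's `ParameterGrid`, and running plain `joblib.Parallel` work through the `"dask"` backend. It assumes an in-process `Client` for portability; `slow_square` is a hypothetical stand-in for one model fit.

```python
import joblib
from dask.distributed import Client
from sklearn.model_selection import ParameterGrid

# Same grid as the notebook cell: 3 alphas x 3 penalties x 2 max_iter values
param_grid = {
    "alpha": [0.0001, 0.001, 0.01],
    "penalty": ["l2", "l1", "elasticnet"],
    "max_iter": [1000, 2000],
}
n_candidates = len(ParameterGrid(param_grid))  # parameter combinations
n_fits = n_candidates * 3                      # cv=3 folds per combination
print(n_candidates, n_fits)  # 18 54


def slow_square(x):
    # Hypothetical stand-in for one model fit
    return x * x


# The "dask" joblib backend needs an active Client; here an in-process one
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    with joblib.parallel_backend("dask"):
        squares = joblib.Parallel(n_jobs=-1)(
            joblib.delayed(slow_square)(i) for i in range(10)
        )

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

`joblib.Parallel` returns results in input order, so code written for local joblib parallelism works unchanged once it runs inside the `parallel_backend("dask")` block.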

### **Processing real CSV data with Dask**

In [None]:
# Example with CSV data (uncomment if you have a file)
# df = dd.read_csv('large_dataset.csv')
#
# # Preprocessing with Dask DataFrame
# df = df.dropna()
# df['feature_engineered'] = df['feature1'] * df['feature2']
#
# # Convert to Dask arrays
# X_from_df = df.drop('target', axis=1).to_dask_array(lengths=True)
# y_from_df = df['target'].to_dask_array(lengths=True)
#
# # Continue with the ML pipeline...


In [None]:
# Close the Dask client
client.close()



### **Key advantages of Dask for ML**
- **Scalability**: Handles datasets larger than RAM by splitting them into chunks
- **Parallelization**: Distributes work across multiple cores/workers automatically
- **Integration**: Compatible with scikit-learn and its ecosystem
- **Lazy evaluation**: Computes only when a result is requested (for example with `.compute()`), which avoids unnecessary work
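A short illustration of lazy evaluation with a small Dask array (a toy example, not taken from the notebook): building the expression only extends the task graph, and the work happens when `.compute()` is called.

```python
import dask.array as da

x = da.arange(10, chunks=5)  # nothing is computed yet
y = (x ** 2).sum()           # still lazy: this only extends the task graph
print(type(y).__name__)      # Array

result = y.compute()         # the graph executes here
print(result)                # 285
```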

This example covers everything from basic setup to more advanced techniques such as incremental training and distributed hyperparameter search.