# Machine Learning + Dask

# Score and Predict Large Datasets

A veces entrenarás con un conjunto de datos más pequeño que cabe en la memoria, pero necesitarás predecir o generar un score para un conjunto de datos mucho más grande (posiblemente más grande que la memoria). Tal vez tu curva de aprendizaje se ha estabilizado, o solo tienes etiquetas para un subconjunto de los datos. En esta situación, puedes utilizar ParallelPostFit para paralelizar y distribuir los pasos del score o la predicción.


In [1]:
from dask.distributed import Client, progress

# Scale up: connect to your own cluster with bmore resources
# see http://dask.pydata.org/en/latest/setup.html
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://10.100.77.10:8787/status,

0,1
Dashboard: http://10.100.77.10:8787/status,Workers: 1
Total threads: 4,Total memory: 1.86 GiB
Status: running,Using processes: False

0,1
Comm: inproc://10.100.77.10/16124/1,Workers: 1
Dashboard: http://10.100.77.10:8787/status,Total threads: 4
Started: Just now,Total memory: 1.86 GiB

0,1
Comm: inproc://10.100.77.10/16124/4,Total threads: 4
Dashboard: http://10.100.77.10:60128/status,Memory: 1.86 GiB
Nanny: None,
Local directory: C:\Users\JOSEME~1\AppData\Local\Temp\dask-worker-space\worker-zkuhm1ol,Local directory: C:\Users\JOSEME~1\AppData\Local\Temp\dask-worker-space\worker-zkuhm1ol


In [2]:
import numpy as np
import dask.array as da
from sklearn.datasets import make_classification

Generamos una muestra pequeña de datos aleatorios con scikit-learn

In [3]:
X_train, y_train = make_classification(
    n_features=2, n_redundant=0, n_informative=2,
    random_state=1, n_clusters_per_class=1, n_samples=1000)
X_train[:5]

array([[ 1.53682958, -1.39869399],
       [ 1.36917601, -0.63734411],
       [ 0.50231787, -0.45910529],
       [ 1.83319262, -1.29808229],
       [ 1.04235568,  1.12152929]])

Vamos a clonar ese dataset varias veces con dask.array. x_large y y_large representan nuestro dataset más grande que la memoria

In [4]:
# Scale up: Incrementamos N, el número de veces que replicamos los datos
N = 100
X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)
                          for _ in range(N)])
y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)
                          for _ in range(N)])
X_large

Unnamed: 0,Array,Chunk
Bytes,1.53 MiB,15.62 kiB
Shape,"(100000, 2)","(1000, 2)"
Count,101 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.53 MiB 15.62 kiB Shape (100000, 2) (1000, 2) Count 101 Tasks 100 Chunks Type float64 numpy.ndarray",2  100000,

Unnamed: 0,Array,Chunk
Bytes,1.53 MiB,15.62 kiB
Shape,"(100000, 2)","(1000, 2)"
Count,101 Tasks,100 Chunks
Type,float64,numpy.ndarray


Dado que nuestro training dataset entra en la memoria, se puede usar el estimador de scikit-learn como el estimador para el entrenamiento. Pero sabemos que queremos predecir sobre una base de datos más grandes, por lo que usaremos el scikit-learn dentro de ParallelPostFit.

In [5]:
from sklearn.linear_model import LogisticRegressionCV
from dask_ml.wrappers import ParallelPostFit

Puede revisar la nota de documentación de dask-ml para saber cuando un scoring parameter es necesario:
https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit.

In [6]:
clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")

In [7]:
clf.fit(X_train, y_train)

Ahora que el entrenamiento se generó, iremos a la predicción en la base de datos completa (más grande que la memoria)

In [8]:
y_pred = clf.predict(X_large)
y_pred

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,3.91 kiB
Shape,"(100000,)","(1000,)"
Count,201 Tasks,100 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 390.62 kiB 3.91 kiB Shape (100000,) (1000,) Count 201 Tasks 100 Chunks Type int32 numpy.ndarray",100000  1,

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,3.91 kiB
Shape,"(100000,)","(1000,)"
Count,201 Tasks,100 Chunks
Type,int32,numpy.ndarray


y_pred es un Dask array. Los workers pueden escribir el valor predecido en un archivo compartido del sistema, sin necesidad de recolectar los datos en una sola maquina. 

O, podemos revisar la puntuación de los modelos en la base de datos completa. La computación se realizará en paralelo, ninguna máquina almacenará toda la información de manera aislada. 

In [9]:
clf.score(X_large, y_large)

0.596

# Train Models on Large Datasets

La mayoría de estimadores en scikit-learn están diseñados para trabajar con NumPy arrays o scipy sparse matricies. Estas estructuras de datos deben encajar en la memoria RAM o en una sola maquina. 

Los estimadores implementados en Dask-ML trabajan bien con Dask Arrays o Dask Dataframes. Esto puede ser más grande que un RAM por sí mismo. Se puede distribuir en la memoria o en un cluster de maquinas

In [10]:
%matplotlib inline

In [11]:
from dask.distributed import Client

# Scale up: connect to your own cluster with more resources
# see http://dask.pydata.org/en/latest/setup.html
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 60131 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://10.100.77.10:60131/status,

0,1
Dashboard: http://10.100.77.10:60131/status,Workers: 1
Total threads: 4,Total memory: 1.86 GiB
Status: running,Using processes: False

0,1
Comm: inproc://10.100.77.10/16124/10,Workers: 1
Dashboard: http://10.100.77.10:60131/status,Total threads: 4
Started: Just now,Total memory: 1.86 GiB

0,1
Comm: inproc://10.100.77.10/16124/13,Total threads: 4
Dashboard: http://10.100.77.10:60132/status,Memory: 1.86 GiB
Nanny: None,
Local directory: C:\Users\JOSEME~1\AppData\Local\Temp\dask-worker-space\worker-d1r99goq,Local directory: C:\Users\JOSEME~1\AppData\Local\Temp\dask-worker-space\worker-d1r99goq


In [12]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

En este ejemplo, usaremos dask_ml.datasets.make_blobs para generar dask arrays aleatorios

In [13]:
# Scale up: increase n_samples or n_features
X, y = dask_ml.datasets.make_blobs(n_samples=1000000,
                                   chunks=100000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

Unnamed: 0,Array,Chunk
Bytes,15.26 MiB,1.53 MiB
Shape,"(1000000, 2)","(100000, 2)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.26 MiB 1.53 MiB Shape (1000000, 2) (100000, 2) Count 10 Tasks 10 Chunks Type float64 numpy.ndarray",2  1000000,

Unnamed: 0,Array,Chunk
Bytes,15.26 MiB,1.53 MiB
Shape,"(1000000, 2)","(100000, 2)"
Count,10 Tasks,10 Chunks
Type,float64,numpy.ndarray


Usaremos el k-means de Dask-ML para clusterizar los puntos. Se usa el algoritmo de inicialización k-means, que escala mejor que k-means++. Toda la computación, tanto durante como después de la inicialización puede realizarse en paralelo. 

In [None]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

found 0 physical cores < 1
  File "C:\Users\JOSE MENDOZA\anaconda3\lib\site-packages\joblib\externals\loky\backend\context.py", line 282, in _count_physical_cores
    raise ValueError(f"found {cpu_count_physical} physical cores < 1")


Haremos un plot para una muestra de puntos coloreados por el cluster en donde son clasificados.

In [None]:
fig, ax = plt.subplots()
ax.scatter(X[::1000, 0], X[::1000, 1], marker='.', c=km.labels_[::1000],
           cmap='viridis', alpha=0.25);

# Text Vectorization Pipeline

Este ejemplo busca ilustrar como Dak-ML puede ser usado para clasificar grandes bases de datos con información de texto en paralelo. Particularmente, se ajusta el modelo completo, incluyendo el text vectorization, como un pipeline. Se usa opciones de Dask como Dask Bag, Dask Dataframes y Dask Arrays 

In [None]:
from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client

Obtenemos los datos
Scikit-learn ofrece datos de ejemplo que podemos seleccionar

In [None]:
import sklearn.datasets

bunch = sklearn.datasets.fetch_20newsgroups()

Los datos obtenido de scikit-learn no son tan grandes, por lo que los datos entran en la memoria. Cada documento del ejemplo es una string. El target que buscamos predecir es un entero, que codifica el tópico del post. 

Vamos a cargar los documentos y targets directamente en un dask DataFrame. En la práctica, de tener una base más grande que la memoria, cargaríamos los documentos desde un disco o algún cloud storage usando dask.bag o dask.delayed. 

In [None]:
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"text": bunch.data, "target": bunch.target}),
                    npartitions=25)

df

Cada fila en la columna de texto tiene un bit de metadata y el texto completo del post

In [None]:
print(df.head().loc[2, 'text'][:500])

Feature Hashing
El HashingVectorizer de Dask proporciona una API similar a la implementación de scikit-learn. De hecho, la implementación de Dask-ML utiliza la de scikit-learn, aplicándola a cada partición de la serie dask.dataframe.Series o dask.bag.Bag.

La transformación, una vez realmente calculamos el resultado, ocurre en paralelo y devuelve un Dask array.

In [None]:
import dask_ml.feature_extraction.text

vect = dask_ml.feature_extraction.text.HashingVectorizer()
X = vect.fit_transform(df['text'])
X


El array output X tiene chunk sizes desconocidos debido a que el dask Series de input o Bag no conoce su propia extensión. 
Cada bloque en X es una matriz scipy.sparse.

In [None]:
X.blocks[0].compute()

Esta es una matriz de documento-termino. Cada fila es un hashed representation del post original


Classification Pipeline
Podemos combinar el HashingVectorizer con el Incremental y con un clasificador como scikit-learn's SGDClassifier para crear el classification pipeline.
Vamos a predecir si el tópico estuvo en la categoría "comp"


In [None]:
bunch.target_names

In [None]:
import numpy as np

positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]
y = df['target'].isin(positive).astype(int)
y

In [None]:
import numpy as np
import sklearn.linear_model
import sklearn.pipeline

import dask_ml.wrappers

Dado que el input proviene de un dask Series, con chunk sizes desconocidos, se necesita especificar assume_equal_chunks=True. Esto le indica a Dask-ML que sabemos que cada partición en X empareja con una partición en y.

In [None]:
sgd = sklearn.linear_model.SGDClassifier(
    tol=1e-3
)
clf = dask_ml.wrappers.Incremental(
    sgd, scoring='accuracy', assume_equal_chunks=True
)
pipe = sklearn.pipeline.make_pipeline(vect, clf)

SGDClassifier.partial_fit necesita saber el conjunto de categorías completo de antemano. Dado que el sgd está dentro de un Incremental, se necesita pasarlo como un argumento de incremental__classes en fit.

In [None]:
pipe.fit(df['text'], y,
         incremental__classes=[0, 1]);


Incremental.predict genera la predicción como un dask Array

In [None]:
predictions = pipe.predict(df['text'])
predictions

Podemos computar la predicción y puntuación en paralelo con dask_ml.metrics.accuracy_score.

In [None]:
dask_ml.metrics.accuracy_score(y, predictions)