# Tarea 9 - Dask

_175904 - Jorge III Altamirano Astorga_

## Ejercicio/Tarea

Aprovecha la capacidad de Dask para realizar cómputo en paralelo para ajustar un modelo para predecir la proporción de propina de un viaje. Realiza búsqueda de hiperparámetros en grid con cross validation. Puedes usar funciones de scikit learn. Recuerda usar el decorador `delayed` para ejecutar en paralelo.

* ¿Qué tan rápido es buscar en paralelo comparado con una búsqueda secuencial en python?

Haz lo mismo que arriba, pero utilizando la biblioteca Dask-ML http://dask-ml.readthedocs.io/en/latest/ 

_Respuesta: es más lento paralelizar que hacerlo secuencialmente, dado el volumen (pequeño de datos) como se detalla en la siguiente respuesta a la pregunta que es pequeño._

### Secuencial

In [1]:
from sklearn.linear_model import *
from sklearn.pipeline import *
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import *
import pandas as pd

trips_pd = pd.read_csv("../data/trips.csv")
X_train, X_test, y_train, y_test = train_test_split(
    trips_pd.iloc[:, [1,2,3,7]],
    trips_pd.tip_amount,
    test_size=0.3,
    random_state=175904)
print("Number of records: %d"%trips_pd.shape[0])
trips_pd[1:5]

Number of records: 9198


Unnamed: 0,car_type,fare_amount,passenger_count,taxi_id,tip_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance
1,A,9.0,1,1,0.0,2015-01-05 23:35:02,2015-01-05 23:25:15,1.81
2,A,7.5,1,1,1.0,2015-01-06 15:22:12,2015-01-06 15:11:45,0.96
3,A,8.5,1,1,1.0,2015-01-08 08:31:23,2015-01-08 08:22:12,1.9
4,A,7.5,1,1,1.66,2015-01-08 12:35:54,2015-01-08 12:26:26,1.0


In [2]:
%%time 
grid_params = { 
    "lasso0__eps": [1e-3, 1e-4, 1e-5],
    "lasso0__n_alphas": [100, 250, 500]}
lasso0 = LassoCV(max_iter=1000, n_alphas=300, n_jobs=15, random_state=175904)
pipe0  = Pipeline([("lasso0", lasso0)])
grid = GridSearchCV(estimator=pipe0, 
                    param_grid=grid_params, 
                    n_jobs=14, scoring="neg_mean_squared_error")
gridm = grid.fit(X_train, y_train)

CPU times: user 2.42 s, sys: 5.73 s, total: 8.15 s
Wall time: 5.87 s


In [3]:
print("Score: %f with metric '%s'\nThe best parameters are: %s"%(
    gridm.score(X_test, y_test), 
    grid.scoring,
    gridm.best_params_))

Score: -3.360575 with metric 'neg_mean_squared_error'
The best parameters are: {'lasso0__eps': 0.0001, 'lasso0__n_alphas': 500}


In [4]:
%%time
gridm.predict(X_test)

CPU times: user 1.61 ms, sys: 6.93 ms, total: 8.54 ms
Wall time: 682 µs


array([4.90660838, 0.83992721, 0.92645592, ..., 0.79878273, 0.82947737,
       0.65123205])

### Paralelo

In [5]:
import dask.dataframe as dd
import dask.array as da
import dask_ml.model_selection as dms
from dask.distributed import Client
import dask
import numpy as np
from dask import delayed

dask.set_options(scheduler='threads')
client = Client("jupyter.corp.penoles.mx:8786")

trips = dd.read_csv("file:/tmp/trips.csv")
dX_train, dX_test, dy_train, dy_test = (
    dd.from_pandas(X_train, npartitions=3),
    dd.from_pandas(X_test, npartitions=3),
    dd.from_pandas(y_train, npartitions=3),
    dd.from_pandas(y_test, npartitions=3)
)

trips.head(4)

Unnamed: 0,car_type,fare_amount,passenger_count,taxi_id,tip_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance
0,A,22.0,1,1,4.6,2015-01-03 01:37:02,2015-01-03 01:17:32,6.9
1,A,9.0,1,1,0.0,2015-01-05 23:35:02,2015-01-05 23:25:15,1.81
2,A,7.5,1,1,1.0,2015-01-06 15:22:12,2015-01-06 15:11:45,0.96
3,A,8.5,1,1,1.0,2015-01-08 08:31:23,2015-01-08 08:22:12,1.9


In [6]:
%%time 
grid_params = { 
    "lasso0__eps": [1e-3, 1e-4, 1e-5],
    "lasso0__n_alphas": [100, 250, 500]}

lasso0 = LassoCV(max_iter=1000, n_alphas=300, n_jobs=15, random_state=175904)
pipe0  = Pipeline([("lasso0", lasso0)])
dgrid = dms.GridSearchCV(estimator=pipe0, 
                    param_grid=grid_params, 
                    n_jobs=14, scoring="neg_mean_squared_error")
dgridm = dgrid.fit(dX_train, dy_train)

CPU times: user 81.9 ms, sys: 12 ms, total: 93.9 ms
Wall time: 21.5 s


In [7]:
print("Score: %f with metric '%s'\nThe best parameters are: %s"%(
    gridm.score(dX_test, dy_test), 
    grid.scoring,
    gridm.best_params_))

Score: -3.360575 with metric 'neg_mean_squared_error'
The best parameters are: {'lasso0__eps': 0.0001, 'lasso0__n_alphas': 500}


In [8]:
%%time
y_hat = dX_test.map_partitions(dgridm.predict)
y_hat.compute()

CPU times: user 267 ms, sys: 17 ms, total: 284 ms
Wall time: 6.48 s


* ¿Cómo se comparan los tiempos de ejecución de tu búsqueda con la de Dask ML?

_Respuesta: el costo de ejecución en paralelo (dada la distribución de los datos y paralelización) es mayor al costo que tiene procesarlos en un solo nodo. Por lo tanto toma ligeramente más tiempo dicho "overhead" que si lo hicieramos secuencialmente en un solo nodo._

**Bonus**

Haz lo mismo utilizando Spark ML

In [9]:
import pyspark, sys, os, re
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
# https://spark.apache.org/docs/latest/configuration.html
conf = SparkConf()
conf.set("spark.worker.cleanup.appDataTtl", 24*60*60)
conf.set("spark.worker.cleanup.enabled", True)
conf.set("spark.driver.memory", "60g")
conf.set("spark.driver.cores", 14)
conf.set("spark.driver.memoryOverhead", 0.9)
conf.set("spark.executor.memory", "60g")
conf.set("spark.executor.cores", 14)
conf.set("spark.jars", 
         "local:/usr/local/spark-2.3.0-bin-hadoop2.7/jars/hadoop-common-2.7.3.jar," +
         "local:/usr/local/spark-2.3.0-bin-hadoop2.7/jars/commons-cli-1.2.jar,"+
         "file:/usr/local/spark-2.3.0-bin-hadoop2.7/jars/hadoop-aws-2.7.3.jar," +
         "file:/usr/local/spark-2.3.0-bin-hadoop2.7/jars/aws-java-sdk-1.7.4.jar"
        )
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
### get they creds to login to AWS :-)
HOME = os.environ["HOME"]
aws_id, aws_key = (None, None)
with open(HOME+"/.aws/credentials", "r") as f:
    for line in f:
        line = line.strip()
        if "aws_access_key_id" in line:
            aws_id = re.sub("^.*aws_access_key_id\s*=\s*", "", line)
        elif "aws_secret_access_key" in line:
            aws_key = re.sub("^.*aws_secret_access_key\s*=\s*", "", line)
### end getting keys
conf.set("spark.hadoop.fs.s3a.access.key", aws_id)
conf.set("spark.hadoop.fs.s3a.secret.key", aws_key)
aws_id, aws_key = (None, None)
# conf.set("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.5.3")
sc = SparkContext(master = "spark://jupyter.corp.penoles.mx:7077", 
                  sparkHome="/usr/local/spark/", 
                  appName="tarea-8-dask", conf=conf)
spark = SQLContext(sc)
print("Running Spark v%s"%sc.version)

Running Spark v2.3.0


* ¿Cómo se comparan los tiempos de ejecución de Spark vs Dask?

Usa los datos en s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv

_Respuesta: es más ágil en datos de gran escala solamente Spark vs Dask. Sin embargo, tiene peores resultados en el error, y es menos controlable que el maduro (en comparación) scikit-learn._

In [10]:
%%time
yellow = yellow_csv = None
try:
    print("Trying to pull parquet in local HDFS instead of downloading CSV in S3...", end="")
    yellow = spark.read.parquet("hdfs://jupyter.corp.penoles.mx:9000/tmp/yellow.parquet")
    print(" FOUND IT!!!")
except:
    print("\n\nI didn't find the parquet. Then we're downloading it... (this will take a while)")
    yellow_csv = spark.read.csv("s3a://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv", header=True)
    print("Writing parquet...")
    yellow_csv.write.parquet("hdfs://jupyter.corp.penoles.mx:9000/tmp/yellow.parquet", mode="overwrite")
    print("Reading parquet...")
    yellow = spark.read.parquet("hdfs://jupyter.corp.penoles.mx:9000/tmp/yellow.parquet")
    pass
yellow_csv = None
yellow.cache()

Trying to pull parquet in local HDFS instead of downloading CSV in S3... FOUND IT!!!
CPU times: user 29.8 ms, sys: 9.6 ms, total: 39.4 ms
Wall time: 10.8 s


In [11]:
yellow.show(2)

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2015-01-16 04:18:49|  2015-01-16 04:36:07|              3|        10.40|-73.983985900878906|40.721401214599609|        

In [12]:
yellow.count()

12748986

In [13]:
yellow.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



#### Transformaciones

Transformaciones necesarias para trabajar con Spark ML.

In [14]:
yellow = yellow \
    .withColumn("fare_amount", yellow.fare_amount.cast(DoubleType())) \
    .withColumn("passenger_count", yellow.passenger_count.cast(IntegerType())) \
    .withColumn("VendorID", yellow.VendorID.cast(IntegerType())) \
    .withColumn("trip_distance", yellow.trip_distance.cast(DoubleType())) \
    .withColumn("tip_amount", yellow.tip_amount.cast(DoubleType())) \
    .drop("label", "features")
spark_si = StringIndexer(inputCol="tip_amount", outputCol="label")
yellow = spark_si.fit(yellow).transform(yellow).cache()
yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- label: double (nullable = false)



### Comparación Spark vs Dask vs Secuencial

Dado que tenemos un número mucho mayor de registros en S3, hago un sample aleatorio de los datos que están. Sé que son diferentes datos, y pudieran afectar la predicción: pero lo considero no significativo. Por eso creo el subset `yellow_sm` con un poco más de los 9198 que contiene el set de trips usados en Dask y Secuencial.

Esto coincide con mis observaciones: realizar el paralelismo toma tiempo y recursos. 

Por lo tanto, de manera preliminar puedo afirmar que: vale la pena paralelizar siempre y cuando sean datos suficientes. Además, no vale la pena utilizar para programar rápidamente en Spark, dado que requiere que se hagan los pasos previos; por ende, no vale la pena hacer el esfuerzo para datos en pequeña escala.

In [15]:
yellow_sm = yellow.sample(fraction=0.0075, seed=175904)
features = ["fare_amount", "passenger_count", "VendorID", "trip_distance"]
spark_va = VectorAssembler(inputCols=features, outputCol="features")
yellow_sm = spark_va.transform(yellow_sm)
print("Number of records: %d"%yellow_sm.count())

Number of records: 94890


In [16]:
%%time
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
import pyspark.ml as sml
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
spark_lasso0 = LinearRegression( 
    elasticNetParam=1.0, #set it as a Lasso L1
    labelCol="tip_amount",
    predictionCol="y_hat"
)
# spark_lasso0m = spark_lasso0.fit(yellow_sm )

spark_grid = ParamGridBuilder() \
    .addGrid(spark_lasso0.tol, [1e-3, 1e-4, 1e-5]) \
    .addGrid(spark_lasso0.epsilon, [1.35, 1.70, 2.0]) \
    .build()

spark_eval = RegressionEvaluator(predictionCol="y_hat", labelCol="tip_amount")
spark_cv1 = CrossValidator(estimator=spark_lasso0,
                     evaluator=spark_eval,
                     estimatorParamMaps=spark_grid, parallelism=20)
spark_pipe = sml.Pipeline(stages=[spark_cv1])

CPU times: user 8.38 ms, sys: 322 µs, total: 8.7 ms
Wall time: 97.6 ms


In [17]:
%%time
#train & test sets
yellow_train, yellow_test = yellow_sm.randomSplit(weights=[0.7, 0.3], seed=175904)
yellow_train, yellow_test = (yellow_train.cache(), yellow_test.cache())

CPU times: user 10.4 ms, sys: 3.4 ms, total: 13.8 ms
Wall time: 718 ms


In [18]:
print("Number of records\nFull set:  %d (100%%) \nTrain set: %d (~60%%)\nTest set:  %d (~40%%)"%
      (yellow_sm.count(), yellow_train.count(), yellow_test.count()))

Number of records
Full set:  94890 (100%) 
Train set: 66539 (~60%)
Test set:  28351 (~40%)


In [19]:
%%time
spark_pipem = spark_pipe.fit(yellow_sm)
spark_preds = spark_pipem.transform(yellow_sm)

spark_preds.show(2)

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+-----------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|label|         features|             y_hat|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+-----------------+------------------+
|

In [20]:
ev = spark_pipe.getStages()[0].getEvaluator()
#obtenemos la métrica del error
metric = ev.getMetricName()
#obtenemos el valor del eror
error  = ev.evaluate(spark_preds)
best_params = [spark_pipem.stages[0].bestModel.extractParamMap()[x] 
               for x in [spark_lasso0.epsilon, spark_lasso0.tol]]
print("Score: %f with metric '%s'\n"%(error, metric) +
      "Best parameters for grid search:\n" +
      '{"epsilon":%.2f, "tol":%.4f}'%(best_params[0], best_params[1]))

Score: 2.601646 with metric 'rmse'
Best parameters for grid search:
{"epsilon":2.00, "tol":0.0001}


### Datos de Gran Escala

Ahora se utilizarán todos los datos (>1 GB)

In [21]:
%%time
yellow2 = spark_va.transform(yellow)
#train & test sets
yellow_train2, yellow_test2 = yellow2.randomSplit(weights=[0.6, 0.4], seed=175904)
yellow_train2, yellow_test2 = (yellow_train2.cache(), yellow_test2.cache())
spark_pipe2m = spark_pipe.fit(yellow_train2)
spark_preds2 = spark_pipe2m.transform(yellow_test2)

CPU times: user 2.26 s, sys: 708 ms, total: 2.97 s
Wall time: 1min 53s


In [22]:
print("Number of records\nFull set:  %d (100%%) \nTrain set: %d (~60%%)\nTest set:  %d (~40%%)"%
      (yellow2.count(), yellow_train2.count(), yellow_test2.count()))

Number of records
Full set:  12748986 (100%) 
Train set: 7650508 (~60%)
Test set:  5098478 (~40%)


In [23]:
ev2 = spark_pipe.getStages()[0].getEvaluator()
#obtenemos el valor del eror
error2  = ev2.evaluate(spark_preds2)
best_params2 = [spark_pipe2m.stages[0].bestModel.extractParamMap()[x] 
               for x in [spark_lasso0.epsilon, spark_lasso0.tol]]
print("Score: %f with metric '%s'\n"%(error2, metric) +
      "Best parameters for grid search:\n" +
      '{"epsilon":%.2f, "tol":%.4f}'%(best_params2[0], best_params2[1]))

Score: 2.332589 with metric 'rmse'
Best parameters for grid search:
{"epsilon":1.35, "tol":0.0010}


In [24]:
spark_preds2.show(2)

+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|label|          features|             y_hat|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+------------------+------------------

* ¿Cambia alguno de los resultados anteriores?

_Respuesta: Los errores ahora son mucho mayores con el dataset de S3, investigando encontré que pudiera ser porque existen muchos más outliers. Esto es importante, dado que esta muestra es mayor, y seguramente recoge mejor la realidad. Entonces sería importante analizar más este dataset para encontrar las razones detrás de esto, y evaluar si otros modelos tienen mejores resultados._

_También se observa que los tiempos de ejecución son similares para datos pequeños que para grandes datasets. Eso es importante, como se detallo al inicio de esta sección._

## Conclusiones

Encontré muy interesante utilizar Dask y Dask-ML. Es relevante que utilice **scikit-learn** dado que tiene muchos más modelos y es más maduro y controlable que Spark. 

Me encontré con otras otras situaciones al hacer mi cluster _on-prem_ dado que esta maestría me está siendo pagado por la empresa en la que laboro. Dicha empresa no tiene permitido utilizar servicios en la nube por regulación interna. 

Por lo tanto, dediqué mucho tiempo y esfuerzo, y me topé, con muchos problemas para utilizar S3 dentro de Hadoop y Spark dado que las dependencias de Java en Spark+Hadoop son estrictas y en palabras de otro AWS-SDK es _frágil_. Ejecuté toda esta tarea en un cluster de 3 nodos, cada uno con 64 GiB de RAM; con 16 vCPUs (el master) y con 8 vCPUs (2 workers/esclavos). En todos los casos no se consumía más del 50% de RAM, por lo que correr Dask y Spark _side by side_ no afectaba el rendimiento del uno al otro en ninguno de los nodos. Estos nodos estában dedicados únicamente para este propósito, aunque conviven con otras virtuales con utilización suficiente en distintos nodos **sin sobre suscripción**. Por lo que estas pruebas fueron realizadas relativamente en un ambiente suficientemente aislado y controlado, a diferencia de AWS EC2+EMR, que en ocasiones los jobs corren en hosts sobresuscritos o con cargas pesadas de otros  clientes que afectan las nuestras.

Así mismo, EMR hace las cosas muy fácil y _out of the box_, por lo que configurar Spark fue complejo.

También me encontré con que Spark no está distribuyendo homogéneamente la carga: los workers con 8 vCPUs no tenían gran utilización (y entiendo) de los recursos de memoria y procesamiento. Necesito investigar porqué está ocurriendo. Pero aún así lo encontré sumamente ágil.

## Bibliografía

* <http://dask-ml.readthedocs.io/en/latest/>
* <https://stackoverflow.com/questions/44167038/subset-dask-dataframe-by-column-position>
* <https://stackoverflow.com/questions/39721800/convert-pandas-dataframe-to-dask-dataframe>
* <https://dask-ml.readthedocs.io/en/latest/examples/predict.html>
* <https://mapr.com/blog/predicting-breast-cancer-using-apache-spark-machine-learning-logistic-regression/>
* <https://en.wikipedia.org/wiki/Root-mean-square_deviation>

In [25]:
sc.stop()