# Introducción

[Dask](https://dask.org/) es una librería de Python que permite paralelizar procesos en múltiples CPU's, principalmente el conjunto pandas/numpy/scikit-learn, mediante estructuras de datos compatibles (tales como el DataFrame de Dask). A fin de ser utilizable por RAPIDS, se ha creado una librería de conexión llamada dask-cudf.

Dask permite crear entornos con múltiples instancias asignables a GPU's separadas, de tal modo que ciertas operaciones se añaden a un pipeline de manera asíncrona, y son ejecutadas en paralelo una vez definido el proceso completo. Esto nos permite hacer uso de múltiples GPU's para tratar datos y ejectutar algoritmos de machine learning mediante dask-cuml.

Aunque es posible crear todo un proceso de lectura y transformación de datos en paralelo, el objetivo de este cuaderno es medir los tiempos de ejecución de los algoritmos disponibles en dask-cuml que ya se han probado en los cuadernos anteriores. Por ello, la única sección de este cuaderno se ejecuta en GPU, y tan sólo el entrenamiento de modelos y cálculo de soluciones o predicciones es ejecutado mediante Dask.

**Nota:** este cuaderno emplea técnicas descritas en los cuadernos nb1 a nb6. Se recomienda haber leído y ejecutado al menos algunos de estos cuadernos, a fin de comprender los procesos ejecutados a continuación.

# Sección 1: computación de modelos machine learning en múltiples GPU's en paralelo mediante Dask

## Carga y tratamiento de datos

Importaciones y carga de scripts y datos inicial:

In [None]:
import os
import os.path

import cudf
import cupy as cp
import cuml
from cuml.metrics.regression import mean_squared_error as mnsq

%run ../utils/f_northing.py
%run ../utils/f_northing_numpy.py
%run ../utils/f_price_range.py
%run ../utils/f_static_data.py
%run ../utils/f_utils.py

cities_to_use = ['sevilla']
#cities_to_use = ['shanghai']
#cities_to_use = cities_to_use_1()
#cities_to_use = cities_to_use_2()

columns_to_use = columns_to_use()
columns_to_fit = columns_to_fit()

Importaciones únicas de este cuaderno:
* **subprocess:** librería de Python que nos permite crear y gestionar subprocesos. Se emplea para determinar la dirección local en la que correremos el cluster de Dask.
* **dask, dask_cuda, dask_cudf:** librerías de Dask para ejecución mediante GPU (CUDA y cuDF).
* **cuml.dask:** sub-librería gestionada por RAPIDS para la ejecución de modelos cuml en instancias multi GPU con Dask.

In [None]:
import subprocess
import dask

from dask.distributed import Client, wait, progress
from dask_cuda import LocalCUDACluster

import dask.dataframe as dd
import dask.array as da

from dask import compute
from dask.delayed import delayed

import dask_cudf
from cuml.dask.cluster import KMeans as DKMeans
from cuml.dask.cluster import DBSCAN as DDBSCAN
from cuml.dask.neighbors import NearestNeighbors as DNN
from cuml.dask.linear_model import LinearRegression as DLinReg

Mediante Dask y la librería de subprocesos de Python, creamos un subproceso albergando un cluster de CUDA, y un cliente ejecutable sobre el que cargaremos los procesos de Dask.

In [None]:
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]

cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster)
client

**Nota:** si se desea ejecutar regresión linear, ignorar la siguiente celda y ejecutar la carga y tratamiento de datos de la sección **'Tratamiento especial de datos para regresión linear'**.

Carga y tratamiento de datos:

In [None]:
listings = cudf.DataFrame()

for city in cities_to_use:
    directory = '../data/' + city + '/'
    if os.path.exists(directory):
        for file in os.listdir(directory):
            if file.endswith('.csv'):
                temp_df = cudf.read_csv(directory + file, usecols = columns_to_use)
                standard_object_type(temp_df, ['host_acceptance_rate', 'neighbourhood_cleansed'])
                if listings.size == 0:
                    listings = temp_df
                else:
                    for column in listings.columns:
                        if listings[column].dtype != temp_df[column].dtype:
                            print('Found error: '+column+' type '+listings[column].dtype.name+' doesnt match '+temp_df[column].dtype.name)
                    listings = listings.append(temp_df)
                    
listings = listings.drop_duplicates().reset_index(drop=True)

type_conversion(listings, ['host_id', 'accommodates', 'number_of_reviews', 'reviews_per_month'])
column_factorize(listings, ['neighbourhood_cleansed'])

clean_format_strings(listings, ['host_response_rate', 'host_acceptance_rate'])
clean_format_price(listings, ['price'])

cupy_lat = cp.asarray(listings['latitude'])
cupy_long = cp.asarray(listings['longitude'])
n_cupy_array, e_cupy_array = latlong2osgbgrid_cupy(cupy_lat, cupy_long)
listings['northing'] = cudf.Series(n_cupy_array).astype('float32')
listings['easting'] = cudf.Series(e_cupy_array).astype('float32')

listings.head()

# Aplicación de algoritmos mediante cuml.dask

*Nota:* se recomienda no ejecutar todas las celdas a continuación en una sóla ejecución, a menos que se disponga de suficiente memoria.

## Aplicación de k-means

Los datos previamente tratados están actualmente gestionados en memoria de GPU mediante cuDF. Para poder emplearlos en el cluster de Dask, debemos convertirlos a un DataFrame de Dask. La operación persist() guarda estos cambios en la memoria asignada al cluster, de modo que no es necesario volver a convertir los datos para ejecutar múltiples pruebas en este cuaderno.

In [None]:
listings_dask = dask.dataframe.from_pandas(listings, npartitions=4)
listings_dask.persist()

In [None]:
%%time
km = DKMeans(n_clusters=5)
km.fit(listings_dask[['easting', 'northing']])
listings['kmeans'] = km.labels_
km.cluster_centers_

## Aplicación de DBSCAN

La interfaz de DBSCAN en cuml.dask no acepta el objeto DataFrame de Dask, sino el DataFrame de cuDF.

In [None]:
%%time
dbscan = DDBSCAN(client=client, eps=150)
price_df = listings[listings['price'] >= 200].reset_index(drop=True)
price_df['cluster'] = dbscan.fit_predict(price_df.loc[:, ['northing', 'easting']])
price_df['cluster'].nunique()

## Aplicación de Nearest Neighbors

In [None]:
%%time
knn = DNN(n_neighbors=3)
listings_cheap = listings[listings['price'] <= 50.0].reset_index(drop=True)
listings_expensive = listings[listings['price'] >= 100.0].reset_index(drop=True)
listings_cheap_cols = listings_cheap[['easting', 'northing']]
listings_expensive_cols = listings_expensive[['easting', 'northing']]

dask_cheap = dask.dataframe.from_pandas(listings_cheap_cols, npartitions=4)
dask_expensive = dask.dataframe.from_pandas(listings_expensive_cols, npartitions=4)

knn.fit(dask_cheap)
dask_distances, dask_indices = knn.kneighbors(dask_expensive)

## Tratamiento especial de datos para regresión linear

In [None]:
listings = cudf.DataFrame()

for city in cities_to_use:
    directory = '../data/' + city + '/'
    if os.path.exists(directory):
        for file in os.listdir(directory):
            if file.endswith('.csv'):
                temp_df = cudf.read_csv(directory + file, usecols = columns_to_use)
                standard_object_type(temp_df, ['host_acceptance_rate', 'neighbourhood_cleansed'])
                if(temp_df['host_total_listings_count'].dtype != 'float64'):
                    temp_df['host_total_listings_count'] = temp_df['host_total_listings_count'].fillna(-1).astype('float64')
                if(temp_df['bathrooms'].dtype != 'float64'):
                    temp_df['bathrooms'] = temp_df['bathrooms'].fillna(-1).astype('float64')
                if(temp_df['bedrooms'].dtype != 'float64'):
                    temp_df['bedrooms'] = temp_df['bedrooms'].fillna(-1).astype('float64')
                if(temp_df['beds'].dtype != 'float64'):
                    temp_df['beds'] = temp_df['beds'].fillna(-1).astype('float64')
                if listings.size == 0:
                    listings = temp_df
                else:
                    for column in listings.columns:
                        if listings[column].dtype != temp_df[column].dtype:
                            print('Found error: '+column+' type '+listings[column].dtype.name+' doesnt match '+temp_df[column].dtype.name)
                    listings = listings.append(temp_df)
                    
listings = listings.drop_duplicates()
listings = listings.reset_index(drop=True)

type_conversion_64(listings, ['host_id', 'accommodates', 'number_of_reviews', 'reviews_per_month', 'minimum_nights', 'maximum_nights', 'availability_30', 'availability_90', 'availability_365', 'number_of_reviews_ltm', 'review_scores_rating', 'review_scores_accuracy', 'review_scores_cleanliness', 'review_scores_checkin', 'review_scores_communication', 'review_scores_location', 'review_scores_value', 'host_total_listings_count', 'bathrooms', 'bedrooms', 'beds'])
column_factorize_64(listings, ['neighbourhood_cleansed', 'host_response_time', 'host_is_superhost', 'host_has_profile_pic', 'host_identity_verified', 'property_type', 'room_type', 'instant_bookable'])

clean_format_strings_64(listings, ['host_response_rate', 'host_acceptance_rate'])
clean_format_price_64(listings, ['price'])
listings['price'] = listings['price'].applymap(priceRange, 'float64')

cupy_lat = cp.asarray(listings['latitude'])
cupy_long = cp.asarray(listings['longitude'])
n_cupy_array, e_cupy_array = latlong2osgbgrid_cupy(cupy_lat, cupy_long)
listings['northing'] = cudf.Series(n_cupy_array).astype('float64')
listings['easting'] = cudf.Series(e_cupy_array).astype('float64')

listings.head()

## Aplicación de regresión linear

In [None]:
%%time
regression = DLinReg()
x_train, x_test, y_train, y_test = cuml.train_test_split(listings[columns_to_fit], listings['price'], train_size=0.9)
x_test_index = x_test.reset_index(drop=True)
y_test_index = y_test.reset_index(drop=True)

dask_x_train = dask.dataframe.from_pandas(x_train, npartitions=4)
dask_y_train = dask.dataframe.from_pandas(y_train, npartitions=4)
dask_x_test = dask.dataframe.from_pandas(x_test_index, npartitions=4)
dask_y_test = dask.dataframe.from_pandas(y_test_index, npartitions=4)

regression.fit(dask_x_train, dask_y_train)

In [None]:
%%time
predictions = regression.predict(dask_x_test)
y_results = predictions.compute()
print(y_results[0:10])
print(y_test_index[0:10])

In [None]:
loss = mnsq(y_test_index, y_results)
print(loss)

In [None]:
%reset -f