*Curso: Big Data - Métodos Intensivos de Computación*<br>
*Presentado por: Boris Mauricio Martinez Gutierrez*

In [None]:
# Mount Google Drive in the Colab runtime and switch to the project folder,
# so the relative paths used below resolve inside Drive.
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/BigData/dask/

Mounted at /content/drive
/content/drive/MyDrive/BigData/dask


In [None]:
# Install the `distributed` package pinned to release 2021.5.0
# (a fixed version, not necessarily the most recent one).
!pip install distributed==2021.5.0

NYCTaxi
------

[Download link](http://www.andresmh.com/nyctaxitrips/)

Taxi trips taken in 2013 released by a FOIA request.  Around 20GB CSV uncompressed.

**Try the following:**

*  Use `dask.dataframe` with pandas-style queries
*  Store in HDF5 both with and without categoricals, measure the size of the file and query times
*  Set the index by one of the date-time columns and store in castra (also using categoricals).  Perform range queries and measure speed.  What size and complexity of query can you perform while still having an "interactive" experience?

### Almacenamiento en disco del dataset

In [None]:
# Download the NYCTaxi dataset archive (skipped when it is already on disk)
import os
import urllib.request

path = os.path.join('data','nyctaxi')
# exist_ok tolerates re-running the cell once the directory exists
os.makedirs(path, exist_ok=True)

url = 'https://archive.org/download/nycTaxiTripData2013/trip_data.7z'
archive_path = os.path.join(path,'trip_data.7z')

if os.path.exists(archive_path):
  # The archive is large; do not fetch it again on every run of the notebook
  print('NYCTaxi dataset already downloaded.', flush=True)
  filename, headers = archive_path, None
else:
  print('Downloading NYCTaxi dataset... ', end='', flush=True)
  # Download under a temporary name and rename at the end, so an interrupted
  # transfer never leaves a truncated file that looks like a complete archive
  partial_path = archive_path + '.part'
  _, headers = urllib.request.urlretrieve(url, partial_path)
  os.replace(partial_path, archive_path)
  filename = archive_path
  print('Done!', flush=True)

Downloading NYCTaxi dataset... Done!


In [None]:
# Size of the dataset directory on disk
!du -sh data/nyctaxi

31G	data/nyctaxi


In [None]:
# Libraries used to extract 7zip archives
!pip install pyunpack patool

Collecting pyunpack
  Downloading https://files.pythonhosted.org/packages/83/29/020436b1d8e96e5f26fa282b9c3c13a3b456a36b9ea2edc87c5fed008369/pyunpack-0.2.2-py2.py3-none-any.whl
Collecting patool
[?25l  Downloading https://files.pythonhosted.org/packages/43/94/52243ddff508780dd2d8110964320ab4851134a55ab102285b46e740f76a/patool-1.12-py2.py3-none-any.whl (77kB)
[K     |████████████████████████████████| 81kB 3.9MB/s 
[?25hCollecting entrypoint2
  Downloading https://files.pythonhosted.org/packages/8a/b0/8ef4b1d8be02448d164c52466530059d7f57218655d21309a0c4236d7454/entrypoint2-0.2.4-py3-none-any.whl
Collecting easyprocess
  Downloading https://files.pythonhosted.org/packages/48/3c/75573613641c90c6d094059ac28adb748560d99bd27ee6f80cce398f404e/EasyProcess-0.3-py2.py3-none-any.whl
Installing collected packages: entrypoint2, easyprocess, pyunpack, patool
Successfully installed easyprocess-0.3 entrypoint2-0.2.4 patool-1.12 pyunpack-0.2.2


In [None]:
from pyunpack import Archive

# Unpack the downloaded 7z archive into the dataset directory
archive_file = os.path.join(path, 'trip_data.7z')
Archive(archive_file).extractall(path)

In [None]:
# Files in the dataset directory
!ls data/nyctaxi

trip_data_10.csv  trip_data_2.csv  trip_data_6.csv  trip_data_9.csv
trip_data_11.csv  trip_data_3.csv  trip_data_7.csv
trip_data_12.csv  trip_data_4.csv  trip_data.7z
trip_data_1.csv   trip_data_5.csv  trip_data_8.csv


In [None]:
# lamentablemente, los nombres de las columnas difieren en cada archivo csv
%%time
from glob import glob
import pandas as pd


column_names = ['medallion','hack_license','vendor_id','rate_code','store_and_fwd_flag','pickup_datetime','dropoff_datetime',
                'passenger_count','trip_time_in_secs','trip_distance','pickup_longitude','pickup_latitude','dropoff_longitude',
                'dropoff_latitude']

files = sorted(glob(os.path.join(path, '*.csv')))[1:]

for f in files:
  df = pd.read_csv(f)
  os.remove(f)
  df.to_csv(f, header=column_names, index=False)



CPU times: user 31min 21s, sys: 1min 55s, total: 33min 16s
Wall time: 41min 55s


### Lectura del dataset con Dask

In [None]:
# Start a dask.distributed client with default settings
from dask.distributed import Client
client = Client()

In [None]:
import dask.dataframe as ddf

# Columns to load and the dtype requested for each one. Both timestamp
# columns are declared as text here and converted through parse_dates.
dtypes = {
    'dropoff_datetime': str,
    'dropoff_latitude': float,
    'dropoff_longitude': float,
    'passenger_count': int,
    'pickup_datetime': str,
    'pickup_latitude': float,
    'pickup_longitude': float,
    'rate_code': int,
    'trip_distance': float,
    'trip_time_in_secs': int,
    'vendor_id': str,
}
date_columns = ['dropoff_datetime', 'pickup_datetime']

# The glob pattern selects every trip_data_*.csv file in the dataset directory
filename = os.path.join(path, 'trip_data_*.csv')
df_csv = ddf.read_csv(
    filename,
    parse_dates=date_columns,
    dtype=dtypes,
    usecols=list(dtypes),
)

df_csv.head()

Unnamed: 0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,CMT,1,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,CMT,1,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,CMT,1,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,CMT,1,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,CMT,1,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


In [None]:
# Inspect the column dtypes of the dask dataframe
df_csv.dtypes

vendor_id                    object
rate_code                     int64
pickup_datetime      datetime64[ns]
dropoff_datetime     datetime64[ns]
passenger_count               int64
trip_time_in_secs             int64
trip_distance               float64
pickup_longitude            float64
pickup_latitude             float64
dropoff_longitude           float64
dropoff_latitude            float64
dtype: object

In [None]:
# Preview the last rows of the dataframe
df_csv.tail()

Unnamed: 0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
8895,CMT,1,2013-09-06 16:55:11,2013-09-06 17:03:33,1.0,501.0,1.9,-73.982445,40.764473,-73.981407,40.78389
8896,CMT,1,2013-09-02 01:50:22,2013-09-02 01:56:00,2.0,338.0,2.0,-73.981827,40.755859,-73.998489,40.734047
8897,CMT,1,2013-09-06 16:17:12,2013-09-06 16:26:30,1.0,558.0,1.4,-73.954323,40.781231,-73.967674,40.762943
8898,CMT,1,2013-09-07 08:40:49,2013-09-07 09:00:03,1.0,1154.0,10.8,-74.000572,40.740673,-73.861877,40.768436
8899,CMT,1,2013-09-06 09:47:50,2013-09-06 10:02:24,1.0,874.0,2.1,-74.014252,40.717148,-73.993431,40.733189


In [None]:
# Total number of records in the dataset
len(df_csv)

173179759

In [None]:
%%time
# Count the null values in each column
df_csv.isnull().sum().compute()

CPU times: user 54.8 s, sys: 6.61 s, total: 1min 1s
Wall time: 9min 40s


vendor_id               0
rate_code               0
pickup_datetime         0
dropoff_datetime        0
passenger_count         0
trip_time_in_secs       0
trip_distance           0
pickup_longitude        0
pickup_latitude         0
dropoff_longitude    3438
dropoff_latitude     3438
dtype: int64

In [None]:
%%time
# Dropping rows with null values (currently disabled)
# df_csv = df_csv.dropna(subset=['dropoff_longitude','dropoff_latitude'])
# Sort the dataset by a date column
df_csv = df_csv.sort_values(by='pickup_datetime')
# Persist the sorted dataframe on the cluster so later queries can reuse it
df_csv = client.persist(df_csv)

CPU times: user 1min 26s, sys: 10.8 s, total: 1min 37s
Wall time: 20min


In [None]:
# Average number of passengers carried per taxi trip in New York City during 2013
%time df_csv['passenger_count'].mean().compute()

CPU times: user 1min 12s, sys: 7.68 s, total: 1min 20s
Wall time: 12min 10s


1.7094193669596225

In [None]:
# Median trip duration. The value is expressed in seconds, as the column name
# trip_time_in_secs indicates; no conversion to minutes is applied here.
%time df_csv['trip_time_in_secs'].quantile(q=0.5).compute()

CPU times: user 1min 1s, sys: 6.92 s, total: 1min 8s
Wall time: 11min 35s


762.0

### Dataset con formato HDF5

In [None]:
# Write the dataframe to an HDF5 file inside the dataset directory.
# The text column is left untouched here (no categorical conversion).
target = os.path.join(path, 'trip_data.h5')
df_csv.to_hdf(target, key=path)  # the directory string doubles as the HDF5 key

# Load the file back through dask and preview the first rows
df_h5 = ddf.read_hdf(target, key=path)
df_h5.head()

Unnamed: 0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,CMT,1.0,2013-01-01 15:11:48,2013-01-01 15:18:10,4.0,382.0,1.0,-73.978165,40.757977,-73.989838,40.751171
1,CMT,1.0,2013-01-06 00:18:35,2013-01-06 00:22:54,1.0,259.0,1.5,-74.006683,40.731781,-73.994499,40.75066
2,CMT,1.0,2013-01-05 18:49:41,2013-01-05 18:54:23,1.0,282.0,1.1,-74.004707,40.73777,-74.009834,40.726002
3,CMT,1.0,2013-01-07 23:54:15,2013-01-07 23:58:20,2.0,244.0,0.7,-73.974602,40.759945,-73.984734,40.759388
4,CMT,1.0,2013-01-07 23:25:03,2013-01-07 23:34:24,1.0,560.0,2.1,-73.97625,40.748528,-74.002586,40.747868


In [None]:
# Size of the HDF5 file on disk
!du -sh data/nyctaxi/trip_data.h5

15G	data/nyctaxi/trip_data.h5


In [None]:
# Computing the median takes less time on the HDF5-backed dataframe
# NOTE(review): the recorded result (688.0) differs from the CSV-backed run
# (762.0) -- confirm whether this comes from approximate quantiles or from the data.
%time df_h5['trip_time_in_secs'].quantile(q=0.5).compute()

CPU times: user 28.5 s, sys: 3.33 s, total: 31.8 s
Wall time: 5min 13s


688.0

In [None]:
# Remove the previous export so the file is rebuilt from scratch
os.remove(target)

# Export to HDF5 again, this time with the text column stored as a categorical
df_categorized = df_csv.categorize(columns=['vendor_id'])
df_categorized.to_hdf(target, path)

# Load the new file back and preview the first rows
df_h5 = ddf.read_hdf(target, path)
df_h5.head()

Unnamed: 0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,CMT,1.0,2013-01-01 15:11:48,2013-01-01 15:18:10,4.0,382.0,1.0,-73.978165,40.757977,-73.989838,40.751171
1,CMT,1.0,2013-01-06 00:18:35,2013-01-06 00:22:54,1.0,259.0,1.5,-74.006683,40.731781,-73.994499,40.75066
2,CMT,1.0,2013-01-05 18:49:41,2013-01-05 18:54:23,1.0,282.0,1.1,-74.004707,40.73777,-74.009834,40.726002
3,CMT,1.0,2013-01-07 23:54:15,2013-01-07 23:58:20,2.0,244.0,0.7,-73.974602,40.759945,-73.984734,40.759388
4,CMT,1.0,2013-01-07 23:25:03,2013-01-07 23:34:24,1.0,560.0,2.1,-73.97625,40.748528,-74.002586,40.747868


In [None]:
# Size of the HDF5 file on disk
!du -sh data/nyctaxi/trip_data.h5

15G	data/nyctaxi/trip_data.h5


In [None]:
# Time the median query on the HDF5 export that uses a categorical column
%time df_h5['trip_time_in_secs'].quantile(q=0.5).compute()

CPU times: user 29.1 s, sys: 3.72 s, total: 32.8 s
Wall time: 5min 25s


688.0

### Dataset con formato Parquet

In [None]:
# Install the packages used for the Parquet export below
!pip install fastparquet partd

In [None]:
# Use the pickup timestamp as the index. sorted=True declares that the column
# is already ordered, and the indexed dataframe is persisted on the cluster.
df_csv = client.persist(df_csv.set_index('pickup_datetime', sorted=True))

df_csv.head()

Unnamed: 0_level_0,vendor_id,rate_code,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2013-01-01,VTS,5,2013-01-01 00:41:00,2,2460,31.39,-73.970512,40.794228,-74.29879,40.987392
2013-01-01,VTS,1,2013-01-01 00:11:00,6,660,1.78,-74.006927,40.740765,-73.982994,40.739616
2013-01-01,VTS,1,2013-01-01 00:16:00,2,960,2.92,-73.986702,40.751236,-74.003181,40.72036
2013-01-01,VTS,1,2013-01-01 00:11:00,1,660,9.35,-73.87117,40.773914,-73.805054,40.68121
2013-01-01,VTS,1,2013-01-01 00:05:00,1,300,0.48,-73.991821,40.695587,-73.990479,40.700699


In [None]:
# Export the dataset in Parquet format
target = os.path.join(path, 'trip_data.parquet')

# Store the text column as a categorical before writing
df_csv.categorize(columns=['vendor_id']).to_parquet(target, engine="fastparquet")
# Read back with the same engine that wrote the files, instead of relying on
# whichever engine dask selects by default in the installed version
df_parquet = ddf.read_parquet(target, engine="fastparquet")
df_parquet.head()

In [None]:
# Size of the Parquet dataset on disk
!du -sh data/nyctaxi/trip_data.parquet

In [None]:
# Queries on the Parquet-backed dataframe: average passenger count
%time df_parquet['passenger_count'].mean().compute()

In [None]:
# Time the median trip-duration query on the Parquet-backed dataframe
%time df_parquet['trip_time_in_secs'].quantile(q=0.5).compute()

In [None]:
# Number of trips per month and average passengers per trip per month
# NOTE(review): the first query groups by the raw index values rather than by
# a month key, so it may not produce monthly groups -- confirm the intended key.
%time df_parquet.groupby(df_parquet.index).agg({'passenger_count': ['count', 'mean']}).compute()
# Same aggregation computed through monthly resampling ('M')
%time df_parquet.resample('M').agg({'passenger_count': ['count', 'mean']}).compute()

### Dataset con formato Castra

In [None]:
# Install the packages used for the Castra section below
!pip install castra msgpack

In [None]:
from castra import Castra

target = os.path.join(path, 'trip_data.castra')

# Create a Castra store at `target`, passing df_csv as the template.
# NOTE(review): df_csv is a dask dataframe at this point and this cell does not
# write any rows into the store -- confirm that Castra accepts it as a template
# and whether an explicit data-loading step is missing.
df_castra = Castra(target, template=df_csv)
df_castra.partitions

In [None]:
# Range query on the Castra store between the labels '201305' and '201306'
# (presumably May-June 2013 on the datetime index -- TODO confirm)
df_castra['201305':'201306']

In [None]:
# Close the dask client before moving on to the GPU section
client.close()

## RAPIDS-CUDA Y DASK

In [None]:
# Show the CUDA compiler version and the location of nvcc
!nvcc -V && which nvcc

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2020 NVIDIA Corporation
Built on Wed_Jul_22_19:09:09_PDT_2020
Cuda compilation tools, release 11.0, V11.0.221
Build cuda_11.0_bu.TC445_37.28845127_0
/usr/local/cuda/bin/nvcc


In [None]:
# Show the GPU and driver status reported by nvidia-smi
!nvidia-smi

Wed May 12 12:39:42 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 465.19.01    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   57C    P8    10W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
# Set up a miniconda environment with all the libraries needed to run RAPIDS (CUDA)
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!bash rapidsai-csp-utils/colab/rapids-colab.sh stable

import sys, os, shutil

# Make the packages installed under /usr/local importable from this kernel
sys.path.append('/usr/local/lib/python3.7/site-packages/')
# Environment variables pointing at the CUDA NVVM library, the libdevice
# directory and the conda prefix
os.environ['NUMBAPRO_NVVM'] = '/usr/local/cuda/nvvm/lib64/libnvvm.so'
os.environ['NUMBAPRO_LIBDEVICE'] = '/usr/local/cuda/nvvm/libdevice/'
os.environ["CONDA_PREFIX"] = "/usr/local"
# Copy libcudf.so from /usr/local/lib to /usr/lib when it exists
for so in ['cudf']:
  fn = 'lib'+so+'.so'
  source_fn = '/usr/local/lib/'+fn
  dest_fn = '/usr/lib/'+fn
  if os.path.exists(source_fn):
    print(f'Copying {source_fn} to {dest_fn}')
    shutil.copyfile(source_fn, dest_fn)
# fix for BlazingSQL import issue
# ImportError: /usr/lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.26'
# not found (required by /usr/local/lib/python3.7/site-packages/../../libblazingsql-engine.so)
if not os.path.exists('/usr/lib64'):
    os.makedirs('/usr/lib64')
# Copy every libstdc* file found in /usr/local/lib into both system library directories
for so_file in os.listdir('/usr/local/lib'):
  if 'libstdc' in so_file:
    shutil.copyfile('/usr/local/lib/'+so_file, '/usr/lib64/'+so_file)
    shutil.copyfile('/usr/local/lib/'+so_file, '/usr/lib/x86_64-linux-gnu/'+so_file)

fatal: destination path 'rapidsai-csp-utils' already exists and is not an empty directory.
PLEASE READ
********************************************************************************************************
Changes:
1. IMPORTANT SCRIPT CHANGES: Colab has updated to Python 3.7, and now runs our STABLE and NIGHTLY versions (0.19 and 0.20)!  PLEASE update your older install script code as follows:
	!bash rapidsai-csp-utils/colab/rapids-colab.sh 0.19

	import sys, os, shutil

	sys.path.append('/usr/local/lib/python3.7/site-packages/')
	os.environ['NUMBAPRO_NVVM'] = '/usr/local/cuda/nvvm/lib64/libnvvm.so'
	os.environ['NUMBAPRO_LIBDEVICE'] = '/usr/local/cuda/nvvm/libdevice/'
	os.environ['CONDA_PREFIX'] = '/usr/local'
	for so in ['cudf', 'rmm', 'nccl', 'cuml', 'cugraph', 'xgboost', 'cuspatial']:
	  fn = 'lib'+so+'.so'
	  source_fn = '/usr/local/lib/'+fn
	  dest_fn = '/usr/lib/'+fn
	  if os.path.exists(source_fn):
	    print(f'Copying {source_fn} to {dest_fn}')
	    shutil.copyfile(source_fn,

In [None]:
# Start a local CUDA cluster with default settings and connect a dask client to it
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = LocalCUDACluster()
client = Client(cluster)

In [None]:
import cudf
import dask_cudf
import dask.dataframe as ddf

# Columns to load and the dtype requested for each one
dtypes = {'dropoff_datetime': str,
 'dropoff_latitude': float,
 'dropoff_longitude': float,
 'passenger_count': int,
 'pickup_datetime': str,
 'pickup_latitude': float,
 'pickup_longitude': float,
 'rate_code': int,
 'trip_distance': float,
 'trip_time_in_secs': int,
 'vendor_id': str}

# Try the GPU pipeline on the first file of the dataset only
filename = os.path.join(path, 'trip_data_1.csv')

# usecols is passed as a list (not a dict_keys view), matching the other
# read_csv calls in this notebook
cdf = cudf.read_csv(filename, parse_dates=['dropoff_datetime','pickup_datetime'], dtype=dtypes, usecols=list(dtypes.keys()))
# Split the cudf dataframe into dask partitions using chunksize=30000 and persist them
dask_cuda_df = dask_cudf.from_cudf(cdf, chunksize=30000).persist()

dask_cuda_df.head()

Unnamed: 0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,CMT,1,2013-01-01 15:11:48,2013-01-01 15:18:10,4.0,382.0,1.0,-73.978165,40.757977,-73.989838,40.751171
1,CMT,1,2013-01-06 00:18:35,2013-01-06 00:22:54,1.0,259.0,1.5,-74.006683,40.731781,-73.994499,40.75066
2,CMT,1,2013-01-05 18:49:41,2013-01-05 18:54:23,1.0,282.0,1.1,-74.004707,40.73777,-74.009834,40.726002
3,CMT,1,2013-01-07 23:54:15,2013-01-07 23:58:20,2.0,244.0,0.7,-73.974602,40.759945,-73.984734,40.759388
4,CMT,1,2013-01-07 23:25:03,2013-01-07 23:34:24,1.0,560.0,2.1,-73.97625,40.748528,-74.002586,40.747868


In [None]:
# Number of partitions of the GPU-backed dask dataframe
dask_cuda_df.npartitions

493

In [None]:
# Time the row count of the GPU-backed dataframe
%time len(dask_cuda_df)

CPU times: user 6.83 s, sys: 19.7 s, total: 26.5 s
Wall time: 28.5 s


14776615

In [None]:
# Time the per-column null count on the GPU-backed dataframe
%time dask_cuda_df.isnull().sum().compute()

CPU times: user 6.13 s, sys: 22.9 s, total: 29 s
Wall time: 31.9 s


vendor_id             0
rate_code             0
pickup_datetime       0
dropoff_datetime      0
passenger_count       0
trip_time_in_secs     0
trip_distance         0
pickup_longitude      0
pickup_latitude       0
dropoff_longitude    86
dropoff_latitude     86
dtype: uint64

In [None]:
# Time the count of rows per vendor_id value
%time dask_cuda_df['vendor_id'].value_counts().compute()

CPU times: user 7.17 s, sys: 1.56 s, total: 8.74 s
Wall time: 18.7 s


CMT    7450899
VTS    7325716
Name: vendor_id, dtype: int64

In [None]:
import cudf
import dask_cudf
import dask.dataframe as ddf

In [None]:
# Load the rest of the dataset: the glob pattern selects every trip_data_*.csv file
filename = os.path.join(path, 'trip_data_*.csv')

# Same columns and dtypes as before, read in chunks of 128 MiB
read_options = dict(
    parse_dates=['dropoff_datetime', 'pickup_datetime'],
    dtype=dtypes,
    usecols=list(dtypes),
    chunksize='128 MiB',
)
final_df = dask_cudf.read_csv(filename, **read_options)

In [None]:
# Persist the full dataframe on the cluster and display its summary
final_df = final_df.persist()
final_df

Unnamed: 0_level_0,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
npartitions=228,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
,object,int64,object,object,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...


In [None]:
# Time the row count of the full GPU-backed dataset
%time len(final_df)

In [None]:
# Time the per-column null count on the full dataset
%time final_df.isnull().sum().compute()

In [None]:
# Time the average passenger count on the full dataset
%time final_df['passenger_count'].mean().compute()

In [None]:
# Time the median of trip_time_in_secs on the full dataset
%time final_df['trip_time_in_secs'].quantile(q=0.5).compute()

In [None]:
# Close the dask client connected to the CUDA cluster
client.close()