## DASK groupby and sorting on Hyperplane 

In [1]:
import warnings
import os
import sys
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd
from dask.distributed import Client
from typing import List, Set, Dict, Tuple, Optional
import types
from google.cloud import storage
from tqdm.notebook import tqdm

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt 
%matplotlib inline

pd.options.display.max_rows = 999
warnings.filterwarnings('ignore')

from hyperplane import notebook_common as nc

#### Below the magic cell to set parameters that can be passed in through pipeline jobs
Later when this notebook is used in a production pipeline, the variables set in this cell can be changed to something else, by adding this like to the graphql query

`
parameters: {create: {key: "data_url", value: "some_other_data_url"}}
`


In [12]:
year = "199*"

In [13]:
data_url = f"s3://dask-data/airline-data/{year}.csv"
data_url

's3://dask-data/airline-data/199*.csv'

#### Below is the one liner to scale up the job to kubernetes

In [3]:
client, cluster = nc.initialize_cluster(
        nprocs=5,
        nthreads=3,
        ram_gb_per_proc=2.4,
        cores_per_worker=15,
        scheduler_deploy_mode="remote",
        num_workers = 2
    )


👉 Hyperplane: selecting worker node pool
👉 Hyperplane: selecting scheduler node pool
Creating scheduler pod on cluster. This may take some time.
👉 Hyperplane: spinning up a dask cluster with a scheduler as a standalone container.
👉 Hyperplane: In a few minutes you'll be able to access the dashboard at https://ds.hyperplane.dev/dask-cluster-e3dc00fa-6e8f-4f8e-ba49-ead6eb972974/status
👉 Hyperplane: to get logs from all workers, do `cluster.get_logs()`


In [4]:
## install any necessary custom packages on the remote node image 
def install_package_on_remote():
    import os
    return os.system("pip install s3fs")
client.run(install_package_on_remote)

{'tcp://10.1.47.3:34741': 256,
 'tcp://10.1.47.3:35361': 256,
 'tcp://10.1.47.3:41555': 0,
 'tcp://10.1.47.3:45429': 0,
 'tcp://10.1.47.3:45699': 256,
 'tcp://10.1.48.3:33355': 0,
 'tcp://10.1.48.3:34819': 0,
 'tcp://10.1.48.3:41273': 0,
 'tcp://10.1.48.3:42043': 0,
 'tcp://10.1.48.3:44215': 0}

In [7]:
%%time
df = dd.read_csv(data_url, 
#                  blocksize = 25e6, 
                 storage_options = {'anon': True},
                usecols = ['DepTime','FlightNum','DepDelay','Origin', 'Dest','Distance'],
                dtype={'Distance': 'float64',
                      'DepTime':'float64',
                      'FlightNum':'int64',
                      'DepDelay':'float64',
                      'Dest':'object',
                      'Origin':'object'}, 
                encoding = "ISO-8859-1")

print(f"number of rows, {df.map_partitions(len).compute().sum()}")
print(f"total size {df.memory_usage_per_partition().compute().sum()/1024./1024./1024.} G")
df.head(2)


number of rows, 52694390
total size 2.355632930994034 G
CPU times: user 245 ms, sys: 33.2 ms, total: 278 ms
Wall time: 21.1 s


Unnamed: 0,DepTime,FlightNum,DepDelay,Origin,Dest,Distance
0,1707.0,29,37.0,CMH,IND,182.0
1,1706.0,29,36.0,CMH,IND,182.0


In [8]:
%%time
# lazy groupby and sorting to get the 10 largest trade per ticker
df_sort = df.groupby('Origin').apply(lambda x : x.nlargest(n = 10, columns = 'Distance'))
df_sort

CPU times: user 22.6 ms, sys: 3.41 ms, total: 26 ms
Wall time: 23.7 ms


Unnamed: 0_level_0,DepTime,FlightNum,DepDelay,Origin,Dest,Distance
npartitions=85,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,float64,int64,float64,object,object,float64
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [10]:
%%time
# actual compute of the groupby sorting result
df_sort_local = df_sort.compute()
df_sort_local

CPU times: user 590 ms, sys: 16.8 ms, total: 607 ms
Wall time: 46.1 s


Unnamed: 0_level_0,Unnamed: 1_level_0,DepTime,FlightNum,DepDelay,Origin,Dest,Distance
Origin,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
ROA,65653,1140.0,1777,0.0,ROA,LGA,405.0
ROA,65654,1142.0,1777,2.0,ROA,LGA,405.0
ROA,65655,1140.0,1777,0.0,ROA,LGA,405.0
ROA,65656,,1777,,ROA,LGA,405.0
ROA,65657,1140.0,1777,0.0,ROA,LGA,405.0
...,...,...,...,...,...,...,...
ITH,11497,800.0,253,0.0,ITH,PIT,239.0
ITH,11498,800.0,253,0.0,ITH,PIT,239.0
ITH,11499,800.0,253,0.0,ITH,PIT,239.0
ITH,11500,800.0,253,0.0,ITH,PIT,239.0


## Compare with pandas (will kill the kernel)
Note the code below is going to crush the kernel or kill the instance due to Out of Memory Error

In [None]:
# %%time
# import pandas as pd
# df_pd = df.compute()

In [None]:
# ## crushed the kernel
# %%time
# df_sort_pd = df_pd.groupby('Origin').apply(lambda x : x.nlargest(n = 10, columns = 'Distance'))

## close cluster after done
It's a good idea to close the cluster after use. If you forgot to add this cell, don't worry :) Hyperplane will automatically garbage collect the node after it detects it's being idle for a bit


In [11]:
cluster.close()