In [1]:
import coiled

In [2]:
# Connect to Coiled Cloud; presumably this validates the local Coiled
# login/config before any cluster is requested (TODO confirm).
cloud = coiled.Cloud()
cloud

In [3]:
# Baseline cluster size for the first benchmark; later cells scale it up.
initial_workers = 4
cluster = coiled.Cluster(n_workers=initial_workers)

Creating Cluster. This takes about a minute ...Checking environment images
Valid environment image found


In [4]:
from dask.distributed import Client

# Attach a Dask client to the cluster and report where its dashboard is served.
client = Client(cluster)
print(f"Dashboard: {client.dashboard_link}")

Dashboard: http://ec2-3-131-95-16.us-east-2.compute.amazonaws.com:8787/status



+-------------+--------+-----------+---------+
| Package     | client | scheduler | workers |
+-------------+--------+-----------+---------+
| cloudpickle | 1.5.0  | 1.6.0     | 1.6.0   |
| distributed | 2.23.0 | 2.24.0    | 2.24.0  |
+-------------+--------+-----------+---------+


In [6]:
import s3fs

# The NYC TLC bucket is readable without credentials, so use anonymous access.
fs = s3fs.S3FileSystem(anon=True)

# detail=True returns one info dict per listed object rather than bare paths.
trips = fs.ls('nyc-tlc/trip data', detail=True)

In [7]:
# Build s3:// URLs for the 2019 yellow-cab CSVs.
# - `name` is the fsspec-standard path field ("<bucket>/<key>"); `Key` is an
#   s3fs/boto-specific extra that is not part of the fsspec info contract.
# - Keep only file entries (ls can also return directory entries).
# - Sort explicitly so `files[:3]` is deterministic (2019-01 .. 2019-03)
#   instead of depending on the order the listing happens to come back in.
files = sorted(
    's3://' + trip['name']
    for trip in trips
    if trip.get('type') == 'file' and 'yellow_tripdata_2019' in trip['name']
)

In [8]:
# Sanity-check the listing by showing the first three matching files.
files[0:3]

['s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
 's3://nyc-tlc/trip data/yellow_tripdata_2019-02.csv',
 's3://nyc-tlc/trip data/yellow_tripdata_2019-03.csv']

In [9]:
import dask.dataframe as dd

# Pandas nullable "UInt8" (capital U) can hold missing values, unlike NumPy
# uint8; presumably some rows leave these columns blank (TODO confirm).
trip_dtypes = {
    "payment_type": "UInt8",
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
}

# Benchmark on the first three matching monthly files only.
sample_files = files[:3]

df = dd.read_csv(
    sample_files,
    dtype=trip_dtypes,
    storage_options={"anon": True},  # public bucket: no credentials needed
)

In [10]:
# Number of partitions dask split the input CSVs into.
df.npartitions

34

End-to-end (load + process) time for 3 files on 4 nodes

In [12]:
%%time

q1 = df.groupby("passenger_count").mean().compute()

CPU times: user 84.9 ms, sys: 10.1 ms, total: 95 ms
Wall time: 30.8 s


Total: about 30 seconds, mostly spent on I/O (reading the CSVs from S3).

Next, load the dataframe into cluster memory first (as pandas does with a local file), so that we can time the query on its own.

In [13]:
from dask.distributed import wait

# persist() does not modify `df`; it returns a NEW collection backed by
# futures. Keep a reference so the partitions stay in cluster memory, then
# block until loading finishes, so the next timing measures the query rather
# than the S3 read. Later cells compute on `df`; persist keeps the same task
# keys, so the scheduler should reuse these in-memory partitions.
df_persisted = df.persist()
wait(df_persisted)
df_persisted

Unnamed: 0_level_0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
npartitions=34,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
,UInt8,object,object,UInt8,float64,UInt8,object,int64,int64,UInt8,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Once the data is fully loaded into memory, time the query alone:

In [17]:
%%timeit

q1 = df.groupby("passenger_count").mean().compute()

The slowest run took 15.75 times longer than the fastest. This could mean that an intermediate result is being cached.
1.85 s ± 699 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Typically about 1.5 s per query once the data is in memory.

__Real-world scenario__

Now compare the end-to-end time (read the data from S3 and run the query) as we scale the cluster.

First, clear the data held in cluster memory:

In [19]:
# Restart every worker to drop any data held in cluster memory, so the
# following timings include reading the CSVs from S3 again.
client.restart()

0,1
Client  Scheduler: tls://ec2-3-131-95-16.us-east-2.compute.amazonaws.com:8786  Dashboard: http://ec2-3-131-95-16.us-east-2.compute.amazonaws.com:8787/status,Cluster  Workers: 4  Cores: 16  Memory: 68.72 GB


In [18]:
# scale() only requests the new workers and returns immediately. Block until
# they have joined, so the timing below runs on the full cluster rather than
# on a partially scaled one.
n_workers = 8
cluster.scale(n_workers)
client.wait_for_workers(n_workers)

In [22]:
%%time

q1 = df.groupby("passenger_count").mean().compute()

CPU times: user 72.5 ms, sys: 6.34 ms, total: 78.8 ms
Wall time: 15.6 s


Timing so far (end-to-end):
* Pandas (local) ~ 70 sec
* 4 nodes: ~ 30 sec
* 8 nodes: ~ 15 sec

In [23]:
# scale() only requests the new workers and returns immediately. Block until
# they have joined, so the timing below runs on the full cluster rather than
# on a partially scaled one.
n_workers = 12
cluster.scale(n_workers)
client.wait_for_workers(n_workers)

In [26]:
%%timeit

q1 = df.groupby("passenger_count").mean().compute()

The slowest run took 87.32 times longer than the fastest. This could mean that an intermediate result is being cached.
10.1 s ± 4.09 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


Timing so far (end-to-end):
* Pandas (local) ~ 70 sec
* 4 nodes: ~ 30 sec
* 8 nodes: ~ 15 sec
* 12 nodes: ~ 10 sec

In [63]:
# Disconnect this Dask client from the cluster's scheduler.
client.close()

In [64]:
# Tear down the Coiled cluster created at the start of the notebook.
cluster.close()