## Benchmarking

This notebook demonstrates how to benchmark basic computations with Dask in the Coiled execution environment.

In a distributed environment, the same computation can have different runtimes, depending on the size of the cluster, partitioning of the data, computational parallelism, and other factors.

Here are the main things you'll learn from the benchmarking in this notebook:

* Dask is a lot faster when data is spread across multiple partitions so the computations can be run in parallel
* Persisting DataFrames in memory can be a great performance optimization
* Figuring out the optimal partitioning for a given computation is challenging
* Lazy execution makes benchmarking challenging
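
The last point is worth seeing in miniature. The sketch below is a plain-Python analogy, not Dask code: a generator stands in for a lazy Dask graph, and `slow_square` is a hypothetical expensive function. It suggests why a benchmark has to time the step that triggers the work (`len()` or `.compute()` in Dask), not the step that merely describes it.

```python
import time

def slow_square(x):
    """Pretend each element is expensive to compute."""
    time.sleep(0.01)
    return x * x

# Building the lazy pipeline does no work yet, so timing this step is misleading.
start = time.perf_counter()
lazy_result = (slow_square(x) for x in range(5))
build_seconds = time.perf_counter() - start

# The work only happens when the result is actually consumed.
start = time.perf_counter()
squares_total = sum(lazy_result)
run_seconds = time.perf_counter() - start

print(f"build: {build_seconds:.6f}s, run: {run_seconds:.6f}s, total: {squares_total}")
```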

### Set up a Coiled cluster to access the Dask computation environment

In [149]:
import coiled
from dask.distributed import Client
import dask.dataframe as dd
import time

In [150]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
cluster = coiled.Cluster(name="benchmarking", n_workers=10)

In [4]:
client = Client(cluster)

## Count benchmarking

This section reads a month of the NYC taxi data into different Dask DataFrames, performs a count, and measures computation time.  Here are the scenarios examined:

1. Persisted DataFrame with 41 partitions
2. Unpersisted DataFrame with 41 partitions
3. Unpersisted DataFrame with 11 partitions
4. Unpersisted DataFrame with 1 partition

DataFrames with multiple partitions can perform count computations in parallel and that's why they're much faster!  Let's quantify the speed gains Dask provides.

In [5]:
dtype = {
    "payment_type": "UInt8",
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",
}
path = "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv"

### Create Persisted DataFrame

In [6]:
df = dd.read_csv(
    path,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype=dtype,
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()

In [7]:
df.npartitions

41

### Create unpersisted DataFrame

In [8]:
df_unpersisted = dd.read_csv(
    path,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype=dtype,
    storage_options={"anon": True},
    blocksize="16 MiB",
)

In [9]:
df_unpersisted.npartitions

41

### Create unpersisted DataFrame without setting blocksize

In [10]:
df_no_blocksize = dd.read_csv(
    path,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype=dtype,
    storage_options={"anon": True},
)

In [11]:
df_no_blocksize.npartitions

11

### Create unpersisted DataFrame with blocksize set to None

In [12]:
df_none_blocksize = dd.read_csv(
    path,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype=dtype,
    storage_options={"anon": True},
    blocksize=None,
)

In [13]:
df_none_blocksize.npartitions

1
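
The partition counts above (41, 11, and 1) are roughly what you would expect from dividing the file size by the blocksize. The sketch below makes that arithmetic explicit under loud assumptions: `estimate_partitions` is a hypothetical helper, the 650 MiB file size is picked only because it is consistent with the counts reported above (it was not measured), and the 64 MB default is an assumption, since Dask derives its default blocksize from available memory and cores. The exact splitting rule can also differ between Dask versions, so treat this as an approximation.

```python
import math

MIB = 1024 ** 2

def estimate_partitions(file_size_bytes, blocksize_bytes):
    """Rough partition count for one CSV file: one partition per block."""
    if blocksize_bytes is None:
        return 1  # blocksize=None means the file is not split
    return math.ceil(file_size_bytes / blocksize_bytes)

assumed_file_size = 650 * MIB  # assumption for illustration, not a measured value

print(estimate_partitions(assumed_file_size, 16 * MIB))    # 41
print(estimate_partitions(assumed_file_size, 64_000_000))  # 11
print(estimate_partitions(assumed_file_size, None))        # 1
```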

### Define some benchmark helper functions

In [14]:
dask_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

In [15]:
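def benchmark(f, df, benchmarks, name, **kwargs):
    """Time f(df, **kwargs) once and record the duration under `name`."""
    start_time = time.perf_counter()  # monotonic clock, better suited to timing than time.time()
    f(df, **kwargs)  # f must trigger the computation itself (e.g. via len() or .compute())
    duration = time.perf_counter() - start_time
    benchmarks['duration'].append(duration)
    benchmarks['task'].append(name)
    print(f"{name} took: {duration} seconds")
    return duration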
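
A single timing can be noisy, especially when the data is read over the network. One possible extension, sketched here with a hypothetical `benchmark_repeated` helper that is not part of the notebook, is to run the same callable several times and report the minimum and median. The workload below is a stand-in; in this notebook it could be something like `lambda: len(df_unpersisted)`.

```python
import statistics
import time

def benchmark_repeated(f, repeats=5):
    """Call f() `repeats` times and return the individual durations in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        f()
        durations.append(time.perf_counter() - start)
    return durations

# Stand-in workload so the sketch runs without a cluster.
durations = benchmark_repeated(lambda: sum(range(100_000)), repeats=5)
print(f"min: {min(durations):.6f}s, median: {statistics.median(durations):.6f}s")
```

Keep in mind that repeated runs on a cluster may benefit from caching, so later runs are not always independent of earlier ones.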

### Run the count benchmarks with the different DataFrames

In [16]:
def count(df):
    return len(df)

In [17]:
benchmark(count, df=df, benchmarks=dask_benchmarks, name='count_persisted')

count_persisted took: 0.13576006889343262 seconds


0.13576006889343262

In [18]:
benchmark(count, df=df_unpersisted, benchmarks=dask_benchmarks, name='count_unpersisted')

count_unpersisted took: 5.159780263900757 seconds


5.159780263900757

In [19]:
benchmark(count, df=df_no_blocksize, benchmarks=dask_benchmarks, name='count_no_blocksize')

count_no_blocksize took: 6.059859991073608 seconds


6.059859991073608

In [20]:
benchmark(count, df=df_none_blocksize, benchmarks=dask_benchmarks, name='count_none_blocksize')

count_none_blocksize took: 36.718952894210815 seconds


36.718952894210815

## Count benchmarking conclusions

Running the count operation on a persisted DataFrame with 41 partitions is by far the fastest.  The operation is much slower when the DataFrame isn't persisted and when fewer partitions are used.  Using one partition is particularly slow because it prevents Dask from performing the computation in parallel.
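
To put numbers on "much slower", the snippet below divides each count timing reported above by the persisted timing. The durations are copied from the benchmark output; only the variable names are new.

```python
# Durations in seconds, copied from the count benchmark output above.
count_seconds = {
    "count_persisted": 0.13576006889343262,
    "count_unpersisted": 5.159780263900757,
    "count_no_blocksize": 6.059859991073608,
    "count_none_blocksize": 36.718952894210815,
}

baseline = count_seconds["count_persisted"]
slowdowns = {name: seconds / baseline for name, seconds in count_seconds.items()}

for name, factor in slowdowns.items():
    print(f"{name}: {factor:.1f}x the persisted runtime")
```

On these numbers, the unpersisted 41-partition count takes roughly 38 times as long as the persisted one, and the single-partition count roughly 270 times as long.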

Count benchmarks are OK for CSV files, but should be avoided when benchmarking data stored in the Parquet file format.  Parquet files store row-count metadata in the file footer.  Execution engines can just grab the count from the file footer rather than reading in all the data and actually performing the count.

## Mean benchmarking

Let's calculate the mean fare using the same DataFrames as above and see if the computation time results are similar.

In [21]:
def mean(df):
    return df.fare_amount.mean().compute()

In [22]:
benchmark(mean, df=df, benchmarks=dask_benchmarks, name='mean_persisted')

mean_persisted took: 0.1818859577178955 seconds


0.1818859577178955

In [23]:
benchmark(mean, df=df_unpersisted, benchmarks=dask_benchmarks, name='mean_unpersisted')

mean_unpersisted took: 5.212815046310425 seconds


5.212815046310425

In [24]:
benchmark(mean, df=df_no_blocksize, benchmarks=dask_benchmarks, name='mean_no_blocksize')

mean_no_blocksize took: 6.081754922866821 seconds


6.081754922866821

In [25]:
benchmark(mean, df=df_none_blocksize, benchmarks=dask_benchmarks, name='mean_none_blocksize')

mean_none_blocksize took: 34.238887786865234 seconds


34.238887786865234
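
A mean parallelizes well because each partition only needs to report a sum and a row count, and those small pairs can be combined at the end. The sketch below shows the idea in plain Python with hypothetical fare values; it illustrates the general technique and makes no claim about Dask's exact internals.

```python
# Hypothetical partitions of fare amounts with different sizes.
fare_partitions = [[10.0, 20.0, 30.0], [40.0], [5.0, 15.0]]

# Each partition reports (sum, count); only these small pairs need to be combined.
partials = [(sum(part), len(part)) for part in fare_partitions]
fare_total = sum(s for s, _ in partials)
fare_count = sum(n for _, n in partials)
mean_fare = fare_total / fare_count

# Averaging the per-partition means is wrong when partitions differ in size.
mean_of_means = sum(s / n for s, n in partials) / len(partials)

print(mean_fare, mean_of_means)
```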

## Mean benchmarking conclusions

The mean benchmarks unsurprisingly show the same results as the count benchmarks.  DataFrames with more partitions that are persisted in memory are a lot faster.  Count and mean operations are easy to parallelize on a cluster.  Let's turn our attention to an aggregation that's more complicated.

## Group by benchmarking

Let's compute the total fare amount, by day.  Remember that the data is distributed in 41 different partitions spread across different nodes in the cluster.  Data for a given day may be spread across many of those partitions.

Certain aggregations require the data to be shuffled, which can be expensive.  Minimizing data shuffling is one of the best performance optimizations in a cluster environment.
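
To make "shuffled" concrete, here is a toy hash shuffle in plain Python. It is a simplified sketch, not Dask's actual shuffle implementation: `shuffle_by_key` and the sample rows are hypothetical. Every row is routed to an output partition chosen by hashing its key, so all rows for a given day end up together, at the cost of moving potentially every row.

```python
import zlib

def shuffle_by_key(partitions, n_out):
    """Redistribute (key, value) rows so rows sharing a key land in the same output partition."""
    out = [[] for _ in range(n_out)]
    for part in partitions:
        for key, value in part:
            # crc32 is stable across runs, unlike the built-in hash() for strings.
            target = zlib.crc32(key.encode()) % n_out
            out[target].append((key, value))
    return out

# Hypothetical input: each day's rows are scattered across partitions.
day_partitions = [
    [("2019-01-01", 10.0), ("2019-01-02", 5.0)],
    [("2019-01-01", 7.5), ("2019-01-03", 2.5)],
    [("2019-01-02", 4.0), ("2019-01-01", 1.0)],
]

shuffled = shuffle_by_key(day_partitions, n_out=2)
for i, part in enumerate(shuffled):
    print(i, sorted({day for day, _ in part}))
```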

This section will calculate a sum, which is easier to parallelize than other aggregations, like a distinct count.  Let's view the output of this computation before running the benchmark.
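
The difference between a sum and a distinct count can be sketched without a cluster. In the plain-Python example below (hypothetical rows, not taken from the taxi data), partial sums from each partition simply add up, whereas per-partition distinct counts cannot be added because the same day appears in several partitions.

```python
# Three hypothetical partitions of (pickup_day, fare_amount) rows.
row_partitions = [
    [("2019-01-01", 10.0), ("2019-01-02", 5.0)],
    [("2019-01-01", 7.5), ("2019-01-03", 2.5)],
    [("2019-01-02", 4.0), ("2019-01-01", 1.0)],
]

# Sum: each partition reduces to one number, and the partial sums add up.
partial_sums = [sum(fare for _, fare in part) for part in row_partitions]
total_fare = sum(partial_sums)

# Distinct count: adding per-partition counts over-counts repeated days,
# so the distinct values themselves have to be combined.
day_sets = [{day for day, _ in part} for part in row_partitions]
naive_distinct = sum(len(days) for days in day_sets)
distinct_days = len(set().union(*day_sets))

print(total_fare, naive_distinct, distinct_days)
```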

In [26]:
df_unpersisted["pickup_day"] = df_unpersisted["tpep_pickup_datetime"].dt.date

In [32]:
df_unpersisted.groupby("pickup_day").fare_amount.sum().compute().sort_values(ascending=False).head()

pickup_day
2019-01-11    4213545.05
2019-01-25    3629449.04
2019-01-23    3625535.48
2019-01-17    3547486.80
2019-01-24    3521669.99
Name: fare_amount, dtype: float64

## Benchmark the computation

In [33]:
def light_benchmark(f):
    start_time = time.time()
    f()
    return time.time() - start_time

In [35]:
light_benchmark(lambda: df_unpersisted.groupby('pickup_day').fare_amount.sum().compute())

6.0322792530059814

## Set an index and see if that makes the computation run faster

In [36]:
df_unpersisted2 = df_unpersisted.set_index("pickup_day")

In [37]:
light_benchmark(lambda: df_unpersisted2.groupby('pickup_day').fare_amount.sum().compute())

8.970112085342407

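In this case the cost of setting the index outweighs the performance benefit.  Because `df_unpersisted2` is lazy and not persisted, the shuffle required by `set_index` runs inside the timed `compute()` call, so its cost is included in the measurement.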

## Closing thoughts

This benchmarking analysis demonstrates the power of cluster computing with Dask.  It also highlights the unique challenges of working in a cluster environment.  Organizing data into well-sized partitions and executing computations strategically are both necessary to optimize performance.