Description
Tasks
For the past couple of weeks I've been investigating datashader's performance and how we can improve upon it. I'm now documenting my remaining tasks, in case I get pulled away on a different project. Below is a list of tasks/issues that I'm currently addressing:
- Extend the filetimes.py and filetimes.yml benchmarking environment to find the optimal file format for datashader/dask (Issue Recommended file format for large files #129)
- Benchmark numba compared to handwritten ufuncs in vaex (Issue Performance comparison to vaex #310)
- Gather perf information about dask locking behavior (Issue Investigate dask locking behavior #314)
- Investigate why Cachey leads to better runtime performance for repeat datashader aggregations
- Document memory usage findings (Issue Add memory usage benchmarks #305)
- Investigate how datashader's performance changes with data types (doubles vs floats, etc) (Issue Add memory usage benchmarks #305)
- Verify that repeat aggregations no longer depend on file format (Issue Recommended file format for large files #129)
- Investigate distributed scheduler vs threaded scheduler for single-machine use case (Add --distributed option to benchmark + fix benchmarks for feather and bcolz #331, Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine #332, Improvements to distributed scheduler benchmarks #334)
- Identify issues hindering the distributed scheduler from performing more effectively - credit goes to @martindurant (Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine #332, Memory usage of min/max operations due to nan-checking #336, Using distributed scheduler with persist leads to slower loading times, despite dataset fitting into RAM with threaded scheduler #337)
Performance takeaways
Below are some performance-related takeaways that fell out of my experiments and optimizations with datashader and dask:
General
- Use the latest version of numba (>=0.33). This includes bugfixes providing ~3-5x speedups for many cases (Unnecessary refcount not removed in 2 levels function call causing significant performance decrease numba/numba#2345, Fix refct pruning not removing refct op with tail call. numba/numba#2349, refcount op in branching code affects performance numba/numba#2350)
- When interacting with data on the filesystem, store it in the Apache Parquet format when possible. Snappy compression should be used when writing out Parquet files, and the data should be converted to categorical dtypes (when possible) before writing, since Parquet supports categoricals in its binary format (Recommended file format for large files #129)
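  A minimal sketch of writing data out this way with pandas; the file and column names are illustrative, and a Parquet engine (fastparquet or pyarrow) plus python-snappy are assumed to be installed:

  ```python
  import pandas as pd

  df = pd.read_csv('data.csv')                      # illustrative source file
  df['vendor'] = df['vendor'].astype('category')    # categorize before writing

  # Write Snappy-compressed Parquet (requires fastparquet or pyarrow, plus python-snappy)
  df.to_parquet('data.parq', compression='snappy')
  ```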
- Use the categorical dtype for columns with data that takes on a limited, fixed number of possible values. Categorical columns use a more memory-efficient data representation and are optimized for common operations such as sorting and finding uniques. Example of how to convert a column to the categorical dtype: `df[colname] = df[colname].astype('category')`
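  A rough illustration of the memory savings (the column name and values below are made up):

  ```python
  import pandas as pd

  # A string column with only a few distinct values repeated many times
  df = pd.DataFrame({'borough': ['Queens', 'Brooklyn', 'Manhattan'] * 100000})
  print(df['borough'].memory_usage(deep=True))    # object dtype: large

  df['borough'] = df['borough'].astype('category')
  print(df['borough'].memory_usage(deep=True))    # categorical dtype: much smaller
  ```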
- There is promise in enhancing datashader's performance even further by using single-precision floats (`np.float32`) instead of double-precision floats (`np.float64`). In past experiments this cut down both the time to load data off of disk (assuming the data was written out in single precision) and datashader's aggregation times. Care should be taken with this approach, as single precision (in any software application, not just datashader) leads to different numerical results than double precision (Add memory usage benchmarks #305)
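  A sketch of what this downcast might look like before writing or aggregating (column names and data are illustrative, and the usual precision caveats apply):

  ```python
  import numpy as np
  import pandas as pd

  df = pd.DataFrame({'x': np.random.rand(1000000),
                     'y': np.random.rand(1000000)})

  # Downcast coordinate columns from float64 to float32
  df = df.astype({'x': np.float32, 'y': np.float32})
  ```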
- When using pandas dataframes, there will be a speedup if you cache the `cvs.x_range` and `cvs.y_range` variables and pass them back into the `Canvas()` constructor during future instantiations. As of Optimize range calculation operations #344, dask dataframes automatically memoize the x_range and y_range calculations; this works for dask because dask dataframes are immutable, unlike pandas dataframes (Recommended file format for large files #129)
Single machine
- A rule-of-thumb for the number of partitions to use when converting pandas dataframes into dask dataframes is `multiprocessing.cpu_count()`. This allows dask to use one thread per core for parallelizing computations (Recommended file format for large files #129)
- When the entire dataset fits into memory at once, persist the dataframe as a Dask dataframe prior to passing it into datashader (Recommended file format for large files #129). One example of how to do this:

  ```python
  from dask import dataframe as dd
  import multiprocessing

  dask_df = dd.from_pandas(df, npartitions=multiprocessing.cpu_count()).persist()
  ...
  cvs = datashader.Canvas(...)
  agg = cvs.points(dask_df, ...)
  ```
- When the entire dataset doesn't fit into memory at once, use the distributed scheduler (Add --distributed option to benchmark + fix benchmarks for feather and bcolz #331) without persisting (there is an outstanding issue, Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine #332, that illustrates the problem with the distributed scheduler + `persist`).
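  A sketch of this setup (the file path, column names, and plot dimensions are illustrative):

  ```python
  from distributed import Client
  from dask import dataframe as dd
  import datashader

  # Starting a Client registers the distributed scheduler as the default for dask
  client = Client()

  # Read lazily; skip persist() since the dataset doesn't fit into memory
  dask_df = dd.read_parquet('data.parq')

  cvs = datashader.Canvas(plot_width=800, plot_height=600)
  agg = cvs.points(dask_df, 'x', 'y')
  ```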
Multiple machines
- Use the distributed scheduler to farm computations out to remote machines. `client.persist(dask_df)` may help in certain cases, but be sure to include `distributed.wait()` to block until the data is read into RAM on each worker (Performance degradation of repeated aggregations with distributed scheduler and client.persist() on a single machine #332)
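  A sketch of the persist-then-wait pattern on a cluster (the scheduler address and file path are illustrative):

  ```python
  from distributed import Client, wait
  from dask import dataframe as dd

  # Connect to a scheduler that the remote workers point at
  client = Client('scheduler-address:8786')

  dask_df = dd.read_parquet('data.parq')

  # Persist across worker memory, then block until the data has actually loaded
  dask_df = client.persist(dask_df)
  wait(dask_df)
  ```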