Benchmarks : Grouping

Matt Dowle edited this page Aug 12, 2016 · 27 revisions
  • The input data is randomly ordered. No pre-sort. No indexes. No key.
  • 5 simple queries are run: large groups and small groups on different columns of different types. Similar to what a data analyst might do in practice; i.e., various ad hoc aggregations as the data is explored and investigated.
  • Each package is tested separately in its own fresh session.
  • Each query is repeated once more, immediately. This is to isolate cache effects and confirm the first timing. The first and second times are plotted. The total runtime of all 5 tests is also displayed.
  • The results are compared and checked allowing for numeric tolerance and column name differences.
  • It is the toughest test we can think of but happens to be realistic and very common. Your mileage may of course vary.

Scroll down to find reproducible code, system info and FAQ.


Smaller sizes : 1E7 1E8
Larger size : 2E9 (data.table and dplyr ok but pandas MemoryError)

Code to reproduce the timings above :

## data.table run
$ R --vanilla
require(data.table)
N=2e9; K=100
set.seed(1)
DT <- data.table(
  id1 = sample(sprintf("id%03d",1:K), N, TRUE),      # large groups (char)
  id2 = sample(sprintf("id%03d",1:K), N, TRUE),      # large groups (char)
  id3 = sample(sprintf("id%010d",1:(N/K)), N, TRUE), # small groups (char)
  id4 = sample(K, N, TRUE),                          # large groups (int)
  id5 = sample(K, N, TRUE),                          # large groups (int)
  id6 = sample(N/K, N, TRUE),                        # small groups (int)
  v1 =  sample(5, N, TRUE),                          # int in range [1,5]
  v2 =  sample(5, N, TRUE),                          # int in range [1,5]
  v3 =  sample(round(runif(100,max=100),4), N, TRUE) # numeric e.g. 23.5749
)
cat("GB =", round(sum(gc()[,2])/1024, 3), "\n")
system.time( DT[, sum(v1), keyby=id1] )
system.time( DT[, sum(v1), keyby=id1] )
system.time( DT[, sum(v1), keyby="id1,id2"] )
system.time( DT[, sum(v1), keyby="id1,id2"] )
system.time( DT[, list(sum(v1),mean(v3)), keyby=id3] )
system.time( DT[, list(sum(v1),mean(v3)), keyby=id3] )
system.time( DT[, lapply(.SD, mean), keyby=id4, .SDcols=7:9] )
system.time( DT[, lapply(.SD, mean), keyby=id4, .SDcols=7:9] )
system.time( DT[, lapply(.SD, sum), keyby=id6, .SDcols=7:9] )
system.time( DT[, lapply(.SD, sum), keyby=id6, .SDcols=7:9] )

## dplyr run
$ R --vanilla
require(dplyr)
N=2e9; K=100
set.seed(1)
DF <- data.frame(stringsAsFactors=FALSE,
  id1 = sample(sprintf("id%03d",1:K), N, TRUE),
  id2 = sample(sprintf("id%03d",1:K), N, TRUE),
  id3 = sample(sprintf("id%010d",1:(N/K)), N, TRUE),
  id4 = sample(K, N, TRUE),                          
  id5 = sample(K, N, TRUE),                         
  id6 = sample(N/K, N, TRUE),                       
  v1 =  sample(5, N, TRUE),                         
  v2 =  sample(5, N, TRUE),                       
  v3 =  sample(round(runif(100,max=100),4), N, TRUE)
)
cat("GB =", round(sum(gc()[,2])/1024, 3), "\n")
system.time( DF %>% group_by(id1) %>% summarise(sum(v1)) )
system.time( DF %>% group_by(id1) %>% summarise(sum(v1)) )
system.time( DF %>% group_by(id1,id2) %>% summarise(sum(v1)) )
system.time( DF %>% group_by(id1,id2) %>% summarise(sum(v1)) )
system.time( DF %>% group_by(id3) %>% summarise(sum(v1),mean(v3)) )
system.time( DF %>% group_by(id3) %>% summarise(sum(v1),mean(v3)) )
system.time( DF %>% group_by(id4) %>% summarise_each(funs(mean), 7:9) )
system.time( DF %>% group_by(id4) %>% summarise_each(funs(mean), 7:9) )
system.time( DF %>% group_by(id6) %>% summarise_each(funs(sum), 7:9) )
system.time( DF %>% group_by(id6) %>% summarise_each(funs(sum), 7:9) )
## pandas run
$ python3
import pandas as pd
import numpy as np
import timeit

# randChar is workaround for MemoryError in mtrand.RandomState.choice
# http://stackoverflow.com/questions/25627161/how-to-solve-memory-error-in-mtrand-randomstate-choice
def randChar(f, numGrp, N):
    things = [f % x for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

def randFloat(numGrp, N):
    things = [round(100*np.random.random(), 4) for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

N=int(1e9)   # 2e9 gave MemoryError in pandas; see note above
K=100
DF = pd.DataFrame({
  'id1' : randChar("id%03d", K, N),       # large groups (char)
  'id2' : randChar("id%03d", K, N),       # large groups (char)
  'id3' : randChar("id%010d", N//K, N),   # small groups (char)
  'id4' : np.random.choice(K, N),         # large groups (int)
  'id5' : np.random.choice(K, N),         # large groups (int)
  'id6' : np.random.choice(N//K, N),      # small groups (int)
  'v1' :  np.random.choice(5, N),         # int, 5 values (0-4 here vs 1-5 in R)
  'v2' :  np.random.choice(5, N),         # int, 5 values (0-4 here vs 1-5 in R)
  'v3' :  randFloat(100,N)                # numeric e.g. 23.5749
})
timeit.Timer("DF.groupby(['id1']).agg({'v1':'sum'})"                            ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id1']).agg({'v1':'sum'})"                            ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id1','id2']).agg({'v1':'sum'})"                      ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id1','id2']).agg({'v1':'sum'})"                      ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'})"               ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'})"               ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'})" ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id4']).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'})" ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id6']).agg({'v1':'sum', 'v2':'sum', 'v3':'sum'})"    ,"from __main__ import DF").timeit(1)
timeit.Timer("DF.groupby(['id6']).agg({'v1':'sum', 'v2':'sum', 'v3':'sum'})"    ,"from __main__ import DF").timeit(1)

The pandas usage above was checked for correctness.

R info

$ R --vanilla
# R version 3.1.1 (2014-07-10) -- "Sock it to Me"
# Copyright (C) 2014 The R Foundation for Statistical Computing
# Platform: x86_64-pc-linux-gnu (64-bit)

> sessionInfo()
# R version 3.1.1 (2014-07-10)
# Platform: x86_64-pc-linux-gnu (64-bit)
# 
# locale:
#  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
#  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
#  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
#  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C
#  [9] LC_ADDRESS=C               LC_TELEPHONE=C
# [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

# attached base packages:
# [1] stats     graphics  grDevices utils     datasets  methods base

# other attached packages:
# [1] dplyr_0.2        data.table_1.9.2
# 
# loaded via a namespace (and not attached):
# [1] assertthat_0.1 parallel_3.1.1 plyr_1.8.1     Rcpp_0.11.2 reshape2_1.4
# [6] stringr_0.6.2  tools_3.1.1

System info

Amazon EC2 r3.8xlarge

$ lsb_release -a
# No LSB modules are available.
# Distributor ID:    Ubuntu
# Description:    Ubuntu 14.04 LTS
# Release:    14.04
# Codename:    trusty

$ uname -a
# Linux ip-172-31-33-222 3.13.0-29-generic #53-Ubuntu SMP
# Wed Jun 4 21:00:20 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux

$ lscpu
# Architecture:          x86_64
# CPU op-mode(s):        32-bit, 64-bit
# Byte Order:            Little Endian
# CPU(s):                32
# On-line CPU(s) list:   0-31
# Thread(s) per core:    2
# Core(s) per socket:    8
# Socket(s):             2
# NUMA node(s):          2
# Vendor ID:             GenuineIntel
# CPU family:            6
# Model:                 62
# Stepping:              4
# CPU MHz:               2494.090
# BogoMIPS:              5049.01
# Hypervisor vendor:     Xen
# Virtualization type:   full
# L1d cache:             32K
# L1i cache:             32K
# L2 cache:              256K
# L3 cache:              25600K
# NUMA node0 CPU(s):     0-7,16-23
# NUMA node1 CPU(s):     8-15,24-31

$ free -h
#              total       used       free     shared    buffers     cached
# Mem:          240G       2.4G       237G       364K        60M       780M
# -/+ buffers/cache:       1.6G       238G
# Swap:           0B         0B         0B

FAQ

What about compiler flags?

We placed the following in ~/.R/Makevars before installing the packages :

CFLAGS=-O3 -mtune=native
CXXFLAGS=-O3 -mtune=native

Otherwise, R by default compiles packages with -g -O2. The R core team knows of problems with -O3 in some other CRAN packages under some circumstances, so R's default is not an oversight. On the other hand, we don't know which CRAN packages are affected or to what degree, and we're not aware of any problem with -O3 in the packages tested here, so we used -O3 for these benchmarks.

-O3 can speed things up by as much as 30%; e.g., see tests 3 and 5 in the before and after plots below. Right click and 'open image in a new tab' to see these images full size. We realise a single chart showing the percentage change would be better, but that is not a priority.

For pandas 0.14.1 we installed as follows. Conveniently, it appears to use -O3 by default, but if there are better options, please let us know.

wget -O- http://neuro.debian.net/lists/trusty.us-ca.libre | sudo tee /etc/apt/sources.list.d/neurodebian.sources.list
sudo apt-key adv --recv-keys --keyserver hkp://pgp.mit.edu:80 2649A5A9
sudo apt-get update
sudo apt-get install python3-pandas python3-dev
python3-config --cflags
-I/usr/include/python3.4m -I/usr/include/python3.4m  -Wno-unused-result -Werror=declaration-after-statement
-g -fstack-protector --param=ssp-buffer-size=4 -Wformat -Werror=format-security  -DNDEBUG -g -fwrapv
-O3 -Wall -Wstrict-prototypes

Why not a single line chart rather than multiple bar plots for each size?

Because we want to compare syntax too. The syntax is quite wide so a horizontal bar chart accommodates that text. The image can grow in height as more tests or packages are added whereas a line chart starts to become cluttered. The largest size is likely the most interesting; we don't expect many people to look at the smaller sizes.

Shouldn't there be a gc() between each system.time()?

system.time() already does that by default: its gcFirst argument is TRUE. The time taken by that gc() is excluded from the timing system.time() reports.

Those costs are tiny. Are they the EC2 compute cost?

Yes @ $0.30/hour as a spot instance. The machine specs are above. It was just to show how accessible and cheap large RAM on-demand machines can be.

What about comparing memory usage too?

Yes we'd like to but didn't know how to measure that via the command line. This looks perfect: https://github.com/gsauthof/cgmemtime.

Does either package have a parallel option?

Not currently. There are 32 CPUs on this machine and only one is being used, so that's an obvious thing to address.

What if the groups aren't randomly shuffled but there's some clustering?

All packages should be faster due to better L2/L3 cache efficiency. Some may benefit more than others and it'd be great to test that. Any volunteers? However, a feature of this benchmark is that it's a sum of 5 different queries. In real life, if groups are clustered then it can only be on one dimension (e.g. (id,time) or (time,id), but not both). We're trying to capture that with this benchmark.
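As a small-scale sketch of how such a test might be set up (the names and sizes here are illustrative, not part of the benchmark), pandas must give identical results whether the groups are shuffled or clustered; only the timing should differ:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
n, k = 100_000, 100
df = pd.DataFrame({"id": rng.integers(0, k, n), "v": rng.random(n)})

shuffled = df                                    # groups scattered at random
clustered = df.sort_values("id", kind="stable")  # groups contiguous in memory

# Wrap each groupby in time.perf_counter() to compare timings; the results
# themselves must agree whatever the physical row order (up to float rounding,
# since the additions happen in a different order).
a = shuffled.groupby("id")["v"].sum()
b = clustered.groupby("id")["v"].sum()
assert np.allclose(a.to_numpy(), b.to_numpy())
```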

How does SQL compare, what about indexes and is dplyr faster with a database backend?

This would be great to investigate. Any volunteers? The time and space needed to create indexes should be reported separately from the subsequent faster grouping times. data.table's setkey is like a clustered index in SQL: the key is the physical row order in RAM and is very L2/L3 cache efficient when grouping by those columns. dplyr would also be faster on a data.frame whose groups are contiguous in RAM, for the same reason. But recall that the benchmark's 5 queries are designed to test grouping by several different columns, so a single ordering or single index would be insufficient. This is why we're working on secondary keys (set2key), analogous to CREATE INDEX in SQL. Two types of SQL database should be tested: row-store and column-store. If they have in-memory options, those should be selected, since disk will always be slower than RAM. Both data.table and data.frame are in-memory column stores limited by RAM; the 240GB machine used for these results is the biggest EC2 instance currently available. A database, on the other hand, can of course store and query many TB, and the data can persist on disk.

Has anything else changed?

v2 was originally sample(1e6, N, TRUE) but the size of these ints caused sum(v2) in test 5 to overflow. R detects this and coerces to numeric. To keep things simple we changed it to sample(5, N, TRUE) just like v1.
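A quick numpy illustration of the same hazard (numpy, unlike R, wraps silently rather than detecting the overflow):

```python
import numpy as np

x = np.array([2**30] * 4, dtype=np.int32)
wrapped = x.sum(dtype=np.int32)  # 4 * 2^30 = 2^32 wraps around to 0 in int32
widened = x.sum(dtype=np.int64)  # a 64-bit accumulator gives 4294967296

assert int(wrapped) == 0
assert int(widened) == 2**32
```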

Why do the data.table commands use keyby= rather than by=?

To match dplyr. by= in data.table returns the groups in the order they first appear in the dataset. This can be handy when the data is grouped in blocks and the order those blocks appear has some meaning to be preserved. As far as we know dplyr has no equivalent; it always sorts the returned groups. keyby= is a by= followed by a setkey on the group columns in the result.
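For comparison, pandas has an analogous switch: sort=True (its default) returns sorted groups like keyby= and dplyr, while sort=False preserves first-appearance order like by=:

```python
import pandas as pd

df = pd.DataFrame({"id": ["b", "a", "b", "a"], "v": [1, 2, 3, 4]})

appearance_order = df.groupby("id", sort=False)["v"].sum()  # like by=
sorted_order = df.groupby("id")["v"].sum()                  # like keyby=

assert list(appearance_order.index) == ["b", "a"]
assert list(sorted_order.index) == ["a", "b"]
assert appearance_order["b"] == 4 and appearance_order["a"] == 6
```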

Why did you run these benchmarks?

To know what the facts are and to demonstrate what we think a decent benchmark for large data is. To gain feedback and ideas for improvement. Single query benchmarks like this enable us to compare in a controlled way various packages on as fair and open a basis as possible using simulated data to measure how they scale in size and cardinality.

Is 2 billion the limit?

Yes. data.table and we believe data.frame are both currently limited to 2^31 rows. To increase this limit we need to start using long vectors internally which were new in R 3.0.0.

Why just grouping?

Because we had to start somewhere. Joins, updates and selecting subsets will follow.

Is just testing sum and mean too simple?

Yes. sum and mean benefit from advanced optimizations in both packages. Adding, say, median is expected to give different results, as might sum()+1 and other expressions that are commonly needed in practice but are harder to optimize. Doing sum() by group first and then adding 1 afterwards can be dramatically faster for this reason.
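The same effect is easy to see in pandas (an illustrative sketch, not part of the benchmark): an optimized per-group sum followed by a vectorized +1 gives the same answer as a per-group lambda, but avoids calling into Python for every group:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
df = pd.DataFrame({"id": rng.integers(0, 100, 10_000),
                   "v1": rng.integers(1, 6, 10_000)})

g = df.groupby("id")["v1"]
fast = g.sum() + 1                    # optimized sum, then vectorized +1
slow = g.agg(lambda s: s.sum() + 1)   # generic path: one Python call per group

assert (fast == slow).all()
```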

Is it elapsed time that's measured?

Strictly speaking, it's user+sys. Other unrelated tasks on the machine can affect elapsed, we believe. However, this server had so many CPUs and so much memory that elapsed==user+sys in all cases.

How about -flto and similar extra flags?

If there's evidence that it's worth the testing time, we're happy to do so. Is there any downside to the extra flag? Why haven't the compiler developers included it in -O3 already? It's a complicated topic; see, e.g., the Stack Overflow question "What are your compiler flags?".

Base R's mean accumulates in long double precision where the machine supports it (and most do) and then does a second pass through the entire vector to adjust mean(x) by typically a very small amount to ensure that sum(x-mean(x)) is minimized. What do these packages do?

Both data.table and dplyr currently do a single pass sum(x)/length(x) where the sum is done in long double precision. They match base in terms of long double but don't do the second pass, currently. Feedback welcome.
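In Python pseudocode (Python floats are doubles rather than long doubles, so this only shows the structure of the two algorithms, not their precision):

```python
def mean_one_pass(x):
    # what data.table and dplyr do: a single accumulating pass
    return sum(x) / len(x)

def mean_two_pass(x):
    # base-R style: a second pass nudges m so that sum(x - m) is near 0
    m = sum(x) / len(x)
    return m + sum(v - m for v in x) / len(x)

x = [2.0, 4.0, 9.0]
assert mean_one_pass(x) == 5.0
assert mean_two_pass(x) == 5.0
```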

What about base R grouping functions?

It would be great to add a comparison for 1E7 rows. We expect the scale to be hours or possibly days for 2E9.

Why haven't the faster methods been incorporated into R itself?

  • data.table and dplyr are still developing and not yet fully mature
  • as you can see from the answer about mean above, they are sometimes not precisely the same as base R. This is the same reason fread hasn't replaced read.csv: it isn't yet a drop-in replacement (the ability to read from connections being the main difference).
  • it isn't just speed but both packages prefer a different syntax to base R
  • a lot of new code at C level would need to be added to R itself, with a large amount of testing and a risk of breaking the 5,000 dependent packages on CRAN. To date there hasn't been the will or the time.
  • packages are a fantastic way to allow progress without breaking base R and its dependants.

Why do you use sum(gc()[,2]) rather than object.size()?

object.size() is an estimate and quite slow (pryr::object_size() is better). More importantly, since each package is tested in a fresh, minimal R --vanilla session, sum(gc()[,2]) is how much RAM that R session has actually consumed. It includes the increased size of R's global string cache and accounts for R storing each unique string only once. The size of the entire R session is what matters for large data if other R sessions (or other non-R jobs) are running on the same machine at the same time. This figure matches what htop displays for the R process very closely; i.e., 100GB +/- 0.5GB.

Why is the 2nd run for dplyr faster (9 minutes instead of 14 minutes) unlike the other packages where the 1st and 2nd run are the same (5 mins pandas both runs and 3 mins data.table both runs)?

We don't know. It has been suggested that dplyr uses more working memory in this test; growing R's heap to accommodate that memory takes more time on the first run. R's heap is then already big enough for the 2nd run. But that is just a theory.

I have reproduced your timings, but when I sum up the 10 system.time() calls I arrive at a total that is about 1.5x smaller than the actual total time of the 10 calls.

There are two effects to consider.
Firstly, system.time() returns 3 figures: user, system and elapsed. On a healthy system where no other process is running (e.g. another user, or a virus scan on Windows), user+system == elapsed. Our benchmark was run on a healthy fresh EC2 instance and we checked this was the case for all calls. If you compare a proc.time() before and after the 10 calls, you should compare user+system as well as comparing elapsed to check nothing unrelated to the benchmark has increased the wall clock timing.
Secondly, system.time() has an argument gcFirst which is TRUE by default (see the earlier FAQ above). This gc() is performed first by system.time() to free memory used by previous commands in the R session; otherwise a gc() occurring during the individual test might unfairly increase its variance merely because of earlier commands in the session. The time this up-front forced gc() takes is not included in the timings system.time() reports, but it would be included when comparing proc.time() before and after the 10 calls. If the algorithm being timed is memory inefficient and therefore incurs gc() costs while it runs (often many times), those costs are included, which is fair. Only the first forced gc() by system.time() is excluded, since a memory-efficient algorithm that requires no gc() should not be penalized for a gc() that was really caused by a previous unrelated call.
If you compare the total time of the 10 calls to the total time printed at the top of the plot, you should find they match, as that total was measured with a proc.time() before and after (comparing user+sys, see the earlier FAQ above) and therefore includes the 10 forced extra gc()s by system.time().

Have all these questions really been asked?

Yes they have. Even this question was asked. This is why this is a wiki page, so we could improve the benchmark and easily add to this FAQ as comments and suggestions come in.

How do I contact you?

Matt Dowle; mattjdowle at gmail dot com