groupby aggregation does not scale well with amount of groups #4001

Open
jangorecki opened this issue Sep 21, 2018 · 15 comments

Comments

@jangorecki

jangorecki commented Sep 21, 2018

It seems that there is a performance bug when doing grouping: the time and memory consumed by dask do not scale well with the number of output rows (groups).
Please find below a script to produce the example data; increase N to produce bigger input data.

import pandas as pd
import numpy as np

def randChar(f, numGrp, N):
    things = [f % x for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

def randFloat(numGrp, N):
    things = [round(100 * np.random.random(), 4) for x in range(numGrp)]
    return [things[x] for x in np.random.choice(numGrp, N)]

N = int(1e7)
K = 100
x = pd.DataFrame({
  'id1' : randChar("id%03d", K, N),       # large groups (char)
  'id2' : randChar("id%03d", K, N),       # large groups (char)
  'id3' : randChar("id%010d", N//K, N),   # small groups (char)
  'id4' : np.random.choice(K, N),         # large groups (int)
  'id5' : np.random.choice(K, N),         # large groups (int)
  'id6' : np.random.choice(N//K, N),      # small groups (int)
  'v1' :  np.random.choice(5, N),         # int in range [0,5)
  'v2' :  np.random.choice(5, N),         # int in range [0,5)
  'v3' :  randFloat(100, N)               # numeric, e.g. 23.5749
})
x.to_csv("example.csv", encoding='utf-8', index=False)

And the following code to perform the grouping.

import os
import gc
import timeit
import pandas as pd
import dask as dk
import dask.dataframe as dd

print(pd.__version__)
print(dk.__version__)

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))

gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id1']).agg({'v1':'sum'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans

gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}).compute()
t = timeit.default_timer() - t_start
print(len(ans))
print(t)
del ans

Running on Python 3.6.5, pandas 0.23.4, dask 0.19.2.
Single machine, 20 CPUs, 125 GB memory.

For an input of 1e7 rows:
output 100 rows, timing: 0.4032 s
output 1e5 rows, timing: 2.1272 s

For an input of 1e8 rows:
output 100 rows, timing: 3.2559 s
output 1e6 rows, timing: 149.8847 s

Additionally, I checked an alternative approach: instead of .compute, using a Client and .persist (adding print(len(.)) to ensure the persist has actually finished). In both cases the time was not acceptable (see the table below; units are seconds).

in_rows  groups   compute()   persist()
1e+07    fewer        0.395       0.425
1e+07    more         2.124       2.145
1e+08    fewer        3.239       3.420
1e+08    more       148.211     148.043
1e+09    fewer           NA     848.911
1e+09    more            NA    5364.569
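
For reference, the persist-based variant described above looks roughly like this (a sketch; the exact Client configuration behind the table is not shown in this issue, so it is assumed here):

import timeit
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # local distributed scheduler; settings are illustrative

x = dd.read_csv("example.csv", na_filter=False).persist()
print(len(x))  # block until the input is actually in memory

t_start = timeit.default_timer()
ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).persist()
print(len(ans))  # force evaluation so the timing covers the whole aggregation
print(timeit.default_timer() - t_start)
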
@mrocklin
Member

My guess is that this isn't a bug with grouping, but rather an issue where you're running out of RAM and so getting heavily degraded performance. Strings are unfortunately expensive to store in memory in Python, and Pandas' lack of a text type kills us.

Some things you could try to test this theory:

  1. Have lots of groups but use fewer rows and see how things behave
  2. Store your data in an efficient on-disk format like Parquet, and then do the groupby-aggregation from Parquet rather than having everything sitting in memory (a rough sketch follows below)
  3. Watch the diagnostic dashboard to see if it's writing/reading from disk (look for orange rectangles in the task stream plot) and also check out the profile page after execution to see what is taking up time. See http://dask.pydata.org/en/latest/diagnostics-distributed.html
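
A rough sketch of suggestion 2, assuming pyarrow or fastparquet is installed (file names are illustrative):

import dask.dataframe as dd

# one-time conversion: CSV -> Parquet on disk
dd.read_csv("example.csv", na_filter=False).to_parquet("example.parq")

# groupby-aggregation reading straight from Parquet, with nothing persisted in RAM
x = dd.read_parquet("example.parq", columns=['id3', 'v1', 'v3'])
ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).compute()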

Also, just to clarify things: persist and compute are orthogonal to the local and distributed schedulers. There is no reason to switch to persist and print(len(.)) when using the distributed scheduler. Your code shouldn't have to change; compute will work exactly as it did before.
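
A minimal illustration of that point (creating the Client is the only change; everything else is the code from the original post):

import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # once a Client exists, it becomes the default scheduler

x = dd.read_csv("example.csv", na_filter=False)
# exactly the same call as with the threaded scheduler; no persist/len(.) needed
ans = x.groupby(['id1']).agg({'v1': 'sum'}).compute()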

@mrocklin
Member

Any update on this, @jangorecki? Were any of those suggestions helpful?

@jangorecki
Author

I ran those queries with Parquet on 1e7 and 1e8 only.
I could not read the 1e9 Parquet (saved by Spark) in dask due to

FileNotFoundError: [Errno 2] No such file or directory: '/home/user/git/db-benchmark/G1_1e9_1e2.parq/_metadata/_metadata'

and I couldn't create it from dask, as reading the CSV was running out of memory.
Timings are as follows:

1e7
fewer 6.6s
more 8.1s

1e8
fewer 61.5s
more 216.4s

So there is a slowdown, but at those data sizes we surely haven't run out of memory (the data is 5 GB, the machine has 125 GB). I am not surprised there is a slowdown rather than a speed-up, since we moved from in-memory processing to on-disk processing.
Going back to the main problem... it already manifests at 1e8 with more groups and in-memory processing, where lack of memory surely is not the problem. The current dask in-memory timing of 148 s is sub-optimal. AFAIU tuning data storage, etc. will not help much, so we shouldn't focus on that but rather improve the in-memory 1e8 timing.

@mrocklin
Member

@martindurant please see the parquet error above. It looks like the current experience of writing from spark and reading from dask still isn't smooth for novice users.

@jangorecki you might find this notebook helpful for your benchmarks: https://gist.github.com/c0b84b689238ea46cf9aa1c79155fe34

@mrocklin
Member

My timings for 1e7 are as follows:

1e7
fewer 3s (with the distributed scheduler)
fewer 0.4s (with the threaded scheduler, which is the default if you never create a client)
more 5s

but on those datasizes we surely haven't run out of memory (datasize is 5GB, machine 125GB)

Creating all of the columns and storing them in memory is something like 200MB on disk as parquet or 5GB in RAM using Python object dtype. You really shouldn't underestimate the performance cost of using Python object dtypes in Pandas. They operate at Python speeds (rather than C like the rest of Pandas) and take up a ton of memory. In this situation I would strongly recommend

  1. Not persisting all of the unnecessary columns in RAM
  2. Using categorical dtypes (a rough sketch of both suggestions is below)
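
A minimal sketch of what applying both recommendations could look like for the id3 query (the column list and dtype mapping are illustrative assumptions, not the exact code from the gist below):

import dask.dataframe as dd

# read only the columns the query needs, and load the grouping key as categorical
x = dd.read_csv(
    "example.csv",
    usecols=['id3', 'v1', 'v3'],
    dtype={'id3': 'category', 'v1': 'int64', 'v3': 'float64'},
    na_filter=False,
)

ans = x.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'}).compute()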

If we do this then I can easily run the 1e8 system on my laptop (16GB of RAM). Here is a run through with the threaded scheduler:

https://gist.github.com/34e987777b19b6f6039e8e6c44dd7fcc

@mrocklin
Member

1e8

fewer: 0.57s
more: 220s

Still not great for many groups, as you said above

@mrocklin
Member

@TomAugspurger you might want to run the notebook above and look at the /profile page output (also happy to help with this if you're unfamiliar with it). There are some surprisingly long durations coming up in Pandas that I'm not able to easily explain.

@TomAugspurger
Member

Anything in particular that stands out duration-wise? (I only tried the default values in the notebook. Haven't played around with others).

One thing that looked somewhat odd with the .agg({'v1':'sum', 'v3':'mean'}) case is that pandas spends time in _factorize_array both on the _get_compress_labels side (the actual determination of groups, which isn't surprising) and on the actual agg side. I haven't looked at whether there's actually duplicative computation yet, though.
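
For anyone wanting to poke at the same question outside of dask, a quick sketch of profiling the equivalent pure-pandas groupby (assuming example.csv from the original post; pandas-internal helpers such as _factorize_array should show up in the output):

import cProfile
import pandas as pd

df = pd.read_csv("example.csv", na_filter=False)
# sort by cumulative time to see which pandas internals dominate the aggregation
cProfile.run("df.groupby(['id3']).agg({'v1': 'sum', 'v3': 'mean'})", sort='cumtime')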

@mrocklin
Member

mrocklin commented Oct 13, 2018 via email

@jangorecki
Author

jangorecki commented Jan 26, 2019

Just to highlight this issue, I can point to this report: https://h2oai.github.io/db-benchmark/ (click the "5 GB" tab),
where dask is the fastest solution when there are few groups (question 1) but is terribly slow for many groups (question 3).

@alex959595

alex959595 commented Feb 25, 2019

By default, Dask combines all groupby-aggregation chunks into a single Pandas DataFrame output, no matter the resulting size, as shown in the "Many groups" example here: https://examples.dask.org/dataframes/02-groupby.html.

By using the parameter split_out you should be able to control this size.

It also automatically combines the resulting aggregation chunks (every 8 chunks by default). FYI: this is an arbitrary value set in dask/dataframe/core.py line 3595. That value is fine for most cases, but if the aggregation chunks are large it can take a substantial amount of time to combine them (especially if chunks need to be transferred from another worker).

I think you can ensure the output of the groupby chunks doesn't get too big by setting split_out=4 and split_every=False.

df.groupby('id').x.mean(split_out=4, split_every=False)
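
Applied to the aggregation from the original post, that suggestion would look something like this (the split_out value of 4 is just the example value from above):

ans = x.groupby(['id3']).agg(
    {'v1': 'sum', 'v3': 'mean'},
    split_out=4,        # spread the aggregated result across 4 output partitions
    split_every=False,  # combine all intermediate chunks in a single step
).compute()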

@jangorecki
Author

jangorecki commented Feb 26, 2019

@alex959595 thanks for the suggestion. Any idea why this is not optimised internally, so that the aggregation API could be data agnostic, relying only on metadata (the schema)?

@mrocklin
Member

mrocklin commented Feb 26, 2019 via email

@PalakHarwani

I have a huge CSV file (~400 GB) and I'm reading it through dask. I want to group the data by a column and apply a function to the grouped dataframes. It works perfectly fine with smaller CSV files. Is there a way to use split_out with custom functions, or any other workaround for this?

@jangorecki
Author

jangorecki commented Jun 22, 2020

@PalakHarwani I haven't used split_out, but there are tricks you can use; you may know them already, but just in case. Thanks to @ravwojdyla, who contributed them recently to my project.

  • use the local distributed process-pool scheduler
  • use fewer parallel tasks and leverage local processing

That translates to

import logging
import dask as dk
from dask import distributed

client = distributed.Client(processes=True, silence_logs=logging.ERROR)
dk.config.set({"optimization.fuse.ave-width": 20})  # fuse more work into each parallel task

So just by using "distributed` on a single machine you can deal with high cardinality queries much faster.
Even on a small 0.5GB data some queries got more than 10x speed up. There are queries that got slower now (i.e. low cardinality groupby), but the gains are much bigger.
