
Parallelization for embarrassingly parallel tasks #5751

Closed

michaelaye opened this issue Dec 19, 2013 · 64 comments
Labels: Docs, Groupby, IO HDF5 (read_hdf, HDFStore), Performance (memory or execution speed)

Comments

@michaelaye (Contributor)

I would like to promote the idea of applying multiprocessing.Pool() execution to embarrassingly parallel tasks, e.g. the application of a function to a large number of columns.
Obviously there's some overhead to set it up, so there will be a minimum number of columns above which this can be faster than the already fast Cython approach. I will add my performance tests to this issue later.
I was emailing @jreback about this and he added the following remarks/complications:

  • transferring numpy data is not that efficient - but still prob good enough
  • you can't transfer (pickle) lambda-type functions, so maybe look at using a library called dill (which solves this problem); msgpack could possibly be slightly modified to do this too (and it is already pretty efficient at transferring other types of objects)
  • could also investigate joblib - I think statsmodels uses it [ed: That seems to be correct, I read in their group about joblib]
  • I would create a new top level dir core/parallel for this type of stuff
  • the strategy in this link could be a good way to follow: http://stackoverflow.com/questions/17785275/share-large-read-only-numpy-array-between-multiprocessing-processes

links:
http://stackoverflow.com/questions/13065172/multiprocess-python-numpy-code-for-processing-data-faster
http://docs.cython.org/src/userguide/parallelism.html
http://distarray.readthedocs.org/en/v0.5.0/
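
For concreteness, here is a minimal sketch of the kind of Pool-based column dispatch being proposed; parallel_column_apply and slow_sqrt are illustrative names, not pandas API, and the pickling caveats from the list above apply:

import numpy as np
import pandas as pd
from multiprocessing import Pool

def slow_sqrt(col):
    # stand-in for an expensive per-column function
    return np.sqrt(col)

def parallel_column_apply(df, func, processes=4):
    # each column is pickled over to a worker, so func must be
    # expensive enough to amortize that transfer cost
    with Pool(processes) as pool:
        results = pool.map(func, [df[c] for c in df.columns])
    return pd.concat(results, axis=1, keys=df.columns)

if __name__ == '__main__':  # guard required for multiprocessing on Windows
    df = pd.DataFrame(np.random.randn(20000, 100))
    out = parallel_column_apply(df, slow_sqrt)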

@jtratner (Contributor)

Maybe this would be something better done in a sandbox/parallel folder
until we settle on what's feasible to do? It would also be good to have
someone test perf on Windows; I've heard that there are very
different performance characteristics for Windows vs. Linux/OSX.

@jtratner (Contributor)

For Windows vs. Linux/OSX threading/multiprocessing, that is.

@jreback (Contributor)

jreback commented Dec 19, 2013

I think this could start with an optional keyword / option to enable parallel computation where useful. This also addresses the Windows/Linux differences, because the default for going parallel can differ per platform (or have different thresholds, like we do for numexpr). A rough sketch of that kind of gate is below.
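
A sketch of what that gate might look like (the threshold values and platform split are purely hypothetical, loosely modeled on the numexpr cutoff; parallel_column_apply is the sketch from the first comment):

import sys

# hypothetical thresholds: process startup is costlier on Windows
# (spawn) than on Linux/OSX (fork), so the cutoff could differ
PARALLEL_THRESHOLD = 500000 if sys.platform == 'win32' else 100000

def maybe_parallel_apply(df, func, use_parallel=True):
    # mirror the numexpr pattern: only take the parallel path past
    # a size threshold, otherwise fall back to the normal apply
    if use_parallel and df.size >= PARALLEL_THRESHOLD:
        return parallel_column_apply(df, func)
    return df.apply(func)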

@TomAugspurger (Contributor)

They could all be grouped under a par attribute, like the str methods. What else is in mind here besides apply? Would sum, prod, etc. possibly benefit?
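
Modern pandas grew an extension mechanism that could express exactly this namespace; a hypothetical sketch (register_dataframe_accessor did not exist when this thread was active, and ParAccessor is an invented name):

import pandas as pd

@pd.api.extensions.register_dataframe_accessor('par')
class ParAccessor:
    # hypothetical df.par namespace, analogous to the .str methods
    def __init__(self, pandas_obj):
        self._obj = pandas_obj

    def apply(self, func, processes=4):
        # delegate to the Pool-based sketch from the first comment
        return parallel_column_apply(self._obj, func, processes)

# usage: df.par.apply(my_cool_function)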

@jreback (Contributor)

jreback commented Dec 28, 2013

@michaelaye can you put up a simple example that would be nice for benchmarking?

@jreback (Contributor)

jreback commented Dec 29, 2013

Here's an implementation using joblib:

https://github.com/jreback/pandas/tree/parallel

and some timings..... I had to use a pretty contrived function, actually: you need to weigh the pickle time for the sub-frames against the function time.
The pickle time listed below is to disk, which is not the case when sending to sub-processes
(but it is still the limiting factor, I think). FYI, if the frame is already on disk (e.g. HDF), then this could have quite a substantial benefit.

In [1]: df1  = DataFrame(np.random.randn(20000, 1000))

In [2]: def f1(x):
   ...:     result = [np.sqrt(x) for i in range(10)]
   ...:     return result[-1]
   ...: 
In [8]: %timeit df1.to_pickle('test.p')
1 loops, best of 3: 1.77 s per loop
# reg apply
In [3]: %timeit df1.apply(f1)
1 loops, best of 3: 6.28 s per loop

# using 12 cores (6 real x 2 hyperthread)
In [4]: %timeit df1.apply(f1,engine='joblib')
1 loops, best of 3: 2.06 s per loop

# 1 core pass thru
In [5]: %timeit df1.apply(f1,engine=pd.create_parallel_engine(name='joblib',force=True,max_cpu=1))
1 loops, best of 3: 6.28 s per loop

In [6]: %timeit df1.apply(f1,engine=pd.create_parallel_engine(name='joblib',force=True,max_cpu=4))
1 loops, best of 3: 2.68 s per loop

In [7]: %timeit df1.apply(f1,engine=pd.create_parallel_engine(name='joblib',force=True,max_cpu=2))
1 loops, best of 3: 3.87 s per loop

pickle time outweighs the perf gains; the function is too quick, so no benefit here

In [8]: def f2(x):
   ...:     return np.sqrt(x)
   ...: 

In [6]: %timeit df1.apply(f2)
1 loops, best of 3: 981 ms per loop

In [7]: %timeit df1.apply(f2,engine='joblib')
1 loops, best of 3: 1.8 s per loop

So you need a sufficiently slow function on a single column to make this worthwhile
(it would be pretty easy to do a sample timing of, say, a single column and decide whether to go parallel or not);
right now it is just used on demand (user/option specified). A sketch of that heuristic follows after the timings.

In [9]: %timeit f1(df1.icol(0))
100 loops, best of 3: 5.89 ms per loop

In [10]: %timeit f2(df1.icol(0))
1000 loops, best of 3: 639 µs per loop
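
A sketch of that sample-timing heuristic (apply_maybe_parallel and the 5 ms cutoff are invented for illustration; assumes joblib is installed):

import time
import pandas as pd
from joblib import Parallel, delayed

def apply_maybe_parallel(df, func, per_column_threshold=0.005, n_jobs=4):
    # time the function on one column; if it is cheap, the pickle and
    # process overhead will dominate, so stay serial
    t0 = time.time()
    func(df.iloc[:, 0])
    if time.time() - t0 < per_column_threshold:
        return df.apply(func)
    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(df.iloc[:, i]) for i in range(df.shape[1]))
    return pd.concat(results, axis=1, keys=df.columns)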

@dragoljub

Good write-up, Jeff. I think pickle time is a big factor, but so is the time to spawn a new process.

I would envision this working with a set of compute processes that are pre-launched on startup and wait to do work. For my future parallel work I will probably use IPython parallel across distributed HDF5 data. The problem that I often run into is slowly growing memory consumption for Python processes that live too long.

On-disk parallel access of HDF5 row chunks to speed up computation sounds great.

@jreback (Contributor)

jreback commented Dec 29, 2013

@dragoljub

yep...I don't think adding an IPython parallel backend would be all that difficult (or other distributed types of backends). Just inherit and plug in.

HDF5 and groupby apply look like especially nice cases for enhancement with this.

Please play around and give me feedback on the API (and even take a stab at a backend!)

@jreback (Contributor)

jreback commented Jan 1, 2014

http://docs.cython.org/src/userguide/parallelism.html

a Cython engine is also straightforward (though it needs a slight change in the setup to compile with OpenMP support)

@chrido

chrido commented Jan 29, 2014

I've loaded a data frame with roughly 170M rows into memory (Python used ~35 GB RAM) and timed the same operation with 3 methods, running it overnight. The machine has 32 physical (64 hyperthreaded) cores and enough free RAM. While date conversion is a very cheap operation, it shows the overhead of these methods.

While the single-threaded way is the fastest, it's quite boring to see a single core continuously running at 100% while 63 are idling. Ideally, for parallel operations I want some kind of batching to reduce the overhead, e.g. always 100000 rows, or something like batchsize=100000.

@interactive
def to_date(strdate):
    return datetime.fromtimestamp(int(strdate)/1000)

%time res['milisecondsdtnormal']=res['miliseconds'].map(to_date)
#CPU times: user 14min 52s, sys: 2h 1min 30s, total: 2h 16min 22s
#Wall time: 2h 17min 5s

pool = Pool(processes=64)
%time res['milisecondsdtpool']=pool.map(to_date, res['miliseconds'])
#CPU times: user 21min 37s, sys: 2min 30s, total: 24min 8s
#Wall time: 5h 40min 50s

from IPython.parallel import Client
rc = Client() #local 64 engines
rc[:].execute("from datetime import datetime")
%time res['milisecondsipython'] = rc[:].map_sync(to_date, res['miliseconds'])
#CPU times: user 5h 27min 4s, sys: 1h 23min 50s, total: 6h 50min 54s
#Wall time: 10h 56min 18s

@jreback (Contributor)

jreback commented Jan 29, 2014

it's not at all clear what you are timing here; the way Pool and IPython split this is exceedingly poor; they turn this type of task into a lot of scalar evaluations, where the cost of transport is MUCH higher than the evaluation time.

the PR does exactly this type of batching

you need to have each processor execute a slice and work on it as a single task (one per processor), not distribute over the pool like you are showing. See the sketch below.
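
For illustration, a sketch of that slice-per-worker batching applied to the timestamp conversion above (chunk count and pool size are arbitrary; convert_chunk is an invented helper):

import numpy as np
import pandas as pd
from datetime import datetime
from multiprocessing import Pool

def convert_chunk(values):
    # one task per slice: the whole chunk ships to a worker in a
    # single pickle, instead of one task per scalar value
    return [datetime.fromtimestamp(int(v) / 1000) for v in values]

if __name__ == '__main__':
    ms = pd.Series(np.random.randint(1400000000000, 1500000000000,
                                     size=1000000, dtype=np.int64))
    chunks = np.array_split(ms.values, 64)  # batch size ~ len(ms) / 64
    with Pool(processes=8) as pool:
        parts = pool.map(convert_chunk, chunks)
    res = pd.Series([d for part in parts for d in part], index=ms.index)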

@jreback jreback modified the milestones: 0.15.0, 0.14.0 Feb 15, 2014
@jreback (Contributor)

jreback commented Feb 25, 2014

@michaelaye did you have a look at this branch? https://github.com/jreback/pandas/tree/parallel

@jseabold (Contributor)

jseabold commented Mar 7, 2014

Oh, this is exciting. I've been waiting for a parallel scatter/gather apply function using IPython.parallel. Please keep us up to date on any progress here.

@dragoljub

Indeed! It would be a great feature. I have been using concurrent.futures and that makes things pretty easy; however, the cost of spinning up new processes still takes a bunch of time. If we had IPython parallel kernels just waiting to do work with all the proper imports, passing data pointers to them and aggregating results would be fantastic.

@jreback (Contributor)

jreback commented Mar 7, 2014

@dragoljub you have hit the nail on the head. joblib is fine, but you often don't need to spawn processes that way; usually you have an engine hanging out there.

Do you have some code that I could hijack?

I don't think it would be very hard to add this using IPython.parallel; I just have never used it (as for what I do I often spawn relatively long-lived processes)

@jseabold (Contributor)

jseabold commented Mar 7, 2014

It may be overkill but I have a notebook on using IPython.parallel here. There are some quick examples.

https://github.com/jseabold/zorro

Their docs are also quite good

http://ipython.org/ipython-doc/dev/parallel/

@jreback (Contributor)

jreback commented Mar 7, 2014

thanks skipper....ok, next thing....do you guys have some non-trivial examples for vbenching purposes? e.g. stuff that does actual work (and takes a non-trivial amount of time) that we can use for benchmarking? (needs to be relatively self-contained....though obviously it could use, say, statsmodels :)

@jseabold (Contributor)

jseabold commented Mar 7, 2014

I happen to be running one such problem right now. :) I'm skipping apply in favor of joblib.Parallel map. Let me see if I can make it self contained.

@jseabold (Contributor)

jseabold commented Mar 7, 2014

Hmm, maybe it is too trivial. My actual use case takes much longer (20 obs ≈ 1 s) and the data is quite big: find the first occurrence of a word in some text. You can scale up n, make the "titles" longer, include unicode, etc., and it quickly becomes time-consuming.

n = 100

random_strings = pd.util.testing.makeStringIndex().tolist()
X = pd.DataFrame({'title': random_strings * n,
                  'year': np.random.randint(1400, 1800, size=10*n)})

def min_date(x):  # can't be a lambda for joblib/pickling
    # watch out for the global X
    return X.ix[X.title.str.contains('\\b{}\\b'.format(x))].year.min()

X.title.apply(min_date) 

There are maybe some better examples in ipython/examples/parallel. There are also a couple in my notebook, e.g. parallel optimization, but I'm not sure it's a real use case of the scatter-gather apply I'm thinking of. Something like

def crazy_optimization_func(...):
    ....

df = pd.DataFrame(random_start_values)
df.apply(crazy_optimization_func, ...)

where the DataFrame contains rows of random starting values and you iterate over the zero axis to do a poor man's global optimization (a fleshed-out sketch follows below).
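
A self-contained version of that pattern might look like this (a sketch; the rosenbrock objective is just a stand-in for a genuinely expensive function):

import numpy as np
import pandas as pd
from scipy.optimize import minimize

def rosenbrock(x):
    # toy objective; the real use case would be something slow
    return 100.0 * (x[1] - x[0] ** 2) ** 2 + (1 - x[0]) ** 2

def optimize_from(row):
    # one local optimization per row of starting values
    res = minimize(rosenbrock, row.values, method='Nelder-Mead')
    return pd.Series({'fun': res.fun, 'x0': res.x[0], 'x1': res.x[1]})

starts = pd.DataFrame(np.random.uniform(-2, 2, size=(50, 2)))
results = starts.apply(optimize_from, axis=1)
best = results.loc[results['fun'].idxmin()]  # poor man's global minimum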

@jseabold (Contributor)

jseabold commented Mar 7, 2014

Some API inspiration. See aaply, adply, etc.

http://cran.r-project.org/web/packages/plyr/index.html

@jseabold (Contributor)

jseabold commented Mar 7, 2014

I forgot that Hadley kindly re-licensed all of his code under BSD-compatible terms, so you can take more than API inspiration if for some reason you're curious.

@jreback (Contributor)

jreback commented Mar 7, 2014

actually the API really is very easy:

df.apply(my_cool_function, engine='ipython')

from IPython.parallel import Client
df.apply(my_cool_function,engine=pd.create_parallel_engine(client=Client(profile='mpi')))

e.g. you just pass an engine (probably we will allow you to simply pass a Client directly as an engine)

@jseabold (Contributor)

jseabold commented Mar 7, 2014

Great.

You might also allow just 'ipython' and use a default Client() call. If you start your IPython session/notebook with the correct profile, then it should respect this and look in that directory for the setup code it needs. There was a bug here in some of the IPython 1.x releases, but it should be fixed now.

ipython/ipython#4238

@jreback (Contributor)

jreback commented Mar 7, 2014

passing engine='ipython' will create a default Client;
also settable via an option, parallel.default_engine

and it will only go parallel above a threshold number of rows (could have a function decide that too)

@jseabold (Contributor)

jseabold commented Mar 7, 2014

Sounds awesome. Can't wait for this. Going to be a big feature.

@cossatot

What is the status of this? It seems awesome. Do you just need some functions for benchmarks? I can come up with something if that's helpful / all that's needed.

How much time should a target function take (per row, say; that's what I always apply on)? 0.1 s? 1 s? 10 s? Are there RAM limitations?

@jreback (Contributor)

jreback commented Jun 11, 2014

Well, it works for joblib, and sort of with IPython.parallel; it needs some more work/time. I am also convinced that you need a pretty intensive task for this to be really useful, e.g. the creation/communication time is non-trivial.

I won't have time for this for a while, but my code is out there.

@nehalecky (Contributor)

Agree with the comments above:

I think that if your data already lives in a pandas DataFrame then eager parallel execution probably provides most of the benefits that you should expect without anything unexpected.

That's dead on.

Also, I very much like the idea of an engine = argument option. This would be a huge benefit for most end users, especially those using pandas as a core dependency in their own applications: immediate parallelism across .map and .apply with the inclusion of a dependency (à la dask) and/or a JIT (à la numba) and a simple configuration option.

Excited about this, and just amazing, all the work here. Kudos to you all.

@jreback jreback modified the milestones: Next Major Release, 0.17.0 Aug 20, 2015
@Aspire1Inspire2

Is it possible to integrate pandas with the @parallel decorator in ipyparallel, like the example that they have with numpy?

http://ipyparallel.readthedocs.org/en/latest/multiengine.html#remote-function-decorators

I think, theoretically speaking, even though pandas does not support parallel computing by default, users can still turn to mpi4py for parallel computation. It's just some more coding time if one already knows MPI. A hedged sketch of the decorator route is below.
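
Nothing pandas-specific is required, since the decorator partitions whatever sequence you hand it. A sketch along the lines of the linked example (assumes a running cluster, e.g. started with ipcluster start -n 4; whether the reassembled output lines up with your index is worth verifying):

import ipyparallel as ipp
import pandas as pd

rc = ipp.Client()  # connect to the running engines
dview = rc[:]

@dview.parallel(block=True)
def double(chunk):
    # each engine receives one partition of the input sequence
    return [v * 2 for v in chunk]

s = pd.Series(range(100000))
result = pd.Series(double(s.tolist()), index=s.index)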

@datnamer

@jreback jreback modified the milestones: 0.18.1, Next Major Release Feb 17, 2016
@jreback (Contributor)

jreback commented Feb 17, 2016

let's just doc this to direct to dask

@heroxbd

heroxbd commented Feb 23, 2016

I tried out groupby and apply with pandas 0.17.1 and was surprised to see the function applied in parallel. I am confused; is this feature already added and enabled by default? I am not using dask.

@jreback (Contributor)

jreback commented Feb 23, 2016

@heroxbd well, the GIL IS released during groupby operations. However, there is no parallelism natively/inherent in groupby. Why do you think it's in parallel?

@heroxbd

heroxbd commented Feb 24, 2016

The evidence is that the load on all of my CPUs grows to 100% when I call groupby apply:
sph = tt.groupby('ev').apply(sphericity)

@jreback (Contributor)

jreback commented Feb 24, 2016

so if you do arithmetic ops inside sphericity (e.g. df + df or the like), these defer to numexpr, which utilizes multiple cores by default. Can you provide a sketch of this function?

These are performance things that pandas does w/o the user being specifically aware.

@heroxbd

heroxbd commented Feb 24, 2016

@jreback Ah-ha. Here it is:
def sphericity(event):
    pmtl, ql = event['pmt'].values, event['q'].values
    pl = pdir[pmtl] * ql[:, np.newaxis]
    s = np.sum(pl[:, :, np.newaxis] * pl[:, np.newaxis, :], axis=0)
    qs = np.sum(ql**2)
    eig = np.sort(np.linalg.eigvals(s / qs))
    return pd.Series({'S': (eig[0] + eig[1]) * 1.5, 'A': eig[0] * 1.5})  # sphericity, aplanarity

@heroxbd

heroxbd commented Mar 2, 2016

Sorry for the noise. The parallel execution comes from OpenMP used by OpenBLAS, which in turn is used by NumPy.

@jreback (Contributor)

jreback commented Apr 25, 2016

closing this, as dask is the most appropriate for this. We could certainly add some usage within pandas to actually use dask, but those are separate issues (e.g. on a big-enough groupby (number of groups), deferring to dask is a good thing)

@jreback jreback closed this as completed Apr 25, 2016
@dragoljub

dragoljub commented Apr 25, 2016

Was there a reason, when releasing the GIL in pandas groupby operations, to only allow separate groupby and apply operations to happen concurrently, rather than computing independent group-level aggregations in parallel?

@mrocklin (Contributor)

Once you have GIL-releasing groupby operations, then other developers can use pandas in parallel. It's actually quite tricky to write down the algorithms intelligently enough to handle all groupbys. I think that if someone wants to do this for pandas, that'd be great. It's a pretty serious undertaking, though.

To do this with dask.dataframe:

$ conda install dask
# or
$ pip install dask[dataframe]

import pandas as pd
df = ...  # your pandas code

import dask.dataframe as dd
df = dd.from_pandas(df, npartitions=20)
dd_result = df.groupby(df.A).B.mean()  # or whatever
pd_result = dd_result.compute()  # uses as many threads as you have logical cores

Of course, this doesn't work on all groupbys (as mentioned before, this is incredibly difficult to do generally), but it does work in many common cases.

@dragoljub

@mrocklin thanks for the tip. How long would df = dd.from_pandas(df, npartitions=20) take on a dataframe with, say, 10M-100M rows? Is there a copy involved? Does dask support categorical columns?

@mrocklin (Contributor)

There is a single copy involved (just to other pandas dataframes). We're effectively just doing dfs = [df.iloc[i:i + blocksize] for i in range(0, len(df), blocksize)]. We're also doing a sort on the index ahead of time, which, if your algorithms don't require it, you can turn off with sort=False (though some groupbys, particularly groupby.applys, will definitely appreciate a sorted index).

Generally the from_pandas call is cheap relative to groupby calls. Copying data in memory generally runs fairly quickly.

@jreback (Contributor)

jreback commented Apr 25, 2016

@dragoljub

In particular, YMMV depending on such things as the number of groups you are dealing with and how you partition.

Small number of groups

In [7]: N = 10000000

In [8]: ngroups = 100

In [9]: df = pd.DataFrame({'A' : np.random.randn(N), 'B' : np.random.randint(0,ngroups,size=N)})

In [10]: %timeit df.groupby('B').A.mean()
10 loops, best of 3: 161 ms per loop
In [15]: ddf = dd.from_pandas(df, npartitions=8)

In [16]: %timeit ddf.groupby(ddf.B).mean().compute()
1 loop, best of 3: 223 ms per loop

Larger number of groups

In [17]: ngroups = 10000

In [18]: df = pd.DataFrame({'A' : np.random.randn(N), 'B' : np.random.randint(0,ngroups,size=N)})

In [19]: %timeit df.groupby('B').A.mean()
1 loop, best of 3: 591 ms per loop

In [20]: ddf = dd.from_pandas(df, npartitions=8)

In [21]: %timeit ddf.groupby(ddf.B).mean().compute()
1 loop, best of 3: 323 ms per loop

Can do even better if actually use our index

In [32]: ddf = dd.from_pandas(df.set_index('B'), npartitions=8)

In [33]: %timeit ddf.groupby(ddf.index).mean().compute()
1 loop, best of 3: 215 ms per loop

Note that these are pretty naive timings. This is a single computation that is split into embarrassingly parallel tasks. Generally you would use dask for multiple steps in a computation. If your data doesn't fit in memory, then dask can often help a lot more.

@rbiswas4

In an embarrassingly parallel calculation, I create many dataframes which must be dumped to disk (this is the desired output of the program). I tried doing the computation and the dumping (to HDF5) in parallel using joblib, and I run into HDF write trouble. Note that, at this point, I am not worried so much about the performance of the parallel write.

An example which demonstrates the problem is in https://github.com/rbiswas4/ParallelHDFWriteProblem

The program demo_problem.py includes a function called worker(i) which creates a very simple dataframe based on the input i and appends it to an HDF file. I do this twice: once serially, and once in parallel using joblib.

What I find is that the serial case always works, but the parallel case is not reproducible: sometimes it works without a problem and sometimes it crashes. The two log files
https://github.com/rbiswas4/ParallelHDFWriteProblem/blob/master/demo.log.worked
and
https://github.com/rbiswas4/ParallelHDFWriteProblem/blob/master/demo.log_problem
are cases where it worked and where it did not work, respectively.

Is there a better way to write to HDF files in parallel from pandas that I should use? Or is this a question for other fora, like joblib and PyTables?

@shoyer (Member)

shoyer commented Aug 29, 2016

@rbiswas4 If you want to dump a bunch of data to disk in parallel, the easiest thing to do is to create a separate HDF5 file for each process. Your approach is certainly going to result in corrupted data -- see the pytables FAQ for more details (http://www.pytables.org/FAQ.html#can-pytables-be-used-in-concurrent-access-scenarios). You might also be interested in dask.
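
A minimal sketch of that one-file-per-process pattern (file names and sizes are illustrative):

import numpy as np
import pandas as pd
from multiprocessing import Pool

def worker(i):
    df = pd.DataFrame({'x': np.random.randn(100)})
    # each process owns its own file, so there are no concurrent
    # writes to a single HDF5 file
    df.to_hdf('out_{:04d}.h5'.format(i), key='df', mode='w')

if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(worker, range(16))
    # recombine later in a single process, if needed
    combined = pd.concat(pd.read_hdf('out_{:04d}.h5'.format(i), 'df')
                         for i in range(16))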

@rbiswas4

Writing to separate files is something I have to avoid, because I might end up creating too many files (inode limits). I suppose this means one of the following:

  • I should not be looking at HDF5 as a possible output format, but use a database instead.
  • Use a smaller number of partitions, and hence fewer files created. This is still quite inelegant.
  • Consider some methodology by which I can write out the dataframes through a separate process (I am not sure how to split things).

It seems the first is the best bet. And yes, I intend to see whether I should use dask!

Thanks @shoyer

@heroxbd

heroxbd commented Aug 29, 2016

@rbiswas4, if you can write out raw HDF5 (via h5py) instead of PyTables, please have a look at SWMR (single-writer/multiple-reader), available in HDF5 1.10:

https://www.hdfgroup.org/HDF5/docNewFeatures/NewFeaturesSwmrDocs.html
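
For reference, the writer side of the SWMR pattern in h5py looks roughly like this (a sketch assuming HDF5 1.10+ and a recent h5py; the dataset name and sizes are arbitrary):

import numpy as np
import h5py

# writer: create the file and its datasets first, then enable SWMR
# so readers can open the file while we keep appending
f = h5py.File('swmr.h5', 'w', libver='latest')
dset = f.create_dataset('vals', shape=(0,), maxshape=(None,), dtype='f8')
f.swmr_mode = True

for _ in range(10):
    block = np.random.randn(100)
    n = dset.shape[0]
    dset.resize((n + block.size,))
    dset[n:] = block
    f.flush()  # makes the appended rows visible to SWMR readers
f.close()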
