Break apart large files in dask.bag.from_filenames #427

Merged
mrocklin merged 6 commits into dask:master from mrocklin:bag-large-files on Jul 15, 2015


@mrocklin
Member

This allows users to break apart single large text files in dask.bag. It reuses the textblock functionality that we've been using in dask.dataframe.io.read_csv

In [1]: import dask.bag as db

In [2]: b = db.from_filenames('iris.json', chunkbytes=5000)  # 5kB chunks

In [3]: b.dask
Out[3]: 
{('from-filename-1', 0): (list,
  (<class StringIO.StringIO at 0x7fdf2828f940>,
   (<function dask.utils.textblock>,
    '/home/mrocklin/data/iris.json',
    0,
    5000,
    None))),
 ('from-filename-1', 1): (list,
  (<class StringIO.StringIO at 0x7fdf2828f940>,
   (<function dask.utils.textblock>,
    '/home/mrocklin/data/iris.json',
    5000,
    10000,
    None))),
 ('from-filename-1', 2): (list,
  (<class StringIO.StringIO at 0x7fdf2828f940>,
   (<function dask.utils.textblock>,
    '/home/mrocklin/data/iris.json',
    10000,
    15000,
    None))),
 ('from-filename-1', 3): (list,
  (<class StringIO.StringIO at 0x7fdf2828f940>,
   (<function dask.utils.textblock>,
    '/home/mrocklin/data/iris.json',
    15000,
    20000,
    None)))}

In [5]: b.count().compute()
Out[5]: 150
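
For anyone curious what those textblock tasks are doing: conceptually each one reads a byte range from the file and extends that range to line boundaries, so no record is split across chunks. Below is a simplified, hypothetical sketch of the idea (not the actual dask.utils.textblock implementation):

def read_text_block(filename, start, stop):
    """Return the complete lines that start within the byte range [start, stop)."""
    with open(filename, 'rb') as f:
        if start > 0:
            # Back up one byte and discard through the next newline so that a
            # line straddling the boundary is handled by the previous block.
            f.seek(start - 1)
            f.readline()
        lines = []
        while f.tell() < stop:
            line = f.readline()   # always read whole lines, never partial records
            if not line:
                break
            lines.append(line)
    return b''.join(lines)

# e.g. carve iris.json into ~5000-byte blocks, mirroring chunkbytes=5000 above
# import os
# size = os.path.getsize('iris.json')
# blocks = [read_text_block('iris.json', i, i + 5000) for i in range(0, size, 5000)]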

cc @danielfrg

@danielfrg
Member

This is working for me. Testing it on a 33 GB JSON file right now.

Any ideas on how to get an "ideal" chunkbytes for a given machine and some data?

@mrocklin
Member

As long as you're under available_memory / ncores, I suspect it's more specific to the dataset and the computation than to the machine. I also suspect that it doesn't matter much as long as you choose some moderate value, like 10 megs.

It's tricky to figure out ahead of time. For example, if you leave your data as just bytes then 10MB seems like a small amount that fits very comfortably in memory. However, if you're parsing this into Python objects then it might blow up to 30x that size.
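
If you want a quick upper bound along those lines, here is a rough sketch (not anything dask provides; it assumes psutil is installed and simply guesses a 30x expansion factor for parsed Python objects):

import multiprocessing

import psutil  # third-party; assumed installed, used only to query free memory

expansion = 30                                 # guessed blow-up from bytes on disk to Python objects
ncores = multiprocessing.cpu_count()
available = psutil.virtual_memory().available  # bytes of currently free RAM

# keep each core's parsed chunk well inside its share of memory
max_chunkbytes = available // (ncores * expansion)
print("chunkbytes upper bound: %.0f MB" % (max_chunkbytes / 1e6))

In practice a fixed moderate value like the 10 MB suggested above will usually fall under that bound anyway.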

What was your experience? Did you blow up memory?

@danielfrg
Member

No, it hasn't blown up.

I did a count on this 33 GB JSON file. I used 100MB as the chunk size because with less (10MB) it was taking a long time to set up the data variable. Like you said, it depends.

Code:

import json
import dask.bag as db

data = db.from_filenames('RC_2015-05', chunkbytes=100000).map(json.loads)
data.npartitions  # 334557
n = data.count().compute()   # 54504410
# time: 1 loops, best of 1: 11min 40s per loop

Time using wc -l:

time wc -l RC_2015-05
54504410 RC_2015-05
wc -l RC_2015-05  28.70s user 10.19s system 97% cpu 39.810 total

Of course it's not a fair comparison because I did json.loads on the dask side.

chunkbytes is working fine for me, and now I can see dask using multiple processes while reading one big file on my laptop.

@mrocklin
Member

Yeah, json parsing is much slower than disk I/O. I recommend the use of the ujson library, which is a bit faster, but still nothing like I/O speeds.

Parallelizing a single file will also incur some overhead. We're not being optimal here. We read in bytes, then decode, then pass them through a StringIO object. This could probably be improved with some care. The normal case of not using chunkbytes doesn't incur this cost.
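
For example, swapping in ujson is a one-line change (a sketch, assuming ujson is installed and reusing the file name and chunkbytes from the snippet above):

import dask.bag as db
import ujson  # third-party drop-in for the stdlib json module, with a faster loads

# same pipeline as before, but parse each line with ujson instead of json
data = db.from_filenames('RC_2015-05', chunkbytes=100000).map(ujson.loads)
n = data.count().compute()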

@mrocklin
Member

OK, I'm going to merge this soon if there are no comments.

@danielfrg I'd love to see what you do with this data if you continue playing with it.

@mrocklin mrocklin merged commit 2743d35 into dask:master Jul 15, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@mrocklin mrocklin deleted the mrocklin:bag-large-files branch Jul 15, 2015