[Python][Parquet] Parquet deserialization speeds slower on Linux #38389

Open
mrocklin opened this issue Oct 22, 2023 · 85 comments

Comments

@mrocklin

mrocklin commented Oct 22, 2023

Describe the bug, including details regarding any error messages, version, and platform.

I'm debugging slow performance in Dask DataFrame and have tracked things down, I think, to slow parquet deserialization in PyArrow. Based on what I know of Arrow I expect to get GB/s and I'm getting more in the range of 100-200 MB/s. What's more is that this seems to depend strongly on the environment (Linux / OSX) I'm using. I could use help tracking this down.

Experiment

I've isolated the performance difference down to the following simple experiment (notebook here):

# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)


# Write to local parquet file

pq.write_table(t, "foo.parquet")


# Time Disk speeds

start = time.time()
with open("foo.parquet", mode="rb") as f:
    bytes = f.read()
    nbytes = len(bytes)
stop = time.time()
print("Disk Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")


# Time Arrow Parquet Speeds

start = time.time()
_ = pq.read_table("foo.parquet")
stop = time.time()
print("PyArrow Read Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")


# Time In-Memory Read Speeds

start = time.time()
pq.read_table(io.BytesIO(bytes))
stop = time.time()

print("PyArrow In-Memory Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

Results

I've tried this on a variety of cloud machines (intel/arm, VMs/metal, 8-core/64-core, AWS/GCP) and they all get fast disk speeds (probably cached), but only about 150MB/s parquet deserialization speeds. I've tried this on two laptops, one a MBP and one a ThinkPad running Ubuntu and I get ...

  • MacBookPro: 1GiB/s PyArrow deserialization performance (what I expect)
  • Ubuntu/Thinkpad: 150MB/s PyArrow deserialization

In all cases I've installed latest release, PyArrow 13 from conda-forge

Summary

I'm confused by this. I've seen Arrow go way faster than this. I've tried to isolate the problem as much as possible to identify something in my environment that is the cause, but I can't. Everything seems to point to the conclusion that "PyArrow Parquet is just slow on Linux" which doesn't make any sense to me.

I'd welcome any help. Thank you all for your work historically.

Component(s)

Parquet, Python

@mrocklin
Author

Here is a rendered notebook from my Ubuntu Thinkpad to make the numbers more concrete: https://gist.github.com/mrocklin/526120bb5231cc5d9d4e3ca87fc09c68

This run was actually a bit faster than I usually see. On Cloud VMs (even very nice ones) it's hard to get above 150MB/s on real data.

@jorisvandenbossche
Member

jorisvandenbossche commented Oct 23, 2023

For reference, I ran your snippet above and repeated the timing part multiple times on a Linux (Ubuntu 20.04) Dell XPS 13 9380 (more than 4 years old, 8th gen Intel Core i7, 4 cores / 8 threads), and I get almost 2 GB/s for disk speed and around 1 GB/s for reading (just under for from file, just above for in-memory).

(so at least it's not a simple mac vs linux issue)

One environment characteristic that will significantly influence those numbers is the parallelization (the Parquet reading will by default use all the available cores). So it might be worth running those timings with and without threads enabled, to check whether single-threaded performance is also bad and to ensure it's not related to bad scaling on that front.
On my laptop, I get the expected 3-4x speedup with threads enabled (the numbers above), as I get around 250-300 MB/s using use_threads=False.

@fjetter
Contributor

fjetter commented Oct 23, 2023

I ran a couple of pyspy benchmarks on pure pq.read_table downloading from S3. I ran two tests, one with column projection and one with bulk reading. Both show basically the same profile but with different weighting of components.

This profile shows the case where I'm reading a file and am selecting about half its columns (a mix of different dtypes)

[image: py-spy profile of the pq.read_table call]

Note how the read_table request is split into three parts

  1. A HEAD request that infers whether the provided path is a file or a directory
    finfo = filesystem.get_file_info(path_or_paths)
    This is latency bound, which on S3 is typically 100-200ms but can vary quite strongly. This is a request we cannot cache or use to pre-fetch any kind of data. In this example, this alone took 20% of the entire read time.
  2. The initialization of the FileSystemDataset object. In native profiles, this points to arrow::dataset::Fragment::ReadPhysicalSchema, so I assume this is fetching the footer. This is probably unavoidable, but at least this request could be used to pre-fetch some payload data. I'm not sure if this is actually done (I guess not, since the buffer_size kwarg is zero by default). In this specific example, this is about 10% of the read.
  3. The final section is now the actual reading of the file.

So, that's 30% where we're doing nothing/not a lot? I'm not sure at which point the pre_buffering can kick in or how this works. This stuff does not show up in my profile since it's the arrow native threadpool.

At least this initial HEAD request appears to be bad, particularly if we're fetching just a couple of columns from otherwise already small-ish files. The file I was looking at is one of the TPCH lineitem files, which in our dataset version is 22.4 MiB in size.

Edit: This all ran on a Coiled VM and I was basically running the read request in a for loop. No multi threading on my side, just the pyarrow native stuff.

@mapleFU
Member

mapleFU commented Oct 23, 2023

About (1) some optimization will be included later, see:

  1. GH-37857: [Python][Dataset] Expose file size to python dataset #37868 (This patch should be revisited)

It seems we can enable a larger prefetch depth so that Arrow fetches multiple files concurrently.

@fjetter
Contributor

fjetter commented Oct 23, 2023

Sorry, I just realized that my comment is also slightly off topic. The OP discusses pure deserialization without S3 in between.

@fjetter
Contributor

fjetter commented Oct 23, 2023

FWIW I slightly modified the above script to run each operation N times since I noticed quite some variance on my machine (M1 2020 MacBook)

# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)
niterations = 20

# Write to local parquet file

pq.write_table(t, "foo.parquet")


# Time Disk speeds

start = time.perf_counter()
for _ in range(niterations):
    with open("foo.parquet", mode="rb") as f:
        bytes = f.read()
        nbytes = len(bytes)
stop = time.perf_counter()
print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")


# Time Arrow Parquet Speeds

start = time.perf_counter()
for _ in range(niterations):
    _ = pq.read_table("foo.parquet")
stop = time.perf_counter()
print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")


# Time In-Memory Read Speeds

start = time.perf_counter()
for _ in range(niterations):
    pq.read_table(io.BytesIO(bytes))
stop = time.perf_counter()

print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

# Time In-Memory Read + to_pandas Speeds

start = time.perf_counter()
for _ in range(niterations):
    pq.read_table(io.BytesIO(bytes)).to_pandas()
stop = time.perf_counter()

print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

and on my M1 I get (using pyarrow-13.0.0-py310h382c99a_11_cpu from conda-forge):

Disk Bandwidth: 5154 MiB/s
PyArrow Read Bandwidth: 2294 MiB/s
PyArrow In-Memory Bandwidth: 2439 MiB/s
PyArrow (to_pandas) Bandwidth: 1142 MiB/s

while on the cloud using a m6i.xlarge (a coiled based, dask bootstrapped VM, see below) (4vCPUs) it is just (using pyarrow-13.0.0-py310hf9e7431_11_cpu from conda-forge)

Disk Bandwidth: 2173 MiB/s
PyArrow Read Bandwidth: 448 MiB/s
PyArrow In-Memory Bandwidth: 448 MiB/s
PyArrow (to_pandas) Bandwidth: 313 MiB/s

@fjetter
Contributor

fjetter commented Oct 23, 2023

@jorisvandenbossche have you used the same conda forge build for your measurements or did you build it yourself? It would be nice to rule out any build differences

@fjetter
Contributor

fjetter commented Oct 23, 2023

Ok, fun experiment. I wrapped the above script in a function run_benchmark and ran this on my machine...

[image: run_benchmark output, run directly and through a dask Client]

Looks like the simple fact that we're running this in the dask environment is slowing us down quite a bit. This also biases most/all Coiled-based cloud benchmarks

Code
# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io
def run_benchmark():
    from distributed.worker import print
    x = np.random.randint(0, 100000, size=(1000000, 100))
    df = pd.DataFrame(x)
    t = pa.Table.from_pandas(df)
    niterations = 20
    
    # Write to local parquet file
    
    pq.write_table(t, "foo.parquet")
    
    
    # Time Disk speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        with open("foo.parquet", mode="rb") as f:
            bytes = f.read()
            nbytes = len(bytes)
    stop = time.perf_counter()
    print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    # Time Arrow Parquet Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table("foo.parquet")
    stop = time.perf_counter()
    print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    
    # Time In-Memory Read Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes))
    stop = time.perf_counter()
    
    print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    
    # Time In-Memory Read + to_pandas Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes)).to_pandas()
    stop = time.perf_counter()
    
    print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

run_benchmark()

from distributed import Client
client = Client()

client.submit(run_benchmark).result()

@fjetter
Contributor

fjetter commented Oct 23, 2023

Is pyarrow using either one of OMP_NUM_THREADS, MKL_NUM_THREADS, OPENBLAS_NUM_THREADS to infer how large the threadpool is allowed to be?

Edit: Looking at the code base, I see references and documentation that suggests that OMP_NUM_THREADS controls the size of the internal threadpool

Dask is setting those to one and when I remove that, I'm back to the non-dask speed.

fjetter added a commit to dask/distributed that referenced this issue Oct 23, 2023
In #5098 we set a malloc trim threshold by default to more aggressively control memory trimming.
also related #7177

At the same time, we included these default settings but didn't have incredibly solid arguments for it. It's been a long standing best practice when using dask to disable this nested parallelism.

We haven't received a lot of user feedback about this. However, we had some internal reports of users who were struggling with this because this was quite unexpected behavior for them and non-trivial to debug for the ordinary end user.

In apache/arrow#38389 (comment) it is also suggested that this negatively impacts read performance of parquet tables.

We should consider removing this again
@mrocklin
Author

To be clear, my experiment in this issue was run without Dask and without Coiled. I wanted to isolate things.

@mrocklin
Author

Turning threads off I get around 170MB/s on my linux machine, 600 MB/s on my OS-X machine.

as I get around 250-300 MB/s using use_threads=False.

I'm curious, for deserializing integers is this expected performance? I would have thought that for this kind of data (just ints) we'd be closer to GB/s speeds. I'm curious, what is the slow part here? Is there some intense compression or something?

have you used the same conda forge build for your measurements or did you build it yourself? It would be nice to rule out any build differences

I'm also quite curious about this.

@jorisvandenbossche
Member

I was using a conda-forge install for the above numbers, no custom build.

Turning threads off I get around 170MB/s on my linux machine, 600 MB/s on my OS-X machine.

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux?

@jorisvandenbossche
Member

jorisvandenbossche commented Oct 23, 2023

Is pyarrow using either one of OMP_NUM_THREADS, MKL_NUM_THREADS, OPENBLAS_NUM_THREADS to infer how large the threadpool is allowed to be?

Yes, it seems we are using OMP_NUM_THREADS (and otherwise check std::thread::hardware_concurrency(), which I think also doesn't always give the correct number, eg in a container), see the relevant code.
You can also manually override this with pa.set_cpu_count().

@mrocklin
Author

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux

Yes. That's correct. To be clear though, I'm currently more confused about only getting 150-200 MB/s deserializing integers on a single thread. That seems very strange to me.

@fjetter
Contributor

fjetter commented Oct 23, 2023

Yes, it seems we are using OMP_NUM_THREADS (and otherwise check std::thread::hardware_concurrency(), which I think also doesn't always give the correct number, eg in a container), see the relevant code.
You can also manually override this with pa.set_cpu_count().

Yes, thanks. I already found that pyarrow is setting the CPU threadpool to one inside of dask regardless of the env settings. I already tested a little with set_cpu_count but so far we haven't seen the hoped-for speedup

@mrocklin
Author

To find a standardized shared compute environment I tried this on Google Colab.
I found that x.copy() ran in 2 GB/s and pq.read_table(io.BytesIO(bytes)) ran in 180 MB/s.

@fjetter
Contributor

fjetter commented Oct 24, 2023

I found that x.copy() ran in 2 GB/s and pq.read_table(io.BytesIO(bytes)) ran in 180 MB/s.

I'm not sure if this comparison is actually fair and valid. Parquet -> Arrow has to do a nontrivial amount of work. Even your random data is encoded and compressed. (See pq.ParquetFile("foo.parquet").metadata.to_dict() to inspect the metadata)

[image: metadata of foo.parquet]

I also ran this on colab and got something like this

Disk Bandwidth: 1636 MiB/s
PyArrow Read Bandwidth: 231 MiB/s
PyArrow In-Memory Bandwidth: 220 MiB/s

from your benchmark output. I went along and ran

import pickle
pickled_df = pickle.dumps(x)
compressedb = pa.compress(pickled_df, "SNAPPY")
nbytes = len(compressedb)
start = time.time()
pa.decompress(compressedb, decompressed_size=len(pickled_df), codec="SNAPPY")
stop = time.time()
print("SNAPPY Decompress Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

which gives me

SNAPPY Decompress Bandwidth: 199 MiB/s

so we're moving in the same vicinity as the parquet read.

@mrocklin
Author

Cool. What I'm reading from you are a couple of insights:

  1. Arrow uses SNAPPY compression by default
  2. SNAPPY is performing around 200 MB/s on these machines

I'll add my understanding, which is that I also expect SNAPPY to operate at GB/s speeds, but for some reason it's not here.

On Colab I also get 200MB/s as you do, but on my MacBook I get 767 MB/s

This probably allows us to dive a bit deeper into the problem.

@jorisvandenbossche
Member

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux

Yes. That's correct. To be clear though, I'm currently more confused about only getting 150-200 MB/s deserializing integers on a single thread. That seems very strange to me.

Yes, I understand (and dask uses use_threads=False anyway, so mostly depends on this single threaded performance). But then to not mix too many different issues at once, it might be better to focus the various timings in this issue on single threaded performance.

Parquet -> Arrow has to do a nontrivial amount of work

Parquet is indeed a complex file format. In addition to the decompression, there is also the decoding (although the file here will use dictionary encoding, which I would expect to be quite fast. I also quickly tested plain and delta_binary_packed encodings, and those actually give slower reads than the default in this case).

I was also wondering if we could have an idea which bandwidth one can expect for just the decompression, to have some point of comparison. The snappy readme (https://github.com/google/snappy) itself mentions decompression at 500MB/s for Intel Core i7. Running the snippet of Florian above, I actually only get around 100MB/s for the SNAPPY decompression..

Arrow uses SNAPPY compression by default

Quickly testing with another compression (pq.write_table(t, "foo_lz4.parquet", compression="lz4")), I get consistently faster reads with LZ4 compared to SNAPPY for this dataset, but only around 5-10% faster. That is not a huge difference, but in general one can always tweak the encoding and compression settings for a specific dataset to achieve optimal read performance.

Using no compression at all (compression="none") also gives some speed-up (but of course trading storage size with read speed, and on eg S3 that might not even be beneficial)

@jorisvandenbossche
Member

Re-running the benchmarks with a slightly adapted script from above (single threaded, different compressions), and ensuring I run it while having no other applications running, I actually get quite decent single threaded performance:

Code
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

# Create datasets
x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)
pq.write_table(t, "foo.parquet")
pq.write_table(t, "foo-lz4.parquet", compression="lz4")
pq.write_table(t, "foo-uncompressed.parquet", compression="none")

def run_benchmark(fname):


    niterations = 20

    # Time Disk speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        with open(fname, mode="rb") as f:
            bytes = f.read()
            nbytes = len(bytes)
    stop = time.perf_counter()
    print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    # Time Arrow Parquet Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(fname, use_threads=False)
    stop = time.perf_counter()
    print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    
    # Time In-Memory Read Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes), use_threads=False)
    stop = time.perf_counter()
    
    print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
    
    # Time In-Memory Read + to_pandas Speeds
    
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes), use_threads=False).to_pandas(use_threads=False)
    stop = time.perf_counter()
    
    print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

In [3]: run_benchmark("foo.parquet")
Disk Bandwidth: 2052 MiB/s
PyArrow Read Bandwidth: 436 MiB/s
PyArrow In-Memory Bandwidth: 459 MiB/s
PyArrow (to_pandas) Bandwidth: 280 MiB/s

In [4]: run_benchmark("foo-lz4.parquet")
Disk Bandwidth: 2100 MiB/s
PyArrow Read Bandwidth: 516 MiB/s
PyArrow In-Memory Bandwidth: 569 MiB/s
PyArrow (to_pandas) Bandwidth: 323 MiB/s

In [5]: run_benchmark("foo-uncompressed.parquet")
Disk Bandwidth: 2092 MiB/s
PyArrow Read Bandwidth: 667 MiB/s
PyArrow In-Memory Bandwidth: 730 MiB/s
PyArrow (to_pandas) Bandwidth: 409 MiB/s

And the file sizes are 258, 255 and 293 MB, respectively (so the actual speedup for uncompressed is a bit lower than what the above gives, because it's reading more MBs. But it's still faster in terms of seconds to read)

@pitrou
Member

pitrou commented Oct 24, 2023

PyArrow In-Memory Bandwidth: 730 MiB/s

This seems like a rather low number for uncompressed integer data. What is the exact encoding used by the integer column? The metadata display above is a bit ambiguous as it lists three encodings :-)

@pitrou
Member

pitrou commented Oct 24, 2023

Ok, so after investigating this, one factor is that pq.write_table by default tries to use dictionary encoding, and the heuristic to stop using dictionary encoding is when the dictionary page size reaches the dictionary_pagesize_limit parameter, which is set to a very high default of 1MB.

However, the benchmark numbers are also largely skewed by the fact that the MB/s figure is computed relative to the compressed and encoded size, not the final in-memory size.
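
As a rough worked example of that skew, the sketch below rescales one figure reported earlier in the thread (459 MiB/s in-memory read of the roughly 258 MB default file, single threaded). The in-memory size is an assumption: 1,000,000 x 100 int64 values with no nulls, ignoring the index.

```python
# Figures reported earlier in the thread for the default (Snappy) file.
file_bytes = 258 * 10**6          # approximate size of foo.parquet on disk
reported_mib_s = 459              # in-memory read, relative to file size

# Assumption: 1,000,000 rows x 100 int64 columns, no nulls, no index.
memory_bytes = 1_000_000 * 100 * 8

# Recover the time per read from the reported figure, then rescale.
seconds_per_read = file_bytes / 2**20 / reported_mib_s
rescaled_mib_s = memory_bytes / 2**20 / seconds_per_read

print(f"{seconds_per_read:.3f} s per read")
print(f"{rescaled_mib_s:.0f} MiB/s relative to the in-memory size")
```

The same read therefore looks roughly three times faster once the denominator is the decoded data, since the file is about a third of the in-memory size.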

@pitrou
Member

pitrou commented Oct 24, 2023

With that in mind, here are updated results against the actual in-memory size:

  • with encoding RLE_DICTIONARY (the default):
PyArrow Read Bandwidth for 'foo.parquet': 1274 MiB/s
PyArrow In-Memory Bandwidth for 'foo.parquet': 1355 MiB/s
PyArrow Read Bandwidth for 'foo-lz4.parquet': 1533 MiB/s
PyArrow In-Memory Bandwidth for 'foo-lz4.parquet': 1645 MiB/s
PyArrow Read Bandwidth for 'foo-uncompressed.parquet': 1639 MiB/s
PyArrow In-Memory Bandwidth for 'foo-uncompressed.parquet': 1775 MiB/s
  • with encoding PLAIN:
PyArrow Read Bandwidth for 'foo.parquet': 493 MiB/s
PyArrow In-Memory Bandwidth for 'foo.parquet': 515 MiB/s
PyArrow Read Bandwidth for 'foo-lz4.parquet': 1323 MiB/s
PyArrow In-Memory Bandwidth for 'foo-lz4.parquet': 1478 MiB/s
PyArrow Read Bandwidth for 'foo-uncompressed.parquet': 1761 MiB/s
PyArrow In-Memory Bandwidth for 'foo-uncompressed.parquet': 2428 MiB/s
  • with encoding DELTA_BINARY_PACKED:
PyArrow Read Bandwidth for 'foo.parquet': 1573 MiB/s
PyArrow In-Memory Bandwidth for 'foo.parquet': 1644 MiB/s
PyArrow Read Bandwidth for 'foo-lz4.parquet': 1606 MiB/s
PyArrow In-Memory Bandwidth for 'foo-lz4.parquet': 1657 MiB/s
PyArrow Read Bandwidth for 'foo-uncompressed.parquet': 1691 MiB/s
PyArrow In-Memory Bandwidth for 'foo-uncompressed.parquet': 1801 MiB/s

@pitrou
Member

pitrou commented Oct 24, 2023

And now the file sizes:

  • with encoding RLE_DICTIONARY (the default):
-rw-rw-r-- 1 antoine antoine 255224210 oct.  24 16:53 foo-lz4.parquet
-rw-rw-r-- 1 antoine antoine 258035659 oct.  24 16:53 foo.parquet
-rw-rw-r-- 1 antoine antoine 292773361 oct.  24 16:53 foo-uncompressed.parquet
  • with encoding PLAIN:
-rw-rw-r-- 1 antoine antoine 414232019 oct.  24 16:54 foo-lz4.parquet
-rw-rw-r-- 1 antoine antoine 448751689 oct.  24 16:54 foo.parquet
-rw-rw-r-- 1 antoine antoine 800112507 oct.  24 16:54 foo-uncompressed.parquet
  • with encoding DELTA_BINARY_PACKED:
-rw-rw-r-- 1 antoine antoine 228584003 oct.  24 16:55 foo-lz4.parquet
-rw-rw-r-- 1 antoine antoine 227823921 oct.  24 16:55 foo.parquet
-rw-rw-r-- 1 antoine antoine 227811357 oct.  24 16:55 foo-uncompressed.parquet

... meaning that DELTA_BINARY_PACKED is probably a very good choice for integer columns.

@jorisvandenbossche
Member

  1. Try again after [Parquet][Python] Potential regression in Parquet parallel reading #38591

Note that I assume you were doing your initial benchmarks with pyarrow 13.0 (because 14.0 was not yet released when the thread started), and I think this is only a regression in 14.0

  1. Try without snappy compression

Antoine posted some numbers above (eg #38389 (comment)) with default of snappy compression, with lz4, and uncompressed (snappy clearly being the slowest, but the actual difference depends on which encoding is being used)

@pitrou
Member

pitrou commented Nov 23, 2023

@mrocklin It seems like those files are not available using S3 anonymous access, am I right? Can you perhaps make one of those files available?

@phofl
Contributor

phofl commented Nov 23, 2023

We have a public version here: s3://coiled-data/tpch/scale-1000/ That should be available with anonymous access

@pitrou
Member

pitrou commented Nov 23, 2023

Okay, I took a quick look.

One thing that takes some time is dictionary decoding for the dictionary-encoded columns.
You can keep those columns dictionary-encoded in Arrow by passing read_dictionary=["l_returnflag", "l_linestatus", "l_shipinstruct", "l_shipmode"]. This seems to save around 25% CPU time (and also makes the data much more compact in memory).

Once that is done, I've got the following breakdown, roughly:

  • 54% is Snappy decompression
  • 16% is RLE decoding of dictionary indices
  • 13% is PLAIN decoding of BYTE_ARRAY values
  • 10% is PLAIN decoding of FLOAT and DOUBLE values

I did these measurements on one CPU core. The overall read speed is around 220 MB/s from the local filesystem.

@pitrou
Member

pitrou commented Nov 23, 2023

Something weird is that most columns out of this file have a single chunk, even though the file has 21 row groups. This doesn't look right:

>>> [(name, a.num_chunks) for name, a in zip(tab.column_names, tab.columns)]
[('l_orderkey', 1),
 ('l_partkey', 1),
 ('l_suppkey', 1),
 ('l_linenumber', 1),
 ('l_quantity', 1),
 ('l_extendedprice', 1),
 ('l_discount', 1),
 ('l_tax', 1),
 ('l_returnflag', 21),
 ('l_linestatus', 21),
 ('l_shipdate', 1),
 ('l_commitdate', 1),
 ('l_receiptdate', 1),
 ('l_shipinstruct', 21),
 ('l_shipmode', 21),
 ('l_comment', 1)]

>>> pf = pq.ParquetFile('~/arrow/data/lineitem/lineitem_0002072d-7283-43ae-b645-b26640318053.parquet')
>>> pf.metadata
<pyarrow._parquet.FileMetaData object at 0x7f236076dcb0>
  created_by: DuckDB
  num_columns: 16
  num_rows: 2568534
  num_row_groups: 21
  format_version: 1.0
  serialized_size: 29792

@pitrou
Member

pitrou commented Nov 23, 2023

I mean that something is weird in the way the Parquet reader behaves, btw. The files don't seem to be at fault (except perhaps for using Snappy :-)).

@jorisvandenbossche
Member

Something weird is that most columns out of this file have a single chunk, even though the file has 21 row groups. This doesn't look right:

That's because of the use of pq.ParquetFile.read(..) (assuming you were using that here; @mrocklin's gist is using that, but your gist from earlier was using pq.read_table, which should result in many more chunks)

This ParquetFile.read() function binds to parquet::arrow::FileReader::ReadTable, and I have noticed before that for some reason this concatenates the chunks somewhere in the read path.
On the other hand, ParquetFile.iter_batches() binds to FileReader::GetRecordBatchReader and the Dataset API / pq.read_table to FileReader::GetRecordBatchGenerator, and those two APIs will return smaller batches (first per row group, but they also have a batch_size and will typically also return multiple chunks per row group).

# this file has 21 row groups
>>> file_path = "lineitem_0002072d-7283-43ae-b645-b26640318053.parquet"
>>> f = pq.ParquetFile(file_path)

# reading with ParquetFile.read gives a single chunk of data
>>> f.read()["l_orderkey"].num_chunks
1
# even when using the read_row_groups API 
>>> f.read_row_groups([0, 1])["l_orderkey"].num_chunks
1
# only when using iter_batches, it's of course multiple chunks. The default batch_size here is 2**16,
# which even results in more batches than the number of row groups
>>> pa.Table.from_batches(f.iter_batches())["l_orderkey"].num_chunks
40
# we can make the batch_size larger
>>> pa.Table.from_batches(f.iter_batches(batch_size=128000))["l_orderkey"].num_chunks
21
# strangely it still seems to concatenate *across* row groups when further increasing the batch size
>>> pa.Table.from_batches(f.iter_batches(batch_size=2**17))["l_orderkey"].num_chunks
20

# pq.read_table uses the datasets API, but doesn't allow passing a batch size
>>> pq.read_table(file_path)["l_orderkey"].num_chunks
21
# in the datasets API, now the default batch size is 2**17 instead of 2**16 ... 
>>> import pyarrow.dataset as ds
>>> ds.dataset(file_path, format="parquet").to_table()["l_orderkey"].num_chunks
21
# we can lower it (now each individual row group gets split, no combination of data of multiple row groups,
# I think because the GetRecordBatchGenerator uses a sub-generator per row group instead of a single iterator
# for the whole file as GetRecordBatchReader does)
>>> ds.dataset(file_path, format="parquet").to_table(batch_size=2**16)["l_orderkey"].num_chunks
42

So in summary, this is also a bit of a mess on our side (there are many different ways to read a parquet file ..). I had been planning to bring up that you might want to not use ParquetFile().read() in dask, because it's a bit slower because of returning a single chunk. Although if it's for the use case to convert to pandas later on, it might also not matter that much (although when using pyarrow strings, then it can matter).

On the Arrow side, we should maybe consider making the default batch size a bit more uniform, and see if we want to use an actual batch size for the ReadTable code path as well.

@pitrou
Member

pitrou commented Nov 24, 2023

I'm quite sure I was using pq.read_table above.

@milesgranger
Contributor

I ran perf against a debug build locally (Linux) and on an m6i VM and nothing significantly different seemed to appear. However, naively, it seemed unexpected that the destructor of Status was the largest overhead.

Thought it may have been related to https://issues.apache.org/jira/browse/ARROW-2400, and so reverted that w/ no significant change in the perf report. So just wanted to clarify if this is also expected?

[image: perf report]

@pitrou
Member

pitrou commented Nov 24, 2023

@milesgranger You should run perf against a release build (with optimizations) otherwise the results will probably not be relevant. A good practice is to select the RelWithDebInfo build type with CMake.

@mrocklin
Author

You can keep those columns dictionary-encoded in Arrow by passing read_dictionary=["l_returnflag", "l_linestatus", "l_shipinstruct", "l_shipmode"]. This seems to save around 25% CPU time (and also makes the data much more compact in memory).

I'm guessing that converting these to pandas dataframes would result in them being categorical dtype series. Is that correct?

The files don't seem to be at fault (except perhaps for using Snappy :-)).

What would folks recommend as default compression? LZ4?

If so, @milesgranger maybe it's easy to change the data generation scripts in some way with this change? I'd be fine changing things in the benchmark if we think it's a good global recommendation. (For context, I don't like changing things in benchmarks to make performance better because it results in over-tuning and non-realistic results, but if the change is good general practice as recommended by other people then it feels better I think).

@mrocklin
Author

> I did these measurements on one CPU core. The overall read speed is around 220 MB/s from the local filesystem

@pitrou if it's easy for you, I'm curious how well this parallelizes. If you do it on four local cores, do you get ~800 MB/s? (assuming that your filesystem can go that fast, which seems likely)

@jorisvandenbossche
Member

And on what machine did you get that number? (because above (#38389 (comment)) you reported much higher single-threaded numbers)

@pitrou
Member

pitrou commented Nov 24, 2023

@jorisvandenbossche It wasn't the same file.

@pitrou
Member

pitrou commented Dec 13, 2023

> I did these measurements on one CPU core. The overall read speed is around 220 MB/s from the local filesystem

> @pitrou if it's easy for you, I'm curious how well this parallelizes. If you do it on four local cores, do you get ~800 MB/s? (assuming that your filesystem can go that fast, which seems likely)

Sorry for the delay. Basically, yes, at least with the Snappy-compressed file:

>>> from concurrent.futures import ThreadPoolExecutor
>>> %timeit pq.read_table('~/arrow/data/lineitem/lineitem_0002072d-7283-43ae-b645-b26640318053.parquet', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
575 ms ± 5.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>> pool = ThreadPoolExecutor(4)
>>> %timeit list(pool.map(lambda _: pq.read_table('~/arrow/data/lineitem/lineitem_0002072d-7283-43ae-b645-b26640318053.parquet', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode']), range(4)))
596 ms ± 19.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Also, do note that the speed I'm reporting here (225 MB/s on one core) is relative to file size. Relative to in-memory Table size, this is more around 525 MB/s on one core.

@pitrou
Member

pitrou commented Dec 13, 2023

Some low-level stats using perf stat -d -d -d:

 Performance counter stats for 'taskset -c 1 python -c import pyarrow.parquet as pq; [pq.read_table("~/arrow/data/lineitem/lineitem_0002072d-7283-43ae-b645-b26640318053.parquet", use_threads=False, read_dictionary=["l_returnflag", "l_linestatus", "l_shipinstruct", "l_shipmode"]).nbytes for i in range(10)]':

          6 051,90 msec task-clock                #    0,996 CPUs utilized          
               806      context-switches          #  133,181 /sec                   
                 1      cpu-migrations            #    0,165 /sec                   
         1 340 104      page-faults               #  221,435 K/sec                  
    26 695 694 765      cycles                    #    4,411 GHz                      (20,17%)
     2 085 747 402      stalled-cycles-frontend   #    7,81% frontend cycles idle     (20,22%)
     9 165 304 095      stalled-cycles-backend    #   34,33% backend cycles idle      (20,25%)
    59 629 749 052      instructions              #    2,23  insn per cycle         
                                                  #    0,15  stalled cycles per insn  (20,13%)
    10 417 615 407      branches                  #    1,721 G/sec                    (20,07%)
       102 060 991      branch-misses             #    0,98% of all branches          (20,27%)
    27 063 402 822      L1-dcache-loads           #    4,472 G/sec                    (20,25%)
       431 791 165      L1-dcache-load-misses     #    1,60% of all L1-dcache accesses  (20,12%)
   <not supported>      LLC-loads                                                   
   <not supported>      LLC-load-misses                                             
     1 614 170 143      L1-icache-loads           #  266,721 M/sec                    (20,06%)
        30 033 666      L1-icache-load-misses     #    1,86% of all L1-icache accesses  (20,13%)
        24 183 320      dTLB-loads                #    3,996 M/sec                    (20,21%)
         7 451 324      dTLB-load-misses          #   30,81% of all dTLB cache accesses  (20,20%)
         1 845 633      iTLB-loads                #  304,968 K/sec                    (20,20%)
            97 899      iTLB-load-misses          #    5,30% of all iTLB cache accesses  (20,13%)
       200 049 623      L1-dcache-prefetches      #   33,056 M/sec                    (20,11%)
   <not supported>      L1-dcache-prefetch-misses                                   

       6,074363585 seconds time elapsed

       4,552905000 seconds user
       1,494984000 seconds sys

At more than 2 instructions per cycle, it seems that we are not suffering much from cache or branch prediction misses.
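
For reference, the derived ratios in the perf output can be recomputed from the raw counters. This is plain arithmetic on the numbers shown above:

```python
# Raw counters copied from the perf stat output above.
cycles = 26_695_694_765
instructions = 59_629_749_052
backend_stalls = 9_165_304_095
branches = 10_417_615_407
branch_misses = 102_060_991
l1d_loads = 27_063_402_822
l1d_misses = 431_791_165
dtlb_loads = 24_183_320
dtlb_misses = 7_451_324

ipc = instructions / cycles
backend_idle_pct = 100 * backend_stalls / cycles
branch_miss_pct = 100 * branch_misses / branches
l1d_miss_pct = 100 * l1d_misses / l1d_loads
dtlb_miss_pct = 100 * dtlb_misses / dtlb_loads

print(f"instructions per cycle: {ipc:.2f}")
print(f"backend cycles idle:    {backend_idle_pct:.2f}%")
print(f"branch miss rate:       {branch_miss_pct:.2f}%")
print(f"L1-dcache miss rate:    {l1d_miss_pct:.2f}%")
print(f"dTLB miss rate:         {dtlb_miss_pct:.2f}%")
```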

@mrocklin
Author

mrocklin commented Dec 13, 2023 via email

@pitrou
Member

pitrou commented Dec 13, 2023

I have also tried to regenerate the given file using different compressions and then compared reading performance:

>>> !ls -la lineitem-*
-rw-rw-r-- 1 antoine antoine 133922479 déc.  13 22:40 lineitem-lz4.pq
-rw-rw-r-- 1 antoine antoine 129419248 déc.  13 22:38 lineitem-snappy.pq
-rw-rw-r-- 1 antoine antoine 266474815 déc.  13 22:40 lineitem-uncompressed.pq
-rw-rw-r-- 1 antoine antoine  93395071 déc.  13 22:40 lineitem-zstd.pq

>>> %timeit pq.read_table('lineitem-snappy.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
512 ms ± 12.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

>>> %timeit pq.read_table('lineitem-uncompressed.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
252 ms ± 11 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

>>> %timeit pq.read_table('lineitem-lz4.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
311 ms ± 13.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

>>> %timeit pq.read_table('lineitem-zstd.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
456 ms ± 13.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So we get:

  • uncompressed: 1 GB/s relative to file size, 1.2 GB/s relative to in-memory Table size
  • snappy: 252 MB/s relative to file size, 600 MB/s relative to in-memory Table size
  • lz4: 430 MB/s relative to file size, 990 MB/s relative to in-memory Table size
  • zstd: 205 MB/s relative to file size, 675 MB/s relative to in-memory Table size

Note that only the speeds relative to in-memory Table size are comparable, since the file sizes vary. Snappy is the slowest of all options, while producing a file size not much better than lz4 and strictly worse than zstd.

@pitrou
Member

pitrou commented Dec 13, 2023

That said, ~1 GB/s for uncompressed PLAIN-encoded fixed-width data is still very mediocre. I think this has to do with the fact that pq.read_table concatenates the row groups together instead of building one chunk per row group:

>>> tab = pq.read_table('lineitem-uncompressed.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'], columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate'])
>>> [{n: c.num_chunks} for n, c in zip(tab.column_names, tab.columns)]
[{'l_orderkey': 1},
 {'l_partkey': 1},
 {'l_suppkey': 1},
 {'l_linenumber': 1},
 {'l_shipdate': 1},
 {'l_commitdate': 1}]

If I deliberately read row groups separately for these PLAIN-encoded columns, I get almost twice the speed:

>>> %timeit pq.read_table('lineitem-uncompressed.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'], columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate'])
67.1 ms ± 1.39 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

>>> f = pq.ParquetFile('lineitem-uncompressed.pq', read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
>>> %timeit [f.read_row_group(i, use_threads=False, columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate']) for i in range(f.num_row_groups)]
36.6 ms ± 841 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

jorisvandenbossche added a commit that referenced this issue Feb 27, 2024
…nit (#40143)

### Rationale for this change

Closes #40142

I'm developing a new dask integration with pyarrow parquet reader (see dask/dask-expr#882) and want to rely on the pyarrow Filesystem more.

Right now, we are performing a list operation ourselves to get all touched files and I would like to pass the retrieved `FileInfo` objects directly to the dataset constructor. This API is already exposed in C++ and this PR is adding the necessary Python bindings.

The benefit of this API is that it removes the need to perform additional HEAD requests to remote storage.

This came up in #38389 (comment) and there's been related work already with #37857

### What changes are included in this PR?

Python bindings for the `DatasetFactory` constructor that accepts a list/vector of `FileInfo` objects.

### Are these changes tested?

~I slightly modified the MinIO test setup such that the Prometheus endpoint is exposed. This can be used to assert that there haven't been any HEAD requests.~ I ended up removing this again since parsing the response is a bit brittle.

### Are there any user-facing changes?

* Closes: #40142

Lead-authored-by: fjetter <fjetter@users.noreply.github.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
zanmato1984 pushed a commit to zanmato1984/arrow that referenced this issue Feb 28, 2024
…aset init (apache#40143)

thisisnic pushed a commit to thisisnic/arrow that referenced this issue Mar 8, 2024
…aset init (apache#40143)
