
Read Parquet files faster #47964

Merged (7 commits, Apr 17, 2023)

Conversation

@al13n321 (Member) commented Mar 24, 2023

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Reading files in Parquet format is now much faster. IO and decoding are parallelized (controlled by max_threads setting), and only required data ranges are read.

This is still not very efficient. Possible future improvements:

  • Implement parallel reading for StorageFile (likely by implementing factory versions of MMapReadBufferFromFileDescriptor and ReadBufferFromFileDescriptorPRead). With this PR, decoding happens in max_threads threads, but they all read from one ReadBuffer, locking a mutex. No good, but still faster than one thread.
  • Do parallel reading separately from parallel parsing, to allow a different number of threads for each. In particular, use lots of download threads when reading from a different region, as Parquet often wants to read lots of short ranges.
  • Do decoding ourselves instead of using arrow, to avoid some copying. Maybe reimplement everything, or maybe reuse arrow's metadata decoding and lower-level data decoding (low enough level that we don't have to copy unnecessarily, if such level exists).
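
For reference, a minimal usage sketch of the new behavior (hedged: the dataset is the hits.parquet file used in the benchmarks below, and the thread count is just an example):

-- Decoding, and for url()/s3() sources also the reading of the required byte ranges,
-- is now parallelized across up to max_threads threads.
SELECT AdvEngineID, COUNT(*) AS c
FROM url('https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/hits.parquet')
GROUP BY AdvEngineID
ORDER BY c DESC
LIMIT 10
SETTINGS max_threads = 16;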

@robot-ch-test-poll4 added the pr-performance label (Pull request with some performance improvements) on Mar 24, 2023
@Avogar self-assigned this on Mar 24, 2023
@al13n321 force-pushed the fast-parquet branch 2 times, most recently from 88777cd to 570ae57 on March 30, 2023 03:43
"bigger than background_schedule_pool_size."
: getCurrentExceptionMessage(/* with_stacktrace */ true));
abort();
}
@al13n321 (Member, author), on the code above:

(This is unrelated to the rest of the PR. Without this, the server gives you a quiet SIGABRT if you set max_thread_pool_size too small. Now it'll log an error.)

@al13n321 marked this pull request as ready for review on March 30, 2023 05:54
@al13n321 (Member, author) commented:

Some speed numbers. Better than before, worse than what is possible.

  • Querying one column from a local file didn't get faster (or slower); it still reads sequentially. (SELECT AdvEngineID, COUNT(*) AS c FROM file('ds/hits.parquet') GROUP BY AdvEngineID ORDER BY c DESC LIMIT 10: 0.75 s)
  • All columns, local file: ~8x faster; decoding is parallel (after reading sequentially). (SELECT sum(ignore(*)) AS c FROM file('ds/hits.parquet'): 102 s -> 12.7 s)
  • One column, S3, local region: 50x faster; both reading and decoding are parallel. (SELECT AdvEngineID, COUNT(*) AS c FROM url('https://clickhouse-datasets-us-west-2.s3.amazonaws.com/hits.parquet') GROUP BY AdvEngineID ORDER BY c DESC LIMIT 10: 49 s -> 1 s)
  • All columns, S3, local region: 4x faster. (SELECT sum(ignore(*)) AS c FROM url('https://clickhouse-datasets-us-west-2.s3.amazonaws.com/hits.parquet'): 193 s -> 46 s)
  • One column, remote region (US <- Europe): 27x faster. (SELECT AdvEngineID, COUNT(*) AS c FROM url('https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/hits.parquet') GROUP BY AdvEngineID ORDER BY c DESC LIMIT 10: 385 s -> 14 s)
  • Increasing max_threads from 16 to 128 makes the previous query another 3x faster (5 s). Increasing to 256 (more than the number of HTTP requests the query does) does nothing; it's still 5x slower than the local region, not sure why.
  • All columns, remote region: 7x faster. (SELECT sum(ignore(*)) AS c FROM url('https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/hits.parquet'): 800 s -> 115 s)
  • All of the above gets about the same speed with the url() and s3() table functions, as it should: S3 is just HTTP with extra steps. (Before this PR this wasn't always the case; the comparisons above use url(). See the sketch after this list.)
  • If the query does anything nontrivial with the fetched data, it gets bottlenecked on #38755 (Data processing is not parallelized if the source returns one stream of data) instead.
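
A sketch of the url()/s3() equivalence noted in the list above (the bare-URL form of s3() for a public bucket is an assumption here):

-- After this PR, these two should run at roughly the same speed:
SELECT AdvEngineID, COUNT(*) AS c FROM url('https://clickhouse-datasets-us-west-2.s3.amazonaws.com/hits.parquet') GROUP BY AdvEngineID ORDER BY c DESC LIMIT 10;
SELECT AdvEngineID, COUNT(*) AS c FROM s3('https://clickhouse-datasets-us-west-2.s3.amazonaws.com/hits.parquet') GROUP BY AdvEngineID ORDER BY c DESC LIMIT 10;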

@Avogar (Member) commented Mar 30, 2023:

I will start reviewing the changes. Looks very promising, great work!
Also, please check the test failures; most of them are related.

}
chassert(false);
A reviewer (Member) asked about the chassert(false) above:
Why?

@al13n321 (Member, author) replied:
Just to make it obvious, when skimming the code, that this branch always returns. But it looks like it's more confusing than helpful; removing.

@Avogar (Member) commented Mar 30, 2023:

Looks good in general. I will go deeper into the details later.
Also, it would be really good to try to refactor/simplify the code in FormatFactory::getInputImpl and in ParquetBlockInputFormat, because it was a bit hard to read and understand all the logic.

@al13n321 (Member, author) replied:

> refactor/simplify code in FormatFactory::getInputImpl

Couldn't actually remove any logic from FormatFactory; each bit seems useful, and almost all of it already existed, just in different places. Moved things around a little to hopefully make it clearer. Let me know if you have better ideas, especially for how to organize the whole thing better; I don't like how awkward the random_access_input_creator thing turned out, but I couldn't think of anything better that would work.

> ParquetBlockInputFormat

Same, couldn't think of a simpler way to do it (especially if we're going to make it decode columns in parallel too). Reorganized a little and added comments, open to suggestions.

Comment on lines +812 to +837
if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
    /// A 206 response means the server honored the Range header: the size parsed from
    /// the response covers only the requested range, so shift it by the range start,
    /// and the file is seekable via range requests.
    *res.file_size += requested_range_begin;
    res.seekable = true;
}
else
{
    /// Otherwise fall back to the Accept-Ranges header to decide seekability.
    res.seekable = response.has("Accept-Ranges") && response.get("Accept-Ranges") == "bytes";
}
@al13n321 (Member, author), on the code above:

This changes behavior slightly: previously StorageURL would send a HEAD request with "Range: 0-" for this check; now it is sent without "Range" (except when this parseFileInfo() is called from nextImpl(), which is not the important call site). I expect this is OK because HTTP servers are supposed to report "Accept-Ranges: bytes" anyway(?), but I don't actually know what HTTP servers are out there and what headers they send. For S3 this works; for althttpd it didn't work even before the change; didn't try anything else.

@al13n321 (Member, author) commented:

Still looking at the test failures.

@al13n321 (Member, author) commented:

OK, fixed: there were two pre-existing bugs in ReadBufferFromS3::{seek,setReadUntilPosition} that I hadn't noticed; they previously weren't being hit because these methods were mostly unused.

@@ -197,15 +199,15 @@ bool ReadBufferFromS3::nextImpl()

off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
    if (offset_ == offset && whence == SEEK_SET)
@al13n321 (Member, author), on the code above:

The first bug.

read_until_position = position;
impl.reset();
@al13n321 (Member, author), on the code above:

The second bug (impl was being destroyed without resetting working_buffer, which points into impl).

@al13n321 (Member, author) commented Apr 4, 2023:

Tests still fail, at least some of them because the order of rows is no longer preserved by default.

Should we make the input_format_parquet_preserve_order setting default to true, in case anyone relies on the ordering? It's tempting to default it to false to get better speed out of the box, especially in benchmarks.

@alexey-milovidov (Member) commented:

Set it to true by default in this pull request. Then we will merge. Then we will change it to false in another pull request. It will allow us to highlight it better in the changelog.
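
Whatever the default ends up being, a user who depends on file order can pin the setting per query; a minimal sketch, assuming the local hits.parquet file from the benchmarks above:

-- Keep rows in the file's original order, at the cost of some of the new parallel speedup.
SELECT *
FROM file('ds/hits.parquet')
LIMIT 5
SETTINGS input_format_parquet_preserve_order = 1;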

@al13n321 (Member, author) commented Apr 4, 2023:

It would be nice to add a virtual column for the row index, then detect ORDER BY _row_idx and switch to order-preserving mode. It wouldn't help with the problem of breaking people's existing queries; it would just be a prettier interface than a setting.
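
A sketch of what that proposed interface might look like (purely hypothetical: the _row_idx virtual column is not implemented in this PR):

-- Hypothetical: an ORDER BY on a row-index virtual column would switch the reader
-- into order-preserving mode without a separate setting.
SELECT AdvEngineID
FROM file('ds/hits.parquet')
ORDER BY _row_idx
LIMIT 10;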

@al13n321 force-pushed the fast-parquet branch 3 times, most recently from 8f55cfd to 8dc28ad on April 7, 2023 00:38
@al13n321 (Member, author) commented:

Couldn't reproduce the CachedOnDiskReadBufferFromFile assertion failure locally: https://s3.amazonaws.com/clickhouse-test-reports/47964/8dc28ad500c79074bb7d638b5ec5f06b3c6ed30e/stress_test__debug_.html . From log and core dump, it appears to be this scenario:

  1. One CachedOnDiskReadBufferFromFile downloads a file segment, then fails to write it to cache (probably because of an exception). The FileSegment ends up in the following state:
  • segment_range = [0, 731]
  • downloaded_size = 0
  • remote_file_reader->getFileOffsetOfBufferEnd() == 732
  • remote_file_reader->working_buffer size = 732
  2. Another CachedOnDiskReadBufferFromFile wants to read the same segment. It takes the remote_file_reader, probably seeks it to 0 (within working_buffer), then fails an assert when getFileOffsetOfBufferEnd() (732) is different from downloaded_size (0).

But this doesn't seem related to this PR; on the other hand, the test doesn't fail on master and has failed on the PR multiple times, so I'm not sure.

@kssenii, plz review the last commit ("Hopefully fix assertion failure in CachedOnDiskReadBufferFromFile").

@kssenii (Member) left a comment:

> Couldn't reproduce the CachedOnDiskReadBufferFromFile assertion failure locally: https://s3.amazonaws.com/clickhouse-test-reports/47964/8dc28ad500c79074bb7d638b5ec5f06b3c6ed30e/stress_test__debug_.html .

I've never seen this assertion fail before; I'd think it is related to the changes.

> another CachedOnDiskReadBufferFromFile wants to read the same segment. It takes the remote_file_reader, probably seeks it to 0 (within working_buffer), then fails an assert when getFileOffsetOfBufferEnd() (732) is different from downloaded_size (0).

I'd think the problem is in another place; see these lines of the log:

2023.04.07 07:44:39.755076 [ 3363 ] {befe85ec-9b42-4d3c-b454-c75455a15a04} <Test> CachedOnDiskReadBufferFromFile(00170_test/yaq/fqcczxfqquhmvacyzmwqwnoglwagg): Read 732 bytes, read type REMOTE_FS_READ_AND_PUT_IN_CACHE, position: 0, offset: 732
2023.04.07 07:44:39.777827 [ 3363 ] {befe85ec-9b42-4d3c-b454-c75455a15a04} <Test> FileSegment(b2c2cee274e671de0e4265cbb005a440) : [0, 731]: Updated state from DOWNLOADING to PARTIALLY DOWNLOADED
2023.04.07 07:44:39.777959 [ 3363 ] {befe85ec-9b42-4d3c-b454-c75455a15a04} <Test> FileSegment(b2c2cee274e671de0e4265cbb005a440) : [0, 731]: Resetting downloader from befe85ec-9b42-4d3c-b454-c75455a15a04:3363
2023.04.07 07:44:39.778193 [ 3363 ] {befe85ec-9b42-4d3c-b454-c75455a15a04} <Test> FileSegment(b2c2cee274e671de0e4265cbb005a440) : [0, 731]: Complete batch. (File segment: [0, 731], key: b2c2cee274e671de0e4265cbb005a440, state: PARTIALLY_DOWNLOADED, downloaded size: 0, reserved size: 732, downloader id: None, current write offset: 0, first non-downloaded offset: 0, caller id: befe85ec-9b42-4d3c-b454-c75455a15a04:3363, detached: 0, kind: Regular, unbound: 0)

732 bytes were read, then we returned from nextImpl without writing anything to the cache, and there is no error in the log; this is not normal. Probably the reason is that we passed working_buffer.begin() instead of position() (so it could silently write nothing; let's add an assertion to FileSegment::write like assert(data != nullptr)?). But this is a bit strange: there was a guarantee that the impl buffer is not seekable (the restricted_seek check guaranteed that) and we always passed the external buffer forward into impl, so it used to be fine to use working_buffer.begin().

(Review comment on src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp, now outdated and resolved.)
@al13n321 (Member, author) commented:

Undid most of the nonsense refactoring in CachedOnDiskReadBufferFromFile. Switched from making it accept a pre-existing reader in any state to making sure the pre-existing reader is always in the correct state.

Labels: pr-performance (Pull request with some performance improvements)
Projects: none yet
Linked issues (may be closed by merging): none yet
6 participants