Selective BufferedInput without cache #7217
Conversation
@oerling has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@oerling How hard is it to make
Force-pushed from a173875 to 0563909.
@oerling I did the first pass on the production code and left minors. Thanks!
// Synchronously sets 'data_' to cover 'loadedRegion_'.
void loadSync();

SelectiveBufferedInput* const bufferedInput_;
Please put const members together first.
void loadSync();

SelectiveBufferedInput* const bufferedInput_;
IoStatistics* ioStats_;
s/IoStatistics* ioStats_;/IoStatistics* const ioStats_;/
// Testing function to access loaded state.
void testingData(
    velox::common::Region& loadedRegion,
    memory::Allocation*& data,
Would it be better to use memory::Allocation** and std::string**?
const int32_t loadQuantum_;

// Allocation with loaded data. Has space for region.length or loadQuantum_
// bytes, whichevr is less.
s/whichevr/whichever/
// Offset of current run from start of 'data_'
uint64_t offsetOfRun_;

// Pointer to start of current run in 'entry->data()' or
nit: leave one space between characters? Thanks!
}

void SelectiveBufferedInput::makeLoads(
    std::vector<LoadRequest*> requests,
const std::vector<LoadRequest*>& requests,
void SelectiveBufferedInput::makeLoads(
    std::vector<LoadRequest*> requests,
    bool prefetch) {
  if (requests.empty() || (requests.size() < 2 && !prefetch)) {
Can you comment why skip loads if "(requests.size() < 2 && !prefetch)" is true? Thanks!
  if (requests.empty() || (requests.size() < 2 && !prefetch)) {
    return;
  }
  int32_t maxDistance = options_.maxCoalesceDistance();
const int32_t maxDistance =
    return;
  }
  int32_t maxDistance = options_.maxCoalesceDistance();
  auto loadQuantum = options_.loadQuantum();
const auto loadQuantum =
  // If reading densely accessed, coalesce into large for best throughput, if
  // for sparse, coalesce to quantum to reduce overread. Not all sparse access
  // is correlated.
  auto maxCoalesceBytes = prefetch ? options_.maxCoalesceBytes() : loadQuantum;
ditto: const
      requests,
      maxDistance,
      // Break batches up. Better load more short ones i parallel.
      1000, // limit coalesce by size, not count.
// Ranges limit per IO.
@oerling overall looks good to me % comments. Thanks!
void makeLoads(std::vector<LoadRequest*> requests, bool prefetch);

// Makes a CoalescedLoad for 'requests' to be read together, coalescing
// IO is appropriate. If 'prefetch' is set, schedules the CoalescedLoad
s/is appropriate/if appropriate/
coalescedLoads_;

// Distinct coalesced loads in 'coalescedLoads_'.
std::vector<std::shared_ptr<cache::CoalescedLoad>> allCoalescedLoads_;
Shall we all use SelectiveCoalescedLoad instead of CoalescedLoad? Thanks!
      [&](LoadRequest* request, std::vector<LoadRequest*>& ranges) {
        ranges.push_back(request);
      },
      [&](int32_t /*gap*/, std::vector<LoadRequest*> /*ranges*/) { /*no op*/ },
Don't we need to handle the skipped IO range?
  });
  // Combine adjacent short reads.

  int32_t numNewLoads = 0;
nit: s/numNewLoads/numLoads/
} // namespace

void CoalescedInputStream::loadSync() {
  if (region_.length < SelectiveBufferedInput::kTinySize &&
Shall we check loadedRegion_.length here?
@oerling LGTM % minors. Thanks!
void DirectInputStream::BackUp(int32_t count) {
  VELOX_CHECK_GE(count, 0, "can't backup negative distances");

  uint64_t unsignedCount = static_cast<uint64_t>(count);
const uint64_t unsignedCount =
  if (data_.numPages() == 0) {
    run_ = reinterpret_cast<uint8_t*>(tinyData_.data());
    runSize_ = tinyData_.size();
    offsetInRun_ = offsetInData;
VELOX_CHECK_LT(offsetInRun_, runSize_);
    loadSync();
  }

  auto offsetInData = offsetInRegion_ - (loadedRegion_.offset - region_.offset);
const auto offsetInData
      options_(readerOptions) {}

/// Constructor used by clone().
DirectBufferedInput(
Can we move this to private?
/// call for 'stream' since the load is to be triggered by the first
/// access.
std::shared_ptr<DirectCoalescedLoad> coalescedLoad(
    const SeekableInputStream* FOLLY_NONNULL stream);
ditto
 private:
  std::shared_ptr<IoStatistics> const ioStats_;
  const uint64_t groupId_;
  std::shared_ptr<ReadFileInputStream> const input_;
const std::shared_ptr<ReadFileInputStream> input_;
// Makes a CoalescedLoad for 'requests' to be read together, coalescing
// IO if appropriate. If 'prefetch' is set, schedules the CoalescedLoad
// on 'executor_'. Links the CoalescedLoad to all CacheInputStreams that it
// concerns.
s/concerns/covers/
  uint64_t offsetInRuns = 0;
  for (int i = 0; i < allocation.numRuns(); ++i) {
    auto run = allocation.runAt(i);
    uint64_t bytes = memory::AllocationTraits::pageBytes(run.numPages());
nit: const bytes and readSize
  // Case where request is a little over quantum but is folowed by another
  // within the max distance. Coalesces and allows reading the region of
  // max quantum + max distance in one piece.
  request.loadSize = request.region.length;
s/request.region.length/region.length/
  DirectBufferedInput* const bufferedInput_;
  IoStatistics* const ioStats_;
  std::shared_ptr<ReadFileInputStream> const input_;
const std::shared_ptr<ReadFileInputStream> input_;
  void readRegion(std::vector<LoadRequest*> requests, bool prefetch);

  const uint64_t fileNum_;
  std::shared_ptr<cache::ScanTracker> const tracker_;
const std::shared_ptr<cache::ScanTracker> tracker_;
const std::shared_ptr<IoStatistics> ioStats_;
Summary: DirectBufferedInput - Selective BufferedInput without cache

Adds a BufferedInput that tracks access frequency and coalesces by frequency class, similar to CachedBufferedInput. This does not cache the data but instead owns the data in the BufferedInput, like the base BufferedInput.

Adjusts coalescing so that infrequently accessed data has a smaller max coalesce size, because not all infrequent loading is correlated. Raises the stream count cutoff for a coalesced load from 40 to 1000 streams, because many streams are very small in wide tables (e.g. mostly null columns) and there is no point splitting these up.

Reviewed By: xiaoxmeng
Differential Revision: D50603890
Pulled By: oerling
This pull request was exported from Phabricator. Differential Revision: D50603890
@oerling Thank you for this patch! Does
Summary: #7217 introduced DirectBufferedInput (DBI) to replace BufferedInput (BI). Compared to BI, DBI has several advantages, such as fetching data asynchronously on an executor and breaking chunks into pieces. These are controlled by four main parameters: loadQuantum, prefetchRowGroups, maxCoalesceBytes, and maxCoalesceDistanceBytes. But the default values of these parameters are not optimal for usage like Gluten, which fetches data row group by row group and does not reuse fetched data. As a result, the defaults lead to a large performance regression compared to BI. This PR adds configs such as loadQuantum and prefetchRowGroups to HiveConfig so they can be set when the HiveConnector is constructed. In addition, directorySizeGuess and filePreloadThreshold also affect performance.
- loadQuantum controls the size of each coalesced load. In Gluten, we set the default value to 256MB to make sure the whole row group is fetched, because the default row group size is 128MB. If loadQuantum is smaller than the row group size, the row group is broken into column chunks to be fetched. If it is even less than the chunk size, then only the configured size is loaded on IO threads and all other data is loaded synchronously by the task worker thread, which is why the serious performance regression happens.
- CoalesceDistance controls the size of the over-read. It is common not to select all the columns of a table, so some column chunks should not be fetched, which leads to non-contiguous disk reads. If the skipped chunk is smaller than the coalesce distance, we can over-read the data to keep the disk read contiguous. Currently we set it to 1MB in Gluten.
- CoalesceBytes is the unit used to fetch data on IO threads; with many IO threads on the executor, fetching data in small pieces instead of one whole block may perform better. CoalesceBytes controls that size; currently we configure it as 64MB in Gluten.
- As the name indicates, prefetchRowGroups is how many row groups should be prefetched while the engine is processing the current one. Setting it to more than 1 overlaps the next row group's fetch time with the current row group's processing time, giving the best performance.
- directorySizeGuess is renamed to footerEstimatedSize to better align with its usage. It is used to estimate the footer data to be fetched; the footer includes the metadata of each row group.
- filePreloadThreshold configures a file-size threshold: if a file is smaller than the threshold, the whole file is fetched directly.

The PR solves issue #8041. There is more discussion in PR #7873. Pull Request resolved: #7978 Reviewed By: xiaoxmeng Differential Revision: D52569526 Pulled By: mbasmanova fbshipit-source-id: bba99e61949bb8a7c8db15d714cc15b19f15633b
Velox PR 7217 (facebookincubator/velox#7217) added DirectBufferedInput, which leads to a serious performance regression. The root cause is that the default config in that PR is not optimal for remote storage. The PR added 6 configs: loadQuantum: 256MB (make sure it is larger than the row group size; the Parquet default is 128MB); maxCoalesceDistance: 1MB (in case the columns are not loaded contiguously, like select a, c from table_with_column_a_b_c: if b is a smaller column than 1MB, we can load it to make one large block); CoalesceBytes: 64MB (break the row group fetches into small chunks)