[VL] Support GPU async native shuffle read#12370
Conversation
|
Run Gluten Clickhouse CI on x86 |
e5cc3aa to
0713a26
Compare
|
Run Gluten Clickhouse CI on x86 |
0713a26 to
bebb887
Compare
|
Run Gluten Clickhouse CI on x86 |
bebb887 to
5ecc27c
Compare
|
Run Gluten Clickhouse CI on x86 |
2 similar comments
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
f840d7a to
c1d2691
Compare
|
Run Gluten Clickhouse CI on x86 |
2 similar comments
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
b716f71 to
7c4a796
Compare
|
Run Gluten Clickhouse CI on x86 |
9 similar comments
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
ad69c7d to
096215f
Compare
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
1c29a29 to
33e5b75
Compare
|
Run Gluten Clickhouse CI on x86 |
|
|
||
| private def assertPositiveBlockSize(blockId: BlockId, blockSize: Long): Unit = { | ||
| if (blockSize < 0) { | ||
| throw BlockException(blockId, "Negative block size " + size) |
| LOG(INFO) << "Trying to get from cached buffer queue. Queue length: " << queue_.size() | ||
| << ", total size in queue: " << totalSize_ << ", current batch size: " << batch->numBytes() << std::endl; |
|
|
||
| notFull_.wait(lock, [&]() { return noMoreBatches_ || totalSize_ + batchSize <= capacity_; }); | ||
| if (noMoreBatches_) { | ||
| LOG(WARNING) << "Discard batch due to calling put() after noMorBatches()."; |
| // Stop reading more streams. Blocked by the native reader threads. | ||
| jniWrapper.stop(shuffleReaderHandle) | ||
| onComplete.foreach(_()) | ||
| // Would remove the resource object from registry to lower GC pressure. | ||
| TaskResources.releaseResource(resourceId) |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
|
||
| private def assertPositiveBlockSize(blockId: BlockId, blockSize: Long): Unit = { | ||
| if (blockSize < 0) { | ||
| throw BlockException(blockId, "Negative block size " + size) |
| ReaderThreadPool* VeloxBackend::getReaderThreadPool() { | ||
| static std::once_flag readerThreadPoolInit; | ||
| std::call_once(readerThreadPoolInit, [this] { | ||
| const auto configuredThreads = | ||
| backendConf_->get<int32_t>(kShuffleReaderThreads, static_cast<int32_t>(std::thread::hardware_concurrency())); | ||
| // std::thread::hardware_concurrency() can return 0; | ||
| const auto numThreads = configuredThreads > 0 ? configuredThreads : 1; | ||
| readerThreadPool_ = std::make_unique<ReaderThreadPool>(numThreads); | ||
| }); | ||
| return readerThreadPool_.get(); | ||
| } |
| for (auto& task : tasks) { | ||
| tasks_.push({std::move(task), priority}); | ||
| } | ||
| } |
| auto& prioritizedTask = tasks_.top(); | ||
| LOG(INFO) << "Worker thread " << std::this_thread::get_id() << " is executing a task with priority " | ||
| << prioritizedTask.priority; | ||
| task = std::move(prioritizedTask.task); | ||
| tasks_.pop(); | ||
| } | ||
|
|
||
| if (task) { | ||
| task(); | ||
| } |
| void VeloxGpuHashShuffleReaderDeserializer::read() { | ||
| std::shared_ptr<arrow::io::InputStream> inputStream = nullptr; | ||
|
|
| // Close input stream if it's still open. | ||
| if (inputStream != nullptr) { | ||
| GLUTEN_THROW_NOT_OK(inputStream->Close()); | ||
| } | ||
|
|
||
| // Decrement active reader count. | ||
| if (activeReaders_.fetch_sub(1, std::memory_order_acq_rel) == 1) { | ||
| batchQueue_->noMoreBatches(); | ||
| completionCV_.notify_all(); | ||
| } |
The parallelism of gpu stages is limited by the GPU concurrency, but the shuffle read process, which includes data fetching, decompression and deserialisation, are still running on CPU. In this case we can parallelise these process to produce the output data asynchronously.
This PR adopts a producer–consumer design. Producer threads asynchronously perform shuffle reads, including data fetching, decompression, and deserialization, and produce decoded data. The consumer (the main thread) retrieves the prepared data as it becomes available and creates the corresponding device buffers.