Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           develop     #772      +/-   ##
===========================================
+ Coverage    62.46%   62.63%   +0.17%
===========================================
  Files          705      706       +1
  Lines        42677    42672       -5
  Branches      6308     6287      -21
===========================================
+ Hits         26658    26728      +70
+ Misses       15040    14960      -80
- Partials       979      984       +5
Pull request overview
This PR introduces optional thread pool–based parallelization to the TsFile C++ table write path, alongside new configuration knobs and updated tests to validate both serial and parallel modes.
Changes:
- Adds an ENABLE_THREADS build option plus runtime config flags (parallel_write_enabled_, write_thread_count_) to control column-parallel writes.
- Refactors TsFileWriter::write_table() to write time/value columns in parallel using precomputed page boundaries.
- Converts TsFileWriterTableTest to a parameterized gtest suite that runs in both serial and parallel configurations.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| cpp/test/writer/table_view/tsfile_writer_table_test.cc | Parameterizes tests to run with parallel write enabled/disabled. |
| cpp/src/writer/tsfile_writer.h | Adds a per-writer thread pool member under ENABLE_THREADS. |
| cpp/src/writer/tsfile_writer.cc | Implements segmented column writes and parallel dispatch in write_table(). |
| cpp/src/common/thread_pool.h | Adds a fixed-size thread pool implementation (new file). |
| cpp/src/common/tablet.h | Exposes small accessors (get_timestamp, is_null) and makes class section explicitly public. |
| cpp/src/common/global.h | Adds setters/getters for parallel write config and thread count. |
| cpp/src/common/global.cc | Initializes new config fields with defaults. |
| cpp/src/common/config/config.h | Adds new config fields to ConfigValue. |
| cpp/CMakeLists.txt | Adds ENABLE_THREADS option and compile definition. |
Comments suppressed due to low confidence (1)
cpp/test/writer/table_view/tsfile_writer_table_test.cc:53
SetUp() mutates the process-global g_config_value_.parallel_write_enabled_ but does not restore the previous value in TearDown(). Since this is a global config, it can leak into other tests in the same binary and create order-dependent behavior. Save the old value in SetUp() and restore it in TearDown() (and consider using set_parallel_write_enabled() rather than writing the struct field directly).
void SetUp() override {
    libtsfile_init();
    g_config_value_.parallel_write_enabled_ = GetParam();
    file_name_ = std::string("tsfile_writer_table_test_") +
                 generate_random_string(10) + std::string(".tsfile");
    remove(file_name_.c_str());
    int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
    flags |= O_BINARY;
#endif
    mode_t mode = 0666;
    write_file_.create(file_name_, flags, mode);
}
void TearDown() override {
    remove(file_name_.c_str());
    libtsfile_destroy();
}
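The save-and-restore idea in this comment can be sketched with a small RAII guard. This is only an illustration: `FakeConfig`, `g_fake_config`, `ScopedParallelWrite`, and `flag_inside_scope` are hypothetical names standing in for the real `g_config_value_` and the test fixture, which are not reproduced here.

```cpp
#include <cassert>

// Hypothetical stand-in for the process-global config struct.
struct FakeConfig {
    bool parallel_write_enabled_ = false;
};
static FakeConfig g_fake_config;

// Saves the current flag on construction and restores it on destruction,
// so one test cannot leak its setting into later tests.
class ScopedParallelWrite {
   public:
    explicit ScopedParallelWrite(bool enabled)
        : saved_(g_fake_config.parallel_write_enabled_) {
        g_fake_config.parallel_write_enabled_ = enabled;
    }
    ~ScopedParallelWrite() { g_fake_config.parallel_write_enabled_ = saved_; }
    ScopedParallelWrite(const ScopedParallelWrite&) = delete;
    ScopedParallelWrite& operator=(const ScopedParallelWrite&) = delete;

   private:
    bool saved_;
};

// Returns the flag value observed while the guard is alive.
inline bool flag_inside_scope(bool enabled) {
    ScopedParallelWrite guard(enabled);
    return g_fake_config.parallel_write_enabled_;
}
```

In a fixture, the same effect could be had by storing the old value in a member during SetUp() and writing it back in TearDown(); the guard simply makes the restore hard to forget.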
#include <future>
#include <mutex>
#include <queue>
#include <thread>
ThreadPool uses std::result_of in the submit(F&&) template but the header does not include <type_traits>, where std::result_of is defined. This makes the header non-self-contained and can fail to compile depending on transitive includes. Add the missing include (or switch to std::invoke_result if/when the project moves off C++11).
Suggested change:
- #include <thread>
+ #include <thread>
+ #include <type_traits>
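As a rough illustration of the portability point in this comment, the result type of a callable can be hidden behind one alias that uses `std::invoke_result` on C++17 and later and falls back to `std::result_of` otherwise. The names `task_result_t`, `forty_two`, and `run_now` are hypothetical and not part of the PR; the sketch also runs the callable inline rather than on a pool.

```cpp
#include <cassert>
#include <type_traits>  // std::result_of and std::invoke_result are declared here

// Hypothetical alias: std::invoke_result on C++17 and later,
// std::result_of on C++11/14.
#if __cplusplus >= 201703L
template <class F, class... Args>
using task_result_t = typename std::invoke_result<F, Args...>::type;
#else
template <class F, class... Args>
using task_result_t = typename std::result_of<F(Args...)>::type;
#endif

inline int forty_two() { return 42; }

// Runs the callable immediately and returns its result with the deduced type.
template <class F>
task_result_t<F> run_now(F&& f) {
    return f();
}
```

Including `<type_traits>` directly keeps the header self-contained regardless of which branch is compiled.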
void worker_loop() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lk(mu_);
            cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task();
        {
            std::lock_guard<std::mutex> lk(mu_);
            active_--;
        }
        cv_done_.notify_one();
    }
worker_loop() executes task() without any exception handling. If a task throws, the worker thread will terminate and active_ will never be decremented, causing wait_all() (and any code waiting on futures) to potentially block forever. Wrap task execution in a try/catch that ensures active_ is decremented and optionally stores/logs the exception.
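A minimal sketch of the try/catch approach described in this comment follows. It is not the PR's ThreadPool: `MiniPool`, `pending_`, `failed_`, and `demo_completed_tasks` are hypothetical, and the counter here tracks queued plus running tasks, which may differ from how `active_` is maintained in the real code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool; pending_ counts tasks that are queued or running.
class MiniPool {
   public:
    explicit MiniPool(std::size_t n) {
        if (n == 0) n = 1;  // never build a pool with zero workers
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { worker_loop(); });
    }
    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stop_ = true;
        }
        cv_work_.notify_all();
        for (auto& t : workers_) t.join();
    }
    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.push(std::move(task));
            ++pending_;
        }
        cv_work_.notify_one();
    }
    void wait_all() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_done_.wait(lk, [this] { return pending_ == 0; });
    }
    std::size_t failed_count() {
        std::lock_guard<std::mutex> lk(mu_);
        return failed_;
    }

   private:
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            bool ok = true;
            try {
                task();
            } catch (...) {
                ok = false;  // swallow so the worker thread survives
            }
            {
                std::lock_guard<std::mutex> lk(mu_);
                if (!ok) ++failed_;
                --pending_;  // always runs, even when the task threw
            }
            cv_done_.notify_all();
        }
    }

    std::mutex mu_;
    std::condition_variable cv_work_, cv_done_;
    std::queue<std::function<void()>> tasks_;
    std::vector<std::thread> workers_;
    std::size_t pending_ = 0;
    std::size_t failed_ = 0;
    bool stop_ = false;
};

// Usage sketch: one task throws, the rest still finish and wait_all() returns.
inline int demo_completed_tasks() {
    MiniPool pool(2);
    std::atomic<int> done{0};
    pool.submit([] { throw std::runtime_error("boom"); });
    for (int i = 0; i < 3; ++i) pool.submit([&done] { ++done; });
    pool.wait_all();
    return done.load();
}
```

A pool that hands out futures through `std::packaged_task` would already route exceptions to the future; the explicit catch matters for raw `std::function` tasks like the ones in this sketch.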
message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")

if (ENABLE_THREADS)
    add_definitions(-DENABLE_THREADS)
ENABLE_THREADS adds a compile definition, but the build does not link against the platform thread library (e.g., -pthread / Threads::Threads). On many toolchains this will cause unresolved symbols when using std::thread. When ENABLE_THREADS is ON, find_package(Threads REQUIRED) and link Threads::Threads to the relevant targets (at least tsfile and any object libs that end up in it).
Suggested change:
-     add_definitions(-DENABLE_THREADS)
+     add_definitions(-DENABLE_THREADS)
+     find_package(Threads REQUIRED)
+     link_libraries(Threads::Threads)
cpp/src/writer/tsfile_writer.cc
Outdated
// Write one column in segments defined by page_boundaries, sealing
// at each boundary. Works for both time and value columns.
// We control page sealing explicitly at precomputed boundaries, so
// auto-seal must be disabled — otherwise a segment of exactly
// page_max_points would trigger auto-seal AND our explicit seal,
// double-sealing (sealing an empty page → crash).
auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
                               ei](TimeChunkWriter* tcw) -> int {
    int r = E_OK;
    tcw->set_enable_page_seal_if_full(false);
    uint32_t seg_start = si;
This new segmented write path disables set_enable_page_seal_if_full and seals only at precomputed point-count boundaries. That bypasses the existing page-full logic which also enforces page_writer_max_memory_bytes_ (see TimeChunkWriter::is_cur_page_full() / ValueChunkWriter::is_cur_page_full()). For varlen columns (STRING/TEXT/BLOB) or when strict sizing is expected, pages can grow past the memory threshold because auto-seal is off. Consider keeping the previous strict/varlen behavior (or at least ensuring memory-threshold-based sealing remains effective) before parallelizing.
uint32_t time_cur_points = time_chunk_writer->get_point_numer();
const uint32_t first_seg_cap =
    (time_cur_points > 0 && time_cur_points < page_max_points)
        ? (page_max_points - time_cur_points)
        : page_max_points;
first_seg_cap is derived from time_chunk_writer->get_point_numer(), but there is no handling for the case where the existing unsealed time page is already full (by points or memory). Previously write_table explicitly sealed a full leftover page before continuing. With auto-seal disabled later, writing into an already-full page can produce oversized pages or unexpected behavior. Consider sealing the current page(s) when the existing page meets the configured thresholds before starting segmented writes.
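The boundary computation discussed here can be sketched as a pure function that also reports when the leftover page is already full by point count. This is a simplified illustration: `SegmentPlan` and `plan_segments` are hypothetical names, and the sketch considers only point counts, not the memory threshold (page_writer_max_memory_bytes_) raised in the earlier comment.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical result type for the precomputed write plan.
struct SegmentPlan {
    bool seal_before_write = false;         // leftover page is already full
    std::vector<std::uint32_t> boundaries;  // exclusive end row of each full page
};

// Plans page seals for rows [si, ei), given cur_points already sitting in the
// unsealed page and a per-page limit of page_max_points.
inline SegmentPlan plan_segments(std::uint32_t si, std::uint32_t ei,
                                 std::uint32_t cur_points,
                                 std::uint32_t page_max_points) {
    assert(page_max_points > 0 && si <= ei);
    SegmentPlan plan;
    // A full leftover page must be sealed first; the first segment then
    // gets a whole page of capacity instead of the remaining room.
    plan.seal_before_write = cur_points >= page_max_points;
    std::uint32_t room = plan.seal_before_write
                             ? page_max_points
                             : page_max_points - cur_points;
    std::uint32_t pos = si;
    while (ei - pos >= room) {
        pos += room;
        plan.boundaries.push_back(pos);
        room = page_max_points;
    }
    return plan;  // rows after the last boundary stay in the open page
}
```

For example, with 3 points already in the page and a limit of 4, rows 0 to 10 are cut at 1, 5, and 9, and the final row stays in an open page.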
    if (seg_start < ei) {
        r = time_write_column(tcw, tablet, seg_start, ei);
    }
    tcw->set_enable_page_seal_if_full(true);
    return r;
write_time_in_segments / write_value_in_segments unconditionally restore set_enable_page_seal_if_full(true) at the end. That changes writer behavior when g_config_value_.strict_page_size_ is false (and may also override any previous state), making sealing behavior inconsistent across calls. Restore the flag to the intended configuration value (or the prior state) instead of forcing true.
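One way to restore the prior state, sketched under assumptions, is a scope guard that records the flag before disabling it. `FakeChunkWriter`, `AutoSealDisabler`, and `state_after_guard` are hypothetical, and the getter in particular is assumed: the real chunk writers may expose only the setter, in which case the guard would have to restore from the configured strict_page_size_ value instead.

```cpp
#include <cassert>

// Hypothetical stand-in for TimeChunkWriter / ValueChunkWriter.
// The getter is an assumption made for this sketch.
class FakeChunkWriter {
   public:
    void set_enable_page_seal_if_full(bool v) { enable_ = v; }
    bool get_enable_page_seal_if_full() const { return enable_; }

   private:
    bool enable_ = true;
};

// Disables auto-seal for the lifetime of the guard, then restores whatever
// value was in effect before, rather than forcing true.
template <class Writer>
class AutoSealDisabler {
   public:
    explicit AutoSealDisabler(Writer* w)
        : w_(w), prev_(w->get_enable_page_seal_if_full()) {
        w_->set_enable_page_seal_if_full(false);
    }
    ~AutoSealDisabler() { w_->set_enable_page_seal_if_full(prev_); }
    AutoSealDisabler(const AutoSealDisabler&) = delete;
    AutoSealDisabler& operator=(const AutoSealDisabler&) = delete;

   private:
    Writer* w_;
    bool prev_;
};

// Returns the writer's flag after a guarded scope has ended.
inline bool state_after_guard(bool initial) {
    FakeChunkWriter w;
    w.set_enable_page_seal_if_full(initial);
    {
        AutoSealDisabler<FakeChunkWriter> guard(&w);
        assert(!w.get_enable_page_seal_if_full());
    }
    return w.get_enable_page_seal_if_full();
}
```

Because the restore happens in the destructor, early returns on error inside the segmented write would also leave the writer in its original state.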
cpp/src/writer/tsfile_writer.h
Outdated
common::ThreadPool thread_pool_{
    (size_t)common::g_config_value_.write_thread_count_};
thread_pool_ is constructed using the current value of common::g_config_value_.write_thread_count_ at member-initialization time. This makes the pool size effectively fixed per TsFileWriter instance and can also be problematic if the global config hasn't been initialized yet (or is accidentally 0), leading to a pool with 0 workers and tasks that never run. Consider clamping to >=1 and initializing the pool in the constructor after libtsfile_init()/config init (or lazily on first parallel use), and clarify how runtime changes to write_thread_count_ are intended to take effect.
Suggested change:
- common::ThreadPool thread_pool_{
-     (size_t)common::g_config_value_.write_thread_count_};
+ static size_t get_thread_pool_size() {
+     const int thread_count = common::g_config_value_.write_thread_count_;
+     return thread_count > 0 ? static_cast<size_t>(thread_count) : size_t{1};
+ }
+ common::ThreadPool thread_pool_{get_thread_pool_size()};
FORCE_INLINE int set_write_thread_count(int32_t count) {
    if (count < 1 || count > 64) return E_INVALID_ARG;
    g_config_value_.write_thread_count_ = count;
    return E_OK;
}
set_write_thread_count() updates the global config, but TsFileWriter's thread_pool_ is constructed once and never resized. As written, calling this at runtime will not affect existing writers and may mislead API consumers. Either document that it must be called before constructing writers, or adjust the writer/pool to honor runtime updates.
    if (seg_start < ei) {
        r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
    }
    vcw->set_enable_page_seal_if_full(true);
    return r;
write_value_in_segments restores vcw->set_enable_page_seal_if_full(true) unconditionally. This can override the intended strict_page_size_ setting and makes the writer state depend on whether write_table() ran. Restore to the configured value (or the previous state) instead of forcing true.
jt2594838 left a comment
Please introduce this feature in the README.
This change introduces thread pool-based write parallelization to the TsFile C++ write path.
Threading is controlled at compile time via the ENABLE_THREADS CMake option, with parallel_write_enabled_ and write_thread_count_ added to ConfigValue for runtime control.
The core approach has three steps. First, write_table() sorts the Tablet by device (via the new Tablet::sort_by_device()) so that rows for the same device are contiguous. Second, it precomputes page boundaries from page_writer_max_point_num_ and the number of points already in the current time page. Third, it dispatches the time column and all value columns to the thread pool in parallel. Each column writes the same row segments and explicitly seals pages at the precomputed boundaries, which preserves the time/value page alignment required by the aligned model.
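The dispatch step can be illustrated with a toy model in which every column writes the same segments concurrently and ends up with an identical page layout. This is a sketch only: `FakeColumn`, `write_in_segments`, and `write_columns_parallel` are hypothetical, and `std::async` stands in for the PR's thread pool.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <vector>

// Hypothetical column writer: records how many points each sealed page holds.
struct FakeColumn {
    std::vector<std::uint32_t> sealed_pages;
    std::uint32_t open_points = 0;
    void write(std::uint32_t begin, std::uint32_t end) {
        open_points += end - begin;
    }
    void seal() {
        sealed_pages.push_back(open_points);
        open_points = 0;
    }
};

// Writes rows [si, ei) in the shared segments, sealing at every boundary.
inline void write_in_segments(FakeColumn* col,
                              const std::vector<std::uint32_t>& boundaries,
                              std::uint32_t si, std::uint32_t ei) {
    std::uint32_t start = si;
    for (std::uint32_t b : boundaries) {
        col->write(start, b);
        col->seal();
        start = b;
    }
    if (start < ei) col->write(start, ei);  // tail stays in the open page
}

// Launches one task per column; each task touches only its own column,
// so no locking is needed between them.
inline void write_columns_parallel(std::vector<FakeColumn>& cols,
                                   const std::vector<std::uint32_t>& boundaries,
                                   std::uint32_t si, std::uint32_t ei) {
    std::vector<std::future<void>> futures;
    for (FakeColumn& col : cols) {
        futures.push_back(
            std::async(std::launch::async, [&col, &boundaries, si, ei] {
                write_in_segments(&col, boundaries, si, ei);
            }));
    }
    for (auto& f : futures) f.get();  // rethrows if any column task threw
}
```

Because the boundaries are computed once and shared read-only, alignment between columns does not depend on the order in which tasks happen to run.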
On the testing side, TsFileWriterTableTest has been converted to a gtest parameterized test that runs both a Serial suite (parallel_write_enabled_=false) and a Parallel suite (parallel_write_enabled_=true). All 34 test cases pass under a single build configuration.