[fix](wal) rename wal_reader and move to wal directory#61135
[fix](wal) rename wal_reader and move to wal directory#61135yiguolei merged 1 commit intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
There was a problem hiding this comment.
Pull request overview
This PR refactors WAL reading by separating low-level WAL file parsing from the vectorized GenericReader implementation, renaming the old file-based reader to WalFileReader and relocating the vectorized WAL reader into the group-commit WAL directory.
Changes:
- Introduce
WalFileReaderas the low-level WAL file reader (init/read header/read blocks/finalize). - Move/rename the vectorized
WalReader(GenericReaderimplementation) tobe/src/load/group_commit/wal/and update includes/usages. - Remove the previous
be/src/format/wal/wal_reader.{h,cpp}implementation and update tests and call sites.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| be/test/format/wal/wal_reader_writer_test.cpp | Update unit test to use WalFileReader. |
| be/src/load/group_commit/wal/wal_table.cpp | Switch header-reading helper to WalFileReader. |
| be/src/load/group_commit/wal/wal_reader.h | New location for vectorized WalReader (GenericReader) and switch to WalFileReader backend. |
| be/src/load/group_commit/wal/wal_reader.cpp | Vectorized WAL block conversion now reads via WalFileReader. |
| be/src/load/group_commit/wal/wal_manager.cpp | Update include to new wal_reader.h location. |
| be/src/load/group_commit/wal/wal_file_reader.h | Add new low-level WAL file reader API. |
| be/src/load/group_commit/wal/wal_file_reader.cpp | Implement low-level WAL file reading/parsing logic. |
| be/src/format/wal/wal_reader.h | Remove old vectorized WAL reader header. |
| be/src/format/wal/wal_reader.cpp | Remove old vectorized WAL reader implementation. |
| be/src/exec/scan/file_scanner.cpp | Update include to new WalReader header location. |
Comments suppressed due to low confidence (1)
be/src/load/group_commit/wal/wal_table.cpp:345
- If
read_header()(or the logging line) returns an error,finalize()won't run and the underlying file handle can leak becauseWalFileReader's destructor doesn't close. Consider a scope guard/RAII pattern sofinalize()is always executed after a successfulinit().
RETURN_IF_ERROR(wal_reader->read_header(version, columns));
VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version)
<< ",columns=" << columns;
RETURN_IF_ERROR(wal_reader->finalize());
return Status::OK();
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| namespace doris::vectorized { | ||
| #include "common/compile_check_begin.h" | ||
| struct ScannerCounter; | ||
| class WalReader : public GenericReader { | ||
| ENABLE_FACTORY_CREATOR(WalReader); | ||
|
|
||
| #include "common/status.h" | ||
| #include "io/fs/file_reader_writer_fwd.h" | ||
|
|
||
| namespace doris { | ||
|
|
||
| class WalReader { | ||
| public: | ||
| explicit WalReader(const std::string& file_name); | ||
| ~WalReader(); | ||
|
|
||
| Status init(); | ||
| Status finalize(); | ||
|
|
||
| Status read_block(PBlock& block); | ||
| Status read_header(uint32_t& version, std::string& col_ids); | ||
| WalReader(RuntimeState* state); | ||
| ~WalReader() override = default; | ||
| Status init_reader(const TupleDescriptor* tuple_descriptor); | ||
| Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
There was a problem hiding this comment.
WalReader's public API uses RuntimeState*, but this header doesn't declare RuntimeState (nor include runtime/runtime_state.h). This can break any TU that includes this header without already forward-declaring RuntimeState (e.g. wal_manager.h includes it). Add a forward declaration for doris::RuntimeState (preferred) or include the defining header.
| if (UNLIKELY(!block.ParseFromString(buf))) { | ||
| return Status::InternalError( | ||
| "failed to deserialize row, file_size=" + std::to_string(file_reader->size()) + | ||
| ", read_offset=" + std::to_string(_offset) + +", block_bytes=" + |
There was a problem hiding this comment.
_deserialize() builds an error string with ... + std::to_string(_offset) + + ", block_bytes=" + ... (double +), which is a compile-time error. Remove the stray + so the concatenation is valid (or switch to a formatter to avoid manual concatenation).
| ", read_offset=" + std::to_string(_offset) + +", block_bytes=" + | |
| ", read_offset=" + std::to_string(_offset) + ", block_bytes=" + |
| // read block | ||
| std::string block_buf; | ||
| block_buf.resize(block_len); | ||
| RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), block_len}, &bytes_read)); |
There was a problem hiding this comment.
read_at() writes into the provided buffer, but block_buf.c_str() returns a pointer to a const, null-terminated view of the string. Writing through it relies on const_cast and is undefined behavior per the C++ standard; use a mutable buffer pointer from the string (e.g., its writable data) instead.
| RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.c_str(), block_len}, &bytes_read)); | |
| RETURN_IF_ERROR(file_reader->read_at(_offset, {block_buf.data(), block_len}, &bytes_read)); |
| return Status::InternalError("failed to read header expected= " + std::to_string(len) + | ||
| ",actually=" + std::to_string(bytes_read)); | ||
| for (auto* slot_desc : _tuple_descriptor->slots()) { | ||
| auto pos = _column_pos_map[slot_desc->col_unique_id()]; |
There was a problem hiding this comment.
Using _column_pos_map[slot_desc->col_unique_id()] will insert a default entry (pos=0) when the WAL header doesn't contain that column id, silently reading the wrong column. Use a lookup that doesn't mutate the map and return an error / record it in missing_cols when the id is absent.
| auto pos = _column_pos_map[slot_desc->col_unique_id()]; | |
| auto it = _column_pos_map.find(slot_desc->col_unique_id()); | |
| if (it == _column_pos_map.end()) { | |
| return Status::InternalError("read wal {} fail, column id {} not found in header", | |
| _wal_path, slot_desc->col_unique_id()); | |
| } | |
| auto pos = it->second; |
| pos, src_block.columns()); | ||
| } | ||
| vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column; | ||
| if (!column_ptr && slot_desc->is_nullable()) { |
There was a problem hiding this comment.
make_nullable() dereferences its argument; calling it with a null column_ptr will crash. If a WAL column can be absent/null here, construct an all-NULL column of the expected type/row count instead; otherwise, remove the null check and assert the column is always present.
| if (!column_ptr && slot_desc->is_nullable()) { | |
| if (!column_ptr) { | |
| // WAL column is absent; create an all-NULL column with the expected type and row count. | |
| auto& type = output_block_columns[index].type; | |
| column_ptr = type->create_column(); | |
| column_ptr->insert_many_defaults(src_block.rows()); | |
| } else if (slot_desc->is_nullable()) { |
| std::vector<std::string> column_id_vector = | ||
| absl::StrSplit(col_ids, ",", absl::SkipWhitespace()); | ||
| _column_id_count = column_id_vector.size(); | ||
| try { | ||
| int64_t pos = 0; | ||
| for (auto col_id_str : column_id_vector) { | ||
| auto col_id = std::strtoll(col_id_str.c_str(), nullptr, 10); | ||
| _column_pos_map.emplace(col_id, pos); | ||
| pos++; | ||
| } | ||
| } catch (const std::invalid_argument& e) { | ||
| return Status::InvalidArgument("Invalid format, {}", e.what()); |
There was a problem hiding this comment.
std::strtoll doesn't throw std::invalid_argument, so the try/catch here is ineffective. If you need validation, parse with an end-pointer/errno check and return InvalidArgument when the entire token isn't a valid integer; also consider clearing _column_pos_map before filling to avoid stale entries if get_columns() is called more than once.
|
run buildall |
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
|
skip buildall |
TPC-H: Total hot run time: 27741 ms |
TPC-DS: Total hot run time: 153268 ms |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)