refactor: Try to simplify StringColumn's CompactionOnDump, write to file sequentially (#187)

zhanglei1949 wants to merge 1 commit into alibaba:main from
Conversation
Review Summary by Qodo

**Simplify StringColumn compaction with direct sequential file writing**

Description
- Simplify StringColumn compaction logic by removing complex offset mapping
- Directly write live strings sequentially to new data file during dump
- Eliminate temporary buffer and reduce memory allocations during compaction
- Remove unused `unordered_map` include and `CompactionPlan` struct

Diagram

```mermaid
flowchart LR
    A["StringColumn dump"] --> B{"Check unique bytes<br/>vs total bytes"}
    B -->|No compaction needed| C["Write buffers as-is"]
    B -->|Compaction needed| D["Stream live strings<br/>to new file"]
    D --> E["Update item offsets<br/>and compute MD5"]
    E --> F["Write compacted data"]
    C --> G["Close buffers"]
    F --> G
```
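The diagram's decision step ("unique bytes vs total bytes") boils down to one scan over the items. In this minimal, self-contained sketch, `Item` and `unique_bytes` are stand-ins for the actual types in column.h, not the real interface:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Stand-in for the column's string item (offset into the data buffer,
// byte length); the real type lives in column.h.
struct Item {
  uint64_t offset;
  uint64_t length;
};

// Mirrors the diff's pre-dump scan: sum the length of each live segment
// once per unique offset, so shared backing strings are counted once.
uint64_t unique_bytes(const std::vector<Item>& items) {
  std::unordered_set<uint64_t> seen;
  uint64_t total = 0;
  for (const auto& item : items) {
    if (item.length > 0 && seen.insert(item.offset).second)
      total += item.length;
  }
  return total;
}
```

If the returned total equals the append cursor (`pos_` in the diff), no bytes in the buffer are stale and the fast path dumps the containers as-is.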
File Changes
1. include/neug/utils/property/column.h

Code Review by Qodo
```cpp
size_t unique_bytes = 0;
{
  std::unordered_set<uint64_t> seen;
  for (size_t i = 0; i < size_; ++i) {
    const auto item = get_string_item(i);
    if (item.length > 0 && seen.insert(item.offset).second)
      unique_bytes += item.length;
  }
}
pos_val = pos_.load();
// No-compaction path: dump containers as-is.
size_t pos_val = pos_.load();
if (unique_bytes == pos_val) {
  write_file(filename + ".pos", &pos_val, sizeof(pos_val), 1);
  items_buffer_->Dump(filename + ".items");
  data_buffer_->Dump(filename + ".data");
  items_buffer_->Close();
  data_buffer_->Close();
  return;
}

// Slow path: stream each live slot to a new .data file, skipping stale
// bytes. Reads from data_buffer_ and writes to a different file — no
// aliasing, so plain slot order is safe (no sort needed).
const auto data_path = filename + ".data";
auto parent = std::filesystem::path(data_path).parent_path();
if (!parent.empty())
  std::filesystem::create_directories(parent);
std::unique_ptr<FILE, decltype(&fclose)> fout(
    fopen(data_path.c_str(), "wb"), &fclose);
if (!fout)
  THROW_IO_EXCEPTION("Failed to open for compaction: " + data_path);

FileHeader header{};
fwrite(&header, sizeof(header), 1, fout.get());  // placeholder

const auto* raw = reinterpret_cast<const char*>(data_buffer_->GetData());
size_t write_offset = 0;
MD5_CTX md5_ctx;
MD5_Init(&md5_ctx);

for (size_t i = 0; i < size_; ++i) {
  const auto item = get_string_item(i);
  if (item.length == 0)
    continue;
  fwrite(raw + item.offset, 1, item.length, fout.get());
  MD5_Update(&md5_ctx, raw + item.offset, item.length);
  set_string_item(i, {write_offset, item.length});
  write_offset += item.length;
}

MD5_Final(header.data_md5, &md5_ctx);
fseek(fout.get(), 0, SEEK_SET);
fwrite(&header, sizeof(header), 1, fout.get());
fout.reset();

pos_val = write_offset;
```
1. Compaction logic still in header 📎 Requirement gap ⚙ Maintainability
TypedColumn<std::string_view>::dump() now contains substantial compaction + file I/O implementation (directory creation, fopen/fwrite/fseek, MD5) directly in column.h, which keeps complex non-interface logic in the header. This violates the refactor requirement to move string compaction implementation out of column.h into appropriate implementation files.
Agent Prompt
## Issue description
`include/neug/utils/property/column.h` still contains a large, non-interface implementation of string column compaction and sequential file dumping inside `TypedColumn<std::string_view>::dump()`, contrary to the requirement to reduce header logic.
## Issue Context
The compliance objective is to keep `column.h` focused on declarations/interfaces and move substantial compaction logic (filesystem operations, file handles, streaming writes, hashing, offset rewriting) into appropriate `.cc/.cpp` implementation files or a dedicated module.
## Fix Focus Areas
- include/neug/utils/property/column.h[288-342]
```cpp
size_t unique_bytes = 0;
{
  std::unordered_set<uint64_t> seen;
  for (size_t i = 0; i < size_; ++i) {
    const auto item = get_string_item(i);
    if (item.length > 0 && seen.insert(item.offset).second)
      unique_bytes += item.length;
  }
}
pos_val = pos_.load();
// No-compaction path: dump containers as-is.
size_t pos_val = pos_.load();
if (unique_bytes == pos_val) {
  write_file(filename + ".pos", &pos_val, sizeof(pos_val), 1);
  items_buffer_->Dump(filename + ".items");
  data_buffer_->Dump(filename + ".data");
  items_buffer_->Close();
  data_buffer_->Close();
  return;
}

// Slow path: stream each live slot to a new .data file, skipping stale
// bytes. Reads from data_buffer_ and writes to a different file — no
// aliasing, so plain slot order is safe (no sort needed).
const auto data_path = filename + ".data";
auto parent = std::filesystem::path(data_path).parent_path();
if (!parent.empty())
  std::filesystem::create_directories(parent);
std::unique_ptr<FILE, decltype(&fclose)> fout(
    fopen(data_path.c_str(), "wb"), &fclose);
if (!fout)
  THROW_IO_EXCEPTION("Failed to open for compaction: " + data_path);

FileHeader header{};
fwrite(&header, sizeof(header), 1, fout.get());  // placeholder

const auto* raw = reinterpret_cast<const char*>(data_buffer_->GetData());
size_t write_offset = 0;
MD5_CTX md5_ctx;
MD5_Init(&md5_ctx);

for (size_t i = 0; i < size_; ++i) {
  const auto item = get_string_item(i);
  if (item.length == 0)
    continue;
  fwrite(raw + item.offset, 1, item.length, fout.get());
  MD5_Update(&md5_ctx, raw + item.offset, item.length);
  set_string_item(i, {write_offset, item.length});
  write_offset += item.length;
}
```
2. Compaction inflates shared strings 🐞 Bug ≡ Correctness
TypedColumn<std::string_view>::dump() slow-path rewrites bytes for every live slot even when many items share the same backing offset (e.g., from resize(default)), turning a single shared string into N copies when any stale bytes exist. This can massively increase the dumped .data size and makes compaction counterproductive.
Agent Prompt
## Issue description
`TypedColumn<std::string_view>::dump()`'s slow compaction path writes data for every row even when multiple rows reference the same `string_item.offset` (shared backing bytes, created by `resize(size, default_value)`). When compaction triggers due to any stale bytes, this expands shared strings into N copies and can drastically inflate snapshot size.
## Issue Context
The previous implementation avoided this by mapping `old_offset -> new_offset` and only writing each unique backing segment once.
## Fix Focus Areas
- include/neug/utils/property/column.h[287-347]
- include/neug/utils/property/column.h[364-388]
## Implementation notes
- Track seen offsets in the slow path (e.g., `std::unordered_map<uint64_t, uint64_t> old_to_new`).
- If an offset was already written, only update the item to the previously assigned new offset; do not `fwrite()` again and do not advance `write_offset`.
- Ensure the resulting `pos_val` equals the compacted payload size (unique bytes actually written).
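The implementation notes above can be sketched as an in-memory model. Here `Item`, `compact`, and the `std::string` buffers are hedged stand-ins for the real `data_buffer_` bytes and file stream; the point is the `old_to_new` map that writes each unique segment once, remaps later sharers, and keeps zero-length offsets valid:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for the column's string item; names are assumptions.
struct Item {
  uint64_t offset;
  uint64_t length;
};

// Dedup-aware slow path: 'src' plays the role of data_buffer_, the
// returned string the role of the new .data payload. Each unique source
// offset is copied once; items sharing it are remapped to the
// already-written position instead of being copied again.
std::string compact(const std::string& src, std::vector<Item>& items,
                    uint64_t& pos_val) {
  std::string out;
  std::unordered_map<uint64_t, uint64_t> old_to_new;
  for (auto& item : items) {
    if (item.length == 0) {
      // Keep zero-length items pointing inside the compacted buffer too.
      item.offset = out.size();
      continue;
    }
    auto [it, inserted] = old_to_new.try_emplace(item.offset, out.size());
    if (inserted)
      out.append(src, item.offset, item.length);  // write unique bytes once
    item.offset = it->second;
  }
  pos_val = out.size();  // equals the unique bytes actually written
  return out;
}
```

With this shape, `pos_val` automatically matches the compacted payload size, and shared default strings (e.g. from `resize(size, default_value)`) stay shared in the snapshot.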
```cpp
std::unique_ptr<FILE, decltype(&fclose)> fout(
    fopen(data_path.c_str(), "wb"), &fclose);
if (!fout)
  THROW_IO_EXCEPTION("Failed to open for compaction: " + data_path);

FileHeader header{};
fwrite(&header, sizeof(header), 1, fout.get());  // placeholder

const auto* raw = reinterpret_cast<const char*>(data_buffer_->GetData());
size_t write_offset = 0;
MD5_CTX md5_ctx;
MD5_Init(&md5_ctx);

for (size_t i = 0; i < size_; ++i) {
  const auto item = get_string_item(i);
  if (item.length == 0)
    continue;
  fwrite(raw + item.offset, 1, item.length, fout.get());
  MD5_Update(&md5_ctx, raw + item.offset, item.length);
  set_string_item(i, {write_offset, item.length});
  write_offset += item.length;
}

MD5_Final(header.data_md5, &md5_ctx);
fseek(fout.get(), 0, SEEK_SET);
fwrite(&header, sizeof(header), 1, fout.get());
fout.reset();
```
3. Unchecked file write errors 🐞 Bug ☼ Reliability
The new dump compaction path ignores return values from fwrite()/fseek() and does not check fclose() results, so I/O failures can silently produce corrupted snapshot .data files. Other dump implementations in the repo check these operations and throw on failure.
Agent Prompt
## Issue description
The StringColumn dump compaction path performs multiple file I/O operations (`fwrite`, `fseek`, `fclose`) without checking return values. On short writes, seek failures, or close/flush errors, the snapshot file can be silently corrupted.
## Issue Context
Other codepaths already enforce strict I/O checking (e.g., `MMapContainer::Dump`, `MutableCsr` dump).
## Fix Focus Areas
- include/neug/utils/property/column.h[314-340]
- src/storages/container/mmap_container.cc[132-151]
- src/storages/csr/mutable_csr.cc[163-196]
## Implementation notes
- Check `fwrite(...) == expected` for header and each data segment; throw `THROW_IO_EXCEPTION(...)` on mismatch.
- Check `fseek(...) == 0` before rewriting the header.
- Prefer `if (fclose(fp.release()) != 0) THROW_IO_EXCEPTION(...)` to ensure close/flush errors are surfaced.
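The strict-I/O pattern described in the notes can be sketched with small wrappers. The helper names here (`checked_fwrite`, `checked_seek`, `checked_close`) are hypothetical, and `std::runtime_error` stands in for the repo's `THROW_IO_EXCEPTION`:

```cpp
#include <cassert>
#include <cstdio>
#include <stdexcept>
#include <string>

// Fails loudly on a short write instead of silently truncating the file.
inline void checked_fwrite(const void* buf, size_t size, size_t count,
                           std::FILE* fp, const std::string& what) {
  if (std::fwrite(buf, size, count, fp) != count)
    throw std::runtime_error("Short write: " + what);
}

// fseek() can fail (e.g. on pipes or I/O errors) before a header rewrite.
inline void checked_seek(std::FILE* fp, long off, int whence,
                         const std::string& what) {
  if (std::fseek(fp, off, whence) != 0)
    throw std::runtime_error("Seek failed: " + what);
}

// fclose() can surface deferred flush errors (e.g. disk full), so its
// return value must be checked as well.
inline void checked_close(std::FILE* fp, const std::string& what) {
  if (std::fclose(fp) != 0)
    throw std::runtime_error("Close failed: " + what);
}
```

With a `unique_ptr`-managed `FILE*` as in the diff, the final step would be `checked_close(fout.release(), ...)` rather than letting the deleter swallow the error.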
Pull request overview
This PR refactors StringColumn (TypedColumn<std::string_view>) dump-time compaction logic to simplify CompactionOnDump by writing the compacted .data file sequentially, removing the previous compaction plan/mapping implementation (Fix #177).
Changes:
- Replaces the previous compaction-plan + offset-remap implementation with a two-path dump: fast path (no stale bytes) vs slow path (manual streaming write).
- Removes the `CompactionPlan`/`stream_compact_and_dump()` helper code and associated `unordered_map` usage.
- Introduces a `unique_bytes` scan to decide whether compaction is needed before dumping.
```cpp
for (size_t i = 0; i < size_; ++i) {
  const auto item = get_string_item(i);
  if (item.length == 0)
    continue;
  fwrite(raw + item.offset, 1, item.length, fout.get());
  MD5_Update(&md5_ctx, raw + item.offset, item.length);
  set_string_item(i, {write_offset, item.length});
  write_offset += item.length;
}
```
Slow-path compaction currently fwrite’s every non-empty slot in index order and updates each slot’s offset to the new write position. This duplicates bytes when multiple slots share the same old offset (e.g., resize(default) makes many rows share one backing string), which can massively increase the dumped .data size when any stale bytes exist. Track a mapping from old_offset -> new_offset and only write each unique offset once, then remap subsequent slots to the already-written location (like the previous implementation did).
```cpp
const auto item = get_string_item(i);
if (item.length == 0)
  continue;
fwrite(raw + item.offset, 1, item.length, fout.get());
MD5_Update(&md5_ctx, raw + item.offset, item.length);
set_string_item(i, {write_offset, item.length});
write_offset += item.length;
```
The slow-path loop skips item.length == 0 entries via continue and therefore never rewrites their offsets. Empty-string values can legitimately have a non-zero offset (set_value appends 0 bytes but records the current pos), and after compaction the new .data size may be smaller than that old offset, causing get_view()'s offset + length <= data_size assert to fail after reopen. Even for zero-length items, update the stored offset to a valid location within the new compacted buffer (e.g., current write_offset) before continuing.
```cpp
void dump(const std::string& filename) override {
  // Compact before dumping. StringColumn uses an append-only strategy for
  // updates, leaving stale copies in data_buffer_. When there is reused
  // data we stream the compacted bytes directly to the output file in a
  // single forward pass, computing MD5 on-the-fly, which avoids:
  //   1. A temporary buffer allocation (effective_size bytes).
  //   2. The memcpy from temp_buf → data_buffer_.
  //   3. The subsequent container Dump() copy.
  // When there is nothing to compact we fall through and let the container
  // handle the write as usual (e.g. reflink / copy_file_range via
  // FileSharedMMap::Dump, or a single fwrite via MMapContainer::Dump).
  if (!items_buffer_ || !data_buffer_) {
    THROW_RUNTIME_ERROR("Buffers not initialized for dumping");
  }
  resize(size_);  // Resize the string column with avg size to shrink or
                  // expand data buffer
  size_t pos_val;
  if (size_ > 0) {
    auto plan = prepare_compaction_plan();
    if (plan.reused_size > 0) {
      // Stream path: source (data_buffer_) and destination (snapshot file)
      // are always different files, so there is no aliasing hazard.
      pos_val = stream_compact_and_dump(plan, filename + ".data");
      write_file(filename + ".pos", &pos_val, sizeof(pos_val), 1);
      items_buffer_->Dump(filename + ".items");
      items_buffer_->Close();
      data_buffer_->Close();
      return;
  size_t unique_bytes = 0;
  {
    std::unordered_set<uint64_t> seen;
    for (size_t i = 0; i < size_; ++i) {
      const auto item = get_string_item(i);
      if (item.length > 0 && seen.insert(item.offset).second)
        unique_bytes += item.length;
    }
  }
  pos_val = pos_.load();
  // No-compaction path: dump containers as-is.
  size_t pos_val = pos_.load();
  if (unique_bytes == pos_val) {
    write_file(filename + ".pos", &pos_val, sizeof(pos_val), 1);
    items_buffer_->Dump(filename + ".items");
    data_buffer_->Dump(filename + ".data");
    items_buffer_->Close();
    data_buffer_->Close();
    return;
  }
```
dump() no longer calls resize(size_) before dumping. In the no-compaction path (unique_bytes == pos_), this means data_buffer_->Dump() will persist the entire allocated buffer size (which may have grown via set_any() resizes) rather than a right-sized buffer, potentially bloating snapshot files and checkpoint time even when there are no stale bytes. Consider reintroducing the resize(size_) call (or otherwise shrinking data_buffer_ to pos_) before the fast-path dump.
```cpp
FileHeader header{};
fwrite(&header, sizeof(header), 1, fout.get());  // placeholder

const auto* raw = reinterpret_cast<const char*>(data_buffer_->GetData());
size_t write_offset = 0;
MD5_CTX md5_ctx;
MD5_Init(&md5_ctx);

for (size_t i = 0; i < size_; ++i) {
  const auto item = get_string_item(i);
  if (item.length == 0)
    continue;
  fwrite(raw + item.offset, 1, item.length, fout.get());
  MD5_Update(&md5_ctx, raw + item.offset, item.length);
  set_string_item(i, {write_offset, item.length});
  write_offset += item.length;
}

MD5_Final(header.data_md5, &md5_ctx);
fseek(fout.get(), 0, SEEK_SET);
fwrite(&header, sizeof(header), 1, fout.get());
fout.reset();
```
The manual .data writer doesn’t check return values from fwrite()/fseek() (placeholder header write, per-item writes, final header overwrite), and it doesn’t check/propagate close/flush errors. Other dump paths (e.g., MMapContainer::Dump) throw on short writes/seek failures; this path should similarly validate all I/O calls and raise THROW_IO_EXCEPTION to avoid silently producing corrupted snapshots on disk-full or I/O errors.
```cpp
// Slow path: stream each live slot to a new .data file, skipping stale
// bytes. Reads from data_buffer_ and writes to a different file — no
// aliasing, so plain slot order is safe (no sort needed).
const auto data_path = filename + ".data";
auto parent = std::filesystem::path(data_path).parent_path();
if (!parent.empty())
  std::filesystem::create_directories(parent);
std::unique_ptr<FILE, decltype(&fclose)> fout(
```
There are existing table/column dump+reopen tests, but they don’t appear to exercise this new slow-path compaction branch (unique_bytes != pos_). Adding a regression test that (1) creates many rows sharing a default string, (2) updates a row multiple times to create stale bytes, and (3) includes at least one empty-string value, would catch both size blow-ups and offset validity issues in the compaction writer.
An alternative solution; differs from #181.
Fix #177