Integrate WAL compression into log reader/writer. #9642
Conversation
Thanks for the PR. I didn't fully understand the logic, so I've left some questions inline.
const size_t max_output_buffer_len =
    kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
CompressionOptions opts;
constexpr uint32_t compression_format_version = 2;
Use a descriptive constant instead of 2
I looked at other call sites for the existing CompressData()/UncompressData() and the format seems to be hardcoded to 2.
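For illustration, a possible shape for the suggested constant (the name kCompressionFormatVersion is hypothetical; the value 2 simply matches those existing call sites):

// Hypothetical descriptive name for the hardcoded compression format version,
// assuming 2 is the version the existing CompressData()/UncompressData()
// call sites expect.
constexpr uint32_t kCompressionFormatVersion = 2;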
@@ -85,10 +100,31 @@ IOStatus Writer::AddRecord(const Slice& slice) {
  assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);

  const size_t avail = kBlockSize - block_offset_ - header_size;

  // Compress the record
  if (compress_ && (compress_start || left == 0)) {
This is kinda confusing. IIUC, we try to compress as much as will fit in a block. However, the first physical record for the logical record and the start of the block may not be aligned, so the compressed buffer may be split into multiple physical records. Is that correct? Would the records have a dependency?
It might be simpler and easier to reason about if we have nested while loops.
Yes, that is correct. There is no dependency, other than that all the compressed records need to be uncompressed in order to recover the original record. I tried changing the compress API to pass the available physical block size, and it didn't make a difference.
I can add some comments to make it more readable. The advantage of having a single loop is that there's less code required for the compressed vs. uncompressed paths. Once a chunk is available, compressed or not, the rest of the code to generate the physical record is the same.
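A rough sketch of the single-loop shape described here (all helper names are hypothetical stand-ins, not the PR's actual code):

// Each iteration yields the next chunk -- either the next output from the
// streaming compressor or a slice of the raw payload -- and the shared tail
// of the loop turns it into one or more physical records.
while (!done) {
  Slice chunk;
  if (compress_) {
    chunk = NextCompressedChunk(&done);   // hypothetical helper
  } else {
    chunk = NextRawChunk(&done);          // hypothetical helper
  }
  // Common path, compressed or not: headers, block splitting, CRC.
  EmitChunkAsPhysicalRecords(chunk);      // hypothetical helper
}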
Added comments.
db/log_reader.cc (outdated)
      uncompressed_record_.append(uncompressed_buffer_.get(),
                                  uncompressed_size);
    }
  } while (remaining > 0);
Is remaining guaranteed to go down to 0 in all cases? I'm still not sure how it works in the case of a chunk compressed by ZSTD_compressStream2() spanning 2 physical records. It seems hard to believe that the uncompression algorithm can completely consume input up to arbitrary boundaries. Would it be safer to concatenate all the physical records belonging to a logical one and then uncompress?
To make it more concrete, consider an example of a logical record compressed by 2 calls to ZSTD_compressStream, and written to 3 physical records {p1, p2} and {p3}, where the output of the first call is split into p1 and p2. Is there a possibility of a literal spanning the boundary between p1/p2, since the compress call would have been unaware of such a boundary?
That case works. The uncompression algorithm doesn't rely on knowing the boundaries, since the input reader doesn't know them either in the case of file compression.
Yeah, now that I think about it, I guess in the example I mentioned above the uncompression might keep any partial input data in its internal buffers and flush it to the output when it sees the rest. The ZSTD streaming documentation is rather sparse, hence the doubt.
If you look at the example here, the outer loop just reads chunks from the compressed file, and the inner loop uses input.pos/input.size to figure out whether the chunk can be decompressed any further:
https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c#L47
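For reference, a minimal C++ sketch of that pattern (ReadNextChunk() is a hypothetical stand-in for fetching the next physical record's payload; error handling is elided):

#include <zstd.h>
#include <string>

// Hypothetical: yields the next compressed chunk, returns false at end.
bool ReadNextChunk(std::string* chunk);

std::string DecompressAll(ZSTD_DCtx* dctx) {
  std::string out;
  char out_buf[4096];
  std::string chunk;
  while (ReadNextChunk(&chunk)) {        // outer loop: one compressed chunk
    ZSTD_inBuffer input = {chunk.data(), chunk.size(), 0};
    while (input.pos < input.size) {     // inner loop: drain this chunk
      ZSTD_outBuffer output = {out_buf, sizeof(out_buf), 0};
      size_t ret = ZSTD_decompressStream(dctx, &output, &input);
      if (ZSTD_isError(ret)) return out; // corruption handling elided
      out.append(out_buf, output.pos);
    }
  }
  return out;
}

Because ZSTD buffers partial data internally, the chunk boundaries never need to align with compressed-block boundaries, which is why the p1/p2 split in the example above is safe.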
db/log_reader.cc (outdated)
      uncompressed_record_.append(uncompressed_buffer_.get(),
                                  uncompressed_size);
    }
  } while (remaining > 0);
Also, I think your original check of remaining > 0 || uncompressed_size == kBlockSize was probably correct. The documentation states that if output.pos == output.size, there may be some data left in internal buffers.
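Roughly, the condition under discussion (names approximate the PR's log_reader code; UncompressNext() is a hypothetical wrapper around the ZSTD_decompressStream() call):

do {
  // Hypothetical helper: updates `remaining` (unconsumed compressed input)
  // and returns the number of uncompressed bytes produced.
  uncompressed_size = UncompressNext(&remaining);
  uncompressed_record_.append(uncompressed_buffer_.get(), uncompressed_size);
  // A completely filled output buffer means ZSTD may still be holding data
  // in its internal buffers, so loop again even if all input was consumed.
} while (remaining > 0 || uncompressed_size == kBlockSize);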
I think that's why I had added it, although removing it didn't cause the tests to fail.
Added it back.
Thanks for the explanation. LGTM. Can we run db_bench to verify there's no regression? The same benchmarks as the previous PR (which added the compression type record) should suffice.
Unfortunately, I'm seeing a regression. Base commit: average 421886.00 ops/sec (fillseq : 2.287 micros/op 437242 ops/sec; 48.4 MB/s). With the changes: average 393841.50 ops/sec. Will run it a few more times to confirm.
Comparing release builds; benchmark results with ZSTD WAL compression enabled.
db/log_reader.cc (outdated)
*fragment = Slice(header + header_size, length);
*fragment_type_or_err = type;
return true;
if (uncompress_ && type != kSetCompressionType) {
It's possible that re-arranging this if (negating the condition and swapping the "then" and "else" code blocks) could negate some of the regression. Why? Generally, the default branch prediction is not to jump forward (favoring "then" rather than "else"), and there is possibly better code locality because of the "return" from "then" and "else". You can also put LIKELY() around type != kSetCompressionType.
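A hedged sketch of both suggestions, assuming RocksDB's LIKELY() hint from port/likely.h (which wraps __builtin_expect):

// Option 1: negate the condition and swap the branches, so the early-return
// path sits in the "then" block that forward branches default-predict.
if (!uncompress_ || type == kSetCompressionType) {
  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
} else {
  // ... streaming-uncompress path ...
}

// Option 2: keep the original shape and hint the predictor directly.
if (uncompress_ && LIKELY(type != kSetCompressionType)) {
  // ... streaming-uncompress path ...
}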
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seemed to help. I'll run it one more time to be sure.
Without PR -
Avg ops/sec 478815
fillseq : 2.042 micros/op 489809 ops/sec; 54.2 MB/s
fillseq : 2.134 micros/op 468617 ops/sec; 51.8 MB/s
fillseq : 2.066 micros/op 484099 ops/sec; 53.6 MB/s
fillseq : 2.140 micros/op 467203 ops/sec; 51.7 MB/s
fillseq : 2.146 micros/op 465934 ops/sec; 51.5 MB/s
fillseq : 2.038 micros/op 490768 ops/sec; 54.3 MB/s
fillseq : 2.194 micros/op 455855 ops/sec; 50.4 MB/s
fillseq : 2.051 micros/op 487570 ops/sec; 53.9 MB/s
fillseq : 2.075 micros/op 481840 ops/sec; 53.3 MB/s
fillseq : 2.150 micros/op 465151 ops/sec; 51.5 MB/s
fillseq : 2.062 micros/op 484875 ops/sec; 53.6 MB/s
fillseq : 2.054 micros/op 486925 ops/sec; 53.9 MB/s
fillseq : 2.095 micros/op 477330 ops/sec; 52.8 MB/s
fillseq : 2.063 micros/op 484727 ops/sec; 53.6 MB/s
fillseq : 2.020 micros/op 495089 ops/sec; 54.8 MB/s
fillseq : 2.093 micros/op 477879 ops/sec; 52.9 MB/s
fillseq : 2.133 micros/op 468929 ops/sec; 51.9 MB/s
fillseq : 2.014 micros/op 496462 ops/sec; 54.9 MB/s
fillseq : 2.151 micros/op 464845 ops/sec; 51.4 MB/s
fillseq : 2.073 micros/op 482397 ops/sec; 53.4 MB/s
With PR -
Avg ops/sec 470565
fillseq : 2.162 micros/op 462585 ops/sec; 51.2 MB/s
fillseq : 2.092 micros/op 478039 ops/sec; 52.9 MB/s
fillseq : 2.103 micros/op 475427 ops/sec; 52.6 MB/s
fillseq : 2.081 micros/op 480608 ops/sec; 53.2 MB/s
fillseq : 2.147 micros/op 465735 ops/sec; 51.5 MB/s
fillseq : 2.117 micros/op 472425 ops/sec; 52.3 MB/s
fillseq : 2.092 micros/op 478105 ops/sec; 52.9 MB/s
fillseq : 2.117 micros/op 472352 ops/sec; 52.3 MB/s
fillseq : 2.116 micros/op 472519 ops/sec; 52.3 MB/s
fillseq : 2.131 micros/op 469370 ops/sec; 51.9 MB/s
fillseq : 2.154 micros/op 464144 ops/sec; 51.3 MB/s
fillseq : 2.098 micros/op 476660 ops/sec; 52.7 MB/s
fillseq : 2.139 micros/op 467601 ops/sec; 51.7 MB/s
fillseq : 2.170 micros/op 460933 ops/sec; 51.0 MB/s
fillseq : 2.124 micros/op 470743 ops/sec; 52.1 MB/s
fillseq : 2.120 micros/op 471664 ops/sec; 52.2 MB/s
fillseq : 2.115 micros/op 472834 ops/sec; 52.3 MB/s
fillseq : 2.174 micros/op 460034 ops/sec; 50.9 MB/s
fillseq : 2.123 micros/op 471077 ops/sec; 52.1 MB/s
fillseq : 2.135 micros/op 468455 ops/sec; 51.8 MB/s
In subsequent runs, I still see some regression:
490107 vs 473037 ops/sec
My guess is that something similar needs to be done for log_writer as well, but because the else block is embedded in the loop, it'll probably need a bigger refactoring.
I think it's noise. The variation without your changes (478815 vs 490107) suggests so. Also, PGO should take care of the rearrangement if necessary?
Ok. Let me submit a diff.
@sidroyc has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
LGTM. Thanks!
Integrate the streaming compress/uncompress API into WAL compression.
The streaming compression object is stored in the log_writer along with a reusable output buffer to store the compressed buffer(s).
The streaming uncompress object is stored in the log_reader along with a reusable output buffer to store the uncompressed buffer(s).
Test Plan:
Added unit tests to verify different scenarios - large buffers, split compressed buffers, etc.
Future optimizations:
The overhead for small records is quite high, so it makes sense to compress only buffers above a certain threshold and use a separate record type to indicate that those records are compressed.
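A hedged sketch of that idea (the threshold, its value, and the record-type handling are assumptions, not part of this PR):

// Compress only records above an assumed size threshold; smaller records
// would be written uncompressed, with a separate record type letting the
// reader distinguish compressed fragments from plain ones.
constexpr size_t kMinRecordSizeToCompress = 256;  // assumed value, tunable
const bool do_compress =
    compress_ && slice.size() >= kMinRecordSizeToCompress;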