
Add pipelined & parallel compression optimization #6262

Closed
wants to merge 7 commits

Conversation


@cp5555 cp5555 commented Jan 6, 2020

This PR adds support for pipelined & parallel compression optimization for BlockBasedTableBuilder. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set CompressionOptions::parallel_threads greater than 1 to enable compression parallelism.
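For context, enabling the feature is a one-line option change. A minimal configuration sketch, assuming a RocksDB build that contains this PR (the field name `CompressionOptions::parallel_threads` is taken from the description above; the compression type is arbitrary):

```cpp
#include "rocksdb/options.h"

rocksdb::Options options;
options.compression = rocksdb::kZSTD;           // any block compression type
options.compression_opts.parallel_threads = 4;  // >1 enables parallel compression
```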

// Number of threads for parallel compression.
//
// Default: 1.
uint32_t threads;

Please rename the option to something more specific.


Changed this option to parallel_threads.

@@ -128,21 +133,29 @@ struct CompressionOptions {
// Default: false.
bool enabled;

// Whether to enable parallel compression.
bool parallel_enabled;

Do we need this when we already have the previous option?


Removed this since parallel_threads is sufficient.

* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/

Please copy & paste the header from another file.


Copied the RocksDB license header for work_queue.h.

* If `maxSize == 0` the queue size is unbounded.
*
* @param maxSize The maximum allowed size of the work queue.
*/

Is this source code copied from somewhere else? If not, I suggest you keep the comment format consistent with other code: "//" for each line.


The code was copied from facebook/zstd.


Can you put this information in a code comment? Maybe after line 27.


Added. Also added similar information in work_queue_test.cc.


namespace rocksdb {

/// Unbounded thread-safe work queue.

Was the class copied from somewhere? If it is from a proven library, then fine. If not, we probably should at least add some unit tests. It's also worth thinking about whether we can simplify it while still satisfying our performance requirement.


Added a unit test for work_queue.h, since there is an out-of-the-box unit test in facebook/zstd. Currently all methods in the class are used except waitUntilFinished, so maybe we can keep the class WorkQueue as it was in the zstd repo.

@@ -450,6 +460,106 @@ struct BlockBasedTableBuilder::Rep {
~Rep() {}
};

class BlockBasedTableBuilder::ParallelCompressionRep {

Do we need to keep it a subclass of BlockBasedTableBuilder, or can we define a separate class for it? It feels that the relationship between the two classes is quite loose.


ParallelCompressionRep was designed as a helper class for parallel compression in block-based tables only. It will only be used by BlockBasedTableBuilder, so I made it a private inner class so that it's not visible to other code, including code that includes block_based_table_builder.h.

// releasing string memories during vector clear()
// in order to save memory allocation overhead
class Keys {
public:

Can you keep the coding convention? You can run "make format" to reformat it. I tried it and it also works with the Ubuntu subsystem on Windows. Let me know if you want me to run it for you and give you a patch for formatting.


I've checked the "make format" results into the latest commits. Sorry for the inconvenience.

@yzygitzh

ping @siying

@siying siying left a comment

Awesome! I don't have major comments in the main logic. More unit tests might be needed though.


// Get blocks from mem-table walking thread, compress them and
// pass them to the write thread. Used in parallel compression mode only
void WriteBlocks(CompressionContext& compression_ctx,

If my understanding is correct, this function is used by the compression threads. I think we should try to find a better name. Right now, it's a little hard for me to imagine that it is a long-running function that keeps taking work items from a queue and processing them until it is signaled to finish. Maybe include something like "thread", or keep the convention of some parts of the code and prefix the function name with "Bg".


Renamed WriteBlocks to BGWorkCompression.

CompressionType& result_compression_type);

// Get compressed blocks from WriteBlocks and write them into SST
void WriteRawBlocks();

Similar to WriteBlocks(). If my understanding is correct, this is the function used by the block-writing thread. Can you try to find a better name? Something that includes "thread" in the function name, or similar.


Renamed WriteRawBlocks to BGWorkWriteRawBlock. Dropped the "s" suffix to keep it aligned with the original WriteRawBlock.

@@ -0,0 +1,254 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.

I only see a unit test for the work queue, but I didn't see a unit test that builds SST files with parallel compression. Maybe I missed something. Also, it is preferred to have at least one unit test that covers the whole flow: from setting the option, to generating SST files, reading them back, and checking that the values are correct.


Sorry, I missed unit tests in earlier versions.
I leveraged the existing RandomizedHarnessTest in table/table_test.cc as basic unit tests. They now also check cases where parallel compression is enabled.
I also enabled DBBasicTestWithParallelIO in db/db_basic_test.cc and DBTest2::CompressionOptions in db/db_test2.cc to check parallel compression cases.
These tests should cover the whole flow.

@siying siying left a comment

Sorry, I gave it another pass and have more comments. I should have been more careful in the first round of reviews.
Again, thank you for working on it; I believe it is a very cool project.

r->index_builder->OnKeyAdded(key);
}
WriteBlock(Slice(data_block), &r->pending_handle,
true /* is_data_block */);

Here is a problem: without parallel compression, inside this function call, rep_.data_begin_offset is updated, so that in BlockBasedTableBuilder::Add() we can determine when the size of the file reaches the limit and the file can be terminated. But now, we don't know it until the background threads have finished compressing the blocks.

I don't know a good way to solve the problem. Can we estimate the size and terminate the file in BlockBasedTableBuilder::Add()?

Also, can the compression queue hold pointers to objects in the pool instead?


Here is a problem: without parallel compression, inside this function call, rep_.data_begin_offset is updated, so that in BlockBasedTableBuilder::Add() we can determine when the size of the file reaches the limit and the file can be terminated. But now, we don't know it until the background threads have finished compressing the blocks.

I don't know a good way to solve the problem. Can we estimate the size and terminate the file in BlockBasedTableBuilder::Add()?

rep_.data_begin_offset will only be increased in the kBuffered state, where the parallel compression code path is not involved. I think that variable tracks raw size instead of compressed size, in a synchronized and single-threaded way. Only when the state changes to kUnbuffered is code related to parallel compression executed. As a result, code related to rep_.data_begin_offset should work well with parallel compression enabled.

There is such a problem in ProcessKeyValueCompaction in compaction_job.cc though, where current_output_file_size is updated by the builder's FileSize(). However, the maximum number of inflight blocks is bounded by the number of compression threads (I'll explain that in another comment). As a result, the file size with parallel compression is bounded by original_file_size + compressed_block_size * number_of_compression_threads. I think that's acceptable in most cases.

Also, can the compression queue hold pointers to objects in the pool instead?

I made all WorkQueues hold pointers into the pool. Now block_rep_pool_, compress_queue_ and write_queue_ contain references (pointers) to data in block_rep_buf_.


Added SST size estimation based on historical_compression_ratio * bytes_under_compression.

std::vector<std::string> keys;
size_t size;
};
Keys* keys_ptr;

We follow Google C++ Style and class member's naming convention is keys_ptr_: https://google.github.io/styleguide/cppguide.html#Variable_Names


Fixed.


std::queue<T> queue_;
bool done_;
std::size_t maxSize_;

Maybe I missed something, but I didn't see maxSize_ ever set in the queues used. Should we set something? Because the writer thread can be significantly faster than the background compression threads, without a limit on the queue size we can end up with an unlimited number of raw blocks in the queue, which consumes memory and makes it harder to estimate file size, but does not help with anything.


The maximum size of the queue, i.e. the number of inflight blocks, was implicitly ensured because we have a fixed number of BlockReps, equal to the number of compression threads. Each time we want to emit a block for compression, we have to fetch a BlockRep from its pool. I made this more explicit by adding setMaxSize calls in the ParallelCompressionRep constructor.


Changed setMaxSize to initialization in the initializer list, to keep the same convention as BlockBasedTableBuilder::Rep.

@@ -299,12 +300,13 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr;

std::string last_key;
const Slice* first_key_in_next_block_ptr = nullptr;

Is it possible to make it a std::unique_ptr so that we don't have to do the cleanup with delete?


Fixed.

BlockRepSlot* slot_ptr;
};
typedef WorkQueue<BlockRep> BlockRepPool;
BlockRepPool block_rep_pool;

Can you explain why the data structure is a WorkQueue and not a normal vector or deque?


The BlockRep pool is pushed to by the writer thread and popped by the block-building thread concurrently. This is done to reuse memory and keep a bounded number of inflight compression payloads. As a result, the pool has to be thread-safe. More comments were added around the BlockRep definition for this.

CompressionType compression_type;
std::string* first_key_in_next_block_ptr;
Keys* keys_ptr;
BlockRepSlot* slot_ptr;

I don't think we need ptr in those variable names. It's clear they are pointers because their types are pointers.


Fixed.

delete block_rep.compressed_data_ptr;
delete block_rep.first_key_in_next_block_ptr;
delete block_rep.keys_ptr;
delete block_rep.slot_ptr;

Rather than deleting them, can we make them unique_ptrs?


Fixed.

}
}

r->pc_rep->write_queue.push(std::move(block_rep.slot_ptr));

I'm a little bit confused here. We moved block_rep.slot_ptr, but we still seem to continue using and cleaning up this pointer, which seems contradictory. Do we need std::move() here?


WorkQueue's push method only accepts r-values by design. We have to use std::move to wrap the pointer, but the data referenced by the pointer is not moved; only the pointer value, i.e. a scalar, is "moved". In effect, the pointer value is just copied into the queue.

slot_ptr is now a unique_ptr, and unique_ptr::get() returns an r-value itself, so std::move is not necessary there. But std::move for block_rep is still needed, because it's an l-value.

}

r->pc_rep->write_queue.push(std::move(block_rep.slot_ptr));
r->pc_rep->compress_queue.push(std::move(block_rep));

This std::move() is confusing to me too. I'm not sure about the behavior we want for block_rep after the move.
My understanding is that we want to reuse the allocated memory for keys, strings, first_key_in_next_block_ptr, etc. If that is the case, can we be more explicit here? If block_rep_pool keeps holding ownership of all the objects pointed to by those pointers, I don't think we should do std::move() here.

Either way, please add comments somewhere to explain the ownership of those objects.


When block_rep is a struct, std::move will only move its members (including pointers), but not the data they reference. Now that block_rep is a pointer, std::move likewise only moves the pointer value. I've added comments on object ownership around the BlockRep definition and the WorkQueue variable definitions.


Moving a pointer is confusing too. My vote would be to move away from this move.


Modified the WorkQueue design so it copies elements instead of moving them. As long as we avoid passing large elements directly to WorkQueue in the future, this should be fine; we can always pass large objects by pointer.
This behavior is more similar to an STL queue, and maybe less misleading.

@yzygitzh

yzygitzh commented Mar 9, 2020

@siying Sorry, I'm busy writing my master's thesis these days. I'll look into your comments by the end of this week.

@yzygitzh

Hi @siying , sorry for the late reply.

I've fixed the code according to your comments. Besides, another unit test for parallel compression was added to the DBBasicTestWithTimestampCompressionSettings test in db/db_with_timestamp_basic_test.cc.

PTAL, thanks!

// inflight blocks being compressed when BlockBasedTableBuilder::FileSize()
// is called. The SST size inflation is bounded by
// parallel_threads * compressed_block_size.
//

Added more explanation of SST file size inflation.

@yzygitzh

Hi @siying ,

Again thanks for your comments! Several updates since the last push:

  1. After discussing with @burtonli , I added an SST size estimation mechanism for parallel compression. It uses the compression ratio observed so far during SST building and the raw bytes under compression to estimate the size inflation, and adds that to the file size on disk. The block building thread waits for the completion of the first block compression in order to get a valid initial compression ratio. This estimation should bound the inflation to approximately the size of one compressed block, similar to the non-parallel case.

  2. I added TableBuilder::EstimatedFileSize for SST size estimation, specifically for compaction usage. Maybe in the future we can use the historical-compression-ratio idea to estimate SST size at a finer granularity during compaction, e.g. after each TableBuilder::Add() call.

  3. I changed BlockBasedTableBuilder::ParallelCompressionRep from a class to a struct, following the same convention as the BlockBasedTableBuilder::Rep struct.

PTAL. Thanks!

@siying siying left a comment

Thank you for making the change. I don't have major comments anymore.

Please update the summary of the pull request to be clearer. Consider removing "Add Feature -" from the PR title to be more concise. Also add an entry to HISTORY.md to explain this new feature.

// Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(listener->max_level_checked, 6);
// Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(listener->max_level_checked, 6);

Ideally we need a unit test that validates the data in the database, that is, that the keys are as expected and not lost during the compaction process.


Added a consistency check between the data written and the data in the database. This should benefit the original DBTest2::CompressionOptions test as well.

// because block building and compression are pipelined, so there might be
// inflight blocks being compressed and not finally written when
// BlockBasedTableBuilder::FileSize() is called. SST size is thus estimated
// in BlockBasedTableBuilder::EstimatedFileSize(), using historical

This is a public header, so ideally internal functions like BlockBasedTableBuilder::EstimatedFileSize() should not be mentioned. Imagine the readers of header files under include/rocksdb/ as RocksDB users who don't read the source code. I think the mentions of internal functions here can be removed.


Rewrote the option documentation without code details.

if (rep_->compression_opts.parallel_threads > 1) {
// Use compression ratio so far and inflight raw bytes to estimate
// final SST size.
return FileSize() + uint64_t((rep_->pc_rep->raw_bytes_submitted -

We follow Google C++ Style, which bans C-style casting: https://google.github.io/styleguide/cppguide.html#Casting Please use a C++-style cast instead.


Fixed.

r->pc_rep->curr_compression_ratio =
(r->pc_rep->curr_compression_ratio * r->pc_rep->raw_bytes_compressed +
block_rep->compressed_data->size()) /
double(r->pc_rep->raw_bytes_compressed + block_rep->data->size());

We follow Google C++ Style, which bans C-style casting: https://google.github.io/styleguide/cppguide.html#Casting Please use a C++-style cast instead.


Fixed.

@cp5555 cp5555 changed the title from "Add Feature - Add pipelined & parallel compression optimization" to "Add pipelined & parallel compression optimization" on Mar 25, 2020
@yzygitzh

Hi @siying ,

I've finished addressing the latest comments. Main changes include:

  1. Modified the design of WorkQueue so that we can get rid of std::move.
  2. Updated HISTORY.md.
  3. Fixed comments and code style. Commit messages, the PR title and the PR description were also polished.

PTAL. Thanks!

@facebook-github-bot facebook-github-bot left a comment

@siying has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.


@siying siying left a comment

I think it mostly looks good to me. Sorry that I have a late comment.

We also need to add an option to the stress test tool. It can be done in a follow-up pull request, but it needs to be done. Start by adding an option in db_stress to cover it. You can look at how DB options are set up in db_stress_tool/db_stress_test_base.cc and add an option for the parallelism level. After that, set it up with some probability in tools/db_crashtest.py.

Thanks again for working on such a complicated feature. We are really close to land it.

Rep* r = rep_;
ParallelCompressionRep::BlockRepSlot* slot;
ParallelCompressionRep::BlockRep* block_rep;
while (r->pc_rep->write_queue.pop(slot)) {

I think if r->status is OK, we should exit the loop and avoid calling WriteRawBlock() again. I believe inside WriteRawBlock() we only assert that the status is OK, and in release mode we would just override the status, which can cause problems. I think it's safer to just exit the loop based on r->status.

Ideally the failure case is tested.

Sorry for this late comment. I should have noticed it earlier. But this is serious and we have to fix it before committing the feature.


Sure. Did you mean exiting the loop when r->status is NOT OK?


Yes. Sorry for the typo.


Also, there seems to be a data race between the main thread and the block writer thread with rep_->status().

Those variables added to estimate file size might also have a data race.


For the variables used to estimate file size:

raw_bytes_submitted will only be updated and accessed by the block-building thread, so it should be safe;

raw_bytes_compressed and curr_compression_ratio will be updated by the writer thread and accessed by the block-building thread. Their updates should be single-threaded. There might be a case where r->offset is already updated but raw_bytes_compressed and curr_compression_ratio are not updated yet, causing an estimation error of one compressed block size. Shall we make these updates atomic, or do you think it's acceptable?


I noticed that, in the original BlockBasedTableBuilder, when compression is aborted, rep_->status is set to Status::Corruption, but WriteRawBlock is still called. Is WriteRawBlock meant to write uncompressed data here, or should we return before WriteRawBlock is called?


Regarding the data race for the variables related to file size estimation, making the variables used by BlockBasedTableBuilder::EstimatedFileSize atomic should be good enough for me.

The compression validation failure seems to be a bug. We don't have to fix the bug in the non-parallel case, but if it is fixed in the parallel writer case, that's great.


The variables used by BlockBasedTableBuilder::EstimatedFileSize are all protected by estimation_mutex. This should lead to a more accurate (and predictable) estimation than separate atomic variables.

I've added an if (!ok()) check for compression in both the parallel and non-parallel cases. A fake faulting compressor is needed in the future for thorough testing, though.

@facebook-github-bot

@cp5555 has updated the pull request. Re-import the pull request

@yzygitzh

yzygitzh commented Mar 26, 2020

Hi @siying ,

Again thanks for your continuous efforts on code review. It has been really helpful because I'm not quite familiar with RocksDB design principles, and I've learned a lot during the code revision.

For the problems around error checking and data races, here are the updates:

  1. Seems original non-parallel code was not checking status between compression and WriteRawBlock(). I suppose it should be checked because as you said, there is an assert in WriteRawBlock(). So I add the checking, and now it returns if status is not OK before WriteRawBlock().

  2. Before BlockBasedTableBuilder::Finish(), main thread and writer thread might access rep_->status, compression thread(s) and write thread might update rep_->status. In order to make those updates and accesses thread-safe,

    • I add a field status in BlockRep as compression status output, so compression thread(s) will only update their own block's status. As a result, compression threads are safe. These local status's are checked and assigned to global status (if non-OK) once block_rep is popped out in write thread.
    • I wrap BlockBasedTableBuilder::status into a mutex, and introduce SetStatusAtomic to set status within the same mutex. As a result, main thread and writer thread are safe.
  3. In order to test status fail in compression, seems we have to write a fake faulting compressor, and it's not trivial. Maybe I can work on this as another PR, coming alongside with db_stress PR.

  4. I also wrapped the SST size estimation operations in a mutex, so there should be no data race there. Besides, the estimation now accounts for kBlockTrailerSize.

  5. According to my local tests, there is no performance drop with the above synchronization in place.
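The scheme in points 1–2 can be sketched as follows (illustrative types and names, not RocksDB's actual code): each block carries its own status, so compression threads never write shared state, and the writer thread folds a block's failure into the global status under a mutex (the idea behind SetStatusAtomic).

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Per-block result: a compression thread only ever touches its own BlockRep,
// so no synchronization is needed while compressing.
struct BlockRep {
  std::string contents;
  bool ok = true;            // compression status output for this block
  std::string status_msg;
};

// Global status, guarded by a mutex; the first failure wins.
class StatusHolder {
 public:
  void SetStatus(const std::string& msg) {
    std::lock_guard<std::mutex> lock(mu_);
    if (ok_) {
      ok_ = false;
      msg_ = msg;
    }
  }
  bool ok() const {
    std::lock_guard<std::mutex> lock(mu_);
    return ok_;
  }

 private:
  mutable std::mutex mu_;
  bool ok_ = true;
  std::string msg_;
};

// Writer-thread side: after popping a block, fold its status into the
// global one before attempting to write it out.
inline bool MaybeWrite(const BlockRep& rep, StatusHolder* global) {
  if (!rep.ok) {
    global->SetStatus(rep.status_msg);
    return false;  // skip the write path, mirroring the early return
  }
  return true;
}
```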

PTAL. Thanks!

Ziyue

yzygitzh added a commit to yzygitzh/rocksdb that referenced this pull request Oct 21, 2020
This commit makes facebook#6262's code change tidier and easier to understand by:

1. Wrapping parallel compression initialization and termination into
   common methods;
2. Wrapping BlockRep initialization, push/pop into common methods;
3. Wrapping file size estimation into common methods;
4. Fixing function declarations that use non-const reference;
5. Fixing some uninitialized variables;
6. Fixing first_block data race;
7. Making BlockRep::status check in BlockBasedTableBuilder::Finish only present
if ok().
8. Making assert(ok()) in BlockBasedTableBuilder::CompressAndVerifyBlock only
present in non-parallel compression mode. In parallel compression mode,
compression will abort if status is not OK.
9. Eliminating potential data race caused by BlockBasedTableBuilder::GetStatus()
and BlockBasedTableBuilder::GetIOStatus() by returning status copy instead of
unprotected reference.
facebook-github-bot pushed a commit that referenced this pull request Oct 22, 2020
Summary:
This commit makes #6262 code change tidier and easier to understand by:

1. Wrapping parallel compression initialization and termination into
   common methods;
2. Wrapping BlockRep initialization, push/pop into common methods;
3. Wrapping file size estimation into common methods;
4. Fixing function declarations that use non-const reference;
5. Fixing some uninitialized variables;
6. Fixing first_block data race;
7. Making BlockRep::status check in BlockBasedTableBuilder::Finish only present
if ok();
8. Making assert(ok()) in BlockBasedTableBuilder::CompressAndVerifyBlock only
present in non-parallel compression mode. In parallel compression mode,
compression will abort if status is not OK;
9. Eliminating potential data race caused by BlockBasedTableBuilder::GetStatus()
and BlockBasedTableBuilder::GetIOStatus() by returning status copy instead of
unprotected reference.

Pull Request resolved: #6888

Reviewed By: ajkr

Differential Revision: D21957110

Pulled By: jay-zhuang

fbshipit-source-id: 3a29892f249209513f030349756cecd7736eae80
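Point 9 of the commit above is worth a small sketch (illustrative names, simplified types): returning a copy of the status taken while the lock is held is race-free, whereas returning a reference to the guarded field lets callers read it with no synchronization at all.

```cpp
#include <cassert>
#include <mutex>
#include <string>

class Builder {
 public:
  void SetStatus(std::string s) {
    std::lock_guard<std::mutex> lock(mu_);
    status_ = std::move(s);
  }

  // Safe: the copy is made while mu_ is held, so a concurrent SetStatus
  // cannot be observed mid-write.
  std::string GetStatus() const {
    std::lock_guard<std::mutex> lock(mu_);
    return status_;
  }

  // Racy alternative (what the commit removed, in spirit): the returned
  // reference outlives the lock, so callers read status_ unprotected.
  // const std::string& GetStatusRef() const { return status_; }

 private:
  mutable std::mutex mu_;
  std::string status_ = "OK";
};
```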
tabokie pushed a commit to tabokie/rocksdb that referenced this pull request Nov 9, 2020
Summary:
This PR adds support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.
Pull Request resolved: facebook#6262

Reviewed By: ajkr

Differential Revision: D20651306

fbshipit-source-id: 62125590a9c15b6d9071def9dc72589c1696a4cb

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie pushed a commit to tabokie/rocksdb that referenced this pull request Nov 9, 2020
Summary:
With facebook#6262, UBSAN fails with "division by zero":

[ RUN      ] Timestamp/DBBasicTestWithTimestampCompressionSettings.PutAndGetWithCompaction/3
internal_repo_rocksdb/repo/table/block_based/block_based_table_builder.cc:1066:39: runtime error: division by zero
    #0 0x7ffb3117b071 in rocksdb::BlockBasedTableBuilder::WriteRawBlock(rocksdb::Slice const&, rocksdb::CompressionType, rocksdb::BlockHandle*, bool) internal_repo_rocksdb/repo/table/block_based/block_based_table_builder.cc:1066
    facebook#1 0x7ffb311775e1 in rocksdb::BlockBasedTableBuilder::WriteBlock(rocksdb::Slice const&, rocksdb::BlockHandle*, bool) internal_repo_rocksdb/repo/table/block_based/block_based_table_builder.cc:848
    facebook#2 0x7ffb311771a2 in rocksdb::BlockBasedTableBuilder::WriteBlock(rocksdb::BlockBuilder*, rocksdb::BlockHandle*, bool) internal_repo_rocksdb/repo/table/block_based/block_based_table_builder.cc:832

This is caused by not returning immediately after the CompressAndVerifyBlock call
in WriteBlock when rep_->state == kBuffered.
Pull Request resolved: facebook#6633

Test Plan: Run all existing tests.

Reviewed By: anand1976

Differential Revision: D20808366

fbshipit-source-id: 09f24b7c0fbaf4c7a8fc48cac61fa6fcb9b85811

Signed-off-by: tabokie <xy.tao@outlook.com>
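The division-by-zero fix above can be sketched like this (a simplified model with approximate names, not the actual RocksDB code): in the buffered state the block has not been written yet, so WriteBlock must return right after compressing instead of falling through to the WriteRawBlock path, which would operate on a zero-size write.

```cpp
#include <cassert>

enum class State { kBuffered, kUnbuffered };

// Returns true on success; increments *blocks_written only when a block
// actually reaches the raw-write path.
bool WriteBlock(State state, int* blocks_written) {
  // CompressAndVerifyBlock(...) runs here in both states.
  if (state == State::kBuffered) {
    // The fix: bail out now. The block stays buffered and nothing is
    // written, so the raw-write path never sees a zero-size block.
    return true;
  }
  // WriteRawBlock(...) path.
  ++*blocks_written;
  return true;
}
```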
tabokie pushed a commit to tabokie/rocksdb that referenced this pull request Nov 9, 2020
…ok#6635)

Summary:
This failure was introduced in facebook#6262
Pull Request resolved: facebook#6635

Reviewed By: siying

Differential Revision: D20822602

Pulled By: anand1976

fbshipit-source-id: 96b316816cce6b95b092a7fc46ea968ed6ba8809

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie pushed a commit to tabokie/rocksdb that referenced this pull request Nov 10, 2020
Summary:
facebook#6262 causes CLANG analyze to complain. Add assertion to suppress the warning.
Pull Request resolved: facebook#6641

Test Plan: Run "clang analyze" and make sure it passes.

Reviewed By: anand1976

Differential Revision: D20841722

fbshipit-source-id: 5fa6e0c5cfe7a822214c9b898a408df59d4fd2cd

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie pushed a commit to tabokie/rocksdb that referenced this pull request Nov 10, 2020
codingrhythm pushed a commit to SafetyCulture/rocksdb that referenced this pull request Mar 5, 2021