RemoteCompaction support Fallback to local compaction #8709

Closed
wants to merge 6 commits into from
1 change: 1 addition & 0 deletions HISTORY.md
@@ -24,6 +24,7 @@
* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.

### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API
88 changes: 67 additions & 21 deletions db/compaction/compaction_job.cc
@@ -932,7 +932,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
}

#ifndef ROCKSDB_LITE
void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
CompactionServiceJobStatus
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
@@ -969,7 +970,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
return;
return CompactionServiceJobStatus::kFailure;
}

std::ostringstream input_files_oss;
@@ -988,36 +989,73 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {
sub_compact->status =
Status::Incomplete("CompactionService failed to start compaction job.");
return;
switch (compaction_status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
default:
assert(false); // unknown status
break;
}

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);

if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}

CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {
sub_compact->status =
s.ok() ? compaction_result.status
: Status::Incomplete(
"CompactionService failed to run compaction job.");
compaction_result.status.PermitUncheckedError();

if (compaction_status == CompactionServiceJobStatus::kFailure) {
if (s.ok()) {
if (compaction_result.status.ok()) {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (even though "
"the internal status is okay).");
} else {
// set the current sub compaction status with the status returned from
// remote
sub_compact->status = compaction_result.status;
}
} else {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (and no valid "
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed, status: %s",
compaction_input.column_family.name.c_str(), job_id_,
s.ToString().c_str());
return;
"[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}

if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
return;
return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;

@@ -1037,7 +1075,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(

if (!s.ok()) {
sub_compact->status = s;
return;
return CompactionServiceJobStatus::kFailure;
}

for (const auto& file : compaction_result.output_files) {
@@ -1048,15 +1086,15 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
return;
return CompactionServiceJobStatus::kFailure;
}

FileMetaData meta;
uint64_t file_size;
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
return;
return CompactionServiceJobStatus::kFailure;
}
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno);
@@ -1077,6 +1115,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
sub_compact->total_bytes = compaction_result.total_bytes;
IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
return CompactionServiceJobStatus::kSuccess;
}
#endif // !ROCKSDB_LITE

@@ -1086,7 +1125,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

#ifndef ROCKSDB_LITE
if (db_options_.compaction_service) {
return ProcessKeyValueCompactionWithCompactionService(sub_compact);
CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
if (comp_status == CompactionServiceJobStatus::kSuccess ||
comp_status == CompactionServiceJobStatus::kFailure) {
return;
}
// fallback to local compaction
assert(comp_status == CompactionServiceJobStatus::kUseLocal);
}
#endif // !ROCKSDB_LITE

2 changes: 1 addition & 1 deletion db/compaction/compaction_job.h
@@ -146,7 +146,7 @@ class CompactionJob {
// consecutive groups such that each group has a similar size.
void GenSubcompactionBoundaries();

void ProcessKeyValueCompactionWithCompactionService(
CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact);

// update the thread status for starting a compaction.
127 changes: 126 additions & 1 deletion db/compaction/compaction_service_test.cc
@@ -19,6 +19,11 @@ class TestCompactionServiceBase {
override_start_status = s;
}

void OverrideWaitStatus(CompactionServiceJobStatus s) {
is_override_wait_status = true;
override_wait_status = s;
}

void OverrideWaitResult(std::string str) {
is_override_wait_result = true;
override_wait_result = std::move(str);
@@ -27,6 +32,7 @@ class TestCompactionServiceBase {
void ResetOverride() {
is_override_wait_result = false;
is_override_start_status = false;
is_override_wait_status = false;
}

virtual ~TestCompactionServiceBase() = default;
@@ -35,6 +41,9 @@ class TestCompactionServiceBase {
bool is_override_start_status = false;
CompactionServiceJobStatus override_start_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_status = false;
CompactionServiceJobStatus override_wait_status =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result = false;
std::string override_wait_result;
};
@@ -76,6 +85,10 @@ class MyTestCompactionServiceLegacy : public CompactionService,
jobs_.erase(i);
}

if (is_override_wait_status) {
return override_wait_status;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
@@ -160,6 +173,10 @@ class MyTestCompactionService : public CompactionService,
jobs_.erase(i);
}

if (is_override_wait_status) {
return override_wait_status;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
@@ -323,7 +340,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1);

// make sure the compaction statistics is only recorded on remote side
// make sure the compaction statistics is only recorded on the remote side
ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
Statistics* primary_statistics = GetPrimaryStatistics();
@@ -658,6 +675,114 @@ TEST_P(CompactionServiceTest, CompactionInfo) {
ASSERT_EQ(Env::BOTTOM, info.priority);
}

TEST_P(CompactionServiceTest, FallbackLocalAuto) {
Options options = CurrentOptions();
ReopenWithCompactionService(&options);

auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
uint64_t primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);

my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);

for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
}
ASSERT_OK(Flush());
}

for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());

// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i % 2) {
ASSERT_EQ(result, "value" + ToString(i));
} else {
ASSERT_EQ(result, "value_new" + ToString(i));
}
}

ASSERT_EQ(my_cs->GetCompactionNum(), 0);

// make sure the compaction statistics is only recorded on the local side
ASSERT_EQ(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);
}

TEST_P(CompactionServiceTest, FallbackLocalManual) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);

GenerateTestData();
VerifyTestData();

auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
uint64_t primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);

// re-enable remote compaction
my_cs->ResetOverride();
std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
// make sure the compaction statistics is only recorded on the remote side
ASSERT_GT(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);

// return run local again with API WaitForComplete
my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
start_str = Key(120);
start = start_str;
comp_num = my_cs->GetCompactionNum();
compactor_new_key =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
primary_new_key =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);

ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
Contributor

Do we expect some stat to have increased on the primary side, like COMPACTION_BYTES_READ?

Contributor Author

I added a check for statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), which shows the updated keys.
Not sure how we could check COMPACTION_BYTES_READ; it seems we would need to call GetThreadList() and check?

Contributor

Sounds good, thanks. COMPACTION_BYTES_READ is also a stat you can get from statistics->getTickerCount().

Is this the fix?

  ASSERT_GE(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
            primary_new_key);

If no compaction happened on the primary would that still pass because the ticker stat would equal primary_new_key?
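
To make the concern concrete, here is a minimal sketch in which plain boolean helpers stand in for the gtest comparisons. The helper names (`PassesGE`, `PassesGT`, `DemonstrateConcern`) and the counter values are illustrative assumptions, not code from this PR.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins for the comparisons made by ASSERT_GE (>=) and ASSERT_GT (>).
// These helpers are hypothetical and exist only for this illustration.
bool PassesGE(uint64_t after, uint64_t before) { return after >= before; }
bool PassesGT(uint64_t after, uint64_t before) { return after > before; }

void DemonstrateConcern() {
  // Made-up ticker value captured before the manual compaction.
  const uint64_t primary_new_key = 40;

  // Case 1: no compaction ran on the primary, so the ticker is unchanged.
  const uint64_t unchanged = primary_new_key;
  assert(PassesGE(unchanged, primary_new_key));   // still passes: false positive
  assert(!PassesGT(unchanged, primary_new_key));  // fails, exposing the problem

  // Case 2: a local compaction dropped some keys, so the ticker grew.
  const uint64_t increased = primary_new_key + 5;
  assert(PassesGE(increased, primary_new_key));
  assert(PassesGT(increased, primary_new_key));
}
```

Only the strict comparison distinguishes "the local compaction ran" from "nothing happened on the primary".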

Contributor Author

My mistake, it should be ASSERT_GT, not GE.

Contributor

Sorry for the confusion. COMPACT_READ_BYTES is the name of the ticker stat as we discussed.

Contributor Author

@jay-zhuang jay-zhuang Sep 17, 2021

Makes sense.
But currently, we record the compact_read/write_bytes on both the remote and local side (which should be fixed):
https://github.com/facebook/rocksdb/blob/9dd09dc4f302f2de5a2c34ab2cbc4c393b4503cc/db/compaction/compaction_job.cc#L1117-L1118
I think we did that at the beginning just to make sure it behaves exactly the same with or without remote compaction, which is not necessary. I'll fix that and maybe add new metrics for remote compaction if we need to record that.
On the other hand, I think because of how the test CompactionService is implemented (it runs the remote compaction on the local compaction thread), the read/write bytes are still counted locally. I think we may need to update the test CompactionService, ideally to run in a separate process.
Anyway, I'm not going to fix it in this diff. Will follow up on that.

ASSERT_EQ(my_cs->GetCompactionNum(),
comp_num); // no remote compaction is run
// make sure the compaction statistics is only recorded on the local side
ASSERT_EQ(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
compactor_new_key);
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
primary_new_key);

// verify result after 2 manual compactions
VerifyTestData();
}

INSTANTIATE_TEST_CASE_P(
CompactionServiceTest, CompactionServiceTest,
::testing::Values(
2 changes: 1 addition & 1 deletion include/rocksdb/options.h
@@ -372,7 +372,7 @@ extern const char* kHostnameForDbHostId;
enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
kUseLocal, // TODO: Add support for use local compaction
kUseLocal,
};

struct CompactionServiceJobInfo {
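
As a rough summary of the control flow this change introduces, the sketch below models how the status returned by the start and wait steps decides where a subcompaction runs. It is a simplified, standalone model and not RocksDB code: the enum is copied from the diff above, while `Outcome`, `Decide`, and `CheckFallbackPaths` are hypothetical names. It also ignores the case where the wait step succeeds but the result cannot be parsed, which the real code treats as a failure.

```cpp
#include <cassert>

// Copied from include/rocksdb/options.h as shown in this diff.
enum class CompactionServiceJobStatus : char {
  kSuccess,
  kFailure,
  kUseLocal,
};

// Hypothetical summary of where the subcompaction ends up.
enum class Outcome { kRanRemotely, kFailed, kRanLocally };

// Simplified model: the start status is checked first; the wait status is
// consulted only if the remote job started successfully.
Outcome Decide(CompactionServiceJobStatus start_status,
               CompactionServiceJobStatus wait_status) {
  if (start_status == CompactionServiceJobStatus::kUseLocal) {
    return Outcome::kRanLocally;  // fallback requested when starting
  }
  if (start_status == CompactionServiceJobStatus::kFailure) {
    return Outcome::kFailed;  // no fallback on an explicit failure
  }
  switch (wait_status) {
    case CompactionServiceJobStatus::kSuccess:
      return Outcome::kRanRemotely;
    case CompactionServiceJobStatus::kFailure:
      return Outcome::kFailed;
    case CompactionServiceJobStatus::kUseLocal:
      return Outcome::kRanLocally;  // fallback requested while waiting
  }
  return Outcome::kFailed;  // unreachable for valid enum values
}

void CheckFallbackPaths() {
  using S = CompactionServiceJobStatus;
  assert(Decide(S::kUseLocal, S::kSuccess) == Outcome::kRanLocally);
  assert(Decide(S::kSuccess, S::kUseLocal) == Outcome::kRanLocally);
  assert(Decide(S::kSuccess, S::kSuccess) == Outcome::kRanRemotely);
  assert(Decide(S::kFailure, S::kUseLocal) == Outcome::kFailed);
}
```

The two `kRanLocally` paths correspond to the `FallbackLocalAuto` test (override on start) and the second half of `FallbackLocalManual` (override on wait).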