Skip to content

Commit

Permalink
Resume compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 23, 2023
1 parent 106058c commit 7b881cd
Show file tree
Hide file tree
Showing 24 changed files with 963 additions and 100 deletions.
136 changes: 132 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1139,21 +1139,107 @@ void ColumnFamilyData::CreateNewMemtable(

// Reports whether this column family currently has compaction work to do.
// Automatic compactions must not be disabled in the mutable CF options; given
// that, work exists when a recorded resumable compaction is still pending or
// when the compaction picker says the current version's storage info needs
// compaction.
// NOTE(review): this hunk appears to show the removed single-condition line
// together with its parenthesized replacement, without +/- markers. Confirm
// against the raw diff before treating the body below as compilable.
bool ColumnFamilyData::NeedsCompaction() const {
return !mutable_cf_options_.disable_auto_compactions &&
compaction_picker_->NeedsCompaction(current_->storage_info());
(!resumable_compactions_of_all_db_sessions_.empty() ||
compaction_picker_->NeedsCompaction(current_->storage_info()));
}

// Picks the next compaction for this column family.
//
// A recorded resumable compaction is tried first; the regular compaction
// picker is consulted only when no resumable compaction could be rebuilt.
// A non-null result has FinalizeInputInfo() applied with the current version
// before it is returned. Returns nullptr when nothing was picked.
// NOTE(review): this hunk appears to interleave the removed `auto* result`
// statement with its replacement, without +/- markers. Confirm against the
// raw diff before treating the body below as compilable.
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer);
Compaction* result =
PickResumableCompaction(mutable_options, mutable_db_options);
if (result == nullptr) {
// Nothing resumable could be rebuilt; fall back to the regular picker.
result = compaction_picker_->PickCompaction(
GetName(), mutable_options, mutable_db_options,
current_->storage_info(), log_buffer);
}
if (result != nullptr) {
result->FinalizeInputInfo(current_);
}
return result;
}

// Pops recorded resumable compactions one at a time (ordered by DB session
// id, then by compaction job id) and tries to rebuild each into a Compaction.
//
// Every candidate is removed from `resumable_compactions_of_all_db_sessions_`
// before it is tried, so a candidate is attempted at most once whether or not
// it can be rebuilt. A session entry is dropped as soon as it has no
// candidates left.
//
// Returns the first Compaction that could be rebuilt, or nullptr once every
// recorded candidate has been consumed.
Compaction* ColumnFamilyData::PickResumableCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options) {
  while (!resumable_compactions_of_all_db_sessions_.empty()) {
    auto session_iter = resumable_compactions_of_all_db_sessions_.begin();
    // Bind by reference: erasing from a copy would leave the candidate in the
    // member map, so an already-returned compaction would be picked again.
    ResumableCompactionsPerDBSession& per_session = session_iter->second;
    if (per_session.empty()) {
      // Nothing to try for this session; drop it so the loop makes progress.
      resumable_compactions_of_all_db_sessions_.erase(session_iter);
      continue;
    }

    auto job_iter = per_session.begin();
    // Copy everything needed out of the maps before erasing the entries.
    std::string db_session_id = session_iter->first;
    const uint64_t compaction_job_id = job_iter->first;
    ResumableCompaction resumable_compaction = job_iter->second;

    per_session.erase(job_iter);
    if (per_session.empty()) {
      // `per_session` dangles after this erase; it is not used again in this
      // iteration.
      resumable_compactions_of_all_db_sessions_.erase(session_iter);
    }

    Compaction* result = GetCompactionFromResumableCompaction(
        db_session_id, compaction_job_id, resumable_compaction,
        mutable_options, mutable_db_options);
    if (result != nullptr) {
      return result;
    }
  }

  return nullptr;
}

// Tries to rebuild a Compaction from a recorded resumable compaction.
//
// Parameters:
//   db_session_id, compaction_job_id - identify the recorded compaction; they
//       are stored in the new Compaction together with `resumable_compaction`.
//   resumable_compaction - recorded input file numbers per level, output
//       level and penultimate output level.
//   mutable_options, mutable_db_options - forwarded to the Compaction.
//
// Returns nullptr when the record cannot be used as-is:
//   * it has no input levels, or one of its levels lists no files;
//   * an input file is no longer present in the current version's storage
//     info, or is already being compacted;
//   * the inputs' range overlaps with a compaction known to the picker.
// Otherwise returns a heap-allocated Compaction that has already been
// registered with the compaction picker.
Compaction* ColumnFamilyData::GetCompactionFromResumableCompaction(
    std::string& db_session_id, uint64_t compaction_job_id,
    ResumableCompaction& resumable_compaction,
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options) const {
  // The Compaction constructor reads `_inputs[0].level`, so a record without
  // any input level must never reach it.
  if (resumable_compaction.inputs.empty()) {
    return nullptr;
  }

  std::vector<CompactionInputFiles> inputs;
  inputs.reserve(resumable_compaction.inputs.size());

  for (const auto& level_and_file_numbers : resumable_compaction.inputs) {
    if (level_and_file_numbers.second.empty()) {
      return nullptr;
    }

    CompactionInputFiles level_inputs;
    level_inputs.level = level_and_file_numbers.first;
    for (const auto& file_number : level_and_file_numbers.second) {
      FileMetaData* file =
          current_->storage_info()->GetFileMetaDataByNumber(file_number);
      // Every recorded input must still exist and be free to compact.
      if (file == nullptr || file->being_compacted) {
        return nullptr;
      }
      level_inputs.files.push_back(file);
    }
    inputs.push_back(level_inputs);
  }

  if (compaction_picker_->FilesRangeOverlapWithCompaction(
          inputs, resumable_compaction.output_level,
          resumable_compaction.penultimate_output_level)) {
    return nullptr;
  }

  Compaction* result = new Compaction(
      current_->storage_info(), ioptions_, mutable_options, mutable_db_options,
      inputs, resumable_compaction.output_level,
      std::numeric_limits<uint64_t>::max(),
      std::numeric_limits<uint64_t>::max(), 0, CompressionType::kNoCompression,
      CompressionOptions(), Temperature::kUnknown, 0 /* max_subcompactions */,
      {}, false, "", -1, false /* deletion_compaction */,
      true /* l0_files_might_overlap */,
      CompactionReason::kResumableCompaction,
      BlobGarbageCollectionPolicy::kUseDefault,
      -1 /* blob_garbage_collection_age_cutoff */,
      {db_session_id, compaction_job_id, resumable_compaction});

  compaction_picker_->RegisterCompaction(result);

  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
const Slice& smallest_user_key, const Slice& largest_user_key,
int level) const {
Expand Down Expand Up @@ -1608,6 +1694,48 @@ void ColumnFamilyData::RecoverEpochNumbers() {
vstorage->RecoverEpochNumbers(this);
}

void ColumnFamilyData::AddResumableCompaction(
const ResumableCompactionInfo& info) {
const uint64_t compaction_job_id =
GetCompactionJobIdFromCompactionID(info.compaction_id);
const uint64_t subcompaction_job_id =
GetSubCompactionJobIdFromCompactionID(info.compaction_id);

auto iter_1 =
resumable_compactions_of_all_db_sessions_.find(info.db_session_id);
if (iter_1 == resumable_compactions_of_all_db_sessions_.end()) {
resumable_compactions_of_all_db_sessions_.insert({info.db_session_id, {}});
}
ResumableCompactionsPerDBSession& resumable_compactions_per_db_session =
resumable_compactions_of_all_db_sessions_[info.db_session_id];

auto iter_2 = resumable_compactions_per_db_session.find(compaction_job_id);
if (iter_2 == resumable_compactions_per_db_session.end()) {
ResumableCompaction resumable_compaction = {
info.inputs,
info.output_level,
info.penultimate_output_level,
info.penultimate_output_level_smallest,
info.penultimate_output_level_largest,
{}};
resumable_compactions_per_db_session.insert(
{compaction_job_id, resumable_compaction});
}

const bool has_subcompaction = info.subcompaction_start.size() != 0 ||
info.subcompaction_end.size() != 0;
if (has_subcompaction) {
ResumableCompaction& resumable_compaction =
resumable_compactions_per_db_session[compaction_job_id];
assert(resumable_compaction.subcompaction_job_id_to_range.find(
subcompaction_job_id) ==
resumable_compaction.subcompaction_job_id_to_range.end());
resumable_compaction.subcompaction_job_id_to_range.insert(
{subcompaction_job_id,
{info.subcompaction_start, info.subcompaction_end}});
}
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const FileOptions& file_options,
Expand Down
31 changes: 31 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ class BlobFileCache;
class BlobSource;

extern const double kIncSlowdownRatio;

// Recorded description of a compaction that can be rebuilt and resumed later.
// Field order matters: instances are created with positional brace
// initialization, so fields must not be reordered.
// NOTE(review): the two int fields have no default member initializer; a
// default-initialized instance leaves them indeterminate.
struct ResumableCompaction {
// Input files per level: (level, file numbers of the inputs on that level).
std::vector<std::pair<int, std::vector<uint64_t>>> inputs;
// Level the compaction writes its output to.
int output_level;
// Penultimate output level used when the compaction is rebuilt.
int penultimate_output_level;
// Smallest / largest key of the penultimate level output range.
InternalKey penultimate_output_level_smallest;
InternalKey penultimate_output_level_largest;
// Sub-compaction job id -> (sub-compaction start, sub-compaction end).
std::map<uint64_t, std::pair<std::string, std::string>>
subcompaction_job_id_to_range;
};

// Resumable compactions of a single DB session, ordered by compaction job id.
using ResumableCompactionsPerDBSession =
std::map<uint64_t /* compaction job id */, ResumableCompaction>;

// Resumable compactions of every known DB session, ordered by DB session id.
using ResumableCompactionsOfAllDBSessions =
std::map<std::string /* db session id */, ResumableCompactionsPerDBSession>;

// This file contains a list of data structures for managing column family
// level metadata.
//
Expand Down Expand Up @@ -549,6 +566,8 @@ class ColumnFamilyData {
// of its files (if missing)
void RecoverEpochNumbers();

void AddResumableCompaction(const ResumableCompactionInfo& info);

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand All @@ -564,6 +583,16 @@ class ColumnFamilyData {

std::vector<std::string> GetDbPaths() const;

Compaction* PickResumableCompaction(
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options);

Compaction* GetCompactionFromResumableCompaction(
std::string& db_session_id, uint64_t compaction_job_id,
ResumableCompaction& resumable_compaction,
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options) const;

uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Expand Down Expand Up @@ -650,6 +679,8 @@ class ColumnFamilyData {
bool mempurge_used_;

std::atomic<uint64_t> next_epoch_number_;

ResumableCompactionsOfAllDBSessions resumable_compactions_of_all_db_sessions_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
26 changes: 20 additions & 6 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ Compaction::Compaction(
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
BlobGarbageCollectionPolicy _blob_garbage_collection_policy,
double _blob_garbage_collection_age_cutoff)
double _blob_garbage_collection_age_cutoff,
std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand Down Expand Up @@ -333,11 +335,15 @@ Compaction::Compaction(
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
resumable_compaction_info_(resumable_compaction_info),
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
IsResumableCompaction()
? GetResumableCompaction().penultimate_output_level
:
// For simplicity, we don't support the concept of "penultimate
// level" with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
Expand Down Expand Up @@ -396,7 +402,15 @@ Compaction::Compaction(
}
}

PopulatePenultimateLevelOutputRange();
if (IsResumableCompaction()) {
const ResumableCompaction& resumable_compaction = GetResumableCompaction();
penultimate_level_smallest_ =
resumable_compaction.penultimate_output_level_smallest;
penultimate_level_largest_ =
resumable_compaction.penultimate_output_level_largest;
} else {
PopulatePenultimateLevelOutputRange();
}
}

void Compaction::PopulatePenultimateLevelOutputRange() {
Expand Down
49 changes: 47 additions & 2 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey* b);

// Packs a compaction job id and a sub-compaction job id into one 64-bit
// compaction id: the job id occupies the upper 32 bits and the
// sub-compaction job id the lower 32 bits.
//
// Only the low 32 bits of each argument are representable. The
// sub-compaction job id is masked so that an out-of-range value cannot spill
// into (and corrupt) the job id bits.
inline uint64_t CalculateCompactionId(uint64_t compaction_job_id,
                                      uint64_t sub_compaction_job_id) {
  constexpr uint64_t kLow32BitsMask = 0xffffffffULL;
  return (compaction_job_id << 32) | (sub_compaction_job_id & kLow32BitsMask);
}

// Extracts the compaction job id, i.e. the upper 32 bits of `compaction_id`.
inline uint64_t GetCompactionJobIdFromCompactionID(uint64_t compaction_id) {
  constexpr int kJobIdShift = 32;
  const uint64_t job_id = compaction_id >> kJobIdShift;
  return job_id;
}

// Extracts the sub-compaction job id, i.e. the lower 32 bits of
// `compaction_id`.
inline uint64_t GetSubCompactionJobIdFromCompactionID(uint64_t compaction_id) {
  // Narrowing to 32 bits keeps exactly the low half; widening restores the
  // return type.
  const uint32_t low_half = static_cast<uint32_t>(compaction_id);
  return static_cast<uint64_t>(low_half);
}

// An AtomicCompactionUnitBoundary represents a range of keys [smallest,
// largest] that exactly spans one or more neighbouring SSTs on the same
// level. Every pair of SSTs in this range "overlap" (i.e., the largest
Expand Down Expand Up @@ -96,7 +110,12 @@ class Compaction {
CompactionReason compaction_reason = CompactionReason::kUnknown,
BlobGarbageCollectionPolicy blob_garbage_collection_policy =
BlobGarbageCollectionPolicy::kUseDefault,
double blob_garbage_collection_age_cutoff = -1);
double blob_garbage_collection_age_cutoff = -1,
std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info =
std::tuple<std::string, uint64_t, ResumableCompaction>(
"", std::numeric_limits<uint64_t>::max(),
ResumableCompaction()));

// The type of the penultimate level output range
enum class PenultimateOutputRangeType : int {
Expand Down Expand Up @@ -432,6 +451,30 @@ class Compaction {
const int start_level,
const int output_level);

// Returns a copy of the smallest key of the penultimate level output range.
const InternalKey GetPenultimateLevelSmallestKey() const {
return penultimate_level_smallest_;
}

// Returns a copy of the largest key of the penultimate level output range.
const InternalKey GetPenultimateLevelLargestKey() const {
return penultimate_level_largest_;
}

// True when this compaction was built from a recorded resumable compaction,
// which is signalled by a non-empty DB session id in
// `resumable_compaction_info_`.
bool IsResumableCompaction() const {
  const std::string& recorded_session_id =
      std::get<0>(resumable_compaction_info_);
  return !recorded_session_id.empty();
}

// Returns the DB session id stored in `resumable_compaction_info_`; it is
// empty when this is not a resumable compaction.
const std::string& GetResumableCompactionDBSessionID() const {
return std::get<0>(resumable_compaction_info_);
}

// Returns the compaction job id stored in `resumable_compaction_info_`.
uint64_t GetResumableCompactionJobId() const {
return std::get<1>(resumable_compaction_info_);
}

// Returns the ResumableCompaction record stored in
// `resumable_compaction_info_`.
const ResumableCompaction& GetResumableCompaction() const {
return std::get<2>(resumable_compaction_info_);
}

private:
void SetInputVersion(Version* input_version);

Expand Down Expand Up @@ -558,6 +601,9 @@ class Compaction {
// Blob garbage collection age cutoff.
double blob_garbage_collection_age_cutoff_;

const std::tuple<std::string, uint64_t, ResumableCompaction>
resumable_compaction_info_;

// only set when per_key_placement feature is enabled, -1 (kInvalidLevel)
// means not supported.
const int penultimate_level_;
Expand Down Expand Up @@ -596,5 +642,4 @@ struct PerKeyPlacementContext {

// Return sum of sizes of all files in `files`.
extern uint64_t TotalFileSize(const std::vector<FileMetaData*>& files);

} // namespace ROCKSDB_NAMESPACE

0 comments on commit 7b881cd

Please sign in to comment.