Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add internal compaction API for Secondary instance #8171

Closed
wants to merge 11 commits into from
154 changes: 139 additions & 15 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,19 @@ CompactionJob::CompactionJob(
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id, std::string full_history_ts_low,
BlobFileCompletionCallback* blob_callback)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
log_buffer_(log_buffer),
output_directory_(output_directory),
stats_(stats),
bottommost_level_(false),
write_hint_(Env::WLTH_NOT_SET),
job_id_(job_id),
compaction_job_stats_(compaction_job_stats),
dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
io_tracer_(io_tracer),
Expand All @@ -330,22 +335,17 @@ CompactionJob::CompactionJob(
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
blob_output_directory_(blob_output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)),
blob_callback_(blob_callback) {
Expand Down Expand Up @@ -1550,9 +1550,7 @@ Status CompactionJob::FinishCompactionOutputFile(
FileDescriptor output_fd;
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
if (meta != nullptr) {
fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(), meta->fd.GetPathId());
fname = GetTableFileName(meta->fd.GetNumber());
output_fd = meta->fd;
oldest_blob_file_number = meta->oldest_blob_file_number;
} else {
Expand Down Expand Up @@ -1672,9 +1670,7 @@ Status CompactionJob::OpenCompactionOutputFile(
assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
std::string fname =
TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
file_number, sub_compact->compaction->output_path_id());
std::string fname = GetTableFileName(file_number);
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -1937,4 +1933,132 @@ void CompactionJob::LogCompaction() {
}
}

std::string CompactionJob::GetTableFileName(uint64_t file_number) {
return TableFileName(compact_->compaction->immutable_cf_options()->cf_paths,
file_number, compact_->compaction->output_path_id());
}

// Returns the name of the table file numbered `file_number` placed under this
// job's `output_path_` rather than under the compaction's configured paths.
std::string CompactionServiceCompactionJob::GetTableFileName(
    uint64_t file_number) {
  std::string fname = MakeTableFileName(output_path_, file_number);
  return fname;
}

// Constructs a compaction job whose output files are written under
// `output_path` and whose outcome is reported through
// `compaction_service_result`.
//
// Most arguments are forwarded unchanged to the CompactionJob base
// constructor. Arguments this constructor does not take are filled with fixed
// values (0, nullptr, kMaxSequenceNumber, Env::Priority::USER); the paranoid
// file check and IO stats flags are read from the compaction's mutable column
// family options, and the full-history timestamp lower bound from its column
// family data. The base job's statistics sink is
// `compaction_service_result->stats`, so `compaction_service_result` must be
// non-null.
//
// `existing_snapshots` and `table_cache` are received by value and are not
// used again here, so they are moved into the base constructor instead of
// being copied a second time.
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, VersionSet* versions,
    const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
    FSDirectory* output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
    const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_id, const std::string& db_session_id,
    const std::string& output_path,
    const CompactionServiceInput& compaction_service_input,
    CompactionServiceResult* compaction_service_result)
    : CompactionJob(
          job_id, compaction, db_options, file_options, versions, shutting_down,
          0, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex,
          db_error_handler, std::move(existing_snapshots), kMaxSequenceNumber,
          nullptr, std::move(table_cache), event_logger,
          compaction->mutable_cf_options()->paranoid_file_checks,
          compaction->mutable_cf_options()->report_bg_io_stats, dbname,
          &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
          nullptr, db_id, db_session_id,
          compaction->column_family_data()->GetFullHistoryTsLow()),
      output_path_(output_path),
      compaction_input_(compaction_service_input),
      compaction_result_(compaction_service_result) {}

// Runs the compaction as exactly one subcompaction bounded by
// `compaction_input_` and records the outcome in `*compaction_result_`.
//
// Steps, in order: set the write hint and bottommost flag, create the single
// subcompaction, process its key/values, record timing, fsync the output
// directory (only if processing succeeded and a directory was given),
// aggregate statistics, and finally describe every output file in the result.
//
// Returns the subcompaction status, or the fsync status when the
// subcompaction itself succeeded.
Status CompactionServiceCompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);

  auto* compaction = compact_->compaction;
  auto* cfd = compaction->column_family_data();
  assert(cfd != nullptr);
  assert(cfd->current()->storage_info()->NumLevelFiles(compaction->level()) >
         0);

  write_hint_ = cfd->CalculateSSTWriteHint(compaction->output_level());
  bottommost_level_ = compaction->bottommost_level();

  compact_->sub_compact_states.emplace_back(
      compaction, compaction_input_.begin, compaction_input_.end,
      compaction_input_.approx_size);

  log_buffer_->FlushBufferToLog();
  LogCompaction();
  const uint64_t start_micros = db_options_.clock->NowMicros();

  // There must be exactly one subcompaction: the one created above.
  assert(compact_->sub_compact_states.size() == 1);
  SubcompactionState* sub = &compact_->sub_compact_states.front();

  ProcessKeyValueCompaction(sub);

  compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = sub->compaction_job_stats.cpu_micros;

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  Status status = sub->status;
  IOStatus io_s = sub->io_status;

  // Keep the first IO error seen by this job.
  if (io_status_.ok()) {
    io_status_ = io_s;
  }

  // Only sync the output directory when the compaction itself succeeded.
  if (status.ok() && output_directory_) {
    constexpr IODebugContext* dbg = nullptr;
    io_s = output_directory_->Fsync(IOOptions(), dbg);
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  // TODO: Add verify_table() and VerifyCompactionFileConsistency()

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();

  // Read the IO counters before RecordCompactionIOStats() is called.
  compaction_result_->bytes_written = IOSTATS(bytes_written);
  compaction_result_->bytes_read = IOSTATS(bytes_read);
  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  compact_->status = status;
  compact_->status.PermitUncheckedError();

  // Build compaction result
  compaction_result_->output_level = compaction->output_level();
  compaction_result_->output_path = output_path_;
  for (const auto& out : sub->outputs) {
    const auto& meta = out.meta;
    compaction_result_->output_files.emplace_back(
        MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
        meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
        meta.largest.Encode().ToString(), meta.oldest_ancester_time,
        meta.file_creation_time, out.validator.GetHash(),
        meta.marked_for_compaction);
  }
  compaction_result_->num_output_records = sub->num_output_records;
  compaction_result_->total_bytes = sub->total_bytes;

  return status;
}

void CompactionServiceCompactionJob::CleanupCompaction() {
CompactionJob::CleanupCompaction();
}

} // namespace ROCKSDB_NAMESPACE
Loading