Skip to content

Commit

Permalink
Refactor external sst file ingestion job (#12305)
Browse files Browse the repository at this point in the history
Summary:
Updates some documentations and invariant assertions after #12257 and #12284. Also refactored some duplicate code and improved some error message and preconditions for errors.

Pull Request resolved: #12305

Test Plan: Existing unit tests

Reviewed By: hx235

Differential Revision: D53371325

Pulled By: jowlyzhang

fbshipit-source-id: fb0edcb3a3602cdf0a292ef437cfdfe897fc6c99
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Feb 3, 2024
1 parent 5620efc commit 4eaa771
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 50 deletions.
71 changes: 33 additions & 38 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ Status ExternalSstFileIngestionJob::Prepare(
}

if (ingestion_options_.ingest_behind && files_overlap_) {
return Status::NotSupported("Files have overlapping ranges");
return Status::NotSupported(
"Files with overlapping ranges cannot be ingested with ingestion "
"behind mode.");
}

// Copy/Move external files into DB
Expand Down Expand Up @@ -142,7 +144,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// Original file is on a different FS, use copy instead of hard linking.
f.copy_file = true;
ROCKS_LOG_INFO(db_options_.info_log,
"Triy to link file %s but it's not supported : %s",
"Tried to link file %s but it's not supported : %s",
path_outside_db.c_str(), status.ToString().c_str());
}
} else {
Expand Down Expand Up @@ -188,7 +190,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// Generate and check the sst file checksum. Note that, if
// IngestExternalFileOptions::write_global_seqno is true, we will not update
// the checksum information in the files_to_ingests_ here, since the file is
// upadted with the new global_seqno. After global_seqno is updated, DB will
// updated with the new global_seqno. After global_seqno is updated, DB will
// generate the new checksum and store it in the Manifest. In all other cases
// if ingestion_options_.write_global_seqno == true and
// verify_file_checksum is false, we only check the checksum function name.
Expand Down Expand Up @@ -299,8 +301,7 @@ Status ExternalSstFileIngestionJob::Prepare(
}
}
} else if (files_checksums.size() != files_checksum_func_names.size() ||
(files_checksums.size() == files_checksum_func_names.size() &&
files_checksums.size() != 0)) {
files_checksums.size() != 0) {
// The checksum or checksum function name vector are not both empty
// and they are incomplete.
status = Status::InvalidArgument(
Expand All @@ -316,21 +317,9 @@ Status ExternalSstFileIngestionJob::Prepare(
}
}

// TODO: The following is duplicated with Cleanup().
if (!status.ok()) {
IOOptions io_opts;
// We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"AddFile() clean up for file %s failed : %s",
f.internal_file_path.c_str(), s.ToString().c_str());
}
}
DeleteInternalFiles();
}

return status;
Expand Down Expand Up @@ -431,26 +420,26 @@ Status ExternalSstFileIngestionJob::Run() {
if (!status.ok()) {
return status;
}
if (smallest_parsed.sequence == 0) {
if (smallest_parsed.sequence == 0 && assigned_seqno != 0) {
UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
smallest_parsed.type);
}
if (largest_parsed.sequence == 0) {
if (largest_parsed.sequence == 0 && assigned_seqno != 0) {
UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
largest_parsed.type);
}

status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
if (!status.ok()) {
return status;
}
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno);
assert(assigned_seqno == 0 || assigned_seqno == last_seqno + 1);
if (assigned_seqno > last_seqno) {
assert(assigned_seqno == last_seqno + 1);
last_seqno = assigned_seqno;
++consumed_seqno_count_;
}
if (!status.ok()) {
return status;
}

status = GenerateChecksumForIngestedFile(&f);
if (!status.ok()) {
Expand Down Expand Up @@ -631,17 +620,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
if (!status.ok()) {
// We failed to add the files to the database
// remove all the files we copied
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"AddFile() clean up for file %s failed : %s",
f.internal_file_path.c_str(), s.ToString().c_str());
}
}
DeleteInternalFiles();
consumed_seqno_count_ = 0;
files_overlap_ = false;
} else if (status.ok() && ingestion_options_.move_files) {
Expand All @@ -659,6 +638,21 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
}
}

void ExternalSstFileIngestionJob::DeleteInternalFiles() {
IOOptions io_opts;
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path.empty()) {
continue;
}
Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"AddFile() clean up for file %s failed : %s",
f.internal_file_path.c_str(), s.ToString().c_str());
}
}
}

Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
const std::string& external_file, uint64_t new_file_number,
IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
Expand Down Expand Up @@ -1001,7 +995,8 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
return Status::OK();
} else if (!ingestion_options_.allow_global_seqno) {
return Status::InvalidArgument("Global seqno is required, but disabled");
} else if (file_to_ingest->global_seqno_offset == 0) {
} else if (ingestion_options_.write_global_seqno &&
file_to_ingest->global_seqno_offset == 0) {
return Status::InvalidArgument(
"Trying to set global seqno for a file that don't have a global seqno "
"field");
Expand Down Expand Up @@ -1073,8 +1068,8 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
if (!io_s.ok()) {
return io_s;
}
file_to_ingest->file_checksum = file_checksum;
file_to_ingest->file_checksum_func_name = file_checksum_func_name;
file_to_ingest->file_checksum = std::move(file_checksum);
file_to_ingest->file_checksum_func_name = std::move(file_checksum_func_name);
return IOStatus::OK();
}

Expand Down
18 changes: 6 additions & 12 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct IngestedFileInfo {
uint64_t num_entries;
// total number of range deletions in external file
uint64_t num_range_deletions;
// Id of column family this file shoule be ingested into
// Id of column family this file should be ingested into
uint32_t cf_id;
// TableProperties read from external file
TableProperties table_properties;
Expand Down Expand Up @@ -102,16 +102,7 @@ class ExternalSstFileIngestionJob {
assert(directories != nullptr);
}

~ExternalSstFileIngestionJob() {
for (const auto& c : file_ingesting_compactions_) {
cfd_->compaction_picker()->UnregisterCompaction(c);
delete c;
}

for (const auto& f : compaction_input_metdatas_) {
delete f;
}
}
~ExternalSstFileIngestionJob() { UnregisterRange(); }

// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
Expand Down Expand Up @@ -156,7 +147,7 @@ class ExternalSstFileIngestionJob {
return files_to_ingest_;
}

// How many sequence numbers did we consume as part of the ingest job?
// How many sequence numbers did we consume as part of the ingestion job?
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }

private:
Expand Down Expand Up @@ -203,6 +194,9 @@ class ExternalSstFileIngestionJob {
// compactions.
void CreateEquivalentFileIngestingCompactions();

// Remove all the internal files created, called when ingestion job fails.
void DeleteInternalFiles();

SystemClock* clock_;
FileSystemPtr fs_;
VersionSet* versions_;
Expand Down

0 comments on commit 4eaa771

Please sign in to comment.