From 4eaa771c01727a6785b81f5f4a09840d38e31cf6 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Fri, 2 Feb 2024 18:07:57 -0800 Subject: [PATCH] Refactor external sst file ingestion job (#12305) Summary: Updates some documentation and invariant assertions after https://github.com/facebook/rocksdb/issues/12257 and https://github.com/facebook/rocksdb/issues/12284. Also refactors some duplicated code and improves some error messages and error preconditions. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12305 Test Plan: Existing unit tests Reviewed By: hx235 Differential Revision: D53371325 Pulled By: jowlyzhang fbshipit-source-id: fb0edcb3a3602cdf0a292ef437cfdfe897fc6c99 --- db/external_sst_file_ingestion_job.cc | 71 +++++++++++++-------------- db/external_sst_file_ingestion_job.h | 18 +++---- 2 files changed, 39 insertions(+), 50 deletions(-) diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 663ea94d43f..a5a60746232 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -97,7 +97,9 @@ Status ExternalSstFileIngestionJob::Prepare( } if (ingestion_options_.ingest_behind && files_overlap_) { - return Status::NotSupported("Files have overlapping ranges"); + return Status::NotSupported( + "Files with overlapping ranges cannot be ingested with ingestion " + "behind mode."); } // Copy/Move external files into DB @@ -142,7 +144,7 @@ Status ExternalSstFileIngestionJob::Prepare( // Original file is on a different FS, use copy instead of hard linking. f.copy_file = true; ROCKS_LOG_INFO(db_options_.info_log, - "Triy to link file %s but it's not supported : %s", + "Tried to link file %s but it's not supported : %s", path_outside_db.c_str(), status.ToString().c_str()); } } else { @@ -188,7 +190,7 @@ Status ExternalSstFileIngestionJob::Prepare( // Generate and check the sst file checksum. 
Note that, if // IngestExternalFileOptions::write_global_seqno is true, we will not update // the checksum information in the files_to_ingests_ here, since the file is - // upadted with the new global_seqno. After global_seqno is updated, DB will + // updated with the new global_seqno. After global_seqno is updated, DB will // generate the new checksum and store it in the Manifest. In all other cases // if ingestion_options_.write_global_seqno == true and // verify_file_checksum is false, we only check the checksum function name. @@ -299,8 +301,7 @@ Status ExternalSstFileIngestionJob::Prepare( } } } else if (files_checksums.size() != files_checksum_func_names.size() || - (files_checksums.size() == files_checksum_func_names.size() && - files_checksums.size() != 0)) { + files_checksums.size() != 0) { // The checksum or checksum function name vector are not both empty // and they are incomplete. status = Status::InvalidArgument( @@ -316,21 +317,9 @@ Status ExternalSstFileIngestionJob::Prepare( } } - // TODO: The following is duplicated with Cleanup(). 
if (!status.ok()) { - IOOptions io_opts; // We failed, remove all files that we copied into the db - for (IngestedFileInfo& f : files_to_ingest_) { - if (f.internal_file_path.empty()) { - continue; - } - Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); - if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "AddFile() clean up for file %s failed : %s", - f.internal_file_path.c_str(), s.ToString().c_str()); - } - } + DeleteInternalFiles(); } return status; @@ -431,26 +420,26 @@ Status ExternalSstFileIngestionJob::Run() { if (!status.ok()) { return status; } - if (smallest_parsed.sequence == 0) { + if (smallest_parsed.sequence == 0 && assigned_seqno != 0) { UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno, smallest_parsed.type); } - if (largest_parsed.sequence == 0) { + if (largest_parsed.sequence == 0 && assigned_seqno != 0) { UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno, largest_parsed.type); } status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); + if (!status.ok()) { + return status; + } TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); + assert(assigned_seqno == 0 || assigned_seqno == last_seqno + 1); if (assigned_seqno > last_seqno) { - assert(assigned_seqno == last_seqno + 1); last_seqno = assigned_seqno; ++consumed_seqno_count_; } - if (!status.ok()) { - return status; - } status = GenerateChecksumForIngestedFile(&f); if (!status.ok()) { @@ -631,17 +620,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { if (!status.ok()) { // We failed to add the files to the database // remove all the files we copied - for (IngestedFileInfo& f : files_to_ingest_) { - if (f.internal_file_path.empty()) { - continue; - } - Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); - if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "AddFile() clean up for file %s failed : %s", - f.internal_file_path.c_str(), s.ToString().c_str()); - } - } + 
DeleteInternalFiles(); consumed_seqno_count_ = 0; files_overlap_ = false; } else if (status.ok() && ingestion_options_.move_files) { @@ -659,6 +638,21 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { } } +void ExternalSstFileIngestionJob::DeleteInternalFiles() { + IOOptions io_opts; + for (IngestedFileInfo& f : files_to_ingest_) { + if (f.internal_file_path.empty()) { + continue; + } + Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } +} + Status ExternalSstFileIngestionJob::GetIngestedFileInfo( const std::string& external_file, uint64_t new_file_number, IngestedFileInfo* file_to_ingest, SuperVersion* sv) { @@ -1001,7 +995,8 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( return Status::OK(); } else if (!ingestion_options_.allow_global_seqno) { return Status::InvalidArgument("Global seqno is required, but disabled"); - } else if (file_to_ingest->global_seqno_offset == 0) { + } else if (ingestion_options_.write_global_seqno && + file_to_ingest->global_seqno_offset == 0) { return Status::InvalidArgument( "Trying to set global seqno for a file that don't have a global seqno " "field"); @@ -1073,8 +1068,8 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( if (!io_s.ok()) { return io_s; } - file_to_ingest->file_checksum = file_checksum; - file_to_ingest->file_checksum_func_name = file_checksum_func_name; + file_to_ingest->file_checksum = std::move(file_checksum); + file_to_ingest->file_checksum_func_name = std::move(file_checksum_func_name); return IOStatus::OK(); } diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 49bb1e31e59..a08dcb52279 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -43,7 +43,7 @@ struct IngestedFileInfo { 
uint64_t num_entries; // total number of range deletions in external file uint64_t num_range_deletions; - // Id of column family this file shoule be ingested into + // Id of column family this file should be ingested into uint32_t cf_id; // TableProperties read from external file TableProperties table_properties; @@ -102,16 +102,7 @@ class ExternalSstFileIngestionJob { assert(directories != nullptr); } - ~ExternalSstFileIngestionJob() { - for (const auto& c : file_ingesting_compactions_) { - cfd_->compaction_picker()->UnregisterCompaction(c); - delete c; - } - - for (const auto& f : compaction_input_metdatas_) { - delete f; - } - } + ~ExternalSstFileIngestionJob() { UnregisterRange(); } // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, @@ -156,7 +147,7 @@ class ExternalSstFileIngestionJob { return files_to_ingest_; } - // How many sequence numbers did we consume as part of the ingest job? + // How many sequence numbers did we consume as part of the ingestion job? int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; } private: @@ -203,6 +194,9 @@ class ExternalSstFileIngestionJob { // compactions. void CreateEquivalentFileIngestingCompactions(); + // Remove all the internal files created, called when ingestion job fails. + void DeleteInternalFiles(); + SystemClock* clock_; FileSystemPtr fs_; VersionSet* versions_;