Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto recovery from out of space errors #4164

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class CompactionJobTest : public testing::Test {
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(db_options_, &mutex_) {
error_handler_(nullptr, db_options_, &mutex_) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
Expand Down
105 changes: 91 additions & 14 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
use_custom_gc_(seq_per_batch),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(immutable_db_options_, &mutex_) {
error_handler_(this, immutable_db_options_, &mutex_),
shutdown_flag_(false) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
Expand Down Expand Up @@ -259,16 +260,62 @@ Status DBImpl::Resume() {
return Status::OK();
}

Status s = error_handler_.GetBGError();
if (s.severity() > Status::Severity::kHardError) {
if (error_handler_.IsRecoveryInProgress()) {
// Don't allow a mix of manual and automatic recovery
return Status::Busy();
}

mutex_.Unlock();
Status s = error_handler_.RecoverFromBGError(true);
mutex_.Lock();
return s;
}

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
// order to inadvertently causing an error and thinking recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result
// another error, which will be saved by error_handler_ and reported later
// as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
// flush in the prior step might have been a no-op for some CFs, which
// means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
mutex_.AssertHeld();
WaitForBackgroundWork();

Status bg_error = error_handler_.GetBGError();
Status s;
if (shutdown_flag_) {
// Returning shutdown status to SFM during auto recovery will cause it
// to abort the recovery and allow the shutdown to progress
s = Status::ShutdownInProgress();
}
if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Fatal/Unrecoverable error");
return s;
s = bg_error;
}

// We cannot guarantee consistency of the WAL. So force flush Memtables of
// all the column families
if (s.ok()) {
s = FlushAllCFs(FlushReason::kErrorRecovery);
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Flush failure [%s]",
s.ToString().c_str());
}
}

JobContext job_context(0);
FindObsoleteFiles(&job_context, true);
error_handler_.ClearBGError();
if (s.ok()) {
s = error_handler_.ClearBGError();
}
mutex_.Unlock();

job_context.manifest_file_number = 1;
Expand All @@ -277,13 +324,36 @@ Status DBImpl::Resume() {
}
job_context.Clean();

ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
if (s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
}
mutex_.Lock();
MaybeScheduleFlushOrCompaction();
// Check for shutdown again before scheduling further compactions,
// since we released and re-acquired the lock above
if (shutdown_flag_) {
s = Status::ShutdownInProgress();
}
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}

// Wake up any waiters - in this case, it could be the shutdown thread
bg_cv_.SignalAll();

// No need to check BGError again. If something happened, event listener would be
// notified and the operation causing it would have failed
return Status::OK();
return s;
}

void DBImpl::WaitForBackgroundWork() {
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait();
}
}

// Will lock the mutex_, will wait for completion if wait is true
Expand Down Expand Up @@ -313,14 +383,20 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
if (!wait) {
return;
}
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait();
}
WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
mutex_.Lock();
shutdown_flag_ = true;
error_handler_.CancelErrorRecovery();
while (error_handler_.IsRecoveryInProgress()) {
bg_cv_.Wait();
}
mutex_.Unlock();

// CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
Expand All @@ -338,7 +414,8 @@ Status DBImpl::CloseHelper() {
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_ ||
pending_purge_obsolete_files_) {
pending_purge_obsolete_files_ ||
error_handler_.IsRecoveryInProgress()) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
}
Expand Down
20 changes: 18 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class DBImpl : public DB {

private:
friend class DB;
friend class ErrorHandler;
friend class InternalStats;
friend class PessimisticTransaction;
friend class TransactionBaseImpl;
Expand Down Expand Up @@ -845,6 +846,8 @@ class DBImpl : public DB {
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);

Status ResumeImpl();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a comment describing what the function is doing?


void MaybeIgnoreError(Status* s) const;

const Status CreateArchivalDirectory();
Expand Down Expand Up @@ -1042,9 +1045,10 @@ class DBImpl : public DB {
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
LogBuffer* log_buffer, FlushReason* reason);

bool EnoughRoomForCompaction(const std::vector<CompactionInputFiles>& inputs,
bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

void PrintStatistics();
Expand Down Expand Up @@ -1078,6 +1082,10 @@ class DBImpl : public DB {

Status CloseHelper();

Status FlushAllCFs(FlushReason flush_reason);

void WaitForBackgroundWork();

// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;

Expand Down Expand Up @@ -1534,6 +1542,14 @@ class DBImpl : public DB {
bool closed_;

ErrorHandler error_handler_;

// Flag to indicate that the DB instance shutdown has been initiated. This
// different from shutting_down_ atomic in that it is set at the beginning
// of shutdown sequence, specifically in order to prevent any background
// error recovery from going on in parallel. The latter, shutting_down_,
// is set a little later during the shutdown after scheduling memtable
// flushes
bool shutdown_flag_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call it shutdown_initiated_ directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm picky but if possible can we place it together with those bool variables, for better padding?

};

extern Options SanitizeOptions(const std::string& db,
Expand Down
Loading