Skip to content

Commit

Permalink
Fix regression test failures introduced by PR #4164 (#4375)
Browse files Browse the repository at this point in the history
Summary:
1. Add override keyword to overridden virtual functions in EventListener
2. Fix a memory corruption that can happen during DB shutdown when in
read-only mode due to a background write error
3. Fix uninitialized buffers in error_handler_test.cc that cause
valgrind to complain
Pull Request resolved: #4375

Differential Revision: D9875779

Pulled By: anand1976

fbshipit-source-id: 022ede1edc01a9f7e21ecf4c61ef7d46545d0640
  • Loading branch information
Anand Ananthabhotla authored and facebook-github-bot committed Sep 17, 2018
1 parent 8c25204 commit 30c21df
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
12 changes: 12 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// as well.
use_custom_gc_(seq_per_batch),
shutdown_initiated_(false),
own_sfm_(options.sst_file_manager == nullptr),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_) {
Expand Down Expand Up @@ -520,6 +521,17 @@ Status DBImpl::CloseHelper() {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
// If the sst_file_manager was allocated by us during DB::Open(), ccall
// Close() on it before closing the info_log. Otherwise, background thread
// in SstFileManagerImpl might try to log something
if (immutable_db_options_.sst_file_manager && own_sfm_) {
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
sfm->Close();
}
#endif // ROCKSDB_LITE

if (immutable_db_options_.info_log && own_info_log_) {
Status s = immutable_db_options_.info_log->Close();
if (ret.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,9 @@ class DBImpl : public DB {
// is set a little later during the shutdown after scheduling memtable
// flushes
bool shutdown_initiated_;
// Flag to indicate whether sst_file_manager object was allocated in
// DB::Open() or passed to us
bool own_sfm_;

// Clients must periodically call SetPreserveDeletesSequenceNumber()
// to advance this seqnum. Default value is 0 which means ALL deletes are
Expand Down
46 changes: 17 additions & 29 deletions db/error_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class ErrorHandlerListener : public EventListener {
file_count_(0),
fault_env_(nullptr) {}

void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) {
void OnTableFileCreationStarted(
const TableFileCreationBriefInfo& /*ti*/) override {
InstrumentedMutexLock l(&mutex_);
file_creation_started_ = true;
if (file_count_ > 0) {
Expand All @@ -61,13 +62,14 @@ class ErrorHandlerListener : public EventListener {
}

void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/, bool* auto_recovery) {
Status /*bg_error*/,
bool* auto_recovery) override {
if (*auto_recovery && no_auto_recovery_) {
*auto_recovery = false;
}
}

void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
InstrumentedMutexLock l(&mutex_);
recovery_complete_ = true;
cv_.SignalAll();
Expand Down Expand Up @@ -237,7 +239,6 @@ TEST_F(DBErrorHandlingTest, CorruptionError) {
Destroy(options);
}

#ifndef TRAVIS
TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
Expand Down Expand Up @@ -307,17 +308,16 @@ TEST_F(DBErrorHandlingTest, WALWriteError) {
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
Random rnd(301);

listener->EnableAutoRecovery();
DestroyAndReopen(options);

{
WriteBatch batch;
char val[1024];

for (auto i = 0; i<100; ++i) {
sprintf(val, "%d", i);
batch.Put(Key(i), Slice(val, sizeof(val)));
batch.Put(Key(i), RandomString(&rnd, 1024));
}

WriteOptions wopts;
Expand All @@ -327,12 +327,10 @@ TEST_F(DBErrorHandlingTest, WALWriteError) {

{
WriteBatch batch;
char val[1024];
int write_error = 0;

for (auto i = 100; i<199; ++i) {
sprintf(val, "%d", i);
batch.Put(Key(i), Slice(val, sizeof(val)));
batch.Put(Key(i), RandomString(&rnd, 1024));
}

SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
Expand Down Expand Up @@ -378,18 +376,17 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
Random rnd(301);

listener->EnableAutoRecovery();
CreateAndReopenWithCF({"one", "two", "three"}, options);

{
WriteBatch batch;
char val[1024];

for (auto i = 1; i < 4; ++i) {
for (auto j = 0; j < 100; ++j) {
sprintf(val, "%d", j);
batch.Put(handles_[i], Key(j), Slice(val, sizeof(val)));
batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024));
}
}

Expand All @@ -400,13 +397,11 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {

{
WriteBatch batch;
char val[1024];
int write_error = 0;

// Write to one CF
for (auto i = 100; i < 199; ++i) {
sprintf(val, "%d", i);
batch.Put(handles_[2], Key(i), Slice(val, sizeof(val)));
batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024));
}

SyncPoint::GetInstance()->SetCallBack(
Expand Down Expand Up @@ -462,6 +457,7 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
std::vector<DB*> db;
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
int kNumDbInstances = 3;
Random rnd(301);

for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerListener());
Expand Down Expand Up @@ -489,11 +485,9 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {

for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];

for (auto j = 0; j <= 100; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
batch.Put(Key(j), RandomString(&rnd, 1024));
}

WriteOptions wopts;
Expand All @@ -505,12 +499,10 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];

// Write to one CF
for (auto j = 100; j < 199; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
batch.Put(Key(j), RandomString(&rnd, 1024));
}

WriteOptions wopts;
Expand Down Expand Up @@ -561,6 +553,7 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
std::vector<DB*> db;
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
int kNumDbInstances = 3;
Random rnd(301);

for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerListener());
Expand Down Expand Up @@ -600,11 +593,9 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {

for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];

for (auto j = 0; j <= 100; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
batch.Put(Key(j), RandomString(&rnd, 1024));
}

WriteOptions wopts;
Expand All @@ -616,12 +607,10 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];

// Write to one CF
for (auto j = 100; j < 199; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
batch.Put(Key(j), RandomString(&rnd, 1024));
}

WriteOptions wopts;
Expand Down Expand Up @@ -682,7 +671,6 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
options.clear();
delete def_env;
}
#endif

} // namespace rocksdb

Expand Down
7 changes: 7 additions & 0 deletions util/sst_file_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
}

SstFileManagerImpl::~SstFileManagerImpl() {
Close();
}

void SstFileManagerImpl::Close() {
{
MutexLock l(&mu_);
if (closing_) {
return;
}
closing_ = true;
cv_.SignalAll();
}
Expand Down
4 changes: 4 additions & 0 deletions util/sst_file_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class SstFileManagerImpl : public SstFileManager {

DeleteScheduler* delete_scheduler() { return &delete_scheduler_; }

// Stop the error recovery background thread. This should be called only
// once in the object's lifetime, and before the destructor
void Close();

private:
// REQUIRES: mutex locked
void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
Expand Down

0 comments on commit 30c21df

Please sign in to comment.