Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db_stress verify with lost unsynced operations #8966

Closed
wants to merge 13 commits into from
9 changes: 8 additions & 1 deletion db_stress_tool/db_stress_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,21 @@ size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
(void)max_sz;
*((uint32_t*)v) = rand;
PutUnaligned(reinterpret_cast<uint32_t*>(v), rand);
for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
v[i] = (char)(rand ^ i);
}
v[value_sz] = '\0';
return value_sz; // the size of the value set.
}

uint32_t GetValueBase(Slice s) {
assert(s.size() >= sizeof(uint32_t));
uint32_t res;
GetUnaligned(reinterpret_cast<const uint32_t*>(s.data()), &res);
return res;
}

std::string NowNanosStr() {
uint64_t t = db_stress_env->NowNanos();
std::string ret;
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ extern std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
uint64_t iteration);

extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
extern uint32_t GetValueBase(Slice s);

extern StressTest* CreateCfConsistencyStressTest();
extern StressTest* CreateBatchedOpsStressTest();
Expand Down
2 changes: 1 addition & 1 deletion db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ DEFINE_bool(sync_fault_injection, false,
"and unsynced data in DB will lost after crash. In such a case we "
"track DB changes in a trace file (\"*.trace\") in "
"--expected_values_dir for verifying there are no holes in the "
"recovered data (future work).");
"recovered data.");

DEFINE_bool(best_efforts_recovery, false,
"If true, use best efforts recovery.");
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_shared_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ class SharedState {
return expected_state_manager_->SaveAtAndAfter(db);
}

bool HasHistory() { return expected_state_manager_->HasHistory(); }

Status Restore(DB* db) { return expected_state_manager_->Restore(db); }

// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) {
return expected_state_manager_->ClearColumnFamily(cf);
Expand Down
15 changes: 14 additions & 1 deletion db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,20 @@ void StressTest::FinishInitDb(SharedState* shared) {
fprintf(stdout, "Compaction filter factory: %s\n",
compaction_filter_factory->Name());
}
// TODO(ajkr): First restore if there's already a trace.

if (shared->HasHistory() && IsStateTracked()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This IsStateTracked() causes a problem in the following sequence:

  • -sync_fault_injection=1 -test_batches_snapshot=0
  • -test_batches_snapshot=1
  • -test_batches_snapshot=0

The second command can cause DB seqno to advance beyond what is recoverable in expected values, which causes the third command to fail.

// The way it works right now is, if there's any history, that means the
// previous run mutating the DB had all its operations traced, in which case
// we should always be able to `Restore()` the expected values to match the
// `db_`'s current seqno.
Status s = shared->Restore(db_);
if (!s.ok()) {
fprintf(stderr, "Error restoring historical expected values: %s\n",
s.ToString().c_str());
exit(1);
}
}

if (FLAGS_sync_fault_injection && IsStateTracked()) {
Status s = shared->SaveAtAndAfter(db_);
if (!s.ok()) {
Expand Down
212 changes: 210 additions & 2 deletions db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

#include "db_stress_tool/expected_state.h"

#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record_result.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -318,13 +320,220 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* /* db */) {
}
#endif // ROCKSDB_LITE

bool FileExpectedStateManager::HasHistory() {
return saved_seqno_ != kMaxSequenceNumber;
}

#ifndef ROCKSDB_LITE

// An `ExpectedStateTraceRecordHandler` applies a configurable number of
// write operation trace records to the configured expected state. It is used in
// `FileExpectedStateManager::Restore()` to sync the expected state with the
// DB's post-recovery state.
class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
public WriteBatch::Handler {
public:
ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state)
: max_write_ops_(max_write_ops), state_(state) {}

~ExpectedStateTraceRecordHandler() {
assert(num_write_ops_ == max_write_ops_);
}

Status Handle(const WriteQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* /* result */) override {
if (num_write_ops_ == max_write_ops_) {
return Status::OK();
}
WriteBatch batch(record.GetWriteBatchRep().ToString());
return batch.Iterate(this);
}

// Ignore reads.
Status Handle(const GetQueryTraceRecord& /* record */,
std::unique_ptr<TraceRecordResult>* /* result */) override {
return Status::OK();
}

// Ignore reads.
Status Handle(const IteratorSeekQueryTraceRecord& /* record */,
std::unique_ptr<TraceRecordResult>* /* result */) override {
return Status::OK();
}

// Ignore reads.
Status Handle(const MultiGetQueryTraceRecord& /* record */,
std::unique_ptr<TraceRecordResult>* /* result */) override {
return Status::OK();
}

// Below are the WriteBatch::Handler overrides. We could use a separate
// object, but it's convenient and works to share state with the
// `TraceRecord::Handler`.

Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
uint64_t key_id;
if (!GetIntVal(key.ToString(), &key_id)) {
return Status::Corruption("unable to parse key", key.ToString());
}
uint32_t value_id = GetValueBase(value);

state_->Put(column_family_id, static_cast<int64_t>(key_id), value_id,
false /* pending */);
++num_write_ops_;
return Status::OK();
}

Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
uint64_t key_id;
if (!GetIntVal(key.ToString(), &key_id)) {
return Status::Corruption("unable to parse key", key.ToString());
}

state_->Delete(column_family_id, static_cast<int64_t>(key_id),
false /* pending */);
++num_write_ops_;
return Status::OK();
}

Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
return DeleteCF(column_family_id, key);
}

Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
const Slice& end_key) override {
uint64_t begin_key_id, end_key_id;
if (!GetIntVal(begin_key.ToString(), &begin_key_id)) {
return Status::Corruption("unable to parse begin key",
begin_key.ToString());
}
if (!GetIntVal(end_key.ToString(), &end_key_id)) {
return Status::Corruption("unable to parse end key", end_key.ToString());
}

state_->DeleteRange(column_family_id, static_cast<int64_t>(begin_key_id),
static_cast<int64_t>(end_key_id), false /* pending */);
++num_write_ops_;
return Status::OK();
}

Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
return PutCF(column_family_id, key, value);
}

private:
uint64_t num_write_ops_ = 0;
uint64_t max_write_ops_;
ExpectedState* state_;
};

Status FileExpectedStateManager::Restore(DB* db) {
assert(HasHistory());
SequenceNumber seqno = db->GetLatestSequenceNumber();
if (seqno < saved_seqno_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, I would assert(HasHistory())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return Status::Corruption("DB is older than any restorable expected state");
}

std::string state_filename = ToString(saved_seqno_) + kStateFilenameSuffix;
std::string state_file_path = GetPathForFilename(state_filename);

std::string latest_file_temp_path =
GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
std::string latest_file_path =
GetPathForFilename(kLatestBasename + kStateFilenameSuffix);

std::string trace_filename = ToString(saved_seqno_) + kTraceFilenameSuffix;
std::string trace_file_path = GetPathForFilename(trace_filename);

std::unique_ptr<TraceReader> trace_reader;
Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
&trace_reader);

if (s.ok()) {
// We are going to replay on top of "`seqno`.state" to create a new
// "LATEST.state". Start off by creating a tempfile so we can later make the
// new "LATEST.state" appear atomically using `RenameFile()`.
s = CopyFile(FileSystem::Default(), state_file_path, latest_file_temp_path,
0 /* size */, false /* use_fsync */);
}

{
std::unique_ptr<Replayer> replayer;
std::unique_ptr<ExpectedState> state;
std::unique_ptr<TraceRecord::Handler> handler;
if (s.ok()) {
state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
num_column_families_));
s = state->Open(false /* create */);
}
if (s.ok()) {
handler.reset(new ExpectedStateTraceRecordHandler(seqno - saved_seqno_,
state.get()));
// TODO(ajkr): An API limitation requires we provide `handles` although
// they will be unused since we only use the replayer for reading records.
// Just give a default CFH for now to satisfy the requirement.
s = db->NewDefaultReplayer({db->DefaultColumnFamily()} /* handles */,
std::move(trace_reader), &replayer);
}

if (s.ok()) {
s = replayer->Prepare();
}
for (;;) {
std::unique_ptr<TraceRecord> record;
s = replayer->Next(&record);
if (!s.ok()) {
break;
}
std::unique_ptr<TraceRecordResult> res;
record->Accept(handler.get(), &res);
}
if (s.IsIncomplete()) {
// OK because `Status::Incomplete` is expected upon finishing all the
// trace records.
s = Status::OK();
}
}

if (s.ok()) {
s = FileSystem::Default()->RenameFile(latest_file_temp_path,
latest_file_path, IOOptions(),
nullptr /* dbg */);
}
if (s.ok()) {
latest_.reset(new FileExpectedState(latest_file_path, max_key_,
num_column_families_));
s = latest_->Open(false /* create */);
}

// Delete old state/trace files. We must delete the state file first.
// Otherwise, a crash-recovery immediately after deleting the trace file could
// lead to `Restore()` unable to replay to `seqno`.
if (s.ok()) {
s = Env::Default()->DeleteFile(state_file_path);
}
if (s.ok()) {
saved_seqno_ = kMaxSequenceNumber;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe say something like SaveAtAndAfter(...) will be called after Restore(...) to initialize tracing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might or might not be called after reaching here, depending on whether the current run uses -sync_fault_injection=1. Reaching this code path only means the previous db_stress run used -sync_fault_injection=1.

s = Env::Default()->DeleteFile(trace_file_path);
}
return s;
}
#else // ROCKSDB_LITE
Status FileExpectedStateManager::Restore(DB* /* db */) {
return Status::NotSupported();
}
#endif // ROCKSDB_LITE

Status FileExpectedStateManager::Clean() {
std::vector<std::string> expected_state_dir_children;
Status s = Env::Default()->GetChildren(expected_state_dir_path_,
&expected_state_dir_children);
// An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left
// behind invalid temporary files. An incomplete `SaveAtAndAfter()` could have
// also left behind stale state/trace files.
// also left behind stale state/trace files. An incomplete `Restore()` could
// have left behind stale trace files.
for (size_t i = 0; s.ok() && i < expected_state_dir_children.size(); ++i) {
const auto& filename = expected_state_dir_children[i];
if (filename.rfind(kTempFilenamePrefix, 0 /* pos */) == 0 &&
Expand All @@ -349,7 +558,6 @@ Status FileExpectedStateManager::Clean() {
ParseUint64(filename.substr(
0, filename.size() - kTraceFilenameSuffix.size())) <
saved_seqno_) {
assert(saved_seqno_ != kMaxSequenceNumber);
// Delete stale trace files.
s = Env::Default()->DeleteFile(GetPathForFilename(filename));
}
Expand Down
41 changes: 40 additions & 1 deletion db_stress_tool/expected_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,30 @@ class ExpectedStateManager {
virtual Status Open() = 0;

// Saves expected values for the current state of `db` and begins tracking
// changes.
// changes. Following a successful `SaveAtAndAfter()`, `Restore()` can be
// called on the same DB, as long as its state does not roll back to before
// its current state.
//
// Requires external locking preventing concurrent execution with any other
// member function. Furthermore, `db` must not be mutated while this function
// is executing.
virtual Status SaveAtAndAfter(DB* db) = 0;

// Returns true if at least one state of historical expected values can be
// restored.
//
// Requires external locking preventing concurrent execution with any other
// member function.
virtual bool HasHistory() = 0;

// Restores expected values according to the current state of `db`. See
// `SaveAtAndAfter()` for conditions where this can be called.
//
// Requires external locking preventing concurrent execution with any other
// member function. Furthermore, `db` must not be mutated while this function
// is executing.
virtual Status Restore(DB* db) = 0;

// Requires external locking covering all keys in `cf`.
void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

Expand Down Expand Up @@ -204,8 +221,21 @@ class FileExpectedStateManager : public ExpectedStateManager {
//
// This implementation makes a copy of "LATEST.state" into
// "<current seqno>.state", and starts a trace in "<current seqno>.trace".
// Due to using external files, a following `Restore()` can happen even
// from a different process.
Status SaveAtAndAfter(DB* db) override;

// See `ExpectedStateManager::HasHistory()` API doc.
bool HasHistory() override;

// See `ExpectedStateManager::Restore()` API doc.
//
// Say `db->GetLatestSequenceNumber()` was `a` last time `SaveAtAndAfter()`
// was called and now it is `b`. Then this function replays `b - a` write
// operations from "`a`.trace" onto "`a`.state", and then copies the resulting
// file into "LATEST.state".
Status Restore(DB* db) override;

private:
// Requires external locking preventing concurrent execution with any other
// member function.
Expand Down Expand Up @@ -238,6 +268,15 @@ class AnonExpectedStateManager : public ExpectedStateManager {
return Status::NotSupported();
}

// See `ExpectedStateManager::HasHistory()` API doc.
bool HasHistory() override { return false; }

// See `ExpectedStateManager::Restore()` API doc.
//
// This implementation returns `Status::NotSupported` since we do not
// currently have a need to keep history of expected state within a process.
Status Restore(DB* /* db */) override { return Status::NotSupported(); }

// Requires external locking preventing concurrent execution with any other
// member function.
Status Open() override;
Expand Down