Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Add an option to enable Pegasus features (#14)
Browse files Browse the repository at this point in the history
* Add an option to enable Pegasus feature
  • Loading branch information
acelyc111 committed Apr 18, 2019
1 parent 7d8817d commit 86d7fd6
Show file tree
Hide file tree
Showing 23 changed files with 122 additions and 70 deletions.
2 changes: 2 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ Status DBImpl::GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) {
// ATTENTION(laiyingchun): only used for Pegasus.
assert(pegasus_data_);
*manifest_file_size = 0;
*last_sequence = 0;
*last_decree = 0;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
concurrent_prepare_(options.concurrent_prepare),
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(options.seq_per_batch),
pegasus_data_(options.pegasus_data),
// TODO(myabandeh): revise this when we change options.seq_per_batch
use_custom_gc_(options.seq_per_batch),
preserve_deletes_(options.preserve_deletes) {
Expand Down Expand Up @@ -756,7 +757,7 @@ uint64_t DBImpl::GetLastFlushedDecree() {

mutex_.Lock();
// ATTENTION(qinzuoyan): only use default column family.
assert(versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1u);
assert(!pegasus_data_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1u);
versions_->GetColumnFamilySet()->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
mutex_.Unlock();

Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ class DBImpl : public DB {
const bool concurrent_prepare_;
const bool manual_wal_flush_;
const bool seq_per_batch_;
const bool pegasus_data_;
const bool use_custom_gc_;

// Clients must periodically call SetPreserveDeletesSequenceNumber()
Expand Down
4 changes: 3 additions & 1 deletion db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return Status::OK();
}

if (!cfd->mem()->IsEmpty()) {
// ATTENTION(laiyingchun): An optimization to avoid switching empty memtable
// for Pegasus(single CF).
if (!pegasus_data_ || !cfd->mem()->IsEmpty()) {
WriteThread::Writer w;
if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_);
Expand Down
18 changes: 9 additions & 9 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

// ATTENTION(qinzuoyan): always only use default column family under
// replication framework.
//assert(single_column_family_mode_);
assert(!pegasus_data_ || single_column_family_mode_);

Status status;
if (write_options.low_pri) {
Expand All @@ -92,14 +92,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

// ATTENTION(laiyingchun): disable_memtable is always false in pegasus
assert(!disable_memtable);
assert(!pegasus_data_ || !disable_memtable);
if (concurrent_prepare_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used);
}

// ATTENTION(laiyingchun): enable_pipelined_write is always false in pegasus
assert(!immutable_db_options_.enable_pipelined_write);
assert(!pegasus_data_ || !immutable_db_options_.enable_pipelined_write);
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
Expand All @@ -119,7 +119,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// ATTENTION(qinzuoyan): because write is always applied in single thread
// under replication framework, so we must be the only write batch and
// must be STATE_GROUP_LEADER.
assert(w.state == WriteThread::STATE_GROUP_LEADER);
assert(!pegasus_data_ || w.state == WriteThread::STATE_GROUP_LEADER);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time);
Expand Down Expand Up @@ -195,7 +195,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (status.ok()) {
// ATTENTION(qinzuoyan): because write is always applied in single thread
// under replication framework, so we must be the only write batch.
assert(write_group.size == 1);
assert(!pegasus_data_ || write_group.size == 1);

// Rules for when we can update the memtable concurrently
// 1. supported by memtable
Expand Down Expand Up @@ -225,9 +225,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

// ATTENTION(qinzuoyan): under replication framework, batch should not be empty.
assert(total_count > 0);
assert(!pegasus_data_ || total_count > 0);
// ATTENTION(laiyingchun): seq_per_batch_ should always be false as default value.
assert(!seq_per_batch_);
assert(!pegasus_data_ || !seq_per_batch_);
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;

const bool concurrent_update = concurrent_prepare_;
Expand Down Expand Up @@ -286,13 +286,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_memtable_time);

// ATTENTION(laiyingchun): parallel should always be false because write_group.size == 1.
assert(!parallel);
assert(!pegasus_data_ || !parallel);
if (!parallel) {
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
write_options.given_decree);
write_options.given_decree, pegasus_data_);
} else {
SequenceNumber next_sequence = current_sequence;
for (auto* writer : write_group) {
Expand Down
3 changes: 2 additions & 1 deletion db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ DBTestBase::DBTestBase(const std::string path)
option_config_(kDefault) {
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
dbname_ = test::TmpDir(env_) + path;
size_t tid = std::hash<std::thread::id>()(std::this_thread::get_id());
dbname_ = test::TmpDir(env_) + path + "_" + std::to_string(tid);
alternative_wal_dir_ = dbname_ + "/wal";
alternative_db_log_dir_ = dbname_ + "/db_log_dir";
auto options = CurrentOptions();
Expand Down
2 changes: 1 addition & 1 deletion db/db_universal_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2);
ASSERT_LT(TotalSize(), 120000U * 12 * 0.82 + 120000 * 2);
}

// Test that checks trivial move in universal compaction
Expand Down
18 changes: 10 additions & 8 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,16 @@ void FlushJob::PickMemTable() {
// will no longer be picked up for recovery.
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
edit_->SetColumnFamily(cfd_->GetID());
// Mark the last sequence/decree of all memtables to be flushed. Although
// entries mems are sorted in ascending order by their created, we should
// iterate all mems but not take the last one because memtable may be empty.
for (auto mem : mems_) {
SequenceNumber seq;
uint64_t d;
mem->GetLastSeqDecree(&seq, &d);
edit_->UpdateLastFlushSeqDecree(seq, d);
if (db_options_.pegasus_data) {
// Mark the last sequence/decree of all memtables to be flushed. Although
// entries mems are sorted in ascending order by their created, we should
// iterate all mems but not take the last one because memtable may be empty.
for (auto mem : mems_) {
SequenceNumber seq;
uint64_t d;
mem->GetLastSeqDecree(&seq, &d);
edit_->UpdateLastFlushSeqDecree(seq, d);
}
}

// path 0 for level 0 file.
Expand Down
80 changes: 47 additions & 33 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2472,12 +2472,14 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
assert(v != current);
if (current != nullptr) {
assert(current->refs_ > 0);
// inherit last sequence/decree from old version, but for flush the old
// value would not take effect.
SequenceNumber seq;
uint64_t d;
current->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
if (db_options_->pegasus_data) {
// inherit last sequence/decree from old version, but for flush the old
// value would not take effect.
SequenceNumber seq;
uint64_t d;
current->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
}
current->Unref();
}
column_family_data->SetCurrent(v);
Expand Down Expand Up @@ -2718,11 +2720,13 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
max_log_number_in_batch =
std::max(max_log_number_in_batch, e->log_number_);
}
// update last flush sequence/decree from VersionEdit
SequenceNumber seq;
uint64_t d;
e->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
if (db_options_->pegasus_data) {
// update last flush sequence/decree from VersionEdit
SequenceNumber seq;
uint64_t d;
e->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
}
}
if (max_log_number_in_batch != 0) {
assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
Expand Down Expand Up @@ -2785,10 +2789,13 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
SequenceNumber seq;
uint64_t d;
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
edit->UpdateLastFlushSeqDecree(seq, d);
if (db_options_->pegasus_data) {
assert(column_family_set_->NumberOfColumnFamilies() == 1u);
SequenceNumber seq;
uint64_t d;
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
edit->UpdateLastFlushSeqDecree(seq, d);
}
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
Expand Down Expand Up @@ -2819,10 +2826,13 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
SequenceNumber seq;
uint64_t d;
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
edit->UpdateLastFlushSeqDecree(seq, d);
if (db_options_->pegasus_data) {
assert(column_family_set_->NumberOfColumnFamilies() == 1u);
SequenceNumber seq;
uint64_t d;
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
edit->UpdateLastFlushSeqDecree(seq, d);
}

uint64_t ms = column_family_set_->GetLastManualCompactFinishTime();
edit->SetLastManualCompactFinishTime(ms);
Expand Down Expand Up @@ -3072,11 +3082,12 @@ Status VersionSet::Recover(
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
} else if (!have_value_schema_version) {
s = Status::Corruption("no value-schema-version entry in descriptor");
if (db_options_->pegasus_data) {
s = Status::Corruption("no value_schema_version entry in descriptor");
}
} else if (!have_last_manual_compact_finish_time) {
ROCKS_LOG_WARN(db_options_->info_log,
"no last-manual-compact-finish-time entry in descriptor,"
" it can be only occurred when update from a lower version");
"no last-manual-compact-finish-time entry in descriptor");
}

if (!have_prev_log_number) {
Expand Down Expand Up @@ -3139,9 +3150,11 @@ Status VersionSet::Recover(
new Version(cfd, this, env_options_, current_version_number_++);
builder->SaveTo(v->storage_info());

// update last flush sequence/decree
auto& p = last_flush_seq_decree_map[cfd->GetID()];
v->UpdateLastFlushSeqDecree(p.first, p.second);
if (db_options_->pegasus_data) {
// update last flush sequence/decree
auto &p = last_flush_seq_decree_map[cfd->GetID()];
v->UpdateLastFlushSeqDecree(p.first, p.second);
}

// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
Expand All @@ -3159,25 +3172,26 @@ Status VersionSet::Recover(
db_options_->info_log,
"Recovered from manifest file:%s succeeded,"
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, last_flush_sequence is %lu, log_number is %lu,"
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu,"
"max_column_family is %u,"
"value_schema_version is %u,"
"last_manual_compact_finish_time is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)LastFlushSequence(),
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily(),
column_family_set_->GetValueSchemaVersion(),
column_family_set_->GetLastManualCompactFinishTime());

// for pegasus, we have disabled WAL, so we need to reset last_sequence to
// last_flush_sequence
last_sequence_ = LastFlushSequence();
ROCKS_LOG_INFO(db_options_->info_log,
"Reset last_sequence to last_flush_sequence: %lu",
(unsigned long)last_sequence_);
if (db_options_->pegasus_data) {
// For Pegasus, we have disabled WAL, so we need to reset last_sequence to
// last_flush_sequence.
last_sequence_ = LastFlushSequence();
ROCKS_LOG_INFO(db_options_->info_log,
"Reset last_sequence to last_flush_sequence: %lu",
(unsigned long)last_sequence_);
}

for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
Expand Down
2 changes: 2 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ class VersionSet {

// Return the last flush sequence number of default column family.
uint64_t LastFlushSequence() {
assert(db_options_->pegasus_data);
assert(column_family_set_->NumberOfColumnFamilies() == 1u);
SequenceNumber seq;
uint64_t d;
column_family_set_->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
Expand Down
25 changes: 17 additions & 8 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ class MemTableInserter : public WriteBatch::Handler {
bool seq_per_batch_;
// Whether the memtable write will be done only after the commit
bool write_after_commit_;
bool pegasus_data_;

MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
Expand All @@ -929,7 +930,8 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
uint64_t decree,
bool* has_valid_writes = nullptr, bool seq_per_batch = false)
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
bool pegasus_data = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
decree_(decree),
Expand All @@ -946,7 +948,8 @@ class MemTableInserter : public WriteBatch::Handler {
// Write after commit currently uses one seq per key (instead of per
// batch). So seq_per_batch being false indicates write_after_commit
// approach.
write_after_commit_(!seq_per_batch) {
write_after_commit_(!seq_per_batch),
pegasus_data_(pegasus_data) {
assert(cf_mems_);
}

Expand Down Expand Up @@ -1089,7 +1092,9 @@ class MemTableInserter : public WriteBatch::Handler {
// Since all Puts are logged in trasaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
mem->UpdateLastSeqDecree(sequence_, decree_);
if (pegasus_data_) {
mem->UpdateLastSeqDecree(sequence_, decree_);
}
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
Expand All @@ -1105,7 +1110,9 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
mem->UpdateLastSeqDecree(sequence_, decree_);
if (pegasus_data_) {
mem->UpdateLastSeqDecree(sequence_, decree_);
}
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
Expand Down Expand Up @@ -1263,8 +1270,9 @@ class MemTableInserter : public WriteBatch::Handler {
// Add merge operator to memtable
mem->Add(sequence_, kTypeMerge, key, value);
}

mem->UpdateLastSeqDecree(sequence_, decree_);
if (pegasus_data_) {
mem->UpdateLastSeqDecree(sequence_, decree_);
}
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
Expand Down Expand Up @@ -1428,11 +1436,12 @@ Status WriteBatchInternal::InsertInto(
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch,
uint64_t decree) {
uint64_t decree, bool pegasus_data) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, recovery_log_number,
db, concurrent_memtable_writes, decree,
nullptr /*has_valid_writes*/, seq_per_batch);
nullptr /*has_valid_writes*/, seq_per_batch,
pegasus_data);
for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
w->sequence = inserter.sequence();
Expand Down
3 changes: 2 additions & 1 deletion db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false,
uint64_t decree = 0);
uint64_t decree = 0,
bool pegasus_data = false);

// Convenience form of InsertInto when you have only one batch
// next_seq returns the seq after last sequence number used in MemTable insert
Expand Down
Loading

0 comments on commit 86d7fd6

Please sign in to comment.