Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mv level work3 ... change from 3 overlapped levels to 2 #84

Merged
merged 5 commits into from Jun 26, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
112 changes: 86 additions & 26 deletions db/db_impl.cc
Expand Up @@ -369,7 +369,7 @@ Status DBImpl::Recover(VersionEdit* edit) {
// read manifest // read manifest
s = versions_->Recover(); s = versions_->Recover();


// Verify 2.0 directory structure created and ready // Verify Riak 1.3 directory structure created and ready
if (s.ok() && !TestForLevelDirectories(env_, dbname_, versions_->current())) if (s.ok() && !TestForLevelDirectories(env_, dbname_, versions_->current()))
{ {
int level; int level;
Expand Down Expand Up @@ -460,6 +460,60 @@ Status DBImpl::Recover(VersionEdit* edit) {
return s; return s;
} }



// Pause DB::Open until any backlog of "cleanup" compactions drains.
// Called from DB::Open with mutex_ already held (verified below).
// Scans every overlapped level; if one holds at least
// config::kL0_SlowdownWritesTrigger files, schedules a compaction and
// waits on bg_cv_ until the backlog clears.  If no backlog exists,
// falls through to a single MaybeScheduleCompaction() call, matching
// the behavior DB::Open had before this routine was introduced.
// NOTE(review): assumes bg_cv_ is signaled whenever a background
// compaction finishes (i.e. when bg_compaction_scheduled_ drops) --
// the signaling side is not visible in this chunk; confirm in
// BackgroundCall/BGWork before relying on termination here.
void DBImpl::CheckCompactionState()
{
mutex_.AssertHeld();
bool log_flag, need_compaction;   // log_flag: "paused" message printed once; need_compaction: backlog found this pass

// Verify Riak 1.4 level sizing, run compactions to fix as necessary
// (also recompacts hard repair of all files to level 0)

log_flag=false;
need_compaction=false;

// loop on pending background compactions
// reminder: mutex_ is held
do
{
int level;

// wait out executing compaction (Wait gives mutex to compactions)
if (bg_compaction_scheduled_)
bg_cv_.Wait();

// rescan all levels from scratch after every wait; stop at the
// first overlapped level that still exceeds the slowdown trigger
for (level=0, need_compaction=false;
level<config::kNumLevels && !need_compaction;
++level)
{
if (versions_->IsLevelOverlapped(level)
&& config::kL0_SlowdownWritesTrigger<=versions_->NumLevelFiles(level))
{
need_compaction=true;
MaybeScheduleCompaction();
// announce the pause only once, on the first backlog detected
if (!log_flag)
{
log_flag=true;
Log(options_.info_log, "Cleanup compactions started ... DB::Open paused");
} // if
} //if
} // for

// keep looping while a compaction is in flight AND this pass still saw a backlog
} while(bg_compaction_scheduled_ && need_compaction);

if (log_flag)
Log(options_.info_log, "Cleanup compactions completed ... DB::Open continuing");

// prior code only called this function instead of CheckCompactionState
// (duplicates original Google functionality)
else
MaybeScheduleCompaction();

return;

} // DBImpl::CheckCompactionState()


Status DBImpl::RecoverLogFile(uint64_t log_number, Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit, VersionEdit* edit,
SequenceNumber* max_sequence) { SequenceNumber* max_sequence) {
Expand Down Expand Up @@ -1083,8 +1137,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {


for (; input->Valid() && !shutting_down_.Acquire_Load(); ) for (; input->Valid() && !shutting_down_.Acquire_Load(); )
{ {
// Prioritize immutable compaction work // Prioritize compaction work ... every 100 keys
imm_micros+=PrioritizeWork(is_level0_compaction); if (NULL==compact->builder
|| 0==(compact->builder->NumEntries() % 100))
imm_micros+=PrioritizeWork(is_level0_compaction);


Slice key = input->key(); Slice key = input->key();
if (compact->builder != NULL if (compact->builder != NULL
Expand Down Expand Up @@ -1129,7 +1185,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
{ {
// raise this compaction's priority if it is blocking a // raise this compaction's priority if it is blocking a
// dire compaction of level 0 files // dire compaction of level 0 files
if (config::kL0_SlowdownWritesTrigger < versions_->current()->NumFiles(0)) if ((int)config::kL0_SlowdownWritesTrigger < versions_->current()->NumFiles(0))
{ {
pthread_rwlock_rdlock(&gThreadLock1); pthread_rwlock_rdlock(&gThreadLock1);
is_level0_compaction=true; is_level0_compaction=true;
Expand Down Expand Up @@ -1426,39 +1482,27 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
} }


Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
int throttle; Status status;

int throttle(0);
// protect use of versions_ ... apply lock
mutex_.Lock();
throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_);
mutex_.Unlock();
if (0!=throttle)
{
/// slowing each call down sequentially
MutexLock l(&throttle_mutex_);

// throttle is per key write, how many in batch?
// (batch multiplier killed AAE, removed)
env_->SleepForMicroseconds(throttle /* * WriteBatchInternal::Count(my_batch)*/);
gPerfCounters->Add(ePerfDebug0, throttle);
} // if


Writer w(&mutex_); Writer w(&mutex_);
w.batch = my_batch; w.batch = my_batch;
w.sync = options.sync; w.sync = options.sync;
w.done = false; w.done = false;


{ // place mutex_ within a block
// not changing tabs to ease compare to Google sources
MutexLock l(&mutex_); MutexLock l(&mutex_);
writers_.push_back(&w); writers_.push_back(&w);
while (!w.done && &w != writers_.front()) { while (!w.done && &w != writers_.front()) {
w.cv.Wait(); w.cv.Wait();
} }
if (w.done) { if (w.done) {
return w.status; return w.status; // skips throttle ... maintenance unfriendly coding, bastards
} }


// May temporarily unlock and wait. // May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL); status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
Expand Down Expand Up @@ -1504,6 +1548,22 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {


gPerfCounters->Inc(ePerfApiWrite); gPerfCounters->Inc(ePerfApiWrite);


// protect use of versions_ ... still within scope of mutex_ lock
throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_);
} // release MutexLock l(&mutex_)

// throttle on exit to reduce possible reordering
if (0!=throttle)
{
/// slowing each call down sequentially
MutexLock l(&throttle_mutex_);

// throttle is per key write, how many in batch?
// (batch multiplier killed AAE, removed)
env_->SleepForMicroseconds(throttle /* * WriteBatchInternal::Count(my_batch)*/);
gPerfCounters->Add(ePerfDebug0, throttle);
} // if

return status; return status;
} }


Expand Down Expand Up @@ -1565,7 +1625,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
Status s; Status s;


// hint to background compaction. // hint to background compaction.
level0_good=(versions_->NumLevelFiles(0) < config::kL0_CompactionTrigger); level0_good=(versions_->NumLevelFiles(0) < (int)config::kL0_CompactionTrigger);


while (true) { while (true) {
if (!bg_error_.ok()) { if (!bg_error_.ok()) {
Expand All @@ -1575,7 +1635,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
break; break;
} else if ( } else if (
allow_delay && allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { versions_->NumLevelFiles(0) >= (int)config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of // We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several // L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each // seconds when we hit the hard limit, start delaying each
Expand Down Expand Up @@ -1653,7 +1713,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false; return false;
} else { } else {
char buf[100]; char buf[100];
snprintf(buf, sizeof(buf), "%d", snprintf(buf, sizeof(buf), "%zd",
versions_->NumLevelFiles(static_cast<int>(level))); versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf; *value = buf;
return true; return true;
Expand Down Expand Up @@ -1772,7 +1832,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
} }
if (s.ok()) { if (s.ok()) {
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction(); impl->CheckCompactionState();
} }
} }
impl->mutex_.Unlock(); impl->mutex_.Unlock();
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl.h
Expand Up @@ -77,6 +77,11 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit. // be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit); Status Recover(VersionEdit* edit);


// Riak routine: pause DB::Open if too many compactions
// stacked up immediately. Happens in some repairs and
// some Riak upgrades
void CheckCompactionState();

void MaybeIgnoreError(Status* s) const; void MaybeIgnoreError(Status* s) const;


// Delete any unneeded files and stale in-memory entries. // Delete any unneeded files and stale in-memory entries.
Expand Down
4 changes: 2 additions & 2 deletions db/dbformat.h
Expand Up @@ -22,7 +22,7 @@ class Compaction;
// parameters set via options. // parameters set via options.
namespace config { namespace config {
static const int kNumLevels = 7; static const int kNumLevels = 7;
static const int kNumOverlapLevels = 3; static const int kNumOverlapLevels = 2;


// Level-0 compaction is started when we hit this many files. // Level-0 compaction is started when we hit this many files.
static const size_t kL0_CompactionTrigger = 4; static const size_t kL0_CompactionTrigger = 4;
Expand All @@ -31,7 +31,7 @@ static const size_t kL0_CompactionTrigger = 4;
static const size_t kL0_SlowdownWritesTrigger = 8; static const size_t kL0_SlowdownWritesTrigger = 8;


// Maximum number of level-0 files. We stop writes at this point. // Maximum number of level-0 files. We stop writes at this point.
static const int kL0_StopWritesTrigger = 12; static const size_t kL0_StopWritesTrigger = 12;


// Maximum level to which a new compacted memtable is pushed if it // Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the // does not create overlap. We try to push to level 2 to avoid the
Expand Down
29 changes: 17 additions & 12 deletions db/version_set.cc
Expand Up @@ -54,12 +54,12 @@ static struct
// WARNING: m_OverlappedFiles flags need to match config::kNumOverlapFiles ... until unified // WARNING: m_OverlappedFiles flags need to match config::kNumOverlapFiles ... until unified
{ {
{10485760, 262144000, 57671680, 209715200, 0, 300000000, true}, {10485760, 262144000, 57671680, 209715200, 0, 300000000, true},
{10485760, 262144000, 57671680, 419430400, 0, 1500000000, true}, {10485760, 82914560, 57671680, 419430400, 0, 209715200, true},
{10485760, 262144000, 57671680, 4194304000, 0, 31457280, true}, {10485760, 104371840, 57671680, 1006632960, 200000000, 314572800, false},
{10485760, 125829120, 57671680, 1610612736, 30000000, 41943040, false}, {10485760, 125829120, 57671680, 4094304000, 3355443200, 419430400, false},
{10485760, 147286400, 57671680, 41943040000, 33554432000, 52428800, false}, {10485760, 147286400, 57671680, 41943040000, 33554432000, 524288000, false},
{10485760, 188743680, 57671680, 419430400000, 335544320000, 62914560, false}, {10485760, 188743680, 57671680, 419430400000, 335544320000, 629145600, false},
{10485760, 220200960, 57671680, 4194304000000, 3355443200000, 73400320, false} {10485760, 220200960, 57671680, 4194304000000, 3355443200000, 734003200, false}
}; };




Expand Down Expand Up @@ -1087,10 +1087,9 @@ void VersionSet::Finalize(Version* v) {
{ {
int loop, count, value; int loop, count, value;


count=(v->files_[level].size() - config::kL0_SlowdownWritesTrigger) +1; count=(v->files_[level].size() - config::kL0_SlowdownWritesTrigger);


// logarithmic throttle. 8 works against FusionIO, but 7 or 6 should be tested. for (loop=0, value=4; loop<count; ++loop)
for (loop=0, value=8; loop<count; ++loop)
value*=8; value*=8;


penalty+=value; penalty+=value;
Expand All @@ -1116,9 +1115,9 @@ void VersionSet::Finalize(Version* v) {


// first sort layer needs to clear before next dump of overlapped files. // first sort layer needs to clear before next dump of overlapped files.
else else
penalty_score = static_cast<double>(level_bytes) / gLevelTraits[level].m_DesiredBytesForLevel; penalty_score = (1<(static_cast<double>(level_bytes) / gLevelTraits[level].m_DesiredBytesForLevel)? 1.0 : 0);


if (1.0<penalty_score) if (1.0<=penalty_score)
penalty+=(static_cast<int>(penalty_score)); penalty+=(static_cast<int>(penalty_score));
} }


Expand Down Expand Up @@ -1168,12 +1167,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record); return log->AddRecord(record);
} }


int VersionSet::NumLevelFiles(int level) const { size_t VersionSet::NumLevelFiles(int level) const {
assert(level >= 0); assert(level >= 0);
assert(level < config::kNumLevels); assert(level < config::kNumLevels);
return current_->files_[level].size(); return current_->files_[level].size();
} }


// Report whether files at this level may hold overlapping key ranges
// (true) or are kept sorted and disjoint (false).  The answer is a
// fixed per-level property read from the gLevelTraits table.
bool VersionSet::IsLevelOverlapped(int level) const
{
    assert(0 <= level);
    assert(level < config::kNumLevels);

    return gLevelTraits[level].m_OverlappedFiles;
}   // VersionSet::IsLevelOverlapped

const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes // Update code if kNumLevels changes
assert(config::kNumLevels == 7); assert(config::kNumLevels == 7);
Expand Down
5 changes: 4 additions & 1 deletion db/version_set.h
Expand Up @@ -194,7 +194,10 @@ class VersionSet {
} }


// Return the number of Table files at the specified level. // Return the number of Table files at the specified level.
int NumLevelFiles(int level) const; size_t NumLevelFiles(int level) const;

// is the specified level overlapped (or if false->sorted)
bool IsLevelOverlapped(int level) const;


// Return the combined file size of all files at the specified level. // Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const; int64_t NumLevelBytes(int level) const;
Expand Down