Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #84 from basho/mv-level-work3

  • Loading branch information...
commit 31fa6dd2ad010a9f5428abe3c4c9e9f7125294a1 2 parents 66841e9 + 206b277
@jtuple jtuple authored
View
112 db/db_impl.cc
@@ -369,7 +369,7 @@ Status DBImpl::Recover(VersionEdit* edit) {
// read manifest
s = versions_->Recover();
- // Verify 2.0 directory structure created and ready
+ // Verify Riak 1.3 directory structure created and ready
if (s.ok() && !TestForLevelDirectories(env_, dbname_, versions_->current()))
{
int level;
@@ -460,6 +460,60 @@ Status DBImpl::Recover(VersionEdit* edit) {
return s;
}
+
+// Riak addition: called once from DB::Open() after recovery.  If the freshly
+// recovered database already violates the level-size / level-0 file-count
+// invariants (typical after a hard repair that dumps everything to level 0,
+// or after an upgrade that changed level sizing), this blocks Open until the
+// cleanup compactions drain, instead of letting the DB start badly overloaded.
+// Requires: mutex_ held by the caller (asserted below).
+void DBImpl::CheckCompactionState()
+{
+ mutex_.AssertHeld();
+ bool log_flag, need_compaction;
+
+ // Verify Riak 1.4 level sizing, run compactions to fix as necessary
+ // (also recompacts hard repair of all files to level 0)
+
+ // log_flag: true once the "paused" message is written, so the start/finish
+ // pair is logged exactly once no matter how many loop iterations run.
+ log_flag=false;
+ need_compaction=false;
+
+ // loop on pending background compactions
+ // reminder: mutex_ is held
+ do
+ {
+ int level;
+
+ // wait out executing compaction (Wait gives mutex to compactions)
+ // NOTE: bg_cv_.Wait() releases mutex_ while blocked, which is what lets
+ // the background compaction threads make progress and re-signal us.
+ if (bg_compaction_scheduled_)
+ bg_cv_.Wait();
+
+ // Rescan all levels after every wakeup: a finished compaction may have
+ // pushed files into a lower level and created a new overload there.
+ // The !need_compaction clause stops the scan at the first bad level.
+ for (level=0, need_compaction=false;
+ level<config::kNumLevels && !need_compaction;
+ ++level)
+ {
+ // Only overlapped (level-0 style) levels are checked; the L0 slowdown
+ // trigger is reused as the "too many files" threshold for them.
+ if (versions_->IsLevelOverlapped(level)
+ && config::kL0_SlowdownWritesTrigger<=versions_->NumLevelFiles(level))
+ {
+ need_compaction=true;
+ MaybeScheduleCompaction();
+ if (!log_flag)
+ {
+ log_flag=true;
+ Log(options_.info_log, "Cleanup compactions started ... DB::Open paused");
+ } // if
+ } //if
+ } // for
+
+ } while(bg_compaction_scheduled_ && need_compaction);
+
+ if (log_flag)
+ Log(options_.info_log, "Cleanup compactions completed ... DB::Open continuing");
+
+ // prior code only called this function instead of CheckCompactionState
+ // (duplicates original Google functionality)
+ // NOTE: the comment above sits between the if and its else; the else still
+ // binds to "if (log_flag)" — i.e. when no cleanup was needed, fall back to
+ // the stock Google behavior of a single MaybeScheduleCompaction() call.
+ else
+ MaybeScheduleCompaction();
+
+ return;
+
+} // DBImpl::CheckCompactionState()
+
+
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence) {
@@ -1083,8 +1137,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
for (; input->Valid() && !shutting_down_.Acquire_Load(); )
{
- // Prioritize immutable compaction work
- imm_micros+=PrioritizeWork(is_level0_compaction);
+ // Prioritize compaction work ... every 100 keys
+ if (NULL==compact->builder
+ || 0==(compact->builder->NumEntries() % 100))
+ imm_micros+=PrioritizeWork(is_level0_compaction);
Slice key = input->key();
if (compact->builder != NULL
@@ -1129,7 +1185,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
{
// raise this compaction's priority if it is blocking a
// dire compaction of level 0 files
- if (config::kL0_SlowdownWritesTrigger < versions_->current()->NumFiles(0))
+ if ((int)config::kL0_SlowdownWritesTrigger < versions_->current()->NumFiles(0))
{
pthread_rwlock_rdlock(&gThreadLock1);
is_level0_compaction=true;
@@ -1426,39 +1482,27 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
- int throttle;
-
- // protect use of versions_ ... apply lock
- mutex_.Lock();
- throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_);
- mutex_.Unlock();
- if (0!=throttle)
- {
- /// slowing each call down sequentially
- MutexLock l(&throttle_mutex_);
-
- // throttle is per key write, how many in batch?
- // (batch multiplier killed AAE, removed)
- env_->SleepForMicroseconds(throttle /* * WriteBatchInternal::Count(my_batch)*/);
- gPerfCounters->Add(ePerfDebug0, throttle);
- } // if
+ Status status;
+ int throttle(0);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
+ { // place mutex_ within a block
+ // not changing tabs to ease compare to Google sources
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
- return w.status;
+ return w.status; // skips throttle ... maintenance unfriendly coding, bastards
}
// May temporarily unlock and wait.
- Status status = MakeRoomForWrite(my_batch == NULL);
+ status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
@@ -1504,6 +1548,22 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
gPerfCounters->Inc(ePerfApiWrite);
+ // protect use of versions_ ... still within scope of mutex_ lock
+ throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_);
+ } // release MutexLock l(&mutex_)
+
+ // throttle on exit to reduce possible reordering
+ if (0!=throttle)
+ {
+ /// slowing each call down sequentially
+ MutexLock l(&throttle_mutex_);
+
+ // throttle is per key write, how many in batch?
+ // (batch multiplier killed AAE, removed)
+ env_->SleepForMicroseconds(throttle /* * WriteBatchInternal::Count(my_batch)*/);
+ gPerfCounters->Add(ePerfDebug0, throttle);
+ } // if
+
return status;
}
@@ -1565,7 +1625,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
Status s;
// hint to background compaction.
- level0_good=(versions_->NumLevelFiles(0) < config::kL0_CompactionTrigger);
+ level0_good=(versions_->NumLevelFiles(0) < (int)config::kL0_CompactionTrigger);
while (true) {
if (!bg_error_.ok()) {
@@ -1575,7 +1635,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
break;
} else if (
allow_delay &&
- versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
+ versions_->NumLevelFiles(0) >= (int)config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
@@ -1653,7 +1713,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false;
} else {
char buf[100];
- snprintf(buf, sizeof(buf), "%d",
+ snprintf(buf, sizeof(buf), "%zd",
versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
@@ -1772,7 +1832,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
- impl->MaybeScheduleCompaction();
+ impl->CheckCompactionState();
}
}
impl->mutex_.Unlock();
View
5 db/db_impl.h
@@ -77,6 +77,11 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit);
+ // Riak routine: pause DB::Open if too many compactions
+ // stacked up immediately. Happens in some repairs and
+ // some Riak upgrades
+ void CheckCompactionState();
+
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
View
4 db/dbformat.h
@@ -22,7 +22,7 @@ class Compaction;
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
-static const int kNumOverlapLevels = 3;
+static const int kNumOverlapLevels = 2;
// Level-0 compaction is started when we hit this many files.
static const size_t kL0_CompactionTrigger = 4;
@@ -31,7 +31,7 @@ static const size_t kL0_CompactionTrigger = 4;
static const size_t kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files. We stop writes at this point.
-static const int kL0_StopWritesTrigger = 12;
+static const size_t kL0_StopWritesTrigger = 12;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the
View
29 db/version_set.cc
@@ -54,12 +54,12 @@ static struct
// WARNING: m_OverlappedFiles flags need to match config::kNumOverlapFiles ... until unified
{
{10485760, 262144000, 57671680, 209715200, 0, 300000000, true},
- {10485760, 262144000, 57671680, 419430400, 0, 1500000000, true},
- {10485760, 262144000, 57671680, 4194304000, 0, 31457280, true},
- {10485760, 125829120, 57671680, 1610612736, 30000000, 41943040, false},
- {10485760, 147286400, 57671680, 41943040000, 33554432000, 52428800, false},
- {10485760, 188743680, 57671680, 419430400000, 335544320000, 62914560, false},
- {10485760, 220200960, 57671680, 4194304000000, 3355443200000, 73400320, false}
+ {10485760, 82914560, 57671680, 419430400, 0, 209715200, true},
+ {10485760, 104371840, 57671680, 1006632960, 200000000, 314572800, false},
+ {10485760, 125829120, 57671680, 4094304000, 3355443200, 419430400, false},
+ {10485760, 147286400, 57671680, 41943040000, 33554432000, 524288000, false},
+ {10485760, 188743680, 57671680, 419430400000, 335544320000, 629145600, false},
+ {10485760, 220200960, 57671680, 4194304000000, 3355443200000, 734003200, false}
};
@@ -1087,10 +1087,9 @@ void VersionSet::Finalize(Version* v) {
{
int loop, count, value;
- count=(v->files_[level].size() - config::kL0_SlowdownWritesTrigger) +1;
+ count=(v->files_[level].size() - config::kL0_SlowdownWritesTrigger);
- // logarithmic throttle. 8 works against FusionIO, but 7 or 6 should be tested.
- for (loop=0, value=8; loop<count; ++loop)
+ for (loop=0, value=4; loop<count; ++loop)
value*=8;
penalty+=value;
@@ -1116,9 +1115,9 @@ void VersionSet::Finalize(Version* v) {
// first sort layer needs to clear before next dump of overlapped files.
else
- penalty_score = static_cast<double>(level_bytes) / gLevelTraits[level].m_DesiredBytesForLevel;
+ penalty_score = (1<(static_cast<double>(level_bytes) / gLevelTraits[level].m_DesiredBytesForLevel)? 1.0 : 0);
- if (1.0<penalty_score)
+ if (1.0<=penalty_score)
penalty+=(static_cast<int>(penalty_score));
}
@@ -1168,12 +1167,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record);
}
-int VersionSet::NumLevelFiles(int level) const {
+// Return type widened int -> size_t so callers can compare directly against
+// the size_t config triggers (kL0_CompactionTrigger etc.) without casts.
+// Returns the number of table files currently in 'level' of the live version.
+size_t VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
+// Riak addition: true if 'level' holds overlapped (level-0 style) files,
+// false if the level keeps sorted, non-overlapping files.  Reads the static
+// m_OverlappedFiles flag from the per-level gLevelTraits table; used by
+// DBImpl::CheckCompactionState() to decide which levels get the file-count
+// overload check.
+bool VersionSet::IsLevelOverlapped(int level) const {
+ assert(level >= 0);
+ assert(level < config::kNumLevels);
+ return(gLevelTraits[level].m_OverlappedFiles);
+}
+
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
assert(config::kNumLevels == 7);
View
5 db/version_set.h
@@ -194,7 +194,10 @@ class VersionSet {
}
// Return the number of Table files at the specified level.
- int NumLevelFiles(int level) const;
+ size_t NumLevelFiles(int level) const;
+
+ // is the specified level overlapped (or if false->sorted)
+ bool IsLevelOverlapped(int level) const;
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
Please sign in to comment.
Something went wrong with that request. Please try again.