Permalink
Browse files

Add routine to pause DBImpl::Recover while new overlapped areas are c…

…leared via compactions (Riak 1.3 to 1.4 conversion). Also cleaned some int versus size_t compiler warnings.
  • Loading branch information...
1 parent 74d5f82 commit 9a8d9a011f7387ead3afc5227c46bf7ce403ad1d Matthew V committed Jun 24, 2013
Showing with 57 additions and 5 deletions.
  1. +45 −2 db/db_impl.cc
  2. +1 −1 db/dbformat.h
  3. +7 −1 db/version_set.cc
  4. +4 −1 db/version_set.h
View
@@ -369,7 +369,7 @@ Status DBImpl::Recover(VersionEdit* edit) {
// read manifest
s = versions_->Recover();
- // Verify 2.0 directory structure created and ready
+ // Verify Riak 1.3 directory structure created and ready
if (s.ok() && !TestForLevelDirectories(env_, dbname_, versions_->current()))
{
int level;
@@ -457,6 +457,49 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}
+ // Verify Riak 1.4 level sizing, run compactions to fix as necessary
+ // (also recompacts hard repair of all files to level 0)
+ if (s.ok())
+ {
+ bool log_flag, need_compaction;
+
+ log_flag=false;
+ need_compaction=false;
+
+ // loop on pending background compactions
+ // reminder: mutex_ is held
+ do
+ {
+ int level;
+
+ // wait out executing compaction (Wait gives mutex to compactions)
+ if (bg_compaction_scheduled_)
+ bg_cv_.Wait();
+
+ for (level=0, need_compaction=false;
+ level<config::kNumLevels && !need_compaction;
+ ++level)
+ {
+ if (versions_->IsLevelOverlapped(level)
+ && config::kL0_SlowdownWritesTrigger<=versions_->NumLevelFiles(level))
+ {
+ need_compaction=true;
+ MaybeScheduleCompaction();
+ if (!log_flag)
+ {
+ log_flag=true;
+ Log(options_.info_log, "Cleanup compactions started ... DB::Open paused");
+ } // if
+ } //if
+ } // for
+
+ } while(bg_compaction_scheduled_ && need_compaction);
+
+ if (log_flag)
+ Log(options_.info_log, "Cleanup compactions completed ... DB::Open continuing");
+
+ } // if
+
return s;
}
@@ -1653,7 +1696,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false;
} else {
char buf[100];
- snprintf(buf, sizeof(buf), "%d",
+ snprintf(buf, sizeof(buf), "%zd",
versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
View
@@ -31,7 +31,7 @@ static const size_t kL0_CompactionTrigger = 4;
static const size_t kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files. We stop writes at this point.
-static const int kL0_StopWritesTrigger = 12;
+static const size_t kL0_StopWritesTrigger = 12;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the
View
@@ -1167,12 +1167,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record);
}
-int VersionSet::NumLevelFiles(int level) const {
+size_t VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
+bool VersionSet::IsLevelOverlapped(int level) const {
+ assert(level >= 0);
+ assert(level < config::kNumLevels);
+ return(gLevelTraits[level].m_OverlappedFiles);
+}
+
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
assert(config::kNumLevels == 7);
View
@@ -194,7 +194,10 @@ class VersionSet {
}
// Return the number of Table files at the specified level.
- int NumLevelFiles(int level) const;
+ size_t NumLevelFiles(int level) const;
+
+ // is the specified level overlapped (or if false->sorted)
+ bool IsLevelOverlapped(int level) const;
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;

0 comments on commit 9a8d9a0

Please sign in to comment.