Permalink
Browse files

Added Compact() method so users can force a compaction process.

  • Loading branch information...
1 parent 5d25c17 commit c011ef9a599843ed7d3dc2d14842edb70877ec05 @goossaert committed Mar 18, 2015
@@ -263,7 +263,7 @@ Status Database::PutChunkValidSize(WriteOptions& write_options,
Status Database::Delete(WriteOptions& write_options,
- ByteArray& key) {
+ ByteArray& key) {
if (is_closed_) return Status::IOError("The database is not open");
log::trace("Database::Delete()", "[%s]", key.ToString().c_str());
Status s = se_->FileSystemStatus();
@@ -277,6 +277,13 @@ void Database::Flush() {
}
+// Forces a compaction: flushes the write buffer, flushes the storage
+// engine's current file, and then calls StorageEngine::Compact().
+// Does nothing when the database is closed, mirroring the is_closed_ guards
+// used by Database::Delete() and Database::NewSnapshot().
+void Database::Compact() {
+ if (is_closed_) return;
+ wb_->Flush();
+ se_->FlushCurrentFileForForcedCompaction();
+ se_->Compact();
+}
+
+
Snapshot Database::NewSnapshot() {
if (is_closed_) return Snapshot();
log::trace("Database::NewSnapshot()", "start");
@@ -232,6 +232,7 @@ class Database: public KingDB {
}
virtual void Flush();
+ virtual void Compact();
private:
View
@@ -73,6 +73,7 @@ class KingDB {
virtual Status Open() = 0;
virtual void Close() = 0;
virtual void Flush() = 0;
+ virtual void Compact() = 0;
};
} // namespace kdb
@@ -111,6 +111,7 @@ class Snapshot: public KingDB {
}
virtual void Flush() {}
+ virtual void Compact() {}
private:
kdb::DatabaseOptions db_options_;
@@ -65,7 +65,10 @@ class HSTableManager {
prefix_(prefix),
prefix_compaction_(prefix_compaction),
dirpath_locks_(dirpath_locks),
- wait_until_can_open_new_files_(false) {
+ wait_until_can_open_new_files_(false),
+ fileid_(0),
+ sequence_fileid_(0),
+ sequence_timestamp_(0) {
log::trace("HSTableManager::HSTableManager()", "dbname:%s prefix:%s", dbname.c_str(), prefix.c_str());
dbname_ = dbname;
hash_ = MakeHash(db_options.hash);
@@ -125,6 +128,13 @@ class HSTableManager {
log::trace("HSTableManager::SetSequenceFileId", "seq:%u", seq);
}
+ // Returns the file id used as 'fileid_max' when looking for the highest
+ // stable file id: sequence_fileid_ itself when has_file_ is set, and
+ // sequence_fileid_ + 1 otherwise. The read happens under
+ // mutex_sequence_fileid_.
+ uint32_t GetSequenceFileIdForStableId() {
+ std::unique_lock<std::mutex> guard(mutex_sequence_fileid_);
+ if (has_file_) return sequence_fileid_;
+ return sequence_fileid_ + 1;
+ }
+
uint32_t GetSequenceFileId() {
std::unique_lock<std::mutex> lock(mutex_sequence_fileid_);
return sequence_fileid_;
@@ -179,7 +189,7 @@ class HSTableManager {
// should only be computing the highest stable file id, and not do anything
// else than that. I took this implementation shortcut to get the first beta
// version out asap, this needs to be cleaned up at some point.
- uint32_t fileid_max = GetSequenceFileId();
+ uint32_t fileid_max = GetSequenceFileIdForStableId(); //GetSequenceFileId();
uint32_t fileid_stable = 0;
uint32_t fileid_candidate = fileid_start;
uint64_t epoch_now = file_resource_manager.GetEpochNow();
@@ -189,10 +189,10 @@ class StorageEngine {
// - If M reaches 0, try one compaction run (trying to clear the large
// files if any), and if still unsuccessful, declare compaction impossible
// 6. If the compaction succeeded, update 'fileid_lastcompacted'
-
std::chrono::milliseconds duration(db_options_.storage__statistics_polling_interval);
uint32_t fileid_lastcompacted = 0;
uint32_t fileid_out = 0;
+ bool force_compaction = false;
while (true) {
uint64_t size_compaction = 0;
@@ -217,7 +217,7 @@ class StorageEngine {
if ( fileid_end > 0
&& fs_free_space > db_options_.compaction__filesystem__free_space_required
- && dbsize_uncompacted > size_compaction) {
+ && (dbsize_uncompacted > size_compaction || force_compaction)) {
while (true) {
fileid_out = 0;
Status s = Compaction(dbname_, fileid_lastcompacted + 1, fileid_end, size_compaction, &fileid_out);
@@ -226,14 +226,31 @@ class StorageEngine {
size_compaction /= 2;
} else {
fileid_lastcompacted = fileid_out;
+
+ if (force_compaction) {
+ int has_compacted_all_files = fileid_lastcompacted == fileid_end ? 1 : 0;
+ event_manager_->compaction_status.StartAndBlockUntilDone(has_compacted_all_files);
+ }
+
break;
}
if (IsStopRequested()) return;
}
+ } else if (force_compaction) {
+ // Could not perform compaction even though another method was trying
+ // to force it. That method is very probably blocking on
+ // compaction_status and needs to be unblocked, so here the compaction
+ // is faked just so that the method can be unblocked.
+ int has_compacted_all_files = 1;
+ event_manager_->compaction_status.StartAndBlockUntilDone(has_compacted_all_files);
}
+ force_compaction = false;
std::unique_lock<std::mutex> lock(mutex_loop_compaction_);
- cv_loop_compaction_.wait_for(lock, duration);
+ std::cv_status status = cv_loop_compaction_.wait_for(lock, duration);
+ if (status == std::cv_status::no_timeout) {
+ force_compaction = true;
+ }
if (IsStopRequested()) return;
}
}
@@ -1087,6 +1104,15 @@ class StorageEngine {
return Status::OK();
}
+ // Blocks until a forced compaction reports that all files were compacted.
+ // Each round notifies cv_loop_compaction_, waits on
+ // event_manager_->compaction_status for the result, acknowledges it with
+ // Done(), and repeats while the reported value is zero.
+ // NOTE(review): notify_all() is issued here without holding
+ // mutex_loop_compaction_; if the compaction loop is not waiting at that
+ // moment the notification may be missed -- confirm against the loop.
+ void Compact() {
+ int all_files_compacted = 0;
+ do {
+ cv_loop_compaction_.notify_all();
+ all_files_compacted = event_manager_->compaction_status.Wait();
+ event_manager_->compaction_status.Done();
+ } while (!all_files_compacted);
+ }
+
// START: Helpers for Snapshots
// Caller must delete fileids_ignore
Status GetNewSnapshotData(uint32_t *snapshot_id, std::set<uint32_t> **fileids_ignore) {
@@ -1152,6 +1178,10 @@ class StorageEngine {
return hstable_manager_.FlushCurrentFile(1, 0);
}
+ // Flushes the current file through hstable_manager_ ahead of a forced
+ // compaction and hands back whatever FlushCurrentFile(1, 0) returned.
+ uint32_t FlushCurrentFileForForcedCompaction() {
+ const uint32_t flush_result = hstable_manager_.FlushCurrentFile(1, 0);
+ return flush_result;
+ }
+
// Returns the fileids_iterator_ pointer as-is; no copy of the vector is made.
// NOTE(review): ownership and lifetime of the pointed-to vector are not
// visible in this chunk -- confirm before callers retain or delete it.
std::vector<uint32_t>* GetFileidsIterator() {
return fileids_iterator_;
}
@@ -61,6 +61,7 @@ class EventManager {
Event<std::vector<Order>> flush_buffer;
Event<std::multimap<uint64_t, uint64_t>> update_index;
Event<int> clear_buffer;
+ Event<int> compaction_status;
};
}
View
@@ -412,7 +412,6 @@ TEST(DBTest, KeysWithNullBytes) {
}
-
TEST(DBTest, SingleThreadSmallEntries) {
while (IterateOverOptions()) {
kdb::Logger::set_current_level("emerg");
@@ -540,6 +539,75 @@ TEST(DBTest, SingleThreadSnapshot) {
}
+TEST(DBTest, SingleThreadSmallEntriesCompaction) {
+ // Writes num_items small random entries, forces a compaction through
+ // db_->Compact(), then iterates over the database and checks that the
+ // number of readable entries matches the number of entries written.
+ while (IterateOverOptions()) {
+ kdb::Logger::set_current_level("emerg");
+ Open();
+
+ int size = 100;
+ char *buffer_large = new char[size+1];
+ for (auto i = 0; i < size; i++) {
+ buffer_large[i] = 'a';
+ }
+ // The allocation holds size+1 bytes, so the terminator goes at index
+ // 'size'; index size+1 would be one byte past the end of the buffer.
+ buffer_large[size] = '\0';
+
+ int num_items = 1000;
+ // NOTE(review): 'kg' is never released in this test; whether it can be
+ // deleted through a KeyGenerator* depends on the class declaration, which
+ // is not visible here -- confirm and free it.
+ KeyGenerator* kg = new RandomKeyGenerator();
+ std::map<std::string, std::string> saved_data;
+
+ for (auto i = 0; i < num_items; i++) {
+ std::string key_str = kg->GetKey(0, i, 16);
+ kdb::ByteArray key = kdb::ByteArray::NewDeepCopyByteArray(key_str.c_str(), key_str.size());
+
+ data_generator_->GenerateData(buffer_large, size);
+ kdb::ByteArray value = kdb::ByteArray::NewDeepCopyByteArray(buffer_large, size);
+
+ kdb::Status s = db_->Put(write_options_, key, value);
+ if (!s.IsOK()) {
+ fprintf(stderr, "ClientEmbedded - Error: %s\n", s.ToString().c_str());
+ }
+ std::string value_str(buffer_large, size);
+ saved_data[key_str] = value_str;
+ }
+
+ // Force compaction
+ db_->Compact();
+
+ int count_items_end = 0;
+ kdb::Iterator iterator = db_->NewIterator(read_options_);
+ kdb::Status s = iterator.GetStatus();
+ if (!s.IsOK()) {
+ fprintf(stderr, "Error: %s\n", s.ToString().c_str());
+ }
+
+ for (iterator.Begin(); iterator.IsValid(); iterator.Next()) {
+ kdb::MultipartReader mp_reader = iterator.GetMultipartValue();
+
+ // Drain every part so that the reader's final status reflects the
+ // whole value; the per-part status is not inspected here.
+ for (mp_reader.Begin(); mp_reader.IsValid(); mp_reader.Next()) {
+ kdb::ByteArray part;
+ kdb::Status s_part = mp_reader.GetPart(&part);
+ }
+
+ kdb::Status s_reader = mp_reader.GetStatus();
+ if (s_reader.IsOK()) {
+ count_items_end += 1;
+ } else {
+ fprintf(stderr, "ClientEmbedded - Error: %s\n", s_reader.ToString().c_str());
+ }
+ }
+
+ delete[] buffer_large;
+ ASSERT_EQ(count_items_end, num_items);
+ iterator.Close();
+ Close();
+ }
+}
+
+
+
+
+
+
std::string run_command(const char* command) {
FILE *file;
char res[256];

0 comments on commit c011ef9

Please sign in to comment.