Skip to content

Commit

Permalink
SetOptions() for memtable related options
Browse files Browse the repository at this point in the history
Summary: as title

Test Plan:
make all check
I will think a way to set up stress test for this

Reviewers: sdong, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23055
  • Loading branch information
Lei Jin committed Sep 17, 2014
1 parent e4eca6a commit a062e1f
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 56 deletions.
32 changes: 26 additions & 6 deletions db/column_family.cc
Expand Up @@ -27,6 +27,7 @@
#include "db/write_controller.h"
#include "util/autovector.h"
#include "util/hash_skiplist_rep.h"
#include "util/options_helper.h"

namespace rocksdb {

Expand Down Expand Up @@ -212,7 +213,7 @@ void SuperVersionUnrefHandle(void* ptr) {

ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const ColumnFamilyOptions& cf_options,
const DBOptions* db_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set)
Expand All @@ -222,9 +223,10 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
current_(nullptr),
refs_(0),
dropped_(false),
internal_comparator_(options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, options)),
internal_comparator_(cf_options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
Expand Down Expand Up @@ -378,13 +380,12 @@ const EnvOptions* ColumnFamilyData::soptions() const {

void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; }

void ColumnFamilyData::CreateNewMemtable() {
void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
assert(current_ != nullptr);
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, ioptions_,
MemTableOptions(options_));
mem_ = new MemTable(internal_comparator_, ioptions_, moptions);
mem_->Ref();
}

Expand Down Expand Up @@ -486,7 +487,15 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {

SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) {
db_mutex->AssertHeld();
return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
}

SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
Expand Down Expand Up @@ -522,6 +531,17 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
}
}

bool ColumnFamilyData::SetOptions(
const std::unordered_map<std::string, std::string>& options_map) {
MutableCFOptions new_mutable_cf_options;
if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
&new_mutable_cf_options)) {
mutable_cf_options_ = new_mutable_cf_options;
return true;
}
return false;
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Expand Down
25 changes: 22 additions & 3 deletions db/column_family.h
Expand Up @@ -23,6 +23,7 @@
#include "db/table_cache.h"
#include "util/thread_local.h"
#include "db/flush_scheduler.h"
#include "util/mutable_cf_options.h"

namespace rocksdb {

Expand Down Expand Up @@ -80,6 +81,7 @@ struct SuperVersion {
MemTable* mem;
MemTableListVersion* imm;
Version* current;
MutableCFOptions mutable_cf_options;
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
Expand Down Expand Up @@ -168,11 +170,24 @@ class ColumnFamilyData {
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }

// TODO(ljin): make this API thread-safe once we allow updating options_
const Options* options() const { return &options_; }
// thread-safe
const Options* options() const { return &options_; }
const EnvOptions* soptions() const;
const ImmutableCFOptions* ioptions() const { return &ioptions_; }
// REQUIRES: DB mutex held
// This returns the MutableCFOptions used by current SuperVersion
// You shoul use this API to reference MutableCFOptions most of the time.
const MutableCFOptions* mutable_cf_options() const {
return &(super_version_->mutable_cf_options);
}
// REQUIRES: DB mutex held
// This returns the latest MutableCFOptions, which may be not in effect yet.
const MutableCFOptions* GetLatestMutableCFOptions() const {
return &mutable_cf_options_;
}
// REQUIRES: DB mutex held
bool SetOptions(
const std::unordered_map<std::string, std::string>& options_map);

InternalStats* internal_stats() { return internal_stats_.get(); }

Expand All @@ -182,7 +197,7 @@ class ColumnFamilyData {
Version* dummy_versions() { return dummy_versions_; }
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetCurrent(Version* current);
void CreateNewMemtable();
void CreateNewMemtable(const MemTableOptions& moptions);

TableCache* table_cache() const { return table_cache_.get(); }

Expand Down Expand Up @@ -223,6 +238,9 @@ class ColumnFamilyData {
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex);

Expand Down Expand Up @@ -255,6 +273,7 @@ class ColumnFamilyData {

const Options options_;
const ImmutableCFOptions ioptions_;
MutableCFOptions mutable_cf_options_;

std::unique_ptr<TableCache> table_cache_;

Expand Down
31 changes: 25 additions & 6 deletions db/db_impl.cc
Expand Up @@ -1228,7 +1228,8 @@ Status DBImpl::Recover(
if (!s.ok()) {
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->CreateNewMemtable();
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
}
}
}
Expand Down Expand Up @@ -1356,7 +1357,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// file-systems cause the DB::Open() to fail.
return status;
}
cfd->CreateNewMemtable();
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
}
}
}
Expand Down Expand Up @@ -1393,7 +1395,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// Recovery failed
break;
}
cfd->CreateNewMemtable();
cfd->CreateNewMemtable(MemTableOptions(
*cfd->GetLatestMutableCFOptions(), *cfd->options()));
}

// write MANIFEST with update
Expand Down Expand Up @@ -1623,6 +1626,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
}

if (s.ok()) {
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
if (madeProgress) {
*madeProgress = 1;
Expand Down Expand Up @@ -1714,6 +1718,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s;
}

bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
MutexLock l(&mutex_);
return cfh->cfd()->SetOptions(options_map);
}

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
mutex_.AssertHeld();
Expand Down Expand Up @@ -1784,6 +1795,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
cfd->GetName().c_str(), edit.DebugString().data());

status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
// Use latest MutableCFOptions
superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
new_superversion = nullptr;

Expand Down Expand Up @@ -2322,6 +2334,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
}
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
Expand All @@ -2338,6 +2351,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);

Version::LevelSummaryStorage tmp;
Expand Down Expand Up @@ -3322,6 +3336,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,

if (status.ok()) {
status = InstallCompactionResults(compact, log_buffer);
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
}
Version::LevelSummaryStorage tmp;
Expand Down Expand Up @@ -3426,6 +3441,7 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
// Use latest MutableCFOptions
SuperVersion* old_superversion =
cfd->InstallSuperVersion(new_superversion, &mutex_);
deletion_state.new_superversion = nullptr;
Expand Down Expand Up @@ -3618,6 +3634,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(db_options_.info_log, "Created column family [%s] (ID %u)",
Expand Down Expand Up @@ -4138,6 +4155,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
SuperVersion* new_superversion = nullptr;
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
mutex_.Unlock();
Status s;
{
Expand All @@ -4156,8 +4174,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,

if (s.ok()) {
new_mem = new MemTable(cfd->internal_comparator(),
*cfd->ioptions(),
MemTableOptions(*cfd->options()));
*cfd->ioptions(), MemTableOptions(mutable_cf_options,
*cfd->options()));
new_superversion = new SuperVersion();
}
}
Expand Down Expand Up @@ -4197,7 +4215,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
"[%s] New memtable created with log file: #%" PRIu64 "\n",
cfd->GetName().c_str(), logfile_number_);
context->superversions_to_free_.push_back(
cfd->InstallSuperVersion(new_superversion, &mutex_));
cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options));
return s;
}

Expand Down Expand Up @@ -4672,6 +4690,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
}
impl->alive_log_files_.push_back(
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl.h
Expand Up @@ -112,6 +112,10 @@ class DBImpl : public DB {
bool reduce_level = false, int target_level = -1,
uint32_t target_path_id = 0);

using DB::SetOptions;
bool SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map);

using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family);
using DB::MaxMemCompactionLevel;
Expand Down
18 changes: 10 additions & 8 deletions db/memtable.cc
Expand Up @@ -31,18 +31,20 @@

namespace rocksdb {

MemTableOptions::MemTableOptions(const Options& options)
: write_buffer_size(options.write_buffer_size),
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
MemTableOptions::MemTableOptions(
const MutableCFOptions& mutable_cf_options, const Options& options)
: write_buffer_size(mutable_cf_options.write_buffer_size),
arena_block_size(mutable_cf_options.arena_block_size),
memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits),
memtable_prefix_bloom_probes(
mutable_cf_options.memtable_prefix_bloom_probes),
memtable_prefix_bloom_huge_page_tlb_size(
options.memtable_prefix_bloom_huge_page_tlb_size),
mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks),
inplace_callback(options.inplace_callback),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes) {}
max_successive_merges(mutable_cf_options.max_successive_merges),
filter_deletes(mutable_cf_options.filter_deletes) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
Expand Down
5 changes: 4 additions & 1 deletion db/memtable.h
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/immutable_options.h"
#include "util/arena.h"
#include "util/dynamic_bloom.h"
#include "util/mutable_cf_options.h"

namespace rocksdb {

Expand All @@ -30,7 +31,9 @@ class MemTableIterator;
class MergeContext;

struct MemTableOptions {
explicit MemTableOptions(const Options& options);
explicit MemTableOptions(
const MutableCFOptions& mutable_cf_options,
const Options& options);
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
Expand Down
3 changes: 2 additions & 1 deletion db/repair.cc
Expand Up @@ -219,7 +219,8 @@ class Repairer {
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, ioptions_, MemTableOptions(options_));
MemTable* mem = new MemTable(icmp_, ioptions_,
MemTableOptions(MutableCFOptions(options_), options_));
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;
Expand Down
10 changes: 7 additions & 3 deletions db/version_set.cc
Expand Up @@ -2962,17 +2962,21 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
}

ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& options, VersionEdit* edit) {
const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
assert(edit->is_column_family_add_);

Version* dummy_versions = new Version(nullptr, this);
auto new_cfd = column_family_set_->CreateColumnFamily(
edit->column_family_name_, edit->column_family_, dummy_versions, options);
edit->column_family_name_, edit->column_family_, dummy_versions,
cf_options);

Version* v = new Version(new_cfd, this, current_version_number_++);

AppendVersion(new_cfd, v);
new_cfd->CreateNewMemtable();
// GetLatestMutableCFOptions() is safe here without mutex since the
// cfd is not available to client
new_cfd->CreateNewMemtable(MemTableOptions(
*new_cfd->GetLatestMutableCFOptions(), *new_cfd->options()));
new_cfd->SetLogNumber(edit->log_number_);
return new_cfd;
}
Expand Down
2 changes: 1 addition & 1 deletion db/write_batch_test.cc
Expand Up @@ -28,7 +28,7 @@ static std::string PrintContents(WriteBatch* b) {
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options),
MemTableOptions(options));
MemTableOptions(MutableCFOptions(options), options));
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);
Expand Down

0 comments on commit a062e1f

Please sign in to comment.