Skip to content

Commit

Permalink
Make bytes_per_sync and wal_bytes_per_sync mutable
Browse files Browse the repository at this point in the history
Summary:
SUMMARY
Moves the bytes_per_sync and wal_bytes_per_sync options from immutableoptions to mutable options. Also if wal_bytes_per_sync is changed, the wal file and memtables are flushed.
TEST PLAN
ran make check
all passed

Two new tests SetBytesPerSync, SetWalBytesPerSync check that after issuing setoptions with a new value for the var, the db options have the new value.
Closes #2893

Reviewed By: yiwu-arbug

Differential Revision: D5845814

Pulled By: TheRushingWookie

fbshipit-source-id: 93b52d779ce623691b546679dcd984a06d2ad1bd
  • Loading branch information
TheRushingWookie authored and facebook-github-bot committed Sep 28, 2017
1 parent ec48e5c commit 6a541af
Show file tree
Hide file tree
Showing 22 changed files with 192 additions and 80 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened.

### New Features
* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file.
### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.

Expand Down
9 changes: 7 additions & 2 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2023,8 +2023,8 @@ void rocksdb_options_set_paranoid_checks(
opt->rep.paranoid_checks = v;
}

void rocksdb_options_set_db_paths(rocksdb_options_t* opt,
const rocksdb_dbpath_t** dbpath_values,
void rocksdb_options_set_db_paths(rocksdb_options_t* opt,
const rocksdb_dbpath_t** dbpath_values,
size_t num_paths) {
std::vector<DbPath> db_paths(num_paths);
for (size_t i = 0; i < num_paths; ++i) {
Expand Down Expand Up @@ -2266,6 +2266,11 @@ void rocksdb_options_set_use_adaptive_mutex(
opt->rep.use_adaptive_mutex = v;
}

void rocksdb_options_set_wal_bytes_per_sync(
rocksdb_options_t* opt, uint64_t v) {
opt->rep.wal_bytes_per_sync = v;
}

void rocksdb_options_set_bytes_per_sync(
rocksdb_options_t* opt, uint64_t v) {
opt->rep.bytes_per_sync = v;
Expand Down
10 changes: 4 additions & 6 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/likely.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -264,7 +263,7 @@ void CompactionJob::AggregateStatistics() {

CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions,
const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, Status* db_bg_error,
Expand Down Expand Up @@ -1261,11 +1260,10 @@ Status CompactionJob::OpenCompactionOutputFile(
#endif // !ROCKSDB_LITE
// Make the output file
unique_ptr<WritableFile> writable_file;
EnvOptions opt_env_opts =
env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
bool syncpoint_arg = env_options_.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&opt_env_opts.use_direct_writes);
Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts);
&syncpoint_arg);
Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
Expand Down
4 changes: 2 additions & 2 deletions db/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CompactionJob {
public:
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions,
const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
Expand Down Expand Up @@ -127,7 +127,7 @@ class CompactionJob {
// DBImpl state
const std::string& dbname_;
const ImmutableDBOptions& db_options_;
const EnvOptions& env_options_;
const EnvOptions env_options_;

Env* env_;
VersionSet* versions_;
Expand Down
25 changes: 21 additions & 4 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_,
immutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_),
Expand Down Expand Up @@ -539,6 +542,7 @@ Status DBImpl::SetDBOptions(
MutableDBOptions new_options;
Status s;
Status persist_options_status;
bool wal_changed = false;
WriteThread::Writer w;
WriteContext write_context;
{
Expand All @@ -557,12 +561,25 @@ Status DBImpl::SetDBOptions(
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
? TableCache::kInfiniteCapacity
: new_options.max_open_files - 10);

wal_changed = mutable_db_options_.wal_bytes_per_sync !=
new_options.wal_bytes_per_sync;
if (new_options.bytes_per_sync == 0) {
new_options.bytes_per_sync = 1024 * 1024;
}
mutable_db_options_ = new_options;

env_options_for_compaction_ = EnvOptions(BuildDBOptions(
immutable_db_options_,
mutable_db_options_));
env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite(
env_options_for_compaction_,
immutable_db_options_);
env_options_for_compaction_.bytes_per_sync
= mutable_db_options_.bytes_per_sync;
env_->OptimizeForCompactionTableWrite(env_options_for_compaction_,
immutable_db_options_);
write_thread_.EnterUnbatched(&w, &mutex_);
if (total_log_size_ > GetMaxTotalWalSize()) {
Status purge_wal_status = HandleWALFull(&write_context);
if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
Status purge_wal_status = SwitchWAL(&write_context);
if (!purge_wal_status.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to purge WAL files in SetDBOptions() -- %s",
Expand Down
7 changes: 5 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);

void TEST_HandleWALFull();
void TEST_SwitchWAL();

bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_;
Expand Down Expand Up @@ -750,7 +750,7 @@ class DBImpl : public DB {
Status WaitForFlushMemTable(ColumnFamilyData* cfd);

// REQUIRES: mutex locked
Status HandleWALFull(WriteContext* write_context);
Status SwitchWAL(WriteContext* write_context);

// REQUIRES: mutex locked
Status HandleWriteBufferFull(WriteContext* write_context);
Expand Down Expand Up @@ -1168,6 +1168,9 @@ class DBImpl : public DB {
// The options to access storage files
const EnvOptions env_options_;

// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;

// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;
Expand Down
23 changes: 13 additions & 10 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ Status DBImpl::FlushMemTableToOutputFile(
snapshots_.GetAll(&earliest_write_conflict_snapshot);

FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, job_context, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(0U),
dbname_, cfd, immutable_db_options_, mutable_cf_options,
env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);

Expand Down Expand Up @@ -532,8 +533,9 @@ Status DBImpl::CompactFilesImpl(

assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
Expand Down Expand Up @@ -1676,11 +1678,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,

assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
table_cache_, &event_logger_,
&bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}

void DBImpl::TEST_HandleWALFull() {
void DBImpl::TEST_SwitchWAL() {
WriteContext write_context;
InstrumentedMutexLock l(&mutex_);
HandleWALFull(&write_context);
SwitchWAL(&write_context);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
Expand Down
4 changes: 1 addition & 3 deletions db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,11 +884,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);

EnvOptions optimized_env_options =
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
optimized_env_options, cfd->table_cache(), iter.get(),
env_options_for_compaction_, cfd->table_cache(), iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
total_log_size_ > GetMaxTotalWalSize())) {
status = HandleWALFull(write_context);
status = SwitchWAL(write_context);
}

if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
Expand Down Expand Up @@ -830,7 +830,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
return status;
}

Status DBImpl::HandleWALFull(WriteContext* write_context) {
Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
Status status;
Expand Down
81 changes: 81 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,87 @@ TEST_F(DBOptionsTest, GetLatestCFOptions) {
GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[1])));
}

TEST_F(DBOptionsTest, SetBytesPerSync) {
const size_t kValueSize = 1024 * 1024 * 8;
Options options;
options.create_if_missing = true;
options.bytes_per_sync = 1024 * 1024;
options.use_direct_reads = false;
options.write_buffer_size = 400 * kValueSize;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
int counter = 0;
int low_bytes_per_sync = 0;
int i = 0;
const std::string kValue(kValueSize, 'v');
ASSERT_EQ(options.bytes_per_sync, dbfull()->GetDBOptions().bytes_per_sync);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::RangeSync:0", [&](void* arg) {
counter++;
});

WriteOptions write_opts;
for (; i < 40; i++) {
Put(Key(i), kValue, write_opts);
}
i = 0;
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
low_bytes_per_sync = counter;
counter = 0;
// 8388608 = 8 * 1024 * 1024
ASSERT_OK(dbfull()->SetDBOptions({{"bytes_per_sync", "8388608"}}));
ASSERT_EQ(8388608, dbfull()->GetDBOptions().bytes_per_sync);
for (; i < 40; i++) {
Put(Key(i), kValue, write_opts);
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_GT(counter, 0);
ASSERT_GT(low_bytes_per_sync, 0);
ASSERT_GT(low_bytes_per_sync, counter);
}

TEST_F(DBOptionsTest, SetWalBytesPerSync) {
const size_t kValueSize = 1024 * 1024 * 3;
Options options;
options.create_if_missing = true;
options.wal_bytes_per_sync = 512;
options.write_buffer_size = 100 * kValueSize;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
ASSERT_EQ(512, dbfull()->GetDBOptions().wal_bytes_per_sync);
int counter = 0;
int low_bytes_per_sync = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::RangeSync:0", [&](void* arg) {
counter++;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
const std::string kValue(kValueSize, 'v');
int i = 0;
for (; i < 10; i++) {
Put(Key(i), kValue);
}
// Do not flush. If we flush here, SwitchWAL will reuse old WAL file since its
// empty and will not get the new wal_bytes_per_sync value.
low_bytes_per_sync = counter;
//5242880 = 1024 * 1024 * 5
ASSERT_OK(dbfull()->SetDBOptions({{"wal_bytes_per_sync", "5242880"}}));
ASSERT_EQ(5242880, dbfull()->GetDBOptions().wal_bytes_per_sync);
counter = 0;
i = 0;
for (; i < 10; i++) {
Put(Key(i), kValue);
}
ASSERT_GT(counter, 0);
ASSERT_GT(low_bytes_per_sync, 0);
ASSERT_GT(low_bytes_per_sync, counter);
}

TEST_F(DBOptionsTest, SetOptionsAndReopen) {
Random rnd(1044);
auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd);
Expand Down
8 changes: 2 additions & 6 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/likely.h"
#include "port/port.h"
#include "db/memtable.h"
#include "rocksdb/db.h"
Expand Down Expand Up @@ -58,7 +57,7 @@ namespace rocksdb {
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
const EnvOptions env_options, VersionSet* versions,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
Expand Down Expand Up @@ -294,16 +293,13 @@ Status FlushJob::WriteLevel0Table() {

TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
EnvOptions optimized_env_options =
db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);

int64_t _current_time = 0;
db_options_.env->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);

s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
optimized_env_options, cfd_->table_cache(), iter.get(),
env_options_, cfd_->table_cache(), iter.get(),
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
Expand Down
7 changes: 4 additions & 3 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ class FlushJob {
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
const EnvOptions env_options,
VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer,
Expand All @@ -83,7 +84,7 @@ class FlushJob {
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_;
const EnvOptions env_options_;
VersionSet* versions_;
InstrumentedMutex* db_mutex_;
std::atomic<bool>* shutting_down_;
Expand Down
Loading

0 comments on commit 6a541af

Please sign in to comment.