Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move dump stats to a separate thread #4382

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
db->StartTimedTasks();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log);
Expand Down
134 changes: 80 additions & 54 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,20 @@ void DBImpl::WaitForBackgroundWork() {

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
InstrumentedMutexLock l(&mutex_);

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work");

InstrumentedMutexLock l(&mutex_);
// To avoid deadlock, `thread_dump_stats_->cancel()` must be called without
// holding the db mutex, because the worker function `DBImpl::DumpStats()`
// also acquires the db mutex. Release it around the call, then re-acquire it.
if (thread_dump_stats_ != nullptr) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
thread_dump_stats_.reset();
}
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
Expand Down Expand Up @@ -577,66 +586,68 @@ void DBImpl::PrintStatistics() {
}
}

void DBImpl::MaybeDumpStats() {
mutex_.Lock();
unsigned int stats_dump_period_sec =
mutable_db_options_.stats_dump_period_sec;
mutex_.Unlock();
if (stats_dump_period_sec == 0) return;

const uint64_t now_micros = env_->NowMicros();

if (last_stats_dump_time_microsec_ + stats_dump_period_sec * 1000000 <=
now_micros) {
// Multiple threads could race in here simultaneously.
// However, the last one will update last_stats_dump_time_microsec_
// atomically. We could see more than one dump during one dump
// period in rare cases.
last_stats_dump_time_microsec_ = now_micros;
// Starts the periodic stats-dump thread if stats dumping is enabled.
//
// Reads `mutable_db_options_.stats_dump_period_sec` under the db mutex and,
// when the period is non-zero and no dump thread exists yet, creates a
// RepeatableThread that invokes DBImpl::DumpStats() once per period.
// Calling this again while a thread already exists is a no-op.
void DBImpl::StartTimedTasks() {
  InstrumentedMutexLock l(&mutex_);
  const unsigned int stats_dump_period_sec =
      mutable_db_options_.stats_dump_period_sec;
  if (stats_dump_period_sec == 0 || thread_dump_stats_) {
    // Either stats dumping is disabled or the thread is already running.
    return;
  }
  // Widen to 64 bits *before* converting seconds to microseconds. In
  // `unsigned int` arithmetic the product wraps once the period exceeds
  // roughly 4294 seconds, which would silently shorten the dump interval.
  // NOTE(review): assumes RepeatableThread takes its delay as a 64-bit
  // microsecond count -- confirm against util/repeatable_thread.h.
  const uint64_t period_micros =
      static_cast<uint64_t>(stats_dump_period_sec) * 1000000;
  thread_dump_stats_.reset(new rocksdb::RepeatableThread(
      [this]() { DBImpl::DumpStats(); }, "dump_st", env_, period_micros));
}

void DBImpl::DumpStats() {
TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
const DBPropertyInfo* cf_property_info =
GetPropertyInfo(DB::Properties::kCFStats);
assert(cf_property_info != nullptr);
const DBPropertyInfo* db_property_info =
GetPropertyInfo(DB::Properties::kDBStats);
assert(db_property_info != nullptr);

std::string stats;
{
InstrumentedMutexLock l(&mutex_);
default_cf_internal_stats_->GetStringProperty(
*db_property_info, DB::Properties::kDBStats, &stats);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram,
&stats);
}
}
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
}
const DBPropertyInfo* cf_property_info =
GetPropertyInfo(DB::Properties::kCFStats);
assert(cf_property_info != nullptr);
const DBPropertyInfo* db_property_info =
GetPropertyInfo(DB::Properties::kDBStats);
assert(db_property_info != nullptr);

std::string stats;
if (shutdown_initiated_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown_initiated_ is not an atomic, and I believe accessing it also requires holding the db mutex. Maybe you want to turn it into an atomic?

return;
}
{
InstrumentedMutexLock l(&mutex_);
default_cf_internal_stats_->GetStringProperty(
*db_property_info, DB::Properties::kDBStats, &stats);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
}
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- DUMPING STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
if (immutable_db_options_.dump_malloc_stats) {
stats.clear();
DumpMallocStats(&stats);
if (!stats.empty()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- Malloc STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
}
}
}
TEST_SYNC_POINT("DBImpl::DumpStats:2");
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- DUMPING STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
if (immutable_db_options_.dump_malloc_stats) {
stats.clear();
DumpMallocStats(&stats);
if (!stats.empty()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"------- Malloc STATS -------");
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
}
}
#endif // !ROCKSDB_LITE

PrintStatistics();
}
PrintStatistics();
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
Expand Down Expand Up @@ -762,7 +773,22 @@ Status DBImpl::SetDBOptions(
new_options.max_background_compactions, Env::Priority::LOW);
MaybeScheduleFlushOrCompaction();
}

if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
}
if (new_options.stats_dump_period_sec > 0) {
thread_dump_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
new_options.stats_dump_period_sec * 1000000));
}
else {
thread_dump_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
Expand Down
13 changes: 11 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "util/autovector.h"
#include "util/event_logger.h"
#include "util/hash.h"
#include "util/repeatable_thread.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/trace_replay.h"
Expand Down Expand Up @@ -465,6 +466,7 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForTimedTaskRun(std::function<void()> callback) const;

#endif // NDEBUG

Expand Down Expand Up @@ -1062,10 +1064,13 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

// Schedule background tasks
void StartTimedTasks();

void PrintStatistics();

// dump rocksdb.stats to LOG
void MaybeDumpStats();
void DumpStats();

// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
Expand Down Expand Up @@ -1468,6 +1473,10 @@ class DBImpl : public DB {
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;

// handle for scheduling jobs at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_dump_stats_;

// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
Expand Down Expand Up @@ -1547,7 +1556,7 @@ class DBImpl : public DB {
// error recovery from going on in parallel. The latter, shutting_down_,
// is set a little later during the shutdown after scheduling memtable
// flushes
bool shutdown_initiated_;
std::atomic<bool> shutdown_initiated_;
// Flag to indicate whether sst_file_manager object was allocated in
// DB::Open() or passed to us
bool own_sfm_;
Expand Down
1 change: 0 additions & 1 deletion db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
{
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,10 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size);
}

// Test-only hook: forwards `callback` to the stats-dump thread's
// TEST_WaitForRun(). Does nothing when no stats-dump thread exists.
void DBImpl::TEST_WaitForTimedTaskRun(std::function<void()> callback) const {
  if (thread_dump_stats_ == nullptr) {
    return;
  }
  thread_dump_stats_->TEST_WaitForRun(callback);
}
} // namespace rocksdb
#endif // NDEBUG
3 changes: 3 additions & 0 deletions db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
persist_options_status.ToString());
}
}
if (s.ok()) {
impl->StartTimedTasks();
}
if (!s.ok()) {
for (auto* h : *handles) {
delete h;
Expand Down
27 changes: 27 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,33 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
}
}

// Checks that the stats-dump task runs at least once when
// stats_dump_period_sec is set, and that setting the period to 0 through
// SetDBOptions() stops further runs. Runs are counted via the
// "DBImpl::DumpStats:1" sync point.
TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
  Options options;
  options.create_if_missing = true;
  options.stats_dump_period_sec = 5;
  std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
  mock_env.reset(new rocksdb::MockTimeEnv(env_));
  mock_env->set_current_time(0);  // in seconds
  options.env = mock_env.get();
  // NOTE(review): `counter` is incremented from the sync-point callback and
  // read from this thread without synchronization -- consider making it a
  // std::atomic<int>.
  int counter = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  Reopen(options);
  ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec);
  // Advance the mock clock by one full period so the task becomes due.
  dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); });
  ASSERT_GE(counter, 1);

  // Test cancel job through SetOptions
  ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}}));
  int old_val = counter;
  env_->SleepForMicroseconds(10000000);
  ASSERT_EQ(counter, old_val);
  Close();

  // The callback captures the stack-local `counter` by reference; unregister
  // it so it cannot fire with a dangling reference after this test returns.
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) {
dbfull->TEST_LockMutex();
JobContext job_context(0);
Expand Down
2 changes: 1 addition & 1 deletion db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class SpecialEnv : public EnvWrapper {

std::atomic<int> delete_count_;

bool time_elapse_only_sleep_;
std::atomic<bool> time_elapse_only_sleep_;

bool no_slowdown_;

Expand Down
2 changes: 1 addition & 1 deletion env/mock_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class TestMemLogger : public Logger {
static const uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
bool flush_pending_;
std::atomic<bool> flush_pending_;

public:
TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,
Expand Down