Skip to content

Commit

Permalink
server: support app env rocksdb.checkpoint.reserve_min_count and rock…
Browse files Browse the repository at this point in the history
…sdb.checkpoint.reserve_time_seconds (#252)
  • Loading branch information
qinzuoyan committed Feb 27, 2019
1 parent cc81519 commit 16c53ff
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
3 changes: 3 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
/// <= 0 means no effect
const std::string TABLE_LEVEL_DEFAULT_TTL("default_ttl");

const std::string ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT("rocksdb.checkpoint.reserve_min_count");
const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS("rocksdb.checkpoint.reserve_time_seconds");

/// read cluster meta address from this section
const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");
} // namespace pegasus
3 changes: 3 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;

extern const std::string TABLE_LEVEL_DEFAULT_TTL;

extern const std::string ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT;
extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS;

extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
} // namespace pegasus
61 changes: 49 additions & 12 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_wt_opts.disableWAL = true;

// get the checkpoint reserve options.
_checkpoint_reserve_min_count = (uint32_t)dsn_config_get_value_uint64(
_checkpoint_reserve_min_count_in_config = (uint32_t)dsn_config_get_value_uint64(
"pegasus.server", "checkpoint_reserve_min_count", 3, "checkpoint_reserve_min_count");
_checkpoint_reserve_time_seconds =
_checkpoint_reserve_min_count = _checkpoint_reserve_min_count_in_config;
_checkpoint_reserve_time_seconds_in_config =
(uint32_t)dsn_config_get_value_uint64("pegasus.server",
"checkpoint_reserve_time_seconds",
0,
"checkpoint_reserve_time_seconds, 0 means no check");
_checkpoint_reserve_time_seconds = _checkpoint_reserve_time_seconds_in_config;

_update_rdb_stat_interval = std::chrono::seconds(dsn_config_get_value_uint64(
"pegasus.server", "update_rdb_stat_interval", 600, "update_rdb_stat_interval, in seconds"));
Expand Down Expand Up @@ -2309,6 +2311,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
{
update_usage_scenario(envs);
update_default_ttl(envs);
update_checkpoint_reserve(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}

Expand All @@ -2327,17 +2330,15 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
if (new_usage_scenario != _usage_scenario) {
std::string old_usage_scenario = _usage_scenario;
if (set_usage_scenario(new_usage_scenario)) {
ddebug("%s: update app env[%s] from %s to %s succeed",
replica_name(),
ROCKSDB_ENV_USAGE_SCENARIO_KEY.c_str(),
old_usage_scenario.c_str(),
new_usage_scenario.c_str());
ddebug_replica("update app env[{}] from {} to {} succeed",
ROCKSDB_ENV_USAGE_SCENARIO_KEY,
old_usage_scenario,
new_usage_scenario);
} else {
derror("%s: update app env[%s] from %s to %s failed",
replica_name(),
ROCKSDB_ENV_USAGE_SCENARIO_KEY.c_str(),
old_usage_scenario.c_str(),
new_usage_scenario.c_str());
derror_replica("update app env[{}] from {} to {} failed",
ROCKSDB_ENV_USAGE_SCENARIO_KEY,
old_usage_scenario,
new_usage_scenario);
}
}
}
Expand All @@ -2356,6 +2357,42 @@ void pegasus_server_impl::update_default_ttl(const std::map<std::string, std::st
}
}

void pegasus_server_impl::update_checkpoint_reserve(const std::map<std::string, std::string> &envs)
{
int32_t count = _checkpoint_reserve_min_count_in_config;
int32_t time = _checkpoint_reserve_time_seconds_in_config;

auto find = envs.find(ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT);
if (find != envs.end()) {
if (!dsn::buf2int32(find->second, count) || count <= 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}
find = envs.find(ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS);
if (find != envs.end()) {
if (!dsn::buf2int32(find->second, time) || time < 0) {
derror_replica("{}={} is invalid.", find->first, find->second);
return;
}
}

if (count != _checkpoint_reserve_min_count) {
ddebug_replica("update app env[{}] from {} to {} succeed",
ROCKDB_CHECKPOINT_RESERVE_MIN_COUNT,
_checkpoint_reserve_min_count,
count);
_checkpoint_reserve_min_count = count;
}
if (time != _checkpoint_reserve_time_seconds) {
ddebug_replica("update app env[{}] from {} to {} succeed",
ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS,
_checkpoint_reserve_time_seconds,
time);
_checkpoint_reserve_time_seconds = time;
}
}

bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
Expand Down
4 changes: 4 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

void update_default_ttl(const std::map<std::string, std::string> &envs);

void update_checkpoint_reserve(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
Expand Down Expand Up @@ -278,6 +280,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

std::unique_ptr<pegasus_server_write> _server_write;

uint32_t _checkpoint_reserve_min_count_in_config;
uint32_t _checkpoint_reserve_time_seconds_in_config;
uint32_t _checkpoint_reserve_min_count;
uint32_t _checkpoint_reserve_time_seconds;
std::atomic_bool _is_checkpointing; // whether the db is doing checkpoint
Expand Down

0 comments on commit 16c53ff

Please sign in to comment.