From 839b5ea29b3dc41f1daa568822ced869b4c063f0 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 17 Mar 2020 18:20:24 +0800 Subject: [PATCH 01/17] abnormal iteration --- rdsn | 2 +- src/base/pegasus_const.cpp | 3 + src/base/pegasus_const.h | 2 + src/server/config.ini | 5 + src/server/config.min.ini | 1 - src/server/pegasus_server_impl.cpp | 153 +++++++++++++++++++++++-- src/server/pegasus_server_impl.h | 7 ++ src/shell/commands/data_operations.cpp | 2 + src/test/function_test/test_basic.cpp | 10 ++ src/test/function_test/test_scan.cpp | 34 ++++++ 10 files changed, 206 insertions(+), 13 deletions(-) diff --git a/rdsn b/rdsn index a6857588cf..040355e871 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit a6857588cfa9cb14726f3cc5854e655e23a76c2c +Subproject commit 040355e871c4062cbabf0fc2f16e032d936d7940 diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index d5cc049b88..b50376a027 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -69,4 +69,7 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); /// table level slow query const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); + +/// duration threshold of each rocksdb iteration +const std::string ROCKSDB_ITERATION_THRESHOLD("replica.iterate_threshold"); } // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h index 6d096f7b83..367b7dfd55 100644 --- a/src/base/pegasus_const.h +++ b/src/base/pegasus_const.h @@ -45,4 +45,6 @@ extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS; extern const std::string PEGASUS_CLUSTER_SECTION_NAME; extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD; + +extern const std::string ROCKSDB_ITERATION_THRESHOLD; } // namespace pegasus diff --git a/src/server/config.ini b/src/server/config.ini index dd8d79b8dc..d4ecaf1a02 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -280,6 +280,11 @@ # Bloom filter type, should be either 'common' or 'prefix' rocksdb_filter_type = prefix + # 10MB, 1000, 30s + rocksdb_multi_get_iterate_size_threshold = 100000000 + rocksdb_iterate_count_threshold = 1000 + rocksdb_iterate_threshold_ns = 30000000000 + checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/config.min.ini b/src/server/config.min.ini index f46bd60167..1ef87db577 100644 --- a/src/server/config.min.ini +++ b/src/server/config.min.ini @@ -121,7 +121,6 @@ perf_counter_sink = # The HTTP port exposed to Prometheus for pulling metrics from pegasus server. prometheus_port = @PROMETHEUS_PORT@ - [pegasus.collector] available_detect_app = @APP_NAME@ available_detect_alert_script_dir = ./package/bin diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index fdc7177bb8..e99bf15c30 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -90,6 +90,28 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) 1000, "multi-get operation iterate count exceed this threshold will be logged, 0 means no check"); + _multi_get_iterate_size_threshold = + dsn_config_get_value_uint64("pegasus.server", + "rocksdb_multi_get_iterate_size_threshold", + 100000000, + "multi-get operation total key-value size exceed " + "this threshold will stop iterating rocksdb, 0 means no check"); + + _rocksdb_iterate_count_threshold = dsn_config_get_value_uint64( + "pegasus.server", + "rocksdb_iterate_count_threshold", + 1000, + "max iterate count for each rocksdb iterator operation, if exceed this threshold," + "iterator will be stopped"); + + _rocksdb_iterate_threshold_ns_in_config = + dsn_config_get_value_uint64("pegasus.server", + "rocksdb_iterate_threshold_ns", + 30000000000, + "max duration for each rocksdb iterator operation if exceed " + "this threshold, iterator will be stopped, 0 means no check"); + _rocksdb_iterate_threshold_ns = _rocksdb_iterate_threshold_ns_in_config; + // init rocksdb::DBOptions _db_opts.pegasus_data = true; _db_opts.pegasus_data_version = _pegasus_data_version; @@ -675,8 +697,16 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req return; } - int32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX; + int32_t max_iterate_count = + (request.max_kv_count > 0 && request.max_kv_count < _rocksdb_iterate_count_threshold) + ? request.max_kv_count + : _rocksdb_iterate_count_threshold; int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; + int32_t max_iterate_size_config = + _multi_get_iterate_size_threshold > 0 ? _multi_get_iterate_size_threshold : INT_MAX; + int32_t max_iterate_size = std::min(max_kv_size, max_iterate_size_config); + uint64_t time_threshold_ns = _rocksdb_iterate_threshold_ns; + uint32_t epoch_now = ::pegasus::utils::epoch_now(); int32_t count = 0; int64_t size = 0; @@ -759,7 +789,11 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it.reset(_db->NewIterator(_data_cf_rd_opts)); it->Seek(start); bool first_exclusive = !start_inclusive; - while (count < max_kv_count && size < max_kv_size && it->Valid()) { + while (iterate_count < max_iterate_count && size < max_iterate_size && it->Valid()) { + if (time_threshold_ns > 0 && dsn_now_ns() - start_time > time_threshold_ns) { + break; + } + // check stop sort key int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { @@ -822,7 +856,12 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (count < max_kv_count && size < max_kv_size && it->Valid()) { + + while (iterate_count < max_iterate_count && size < max_kv_size && it->Valid()) { + if (time_threshold_ns > 0 && dsn_now_ns() - start_time > time_threshold_ns) { + break; + } + // check start sort key int c = it->key().compare(start); if (c < 0 || (c == 0 && !start_inclusive)) { @@ -902,6 +941,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } else if (it->Valid() && !complete) { // scan not completed resp.error = rocksdb::Status::kIncomplete; + // TODO(heyuchen): consider if add perf-counter here + dwarn_replica("rocksdb abnormal scan from {}: iterator_count = {}, iterator_size = {}, " + "time_used = {}", + reply.to_address().to_string(), + iterate_count, + size, + dsn_now_ns() - start_time); } } else { bool error_occurred = false; @@ -942,6 +988,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } // check ttl if (status.ok()) { + iterate_count++; uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); if (expire_ts > 0 && expire_ts <= epoch_now) { expire_count++; @@ -956,7 +1003,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req // extract value if (status.ok()) { // check if exceed limit - if (count >= max_kv_count || size >= max_kv_size) { + if (iterate_count > max_iterate_count || size > max_kv_size) { exceed_limit = true; break; } @@ -982,6 +1029,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req resp.kvs.clear(); } else if (exceed_limit) { resp.error = rocksdb::Status::kIncomplete; + // TODO(heyuchen): consider if add perf-counter here + dwarn_replica("rocksdb abnormal scan from {}: iterator_count = {}, iterator_size = {}, " + "time_used = {}", + reply.to_address().to_string(), + iterate_count, + size, + dsn_now_ns() - start_time); } else { resp.error = rocksdb::Status::kOk; } @@ -1039,10 +1093,14 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, { dassert(_is_open, ""); + // TODO(heyuchen): + // calculate qps and latency + ::dsn::apps::count_response resp; resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); resp.server = _primary_address; + uint64_t start_time = dsn_now_ns(); // scan ::dsn::blob start_key, stop_key; @@ -1057,7 +1115,15 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, resp.count = 0; uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; + + uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; + bool exceed_limit = false; while (it->Valid()) { + if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + exceed_limit = true; + break; + } + if (check_if_record_expired(epoch_now, it->value())) { expire_count++; if (_verbose_log) { @@ -1091,6 +1157,9 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, it->status().ToString().c_str()); } resp.count = 0; + } else if (exceed_limit) { + dwarn_replica("rocksdb scan takes too long time from {}", reply.to_address().to_string()); + resp.count = -1; } _cu_calculator->add_sortkey_count_cu(resp.error); @@ -1264,8 +1333,22 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint64_t expire_count = 0; uint64_t filter_count = 0; int32_t count = 0; - resp.kvs.reserve(request.batch_size); - while (count < request.batch_size && it->Valid()) { + + bool exceed_limit = false; + int32_t iterate_count = 0; + uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; + int32_t batch_count = + (request.batch_size < _rocksdb_iterate_count_threshold && request.batch_size > 0) + ? request.batch_size + : _rocksdb_iterate_count_threshold; + resp.kvs.reserve(batch_count); + + while (iterate_count < batch_count && it->Valid()) { + if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + exceed_limit = true; + break; + } + int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1282,6 +1365,8 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } } + iterate_count++; + int r = append_key_value_for_scan(resp.kvs, it->key(), it->value(), @@ -1321,7 +1406,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request request.start_inclusive ? "inclusive" : "exclusive", ::pegasus::utils::c_escape_string(stop).c_str(), request.stop_inclusive ? "inclusive" : "exclusive", - request.batch_size, + batch_count, count, it->status().ToString().c_str()); } else { @@ -1331,6 +1416,9 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request it->status().ToString().c_str()); } resp.kvs.clear(); + } else if (exceed_limit) { + // scan exceed limit time + resp.error = rocksdb::Status::kIncomplete; } else if (it->Valid() && !complete) { // scan not completed std::unique_ptr context( @@ -1343,7 +1431,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request request.sort_key_filter_type, std::string(request.sort_key_filter_pattern.data(), request.sort_key_filter_pattern.length()), - request.batch_size, + batch_count, request.no_value)); int64_t handle = _context_cache.put(std::move(context)); resp.context_id = handle; @@ -1388,7 +1476,6 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, std::unique_ptr context = _context_cache.fetch(request.context_id); if (context) { rocksdb::Iterator *it = context->iterator.get(); - int32_t batch_size = context->batch_size; const rocksdb::Slice &stop = context->stop; bool stop_inclusive = context->stop_inclusive; ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type; @@ -1402,7 +1489,20 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint64_t filter_count = 0; int32_t count = 0; - while (count < batch_size && it->Valid()) { + bool exceed_limit = false; + int32_t iterate_count = 0; + uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; + int32_t batch_count = + (context->batch_size < _rocksdb_iterate_count_threshold && context->batch_size > 0) + ? context->batch_size + : _rocksdb_iterate_count_threshold; + + while (iterate_count < batch_count && it->Valid()) { + if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + exceed_limit = true; + break; + } + int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1410,6 +1510,8 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, break; } + iterate_count++; + int r = append_key_value_for_scan(resp.kvs, it->key(), it->value(), @@ -1448,7 +1550,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, request.context_id, ::pegasus::utils::c_escape_string(stop).c_str(), stop_inclusive ? "inclusive" : "exclusive", - batch_size, + batch_count, count, it->status().ToString().c_str()); } else { @@ -1458,6 +1560,9 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, it->status().ToString().c_str()); } resp.kvs.clear(); + } else if (exceed_limit) { + // scan exceed limit time + resp.error = rocksdb::Status::kIncomplete; } else if (it->Valid() && !complete) { // scan not completed int64_t handle = _context_cache.put(std::move(context)); @@ -2391,6 +2496,7 @@ void pegasus_server_impl::update_app_envs(const std::map &envs) +{ + uint64_t threshold_ns = _rocksdb_iterate_threshold_ns_in_config; + auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD); + if (find != envs.end()) { + // the unit of iterate threshold from env is ms + uint64_t threshold_ms; + if (!dsn::buf2uint64(find->second, threshold_ms) || threshold_ms < 0) { + derror_replica("{}={} is invalid.", find->first, find->second); + return; + } + threshold_ns = threshold_ms * 1e6; + } + + if (_rocksdb_iterate_threshold_ns != threshold_ns) { + ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed", + ROCKSDB_ITERATION_THRESHOLD, + _rocksdb_iterate_threshold_ns, + threshold_ns); + _rocksdb_iterate_threshold_ns = threshold_ns; + } +} + bool pegasus_server_impl::parse_compression_types( const std::string &config, std::vector &compression_per_level) { diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 357c8edb75..d18ff032b8 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -231,6 +231,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service void update_slow_query_threshold(const std::map &envs); + void update_rocksdb_iterate_threshold(const std::map &envs); + // return true if parse compression types 'config' success, otherwise return false. // 'compression_per_level' will not be changed if parse failed. bool parse_compression_types(const std::string &config, @@ -309,6 +311,11 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service // slow query time threshold. exceed this threshold will be logged. uint64_t _slow_query_threshold_ns; uint64_t _slow_query_threshold_ns_in_config; + // abnormal multi_get/rocksdb_iteration + uint64_t _multi_get_iterate_size_threshold; + uint64_t _rocksdb_iterate_count_threshold; + uint64_t _rocksdb_iterate_threshold_ns_in_config; + uint64_t _rocksdb_iterate_threshold_ns; std::shared_ptr _key_ttl_compaction_filter_factory; std::shared_ptr _statistics; diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp index b9dad86f38..ba7dfc09db 100644 --- a/src/shell/commands/data_operations.cpp +++ b/src/shell/commands/data_operations.cpp @@ -1013,6 +1013,8 @@ bool sortkey_count(command_executor *e, shell_context *sc, arguments args) int ret = sc->pg_client->sortkey_count(hash_key, count, sc->timeout_ms, &info); if (ret != pegasus::PERR_OK) { fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret)); + } else if (count == -1) { + fprintf(stderr, "ERROR: it takes too long to count sortkey\n"); } else { fprintf(stderr, "%" PRId64 "\n", count); } diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp index c26a300b0a..49448207ed 100644 --- a/src/test/function_test/test_basic.cpp +++ b/src/test/function_test/test_basic.cpp @@ -546,6 +546,16 @@ TEST(basic, multi_get) ASSERT_EQ(1, (int)new_values.size()); ASSERT_EQ("5", new_values["5"]); + // set a expired value + ret = client->set("basic_test_multi_get", "", "expire_value", 5000, 1); + ASSERT_EQ(PERR_OK, ret); + std::this_thread::sleep_for(std::chrono::seconds(1)); + new_values.clear(); + ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2); + ASSERT_EQ(PERR_INCOMPLETE, ret); + ASSERT_EQ(1, (int)new_values.size()); + ASSERT_EQ("1", new_values["1"]); + // multi_del std::set sortkeys; sortkeys.insert(""); diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index 402b7c808b..aadf5e19d5 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -7,14 +7,17 @@ #include #include +#include #include #include #include #include +#include "base/pegasus_const.h" using namespace ::pegasus; extern pegasus_client *client; +extern std::shared_ptr ddl_client; static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; static char buffer[256]; static std::map> base; @@ -397,3 +400,34 @@ TEST_F(scan, OVERALL) } compare(data, base); } + +TEST_F(scan, ITERATE_TIME_LIMIT) +{ + // update iterate threshold to 1ms + auto response = ddl_client->set_app_envs( + client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD}, {std::to_string(1)}); + ASSERT_EQ(true, response.is_ok()); + ASSERT_EQ(dsn::ERR_OK, response.get_value().err); + // wait envs to be synced. + std::this_thread::sleep_for(std::chrono::seconds(30)); + + // write data into table + int32_t i = 0; + std::string sort_key; + std::string value; + while (i < 9000) { + sort_key = random_string(); + value = random_string(); + int ret = client->set(expected_hash_key, sort_key, value); + ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << expected_hash_key + << ", sort_key=" << sort_key + << ", error=" << client->get_error_string(ret); + i++; + } + + // get sortkey count timeout + int64_t count = 0; + int ret = client->sortkey_count(expected_hash_key, count); + ASSERT_EQ(0, ret); + ASSERT_EQ(count, -1); +} From b33b75866ea140a8326e0a366d34faf439f235a9 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 20 Mar 2020 10:02:16 +0800 Subject: [PATCH 02/17] small fix --- src/server/config.min.ini | 1 + src/server/pegasus_server_impl.cpp | 67 +++++++++++++++++++----------- src/test/function_test/run.sh | 2 +- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/server/config.min.ini b/src/server/config.min.ini index 1ef87db577..f46bd60167 100644 --- a/src/server/config.min.ini +++ b/src/server/config.min.ini @@ -121,6 +121,7 @@ perf_counter_sink = # The HTTP port exposed to Prometheus for pulling metrics from pegasus server. prometheus_port = @PROMETHEUS_PORT@ + [pegasus.collector] available_detect_app = @APP_NAME@ available_detect_alert_script_dir = ./package/bin diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index e99bf15c30..ff6a160143 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -785,12 +785,16 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; + uint64_t iteration_time = dsn_now_ns(); + bool exceed_limit = false; if (!request.reverse) { it.reset(_db->NewIterator(_data_cf_rd_opts)); it->Seek(start); bool first_exclusive = !start_inclusive; while (iterate_count < max_iterate_count && size < max_iterate_size && it->Valid()) { - if (time_threshold_ns > 0 && dsn_now_ns() - start_time > time_threshold_ns) { + iteration_time = dsn_now_ns(); + if (time_threshold_ns > 0 && iteration_time - start_time > time_threshold_ns) { + exceed_limit = true; break; } @@ -856,9 +860,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (iterate_count < max_iterate_count && size < max_kv_size && it->Valid()) { - if (time_threshold_ns > 0 && dsn_now_ns() - start_time > time_threshold_ns) { + iteration_time = dsn_now_ns(); + if (time_threshold_ns > 0 && iteration_time - start_time > time_threshold_ns) { + exceed_limit = true; break; } @@ -941,13 +946,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } else if (it->Valid() && !complete) { // scan not completed resp.error = rocksdb::Status::kIncomplete; - // TODO(heyuchen): consider if add perf-counter here - dwarn_replica("rocksdb abnormal scan from {}: iterator_count = {}, iterator_size = {}, " - "time_used = {}", - reply.to_address().to_string(), - iterate_count, - size, - dsn_now_ns() - start_time); + if (exceed_limit) { + dwarn_replica( + "rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", + reply.to_address().to_string(), + iteration_time - start_time, + time_threshold_ns); + } } } else { bool error_occurred = false; @@ -1029,13 +1034,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req resp.kvs.clear(); } else if (exceed_limit) { resp.error = rocksdb::Status::kIncomplete; - // TODO(heyuchen): consider if add perf-counter here - dwarn_replica("rocksdb abnormal scan from {}: iterator_count = {}, iterator_size = {}, " - "time_used = {}", - reply.to_address().to_string(), - iterate_count, - size, - dsn_now_ns() - start_time); } else { resp.error = rocksdb::Status::kOk; } @@ -1093,14 +1091,13 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, { dassert(_is_open, ""); - // TODO(heyuchen): - // calculate qps and latency + _pfc_scan_qps->increment(); + uint64_t start_time = dsn_now_ns(); ::dsn::apps::count_response resp; resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); resp.server = _primary_address; - uint64_t start_time = dsn_now_ns(); // scan ::dsn::blob start_key, stop_key; @@ -1118,8 +1115,10 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; bool exceed_limit = false; + uint64_t iteration_time = dsn_now_ns(); while (it->Valid()) { - if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + iteration_time = dsn_now_ns(); + if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { exceed_limit = true; break; } @@ -1158,11 +1157,15 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, } resp.count = 0; } else if (exceed_limit) { - dwarn_replica("rocksdb scan takes too long time from {}", reply.to_address().to_string()); + dwarn_replica("rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", + reply.to_address().to_string(), + iteration_time - start_time, + scan_threshold_ns); resp.count = -1; } _cu_calculator->add_sortkey_count_cu(resp.error); + _pfc_scan_latency->set(dsn_now_ns() - start_time); reply(resp); } @@ -1334,6 +1337,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint64_t filter_count = 0; int32_t count = 0; + uint64_t iteration_time = dsn_now_ns(); bool exceed_limit = false; int32_t iterate_count = 0; uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; @@ -1344,7 +1348,8 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request resp.kvs.reserve(batch_count); while (iterate_count < batch_count && it->Valid()) { - if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + iteration_time = dsn_now_ns(); + if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { exceed_limit = true; break; } @@ -1419,6 +1424,12 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } else if (exceed_limit) { // scan exceed limit time resp.error = rocksdb::Status::kIncomplete; + dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS " + "time_threshold_ns({})", + reply.to_address().to_string(), + batch_count, + iteration_time - start_time, + scan_threshold_ns); } else if (it->Valid() && !complete) { // scan not completed std::unique_ptr context( @@ -1489,6 +1500,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint64_t filter_count = 0; int32_t count = 0; + uint64_t iteration_time = dsn_now_ns(); bool exceed_limit = false; int32_t iterate_count = 0; uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; @@ -1498,7 +1510,8 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, : _rocksdb_iterate_count_threshold; while (iterate_count < batch_count && it->Valid()) { - if (scan_threshold_ns > 0 && dsn_now_ns() - start_time > scan_threshold_ns) { + iteration_time = dsn_now_ns(); + if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { exceed_limit = true; break; } @@ -1563,6 +1576,12 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, } else if (exceed_limit) { // scan exceed limit time resp.error = rocksdb::Status::kIncomplete; + dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS " + "time_threshold_ns({})", + reply.to_address().to_string(), + batch_count, + iteration_time - start_time, + scan_threshold_ns); } else if (it->Valid() && !complete) { // scan not completed int64_t handle = _context_cache.put(std::move(context)); diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh index 593b20abe4..4bebccc72d 100755 --- a/src/test/function_test/run.sh +++ b/src/test/function_test/run.sh @@ -41,9 +41,9 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_mutate.*" ./$test_case $config_file $table_name exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name +exit_if_fail $? "run test scan failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name" -exit_if_fail $? "run test scan failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name exit_if_fail $? "run test slog_lost failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/recall.xml" GTEST_FILTER="drop_and_recall.*" ./$test_case $config_file $table_name From 5c2c68d49c725895891f3d74d0981fd9d8cc9366 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 20 Mar 2020 16:29:58 +0800 Subject: [PATCH 03/17] update rdsn --- rdsn | 2 +- src/base/pegasus_const.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rdsn b/rdsn index 040355e871..df3bdfc007 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 040355e871c4062cbabf0fc2f16e032d936d7940 +Subproject commit df3bdfc007e298d48b6f56a4d500adbb1c56fc38 diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index b50376a027..183f219a96 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -71,5 +71,5 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); /// duration threshold of each rocksdb iteration -const std::string ROCKSDB_ITERATION_THRESHOLD("replica.iterate_threshold"); +const std::string ROCKSDB_ITERATION_THRESHOLD("replica.rocksdb_iteration_threshold"); } // namespace pegasus From cfae76ebc40007f98cfaf61095f295365863156f Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 20 Mar 2020 17:45:29 +0800 Subject: [PATCH 04/17] rename config option --- src/server/config.ini | 6 +++--- src/server/pegasus_server_impl.cpp | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/server/config.ini b/src/server/config.ini index d4ecaf1a02..5be63f002b 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -281,9 +281,9 @@ rocksdb_filter_type = prefix # 10MB, 1000, 30s - rocksdb_multi_get_iterate_size_threshold = 100000000 - rocksdb_iterate_count_threshold = 1000 - rocksdb_iterate_threshold_ns = 30000000000 + rocksdb_multi_get_max_iteration_size = 100000000 + rocksdb_max_iteration_count = 1000 + rocksdb_iteration_threshold_ns = 30000000000 checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index ff6a160143..047222bbda 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -92,23 +92,23 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _multi_get_iterate_size_threshold = dsn_config_get_value_uint64("pegasus.server", - "rocksdb_multi_get_iterate_size_threshold", + "rocksdb_multi_get_max_iteration_size", 100000000, "multi-get operation total key-value size exceed " "this threshold will stop iterating rocksdb, 0 means no check"); _rocksdb_iterate_count_threshold = dsn_config_get_value_uint64( "pegasus.server", - "rocksdb_iterate_count_threshold", + "rocksdb_max_iteration_count", 1000, - "max iterate count for each rocksdb iterator operation, if exceed this threshold," + "max iteration count for each rocksdb iterator operation, if exceed this threshold," "iterator will be stopped"); _rocksdb_iterate_threshold_ns_in_config = dsn_config_get_value_uint64("pegasus.server", - "rocksdb_iterate_threshold_ns", + "rocksdb_iteration_threshold_ns", 30000000000, - "max duration for each rocksdb iterator operation if exceed " + "max duration for rocksdb iterator operation if exceed " "this threshold, iterator will be stopped, 0 means no check"); _rocksdb_iterate_threshold_ns = _rocksdb_iterate_threshold_ns_in_config; From b59e6ef0bad890553c3ed7e7f72d01a913bf8ead Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 20 Mar 2020 18:31:51 +0800 Subject: [PATCH 05/17] rename --- src/server/pegasus_server_impl.cpp | 104 ++++++++++++++------------- src/server/pegasus_server_impl.h | 10 +-- src/test/function_test/test_scan.cpp | 4 +- 3 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 047222bbda..9c786ed9b5 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -90,27 +90,27 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) 1000, "multi-get operation iterate count exceed this threshold will be logged, 0 means no check"); - _multi_get_iterate_size_threshold = + _multi_get_max_iteration_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_multi_get_max_iteration_size", 100000000, "multi-get operation total key-value size exceed " "this threshold will stop iterating rocksdb, 0 means no check"); - _rocksdb_iterate_count_threshold = dsn_config_get_value_uint64( + _rocksdb_max_iteration_count = dsn_config_get_value_uint64( "pegasus.server", "rocksdb_max_iteration_count", 1000, "max iteration count for each rocksdb iterator operation, if exceed this threshold," "iterator will be stopped"); - _rocksdb_iterate_threshold_ns_in_config = + _rocksdb_iteration_threshold_ns_in_config = dsn_config_get_value_uint64("pegasus.server", "rocksdb_iteration_threshold_ns", 30000000000, "max duration for rocksdb iterator operation if exceed " "this threshold, iterator will be stopped, 0 means no check"); - _rocksdb_iterate_threshold_ns = _rocksdb_iterate_threshold_ns_in_config; + _rocksdb_iteration_threshold_ns = _rocksdb_iteration_threshold_ns_in_config; // init rocksdb::DBOptions _db_opts.pegasus_data = true; @@ -697,20 +697,19 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req return; } - int32_t max_iterate_count = - (request.max_kv_count > 0 && request.max_kv_count < _rocksdb_iterate_count_threshold) + int32_t max_iteration_count = + (request.max_kv_count > 0 && request.max_kv_count < _rocksdb_max_iteration_count) ? request.max_kv_count - : _rocksdb_iterate_count_threshold; + : _rocksdb_max_iteration_count; int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; - int32_t max_iterate_size_config = - _multi_get_iterate_size_threshold > 0 ? _multi_get_iterate_size_threshold : INT_MAX; - int32_t max_iterate_size = std::min(max_kv_size, max_iterate_size_config); - uint64_t time_threshold_ns = _rocksdb_iterate_threshold_ns; + int32_t max_iteration_size_config = + _multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX; + int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config); uint32_t epoch_now = ::pegasus::utils::epoch_now(); int32_t count = 0; int64_t size = 0; - int32_t iterate_count = 0; + int32_t iteration_count = 0; int32_t expire_count = 0; int32_t filter_count = 0; @@ -791,9 +790,11 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it.reset(_db->NewIterator(_data_cf_rd_opts)); it->Seek(start); bool first_exclusive = !start_inclusive; - while (iterate_count < max_iterate_count && size < max_iterate_size && it->Valid()) { + while (iteration_count < max_iteration_count && size < max_iteration_size && + it->Valid()) { iteration_time = dsn_now_ns(); - if (time_threshold_ns > 0 && iteration_time - start_time > time_threshold_ns) { + if (_rocksdb_iteration_threshold_ns > 0 && + iteration_time - start_time > _rocksdb_iteration_threshold_ns) { exceed_limit = true; break; } @@ -816,7 +817,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } } - iterate_count++; + iteration_count++; // extract value int r = append_key_value_for_multi_get(resp.kvs, @@ -860,9 +861,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (iterate_count < max_iterate_count && size < max_kv_size && it->Valid()) { + while (iteration_count < max_iteration_count && size < max_kv_size && it->Valid()) { iteration_time = dsn_now_ns(); - if (time_threshold_ns > 0 && iteration_time - start_time > time_threshold_ns) { + if (_rocksdb_iteration_threshold_ns > 0 && + iteration_time - start_time > _rocksdb_iteration_threshold_ns) { exceed_limit = true; break; } @@ -885,7 +887,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } } - iterate_count++; + iteration_count++; // extract value int r = append_key_value_for_multi_get(reverse_kvs, @@ -951,7 +953,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req "rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", reply.to_address().to_string(), iteration_time - start_time, - time_threshold_ns); + _rocksdb_iteration_threshold_ns); } } } else { @@ -993,7 +995,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } // check ttl if (status.ok()) { - iterate_count++; + iteration_count++; uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); if (expire_ts > 0 && expire_ts <= epoch_now) { expire_count++; @@ -1008,7 +1010,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req // extract value if (status.ok()) { // check if exceed limit - if (iterate_count > max_iterate_count || size > max_kv_size) { + if (iteration_count > max_iteration_count || size > max_kv_size) { exceed_limit = true; break; } @@ -1045,13 +1047,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req #endif uint64_t time_used = dsn_now_ns() - start_time; - if (is_multi_get_abnormal(time_used, size, iterate_count)) { + if (is_multi_get_abnormal(time_used, size, iteration_count)) { dwarn_replica( "rocksdb abnormal multi_get from {}: hash_key = {}, " "start_sort_key = {} ({}), stop_sort_key = {} ({}), " "sort_key_filter_type = {}, sort_key_filter_pattern = {}, " "max_kv_count = {}, max_kv_size = {}, reverse = {}, " - "result_count = {}, result_size = {}, iterate_count = {}, " + "result_count = {}, result_size = {}, iteration_count = {}, " "expire_count = {}, filter_count = {}, time_used = {} ns", reply.to_address().to_string(), ::pegasus::utils::c_escape_string(request.hash_key), @@ -1066,7 +1068,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req request.reverse ? "true" : "false", count, size, - iterate_count, + iteration_count, expire_count, filter_count, time_used); @@ -1113,12 +1115,12 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; - uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; bool exceed_limit = false; uint64_t iteration_time = dsn_now_ns(); while (it->Valid()) { iteration_time = dsn_now_ns(); - if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { + if (_rocksdb_iteration_threshold_ns > 0 && + iteration_time - start_time > _rocksdb_iteration_threshold_ns) { exceed_limit = true; break; } @@ -1160,7 +1162,7 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, dwarn_replica("rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", reply.to_address().to_string(), iteration_time - start_time, - scan_threshold_ns); + _rocksdb_iteration_threshold_ns); resp.count = -1; } @@ -1339,17 +1341,17 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint64_t iteration_time = dsn_now_ns(); bool exceed_limit = false; - int32_t iterate_count = 0; - uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; + int32_t iteration_count = 0; int32_t batch_count = - (request.batch_size < _rocksdb_iterate_count_threshold && request.batch_size > 0) + (request.batch_size < _rocksdb_max_iteration_count && request.batch_size > 0) ? request.batch_size - : _rocksdb_iterate_count_threshold; + : _rocksdb_max_iteration_count; resp.kvs.reserve(batch_count); - while (iterate_count < batch_count && it->Valid()) { + while (iteration_count < batch_count && it->Valid()) { iteration_time = dsn_now_ns(); - if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { + if (_rocksdb_iteration_threshold_ns > 0 && + iteration_time - start_time > _rocksdb_iteration_threshold_ns) { exceed_limit = true; break; } @@ -1370,7 +1372,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } } - iterate_count++; + iteration_count++; int r = append_key_value_for_scan(resp.kvs, it->key(), @@ -1429,7 +1431,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request reply.to_address().to_string(), batch_count, iteration_time - start_time, - scan_threshold_ns); + _rocksdb_iteration_threshold_ns); } else if (it->Valid() && !complete) { // scan not completed std::unique_ptr context( @@ -1502,16 +1504,16 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint64_t iteration_time = dsn_now_ns(); bool exceed_limit = false; - int32_t iterate_count = 0; - uint64_t scan_threshold_ns = _rocksdb_iterate_threshold_ns; + int32_t iteration_count = 0; int32_t batch_count = - (context->batch_size < _rocksdb_iterate_count_threshold && context->batch_size > 0) + (context->batch_size < _rocksdb_max_iteration_count && context->batch_size > 0) ? context->batch_size - : _rocksdb_iterate_count_threshold; + : _rocksdb_max_iteration_count; - while (iterate_count < batch_count && it->Valid()) { + while (iteration_count < batch_count && it->Valid()) { iteration_time = dsn_now_ns(); - if (scan_threshold_ns > 0 && iteration_time - start_time > scan_threshold_ns) { + if (_rocksdb_iteration_threshold_ns > 0 && + iteration_time - start_time > _rocksdb_iteration_threshold_ns) { exceed_limit = true; break; } @@ -1523,7 +1525,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, break; } - iterate_count++; + iteration_count++; int r = append_key_value_for_scan(resp.kvs, it->key(), @@ -1581,7 +1583,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, reply.to_address().to_string(), batch_count, iteration_time - start_time, - scan_threshold_ns); + _rocksdb_iteration_threshold_ns); } else if (it->Valid() && !complete) { // scan not completed int64_t handle = _context_cache.put(std::move(context)); @@ -2515,7 +2517,7 @@ void pegasus_server_impl::update_app_envs(const std::map &envs) { - uint64_t threshold_ns = _rocksdb_iterate_threshold_ns_in_config; + uint64_t threshold_ns = _rocksdb_iteration_threshold_ns_in_config; auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD); if (find != envs.end()) { - // the unit of iterate threshold from env is ms + // the unit of iteration threshold from env is ms uint64_t threshold_ms; if (!dsn::buf2uint64(find->second, threshold_ms) || threshold_ms < 0) { derror_replica("{}={} is invalid.", find->first, find->second); @@ -2648,12 +2650,12 @@ void pegasus_server_impl::update_rocksdb_iterate_threshold( threshold_ns = threshold_ms * 1e6; } - if (_rocksdb_iterate_threshold_ns != threshold_ns) { + if (_rocksdb_iteration_threshold_ns != threshold_ns) { ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed", ROCKSDB_ITERATION_THRESHOLD, - _rocksdb_iterate_threshold_ns, + _rocksdb_iteration_threshold_ns, threshold_ns); - _rocksdb_iterate_threshold_ns = threshold_ns; + _rocksdb_iteration_threshold_ns = threshold_ns; } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index d18ff032b8..ab67af3449 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -231,7 +231,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service void update_slow_query_threshold(const std::map &envs); - void update_rocksdb_iterate_threshold(const std::map &envs); + void update_rocksdb_iteration_threshold(const std::map &envs); // return true if parse compression types 'config' success, otherwise return false. // 'compression_per_level' will not be changed if parse failed. @@ -312,10 +312,10 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service uint64_t _slow_query_threshold_ns; uint64_t _slow_query_threshold_ns_in_config; // abnormal multi_get/rocksdb_iteration - uint64_t _multi_get_iterate_size_threshold; - uint64_t _rocksdb_iterate_count_threshold; - uint64_t _rocksdb_iterate_threshold_ns_in_config; - uint64_t _rocksdb_iterate_threshold_ns; + uint64_t _multi_get_max_iteration_size; + uint64_t _rocksdb_max_iteration_count; + uint64_t _rocksdb_iteration_threshold_ns_in_config; + uint64_t _rocksdb_iteration_threshold_ns; std::shared_ptr _key_ttl_compaction_filter_factory; std::shared_ptr _statistics; diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index aadf5e19d5..3ae86bdd84 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -401,9 +401,9 @@ TEST_F(scan, OVERALL) compare(data, base); } -TEST_F(scan, ITERATE_TIME_LIMIT) +TEST_F(scan, ITERATION_TIME_LIMIT) { - // update iterate threshold to 1ms + // update iteration threshold to 1ms auto response = ddl_client->set_app_envs( client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD}, {std::to_string(1)}); ASSERT_EQ(true, response.is_ok()); From e2e34cb2ddb6a717d3625e560fa8fcffe4b3c69e Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 24 Mar 2020 15:52:34 +0800 Subject: [PATCH 06/17] fix according to review --- rdsn | 2 +- src/base/pegasus_const.cpp | 5 +- src/base/pegasus_const.h | 2 +- src/server/config.ini | 4 +- src/server/pegasus_server_impl.cpp | 184 +++++++++++++++------------ src/server/pegasus_server_impl.h | 6 +- src/test/function_test/test_scan.cpp | 10 +- 7 files changed, 124 insertions(+), 89 deletions(-) diff --git a/rdsn b/rdsn index df3bdfc007..9d7b731a76 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit df3bdfc007e298d48b6f56a4d500adbb1c56fc38 +Subproject commit 9d7b731a769ca6098033195fb8b1344c00710e46 diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp index 183f219a96..5d3064b369 100644 --- a/src/base/pegasus_const.cpp +++ b/src/base/pegasus_const.cpp @@ -70,6 +70,7 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters"); /// table level slow query const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold"); -/// duration threshold of each rocksdb iteration -const std::string ROCKSDB_ITERATION_THRESHOLD("replica.rocksdb_iteration_threshold"); +/// time threshold of each rocksdb iteration +const std::string + ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms"); } // namespace pegasus diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h index 367b7dfd55..7e557cd88c 100644 --- a/src/base/pegasus_const.h +++ b/src/base/pegasus_const.h @@ -46,5 +46,5 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME; extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD; -extern const std::string ROCKSDB_ITERATION_THRESHOLD; +extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS; } // namespace pegasus diff --git a/src/server/config.ini b/src/server/config.ini index 5be63f002b..a912f21ed5 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -281,9 +281,9 @@ rocksdb_filter_type = prefix # 10MB, 1000, 30s - rocksdb_multi_get_max_iteration_size = 100000000 + rocksdb_multi_get_max_iteration_size = 10000000 rocksdb_max_iteration_count = 1000 - rocksdb_iteration_threshold_ns = 30000000000 + rocksdb_iteration_threshold_time_ms = 30000 checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 9c786ed9b5..ea125f0e53 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -93,24 +93,24 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _multi_get_max_iteration_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_multi_get_max_iteration_size", - 100000000, + 10 << 20, "multi-get operation total key-value size exceed " "this threshold will stop iterating rocksdb, 0 means no check"); - _rocksdb_max_iteration_count = dsn_config_get_value_uint64( + _rocksdb_max_iteration_count = (uint32_t)dsn_config_get_value_uint64( "pegasus.server", "rocksdb_max_iteration_count", 1000, "max iteration count for each rocksdb iterator operation, if exceed this threshold," "iterator will be stopped"); - _rocksdb_iteration_threshold_ns_in_config = - dsn_config_get_value_uint64("pegasus.server", - "rocksdb_iteration_threshold_ns", - 30000000000, - "max duration for rocksdb iterator operation if exceed " - "this threshold, iterator will be stopped, 0 means no check"); - _rocksdb_iteration_threshold_ns = _rocksdb_iteration_threshold_ns_in_config; + _rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64( + "pegasus.server", + "rocksdb_iteration_threshold_time_ms", + 30000, + "max duration for handling one pegasus scan request(sortkey_count/multiget/scan) if exceed " + "this threshold, iterator will be stopped, 0 means no check"); + _rocksdb_iteration_threshold_time_ms = _rocksdb_iteration_threshold_time_ms_in_config; // init rocksdb::DBOptions _db_opts.pegasus_data = true; @@ -697,10 +697,9 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req return; } - int32_t max_iteration_count = - (request.max_kv_count > 0 && request.max_kv_count < _rocksdb_max_iteration_count) - ? request.max_kv_count - : _rocksdb_max_iteration_count; + uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX; + uint32_t max_iteration_count = std::min(max_kv_count, _rocksdb_max_iteration_count); + int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; int32_t max_iteration_size_config = _multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX; @@ -784,21 +783,18 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; - uint64_t iteration_time = dsn_now_ns(); + bool need_time_check_during_iteration = + _rocksdb_iteration_threshold_time_ms > 0 && max_kv_count > 100; + uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; bool exceed_limit = false; + uint64_t iteration_time = dsn_now_ns(); + if (!request.reverse) { it.reset(_db->NewIterator(_data_cf_rd_opts)); it->Seek(start); bool first_exclusive = !start_inclusive; while (iteration_count < max_iteration_count && size < max_iteration_size && it->Valid()) { - iteration_time = dsn_now_ns(); - if (_rocksdb_iteration_threshold_ns > 0 && - iteration_time - start_time > _rocksdb_iteration_threshold_ns) { - exceed_limit = true; - break; - } - // check stop sort key int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { @@ -818,6 +814,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } iteration_count++; + if (need_time_check_during_iteration && iteration_count % 100 == 0) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + break; + } + } // extract value int r = append_key_value_for_multi_get(resp.kvs, @@ -862,13 +865,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; while (iteration_count < max_iteration_count && size < max_kv_size && it->Valid()) { - iteration_time = dsn_now_ns(); - if (_rocksdb_iteration_threshold_ns > 0 && - iteration_time - start_time > _rocksdb_iteration_threshold_ns) { - exceed_limit = true; - break; - } - // check start sort key int c = it->key().compare(start); if (c < 0 || (c == 0 && !start_inclusive)) { @@ -888,6 +884,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } iteration_count++; + if (need_time_check_during_iteration && iteration_count % 100 == 0) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + break; + } + } // extract value int r = append_key_value_for_multi_get(reverse_kvs, @@ -950,10 +953,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req resp.error = rocksdb::Status::kIncomplete; if (exceed_limit) { dwarn_replica( - "rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", + "rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)", reply.to_address().to_string(), iteration_time - start_time, - _rocksdb_iteration_threshold_ns); + iteration_threshold_time_ns); } } } else { @@ -1114,17 +1117,22 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, resp.count = 0; uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; + uint64_t iteration_count = 0; - bool exceed_limit = false; + bool need_iteration_time_check = _rocksdb_iteration_threshold_time_ms > 0; + uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; uint64_t iteration_time = dsn_now_ns(); + bool exceed_limit = false; + while (it->Valid()) { - iteration_time = dsn_now_ns(); - if (_rocksdb_iteration_threshold_ns > 0 && - iteration_time - start_time > _rocksdb_iteration_threshold_ns) { - exceed_limit = true; - break; + ++iteration_count; + if (need_iteration_time_check && iteration_count % 100 == 0) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + break; + } } - if (check_if_record_expired(epoch_now, it->value())) { expire_count++; if (_verbose_log) { @@ -1159,10 +1167,10 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, } resp.count = 0; } else if (exceed_limit) { - dwarn_replica("rocksdb abnormal scan from {}: time_used_ns({}) VS time_threshold_ns({})", + dwarn_replica("rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)", reply.to_address().to_string(), iteration_time - start_time, - _rocksdb_iteration_threshold_ns); + iteration_threshold_time_ns); resp.count = -1; } @@ -1338,24 +1346,19 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint64_t expire_count = 0; uint64_t filter_count = 0; int32_t count = 0; - - uint64_t iteration_time = dsn_now_ns(); - bool exceed_limit = false; int32_t iteration_count = 0; - int32_t batch_count = - (request.batch_size < _rocksdb_max_iteration_count && request.batch_size > 0) - ? request.batch_size - : _rocksdb_max_iteration_count; + + uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX; + uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count); resp.kvs.reserve(batch_count); - while (iteration_count < batch_count && it->Valid()) { - iteration_time = dsn_now_ns(); - if (_rocksdb_iteration_threshold_ns > 0 && - iteration_time - start_time > _rocksdb_iteration_threshold_ns) { - exceed_limit = true; - break; - } + bool need_time_check_during_iteration = + _rocksdb_iteration_threshold_time_ms > 0 && batch_count > 100; + uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; + bool exceed_limit = false; + uint64_t iteration_time = dsn_now_ns(); + while (iteration_count < batch_count && it->Valid()) { int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1373,6 +1376,13 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } iteration_count++; + if (need_time_check_during_iteration && iteration_count % 100 == 0) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + break; + } + } int r = append_key_value_for_scan(resp.kvs, it->key(), @@ -1400,6 +1410,14 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request it->Next(); } + // check iteration time whether exceed limit + if (!complete) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + } + } + resp.error = it->status().code(); if (!it->status().ok()) { // error occur @@ -1431,7 +1449,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request reply.to_address().to_string(), batch_count, iteration_time - start_time, - _rocksdb_iteration_threshold_ns); + iteration_threshold_time_ns); } else if (it->Valid() && !complete) { // scan not completed std::unique_ptr context( @@ -1501,23 +1519,18 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint64_t expire_count = 0; uint64_t filter_count = 0; int32_t count = 0; + int32_t iteration_count = 0; - uint64_t iteration_time = dsn_now_ns(); + uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; + uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count); + + bool need_time_check_during_iteration = + _rocksdb_iteration_threshold_time_ms > 0 && batch_count > 100; + uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; bool exceed_limit = false; - int32_t iteration_count = 0; - int32_t batch_count = - (context->batch_size < _rocksdb_max_iteration_count && context->batch_size > 0) - ? context->batch_size - : _rocksdb_max_iteration_count; + uint64_t iteration_time = dsn_now_ns(); while (iteration_count < batch_count && it->Valid()) { - iteration_time = dsn_now_ns(); - if (_rocksdb_iteration_threshold_ns > 0 && - iteration_time - start_time > _rocksdb_iteration_threshold_ns) { - exceed_limit = true; - break; - } - int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1526,6 +1539,13 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, } iteration_count++; + if (need_time_check_during_iteration && iteration_count % 100 == 0) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + break; + } + } int r = append_key_value_for_scan(resp.kvs, it->key(), @@ -1553,6 +1573,14 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, it->Next(); } + // check iteration time whether exceed limit + if (!complete) { + iteration_time = dsn_now_ns(); + if (iteration_time - start_time > iteration_threshold_time_ns) { + exceed_limit = true; + } + } + resp.error = it->status().code(); if (!it->status().ok()) { // error occur @@ -1578,12 +1606,12 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, } else if (exceed_limit) { // scan exceed limit time resp.error = rocksdb::Status::kIncomplete; - dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS " - "time_threshold_ns({})", + dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS " + "time_threshold({}ns)", reply.to_address().to_string(), batch_count, iteration_time - start_time, - _rocksdb_iteration_threshold_ns); + iteration_threshold_time_ns); } else if (it->Valid() && !complete) { // scan not completed int64_t handle = _context_cache.put(std::move(context)); @@ -2638,24 +2666,22 @@ void pegasus_server_impl::update_slow_query_threshold( void pegasus_server_impl::update_rocksdb_iteration_threshold( const std::map &envs) { - uint64_t threshold_ns = _rocksdb_iteration_threshold_ns_in_config; - auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD); + uint64_t threshold_ms = _rocksdb_iteration_threshold_time_ms_in_config; + auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD_TIME_MS); if (find != envs.end()) { // the unit of iteration threshold from env is ms - uint64_t threshold_ms; if (!dsn::buf2uint64(find->second, threshold_ms) || threshold_ms < 0) { derror_replica("{}={} is invalid.", find->first, find->second); return; } - threshold_ns = threshold_ms * 1e6; } - if (_rocksdb_iteration_threshold_ns != threshold_ns) { + if (_rocksdb_iteration_threshold_time_ms != threshold_ms) { ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed", - ROCKSDB_ITERATION_THRESHOLD, - _rocksdb_iteration_threshold_ns, - threshold_ns); - _rocksdb_iteration_threshold_ns = threshold_ns; + ROCKSDB_ITERATION_THRESHOLD_TIME_MS, + _rocksdb_iteration_threshold_time_ms, + threshold_ms); + _rocksdb_iteration_threshold_time_ms = threshold_ms; } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index ab67af3449..f8260f323a 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -313,9 +313,9 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service uint64_t _slow_query_threshold_ns_in_config; // abnormal multi_get/rocksdb_iteration uint64_t _multi_get_max_iteration_size; - uint64_t _rocksdb_max_iteration_count; - uint64_t _rocksdb_iteration_threshold_ns_in_config; - uint64_t _rocksdb_iteration_threshold_ns; + uint32_t _rocksdb_max_iteration_count; + uint64_t _rocksdb_iteration_threshold_time_ms_in_config; + uint64_t _rocksdb_iteration_threshold_time_ms; std::shared_ptr _key_ttl_compaction_filter_factory; std::shared_ptr _statistics; diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index 3ae86bdd84..fe0ee179f5 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -405,7 +405,7 @@ TEST_F(scan, ITERATION_TIME_LIMIT) { // update iteration threshold to 1ms auto response = ddl_client->set_app_envs( - client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD}, {std::to_string(1)}); + client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)}); ASSERT_EQ(true, response.is_ok()); ASSERT_EQ(dsn::ERR_OK, response.get_value().err); // wait envs to be synced. @@ -430,4 +430,12 @@ TEST_F(scan, ITERATION_TIME_LIMIT) int ret = client->sortkey_count(expected_hash_key, count); ASSERT_EQ(0, ret); ASSERT_EQ(count, -1); + + // set iteration threshold to 100ms + response = ddl_client->set_app_envs( + client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)}); + ASSERT_EQ(true, response.is_ok()); + ASSERT_EQ(dsn::ERR_OK, response.get_value().err); + // wait envs to be synced. + std::this_thread::sleep_for(std::chrono::seconds(30)); } From 5ab712a306bceff47dcda8af924a89be9fa78ee5 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 24 Mar 2020 16:28:41 +0800 Subject: [PATCH 07/17] update iteration time check --- src/server/pegasus_server_impl.cpp | 31 +++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index ea125f0e53..35b719bcc5 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -705,6 +705,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req _multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX; int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config); + // during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration + // exceed time threshold + uint32_t module_num = max_iteration_count <= 10 ? 1 : max_iteration_count / 10; + uint32_t epoch_now = ::pegasus::utils::epoch_now(); int32_t count = 0; int64_t size = 0; @@ -783,8 +787,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; - bool need_time_check_during_iteration = - _rocksdb_iteration_threshold_time_ms > 0 && max_kv_count > 100; + bool time_check = _rocksdb_iteration_threshold_time_ms > 0; uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; bool exceed_limit = false; uint64_t iteration_time = dsn_now_ns(); @@ -814,7 +817,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } iteration_count++; - if (need_time_check_during_iteration && iteration_count % 100 == 0) { + if (time_check && iteration_count % module_num == 0) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; @@ -884,7 +887,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } iteration_count++; - if (need_time_check_during_iteration && iteration_count % 100 == 0) { + if (time_check && iteration_count % module_num == 0) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; @@ -1119,14 +1122,16 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, uint64_t expire_count = 0; uint64_t iteration_count = 0; - bool need_iteration_time_check = _rocksdb_iteration_threshold_time_ms > 0; + bool time_check = _rocksdb_iteration_threshold_time_ms > 0; uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; uint64_t iteration_time = dsn_now_ns(); bool exceed_limit = false; + uint32_t module_num = + _rocksdb_max_iteration_count <= 10 ? 1 : _rocksdb_max_iteration_count / 10; while (it->Valid()) { ++iteration_count; - if (need_iteration_time_check && iteration_count % 100 == 0) { + if (time_check && iteration_count % module_num == 0) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; @@ -1352,11 +1357,11 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count); resp.kvs.reserve(batch_count); - bool need_time_check_during_iteration = - _rocksdb_iteration_threshold_time_ms > 0 && batch_count > 100; + bool time_check = _rocksdb_iteration_threshold_time_ms > 0; uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; bool exceed_limit = false; uint64_t iteration_time = dsn_now_ns(); + uint32_t module_num = batch_count <= 10 ? 1 : batch_count / 10; while (iteration_count < batch_count && it->Valid()) { int c = it->key().compare(stop); @@ -1376,7 +1381,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } iteration_count++; - if (need_time_check_during_iteration && iteration_count % 100 == 0) { + if (time_check && iteration_count % module_num == 0) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; @@ -1411,7 +1416,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } // check iteration time whether exceed limit - if (!complete) { + if (!complete && time_check) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; @@ -1524,11 +1529,11 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count); - bool need_time_check_during_iteration = - _rocksdb_iteration_threshold_time_ms > 0 && batch_count > 100; + bool time_check = _rocksdb_iteration_threshold_time_ms > 0; uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; bool exceed_limit = false; uint64_t iteration_time = dsn_now_ns(); + uint32_t module_num = batch_count <= 10 ? 1 : batch_count / 10; while (iteration_count < batch_count && it->Valid()) { int c = it->key().compare(stop); @@ -1539,7 +1544,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, } iteration_count++; - if (need_time_check_during_iteration && iteration_count % 100 == 0) { + if (time_check && iteration_count % module_num == 0) { iteration_time = dsn_now_ns(); if (iteration_time - start_time > iteration_threshold_time_ns) { exceed_limit = true; From 340b923402c100f8eb9e2c18af4fcd5687162706 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 25 Mar 2020 17:30:16 +0800 Subject: [PATCH 08/17] fix by code review and update rdsn --- rdsn | 2 +- src/server/config.ini | 2 +- src/server/pegasus_server_impl.cpp | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rdsn b/rdsn index 9d7b731a76..3b1133b269 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 9d7b731a769ca6098033195fb8b1344c00710e46 +Subproject commit 3b1133b269c26f52f6c9d8718d1b904fcb440b0f diff --git a/src/server/config.ini b/src/server/config.ini index a912f21ed5..fb27a0def4 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -281,7 +281,7 @@ rocksdb_filter_type = prefix # 10MB, 1000, 30s - rocksdb_multi_get_max_iteration_size = 10000000 + rocksdb_multi_get_max_iteration_size = 10485760 rocksdb_max_iteration_count = 1000 rocksdb_iteration_threshold_time_ms = 30000 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 35b719bcc5..d69895e90e 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -867,7 +867,8 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (iteration_count < max_iteration_count && size < max_kv_size && it->Valid()) { + while (iteration_count < max_iteration_count && size < max_iteration_size && + it->Valid()) { // check start sort key int c = it->key().compare(start); if (c < 0 || (c == 0 && !start_inclusive)) { @@ -1016,7 +1017,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req // extract value if (status.ok()) { // check if exceed limit - if (iteration_count > max_iteration_count || size > max_kv_size) { + if (iteration_count > max_iteration_count || size > max_iteration_size) { exceed_limit = true; break; } From fdef7deb26b215c42c86b7f91c1e8b988f814e5e Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 26 Mar 2020 09:25:21 +0800 Subject: [PATCH 09/17] update submodule --- rocksdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocksdb b/rocksdb index 52492c3131..f0e22c376e 160000 --- a/rocksdb +++ b/rocksdb @@ -1 +1 @@ -Subproject commit 52492c31313921d0476751fffc77b84ead156363 +Subproject commit f0e22c376e8de3ca576f4a37c857172d54cc4fa7 From 78f6922d94036b3577622c13d3694778e19f6943 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 26 Mar 2020 09:36:44 +0800 Subject: [PATCH 10/17] update submodule --- rocksdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocksdb b/rocksdb index f0e22c376e..52492c3131 160000 --- a/rocksdb +++ b/rocksdb @@ -1 +1 @@ -Subproject commit f0e22c376e8de3ca576f4a37c857172d54cc4fa7 +Subproject commit 52492c31313921d0476751fffc77b84ead156363 From 432d24599f9f615a7a0c7d1d571f1ea0241ee3ab Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 27 Mar 2020 14:06:59 +0800 Subject: [PATCH 11/17] fix by review --- src/server/config.ini | 5 +- src/server/iteration_limiter.h | 82 ++++++++++++++++ src/server/pegasus_server_impl.cpp | 151 +++++++++++------------------ src/server/pegasus_server_impl.h | 1 + 4 files changed, 144 insertions(+), 95 deletions(-) create mode 100644 src/server/iteration_limiter.h diff --git a/src/server/config.ini b/src/server/config.ini index fb27a0def4..bb431cffc6 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -280,8 +280,9 @@ # Bloom filter type, should be either 'common' or 'prefix' rocksdb_filter_type = prefix - # 10MB, 1000, 30s - rocksdb_multi_get_max_iteration_size = 10485760 + # 3000, 30MB, 1000, 30s + rocksdb_multi_get_max_iteration_count = 3000 + rocksdb_multi_get_max_iteration_size = 31457280 rocksdb_max_iteration_count = 1000 rocksdb_iteration_threshold_time_ms = 30000 diff --git a/src/server/iteration_limiter.h b/src/server/iteration_limiter.h new file mode 100644 index 0000000000..797be25081 --- /dev/null +++ b/src/server/iteration_limiter.h @@ -0,0 +1,82 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include "pegasus_server_impl.h" + +namespace pegasus { +namespace server { + +class iteration_limiter +{ +public: + iteration_limiter(uint32_t max_iteration_count, + uint64_t max_iteration_size, + uint64_t threshold_time_ms) + { + _max_count = max_iteration_count; + _max_size = max_iteration_size; + _module_num = _max_count <= 10 ? 1 : _max_count / 10; + _max_duration_time = threshold_time_ms > 0 ? threshold_time_ms * 1e6 : 0; + _iteration_start_time_ns = dsn_now_ns(); + } + + bool valid() + { + if (_iteration_count >= _max_count) { + return false; + } + if (_max_size > 0 && _iteration_size >= _max_size) { + return false; + } + return true; + } + + // during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration + // exceed time threshold, which means we at most check ten times during iteration + bool time_check() + { + if (_max_duration_time > 0 && _iteration_count % _module_num == 0 && + dsn_now_ns() - _iteration_start_time_ns > _max_duration_time) { + _exceed_limit = true; + _iteration_duration_time_ns = dsn_now_ns() - _iteration_start_time_ns; + return false; + } + return true; + } + + void time_check_after_incomplete_scan() + { + if (_max_duration_time > 0 && + dsn_now_ns() - _iteration_start_time_ns > _max_duration_time) { + _exceed_limit = true; + _iteration_duration_time_ns = dsn_now_ns() - _iteration_start_time_ns; + } + } + + void add_count() { ++_iteration_count; } + void add_size(uint64_t size) { _iteration_size += size; } + + bool exceed_limit() { return _exceed_limit; } + uint32_t get_iteration_count() { return _iteration_count; } + uint64_t duration_time() { return _iteration_duration_time_ns; } + uint64_t max_duration_time() { return _max_duration_time; } + +private: + bool _exceed_limit{false}; + + uint32_t _iteration_count{0}; + uint64_t _iteration_size{0}; + uint64_t _iteration_start_time_ns{0}; + uint64_t _iteration_duration_time_ns{0}; + + uint32_t _max_count{0}; + uint64_t _max_size{0}; + uint64_t _max_duration_time{0}; + int32_t _module_num{1}; +}; +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index d1b280d234..4d677a5550 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -24,6 +24,7 @@ #include "pegasus_event_listener.h" #include "pegasus_server_write.h" #include "meta_store.h" +#include "iteration_limiter.h" using namespace dsn::literals::chrono_literals; @@ -95,10 +96,18 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) 1000, "multi-get operation iterate count exceed this threshold will be logged, 0 means no check"); + _multi_get_max_iteration_count = (uint32_t)dsn_config_get_value_uint64( + "pegasus.server", + "rocksdb_multi_get_max_iteration_count", + 3000, + "max iteration count for each rocksdb iterator operation for multi-get operation, if " + "exceed this threshold," + "iterator will be stopped"); + _multi_get_max_iteration_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_multi_get_max_iteration_size", - 10 << 20, + 30 << 20, "multi-get operation total key-value size exceed " "this threshold will stop iterating rocksdb, 0 means no check"); @@ -713,17 +722,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX; - uint32_t max_iteration_count = std::min(max_kv_count, _rocksdb_max_iteration_count); + uint32_t max_iteration_count = std::min(max_kv_count, _multi_get_max_iteration_count); int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; int32_t max_iteration_size_config = _multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX; int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config); - // during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration - // exceed time threshold - uint32_t module_num = max_iteration_count <= 10 ? 1 : max_iteration_count / 10; - uint32_t epoch_now = ::pegasus::utils::epoch_now(); int32_t count = 0; int64_t size = 0; @@ -802,17 +807,15 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; - bool time_check = _rocksdb_iteration_threshold_time_ms > 0; - uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; - bool exceed_limit = false; - uint64_t iteration_time = dsn_now_ns(); + + std::unique_ptr limiter = dsn::make_unique( + max_iteration_count, max_iteration_size, _rocksdb_iteration_threshold_time_ms); if (!request.reverse) { it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf)); it->Seek(start); bool first_exclusive = !start_inclusive; - while (iteration_count < max_iteration_count && size < max_iteration_size && - it->Valid()) { + while (limiter->valid() && it->Valid()) { // check stop sort key int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { @@ -831,13 +834,9 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } } - iteration_count++; - if (time_check && iteration_count % module_num == 0) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - break; - } + limiter->add_count(); + if (!limiter->time_check()) { + break; } // extract value @@ -851,7 +850,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req if (r == 1) { count++; auto &kv = resp.kvs.back(); - size += kv.key.length() + kv.value.length(); + limiter->add_size(kv.key.length() + kv.value.length()); } else if (r == 2) { expire_count++; } else { // r == 3 @@ -882,8 +881,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (iteration_count < max_iteration_count && size < max_iteration_size && - it->Valid()) { + while (limiter->valid() && it->Valid()) { // check start sort key int c = it->key().compare(start); if (c < 0 || (c == 0 && !start_inclusive)) { @@ -902,13 +900,9 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } } - iteration_count++; - if (time_check && iteration_count % module_num == 0) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - break; - } + limiter->add_count(); + if (!limiter->time_check()) { + break; } // extract value @@ -922,7 +916,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req if (r == 1) { count++; auto &kv = reverse_kvs.back(); - size += kv.key.length() + kv.value.length(); + limiter->add_size(kv.key.length() + kv.value.length()); } else if (r == 2) { expire_count++; } else { // r == 3 @@ -947,6 +941,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } } + iteration_count = limiter->get_iteration_count(); resp.error = it->status().code(); if (!it->status().ok()) { // error occur @@ -970,12 +965,12 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } else if (it->Valid() && !complete) { // scan not completed resp.error = rocksdb::Status::kIncomplete; - if (exceed_limit) { + if (limiter->exceed_limit()) { dwarn_replica( "rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)", reply.to_address().to_string(), - iteration_time - start_time, - iteration_threshold_time_ns); + limiter->duration_time(), + limiter->max_duration_time()); } } } else { @@ -1136,24 +1131,16 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, resp.count = 0; uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; - uint64_t iteration_count = 0; - bool time_check = _rocksdb_iteration_threshold_time_ms > 0; - uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; - uint64_t iteration_time = dsn_now_ns(); - bool exceed_limit = false; - uint32_t module_num = - _rocksdb_max_iteration_count <= 10 ? 1 : _rocksdb_max_iteration_count / 10; + std::unique_ptr limiter = dsn::make_unique( + _rocksdb_max_iteration_count, 0, _rocksdb_iteration_threshold_time_ms); while (it->Valid()) { - ++iteration_count; - if (time_check && iteration_count % module_num == 0) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - break; - } + limiter->add_count(); + if (!limiter->time_check()) { + break; } + if (check_if_record_expired(epoch_now, it->value())) { expire_count++; if (_verbose_log) { @@ -1187,11 +1174,11 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, it->status().ToString().c_str()); } resp.count = 0; - } else if (exceed_limit) { + } else if (limiter->exceed_limit()) { dwarn_replica("rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)", reply.to_address().to_string(), - iteration_time - start_time, - iteration_threshold_time_ns); + limiter->duration_time(), + limiter->max_duration_time()); resp.count = -1; } @@ -1367,19 +1354,15 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint64_t expire_count = 0; uint64_t filter_count = 0; int32_t count = 0; - int32_t iteration_count = 0; uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX; uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count); resp.kvs.reserve(batch_count); - bool time_check = _rocksdb_iteration_threshold_time_ms > 0; - uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; - bool exceed_limit = false; - uint64_t iteration_time = dsn_now_ns(); - uint32_t module_num = batch_count <= 10 ? 1 : batch_count / 10; + std::unique_ptr limiter = + dsn::make_unique(batch_count, 0, _rocksdb_iteration_threshold_time_ms); - while (iteration_count < batch_count && it->Valid()) { + while (limiter->valid() && it->Valid()) { int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1396,13 +1379,9 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } } - iteration_count++; - if (time_check && iteration_count % module_num == 0) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - break; - } + limiter->add_count(); + if (!limiter->time_check()) { + break; } int r = append_key_value_for_scan(resp.kvs, @@ -1432,11 +1411,8 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } // check iteration time whether exceed limit - if (!complete && time_check) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - } + if (!complete) { + limiter->time_check_after_incomplete_scan(); } resp.error = it->status().code(); @@ -1462,15 +1438,15 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request it->status().ToString().c_str()); } resp.kvs.clear(); - } else if (exceed_limit) { + } else if (limiter->exceed_limit()) { // scan exceed limit time resp.error = rocksdb::Status::kIncomplete; dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS " "time_threshold_ns({})", reply.to_address().to_string(), batch_count, - iteration_time - start_time, - iteration_threshold_time_ns); + limiter->duration_time(), + limiter->max_duration_time()); } else if (it->Valid() && !complete) { // scan not completed std::unique_ptr context( @@ -1540,18 +1516,14 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint64_t expire_count = 0; uint64_t filter_count = 0; int32_t count = 0; - int32_t iteration_count = 0; uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count); - bool time_check = _rocksdb_iteration_threshold_time_ms > 0; - uint64_t iteration_threshold_time_ns = _rocksdb_iteration_threshold_time_ms * 1e6; - bool exceed_limit = false; - uint64_t iteration_time = dsn_now_ns(); - uint32_t module_num = batch_count <= 10 ? 1 : batch_count / 10; + std::unique_ptr limiter = dsn::make_unique( + batch_count, 0, _rocksdb_iteration_threshold_time_ms); - while (iteration_count < batch_count && it->Valid()) { + while (limiter->valid() && it->Valid()) { int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1559,13 +1531,9 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, break; } - iteration_count++; - if (time_check && iteration_count % module_num == 0) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - break; - } + limiter->add_count(); + if (!limiter->time_check()) { + break; } int r = append_key_value_for_scan(resp.kvs, @@ -1596,10 +1564,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, // check iteration time whether exceed limit if (!complete) { - iteration_time = dsn_now_ns(); - if (iteration_time - start_time > iteration_threshold_time_ns) { - exceed_limit = true; - } + limiter->time_check_after_incomplete_scan(); } resp.error = it->status().code(); @@ -1624,15 +1589,15 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, it->status().ToString().c_str()); } resp.kvs.clear(); - } else if (exceed_limit) { + } else if (limiter->exceed_limit()) { // scan exceed limit time resp.error = rocksdb::Status::kIncomplete; dwarn_replica("rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS " "time_threshold({}ns)", reply.to_address().to_string(), batch_count, - iteration_time - start_time, - iteration_threshold_time_ns); + limiter->duration_time(), + limiter->max_duration_time()); } else if (it->Valid() && !complete) { // scan not completed int64_t handle = _context_cache.put(std::move(context)); diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index c6c210525a..28351937c9 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -322,6 +322,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service uint64_t _slow_query_threshold_ns; uint64_t _slow_query_threshold_ns_in_config; // abnormal multi_get/rocksdb_iteration + uint32_t _multi_get_max_iteration_count; uint64_t _multi_get_max_iteration_size; uint32_t _rocksdb_max_iteration_count; uint64_t _rocksdb_iteration_threshold_time_ms_in_config; From 1a0c8cfee86ab1eb5da9ee48171cdeaf3b60c7a0 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 27 Mar 2020 17:17:24 +0800 Subject: [PATCH 12/17] rename --- src/server/pegasus_server_impl.cpp | 12 ++++++------ src/server/pegasus_server_impl.h | 1 + .../{iteration_limiter.h => range_read_limiter.h} | 11 +++++------ 3 files changed, 12 insertions(+), 12 deletions(-) rename src/server/{iteration_limiter.h => range_read_limiter.h} (90%) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 4d677a5550..39dc90d098 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -24,7 +24,7 @@ #include "pegasus_event_listener.h" #include "pegasus_server_write.h" #include "meta_store.h" -#include "iteration_limiter.h" +#include "range_read_limiter.h" using namespace dsn::literals::chrono_literals; @@ -808,7 +808,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; - std::unique_ptr limiter = dsn::make_unique( + std::unique_ptr limiter = dsn::make_unique( max_iteration_count, max_iteration_size, _rocksdb_iteration_threshold_time_ms); if (!request.reverse) { @@ -1132,7 +1132,7 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; - std::unique_ptr limiter = dsn::make_unique( + std::unique_ptr limiter = dsn::make_unique( _rocksdb_max_iteration_count, 0, _rocksdb_iteration_threshold_time_ms); while (it->Valid()) { @@ -1359,8 +1359,8 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count); resp.kvs.reserve(batch_count); - std::unique_ptr limiter = - dsn::make_unique(batch_count, 0, _rocksdb_iteration_threshold_time_ms); + std::unique_ptr limiter = + dsn::make_unique(batch_count, 0, _rocksdb_iteration_threshold_time_ms); while (limiter->valid() && it->Valid()) { int c = it->key().compare(stop); @@ -1520,7 +1520,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count); - std::unique_ptr limiter = dsn::make_unique( + std::unique_ptr limiter = dsn::make_unique( batch_count, 0, _rocksdb_iteration_threshold_time_ms); while (limiter->valid() && it->Valid()) { diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 28351937c9..07d3694200 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -26,6 +26,7 @@ namespace server { class meta_store; class capacity_unit_calculator; class pegasus_server_write; +class range_read_limiter_options; class pegasus_server_impl : public ::dsn::apps::rrdb_service { diff --git a/src/server/iteration_limiter.h b/src/server/range_read_limiter.h similarity index 90% rename from src/server/iteration_limiter.h rename to src/server/range_read_limiter.h index 797be25081..f9aec43e92 100644 --- a/src/server/iteration_limiter.h +++ b/src/server/range_read_limiter.h @@ -10,15 +10,14 @@ namespace pegasus { namespace server { -class iteration_limiter +class range_read_limiter { public: - iteration_limiter(uint32_t max_iteration_count, - uint64_t max_iteration_size, - uint64_t threshold_time_ms) + range_read_limiter(uint32_t max_iteration_count, + uint64_t max_iteration_size, + uint64_t threshold_time_ms) + : _max_count(max_iteration_count), _max_size(max_iteration_size) { - _max_count = max_iteration_count; - _max_size = max_iteration_size; _module_num = _max_count <= 10 ? 1 : _max_count / 10; _max_duration_time = threshold_time_ms > 0 ? threshold_time_ms * 1e6 : 0; _iteration_start_time_ns = dsn_now_ns(); From 5b8f85244c28c19e9d54f9bad01aae6a4604c4c4 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 3 Apr 2020 15:00:43 +0800 Subject: [PATCH 13/17] update by review --- src/server/pegasus_server_impl.cpp | 66 +++++++++++++++++------------- src/server/pegasus_server_impl.h | 10 ++--- src/server/range_read_limiter.h | 12 +++++- 3 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index c2dfa0112c..7333a07fce 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -24,7 +24,6 @@ #include "pegasus_event_listener.h" #include "pegasus_server_write.h" #include "meta_store.h" -#include "range_read_limiter.h" using namespace dsn::literals::chrono_literals; @@ -96,35 +95,37 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) 1000, "multi-get operation iterate count exceed this threshold will be logged, 0 means no check"); - _multi_get_max_iteration_count = (uint32_t)dsn_config_get_value_uint64( + _rng_rd_opts.multi_get_max_iteration_count = (uint32_t)dsn_config_get_value_uint64( "pegasus.server", "rocksdb_multi_get_max_iteration_count", 3000, - "max iteration count for each rocksdb iterator operation for multi-get operation, if " + "max iteration count for each range read for multi-get operation, if " "exceed this threshold," "iterator will be stopped"); - _multi_get_max_iteration_size = + _rng_rd_opts.multi_get_max_iteration_size = dsn_config_get_value_uint64("pegasus.server", "rocksdb_multi_get_max_iteration_size", 30 << 20, "multi-get operation total key-value size exceed " "this threshold will stop iterating rocksdb, 0 means no check"); - _rocksdb_max_iteration_count = (uint32_t)dsn_config_get_value_uint64( - "pegasus.server", - "rocksdb_max_iteration_count", - 1000, - "max iteration count for each rocksdb iterator operation, if exceed this threshold," - "iterator will be stopped"); + _rng_rd_opts.rocksdb_max_iteration_count = + (uint32_t)dsn_config_get_value_uint64("pegasus.server", + "rocksdb_max_iteration_count", + 1000, + "max iteration count for each range " + "read, if exceed this threshold, " + "iterator will be stopped"); - _rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64( + _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config = dsn_config_get_value_uint64( "pegasus.server", "rocksdb_iteration_threshold_time_ms", 30000, "max duration for handling one pegasus scan request(sortkey_count/multiget/scan) if exceed " "this threshold, iterator will be stopped, 0 means no check"); - _rocksdb_iteration_threshold_time_ms = _rocksdb_iteration_threshold_time_ms_in_config; + _rng_rd_opts.rocksdb_iteration_threshold_time_ms = + _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config; // init rocksdb::DBOptions _db_opts.pegasus_data = true; @@ -722,11 +723,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX; - uint32_t max_iteration_count = std::min(max_kv_count, _multi_get_max_iteration_count); + uint32_t max_iteration_count = + std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count); int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; - int32_t max_iteration_size_config = - _multi_get_max_iteration_size > 0 ? _multi_get_max_iteration_size : INT_MAX; + int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0 + ? _rng_rd_opts.multi_get_max_iteration_size + : INT_MAX; int32_t max_iteration_size = std::min(max_kv_size, max_iteration_size_config); uint32_t epoch_now = ::pegasus::utils::epoch_now(); @@ -808,8 +811,10 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req std::unique_ptr it; bool complete = false; - std::unique_ptr limiter = dsn::make_unique( - max_iteration_count, max_iteration_size, _rocksdb_iteration_threshold_time_ms); + std::unique_ptr limiter = + dsn::make_unique(max_iteration_count, + max_iteration_size, + _rng_rd_opts.rocksdb_iteration_threshold_time_ms); if (!request.reverse) { it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf)); @@ -973,7 +978,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req limiter->max_duration_time()); } } - } else { + } else { // condition: !request.sort_keys.empty() bool error_occurred = false; rocksdb::Status final_status; bool exceed_limit = false; @@ -1132,8 +1137,10 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, uint32_t epoch_now = ::pegasus::utils::epoch_now(); uint64_t expire_count = 0; - std::unique_ptr limiter = dsn::make_unique( - _rocksdb_max_iteration_count, 0, _rocksdb_iteration_threshold_time_ms); + std::unique_ptr limiter = + dsn::make_unique(_rng_rd_opts.rocksdb_max_iteration_count, + 0, + _rng_rd_opts.rocksdb_iteration_threshold_time_ms); while (it->Valid()) { limiter->add_count(); @@ -1356,11 +1363,11 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request int32_t count = 0; uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX; - uint32_t batch_count = std::min(request_batch_size, _rocksdb_max_iteration_count); + uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count); resp.kvs.reserve(batch_count); - std::unique_ptr limiter = - dsn::make_unique(batch_count, 0, _rocksdb_iteration_threshold_time_ms); + std::unique_ptr limiter = dsn::make_unique( + batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms); while (limiter->valid() && it->Valid()) { int c = it->key().compare(stop); @@ -1518,10 +1525,11 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, int32_t count = 0; uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; - uint32_t batch_count = std::min(context_batch_size, _rocksdb_max_iteration_count); + uint32_t batch_count = + std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count); std::unique_ptr limiter = dsn::make_unique( - batch_count, 0, _rocksdb_iteration_threshold_time_ms); + batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms); while (limiter->valid() && it->Valid()) { int c = it->key().compare(stop); @@ -2648,7 +2656,7 @@ void pegasus_server_impl::update_slow_query_threshold( void pegasus_server_impl::update_rocksdb_iteration_threshold( const std::map &envs) { - uint64_t threshold_ms = _rocksdb_iteration_threshold_time_ms_in_config; + uint64_t threshold_ms = _rng_rd_opts.rocksdb_iteration_threshold_time_ms_in_config; auto find = envs.find(ROCKSDB_ITERATION_THRESHOLD_TIME_MS); if (find != envs.end()) { // the unit of iteration threshold from env is ms @@ -2658,12 +2666,12 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold( } } - if (_rocksdb_iteration_threshold_time_ms != threshold_ms) { + if (_rng_rd_opts.rocksdb_iteration_threshold_time_ms != threshold_ms) { ddebug_replica("update app env[{}] from \"{}\" to \"{}\" succeed", ROCKSDB_ITERATION_THRESHOLD_TIME_MS, - _rocksdb_iteration_threshold_time_ms, + _rng_rd_opts.rocksdb_iteration_threshold_time_ms, threshold_ms); - _rocksdb_iteration_threshold_time_ms = threshold_ms; + _rng_rd_opts.rocksdb_iteration_threshold_time_ms = threshold_ms; } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 07d3694200..425141fda3 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -19,6 +19,7 @@ #include "pegasus_scan_context.h" #include "pegasus_manual_compact_service.h" #include "pegasus_write_service.h" +#include "range_read_limiter.h" namespace pegasus { namespace server { @@ -26,7 +27,6 @@ namespace server { class meta_store; class capacity_unit_calculator; class pegasus_server_write; -class range_read_limiter_options; class pegasus_server_impl : public ::dsn::apps::rrdb_service { @@ -322,12 +322,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service // slow query time threshold. exceed this threshold will be logged. uint64_t _slow_query_threshold_ns; uint64_t _slow_query_threshold_ns_in_config; - // abnormal multi_get/rocksdb_iteration - uint32_t _multi_get_max_iteration_count; - uint64_t _multi_get_max_iteration_size; - uint32_t _rocksdb_max_iteration_count; - uint64_t _rocksdb_iteration_threshold_time_ms_in_config; - uint64_t _rocksdb_iteration_threshold_time_ms; + + range_read_limiter_options _rng_rd_opts; std::shared_ptr _key_ttl_compaction_filter_factory; std::shared_ptr _statistics; diff --git a/src/server/range_read_limiter.h b/src/server/range_read_limiter.h index f9aec43e92..c9d2fe51b5 100644 --- a/src/server/range_read_limiter.h +++ b/src/server/range_read_limiter.h @@ -5,11 +5,21 @@ #pragma once #include -#include "pegasus_server_impl.h" namespace pegasus { namespace server { +class pegasus_server_impl; + +struct range_read_limiter_options +{ + uint32_t multi_get_max_iteration_count; + uint64_t multi_get_max_iteration_size; + uint32_t rocksdb_max_iteration_count; + uint64_t rocksdb_iteration_threshold_time_ms_in_config; + uint64_t rocksdb_iteration_threshold_time_ms; +}; + class range_read_limiter { public: From 7639d53ab32ce6f8d724abccb625e1c8c61b8b2b Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 3 Apr 2020 18:33:51 +0800 Subject: [PATCH 14/17] update by code review --- src/server/pegasus_server_impl.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 7333a07fce..24dd9ece9c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -981,6 +981,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } else { // condition: !request.sort_keys.empty() bool error_occurred = false; rocksdb::Status final_status; + int32_t multiget_count = 0; bool exceed_limit = false; std::vector<::dsn::blob> keys_holder; std::vector keys; @@ -1017,7 +1018,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } // check ttl if (status.ok()) { - iteration_count++; + multiget_count++; uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); if (expire_ts > 0 && expire_ts <= epoch_now) { expire_count++; @@ -1032,7 +1033,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req // extract value if (status.ok()) { // check if exceed limit - if (iteration_count > max_iteration_count || size > max_iteration_size) { + if (multiget_count > max_iteration_count || size > max_iteration_size) { exceed_limit = true; break; } From 8afdd534af289ae5792160ce387a27acec341ea7 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 3 Apr 2020 19:51:25 +0800 Subject: [PATCH 15/17] update by review --- src/server/pegasus_server_impl.cpp | 15 --------------- src/server/range_read_limiter.h | 4 ++-- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 24dd9ece9c..bb455026b8 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -840,9 +840,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } limiter->add_count(); - if (!limiter->time_check()) { - break; - } // extract value int r = append_key_value_for_multi_get(resp.kvs, @@ -906,9 +903,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } limiter->add_count(); - if (!limiter->time_check()) { - break; - } // extract value int r = append_key_value_for_multi_get(reverse_kvs, @@ -1145,9 +1139,6 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, while (it->Valid()) { limiter->add_count(); - if (!limiter->time_check()) { - break; - } if (check_if_record_expired(epoch_now, it->value())) { expire_count++; @@ -1388,9 +1379,6 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request } limiter->add_count(); - if (!limiter->time_check()) { - break; - } int r = append_key_value_for_scan(resp.kvs, it->key(), @@ -1541,9 +1529,6 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, } limiter->add_count(); - if (!limiter->time_check()) { - break; - } int r = append_key_value_for_scan(resp.kvs, it->key(), diff --git a/src/server/range_read_limiter.h b/src/server/range_read_limiter.h index c9d2fe51b5..b270002b69 100644 --- a/src/server/range_read_limiter.h +++ b/src/server/range_read_limiter.h @@ -41,14 +41,14 @@ class range_read_limiter if (_max_size > 0 && _iteration_size >= _max_size) { return false; } - return true; + return time_check(); } // during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration // exceed time threshold, which means we at most check ten times during iteration bool time_check() { - if (_max_duration_time > 0 && _iteration_count % _module_num == 0 && + if (_max_duration_time > 0 && (_iteration_count + 1) % _module_num == 0 && dsn_now_ns() - _iteration_start_time_ns > _max_duration_time) { _exceed_limit = true; _iteration_duration_time_ns = dsn_now_ns() - _iteration_start_time_ns; From 2de9d3bb6443867275b247686e29f0c136258ef7 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Sat, 4 Apr 2020 12:34:03 +0800 Subject: [PATCH 16/17] fix --- src/server/pegasus_server_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index bb455026b8..5311f7f822 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1136,8 +1136,7 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key, dsn::make_unique(_rng_rd_opts.rocksdb_max_iteration_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms); - - while (it->Valid()) { + while (limiter->time_check() && it->Valid()) { limiter->add_count(); if (check_if_record_expired(epoch_now, it->value())) { From bdaf2b5e97fb3b3c8ee5723af8d7c771f4860cf8 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 7 Apr 2020 18:42:46 +0800 Subject: [PATCH 17/17] update by review --- src/server/pegasus_server_impl.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 5311f7f822..f7692f59ab 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -975,7 +975,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } else { // condition: !request.sort_keys.empty() bool error_occurred = false; rocksdb::Status final_status; - int32_t multiget_count = 0; bool exceed_limit = false; std::vector<::dsn::blob> keys_holder; std::vector keys; @@ -1012,7 +1011,6 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req } // check ttl if (status.ok()) { - multiget_count++; uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); if (expire_ts > 0 && expire_ts <= epoch_now) { expire_count++; @@ -1027,7 +1025,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req // extract value if (status.ok()) { // check if exceed limit - if (multiget_count > max_iteration_count || size > max_iteration_size) { + if (count >= max_kv_count || size >= max_kv_size) { exceed_limit = true; break; }