Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit long time rocksdb iteration operation #500

Merged
merged 24 commits into from Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/base/pegasus_const.cpp
Expand Up @@ -69,4 +69,8 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");

/// table level slow query
const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold");

/// time threshold of each rocksdb iteration
const std::string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_const.h
Expand Up @@ -45,4 +45,6 @@ extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS;
extern const std::string PEGASUS_CLUSTER_SECTION_NAME;

extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;

extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
} // namespace pegasus
6 changes: 6 additions & 0 deletions src/server/config.ini
Expand Up @@ -280,6 +280,12 @@
# Bloom filter type, should be either 'common' or 'prefix'
rocksdb_filter_type = prefix

# 3000, 30MB, 1000, 30s
rocksdb_multi_get_max_iteration_count = 3000
rocksdb_multi_get_max_iteration_size = 31457280
rocksdb_max_iteration_count = 1000
hycdong marked this conversation as resolved.
Show resolved Hide resolved
rocksdb_iteration_threshold_time_ms = 30000

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800

Expand Down
180 changes: 160 additions & 20 deletions src/server/pegasus_server_impl.cpp

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/server/pegasus_server_impl.h
Expand Up @@ -19,6 +19,7 @@
#include "pegasus_scan_context.h"
#include "pegasus_manual_compact_service.h"
#include "pegasus_write_service.h"
#include "range_read_limiter.h"

namespace pegasus {
namespace server {
Expand Down Expand Up @@ -232,6 +233,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

void update_slow_query_threshold(const std::map<std::string, std::string> &envs);

void update_rocksdb_iteration_threshold(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
Expand Down Expand Up @@ -320,6 +323,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
uint64_t _slow_query_threshold_ns;
uint64_t _slow_query_threshold_ns_in_config;

range_read_limiter_options _rng_rd_opts;

std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::DBOptions _db_opts;
Expand Down
91 changes: 91 additions & 0 deletions src/server/range_read_limiter.h
@@ -0,0 +1,91 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include <dsn/dist/replication/replication.codes.h>

namespace pegasus {
namespace server {

class pegasus_server_impl;

struct range_read_limiter_options
{
uint32_t multi_get_max_iteration_count;
uint64_t multi_get_max_iteration_size;
uint32_t rocksdb_max_iteration_count;
uint64_t rocksdb_iteration_threshold_time_ms_in_config;
uint64_t rocksdb_iteration_threshold_time_ms;
};

class range_read_limiter
{
public:
range_read_limiter(uint32_t max_iteration_count,
uint64_t max_iteration_size,
uint64_t threshold_time_ms)
: _max_count(max_iteration_count), _max_size(max_iteration_size)
{
_module_num = _max_count <= 10 ? 1 : _max_count / 10;
_max_duration_time = threshold_time_ms > 0 ? threshold_time_ms * 1e6 : 0;
_iteration_start_time_ns = dsn_now_ns();
}

bool valid()
{
if (_iteration_count >= _max_count) {
return false;
}
if (_max_size > 0 && _iteration_size >= _max_size) {
return false;
}
return time_check();
}

// during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration
// exceed time threshold, which means we at most check ten times during iteration
bool time_check()
{
if (_max_duration_time > 0 && (_iteration_count + 1) % _module_num == 0 &&
dsn_now_ns() - _iteration_start_time_ns > _max_duration_time) {
_exceed_limit = true;
_iteration_duration_time_ns = dsn_now_ns() - _iteration_start_time_ns;
return false;
}
return true;
}

void time_check_after_incomplete_scan()
{
if (_max_duration_time > 0 &&
dsn_now_ns() - _iteration_start_time_ns > _max_duration_time) {
_exceed_limit = true;
_iteration_duration_time_ns = dsn_now_ns() - _iteration_start_time_ns;
}
}

void add_count() { ++_iteration_count; }
void add_size(uint64_t size) { _iteration_size += size; }

bool exceed_limit() { return _exceed_limit; }
uint32_t get_iteration_count() { return _iteration_count; }
uint64_t duration_time() { return _iteration_duration_time_ns; }
uint64_t max_duration_time() { return _max_duration_time; }

private:
bool _exceed_limit{false};

uint32_t _iteration_count{0};
uint64_t _iteration_size{0};
uint64_t _iteration_start_time_ns{0};
uint64_t _iteration_duration_time_ns{0};

uint32_t _max_count{0};
uint64_t _max_size{0};
uint64_t _max_duration_time{0};
int32_t _module_num{1};
};
} // namespace server
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/shell/commands/data_operations.cpp
Expand Up @@ -1013,6 +1013,8 @@ bool sortkey_count(command_executor *e, shell_context *sc, arguments args)
int ret = sc->pg_client->sortkey_count(hash_key, count, sc->timeout_ms, &info);
if (ret != pegasus::PERR_OK) {
fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
} else if (count == -1) {
fprintf(stderr, "ERROR: it takes too long to count sortkey\n");
} else {
fprintf(stderr, "%" PRId64 "\n", count);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/run.sh
Expand Up @@ -41,9 +41,9 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n
GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_mutate.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test slog_lost failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/recall.xml" GTEST_FILTER="drop_and_recall.*" ./$test_case $config_file $table_name
Expand Down
10 changes: 10 additions & 0 deletions src/test/function_test/test_basic.cpp
Expand Up @@ -546,6 +546,16 @@ TEST(basic, multi_get)
ASSERT_EQ(1, (int)new_values.size());
ASSERT_EQ("5", new_values["5"]);

// set a expired value
ret = client->set("basic_test_multi_get", "", "expire_value", 5000, 1);
ASSERT_EQ(PERR_OK, ret);
std::this_thread::sleep_for(std::chrono::seconds(1));
new_values.clear();
ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2);
ASSERT_EQ(PERR_INCOMPLETE, ret);
ASSERT_EQ(1, (int)new_values.size());
ASSERT_EQ("1", new_values["1"]);

// multi_del
std::set<std::string> sortkeys;
sortkeys.insert("");
Expand Down
42 changes: 42 additions & 0 deletions src/test/function_test/test_scan.cpp
Expand Up @@ -7,14 +7,17 @@
#include <vector>
#include <map>

#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"

using namespace ::pegasus;

extern pegasus_client *client;
extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
static char buffer[256];
static std::map<std::string, std::map<std::string, std::string>> base;
Expand Down Expand Up @@ -397,3 +400,42 @@ TEST_F(scan, OVERALL)
}
compare(data, base);
}

TEST_F(scan, ITERATION_TIME_LIMIT)
{
// update iteration threshold to 1ms
auto response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));

// write data into table
int32_t i = 0;
std::string sort_key;
std::string value;
while (i < 9000) {
sort_key = random_string();
value = random_string();
int ret = client->set(expected_hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << expected_hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(ret);
i++;
}

// get sortkey count timeout
int64_t count = 0;
int ret = client->sortkey_count(expected_hash_key, count);
ASSERT_EQ(0, ret);
ASSERT_EQ(count, -1);

// set iteration threshold to 100ms
response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));
}