From b182130ffd1affc5d92555a3ee44c1bd5f91d529 Mon Sep 17 00:00:00 2001 From: Trond Norbye Date: Mon, 2 May 2022 10:38:34 +0200 Subject: [PATCH] MB-51914: Set a max limit on #connections to a bucket Only allow external clients to select a bucket if the total number of connections bound to the limit is lower than 600. Once the requirements for the limit (is it a global limit or may it be set differently across the buckets etc) we need to implement a way to set/tune the limit. Change-Id: Ife4a37bd2e34f7a8e0574145730797a068539ece Reviewed-on: https://review.couchbase.org/c/kv_engine/+/174391 Reviewed-by: Dave Rigby Tested-by: Trond Norbye --- daemon/connection.cc | 1 + daemon/memcached.cc | 15 +++++ daemon/protocol/mcbp/executors.cc | 1 + .../protocol/mcbp/select_bucket_executor.cc | 14 +++++ daemon/settings.h | 5 ++ .../ep/tests/module_tests/range_scan_test.cc | 1 + engines/utilities/engine_error.cc | 2 + include/memcached/engine_error.h | 3 + include/serverless/config.h | 26 +++++++++ tests/testapp_serverless/serverless_test.cc | 56 ++++++++++++++++++- utilities/engine_errc_2_mcbp.cc | 3 + 11 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 include/serverless/config.h diff --git a/daemon/connection.cc b/daemon/connection.cc index 850e618fb4..9b9db45f37 100644 --- a/daemon/connection.cc +++ b/daemon/connection.cc @@ -347,6 +347,7 @@ cb::engine_errc Connection::remapErrorCode(cb::engine_errc code) { case cb::engine_errc::opaque_no_match: return code; + case cb::engine_errc::too_many_connections: case cb::engine_errc::scope_size_limit_exceeded: return cb::engine_errc::too_big; diff --git a/daemon/memcached.cc b/daemon/memcached.cc index 2f55950e44..7856dd8266 100644 --- a/daemon/memcached.cc +++ b/daemon/memcached.cc @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -70,6 +71,13 @@ #include #endif +namespace cb::serverless { +Config& Config::instance() { + static Config instance; + return instance; +} +} // namespace cb::serverless + std::atomic memcached_shutdown; std::atomic sigint; std::atomic sigterm; @@ -444,6 +452,13 @@ static std::string configure_numa_policy() { #endif // HAVE_LIBNUMA static void settings_init() { + if (getenv("MEMCACHED_UNIT_TESTS")) { + auto& config = cb::serverless::Config::instance(); + // Reduce the max number so we don't have to create so many connections + config.maxConnectionsPerBucket = + cb::serverless::test::MaxConnectionsPerBucket; + } + auto& settings = Settings::instance(); // Set up the listener functions diff --git a/daemon/protocol/mcbp/executors.cc b/daemon/protocol/mcbp/executors.cc index 7e606f35ef..2cb4e7ec66 100644 --- a/daemon/protocol/mcbp/executors.cc +++ b/daemon/protocol/mcbp/executors.cc @@ -69,6 +69,7 @@ void handle_executor_status(Cookie& cookie, cb::engine_errc status) { case engine_errc::scope_size_limit_exceeded: case engine_errc::range_scan_cancelled: case engine_errc::range_scan_more: + case engine_errc::too_many_connections: cookie.sendResponse(mapped); break; } diff --git a/daemon/protocol/mcbp/select_bucket_executor.cc b/daemon/protocol/mcbp/select_bucket_executor.cc index 61ee82577d..f9aa9be0c4 100644 --- a/daemon/protocol/mcbp/select_bucket_executor.cc +++ b/daemon/protocol/mcbp/select_bucket_executor.cc @@ -14,9 +14,11 @@ #include #include #include +#include #include #include #include +#include #include cb::engine_errc select_bucket(Cookie& cookie, const std::string& bucketname) { @@ -51,6 +53,18 @@ cb::engine_errc select_bucket(Cookie& cookie, const std::string& bucketname) { return cb::engine_errc::not_supported; } + if (isServerlessDeployment() && !connection.isInternal()) { + using cb::serverless::Config; + if (connection.getBucket().clients >= + Config::instance().maxConnectionsPerBucket) { + if (oldIndex != connection.getBucketIndex()) { + associate_bucket(cookie, all_buckets[oldIndex].name); + } + cookie.setErrorContext("Too many bucket connections"); + return cb::engine_errc::too_many_connections; + } + } + connection.setPushedClustermapRevno({}); return cb::engine_errc::success; } diff --git a/daemon/settings.h b/daemon/settings.h index ef2a2edb50..c0bfacbacd 100644 --- a/daemon/settings.h +++ b/daemon/settings.h @@ -1024,3 +1024,8 @@ class Settings { bool write_compute_unit_size = false; } has; }; + +static inline bool isServerlessDeployment() { + return Settings::instance().getDeploymentModel() == + DeploymentModel::Serverless; +} diff --git a/engines/ep/tests/module_tests/range_scan_test.cc b/engines/ep/tests/module_tests/range_scan_test.cc index 41866cc648..4819d4d623 100644 --- a/engines/ep/tests/module_tests/range_scan_test.cc +++ b/engines/ep/tests/module_tests/range_scan_test.cc @@ -515,6 +515,7 @@ bool TestRangeScanHandler::validateStatus(cb::engine_errc code) { case cb::engine_errc::not_supported: case cb::engine_errc::would_block: case cb::engine_errc::too_big: + case cb::engine_errc::too_many_connections: case cb::engine_errc::disconnect: case cb::engine_errc::no_access: case cb::engine_errc::temporary_failure: diff --git a/engines/utilities/engine_error.cc b/engines/utilities/engine_error.cc index 46ef66f605..43600a8f83 100644 --- a/engines/utilities/engine_error.cc +++ b/engines/utilities/engine_error.cc @@ -117,6 +117,8 @@ std::string cb::to_string(cb::engine_errc code) { return "range scan cancelled"; case engine_errc::range_scan_more: return "range scan more"; + case engine_errc::too_many_connections: + return "too many connections"; }; throw std::invalid_argument( "engine_error_category::message: code does not represent a " diff --git a/include/memcached/engine_error.h b/include/memcached/engine_error.h index 5631ac4f0d..66297683b7 100644 --- a/include/memcached/engine_error.h +++ b/include/memcached/engine_error.h @@ -133,6 +133,9 @@ enum class engine_errc { /// RangeScan has more data available range_scan_more = 0x27, + /// Too many connections + too_many_connections = 0x28, + /** Generic failue. */ failed = 0xff }; diff --git a/include/serverless/config.h b/include/serverless/config.h new file mode 100644 index 0000000000..f85753d5e5 --- /dev/null +++ b/include/serverless/config.h @@ -0,0 +1,26 @@ +/* + * Copyright 2022-Present Couchbase, Inc. + * + * Use of this software is governed by the Business Source License included + * in the file licenses/BSL-Couchbase.txt. As of the Change Date specified + * in that file, in accordance with the Business Source License, use of this + * software will be governed by the Apache License, Version 2.0, included in + * the file licenses/APL2.txt. + */ + +#pragma once + +#include + +namespace cb::serverless { +struct Config { + static Config& instance(); + /// The maximum number of (external) connections for a bucket + size_t maxConnectionsPerBucket = {600}; +}; + +namespace test { +constexpr size_t MaxConnectionsPerBucket = 16; +} + +} // namespace cb::serverless diff --git a/tests/testapp_serverless/serverless_test.cc b/tests/testapp_serverless/serverless_test.cc index 341d78291a..ee5845467e 100644 --- a/tests/testapp_serverless/serverless_test.cc +++ b/tests/testapp_serverless/serverless_test.cc @@ -15,6 +15,8 @@ #include #include #include +#include +#include namespace cb::test { @@ -45,7 +47,10 @@ void ServerlessTest::SetUpTestCase() { "bucket-@": { "privileges": [ "Read", - "Write" + "SimpleStats", + "Insert", + "Delete", + "Upsert" ] } }, @@ -85,4 +90,53 @@ void ServerlessTest::SetUp() { void ServerlessTest::TearDown() { Test::TearDown(); } + +TEST_F(ServerlessTest, MaxConnectionPerBucket) { + using namespace cb::serverless::test; + auto admin = cluster->getConnection(0); + admin->authenticate("@admin", "password"); + auto getNumClients = [&admin]() -> std::size_t { + size_t num_clients = 0; + admin->stats( + [&num_clients](const auto& k, const auto& v) { + nlohmann::json json = nlohmann::json::parse(v); + num_clients = json["clients"].get(); + }, + "bucket_details bucket-0"); + return num_clients; + }; + + std::deque> connections; + bool done = false; + BinprotResponse rsp; + do { + auto conn = cluster->getConnection(0); + conn->authenticate("bucket-0", "bucket-0"); + rsp = conn->execute(BinprotGenericCommand{ + cb::mcbp::ClientOpcode::SelectBucket, "bucket-0"}); + if (rsp.isSuccess()) { + connections.emplace_back(std::move(conn)); + ASSERT_LE(getNumClients(), MaxConnectionsPerBucket); + } else { + ASSERT_EQ(cb::mcbp::Status::RateLimitedMaxConnections, + rsp.getStatus()); + // Without XERROR E2BIG should be returned + conn->setXerrorSupport(false); + rsp = conn->execute(BinprotGenericCommand{ + cb::mcbp::ClientOpcode::SelectBucket, "bucket-0"}); + ASSERT_FALSE(rsp.isSuccess()); + ASSERT_EQ(cb::mcbp::Status::E2big, rsp.getStatus()); + done = true; + } + } while (!done); + + // But we should be allowed to connect internal users + for (int ii = 0; ii < 5; ++ii) { + auto conn = cluster->getConnection(0); + conn->authenticate("@admin", "password"); + conn->selectBucket("bucket-0"); + connections.emplace_back(std::move(conn)); + } + EXPECT_EQ(MaxConnectionsPerBucket + 5, getNumClients()); +} } // namespace cb::test diff --git a/utilities/engine_errc_2_mcbp.cc b/utilities/engine_errc_2_mcbp.cc index 01377dc7c3..728776ff45 100644 --- a/utilities/engine_errc_2_mcbp.cc +++ b/utilities/engine_errc_2_mcbp.cc @@ -82,6 +82,9 @@ cb::mcbp::Status cb::mcbp::to_status(cb::engine_errc code) { return Status::RangeScanCancelled; case engine_errc::range_scan_more: return Status::RangeScanMore; + case engine_errc::too_many_connections: + return Status::RateLimitedMaxConnections; + case engine_errc::would_block: case engine_errc::disconnect: case engine_errc::predicate_failed: