Skip to content

Commit

Permalink
MB-51914: Set a max limit on #connections to a bucket
Browse files Browse the repository at this point in the history
Only allow external clients to select a bucket if the total
number of connections bound to the limit is lower than 600.

Once the requirements for the limit (is it a global limit
or may it be set differently across the buckets etc) we
need to implement a way to set/tune the limit.

Change-Id: Ife4a37bd2e34f7a8e0574145730797a068539ece
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/174391
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Trond Norbye <trond.norbye@couchbase.com>
  • Loading branch information
trondn committed May 5, 2022
1 parent 5b2a789 commit b182130
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 1 deletion.
1 change: 1 addition & 0 deletions daemon/connection.cc
Expand Up @@ -347,6 +347,7 @@ cb::engine_errc Connection::remapErrorCode(cb::engine_errc code) {
case cb::engine_errc::opaque_no_match:
return code;

case cb::engine_errc::too_many_connections:
case cb::engine_errc::scope_size_limit_exceeded:
return cb::engine_errc::too_big;

Expand Down
15 changes: 15 additions & 0 deletions daemon/memcached.cc
Expand Up @@ -55,6 +55,7 @@
#include <platform/strerror.h>
#include <platform/sysinfo.h>
#include <platform/timeutils.h>
#include <serverless/config.h>
#include <statistics/prometheus.h>
#include <utilities/breakpad.h>
#include <utilities/openssl_utils.h>
Expand All @@ -70,6 +71,13 @@
#include <numa.h>
#endif

namespace cb::serverless {
Config& Config::instance() {
static Config instance;
return instance;
}
} // namespace cb::serverless

std::atomic<bool> memcached_shutdown;
std::atomic<bool> sigint;
std::atomic<bool> sigterm;
Expand Down Expand Up @@ -444,6 +452,13 @@ static std::string configure_numa_policy() {
#endif // HAVE_LIBNUMA

static void settings_init() {
if (getenv("MEMCACHED_UNIT_TESTS")) {
auto& config = cb::serverless::Config::instance();
// Reduce the max number so we don't have to create so many connections
config.maxConnectionsPerBucket =
cb::serverless::test::MaxConnectionsPerBucket;
}

auto& settings = Settings::instance();

// Set up the listener functions
Expand Down
1 change: 1 addition & 0 deletions daemon/protocol/mcbp/executors.cc
Expand Up @@ -69,6 +69,7 @@ void handle_executor_status(Cookie& cookie, cb::engine_errc status) {
case engine_errc::scope_size_limit_exceeded:
case engine_errc::range_scan_cancelled:
case engine_errc::range_scan_more:
case engine_errc::too_many_connections:
cookie.sendResponse(mapped);
break;
}
Expand Down
14 changes: 14 additions & 0 deletions daemon/protocol/mcbp/select_bucket_executor.cc
Expand Up @@ -14,9 +14,11 @@
#include <daemon/buckets.h>
#include <daemon/cookie.h>
#include <daemon/memcached.h>
#include <daemon/settings.h>
#include <logger/logger.h>
#include <mcbp/protocol/request.h>
#include <platform/scope_timer.h>
#include <serverless/config.h>
#include <utilities/engine_errc_2_mcbp.h>

cb::engine_errc select_bucket(Cookie& cookie, const std::string& bucketname) {
Expand Down Expand Up @@ -51,6 +53,18 @@ cb::engine_errc select_bucket(Cookie& cookie, const std::string& bucketname) {
return cb::engine_errc::not_supported;
}

if (isServerlessDeployment() && !connection.isInternal()) {
using cb::serverless::Config;
if (connection.getBucket().clients >=
Config::instance().maxConnectionsPerBucket) {
if (oldIndex != connection.getBucketIndex()) {
associate_bucket(cookie, all_buckets[oldIndex].name);
}
cookie.setErrorContext("Too many bucket connections");
return cb::engine_errc::too_many_connections;
}
}

connection.setPushedClustermapRevno({});
return cb::engine_errc::success;
}
Expand Down
5 changes: 5 additions & 0 deletions daemon/settings.h
Expand Up @@ -1024,3 +1024,8 @@ class Settings {
bool write_compute_unit_size = false;
} has;
};

static inline bool isServerlessDeployment() {
return Settings::instance().getDeploymentModel() ==
DeploymentModel::Serverless;
}
1 change: 1 addition & 0 deletions engines/ep/tests/module_tests/range_scan_test.cc
Expand Up @@ -515,6 +515,7 @@ bool TestRangeScanHandler::validateStatus(cb::engine_errc code) {
case cb::engine_errc::not_supported:
case cb::engine_errc::would_block:
case cb::engine_errc::too_big:
case cb::engine_errc::too_many_connections:
case cb::engine_errc::disconnect:
case cb::engine_errc::no_access:
case cb::engine_errc::temporary_failure:
Expand Down
2 changes: 2 additions & 0 deletions engines/utilities/engine_error.cc
Expand Up @@ -117,6 +117,8 @@ std::string cb::to_string(cb::engine_errc code) {
return "range scan cancelled";
case engine_errc::range_scan_more:
return "range scan more";
case engine_errc::too_many_connections:
return "too many connections";
};
throw std::invalid_argument(
"engine_error_category::message: code does not represent a "
Expand Down
3 changes: 3 additions & 0 deletions include/memcached/engine_error.h
Expand Up @@ -133,6 +133,9 @@ enum class engine_errc {
/// RangeScan has more data available
range_scan_more = 0x27,

/// Too many connections
too_many_connections = 0x28,

/** Generic failue. */
failed = 0xff
};
Expand Down
26 changes: 26 additions & 0 deletions include/serverless/config.h
@@ -0,0 +1,26 @@
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#pragma once

#include <cstddef>

namespace cb::serverless {
struct Config {
static Config& instance();
/// The maximum number of (external) connections for a bucket
size_t maxConnectionsPerBucket = {600};
};

namespace test {
constexpr size_t MaxConnectionsPerBucket = 16;
}

} // namespace cb::serverless
56 changes: 55 additions & 1 deletion tests/testapp_serverless/serverless_test.cc
Expand Up @@ -15,6 +15,8 @@
#include <cluster_framework/cluster.h>
#include <protocol/connection/client_connection.h>
#include <protocol/connection/client_mcbp_commands.h>
#include <serverless/config.h>
#include <deque>

namespace cb::test {

Expand Down Expand Up @@ -45,7 +47,10 @@ void ServerlessTest::SetUpTestCase() {
"bucket-@": {
"privileges": [
"Read",
"Write"
"SimpleStats",
"Insert",
"Delete",
"Upsert"
]
}
},
Expand Down Expand Up @@ -85,4 +90,53 @@ void ServerlessTest::SetUp() {
void ServerlessTest::TearDown() {
Test::TearDown();
}

TEST_F(ServerlessTest, MaxConnectionPerBucket) {
using namespace cb::serverless::test;
auto admin = cluster->getConnection(0);
admin->authenticate("@admin", "password");
auto getNumClients = [&admin]() -> std::size_t {
size_t num_clients = 0;
admin->stats(
[&num_clients](const auto& k, const auto& v) {
nlohmann::json json = nlohmann::json::parse(v);
num_clients = json["clients"].get<size_t>();
},
"bucket_details bucket-0");
return num_clients;
};

std::deque<std::unique_ptr<MemcachedConnection>> connections;
bool done = false;
BinprotResponse rsp;
do {
auto conn = cluster->getConnection(0);
conn->authenticate("bucket-0", "bucket-0");
rsp = conn->execute(BinprotGenericCommand{
cb::mcbp::ClientOpcode::SelectBucket, "bucket-0"});
if (rsp.isSuccess()) {
connections.emplace_back(std::move(conn));
ASSERT_LE(getNumClients(), MaxConnectionsPerBucket);
} else {
ASSERT_EQ(cb::mcbp::Status::RateLimitedMaxConnections,
rsp.getStatus());
// Without XERROR E2BIG should be returned
conn->setXerrorSupport(false);
rsp = conn->execute(BinprotGenericCommand{
cb::mcbp::ClientOpcode::SelectBucket, "bucket-0"});
ASSERT_FALSE(rsp.isSuccess());
ASSERT_EQ(cb::mcbp::Status::E2big, rsp.getStatus());
done = true;
}
} while (!done);

// But we should be allowed to connect internal users
for (int ii = 0; ii < 5; ++ii) {
auto conn = cluster->getConnection(0);
conn->authenticate("@admin", "password");
conn->selectBucket("bucket-0");
connections.emplace_back(std::move(conn));
}
EXPECT_EQ(MaxConnectionsPerBucket + 5, getNumClients());
}
} // namespace cb::test
3 changes: 3 additions & 0 deletions utilities/engine_errc_2_mcbp.cc
Expand Up @@ -82,6 +82,9 @@ cb::mcbp::Status cb::mcbp::to_status(cb::engine_errc code) {
return Status::RangeScanCancelled;
case engine_errc::range_scan_more:
return Status::RangeScanMore;
case engine_errc::too_many_connections:
return Status::RateLimitedMaxConnections;

case engine_errc::would_block:
case engine_errc::disconnect:
case engine_errc::predicate_failed:
Expand Down

0 comments on commit b182130

Please sign in to comment.