MB-32704: Remove per-port setting of max connections
*Problem*

Memcached used to have a “per port” setting for the maximum number
of clients that may be connected to the port. The motivation was
to keep a pool of connections available so that ns_server could
always connect to the system (via port 11209). Later, when we added
support for SSL, we didn’t have time to revisit the overall model;
we just copied the “per port” setting to the new SSL port. This
leads to the following “problem” with the current
configuration:

    I can have 5000 connections to 11209 (plain, ipv4/6)
    I can have 30 000 connections to 11210 (plain, ipv4/6)
    I can have 30 000 connections to 11207 (SSL, ipv4/6)

In a deployment that uses a mix of SSL and plain clients, one may
have 60k clients connected to the system (30k of each type), but a
deployment that only uses plain connections is limited to 30k
connections (connection number 30 001 would fail, even if not a
single SSL connection is in use).
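
The arithmetic above can be illustrated with a small C++ sketch. It
is not memcached code: PortLimits, example_limits and
reachable_capacity are hypothetical names, and the numbers are the
ones from the example. It sums the limits of only those ports a
deployment actually uses, which is why a plain-only deployment tops
out at 30k.

```cpp
#include <cstddef>
#include <map>
#include <set>

// Hypothetical model of the old "per port" limits (port -> max clients).
using PortLimits = std::map<int, std::size_t>;

// The values from the example above.
PortLimits example_limits() {
    return {{11209, 5000}, {11210, 30000}, {11207, 30000}};
}

// Upper bound on simultaneously connected clients when the clients only
// connect to the ports listed in `ports_in_use`. The limit of an unused
// port cannot be borrowed by another port.
std::size_t reachable_capacity(const PortLimits& limits,
                               const std::set<int>& ports_in_use) {
    std::size_t total = 0;
    for (int port : ports_in_use) {
        const auto it = limits.find(port);
        if (it != limits.end()) {
            total += it->second;
        }
    }
    return total;
}
```

Passing only port 11210 yields 30000, while passing 11210 and 11207
yields 60000.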

*Solution*

The configuration file provided to memcached now contains two new
top-level keys:

    max_connections    - The maximum number of connections allowed
                         to memcached (65k if we use the values in
                         the example above)

    system_connections - The number of connections reserved for
                         users authenticated as system users (5k if
                         we use the values in the example above)
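
How the two values relate can be sketched as follows. This is an
illustration only: ConnectionLimits and make_limits are hypothetical
names, and the check that rejects a reservation larger than the total
is an assumption of this sketch, not something stated above.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical holder for the two new top-level values.
struct ConnectionLimits {
    std::size_t max_connections;
    std::size_t system_connections;

    // Connections left over for ordinary (non-system) users.
    std::size_t user_connections() const {
        return max_connections - system_connections;
    }
};

// Builds the limits. ASSUMPTION: a reservation larger than the total is
// treated as an error here; the commit message does not specify this.
ConnectionLimits make_limits(std::size_t max_connections,
                             std::size_t system_connections) {
    if (system_connections > max_connections) {
        throw std::invalid_argument(
                "system_connections exceeds max_connections");
    }
    return ConnectionLimits{max_connections, system_connections};
}
```

With the example values, make_limits(65000, 5000).user_connections()
is 60000, the "user" share of the connections.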

memcached keeps every accepted socket until max_connections is
reached; from then on it still accepts each new socket but closes
it immediately.
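
A minimal sketch of that accept-then-close decision, assuming a single
atomic counter for all ports. AcceptGate, admit and release are
hypothetical names; the real logic lives in
ServerSocket::acceptNewClient, and where the counter is decremented
after a rejection is an assumption here.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical gate that decides what to do with a freshly accepted socket.
class AcceptGate {
public:
    explicit AcceptGate(std::size_t max) : max_connections(max) {
    }

    // Call after accept() has returned a socket. Returns true if the
    // socket may be kept, false if the caller should close it right away.
    bool admit() {
        const std::size_t before =
                curr_conns.fetch_add(1, std::memory_order_relaxed);
        if (before >= max_connections) {
            // ASSUMPTION: the rejected socket is uncounted here; in the
            // real code this may happen as part of closing the socket.
            curr_conns.fetch_sub(1, std::memory_order_relaxed);
            rejected_conns.fetch_add(1, std::memory_order_relaxed);
            return false;
        }
        return true;
    }

    // Call when a previously admitted connection goes away.
    void release() {
        curr_conns.fetch_sub(1, std::memory_order_relaxed);
    }

    std::size_t rejected() const {
        return rejected_conns.load(std::memory_order_relaxed);
    }

private:
    const std::size_t max_connections;
    std::atomic<std::size_t> curr_conns{0};
    std::atomic<std::size_t> rejected_conns{0};
};
```

The listening socket stays registered the whole time, so the rejected
client gets a clean close instead of waiting in the backlog.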

To make sure that normal clients don't occupy all of the
connections to the server, memcached performs checks in the
validation phase to determine if a connection needs to be
closed, using the following logic:

     1. If the connection represents a system-internal user,
        leave the connection alone and continue to execute the
        command.

     2. If we've exceeded the number of "user" connections (60k
        in the example above) and used more than half of the system
        connections, the connection is disconnected if it is
        authenticated or the command being executed isn't one of
        Hello, SaslListMech, SaslAuth or SaslStep.
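
The two rules can be sketched as a single predicate. All names below
are hypothetical and this is not the code added by the commit. One
assumption is worth calling out: "exceeded the number of user
connections" is read here as the total connection count being above
max_connections minus system_connections.

```cpp
#include <cstddef>

// Only the four handshake commands are named in the rules above; Get
// stands in for "any other command".
enum class Command { Hello, SaslListMech, SaslAuth, SaslStep, Get };

struct ConnectionInfo {
    bool internal;      // authenticated as a system-internal user
    bool authenticated; // authenticated at all
};

struct Usage {
    std::size_t current_connections; // all open connections
    std::size_t system_connections;  // open connections of system users
};

struct Limits {
    std::size_t max_connections;
    std::size_t system_connections;
};

bool is_handshake(Command c) {
    return c == Command::Hello || c == Command::SaslListMech ||
           c == Command::SaslAuth || c == Command::SaslStep;
}

// Returns true if the connection should be disconnected instead of
// executing `cmd`.
bool should_disconnect(const ConnectionInfo& conn,
                       Command cmd,
                       const Usage& usage,
                       const Limits& limits) {
    // Rule 1: system-internal users are always left alone.
    if (conn.internal) {
        return false;
    }
    // Rule 2 precondition. ASSUMPTION: the total connection count is
    // compared against the "user" share of max_connections.
    const std::size_t user_limit =
            limits.max_connections - limits.system_connections;
    const bool users_exhausted = usage.current_connections > user_limit;
    const bool system_half_used =
            usage.system_connections > limits.system_connections / 2;
    if (!(users_exhausted && system_half_used)) {
        return false;
    }
    // Under pressure only an unauthenticated connection that is still in
    // the authentication handshake may continue.
    return conn.authenticated || !is_handshake(cmd);
}
```

Letting the handshake commands through is what allows a system user to
authenticate on a new connection even when ordinary clients have used
up their share.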

Change-Id: I3aec178b48f45fa055efb465ca9cea59fd71a895
Reviewed-on: http://review.couchbase.org/103761
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
trondn committed Jan 24, 2019
1 parent c486147 commit 9850da3
Showing 15 changed files with 236 additions and 146 deletions.
15 changes: 14 additions & 1 deletion daemon/connection.cc
@@ -244,7 +244,7 @@ void Connection::restartAuthentication() {
externalAuthManager->logoff(username);
}
sasl_conn.reset();
internal = false;
setInternal(false);
authenticated = false;
username = "";
}
@@ -1273,6 +1273,9 @@ Connection::Connection(SOCKET sfd, event_base* b, const ListeningPort& ifc)
}

Connection::~Connection() {
if (internal) {
--stats.system_conns;
}
if (authenticated && domain == cb::sasl::Domain::External) {
externalAuthManager->logoff(username);
}
@@ -1322,6 +1325,16 @@ bool Connection::shouldDelete() {
return getState() == StateMachine::State ::destroyed;
}

void Connection::setInternal(bool internal) {
if (Connection::internal) {
--stats.system_conns;
}
Connection::internal = internal;
if (internal) {
++stats.system_conns;
}
}

size_t Connection::getNumberOfCookies() const {
size_t ret = 0;
for (const auto& cookie : cookies) {
4 changes: 1 addition & 3 deletions daemon/connection.h
@@ -173,9 +173,7 @@ class Connection : public dcp_message_producers {
* An internal user is a user which is used by one of the components
* in Couchbase (like ns_server, indexer etc).
*/
void setInternal(bool internal) {
Connection::internal = internal;
}
void setInternal(bool internal);

/**
* Update the username to reflect what the user used from the SASL
2 changes: 0 additions & 2 deletions daemon/listening_port.cc
@@ -19,8 +19,6 @@

ListeningPort::ListeningPort(in_port_t port, std::string host, bool tcp_nodelay)
: port(port),
curr_conns(1),
maxconns(0),
host(std::move(host)),
ipv6(false),
ipv4(false),
6 changes: 0 additions & 6 deletions daemon/listening_port.h
@@ -41,12 +41,6 @@ class ListeningPort {
*/
const in_port_t port;

/** The current number of connections connected to this port */
int curr_conns;

/** The maximum number of connections allowed for this port */
int maxconns;

/** The hostname this port is bound to ("*" means all interfaces) */
const std::string host;

77 changes: 41 additions & 36 deletions daemon/memcached.cc
@@ -132,6 +132,11 @@ std::unique_ptr<ExecutorPool> executorPool;
/* Mutex for global stats */
std::mutex stats_mutex;

/// The maximum number of file handles we may have. During startup
/// we'll try to increase the allowed number of file handles to the
/// limit specified for the current user.
static size_t max_file_handles;

/*
* forward declarations
*/
@@ -430,6 +435,33 @@ static size_t get_number_of_worker_threads() {
return ret;
}

/// We might not support as many connections as requested if
/// we don't have enough file descriptors available
static void recalculate_max_connections() {
const auto maxconn = settings.getMaxConnections();
const auto system = (3 * (settings.getNumWorkerThreads() + 2)) + 1024;
const uint64_t maxfiles = maxconn + system;

if (max_file_handles < maxfiles) {
const auto newmax = max_file_handles - system;
settings.setMaxConnections(newmax, false);
LOG_WARNING(
"max_connections is set higher than the available number of "
"file descriptors available. Reduce max_connections to: {}",
newmax);

if (newmax > settings.getSystemConnections()) {
LOG_WARNING(
"system_connections > max_connections. Reduce "
"system_connections to {}",
settings.getSystemConnections(),
newmax,
newmax / 2);
settings.setSystemConnections(newmax / 2);
}
}
}

static void breakpad_changed_listener(const std::string&, Settings &s) {
cb::breakpad::initialize(s.getBreakpadSettings());
}
@@ -473,12 +505,10 @@ static void interfaces_changed_listener(const std::string&, Settings &s) {
for (const auto& ifc : s.getInterfaces()) {
auto* port = get_listening_port_instance(ifc.port);
if (port != nullptr) {
port->maxconns = ifc.maxconn;
port->tcp_nodelay = ifc.tcp_nodelay;
port->setSslSettings(ifc.ssl.key, ifc.ssl.cert);
}
}
s.calculateMaxconns();
}

#ifdef HAVE_LIBNUMA
@@ -516,6 +546,10 @@ static void settings_init() {
// Set up the listener functions
settings.addChangeListener("breakpad",
breakpad_changed_listener);
settings.addChangeListener("max_connections",
[](const std::string&, Settings& s) -> void {
recalculate_max_connections();
});
settings.addChangeListener("ssl_minimum_protocol",
ssl_minimum_protocol_changed_listener);
settings.addChangeListener("ssl_cipher_list",
@@ -978,9 +1012,6 @@ static void add_listening_port(const NetworkInterface *interf, in_port_t port, s
if (descr == nullptr) {
ListeningPort newport(port, interf->host, interf->tcp_nodelay);

newport.curr_conns = 1;
newport.maxconns = interf->maxconn;

newport.setSslSettings(interf->ssl.key, interf->ssl.cert);

if (family == AF_INET) {
@@ -998,7 +1029,6 @@ static void add_listening_port(const NetworkInterface *interf, in_port_t port, s
} else if (family == AF_INET6) {
descr->ipv6 = true;
}
++descr->curr_conns;
}
}

@@ -2119,32 +2149,6 @@ void delete_all_buckets() {
} while (!done);
}

static void set_max_filehandles() {
const uint64_t maxfiles = settings.getMaxconns() +
(3 * (settings.getNumWorkerThreads() + 2)) +
1024;

auto limit = cb::io::maximizeFileDescriptors(maxfiles);

if (limit < maxfiles) {
LOG_WARNING(
"Failed to set the number of file descriptors "
"to {} due to system resource restrictions. "
"This may cause the system to misbehave once you reach a "
"high connection count as the system won't be able open "
"new files on the system. The maximum number of file "
"descriptors is currently set to {}. The system "
"is configured to allow {} number of client connections, "
"and in addition to that the overhead of the worker "
"threads is {}. Finally the backed database needs to "
"open files to persist data.",
int(maxfiles),
int(limit),
settings.getMaxconns(),
(3 * (settings.getNumWorkerThreads() + 2)));
}
}

/**
* The log function used from SASL
*
@@ -2197,6 +2201,10 @@ extern "C" int memcached_main(int argc, char **argv) {
// allocation).
const std::string numa_status = configure_numa_policy();
#endif

max_file_handles = cb::io::maximizeFileDescriptors(
std::numeric_limits<uint32_t>::max());

std::unique_ptr<ParentMonitor> parent_monitor;

try {
@@ -2302,10 +2310,7 @@ extern "C" int memcached_main(int argc, char **argv) {
/* inform interested parties of initial verbosity level */
perform_callbacks(ON_LOG_LEVEL, nullptr, nullptr);

set_max_filehandles();

/* Aggregate the maximum number of connections */
settings.calculateMaxconns();
recalculate_max_connections();

if (getenv("MEMCACHED_CRASH_TEST")) {
// The crash tests wants the system to generate a crash.
6 changes: 0 additions & 6 deletions daemon/network_interface.cc
@@ -24,11 +24,6 @@
#include <nlohmann/json.hpp>
#include <utilities/json_utilities.h>

static void handle_interface_maxconn(NetworkInterface& ifc,
nlohmann::json::const_iterator it) {
ifc.maxconn = gsl::narrow<int>(cb::jsonGet<size_t>(it));
}

static void handle_interface_port(NetworkInterface& ifc,
nlohmann::json::const_iterator it) {
ifc.port = in_port_t(cb::jsonGet<size_t>(it));
@@ -133,7 +128,6 @@ NetworkInterface::NetworkInterface(const nlohmann::json& json) {
};

std::vector<interface_config_tokens> handlers = {
{"maxconn", handle_interface_maxconn},
{"port", handle_interface_port},
{"host", handle_interface_host},
{"ipv4", handle_interface_ipv4},
1 change: 0 additions & 1 deletion daemon/network_interface.h
@@ -39,7 +39,6 @@ class NetworkInterface {
std::string key;
std::string cert;
} ssl;
int maxconn = 1000;
in_port_t port = 11211;
Protocol ipv6 = Protocol::Optional;
Protocol ipv4 = Protocol::Optional;
11 changes: 4 additions & 7 deletions daemon/protocol/mcbp/stats_context.cc
@@ -177,13 +177,10 @@ static ENGINE_ERROR_CODE server_stats(ADD_STAT add_stat_callback,
stats.daemon_conns);
add_stat(cookie, add_stat_callback, "curr_connections",
stats.curr_conns.load(std::memory_order_relaxed));
for (auto& instance : stats.listening_ports) {
std::string key =
"max_conns_on_port_" + std::to_string(instance.port);
add_stat(cookie, add_stat_callback, key.c_str(), instance.maxconns);
key = "curr_conns_on_port_" + std::to_string(instance.port);
add_stat(cookie, add_stat_callback, key.c_str(), instance.curr_conns);
}
add_stat(cookie,
add_stat_callback,
"system_connections",
stats.system_conns.load(std::memory_order_relaxed));
add_stat(cookie, add_stat_callback, "total_connections", stats.total_conns);
add_stat(cookie, add_stat_callback, "connection_structures",
stats.conn_structs);
30 changes: 5 additions & 25 deletions daemon/server_socket.cc
@@ -128,41 +128,21 @@ void ServerSocket::acceptNewClient() {
return;
}

int port_conns;
ListeningPort* port_instance;
int curr_conns = stats.curr_conns.fetch_add(1, std::memory_order_relaxed);
{
std::lock_guard<std::mutex> guard(stats_mutex);
port_instance = get_listening_port_instance(listen_port);
cb_assert(port_instance);
port_conns = ++port_instance->curr_conns;
}
size_t curr_conns =
stats.curr_conns.fetch_add(1, std::memory_order_relaxed);

if (curr_conns >= settings.getMaxconns() ||
port_conns >= port_instance->maxconns) {
{
std::lock_guard<std::mutex> guard(stats_mutex);
--port_instance->curr_conns;
}
if (curr_conns >= settings.getMaxConnections()) {
stats.rejected_conns++;
LOG_WARNING(
"Too many open connections. Current/Limit for port "
"{}: {}/{}; total: {}/{}",
port_instance->port,
port_conns,
port_instance->maxconns,
R"(Too many open connections. total: {}/{})",
curr_conns,
settings.getMaxconns());
settings.getMaxConnections());

safe_close(client);
return;
}

if (cb::net::set_socket_noblocking(client) == -1) {
{
std::lock_guard<std::mutex> guard(stats_mutex);
--port_instance->curr_conns;
}
LOG_WARNING("Failed to make socket non-blocking. closing it");
safe_close(client);
return;
50 changes: 37 additions & 13 deletions daemon/settings.cc
@@ -55,9 +55,7 @@ Settings::Settings()
reqs_per_event_low_priority(0),
default_reqs_per_event(00),
max_packet_size(0),
topkeys_size(0),
maxconns(0) {

topkeys_size(0) {
verbose.store(0);
connection_idle_time.reset();
dedupe_nmvb_maps.store(false);
@@ -435,6 +433,22 @@ static void handle_max_packet_size(Settings& s, const nlohmann::json& obj) {
1024);
}

static void handle_max_connections(Settings& s, const nlohmann::json& obj) {
if (!obj.is_number_unsigned()) {
cb::throwJsonTypeError(
R"("max_connections" must be a positive number)");
}
s.setMaxConnections(obj.get<size_t>());
}

static void handle_system_connections(Settings& s, const nlohmann::json& obj) {
if (!obj.is_number_unsigned()) {
cb::throwJsonTypeError(
R"("system_connections" must be a positive number)");
}
s.setSystemConnections(obj.get<size_t>());
}

/**
* Handle the "sasl_mechanisms" tag in the settings
*
@@ -606,6 +620,8 @@ void Settings::reconfigure(const nlohmann::json& json) {
{"ssl_minimum_protocol", handle_ssl_minimum_protocol},
{"breakpad", handle_breakpad},
{"max_packet_size", handle_max_packet_size},
{"max_connections", handle_max_connections},
{"system_connections", handle_system_connections},
{"sasl_mechanisms", handle_sasl_mechanisms},
{"ssl_sasl_mechanisms", handle_ssl_sasl_mechanisms},
{"stdin_listener", handle_stdin_listener},
@@ -876,6 +892,24 @@ void Settings::updateSettings(const Settings& other, bool apply) {
}
}

if (other.has.max_connections) {
if (other.max_connections != max_connections) {
LOG_INFO(R"(Change max connections from {} to {})",
max_connections,
other.max_connections);
setMaxConnections(other.max_connections);
}
}

if (other.has.system_connections) {
if (other.system_connections != system_connections) {
LOG_INFO(R"(Change max connections from {} to {})",
system_connections,
other.system_connections);
setSystemConnections(other.system_connections);
}
}

if (other.has.xattr_enabled) {
if (other.xattr_enabled != xattr_enabled) {
LOG_INFO("{} XATTR",
@@ -906,16 +940,6 @@ void Settings::updateSettings(const Settings& other, bool apply) {
continue;
}

if (i2.maxconn != i1.maxconn) {
LOG_INFO("Change max connections for {}:{} from {} to {}",
i1.host,
i1.port,
i1.maxconn,
i2.maxconn);
i1.maxconn = i2.maxconn;
changed = true;
}

if (i2.tcp_nodelay != i1.tcp_nodelay) {
LOG_INFO("{} TCP NODELAY for {}:{}",
i2.tcp_nodelay ? "Enable" : "Disable",