From c7ce4bf96f96da40da14d0394a97fc95f10f7576 Mon Sep 17 00:00:00 2001 From: Trond Norbye Date: Tue, 14 Dec 2021 10:02:42 +0100 Subject: [PATCH] MB-50078: BP of M B 47707 - Enforce TLS Backport of MB-47707. Close connections once the parent port gets deleted. Change-Id: I07cbd58100ebca4b80e3ce94f0306a5825e01b11 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/167545 Well-Formed: Restriction Checker Reviewed-by: Jim Walker Tested-by: Build Bot --- daemon/connection.cc | 67 ++++++++++++++++++-- daemon/connection.h | 16 +++-- daemon/connections.cc | 10 +-- daemon/connections.h | 2 +- daemon/front_end_thread.h | 11 ++-- daemon/listening_port.h | 6 +- daemon/memcached.cc | 7 +++ daemon/memcached.h | 2 +- daemon/server_socket.cc | 1 + daemon/settings.cc | 22 ++++++- daemon/settings.h | 16 +++++ daemon/thread.cc | 17 ++--- protocol/connection/client_connection.cc | 2 +- protocol/connection/client_connection.h | 2 +- tests/testapp/testapp_interfaces.cc | 79 +++++++++++++++++++++++- 15 files changed, 222 insertions(+), 38 deletions(-) diff --git a/daemon/connection.cc b/daemon/connection.cc index 72785a3226..3737fbf916 100644 --- a/daemon/connection.cc +++ b/daemon/connection.cc @@ -150,7 +150,7 @@ nlohmann::json Connection::toJSON() const { ret["protocol"] = "memcached"; ret["peername"] = getPeername().c_str(); ret["sockname"] = getSockname().c_str(); - ret["parent_port"] = parent_port; + ret["parent_port"] = listening_port->port; ret["bucket_index"] = getBucketIndex(); ret["internal"] = isInternal(); @@ -308,6 +308,10 @@ cb::engine_errc Connection::dropPrivilege(cb::rbac::Privilege privilege) { return cb::engine_errc::no_access; } +in_port_t Connection::getParentPort() const { + return listening_port->port; +} + cb::rbac::PrivilegeAccess Connection::checkPrivilege( cb::rbac::Privilege privilege, Cookie& cookie) { cb::rbac::PrivilegeAccess ret; @@ -1404,13 +1408,13 @@ Connection::Connection(FrontEndThread& thr) Connection::Connection(SOCKET sfd, event_base* b, - const ListeningPort& ifc, + std::shared_ptr ifc, FrontEndThread& thr) : socketDescriptor(sfd), - connectedToSystemPort(ifc.system), + connectedToSystemPort(ifc->system), base(b), thread(thr), - parent_port(ifc.port), + listening_port(std::move(ifc)), peername(cb::net::getPeerNameAsJson(socketDescriptor).dump()), sockname(cb::net::getSockNameAsJson(socketDescriptor).dump()), stateMachine(*this), @@ -1422,8 +1426,8 @@ Connection::Connection(SOCKET sfd, msglist.reserve(MSG_LIST_INITIAL); iov.resize(IOV_LIST_INITIAL); - if (ifc.isSslPort()) { - if (!enableSSL(ifc.sslCert, ifc.sslKey)) { + if (listening_port->isSslPort()) { + if (!enableSSL(listening_port->sslCert, listening_port->sslKey)) { throw std::runtime_error(std::to_string(getId()) + " Failed to enable SSL"); } @@ -1621,6 +1625,57 @@ void Connection::runEventLoop(short which) { conn_return_buffers(this); } +void Connection::reEvaluateParentPort() { + if (listening_port->valid) { + return; + } + + switch (getState()) { + case StateMachine::State::new_cmd: + case StateMachine::State::waiting: + case StateMachine::State::read_packet_header: + case StateMachine::State::parse_cmd: + case StateMachine::State::read_packet_body: + case StateMachine::State::validate: + case StateMachine::State::execute: + case StateMachine::State::send_data: + case StateMachine::State::ship_log: + break; + case StateMachine::State::closing: + case StateMachine::State::pending_close: + case StateMachine::State::immediate_close: + case StateMachine::State::destroyed: + return; + } + + bool localhost = false; + if (Settings::instance().isLocalhostInterfaceWhitelisted()) { + // Make sure we don't tear down localhost connections + if (listening_port->family == AF_INET) { + localhost = + peername.find(R"("ip":"127.0.0.1")") != std::string::npos; + } else { + localhost = peername.find(R"("ip":"::1")") != std::string::npos; + } + } + + if (localhost) { + LOG_INFO( + "{} Keeping connection alive even if server port was removed: " + "{}", + getId(), + getDescription()); + } else { + LOG_INFO("{} Shutting down; server port was removed: {}", + getId(), + getDescription()); + setTerminationReason("Server port shut down"); + + setState(StateMachine::State::closing); + signalIfIdle(); + } +} + bool Connection::close() { bool ewb = false; uint32_t rc = refcount; diff --git a/daemon/connection.h b/daemon/connection.h index 1622603dfa..bcae2e90ce 100644 --- a/daemon/connection.h +++ b/daemon/connection.h @@ -94,7 +94,7 @@ class Connection : public dcp_message_producers { Connection(SOCKET sfd, event_base* b, - const ListeningPort& ifc, + std::shared_ptr ifc, FrontEndThread& thr); ~Connection() override; @@ -248,9 +248,7 @@ class Connection : public dcp_message_producers { return thread; } - in_port_t getParentPort() const { - return parent_port; - } + in_port_t getParentPort() const; /** * Check if this connection is in posession of the requested privilege @@ -363,6 +361,10 @@ class Connection : public dcp_message_producers { return cb::engine_errc(remapErrorCode(ENGINE_ERROR_CODE(code))); } + /// Revaluate if the parent port is still valid or not (and if + /// we should shut down the connection or not). + void reEvaluateParentPort(); + /** * Add the specified number of ns to the amount of CPU time this * connection have used on the CPU (We could alternatively have @@ -1116,8 +1118,10 @@ class Connection : public dcp_message_producers { /** Pointer to the thread object serving this connection */ FrontEndThread& thread; - /** Listening port that creates this connection instance */ - const in_port_t parent_port{0}; + /// The description of the listening port which accepted the client + /// (needed in order to shut down the connection if the administrator + /// disables the port) + std::shared_ptr listening_port; /** * The index of the connected bucket diff --git a/daemon/connections.cc b/daemon/connections.cc index d4bae1c6b7..9b9a3a02c9 100644 --- a/daemon/connections.cc +++ b/daemon/connections.cc @@ -61,7 +61,7 @@ static void maybe_return_single_buffer(Connection& c, static void conn_destructor(Connection* c); static Connection* allocate_connection(SOCKET sfd, event_base* base, - const ListeningPort& interface, + std::shared_ptr interface, FrontEndThread& thread); static void release_connection(Connection* c); @@ -159,10 +159,10 @@ void run_event_loop(Connection* c, short which) { } Connection* conn_new(SOCKET sfd, - const ListeningPort& interface, + std::shared_ptr interface, struct event_base* base, FrontEndThread& thread) { - auto* c = allocate_connection(sfd, base, interface, thread); + auto* c = allocate_connection(sfd, base, std::move(interface), thread); if (c == nullptr) { return nullptr; } @@ -248,12 +248,12 @@ static void conn_destructor(Connection* c) { */ static Connection* allocate_connection(SOCKET sfd, event_base* base, - const ListeningPort& interface, + std::shared_ptr interface, FrontEndThread& thread) { Connection* ret = nullptr; try { - ret = new Connection(sfd, base, interface, thread); + ret = new Connection(sfd, base, std::move(interface), thread); std::lock_guard lock(connections.mutex); connections.conns.push_back(ret); stats.conn_structs++; diff --git a/daemon/connections.h b/daemon/connections.h index d319a1df49..1a77d86da7 100644 --- a/daemon/connections.h +++ b/daemon/connections.h @@ -70,7 +70,7 @@ void conn_return_buffers(Connection* c); * @return a connection object on success, nullptr otherwise */ Connection* conn_new(SOCKET sfd, - const ListeningPort& interface, + std::shared_ptr interface, struct event_base* base, FrontEndThread& thread); diff --git a/daemon/front_end_thread.h b/daemon/front_end_thread.h index caf4f1dc10..407f486cfe 100644 --- a/daemon/front_end_thread.h +++ b/daemon/front_end_thread.h @@ -41,8 +41,6 @@ class Connection; class ListeningPort; struct thread_stats; -using SharedListeningPort = std::shared_ptr; - struct FrontEndThread { /** * Pending IO requests for this thread. Maps each pending Connection to @@ -87,12 +85,15 @@ struct FrontEndThread { class ConnectionQueue { public: ~ConnectionQueue(); - void push(SOCKET socket, SharedListeningPort interface); - void swap(std::vector>& other); + void push(SOCKET socket, std::shared_ptr interface); + void swap( + std::vector>>& + other); protected: std::mutex mutex; - std::vector> connections; + std::vector>> + connections; } new_conn_queue; /// Mutex to lock protect access to this object. diff --git a/daemon/listening_port.h b/daemon/listening_port.h index d6d37f60bb..cbd3ada295 100644 --- a/daemon/listening_port.h +++ b/daemon/listening_port.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -46,8 +47,6 @@ class ListeningPort { sslCert(std::move(cert)) { } - ListeningPort(const ListeningPort& other) = default; - /// The tag provided by the user to identify the port. It is possible /// to use ephemeral ports in the system, and if we want to change /// such ports at runtime the system needs a way to find the correct @@ -84,4 +83,7 @@ class ListeningPort { bool isSslPort() const { return !sslKey.empty() && !sslCert.empty(); } + + /// Set to false once the interface is being shut down + std::atomic_bool valid{true}; }; diff --git a/daemon/memcached.cc b/daemon/memcached.cc index b9bfc537dd..9c7c29fda4 100644 --- a/daemon/memcached.cc +++ b/daemon/memcached.cc @@ -887,6 +887,7 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) { bool changes = false; auto interfaces = Settings::instance().getInterfaces(); + bool interfaces_dropped = false; // Step one, enable all new ports bool success = true; @@ -982,6 +983,7 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) { // erase returns the element following this one (or end()) changes = true; iter = listen_conn.erase(iter); + interfaces_dropped = true; } else { // look at the next element ++iter; @@ -992,6 +994,11 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) { if (changes) { create_portnumber_file(false); } + + if (interfaces_dropped) { + iterate_all_connections( + [](auto& conn) { conn.reEvaluateParentPort(); }); + } } if (is_listen_disabled()) { diff --git a/daemon/memcached.h b/daemon/memcached.h index c3ed231111..076d62c0d7 100644 --- a/daemon/memcached.h +++ b/daemon/memcached.h @@ -79,7 +79,7 @@ void threads_shutdown(); void threads_cleanup(); class ListeningPort; -void dispatch_conn_new(SOCKET sfd, std::shared_ptr& interface); +void dispatch_conn_new(SOCKET sfd, std::shared_ptr interface); /* Lock wrappers for cache functions that are called from main loop. */ int is_listen_thread(void); diff --git a/daemon/server_socket.cc b/daemon/server_socket.cc index 42f9163b47..3fd56b8dce 100644 --- a/daemon/server_socket.cc +++ b/daemon/server_socket.cc @@ -54,6 +54,7 @@ ServerSocket::ServerSocket(SOCKET fd, } ServerSocket::~ServerSocket() { + interface->valid.store(false, std::memory_order_release); std::string tagstr; if (!interface->tag.empty()) { tagstr = " \"" + interface->tag + "\""; diff --git a/daemon/settings.cc b/daemon/settings.cc index e9b9fb117f..23ab79d17c 100644 --- a/daemon/settings.cc +++ b/daemon/settings.cc @@ -686,6 +686,11 @@ static void handle_opentracing(Settings& s, const nlohmann::json& obj) { s.setOpenTracingConfig(std::make_shared(obj)); } +static void handle_whitelist_localhost_interface(Settings& s, + const nlohmann::json& obj) { + s.setWhitelistLocalhostInterface(obj.get()); +} + void Settings::reconfigure(const nlohmann::json& json) { // Nuke the default interface added to the system in settings_init and // use the ones in the configuration file.. (this is a bit messy) @@ -757,7 +762,9 @@ void Settings::reconfigure(const nlohmann::json& json) { handle_max_concurrent_commands_per_connection}, {"opentracing", handle_opentracing}, {"portnumber_file", handle_portnumber_file}, - {"parent_identifier", handle_parent_identifier}}; + {"parent_identifier", handle_parent_identifier}, + {"whitelist_localhost_interface", + handle_whitelist_localhost_interface}}; for (const auto& obj : json.items()) { bool found = false; @@ -1243,6 +1250,19 @@ void Settings::updateSettings(const Settings& other, bool apply) { } } + if (other.has.whitelist_localhost_interface) { + if (other.whitelist_localhost_interface != + whitelist_localhost_interface) { + LOG_INFO( + R"(Change whitelist of localhost interface from "{}" to "{}")", + isLocalhostInterfaceWhitelisted() ? "enabled" : "disabled", + other.isLocalhostInterfaceWhitelisted() ? "enabled" + : "disabled"); + setWhitelistLocalhostInterface( + other.isLocalhostInterfaceWhitelisted()); + } + } + if (other.has.max_concurrent_commands_per_connection) { if (other.getMaxConcurrentCommandsPerConnection() != getMaxConcurrentCommandsPerConnection()) { diff --git a/daemon/settings.h b/daemon/settings.h index 24887b9b8d..32bf638287 100644 --- a/daemon/settings.h +++ b/daemon/settings.h @@ -843,6 +843,16 @@ class Settings { notify_changed("num_writer_threads"); } + bool isLocalhostInterfaceWhitelisted() const { + return whitelist_localhost_interface.load(std::memory_order_acquire); + } + + void setWhitelistLocalhostInterface(bool val) { + whitelist_localhost_interface.store(val, std::memory_order_release); + has.whitelist_localhost_interface = true; + notify_changed("whitelist_localhost_interface"); + } + protected: /// Should the server always collect trace information for commands std::atomic_bool always_collect_trace_info{false}; @@ -1027,6 +1037,11 @@ class Settings { std::atomic num_reader_threads{0}; std::atomic num_writer_threads{0}; + /// If "localhost" is whitelisted from deleting connections as part + /// of server cleanup. This setting should only be used for unit + /// tests + std::atomic_bool whitelist_localhost_interface{true}; + public: /** * Flags for each of the above config options, indicating if they were @@ -1081,6 +1096,7 @@ class Settings { bool num_writer_threads = false; bool portnumber_file = false; bool parent_identifier = false; + bool whitelist_localhost_interface = false; } has; protected: diff --git a/daemon/thread.cc b/daemon/thread.cc index 37f661f39f..fbc8db7a1e 100644 --- a/daemon/thread.cc +++ b/daemon/thread.cc @@ -47,14 +47,14 @@ FrontEndThread::ConnectionQueue::~ConnectionQueue() { } } -void FrontEndThread::ConnectionQueue::push(SOCKET sock, - SharedListeningPort interface) { +void FrontEndThread::ConnectionQueue::push( + SOCKET sock, std::shared_ptr interface) { std::lock_guard guard(mutex); - connections.emplace_back(sock, interface); + connections.emplace_back(sock, std::move(interface)); } void FrontEndThread::ConnectionQueue::swap( - std::vector>& other) { + std::vector>>& other) { std::lock_guard guard(mutex); connections.swap(other); } @@ -244,11 +244,12 @@ void drain_notification_channel(evutil_socket_t fd) { } static void dispatch_new_connections(FrontEndThread& me) { - std::vector> connections; + std::vector>> connections; me.new_conn_queue.swap(connections); for (const auto& entry : connections) { - if (conn_new(entry.first, *entry.second, me.base, me) == nullptr) { + if (conn_new(entry.first, std::move(entry.second), me.base, me) == + nullptr) { if (entry.second->system) { --stats.system_conns; } @@ -388,13 +389,13 @@ static size_t last_thread = 0; * Dispatches a new connection to another thread. This is only ever called * from the main thread, or because of an incoming connection. */ -void dispatch_conn_new(SOCKET sfd, SharedListeningPort& interface) { +void dispatch_conn_new(SOCKET sfd, std::shared_ptr interface) { size_t tid = (last_thread + 1) % Settings::instance().getNumWorkerThreads(); auto& thread = threads[tid]; last_thread = tid; try { - thread.new_conn_queue.push(sfd, interface); + thread.new_conn_queue.push(sfd, std::move(interface)); } catch (const std::bad_alloc& e) { LOG_WARNING("dispatch_conn_new: Failed to dispatch new connection: {}", e.what()); diff --git a/protocol/connection/client_connection.cc b/protocol/connection/client_connection.cc index a1a566d63e..25e39e8b47 100644 --- a/protocol/connection/client_connection.cc +++ b/protocol/connection/client_connection.cc @@ -708,7 +708,7 @@ static Frame to_frame(const BinprotCommand& command) { return frame; } -std::unique_ptr MemcachedConnection::clone() { +std::unique_ptr MemcachedConnection::clone() const { auto result = std::make_unique( this->host, this->port, this->family, this->ssl); result->auto_retry_tmpfail = this->auto_retry_tmpfail; diff --git a/protocol/connection/client_connection.h b/protocol/connection/client_connection.h index bc52dc5a79..67ccefe03b 100644 --- a/protocol/connection/client_connection.h +++ b/protocol/connection/client_connection.h @@ -291,7 +291,7 @@ class MemcachedConnection { // Creates clone (copy) of the given connection - i.e. a second independent // channel to memcached. Used for multi-connection testing. - std::unique_ptr clone(); + std::unique_ptr clone() const; std::string getName() const { return name; diff --git a/tests/testapp/testapp_interfaces.cc b/tests/testapp/testapp_interfaces.cc index 4082ad6c16..a19d9f603f 100644 --- a/tests/testapp/testapp_interfaces.cc +++ b/tests/testapp/testapp_interfaces.cc @@ -21,7 +21,10 @@ * be dynamically changed. */ -class InterfacesTest : public TestappClientTest {}; +class InterfacesTest : public TestappClientTest { +protected: + void test_mb47707(bool whitelist_localhost_interface); +}; INSTANTIATE_TEST_CASE_P(TransportProtocols, InterfacesTest, @@ -202,3 +205,77 @@ TEST_P(InterfacesTest, AFamilyChangeInterface) { reconfigure(); parse_portnumber_file(); } + +void InterfacesTest::test_mb47707(bool whitelist_localhost_interface) { + memcached_cfg["whitelist_localhost_interface"] = + whitelist_localhost_interface; + reconfigure(); + + auto interfaces = memcached_cfg["interfaces"]; + memcached_cfg["interfaces"].emplace_back( + nlohmann::json{{"tag", "MB-47707"}, + {"port", 0}, + {"ipv4", "required"}, + {"ipv6", "off"}, + {"host", "*"}}); + reconfigure(); + parse_portnumber_file(); + + std::unique_ptr connection; + connectionMap.iterate([&connection](const MemcachedConnection& c) { + if (c.getTag() == "MB-47707") { + connection = c.clone(); + } + }); + ASSERT_TRUE(connection) << "Failed to locate the new connection"; + auto rsp = connection->execute( + BinprotGenericCommand{cb::mcbp::ClientOpcode::SaslListMechs}); + ASSERT_TRUE(rsp.isSuccess()) << "Status: " << to_string(rsp.getStatus()) + << " message " << rsp.getDataString(); + + // Remove the interface! + memcached_cfg["interfaces"] = interfaces; + reconfigure(); + parse_portnumber_file(); + + bool found = false; + connectionMap.iterate([&found](const MemcachedConnection& c) { + if (c.getTag() == "MB-47707") { + found = true; + } + }); + ASSERT_FALSE(found) << "The port should have been gone"; + + if (whitelist_localhost_interface) { + // The connection should not be disconnected + rsp = connection->execute( + BinprotGenericCommand{cb::mcbp::ClientOpcode::SaslListMechs}); + ASSERT_TRUE(rsp.isSuccess()) << "Status: " << to_string(rsp.getStatus()) + << " message " << rsp.getDataString(); + } else { + // The connection should be disconnected + try { + rsp = connection->execute(BinprotGenericCommand{ + cb::mcbp::ClientOpcode::SaslListMechs}); + FAIL() << "Expected the connection to be disconnected.\n" + << "Status: " << to_string(rsp.getStatus()) + << "\nmessage: " << rsp.getDataString(); + } catch (const std::system_error& error) { + // we should probably have checked if the error code is + // conn-reset, but then again that may be different on windows + // mac and linux... + } + } +} + +/// Verify that we don't disconnect localhost connections as part of +/// interface deletion if they're bound to localhost +TEST_P(InterfacesTest, MB_47707_LocalhostWhitelisted) { + test_mb47707(true); +} + +/// Verify that we disconnect localhost connections as part of +/// interface deletion even if they're bound to localhost +TEST_P(InterfacesTest, MB_47707_LocalhostNotWhitelisted) { + test_mb47707(false); +}