Skip to content

Commit

Permalink
MB-50078: BP of M B 47707 - Enforce TLS
Browse files Browse the repository at this point in the history
Backport of MB-47707. Close connections once the parent
port gets deleted.

Change-Id: I07cbd58100ebca4b80e3ce94f0306a5825e01b11
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/167545
Well-Formed: Restriction Checker
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
trondn authored and jimwwalker committed Dec 22, 2021
1 parent 515dce7 commit c7ce4bf
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 38 deletions.
67 changes: 61 additions & 6 deletions daemon/connection.cc
Expand Up @@ -150,7 +150,7 @@ nlohmann::json Connection::toJSON() const {
ret["protocol"] = "memcached";
ret["peername"] = getPeername().c_str();
ret["sockname"] = getSockname().c_str();
ret["parent_port"] = parent_port;
ret["parent_port"] = listening_port->port;
ret["bucket_index"] = getBucketIndex();
ret["internal"] = isInternal();

Expand Down Expand Up @@ -308,6 +308,10 @@ cb::engine_errc Connection::dropPrivilege(cb::rbac::Privilege privilege) {
return cb::engine_errc::no_access;
}

in_port_t Connection::getParentPort() const {
return listening_port->port;
}

cb::rbac::PrivilegeAccess Connection::checkPrivilege(
cb::rbac::Privilege privilege, Cookie& cookie) {
cb::rbac::PrivilegeAccess ret;
Expand Down Expand Up @@ -1404,13 +1408,13 @@ Connection::Connection(FrontEndThread& thr)

Connection::Connection(SOCKET sfd,
event_base* b,
const ListeningPort& ifc,
std::shared_ptr<ListeningPort> ifc,
FrontEndThread& thr)
: socketDescriptor(sfd),
connectedToSystemPort(ifc.system),
connectedToSystemPort(ifc->system),
base(b),
thread(thr),
parent_port(ifc.port),
listening_port(std::move(ifc)),
peername(cb::net::getPeerNameAsJson(socketDescriptor).dump()),
sockname(cb::net::getSockNameAsJson(socketDescriptor).dump()),
stateMachine(*this),
Expand All @@ -1422,8 +1426,8 @@ Connection::Connection(SOCKET sfd,
msglist.reserve(MSG_LIST_INITIAL);
iov.resize(IOV_LIST_INITIAL);

if (ifc.isSslPort()) {
if (!enableSSL(ifc.sslCert, ifc.sslKey)) {
if (listening_port->isSslPort()) {
if (!enableSSL(listening_port->sslCert, listening_port->sslKey)) {
throw std::runtime_error(std::to_string(getId()) +
" Failed to enable SSL");
}
Expand Down Expand Up @@ -1621,6 +1625,57 @@ void Connection::runEventLoop(short which) {
conn_return_buffers(this);
}

void Connection::reEvaluateParentPort() {
if (listening_port->valid) {
return;
}

switch (getState()) {
case StateMachine::State::new_cmd:
case StateMachine::State::waiting:
case StateMachine::State::read_packet_header:
case StateMachine::State::parse_cmd:
case StateMachine::State::read_packet_body:
case StateMachine::State::validate:
case StateMachine::State::execute:
case StateMachine::State::send_data:
case StateMachine::State::ship_log:
break;
case StateMachine::State::closing:
case StateMachine::State::pending_close:
case StateMachine::State::immediate_close:
case StateMachine::State::destroyed:
return;
}

bool localhost = false;
if (Settings::instance().isLocalhostInterfaceWhitelisted()) {
// Make sure we don't tear down localhost connections
if (listening_port->family == AF_INET) {
localhost =
peername.find(R"("ip":"127.0.0.1")") != std::string::npos;
} else {
localhost = peername.find(R"("ip":"::1")") != std::string::npos;
}
}

if (localhost) {
LOG_INFO(
"{} Keeping connection alive even if server port was removed: "
"{}",
getId(),
getDescription());
} else {
LOG_INFO("{} Shutting down; server port was removed: {}",
getId(),
getDescription());
setTerminationReason("Server port shut down");

setState(StateMachine::State::closing);
signalIfIdle();
}
}

bool Connection::close() {
bool ewb = false;
uint32_t rc = refcount;
Expand Down
16 changes: 10 additions & 6 deletions daemon/connection.h
Expand Up @@ -94,7 +94,7 @@ class Connection : public dcp_message_producers {

Connection(SOCKET sfd,
event_base* b,
const ListeningPort& ifc,
std::shared_ptr<ListeningPort> ifc,
FrontEndThread& thr);

~Connection() override;
Expand Down Expand Up @@ -248,9 +248,7 @@ class Connection : public dcp_message_producers {
return thread;
}

in_port_t getParentPort() const {
return parent_port;
}
in_port_t getParentPort() const;

/**
* Check if this connection is in posession of the requested privilege
Expand Down Expand Up @@ -363,6 +361,10 @@ class Connection : public dcp_message_producers {
return cb::engine_errc(remapErrorCode(ENGINE_ERROR_CODE(code)));
}

/// Revaluate if the parent port is still valid or not (and if
/// we should shut down the connection or not).
void reEvaluateParentPort();

/**
* Add the specified number of ns to the amount of CPU time this
* connection have used on the CPU (We could alternatively have
Expand Down Expand Up @@ -1116,8 +1118,10 @@ class Connection : public dcp_message_producers {
/** Pointer to the thread object serving this connection */
FrontEndThread& thread;

/** Listening port that creates this connection instance */
const in_port_t parent_port{0};
/// The description of the listening port which accepted the client
/// (needed in order to shut down the connection if the administrator
/// disables the port)
std::shared_ptr<ListeningPort> listening_port;

/**
* The index of the connected bucket
Expand Down
10 changes: 5 additions & 5 deletions daemon/connections.cc
Expand Up @@ -61,7 +61,7 @@ static void maybe_return_single_buffer(Connection& c,
static void conn_destructor(Connection* c);
static Connection* allocate_connection(SOCKET sfd,
event_base* base,
const ListeningPort& interface,
std::shared_ptr<ListeningPort> interface,
FrontEndThread& thread);

static void release_connection(Connection* c);
Expand Down Expand Up @@ -159,10 +159,10 @@ void run_event_loop(Connection* c, short which) {
}

Connection* conn_new(SOCKET sfd,
const ListeningPort& interface,
std::shared_ptr<ListeningPort> interface,
struct event_base* base,
FrontEndThread& thread) {
auto* c = allocate_connection(sfd, base, interface, thread);
auto* c = allocate_connection(sfd, base, std::move(interface), thread);
if (c == nullptr) {
return nullptr;
}
Expand Down Expand Up @@ -248,12 +248,12 @@ static void conn_destructor(Connection* c) {
*/
static Connection* allocate_connection(SOCKET sfd,
event_base* base,
const ListeningPort& interface,
std::shared_ptr<ListeningPort> interface,
FrontEndThread& thread) {
Connection* ret = nullptr;

try {
ret = new Connection(sfd, base, interface, thread);
ret = new Connection(sfd, base, std::move(interface), thread);
std::lock_guard<std::mutex> lock(connections.mutex);
connections.conns.push_back(ret);
stats.conn_structs++;
Expand Down
2 changes: 1 addition & 1 deletion daemon/connections.h
Expand Up @@ -70,7 +70,7 @@ void conn_return_buffers(Connection* c);
* @return a connection object on success, nullptr otherwise
*/
Connection* conn_new(SOCKET sfd,
const ListeningPort& interface,
std::shared_ptr<ListeningPort> interface,
struct event_base* base,
FrontEndThread& thread);

Expand Down
11 changes: 6 additions & 5 deletions daemon/front_end_thread.h
Expand Up @@ -41,8 +41,6 @@ class Connection;
class ListeningPort;
struct thread_stats;

using SharedListeningPort = std::shared_ptr<ListeningPort>;

struct FrontEndThread {
/**
* Pending IO requests for this thread. Maps each pending Connection to
Expand Down Expand Up @@ -87,12 +85,15 @@ struct FrontEndThread {
class ConnectionQueue {
public:
~ConnectionQueue();
void push(SOCKET socket, SharedListeningPort interface);
void swap(std::vector<std::pair<SOCKET, SharedListeningPort>>& other);
void push(SOCKET socket, std::shared_ptr<ListeningPort> interface);
void swap(
std::vector<std::pair<SOCKET, std::shared_ptr<ListeningPort>>>&
other);

protected:
std::mutex mutex;
std::vector<std::pair<SOCKET, SharedListeningPort>> connections;
std::vector<std::pair<SOCKET, std::shared_ptr<ListeningPort>>>
connections;
} new_conn_queue;

/// Mutex to lock protect access to this object.
Expand Down
6 changes: 4 additions & 2 deletions daemon/listening_port.h
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <platform/socket.h>
#include <atomic>
#include <string>
#include <utility>

Expand Down Expand Up @@ -46,8 +47,6 @@ class ListeningPort {
sslCert(std::move(cert)) {
}

ListeningPort(const ListeningPort& other) = default;

/// The tag provided by the user to identify the port. It is possible
/// to use ephemeral ports in the system, and if we want to change
/// such ports at runtime the system needs a way to find the correct
Expand Down Expand Up @@ -84,4 +83,7 @@ class ListeningPort {
bool isSslPort() const {
return !sslKey.empty() && !sslCert.empty();
}

/// Set to false once the interface is being shut down
std::atomic_bool valid{true};
};
7 changes: 7 additions & 0 deletions daemon/memcached.cc
Expand Up @@ -887,6 +887,7 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) {

bool changes = false;
auto interfaces = Settings::instance().getInterfaces();
bool interfaces_dropped = false;

// Step one, enable all new ports
bool success = true;
Expand Down Expand Up @@ -982,6 +983,7 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) {
// erase returns the element following this one (or end())
changes = true;
iter = listen_conn.erase(iter);
interfaces_dropped = true;
} else {
// look at the next element
++iter;
Expand All @@ -992,6 +994,11 @@ static void dispatch_event_handler(evutil_socket_t fd, short, void *) {
if (changes) {
create_portnumber_file(false);
}

if (interfaces_dropped) {
iterate_all_connections(
[](auto& conn) { conn.reEvaluateParentPort(); });
}
}

if (is_listen_disabled()) {
Expand Down
2 changes: 1 addition & 1 deletion daemon/memcached.h
Expand Up @@ -79,7 +79,7 @@ void threads_shutdown();
void threads_cleanup();

class ListeningPort;
void dispatch_conn_new(SOCKET sfd, std::shared_ptr<ListeningPort>& interface);
void dispatch_conn_new(SOCKET sfd, std::shared_ptr<ListeningPort> interface);

/* Lock wrappers for cache functions that are called from main loop. */
int is_listen_thread(void);
Expand Down
1 change: 1 addition & 0 deletions daemon/server_socket.cc
Expand Up @@ -54,6 +54,7 @@ ServerSocket::ServerSocket(SOCKET fd,
}

ServerSocket::~ServerSocket() {
interface->valid.store(false, std::memory_order_release);
std::string tagstr;
if (!interface->tag.empty()) {
tagstr = " \"" + interface->tag + "\"";
Expand Down
22 changes: 21 additions & 1 deletion daemon/settings.cc
Expand Up @@ -686,6 +686,11 @@ static void handle_opentracing(Settings& s, const nlohmann::json& obj) {
s.setOpenTracingConfig(std::make_shared<OpenTracingConfig>(obj));
}

static void handle_whitelist_localhost_interface(Settings& s,
const nlohmann::json& obj) {
s.setWhitelistLocalhostInterface(obj.get<bool>());
}

void Settings::reconfigure(const nlohmann::json& json) {
// Nuke the default interface added to the system in settings_init and
// use the ones in the configuration file.. (this is a bit messy)
Expand Down Expand Up @@ -757,7 +762,9 @@ void Settings::reconfigure(const nlohmann::json& json) {
handle_max_concurrent_commands_per_connection},
{"opentracing", handle_opentracing},
{"portnumber_file", handle_portnumber_file},
{"parent_identifier", handle_parent_identifier}};
{"parent_identifier", handle_parent_identifier},
{"whitelist_localhost_interface",
handle_whitelist_localhost_interface}};

for (const auto& obj : json.items()) {
bool found = false;
Expand Down Expand Up @@ -1243,6 +1250,19 @@ void Settings::updateSettings(const Settings& other, bool apply) {
}
}

if (other.has.whitelist_localhost_interface) {
if (other.whitelist_localhost_interface !=
whitelist_localhost_interface) {
LOG_INFO(
R"(Change whitelist of localhost interface from "{}" to "{}")",
isLocalhostInterfaceWhitelisted() ? "enabled" : "disabled",
other.isLocalhostInterfaceWhitelisted() ? "enabled"
: "disabled");
setWhitelistLocalhostInterface(
other.isLocalhostInterfaceWhitelisted());
}
}

if (other.has.max_concurrent_commands_per_connection) {
if (other.getMaxConcurrentCommandsPerConnection() !=
getMaxConcurrentCommandsPerConnection()) {
Expand Down
16 changes: 16 additions & 0 deletions daemon/settings.h
Expand Up @@ -843,6 +843,16 @@ class Settings {
notify_changed("num_writer_threads");
}

bool isLocalhostInterfaceWhitelisted() const {
return whitelist_localhost_interface.load(std::memory_order_acquire);
}

void setWhitelistLocalhostInterface(bool val) {
whitelist_localhost_interface.store(val, std::memory_order_release);
has.whitelist_localhost_interface = true;
notify_changed("whitelist_localhost_interface");
}

protected:
/// Should the server always collect trace information for commands
std::atomic_bool always_collect_trace_info{false};
Expand Down Expand Up @@ -1027,6 +1037,11 @@ class Settings {
std::atomic<int> num_reader_threads{0};
std::atomic<int> num_writer_threads{0};

/// If "localhost" is whitelisted from deleting connections as part
/// of server cleanup. This setting should only be used for unit
/// tests
std::atomic_bool whitelist_localhost_interface{true};

public:
/**
* Flags for each of the above config options, indicating if they were
Expand Down Expand Up @@ -1081,6 +1096,7 @@ class Settings {
bool num_writer_threads = false;
bool portnumber_file = false;
bool parent_identifier = false;
bool whitelist_localhost_interface = false;
} has;

protected:
Expand Down

0 comments on commit c7ce4bf

Please sign in to comment.