Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw: beast frontend can listen on multiple endpoints #20188

Merged
merged 3 commits into from Feb 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 16 additions & 2 deletions doc/radosgw/frontends.rst
Expand Up @@ -20,12 +20,25 @@ Options

``port``

:Description: Sets the listening port number.
:Description: Sets the listening port number. Can be specified multiple
times as in ``port=80 port=8000``.

:Type: Integer
:Default: ``80``


``endpoint``

:Description: Sets the listening address in the form ``address[:port]``,
where the address is an IPv4 address string in dotted decimal
form, or an IPv6 address in hexadecimal notation. The
optional port defaults to 80. Can be specified multiple times
as in ``endpoint=::1 endpoint=192.168.0.100:8000``.

:Type: Integer
:Default: None


Civetweb
========

Expand All @@ -43,7 +56,8 @@ Options
:Description: Sets the listening port number. For SSL-enabled ports, add an
``s`` suffix like ``443s``. To bind a specific IPv4 or IPv6
address, use the form ``address:port``. Multiple endpoints
can be separated by ``+`` as in ``127.0.0.1:8000+443s``.
can either be separated by ``+`` as in ``127.0.0.1:8000+443s``,
or by providing multiple options as in ``port=8000 port=443s``.

:Type: String
:Default: ``7480``
Expand Down
135 changes: 92 additions & 43 deletions src/rgw/rgw_asio_frontend.cc
Expand Up @@ -191,26 +191,32 @@ class Connection {
friend void intrusive_ptr_release(Connection *c) { c->put(); }
};


class AsioFrontend {
RGWProcessEnv env;
RGWFrontendConfig* conf;
boost::asio::io_service service;

tcp::acceptor acceptor;
tcp::socket peer_socket;
struct Listener {
tcp::endpoint endpoint;
tcp::acceptor acceptor;
tcp::socket socket;

Listener(boost::asio::io_service& service)
: acceptor(service), socket(service) {}
};
std::vector<Listener> listeners;

std::vector<std::thread> threads;
Pauser pauser;
std::atomic<bool> going_down{false};

CephContext* ctx() const { return env.store->ctx(); }

void accept(boost::system::error_code ec);
void accept(Listener& listener, boost::system::error_code ec);

public:
AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
: env(env), conf(conf), acceptor(service), peer_socket(service) {}
: env(env), conf(conf) {}

int init();
int run();
Expand All @@ -220,64 +226,104 @@ class AsioFrontend {
void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
};

int AsioFrontend::init()
unsigned short parse_port(const char *input, boost::system::error_code& ec)
{
char *end = nullptr;
auto port = std::strtoul(input, &end, 10);
if (port > std::numeric_limits<unsigned short>::max()) {
ec.assign(ERANGE, boost::system::system_category());
} else if (port == 0 && end == input) {
ec.assign(EINVAL, boost::system::system_category());
}
return port;
}

tcp::endpoint parse_endpoint(boost::asio::string_view input,
boost::system::error_code& ec)
{
std::string port_str;
conf->get_val("port", "80", &port_str);
tcp::endpoint endpoint;

unsigned short port;
boost::asio::ip::address addr; // default to 'any'
auto colon = input.find(':');
if (colon != input.npos) {
auto port_str = input.substr(colon + 1);
endpoint.port(parse_port(port_str.data(), ec));
} else {
endpoint.port(80);
}
if (!ec) {
auto addr = input.substr(0, colon);
endpoint.address(boost::asio::ip::make_address(addr, ec));
}
return endpoint;
}

int AsioFrontend::init()
{
boost::system::error_code ec;
auto& config = conf->get_config_map();

auto colon = port_str.find(':');
if (colon != port_str.npos) {
addr = boost::asio::ip::make_address(port_str.substr(0, colon), ec);
// parse endpoints
auto range = config.equal_range("port");
for (auto i = range.first; i != range.second; ++i) {
auto port = parse_port(i->second.c_str(), ec);
if (ec) {
lderr(ctx()) << "failed to parse address '" << port_str << "': " << ec.message() << dendl;
lderr(ctx()) << "failed to parse port=" << i->second << dendl;
return -ec.value();
}
port = std::stoul(port_str.substr(colon + 1), nullptr, 0);
} else {
port = std::stoul(port_str, nullptr, 0);
listeners.emplace_back(service);
listeners.back().endpoint.port(port);
}

tcp::endpoint ep = {addr, port};
ldout(ctx(), 4) << "frontend listening on " << ep << dendl;

acceptor.open(ep.protocol(), ec);
if (ec) {
lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
return -ec.value();
range = config.equal_range("endpoint");
for (auto i = range.first; i != range.second; ++i) {
auto endpoint = parse_endpoint(i->second, ec);
if (ec) {
lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
return -ec.value();
}
listeners.emplace_back(service);
listeners.back().endpoint = endpoint;
}
acceptor.set_option(tcp::acceptor::reuse_address(true));
acceptor.bind(ep, ec);
if (ec) {
lderr(ctx()) << "failed to bind address " << ep <<
": " << ec.message() << dendl;
return -ec.value();

// start listeners
for (auto& l : listeners) {
l.acceptor.open(l.endpoint.protocol(), ec);
if (ec) {
lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
return -ec.value();
}
l.acceptor.set_option(tcp::acceptor::reuse_address(true));
l.acceptor.bind(l.endpoint, ec);
if (ec) {
lderr(ctx()) << "failed to bind address " << l.endpoint
<< ": " << ec.message() << dendl;
return -ec.value();
}
l.acceptor.listen(boost::asio::socket_base::max_connections);
l.acceptor.async_accept(l.socket,
[this, &l] (boost::system::error_code ec) {
accept(l, ec);
});

ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
}
acceptor.listen(boost::asio::socket_base::max_connections);
acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
return 0;
}

void AsioFrontend::accept(boost::system::error_code ec)
void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
{
if (!acceptor.is_open()) {
if (!l.acceptor.is_open()) {
return;
} else if (ec == boost::asio::error::operation_aborted) {
return;
} else if (ec) {
throw ec;
}
auto socket = std::move(peer_socket);
acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
auto socket = std::move(l.socket);
l.acceptor.async_accept(l.socket,
[this, &l] (boost::system::error_code ec) {
accept(l, ec);
});

boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
conn->on_connect();
Expand Down Expand Up @@ -313,7 +359,10 @@ void AsioFrontend::stop()
going_down = true;

boost::system::error_code ec;
acceptor.close(ec);
// close all listeners
for (auto& listener : listeners) {
listener.acceptor.close(ec);
}

// unblock the run() threads
service.stop();
Expand Down
23 changes: 18 additions & 5 deletions src/rgw/rgw_civetweb_frontend.cc
Expand Up @@ -50,7 +50,6 @@ int RGWCivetWebFrontend::process(struct mg_connection* const conn)
int RGWCivetWebFrontend::run()
{
auto& conf_map = conf->get_config_map();
string port_str;

set_conf_default(conf_map, "num_threads",
std::to_string(g_conf->rgw_thread_pool_size));
Expand All @@ -59,15 +58,29 @@ int RGWCivetWebFrontend::run()
set_conf_default(conf_map, "validate_http_method", "no");
set_conf_default(conf_map, "canonicalize_url_path", "no");
set_conf_default(conf_map, "enable_auth_domain_check", "no");
conf->get_val("port", "80", &port_str);
std::replace(port_str.begin(), port_str.end(), '+', ',');
conf_map["listening_ports"] = port_str;

std::string listening_ports;
// support multiple port= entries
auto range = conf_map.equal_range("port");
for (auto p = range.first; p != range.second; ++p) {
std::string port_str = p->second;
// support port= entries with multiple values
std::replace(port_str.begin(), port_str.end(), '+', ',');
if (!listening_ports.empty()) {
listening_ports.append(1, ',');
}
listening_ports.append(port_str);
}
if (listening_ports.empty()) {
listening_ports = "80";
}
conf_map.emplace("listening_ports", std::move(listening_ports));

/* Set run_as_user. This will cause civetweb to invoke setuid() and setgid()
* based on pw_uid and pw_gid obtained from pw_name. */
std::string uid_string = g_ceph_context->get_set_uid_string();
if (! uid_string.empty()) {
conf_map["run_as_user"] = std::move(uid_string);
conf_map.emplace("run_as_user", std::move(uid_string));
}

/* Prepare options for CivetWeb. */
Expand Down
15 changes: 5 additions & 10 deletions src/rgw/rgw_frontend.cc
Expand Up @@ -13,14 +13,9 @@
#define dout_subsys ceph_subsys_rgw

int RGWFrontendConfig::parse_config(const string& config,
map<string, string>& config_map)
std::multimap<string, string>& config_map)
{
list<string> config_list;
get_str_list(config, " ", config_list);

list<string>::iterator iter;
for (iter = config_list.begin(); iter != config_list.end(); ++iter) {
string& entry = *iter;
for (auto& entry : get_str_vec(config, " ")) {
string key;
string val;

Expand All @@ -33,7 +28,7 @@ int RGWFrontendConfig::parse_config(const string& config,
ssize_t pos = entry.find('=');
if (pos < 0) {
dout(0) << "framework conf key: " << entry << dendl;
config_map[entry] = "";
config_map.emplace(std::move(entry), "");
continue;
}

Expand All @@ -44,7 +39,7 @@ int RGWFrontendConfig::parse_config(const string& config,
}

dout(0) << "framework conf key: " << key << ", val: " << val << dendl;
config_map[key] = val;
config_map.emplace(std::move(key), std::move(val));
}

return 0;
Expand All @@ -53,7 +48,7 @@ int RGWFrontendConfig::parse_config(const string& config,
bool RGWFrontendConfig::get_val(const string& key, const string& def_val,
string *out)
{
map<string, string>::iterator iter = config_map.find(key);
auto iter = config_map.find(key);
if (iter == config_map.end()) {
*out = def_val;
return false;
Expand Down
10 changes: 5 additions & 5 deletions src/rgw/rgw_frontend.h
Expand Up @@ -22,11 +22,11 @@

class RGWFrontendConfig {
std::string config;
std::map<std::string, std::string> config_map;
std::multimap<std::string, std::string> config_map;
std::string framework;

int parse_config(const std::string& config,
std::map<std::string, std::string>& config_map);
std::multimap<std::string, std::string>& config_map);

public:
RGWFrontendConfig(const std::string& config)
Expand Down Expand Up @@ -54,7 +54,7 @@ class RGWFrontendConfig {
return config;
}

std::map<std::string, std::string>& get_config_map() {
std::multimap<std::string, std::string>& get_config_map() {
return config_map;
}

Expand Down Expand Up @@ -97,11 +97,11 @@ class RGWCivetWebFrontend : public RGWFrontend {
struct mg_context* ctx;
RGWMongooseEnv env;

void set_conf_default(std::map<std::string, std::string>& m,
void set_conf_default(std::multimap<std::string, std::string>& m,
const std::string& key,
const std::string& def_val) {
if (m.find(key) == std::end(m)) {
m[key] = def_val;
m.emplace(key, def_val);
}
}

Expand Down