Skip to content

Commit

Permalink
Merge pull request #10767 from rzarzynski/wip-rgw-frontend-rework
Browse files Browse the repository at this point in the history
rgw: frontend subsystem rework

all requested changes have been made by the author
tested by-hand w/sigv2 and sigv4, and already passed teuthology rgw suite
  • Loading branch information
mattbenjamin committed Nov 1, 2016
2 parents 2847e17 + 573c564 commit 2c8d25b
Show file tree
Hide file tree
Showing 56 changed files with 2,632 additions and 952 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Expand Up @@ -38,3 +38,6 @@
url = https://github.com/ceph/lua.git
branch = lua-5.3-ceph
ignore = dirty
[submodule "src/Beast"]
path = src/Beast
url = https://github.com/ceph/Beast.git
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -323,6 +323,8 @@ if(WITH_RADOSGW)
find_package(fcgi REQUIRED)
endif(WITH_RADOSGW)

option(WITH_RADOSGW_ASIO_FRONTEND "Rados Gateway's ASIO frontend is enabled" OFF)

#option for CephFS
option(WITH_CEPHFS "CephFS is enabled" ON)

Expand Down
1 change: 1 addition & 0 deletions src/Beast
Submodule Beast added at 999e2f
3 changes: 3 additions & 0 deletions src/include/config-h.in.cmake
Expand Up @@ -126,6 +126,9 @@
/* define if radosgw enabled */
#cmakedefine WITH_RADOSGW

/* define if radosgw's asio frontend enabled */
#cmakedefine WITH_RADOSGW_ASIO_FRONTEND

/* define if HAVE_THREAD_SAFE_RES_QUERY */
#cmakedefine HAVE_THREAD_SAFE_RES_QUERY

Expand Down
13 changes: 12 additions & 1 deletion src/rgw/CMakeLists.txt
Expand Up @@ -94,7 +94,11 @@ set(rgw_a_srcs
rgw_torrent.cc)

add_library(rgw_a STATIC ${rgw_a_srcs})
target_include_directories(rgw_a PUBLIC ${FCGI_INCLUDE_DIR})

target_include_directories(rgw_a PUBLIC
"../Beast/include"
${FCGI_INCLUDE_DIR})

target_link_libraries(rgw_a librados cls_lock_client cls_rgw_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client cls_version_client
cls_replica_log_client cls_user_client common common_utf8 global
Expand All @@ -109,6 +113,13 @@ set(radosgw_srcs
rgw_civetweb_frontend.cc
rgw_civetweb_log.cc
rgw_main.cc)

if (WITH_RADOSGW_ASIO_FRONTEND)
list(APPEND radosgw_srcs
rgw_asio_client.cc
rgw_asio_frontend.cc)
endif (WITH_RADOSGW_ASIO_FRONTEND)

add_executable(radosgw ${radosgw_srcs} $<TARGET_OBJECTS:civetweb_common_objs>)
target_link_libraries(radosgw rgw_a librados
cls_rgw_client cls_lock_client cls_refcount_client
Expand Down
8 changes: 5 additions & 3 deletions src/rgw/librgw.cc
Expand Up @@ -290,9 +290,11 @@ namespace rgw {
op->complete();

done:
int r = io->complete_request();
if (r < 0) {
dout(0) << "ERROR: io->complete_request() returned " << r << dendl;
try {
io->complete_request();
} catch (rgw::io::Exception& e) {
dout(0) << "ERROR: io->complete_request() returned "
<< e.what() << dendl;
}
if (should_log) {
rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
Expand Down
199 changes: 199 additions & 0 deletions src/rgw/rgw_asio_client.cc
@@ -0,0 +1,199 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/write.hpp>

#include "rgw_asio_client.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "asio: ")


RGWAsioClientIO::RGWAsioClientIO(tcp::socket&& socket,
request_type&& request)
: socket(std::move(socket)),
request(std::move(request)),
txbuf(*this) {
}

RGWAsioClientIO::~RGWAsioClientIO() = default;

void RGWAsioClientIO::init_env(CephContext *cct)
{
env.init(cct);
body_iter = request.body.begin();

const auto& headers = request.headers;
for (auto header = headers.begin(); header != headers.end(); ++header) {
const auto& name = header->name();
const auto& value = header->value();

if (boost::algorithm::iequals(name, "content-length")) {
env.set("CONTENT_LENGTH", value);
continue;
}
if (boost::algorithm::iequals(name, "content-type")) {
env.set("CONTENT_TYPE", value);
continue;
}
if (boost::algorithm::iequals(name, "connection")) {
conn_keepalive = boost::algorithm::iequals(value, "keep-alive");
conn_close = boost::algorithm::iequals(value, "close");
}

static const boost::string_ref HTTP_{"HTTP_"};

char buf[name.size() + HTTP_.size() + 1];
auto dest = std::copy(std::begin(HTTP_), std::end(HTTP_), buf);
for (auto src = name.begin(); src != name.end(); ++src, ++dest) {
if (*src == '-') {
*dest = '_';
} else {
*dest = std::toupper(*src);
}
}
*dest = '\0';

env.set(buf, value);
}

env.set("REQUEST_METHOD", request.method);

// split uri from query
auto url = boost::string_ref{request.url};
auto pos = url.find('?');
auto query = url.substr(pos + 1);
url = url.substr(0, pos);

env.set("REQUEST_URI", url);
env.set("QUERY_STRING", query);
env.set("SCRIPT_URI", url); /* FIXME */

char port_buf[16];
snprintf(port_buf, sizeof(port_buf), "%d", socket.local_endpoint().port());
env.set("SERVER_PORT", port_buf);
// TODO: set SERVER_PORT_SECURE if using ssl
// TODO: set REMOTE_USER if authenticated
}

size_t RGWAsioClientIO::write_data(const char* const buf,
const size_t len)
{
boost::system::error_code ec;
auto bytes = boost::asio::write(socket, boost::asio::buffer(buf, len), ec);
if (ec) {
derr << "write_data failed: " << ec.message() << dendl;
throw rgw::io::Exception(ec.value(), std::system_category());
} else {
/* According to the documentation of boost::asio::write if there is
* no error (signalised by ec), then bytes == len. We don't need to
* take care of partial writes in such situation. */
return bytes;
}
}

size_t RGWAsioClientIO::read_data(char* const buf, const size_t max)
{
// read data from the body's bufferlist
auto bytes = std::min<unsigned>(max, body_iter.get_remaining());
body_iter.copy(bytes, buf);
return bytes;
}

size_t RGWAsioClientIO::complete_request()
{
return 0;
}

void RGWAsioClientIO::flush()
{
txbuf.pubsync();
}

size_t RGWAsioClientIO::send_status(const int status,
const char* const status_name)
{
static constexpr size_t STATUS_BUF_SIZE = 128;

char statusbuf[STATUS_BUF_SIZE];
const auto statuslen = snprintf(statusbuf, sizeof(statusbuf),
"HTTP/1.1 %d %s\r\n", status, status_name);

return txbuf.sputn(statusbuf, statuslen);
}

size_t RGWAsioClientIO::send_100_continue()
{
const char HTTTP_100_CONTINUE[] = "HTTP/1.1 100 CONTINUE\r\n\r\n";
const size_t sent = txbuf.sputn(HTTTP_100_CONTINUE,
sizeof(HTTTP_100_CONTINUE) - 1);
flush();
return sent;
}

static constexpr size_t TIME_BUF_SIZE = 128;
static size_t dump_date_header(char (&timestr)[TIME_BUF_SIZE])
{
const time_t gtime = time(nullptr);
struct tm result;
struct tm const * const tmp = gmtime_r(&gtime, &result);
if (tmp == nullptr) {
return 0;
}
return strftime(timestr, sizeof(timestr),
"Date: %a, %d %b %Y %H:%M:%S %Z\r\n", tmp);
}

size_t RGWAsioClientIO::complete_header()
{
size_t sent = 0;

char timestr[TIME_BUF_SIZE];
if (dump_date_header(timestr)) {
sent += txbuf.sputn(timestr, strlen(timestr));
}

if (conn_keepalive) {
constexpr char CONN_KEEP_ALIVE[] = "Connection: Keep-Alive\r\n";
sent += txbuf.sputn(CONN_KEEP_ALIVE, sizeof(CONN_KEEP_ALIVE) - 1);
} else if (conn_close) {
constexpr char CONN_KEEP_CLOSE[] = "Connection: close\r\n";
sent += txbuf.sputn(CONN_KEEP_CLOSE, sizeof(CONN_KEEP_CLOSE) - 1);
}

constexpr char HEADER_END[] = "\r\n";
sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);

flush();
return sent;
}

size_t RGWAsioClientIO::send_header(const boost::string_ref& name,
const boost::string_ref& value)
{
static constexpr char HEADER_SEP[] = ": ";
static constexpr char HEADER_END[] = "\r\n";

size_t sent = 0;

sent += txbuf.sputn(name.data(), name.length());
sent += txbuf.sputn(HEADER_SEP, sizeof(HEADER_SEP) - 1);
sent += txbuf.sputn(value.data(), value.length());
sent += txbuf.sputn(HEADER_END, sizeof(HEADER_END) - 1);

return sent;
}

size_t RGWAsioClientIO::send_content_length(const uint64_t len)
{
static constexpr size_t CONLEN_BUF_SIZE = 128;

char sizebuf[CONLEN_BUF_SIZE];
const auto sizelen = snprintf(sizebuf, sizeof(sizebuf),
"Content-Length: %" PRIu64 "\r\n", len);

return txbuf.sputn(sizebuf, sizelen);
}
115 changes: 115 additions & 0 deletions src/rgw/rgw_asio_client.h
@@ -0,0 +1,115 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef RGW_ASIO_CLIENT_H
#define RGW_ASIO_CLIENT_H

#include <boost/asio/ip/tcp.hpp>
#include <beast/http/body_type.hpp>
#include <beast/http/concepts.hpp>
#include <beast/http/message_v1.hpp>
#include "include/assert.h"

#include "rgw_client_io.h"

// bufferlist to represent the message body
class RGWBufferlistBody {
public:
using value_type = ceph::bufferlist;

class reader;
class writer;

template <bool isRequest, typename Headers>
using message_type = beast::http::message<isRequest, RGWBufferlistBody,
Headers>;
};

class RGWAsioClientIO : public rgw::io::RestfulClient,
public rgw::io::BuffererSink {
using tcp = boost::asio::ip::tcp;
tcp::socket socket;

using body_type = RGWBufferlistBody;
using request_type = beast::http::request_v1<body_type>;
request_type request;

bufferlist::const_iterator body_iter;

bool conn_keepalive{false};
bool conn_close{false};
RGWEnv env;

rgw::io::StaticOutputBufferer<> txbuf;

size_t write_data(const char *buf, size_t len) override;
size_t read_data(char *buf, size_t max);

public:
RGWAsioClientIO(tcp::socket&& socket, request_type&& request);
~RGWAsioClientIO();

void init_env(CephContext *cct) override;
size_t complete_request() override;
void flush() override;
size_t send_status(int status, const char *status_name) override;
size_t send_100_continue() override;
size_t send_header(const boost::string_ref& name,
const boost::string_ref& value) override;
size_t send_content_length(uint64_t len) override;
size_t complete_header() override;

size_t recv_body(char* buf, size_t max) override {
return read_data(buf, max);
}

size_t send_body(const char* buf, size_t len) override {
return write_data(buf, len);
}

RGWEnv& get_env() noexcept override {
return env;
}
};

// used by beast::http::read() to read the body into a bufferlist
class RGWBufferlistBody::reader {
value_type& bl;
public:
template<bool isRequest, typename Headers>
explicit reader(message_type<isRequest, Headers>& m) : bl(m.body) {}

void write(const char* data, size_t size, boost::system::error_code&) {
bl.append(data, size);
}
};

// used by beast::http::write() to write the buffered body
class RGWBufferlistBody::writer {
const value_type& bl;
public:
template<bool isRequest, typename Headers>
explicit writer(const message_type<isRequest, Headers>& msg)
: bl(msg.body) {}

void init(boost::system::error_code& ec) {}
uint64_t content_length() const { return bl.length(); }

template<typename Write>
boost::tribool operator()(beast::http::resume_context&&,
boost::system::error_code&, Write&& write) {
// translate from bufferlist to a ConstBufferSequence for beast
std::vector<boost::asio::const_buffer> buffers;
buffers.reserve(bl.get_num_buffers());
for (auto& ptr : bl.buffers()) {
buffers.emplace_back(ptr.c_str(), ptr.length());
}
write(buffers);
return true;
}
};
static_assert(beast::http::is_ReadableBody<RGWBufferlistBody>{},
"RGWBufferlistBody does not satisfy ReadableBody");
static_assert(beast::http::is_WritableBody<RGWBufferlistBody>{},
"RGWBufferlistBody does not satisfy WritableBody");

#endif // RGW_ASIO_CLIENT_H

0 comments on commit 2c8d25b

Please sign in to comment.