Skip to content

Commit

Permalink
WIP: Asynchronous Server Implementation
Browse files Browse the repository at this point in the history
This is an incremental installment into the asynchronous server
implementation revamp. The goal here is to isolate the parts that are
actually necessary to build client code and move more of the
implementation into externally-linked libraries. This allows us to
tweak, add features, and generally make the API/ABI more stable for
future extension and updates.
  • Loading branch information
deanberris committed Mar 18, 2012
1 parent 8f26ea8 commit 1b765f0
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 26 deletions.
3 changes: 2 additions & 1 deletion boost/network/include/http/server.hpp
@@ -1,7 +1,8 @@
#ifndef BOOST_NETWORK_INCLUDE_HTTP_SERVER_HPP_
#define BOOST_NETWORK_INCLUDE_HTTP_SERVER_HPP_

// Copyright 2010 Dean Michael Berris
// Copyright 2010-2012 Dean Michael Berris <dberris@google.com>.
// Copyright 2012 Google, Inc.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
Expand Down
17 changes: 15 additions & 2 deletions boost/network/protocol/http/server/async_impl.hpp
Expand Up @@ -9,6 +9,9 @@

#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/network/protocol/http/server/options.hpp>

namespace boost { namespace network { namespace utils {

Expand All @@ -22,6 +25,8 @@ struct thread_pool;

namespace boost { namespace network { namespace http {

struct request;

class async_server_connection;

class async_server_impl {
Expand All @@ -30,6 +35,7 @@ class async_server_impl {
async_server_impl(server_options const &options,
function<void(request const &, connection_ptr)> handler,
utils::thread_pool &thread_pool);
~async_server_impl();
void run();
void stop();
void listen();
Expand All @@ -40,10 +46,17 @@ class async_server_impl {
asio::io_service *service_;
asio::ip::tcp::acceptor *acceptor_;
shared_ptr<async_server_connection> new_connection_;
mutex listening_mutex_;
bool listening_, owned_service_;
mutex listening_mutex_, stopping_mutex_;
function<void(request const &, connection_ptr)> handler_;
utils::thread_pool &pool_;
bool listening_, owned_service_, stopping_;

void handle_stop();
void start_listening();
void handle_accept(system::error_code const &ec);

void set_socket_options(asio::ip::tcp::socket &socket);
void set_acceptor_options(asio::ip::tcp::acceptor &acceptor);
};

} // namespace http
Expand Down
189 changes: 189 additions & 0 deletions boost/network/protocol/http/server/async_impl.ipp
@@ -0,0 +1,189 @@
#ifndef BOOST_NETWORK_PROTOCOL_HTTP_SERVER_ASYNC_IMPL_IPP_20120318
#define BOOST_NETWORK_PROTOCOL_HTTP_SERVER_ASYNC_IMPL_IPP_20120318

// Copyright 2012 Dean Michael Berris <dberris@google.com>.
// Copyright 2012 Google, Inc.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include <boost/network/protocol/http/server/async_impl.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/network/detail/debug.hpp>

namespace boost { namespace network { namespace http {

async_server_impl::async_server_impl(server_options const &options,
function<void(request const &, connection_ptr)> handler,
utils::thread_pool &thread_pool)
: options_(options)
, address_(options.address())
, port_(options.port())
, service_(options.io_service())
, acceptor_(0)
, new_connection_()
, listening_mutex_()
, stopping_mutex_()
, handler_(handler)
, pool_(thread_pool)
, listening_(false)
, owned_service_(false)
, stopping_(false) {
if (service_ == 0) {
service_ = new (std::nothrow) asio::io_service;
owned_service_ = true;
}
BOOST_ASSERT(service_ != 0);
acceptor_ = new (std::nothrow) asio::ip::tcp::acceptor(*service_);
BOOST_ASSERT(acceptor_ != 0);
}

async_server_impl::~async_server_impl() {
if (owned_service_) delete service_;
delete acceptor_;
}

void async_server_impl::run() {
listen();
service_->run();
}

void async_server_impl::stop() {
lock_guard<mutex> listening_lock(listening_mutex_);
if (listening_) {
lock_guard<mutex> stopping_lock(stopping_mutex_);
stopping_ = true;
system::error_code ignored;
acceptor_->close(ignored);
listening_ = false;
service_->post(
boost::bind(&async_server_impl::handle_stop, this));
}
}

void async_server_impl::listen() {
lock_guard<mutex> listening_lock(listening_mutex_);
BOOST_NETWORK_MESSAGE("listening on " << address_ << ':' << port_);
if (!listening_) start_listening();
if (!listening_) {
BOOST_NETWORK_MESSAGE("error listening on " << address_ << ':' << port_);
BOOST_THROW_EXCEPTION(std::runtime_error("Error listening on provided address:port."));
}
}

void async_server_impl::handle_stop() {
lock_guard<mutex> stopping_lock(stopping_mutex_);
// A user may have stopped listening again before the stop command is
// reached.
if (stopping_) service_->stop();
}

void async_server_impl::handle_accept(boost::system::error_code const & ec) {
{
lock_guard<mutex> stopping_lock(stopping_mutex_);
// We dont want to add another handler instance, and we dont want to know
// about errors for a socket we dont need anymore.
if (stopping_) return;
}
if (!ec) {
set_socket_options(new_connection_->socket());
new_connection_->start();
new_connection_.reset(
new async_server_connection(*service_, handler_, pool_));
acceptor_->async_accept(
new_connection_->socket(),
boost::bind(
&async_server_impl::handle_accept,
this,
asio::placeholders::error));
} else {
BOOST_NETWORK_MESSAGE("Error accepting connection, reason: " << ec);
}
}

void async_server_impl::start_listening() {
using asio::ip::tcp;
system::error_code error;
service_->reset(); // allows repeated cycles of run->stop->run
tcp::resolver resolver(*service_);
tcp::resolver::query query(address_, port_);
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query, error);
if (error) {
BOOST_NETWORK_MESSAGE("error resolving '" << address_ << ':' << port_);
BOOST_THROW_EXCEPTION(std::runtime_error("Error resolving address:port combination."));
}
tcp::endpoint endpoint = *endpoint_iterator;
acceptor_->open(endpoint.protocol(), error);
if (error) {
BOOST_NETWORK_MESSAGE("error opening socket: " << address_ << ":" << port_);
BOOST_THROW_EXCEPTION(std::runtime_error("Error opening socket."));
}
set_acceptor_options(*acceptor_);
acceptor_->bind(endpoint, error);
if (error) {
BOOST_NETWORK_MESSAGE("error binding socket: " << address_ << ":" << port_);
BOOST_THROW_EXCEPTION(std::runtime_error("Error binding socket."));
}
acceptor_->listen(asio::socket_base::max_connections, error);
if (error) {
BOOST_NETWORK_MESSAGE("error listening on socket: '" << error << "' on " << address_ << ":" << port_);
BOOST_THROW_EXCEPTION(std::runtime_error("Error listening on socket."));
}
new_connection_.reset(new async_server_connection(*service_, handler_, pool_));
acceptor_->async_accept(
new_connection_->socket(),
boost::bind(
&async_server_impl::handle_accept,
this,
asio::placeholders::error));
listening_ = true;
lock_guard<mutex> stopping_lock(stopping_mutex_);
stopping_ = false; // if we were in the process of stopping, we revoke that command and continue listening
BOOST_NETWORK_MESSAGE("now listening on '" << address_ << ":" << port_ << "'");
}

void async_server_impl::set_socket_options(asio::ip::tcp::socket &socket) {
system::error_code ignored;
socket.non_blocking(options_.non_blocking_io(), ignored);
if (options_.linger()) {
asio::ip::tcp::socket::linger linger(true, options_.linger_timeout());
socket.set_option(linger, ignored);
}
if (int buf_size = options_.receive_buffer_size() >= 0) {
asio::ip::tcp::socket::receive_buffer_size receive_buffer_size(buf_size);
socket.set_option(receive_buffer_size, ignored);
}
if (int buf_size = options_.send_buffer_size() >= 0) {
asio::ip::tcp::socket::send_buffer_size send_buffer_size(buf_size);
socket.set_option(send_buffer_size, ignored);
}
if (int buf_size = options_.receive_low_watermark() >= 0) {
asio::ip::tcp::socket::receive_low_watermark receive_low_watermark(buf_size);
socket.set_option(receive_low_watermark, ignored);
}
if (int buf_size = options_.send_low_watermark() >= 0) {
asio::ip::tcp::socket::send_low_watermark send_low_watermark(buf_size);
socket.set_option(send_low_watermark, ignored);
}
}

void async_server_impl::set_acceptor_options(asio::ip::tcp::acceptor &acceptor) {
system::error_code ignored;
acceptor.set_option(
asio::ip::tcp::acceptor::reuse_address(options_.reuse_address()),
ignored);
acceptor.set_option(
asio::ip::tcp::acceptor::enable_connection_aborted(options_.report_aborted()),
ignored);
}

} // namespace http

} // namespace network

} // namespace boost

#endif // BOOST_NETWORK_PROTOCOL_HTTP_SERVER_ASYNC_IMPL_IPP_20120318
40 changes: 20 additions & 20 deletions boost/network/protocol/http/server/options.hpp
Expand Up @@ -28,46 +28,46 @@ class server_options {
void swap(server_options &other);
server_options& operator=(server_options rhs);

server_options& address(std::string const &address="0.0.0.0");
server_options& address(std::string const &address);
std::string const address() const;

server_options& port(std::string const &port="80");
server_options& port(std::string const &port);
std::string const port() const;

server_options& io_service(asio::io_service *service = 0);
server_options& io_service(asio::io_service *service);
asio::io_service *io_service() const;

server_options& reuse_address(bool setting=true);
bool reuse_address();
server_options& reuse_address(bool setting);
bool reuse_address() const;

server_options& report_aborted(bool setting=false);
bool report_aborted();
server_options& report_aborted(bool setting);
bool report_aborted() const;

// Set the receive buffer size for a socket. -1 means just use the default.
server_options& receive_buffer_size(int buffer_size=-1);
int receive_buffer_size();
server_options& receive_buffer_size(int buffer_size);
int receive_buffer_size() const;

// Set the send buffer size for a socket. -1 means just use the default.
server_options& send_buffer_size(int buffer_size=-1);
int send_buffer_size();
server_options& send_buffer_size(int buffer_size);
int send_buffer_size() const;

// Set the receive low watermark for a socket. -1 means just use the default.
server_options& receive_low_watermark(int low_watermark=-1);
int receive_low_watermark();
server_options& receive_low_watermark(int low_watermark);
int receive_low_watermark() const;

// Set the send low watermark for a socket. -1 means just use the default.
server_options& send_low_watermark(int low_watermark=-1);
int send_low_watermark();
server_options& send_low_watermark(int low_watermark);
int send_low_watermark() const;

server_options& non_blocking_io(bool setting=true);
bool non_blocking_io();
server_options& non_blocking_io(bool setting);
bool non_blocking_io() const;

server_options& linger(bool setting=false);
bool linger();
server_options& linger(bool setting);
bool linger() const;

// Set the socket linger timeout. This is only relevant if linger is true
// (see linger above). -1 means just use the default.
server_options& linger_timeout(int setting=-1);
server_options& linger_timeout(int setting);
int linger_timeout();

private:
Expand Down
14 changes: 12 additions & 2 deletions libs/network/src/CMakeLists.txt
Expand Up @@ -17,8 +17,18 @@ if (${CMAKE_CXX_COMPILER_ID} MATCHES GNU)
endif()
endforeach(src_file)

set(CPP-NETLIB_HTTP_SERVER_SRCS server_request_parsers_impl.cpp)
add_library(cppnetlib-server-parsers ${CPP-NETLIB_HTTP_SERVER_SRCS})
set(CPP-NETLIB_HTTP_SERVER_PARSERS_SRCS server_request_parsers_impl.cpp)
add_library(cppnetlib-http-server-parsers ${CPP-NETLIB_HTTP_SERVER_PARSERS_SRCS})
foreach (src_file ${CPP-NETLIB_HTTP_SERVER_PARSERS_SRCS})
if (${CMAKE_CXX_COMPILER_ID} MATCHES GNU)
set_source_files_properties(${src_file}
PROPERTIES COMPILE_FLAGS "-Wall -Werror")
endif()
endforeach(src_file)

set(CPP-NETLIB_HTTP_SERVER_SRCS
http/server_async_impl.cpp)
add_library(cppnetlib-http-servers ${CPP-NETLIB_HTTP_SERVER_SRCS})
foreach (src_file ${CPP-NETLIB_HTTP_SERVER_SRCS})
if (${CMAKE_CXX_COMPILER_ID} MATCHES GNU)
set_source_files_properties(${src_file}
Expand Down
7 changes: 7 additions & 0 deletions libs/network/src/http/server_async_impl.cpp
@@ -0,0 +1,7 @@
// Copyright 2012 Dean Michael Berris <dberris@google.com>.
// Copyright 2012 Google, Inc.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include <boost/network/protocol/http/server/async_impl.ipp>
6 changes: 5 additions & 1 deletion libs/network/test/http/CMakeLists.txt
Expand Up @@ -84,7 +84,11 @@ if (Boost_FOUND)
PROPERTIES COMPILE_FLAGS "-Wall")
endif()
add_executable(cpp-netlib-http-${test} ${test}.cpp)
target_link_libraries(cpp-netlib-http-${test} ${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} cppnetlib-server-parsers)
target_link_libraries(cpp-netlib-http-${test}
${Boost_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
cppnetlib-http-servers
cppnetlib-http-server-parsers)
set_target_properties(cpp-netlib-http-${test}
PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/tests)
add_test(cpp-netlib-http-${test}
Expand Down

0 comments on commit 1b765f0

Please sign in to comment.