Skip to content

Commit

Permalink
Split recipe03 in chapter06 into two
Browse files Browse the repository at this point in the history
  • Loading branch information
apolukhin committed May 23, 2017
1 parent 96f30ac commit 8896513
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 175 deletions.
Expand Up @@ -2,11 +2,16 @@ if (!include(../../config.txt)) {
error("Failed to open config.txt")
}

# We are NOT building an executable
TEMPLATE -= app

# ... we are building a library.
TEMPLATE = lib

HEADERS += \
../01_tasks_processor_base/tasks_processor_base.hpp \
../02_tasks_processor_timers/tasks_processor_timers.hpp \
tasks_processor_network.hpp
tasks_processor_network_client.hpp

SOURCES += main.cpp
SOURCES += client.cpp
QMAKE_CXXFLAGS += $$CPP11FLAG
!msvc:LIBS += -lboost_thread -lboost_system
54 changes: 54 additions & 0 deletions Chapter06/03_tasks_processor_network_client/client.cpp
@@ -0,0 +1,54 @@
// Big part of code for this recipe
// in in this header:
#include "tasks_processor_network_client.hpp"
#include "client.hpp"
using namespace tp_network_client;

bool g_authed = false;

void process_server_response(
connection_ptr&& soc,
const boost::system::error_code& err)
{
if (err && err != boost::asio::error::eof) {
std::cerr << "process_server_response: Client error on receive: " << err.message() << '\n';
assert(false);
}

if (soc->data.size() != 2) {
std::cerr << "process_server_response: wrong bytes count\n";
assert(false);
}

if (soc->data != "OK") {
std::cerr << "process_server_response: wrong response: " << soc->data << '\n';
assert(false);
}

g_authed = true;
soc->shutdown();
tasks_processor::stop();
}

void receive_auth_response(connection_ptr&& soc, const boost::system::error_code& err) {
if (err) {
std::cerr << "receive_auth_response: error on sending data: " << err.message() << '\n';
assert(false);
}

async_read_data(
std::move(soc),
&process_server_response,
2
);
}

void send_auth() {
connection_ptr soc = tasks_processor::create_connection("127.0.0.1", g_port_num);
soc->data = "auth_name";

async_write_data(
std::move(soc),
&receive_auth_response
);
}
5 changes: 5 additions & 0 deletions Chapter06/03_tasks_processor_network_client/client.hpp
@@ -0,0 +1,5 @@
// Helper heade file
extern bool g_authed;
const unsigned short g_port_num = 65001;

void send_auth();
@@ -1,13 +1,12 @@
#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_NETWORK_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_NETWORK_HPP
#ifndef BOOK_CHAPTER6_TASK_PROCESSOR_NETWORK_CLIENT_HPP
#define BOOK_CHAPTER6_TASK_PROCESSOR_NETWORK_CLIENT_HPP

#include "../02_tasks_processor_timers/tasks_processor_timers.hpp"

#include <boost/asio/ip/tcp.hpp>
#include <boost/core/noncopyable.hpp>

#include <memory> // std::unique_ptr

struct connection_with_data {
struct connection_with_data: boost::noncopyable {
boost::asio::ip::tcp::socket socket;
std::string data;

Expand All @@ -30,6 +29,9 @@ struct connection_with_data {
}
};


#include <memory> // std::unique_ptr

typedef std::unique_ptr<connection_with_data> connection_ptr;


Expand All @@ -46,16 +48,22 @@ struct task_wrapped_with_connection {
{}

void operator()(const boost::system::error_code& error, std::size_t bytes_count) {
const auto task = detail::make_task_wrapped([this, &error, bytes_count]() {
const auto lambda = [this, &error, bytes_count]() {
this->c_->data.resize(bytes_count);
this->task_unwrapped_(std::move(this->c_), error);
});
};

const auto task = detail::make_task_wrapped(lambda);

task();
}
};

#include <boost/asio/write.hpp>

template <class T>
struct task_wrapped_with_connection;

template <class Functor>
void async_write_data(connection_ptr&& c, const Functor& f) {
boost::asio::ip::tcp::socket& s = c->socket;
Expand All @@ -69,6 +77,7 @@ void async_write_data(connection_ptr&& c, const Functor& f) {
}

#include <boost/asio/read.hpp>

template <class Functor>
void async_read_data(connection_ptr&& c, const Functor& f, std::size_t at_least_bytes) {
c->data.resize(at_least_bytes);
Expand Down Expand Up @@ -101,78 +110,15 @@ void async_read_dataat_least(connection_ptr&& c, const Functor& f, std::size_t a
);
}

#include <boost/function.hpp>
namespace tp_network {
namespace tp_network_client {

class tasks_processor: public tp_timers::tasks_processor {
typedef boost::asio::ip::tcp::acceptor acceptor_t;
typedef boost::function<void(connection_ptr, const boost::system::error_code&)> on_accpet_func_t;

struct tcp_listener {
acceptor_t acceptor_;
const on_accpet_func_t func_;
connection_ptr new_c_;

template <class Functor>
tcp_listener(
boost::asio::io_service& io_service,
unsigned short port,
const Functor& task_unwrapped)
: acceptor_(io_service, boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(), port
))
, func_(task_unwrapped)
{}
};
typedef std::unique_ptr<tcp_listener> listener_ptr;

struct handle_accept {
listener_ptr listener;

explicit handle_accept(listener_ptr&& l)
: listener(std::move(l))
{}

void operator()(const boost::system::error_code& error) {
task_wrapped_with_connection<on_accpet_func_t> task(std::move(listener->new_c_), listener->func_);
if (error) {
std::cerr << error << '\n';
}

start_accepting_connection(std::move(listener));
task(error, 0);
}
};

static void start_accepting_connection(listener_ptr&& listener) {
if (!listener->acceptor_.is_open()) {
return;
}

listener->new_c_.reset(
new connection_with_data(listener->acceptor_.get_io_service())
);

boost::asio::ip::tcp::socket& s = listener->new_c_->socket;
acceptor_t& a = listener->acceptor_;
a.async_accept(
s,
tasks_processor::handle_accept(std::move(listener))
);
}
// ...

public:
template <class Functor>
static void add_listener(unsigned short port_num, const Functor& f) {
std::unique_ptr<tcp_listener> listener(
new tcp_listener(get_ios(), port_num, f)
);

start_accepting_connection(std::move(listener));
}

static connection_ptr create_connection(const char* addr, unsigned short port_num) {
connection_ptr c( new connection_with_data(get_ios()) );

c->socket.connect(boost::asio::ip::tcp::endpoint(
boost::asio::ip::address_v4::from_string(addr),
port_num
Expand All @@ -182,6 +128,6 @@ class tasks_processor: public tp_timers::tasks_processor {
}
};

} // namespace tp_network
} // namespace tp_network_cleint

#endif // BOOK_CHAPTER6_TASK_PROCESSOR_NETWORK_HPP
@@ -0,0 +1,17 @@
if (!include(../../config.txt)) {
error("Failed to open config.txt")
}

HEADERS += \
../01_tasks_processor_base/tasks_processor_base.hpp \
../02_tasks_processor_timers/tasks_processor_timers.hpp \
../03_tasks_processor_network_client/tasks_processor_network_client.hpp \
../03_tasks_processor_network_client/client.hpp \
04_tasks_processor_network_accept.hpp

SOURCES += \
../03_tasks_processor_network_client/client.cpp \
main.cpp

QMAKE_CXXFLAGS += $$CPP11FLAG
!msvc:LIBS += -lboost_thread -lboost_system
@@ -1,6 +1,7 @@
// Big part of code for this recipe
// in in this header:
#include "tasks_processor_network.hpp"
#include "tasks_processor_network_accept.hpp"
#include "../03_tasks_processor_network_client/client.hpp"
using namespace tp_network;

class authorizer {
Expand Down Expand Up @@ -43,59 +44,9 @@ class authorizer {
}
};

bool g_authed = false;

void finsh_socket_auth_task(
connection_ptr&& soc,
const boost::system::error_code& err)
{
if (err && err != boost::asio::error::eof) {
std::cerr << "finsh_socket_auth_task: Client error on recieve: " << err.message() << '\n';
assert(false);
}

if (soc->data.size() != 2) {
std::cerr << "finsh_socket_auth_task: wrong bytes count\n";
assert(false);
}

if (soc->data != "OK") {
std::cerr << "finsh_socket_auth_task: wrong response: " << soc->data << '\n';
assert(false);
}

g_authed = true;
soc->shutdown();
tasks_processor::stop();
}

void recieve_auth_task(connection_ptr&& soc, const boost::system::error_code& err) {
if (err) {
std::cerr << "recieve_auth_task: Client error on recieve: " << err.message() << '\n';
assert(false);
}

async_read_data(
std::move(soc),
&finsh_socket_auth_task,
2
);
}

const unsigned short g_port_num = 65001;

void send_auth_task() {
connection_ptr soc = tasks_processor::create_connection("127.0.0.1", g_port_num);
soc->data = "auth_name";

async_write_data(
std::move(soc),
&recieve_auth_task
);
}

int main() {
tasks_processor::run_delayed(boost::posix_time::seconds(1), &send_auth_task);
tasks_processor::run_delayed(boost::posix_time::seconds(1), &send_auth);
tasks_processor::add_listener(g_port_num, &authorizer::on_connection_accpet);
assert(!g_authed);

Expand Down

0 comments on commit 8896513

Please sign in to comment.