Skip to content

Latest commit

 

History

History
1698 lines (1244 loc) · 59.3 KB

File metadata and controls

1698 lines (1244 loc) · 59.3 KB

六、操作任务

在本章中,我们将介绍:

  • 为任意数据类型处理注册任务
  • 将计时器和处理计时器事件作为任务
  • 作为任务的网络通信
  • 接受传入连接
  • 并行执行不同的任务
  • 流水线任务处理
  • 制造非阻塞屏障
  • 存储异常并从中创建任务
  • 获取和处理系统信号作为任务

介绍

这一章是关于任务的。我们将功能对象称为任务,因为它更短,更好地反映了它将做什么。这一章的主要思想是,我们可以将所有的处理、计算和交互分解为函子(任务),并且几乎独立地处理这些任务中的每一个。此外,我们可能不会阻止一些缓慢的操作,例如从套接字接收数据或等待超时,而是提供一个回调任务并继续处理其他任务。一旦操作系统完成这个缓慢的操作,我们的回调就被执行了。

The best way to understand the example is to play with it by modifying, running, and extending it. The site, http://apolukhin.github.io/Boost-Cookbook/, has all the examples from this chapter, and you can even play with some of them online.

开始之前

本章要求至少具备第一、第二和第五章的基本知识。需要 C++ 11 右值引用和 lambdas 的基础知识。

为任意数据类型处理注册任务

首先,让我们关注保存所有任务并为其执行提供方法的类。我们已经在第 5 章 *【多线程,创建工作队列类】*食谱中做了类似的事情,但是以下一些问题没有得到解决:

  • 一个work_queue类只是存储和返回任务,但是我们还需要执行现有的任务。
  • 任务可能会引发异常。如果异常离开了任务边界,我们需要捕捉并处理它们。
  • 任务可能不会注意到线程中断。同一线程上的下一个任务可能会被中断。
  • 我们需要一种方法来停止任务的处理。

准备好

该配方需要与boost_systemboost_thread库链接。还需要具备Boost.Thread的基本知识。

怎么做...

在这个食谱中,我们用boost::asio::io_service代替了上一章的work_queue。这样做是有原因的,我们会在下面的食谱中看到。

  1. 让我们从围绕用户任务的结构开始:
#include <boost/thread/thread.hpp>
#include <iostream>

namespace detail {

template <class T>
struct task_wrapped {
private:
    T task_unwrapped_;

public:
    explicit task_wrapped(const T& f)
        : task_unwrapped_(f)
    {}

    void operator()() const {
        // Resetting interruption.
        try {
            boost::this_thread::interruption_point();
        } catch(const boost::thread_interrupted&){}

        try {
            // Executing task.
            task_unwrapped_();
        } catch (const std::exception& e) {
            std::cerr<< "Exception: " << e.what() << '\n';
        } catch (const boost::thread_interrupted&) {
            std::cerr<< "Thread interrupted\n";
        } catch (...) {
            std::cerr<< "Unknown exception\n";
        }
    }
};

} // namespace detail
  1. 为了便于使用,我们将创建一个从用户的函子中产生task_wrapped的函数:
namespace detail {

template <class T>
task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
    return task_wrapped<T>(task_unwrapped);
}

} // namespace detail
  1. 现在,我们准备写tasks_processor课:
#include <boost/asio/io_service.hpp> 

class tasks_processor: private boost::noncopyable {
protected:
    static boost::asio::io_service& get_ios() {
        static boost::asio::io_service ios;
        static boost::asio::io_service::work work(ios);

        return ios;
    }
  1. 让我们添加push_task方法:
public:
    template <class T>
    static void push_task(const T& task_unwrapped) {
        get_ios().post(detail::make_task_wrapped(task_unwrapped));
    }
  1. 让我们通过添加用于启动和停止任务执行循环的成员函数来结束这个类:
    static void start() {
        get_ios().run();
    }

    static void stop() {
        get_ios().stop();
    }
}; // tasks_processor

搞定了。现在,是时候测试我们的班级了:

int func_test() {
    static int counter = 0;
    ++ counter;
    boost::this_thread::interruption_point();

    switch (counter) {
    case 3:
        throw std::logic_error("Just checking");

    case 10:
        // Emulation of thread interruption.
        // Caught inside task_wrapped and does not stop execution.
        throw boost::thread_interrupted();

    case 90:
        // Stopping the tasks_processor.
        tasks_processor::stop();
    }

    return counter;
}

main功能可能如下所示:

int main () {
    for (std::size_t i = 0; i < 100; ++ i) {
        tasks_processor::push_task(&func_test);
    }

    // Processing was not started.
    assert(func_test() == 1);

    // We can also use lambda as a task.
    // Counting 2 + 2 asynchronously.
    int sum = 0;
    tasks_processor::push_task(
        [&sum]() { sum = 2 + 2; }
    );

    // Processing was not started.
    assert(sum == 0);

    // Does not throw, but blocks till
    // one of the tasks it is owning
    // calls tasks_processor::stop().
    tasks_processor::start();
    assert(func_test() == 91);
}

它是如何工作的...

boost::asio::io_service变量可以存储和执行发布给它的任务。但是我们可能不会直接向它发布用户的任务,因为他们可能会收到针对其他任务的中断或抛出异常。这就是为什么我们用detail::task_wrapped结构包装用户的任务。它通过调用以下命令重置所有先前的中断:

try { 
    boost::this_thread::interruption_point(); 
} catch(const boost::thread_interrupted&){}

detail::task_wrapped执行try{ } catch()块中的任务,确保没有异常离开operator()边界。

看一下start()功能。boost::asio::io_service::run()开始处理发布到io_service变量的任务。如果不调用boost::asio::io_service::run(),则不执行发布的任务(这可以在main()功能中看到)。可以通过调用boost::asio::io_service::stop()来停止任务处理。

如果没有剩余任务,则boost::asio::io_service类从run()函数返回,因此我们使用boost::asio::io_service::work的实例强制它继续执行:

static boost::asio::io_service& get_ios() {
    static boost::asio::io_service ios;
    static boost::asio::io_service::work work(ios);

    return ios;
}

The iostream classes and variables, such as std::cerr and std::cout are not thread safe on pre C++ 11 compilers and may produce interleaved characters on C++ 11 compatible compilers. In real projects, additional synchronization must be used to get readable output. For the simplicity of an example, we do not do that.

还有更多...

C++ 17 标准库没有io_service。然而,Boost.Asio库的很大一部分被提议作为网络技术规范 ( TS )作为 C++ 的补充。

请参见

  • 本章以下食谱将向您展示为什么我们选择boost::asio::io_service而不是使用我们来自第 5 章 【多线程】 的手写代码
  • 您可以参考Boost.Asio的文档,在http://boost.org/libs/asio获取一些示例、教程和类参考
  • 您也可以阅读 Boost。Asio C++ 网络编程一书,对Boost.Asio的介绍比较流畅,涵盖了一些本书没有涉及的细节

将计时器和处理计时器事件作为任务

以特定的时间间隔检查某物是一项常见的任务。例如,我们需要每 5 秒钟检查一次活动的某些会话。对于这样的问题,有一些流行的解决方案:

  • 坏的解决方案会创建一个线程来执行检查,然后休眠 5 秒钟。这是一个蹩脚的解决方案,消耗了大量的系统资源,并且扩展性很差。
  • 正确的解决方案使用系统特定的 API 来异步操作计时器。这是一个更好的解决方案,需要一些工作,不便携,除非你使用Boost.Asio

准备好

你必须知道如何使用 C++ 11 右值引用和unique_ptr

该配方基于上一个配方的代码。参见本章第一个食谱,获取boost::asio::io_servicetask_queue课程的信息。

将该配方与boost_systemboost_thread库链接。定义BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS绕过限制性的库检查。

怎么做...

我们只是通过添加新的方法来修改tasks_processor类,以便在某个指定的时间运行任务。

  1. 让我们为我们的tasks_processor类添加一个延迟运行任务的方法:
class tasks_processor {
    // ...
public:
    template <class Time, class Func>
    static void run_delayed(Time duration_or_time, const Func& f) {
        std::unique_ptr<boost::asio::deadline_timer> timer(
            new boost::asio::deadline_timer(
                get_ios(), duration_or_time
            )
        );

        timer_ref.async_wait(
            detail::timer_task<Func>(
                std::move(timer),
                f
            )
        );
    }
};
  1. 作为最后一步,我们创建一个timer_task结构:
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/system/error_code.hpp>
#include <memory>  // std::unique_ptr
#include <iostream>

namespace detail {

    template <class Functor>
    struct timer_task {
    private:
        std::unique_ptr<boost::asio::deadline_timer> timer_;
        task_wrapped<Functor> task_;

    public:
        explicit timer_task(
                std::unique_ptr<boost::asio::deadline_timer> timer,
                const Functor& task_unwrapped)
            : timer_(std::move(timer))
            , task_(task_unwrapped)
        {}

        void operator()(const boost::system::error_code& error) const {
            if (!error) {
                task_();
            } else {
                std::cerr << error << '\n';
            }
        }
    };

} // namespace detail

这就是我们使用新功能的方式:

int main () {
    const int seconds_to_wait = 3;
    int i = 0;

    tasks_processor::run_delayed(
        boost::posix_time::seconds(seconds_to_wait),
        test_functor(i)
    );

    tasks_processor::run_delayed(
        boost::posix_time::from_time_t(time(NULL) + 1),
        &test_func1
    );

    assert(i == 0);

    // Blocks till one of the tasks
    // calls tasks_processor::stop().
    tasks_processor::start();
}

其中test_functor是定义了operator()的结构,test_func1是函数:

struct test_functor {
    int& i_;

    explicit test_functor(int& i);

    void operator()() const {
        i_ = 1;
        tasks_processor::stop();
    }
};

void test_func1();

它是如何工作的...

简而言之,当经过指定的时间后,boost::asio::deadline_timer将任务推送到boost::asio::io_service类的实例中执行。

所有讨厌的东西都在run_delayed功能中:

    template <class Time, class Functor>
    static void run_delayed(Time duration_or_time, const Functor& f) {
        std::unique_ptr<boost::asio::deadline_timer> 
        timer( /* ... */ );

        boost::asio::deadline_timer& timer_ref = *timer;

        timer_ref.async_wait(
            detail::timer_task<Functor>(
                std::move(timer),
                f
            )
        );
    }

tasks_processor::run_delayed函数接受超时和超时后要调用的函子。其中,创建了一个指向boost::asio::deadline_timer的唯一指针。boost::asio::deadline_timer保存特定于平台的东西,用于异步执行任务。

Boost.Asio does not manage memory out of the box. The library user has to take care of managing resources usually by keeping them in the task. So if we need a timer and want some function to execute after the specified timeout, we have to move the timer's unique pointer into the task, get a reference to the timer, and pass a task to the timer.

我们得到了对这一行中deadline_timer的引用:

boost::asio::deadline_timer& timer_ref = *timer;

现在,我们创建一个detail::timer_task对象,它存储一个函子并获得unique_ptr<boost::asio::deadline_timer>的所有权:

            detail::timer_task<Functor>(
                std::move(timer),
                f
            )

boost::asio::deadline_timer在被触发之前不能被破坏,将其移入timer_task函子保证了这一点。

最后,当请求的时间过去后,我们指示boost::asio::deadline_timertimer_task函子发布到io_service:

timer_ref.async_wait( /* timer_task */ )

io_service变量的引用保存在boost::asio::deadline_timer变量中。这就是为什么它的构造器需要一个对io_service的引用来存储它,并且一旦超时就将任务发布给它。

detail::timer_task::operator()方法接受boost::system::error_code,如果等待时发生了不好的事情,则包含错误描述。如果没有错误发生,我们调用用户的 functor,它被包装以捕捉异常(我们重复使用第一个配方中的detail::task_wrapped结构)。

boost::asio::deadline_timer::async_wait等待超时时不消耗 CPU 资源或执行线程。您可以简单地将一些进一步推入io_service中,它们将在操作系统保持超时的同时开始执行:

As a rule of thumb: all the resources that are used during the async_* calls must be stored in the task.

还有更多...

一些奇特/古老的平台没有 API 来很好地实现定时器,因此Boost.Asio库使用每个io_service的额外执行线程来模拟异步定时器的行为。没别的办法了。

C++ 17 里面没有Boost.Asio类;然而,网络终端服务有async_waittimer两个等级。

请参见

  • 阅读本章的第一个食谱将教会你boost::asio::io_service的基础知识。以下食谱将为您提供更多io_service用法的例子,并向您展示如何使用Boost.Asio处理网络通信、信号和其他功能。
  • 您可以参考Boost.Asio的文档,在http://boost.org/libs/asio网站获取一些示例、教程和课程参考。

作为任务的网络通信

通过网络接收或发送数据是一项缓慢的操作。当机器接收数据包时,当操作系统验证数据包并将数据复制到用户指定的缓冲区时,可能需要几秒钟。

我们可以做很多工作而不是等待!让我们修改我们的tasks_processor类,使它能够以异步方式发送和接收数据。在非技术术语中,我们要求它从远程主机接收至少 N 字节,完成后,调用我们的函子。顺便说一句,不要阻止这个电话。那些了解libev****libevent或者 Node.js 的读者可能会在这个食谱中找到很多熟悉的东西。

准备好

这个食谱是基于前面两个食谱。参见本章第一个食谱,获取boost::asio::io_servicetask_queue课程的信息。参见第二个配方,回顾异步处理的基础。

将该配方与boost_systemboost_thread库链接。定义BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS绕过限制性的图书馆检查。

怎么做...

让我们通过添加创建连接的方法来扩展上一个方法的代码。

  1. 一个连接将由一个connection_with_data类来表示。这个类保存到远程主机的套接字和一个用于接收和发送数据的std::string:
#include <boost/asio/ip/tcp.hpp>
#include <boost/core/noncopyable.hpp>

struct connection_with_data: boost::noncopyable {
    boost::asio::ip::tcp::socket socket;
    std::string data;

    explicit connection_with_data(boost::asio::io_service& ios)
        : socket(ios) 
    {}

    void shutdown() {
        if (!socket.is_open()) {
            return;
        }

        boost::system::error_code ignore;
        socket.shutdown(
            boost::asio::ip::tcp::socket::shutdown_both,
            ignore
        );
        socket.close(ignore);
    }

    ~connection_with_data() {
        shutdown();
    }
};
  1. 就像在前面的方法中一样,类主要由指向它的唯一指针使用。为了简单起见,我们添加一个typedef:
#include <memory> // std::unique_ptr

typedef std::unique_ptr<connection_with_data> connection_ptr;
  1. 上一个配方中的tasks_processor类拥有boost::asio::io_service对象。让它成为构建连接的工厂似乎是合理的:
class tasks_processor {
    // ...
public:
    static connection_ptr create_connection(
        const char* addr,
        unsigned short port_num)
    {
        connection_ptr c( new connection_with_data(get_ios()) );

        c->socket.connect(boost::asio::ip::tcp::endpoint(
            boost::asio::ip::address_v4::from_string(addr),
            port_num
        ));

        return c;
    }
};
  1. 以下是将数据异步写入远程主机的方法:
#include <boost/asio/write.hpp>

template <class T>
struct task_wrapped_with_connection;

template <class Functor>
void async_write_data(connection_ptr&& c, const Functor& f) {
    boost::asio::ip::tcp::socket& s = c->socket;
    std::string& d = c->data;

    boost::asio::async_write(
        s,
        boost::asio::buffer(d),
        task_wrapped_with_connection<Functor>(std::move(c), f)
    );
}
  1. 以下是从远程主机异步读取数据的方法:
#include <boost/asio/read.hpp>

template <class Functor>
void async_read_data(
    connection_ptr&& c,
    const Functor& f,
    std::size_t at_least_bytes)
{
    c->data.resize(at_least_bytes);
    c->data.resize(at_least_bytes);

    boost::asio::ip::tcp::socket& s = c->socket;
    std::string& d = c->data;
    char* p = (d.empty() ? 0 : &d[0]);

    boost::asio::async_read(
        s,
        boost::asio::buffer(p, d.size()),
        task_wrapped_with_connection<Functor>(std::move(c), f)
    );
}

template <class Functor>
void async_read_data_at_least(
    connection_ptr&& c,
    const Functor& f,
    std::size_t at_least_bytes,
    std::size_t at_most)
{
    std::string& d = c->data;
    d.resize(at_most);
    char* p = (at_most == 0 ? 0 : &d[0]);

    boost::asio::ip::tcp::socket& s = c->socket;

    boost::asio::async_read(
        s,
        boost::asio::buffer(p, at_most),
        boost::asio::transfer_at_least(at_least_bytes),
        task_wrapped_with_connection<Functor>(std::move(c), f)
    );
}
  1. 最后一部分是task_wrapped_with_connection类定义:
template <class T>
struct task_wrapped_with_connection {
private:
    connection_ptr c_;
    T task_unwrapped_;

public:
    explicit task_wrapped_with_connection
    (connection_ptr&& c, const T& f)
        : c_(std::move(c))
        , task_unwrapped_(f)
    {}

    void operator()(
        const boost::system::error_code& error,
        std::size_t bytes_count)
    {
        c_->data.resize(bytes_count);
        task_unwrapped_(std::move(c_), error);
    }
};

搞定了。现在,库用户可以像这样使用前面的类发送数据:

void send_auth() {
    connection_ptr soc = tasks_processor::create_connection(
        "127.0.0.1", g_port_num
    );
    soc->data = "auth_name";

    async_write_data(
        std::move(soc),
        &on_send
    );
}

用户也可以这样使用它来接收数据:

void receive_auth_response(
    connection_ptr&& soc,
    const boost::system::error_code& err)
{
    if (err) {
        std::cerr << "Error on sending data: " 
        << err.message() << '\n';
        assert(false);
    }

    async_read_data(
        std::move(soc),
        &process_server_response,
        2
    );
}

库用户可以这样处理接收到的数据:

void process_server_response(
        connection_ptr&& soc,
        const boost::system::error_code& err)
{
    if (err && err != boost::asio::error::eof) {
        std::cerr << "Client error on receive: "
        << err.message() << '\n';
        assert(false);
    }

    if (soc->data.size() != 2) {
        std::cerr << "Wrong bytes count\n";
        assert(false);
    }

    if (soc->data != "OK") {
        std::cerr << "Wrong response: " << soc->data << '\n';
        assert(false);
    }

    soc->shutdown();
    tasks_processor::stop();
}

它是如何工作的...

Boost.Asio库不管理开箱即用的资源和缓冲区。因此,如果我们想要一些简单的接口来读写数据,最简单的解决方案是将发送/接收数据的套接字和缓冲区绑定在一起。这就是connection_with_data班的工作。它包含一个boost::asio::ip::tcp::socket,这是一个围绕本地套接字的Boost.Asio包装器和一个我们用作缓冲区的std::string变量。

一个boost::asio::ip::tcp::socket类的构造函数接受boost::asio::io_service作为Boost.Asio的几乎所有类。创建套接字后,它必须连接到某个远程端点:

        c->socket.connect(boost::asio::ip::tcp::endpoint(
            boost::asio::ip::address_v4::from_string(addr),
            port_num
        ));

看一下书写功能。它接受指向connection_with_data类和函子f的唯一指针:

#include <boost/asio/write.hpp>

template <class Functor>
void async_write_data(connection_ptr&& c, const Functor& f) {

在其中,我们获得了对套接字和缓冲区的引用:

boost::asio::ip::tcp::socket& s = c->socket;
std::string& d = c->data;

然后,我们要求异步写入:

    boost::asio::async_write(
        s,
        boost::asio::buffer(d),
        task_wrapped_with_connection<Functor>(std::move(c), f)
    );
}

所有有趣的事情都发生在boost::asio::async_write函数中。就像定时器一样,异步调用立即返回,而不执行函数。它只告诉在一些操作完成后将回调任务发布到boost::asio::io_service(在我们的例子中,它正在向套接字写入数据)。boost::asio::io_service在一个调用io_service::run()方法的线程中执行我们的函数。下图说明了这一点:

现在,来看看task_wrapped_with_connection::operator()。它接受const boost::system::error_code& errorstd::size_t bytes_count,因为boost::asio::async_writeboost::asio::async_read函数都在异步操作完成时传递这些参数。对c_->data.resize(bytes_count);的调用会调整缓冲区的大小,使其仅包含接收/写入的数据。最后,我们调用最初传递给async函数并存储为task_unwrapped_的回调。

那是怎么回事?这就是发送数据的简单方法!现在,我们有了一个async_write_data函数,它将数据从缓冲区异步写入套接字,并在操作完成时执行回调:

void on_send(connection_ptr&& soc, const boost::system::
error_code& err);

void connect_and_send() {
    connection_ptr s = tasks_processor::create_connection
    ("127.0.0.1", 80);

    s->data = "data_to_send";
    async_write_data(
        std::move(s),
        &on_send
    );
}

async_read_dataasync_write_data很近。它调整缓冲区的大小,创建一个task_wrapped_with_connection函数,并在异步操作完成时将其推入is_service

注意async_read_data_at_least功能。在它的身体里,对boost::asio::async_read有一个稍微不同的称呼:

boost::asio::async_read(
    s,
    boost::asio::buffer(p, at_most),
    boost::asio::transfer_at_least(at_least_bytes),
    task_wrapped_with_connection<Functor>(std::move(c), f)
);

里面有一个boost::asio::transfer_at_least(al_least_bytes)Boost.Asio有很多自定义读写的函子。这一个函子说,在调用回调之前至少传输 at_least_bytes 字节。更多字节是可以的,直到它们适合缓冲区

最后,让我们看一下其中一个回调:

void process_server_response(
        connection_ptr&& soc,
        const boost::system::error_code& err);

在这个例子中,回调必须接受connection_ptr和一个boost::system::error_code变量。一个boost::system::error_code变量保存关于错误的信息。它有一个到bool运算符的显式转换,所以检查错误的简单方法就是编写if (err) { ... }。如果遥控器结束传输并关闭插座,err可能包含boost::asio::error::eof错误代码。这并不总是坏事。在我们的示例中,我们将其视为非错误行为:

    if (err && err != boost::asio::error::eof) {
        std::cerr << "Client error on receive: " 
        << err.message() << '\n';
        assert(false);
    }

因为我们已经将套接字和缓冲区绑定在一起,所以您可以从soc->data获得接收到的数据:

if (soc->data.size() != 2) {
    std::cerr << "Wrong bytes count\n";
    assert(false);
}

if (soc->data != "OK") {
    std::cerr << "Wrong response: " << soc->data << '\n';
    assert(false);
}

The soc->shutdown() call is optional, because when soc goes out of scope, the destructor for it is called. Destructor of unique_ptr<connection_with_data> calls ~connection_with_data that has a shutdown() in its body.

还有更多...

我们的task_wrapped_with_connection::operator()不够好!用户提供了task_unwrapped_回调我的抛出异常,并且可能被不属于该特定任务的Boost.Thread中断所中断。修复方法是将回调包装到第一个配方的类中:

void operator()(
    const boost::system::error_code& error,
    std::size_t bytes_count)
{
    const auto lambda = [this, &error, bytes_count]() {
        this->c_->data.resize(bytes_count);
        this->task_unwrapped_(std::move(this->c_), error);
    };

    const auto task = detail::make_task_wrapped(lambda);
    task();
}

task_wrapped_with_connection::operator()中,我们创建了一个名为lambda的λ函数。执行时,lambdaconnection_with_data类中的数据调整为bytes_count并调用最初传递的回调。最后,我们将第一个配方中的lambda包装到我们的安全执行任务中,然后执行它。

你可能会在网上看到很多Boost.Asio的例子。其中许多人使用shared_ptr而不是unique_ptr来保存数据。用shared_ptr的方法更容易实现;然而,它有两大缺点:

  • 效率:shared_ptr内部有一个原子计数器,从不同的线程修改它可能会显著降低性能。在接下来的一个食谱中,您将看到如何在多个线程中处理任务,这就是在高负载的情况下差异可能很明显的地方。
  • 显而易见:使用unique_ptr,您总是可以看到连接的所有权被转移到了某个地方(您可以在代码中看到std::move)。使用shared_ptr,你不能从界面上理解这个函数是获取了所有权还是仅仅使用了对一个对象的引用。

但是,如果根据应用的逻辑,所有权必须同时在多个任务之间共享,您可能会被迫使用shared_ptr

Boost.Asio不是 C++ 17 的一部分,但它很快将作为 Networking TS 发货,并被纳入即将推出的 C++ 标准之一。

请参见

  • 更多示例、教程、http://boost.org/libs/asio 的完整参考以及如何使用 UDP 或 ICMP 协议的示例,请参见Boost.Asio的官方文档。
  • 您也可以阅读 Boost。Asio C++ 网络编程一书,更详细地描述了Boost.Asio

接受传入连接

使用网络的服务器端通常看起来像一个序列,我们首先获得新的连接,读取数据,然后处理它,然后发送结果。想象一下,我们正在创建某种授权服务器,它必须每秒处理大量请求。在这种情况下,我们需要在多个线程中接受、接收、异步发送和处理任务。

在这个食谱中,我们将看到如何扩展我们的tasks_processor类来接受和处理传入的连接,在下一个食谱中,我们将看到如何使它多线程化。

准备好

这个食谱需要对本章第一个食谱中描述的boost::asio::io_service基础知识有很好的了解。一些关于网络通信的知识会对你有所帮助。还需要了解boost::function和至少两个以前食谱的信息。将此食谱与boost_systemboost_thread库链接。定义BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS绕过限制性的图书馆检查。

怎么做...

就像之前的食谱一样,我们给tasks_processor类增加了新的方法。

  1. 我们从在tasks_processor中添加一些typedefs开始:
class tasks_processor {
    typedef boost::asio::ip::tcp::acceptor acceptor_t;

    typedef boost::function<
        void(connection_ptr, const boost::system::error_code&)
    > on_accpet_func_t;
  1. 让我们添加一个类,它将新的传入连接的套接字、要监听的套接字和用户提供的用于处理新连接的回调绑定在一起:
private:
    struct tcp_listener {
        acceptor_t              acceptor_;
        const on_accpet_func_t  func_;
        connection_ptr          new_c_;

        template <class Functor>
        tcp_listener(
                boost::asio::io_service& io_service,
                unsigned short port,
                const Functor& task_unwrapped)
            : acceptor_(io_service, boost::asio::ip::tcp::endpoint(
                boost::asio::ip::tcp::v4(), port
            ))
            , func_(task_unwrapped)
        {}
    };

    typedef std::unique_ptr<tcp_listener> listener_ptr;
  1. 我们需要添加一个在指定端口开始监听的函数:
public:  
   template <class Functor>
    static void add_listener(unsigned short port_num, const Functor& f) {
        std::unique_ptr<tcp_listener> listener(
            new tcp_listener(get_ios(), port_num, f)
        );

        start_accepting_connection(std::move(listener));
    }
  1. 开始接受传入连接的函数:
private:
   static void start_accepting_connection(listener_ptr&& listener) {
        if (!listener->acceptor_.is_open()) {
            return;
        }

        listener->new_c_.reset(new connection_with_data(
            listener->acceptor_.get_io_service()
        ));

        boost::asio::ip::tcp::socket& s = listener->new_c_->socket;
        acceptor_t& a = listener->acceptor_;
        a.async_accept(
            s,
            tasks_processor::handle_accept(std::move(listener))
        );
    }
  1. 我们还需要一个处理新连接的函子:
private:
    struct handle_accept {
        listener_ptr listener;

        explicit handle_accept(listener_ptr&& l)
            : listener(std::move(l))
        {}

        void operator()(const boost::system::error_code& error) {
            task_wrapped_with_connection<on_accpet_func_t> task(
                std::move(listener->new_c_), listener->func_
            );

            start_accepting_connection(std::move(listener));
            task(error, 0);
        }
    };

搞定了。现在,我们可以通过以下方式接受连接:

class authorizer {
public:
    static void on_connection_accpet(
        connection_ptr&& connection,
        const boost::system::error_code& error)
    {
        assert(!error);
        // ...
    }
};

int main() {
    tasks_processor::add_listener(80, &authorizer::on_connection_accpet);
    tasks_processor::start();
}

它是如何工作的...

函数add_listener构建了新的tcp_listener,它保存了接受连接所需的所有东西。就像任何异步操作一样,我们需要在操作执行时保持资源活跃。一个独特的指向tcp_listener的指针可以完成这项工作。

当我们构造指定端点的boost::asio::ip::tcp::acceptor(参见步骤 3 )时,它在指定的地址打开一个套接字,并准备接受连接。

步骤 4 中,我们创建一个新的套接字,并为该新套接字调用async_accept。当一个新的连接到来时,listener->acceptor_将这个连接绑定到一个套接字,并将tasks_processor::handle_accept回调推入boost::asio::io_service。从之前的食谱中我们了解到,所有的async_*电话都会立即返回,async_accept并不是特例。

让我们仔细看看我们的handle_accept::operator()。在其中,我们从之前的配方中创建一个task_wrapped_with_connection函子,并在其中移动一个新的连接。现在,我们的listener_ptrnew_c_中没有插座,因为它属于函子。我们调用函数start_accepting_connection(std::move(listener)),它在listener->new_c_中创建新的套接字,并启动异步接受。异步接受操作不会阻塞,因此程序继续执行,从start_accepting_connection(std::move(listener))函数返回,并通过连接task(error, 0)执行函子。

You've made everything as shown in the example, but the performance of the server is not good enough. That's because the example is simplified and many optimizations left behind at the scene. The most significant one is to keep a separate small buffer in connection_with_data and use it for all the internal Boost.Asio's callback related allocations. See Custom memory allocation example in the official documentation of the Boost.Asio library for more information on this optimization topic.

当调用boost::asio::io_service的析构函数时,调用所有回调的析构函数。这使得tcp_connection_ptr的析构函数被调用,释放了资源。

还有更多...

我们没有使用boost::asio::ip::tcp::acceptor类的所有特性。如果我们提供特定的boost::asio::ip::tcp::endpoint,它可以绑定到特定的 IPv6 或 IPv4 地址。您也可以通过native_handle()方法获得一个本地套接字,并使用一些特定于操作系统的调用来调整行为。您可以通过呼叫set_optionacceptor_设置一些选项。例如,您可以这样强制acceptor_重用地址:

boost::asio::socket_base::reuse_address option(true); 
acceptor_.set_option(option); 

Reusing the address provides an ability to restart the server quickly after it was terminated without correct shutdown. After the server was terminated, a socket may be opened for some time, and you won't be able to start the server on the same address without the reuse_address option.

C++ 17 没有从Boost.Asio开始的类,但是带有大部分功能的联网 TS 即将到来。

请参见

  • 从头开始这一章是一个好主意,可以获得更多关于Boost.Asio的信息
  • 参见Boost.Asio的官方文档,了解更多示例、教程和在http://boost.org/libs/asio的完整参考

并行执行不同的任务

现在,是时候让我们的tasks_processor在多个线程中处理任务了。这能有多难?

入门指南

你需要阅读本章的第一个食谱。还需要一些多线程的知识,尤其是阅读操纵一组线程的食谱。

将该配方与boost_systemboost_thread库链接。定义BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS绕过限制性的库检查。

怎么做...

我们所需要做的就是将start_multiple方法添加到我们的tasks_processor类中:

#include <boost/thread/thread.hpp> 

class tasks_processor {
public:
    // Default value will attempt to guess optimal count of threads.
    static void start_multiple(std::size_t threads_count = 0) {
        if (!threads_count) {
            threads_count = (std::max)(static_cast<int>(
                boost::thread::hardware_concurrency()), 1
            );
        }

        // First thread is the current thread.
        -- threads_count;

        boost::asio::io_service& ios = get_ios();
        boost::thread_group tg;
        for (std::size_t i = 0; i < threads_count; ++ i) {
            tg.create_thread([&ios]() { ios.run(); });
        }

        ios.run();
        tg.join_all();
    }
};

现在,我们可以做更多的工作,如下图所示:

它是如何工作的...

boost::asio::io_service::run方法是线程安全的。我们只需要从不同的线程运行boost::asio::io_service::run方法。

If you are executing tasks that modify a common resource, you need to add mutexes around that resources, or organize your application in a way, that the common resource is not used simultaneously by different tasks. It is safe to use resource from different tasks without concurrent access to the resource because boost::asio::io_service takes care of additional synchronization between tasks and forces the modification results of one task to be seen by another task.

参见对boost::thread::hardware_concurrency()的调用。它返回可以在当前硬件上并发运行的线程数。但是,这只是一个提示,有时它可能会返回一个0值,这就是为什么我们要为它调用std::max函数。std::max确保threads_count至少存储值1

We wrapped std::max in parentheses because some popular compilers define the min() and max() macros, so we need additional tricks to work around this.

还有更多...

boost::thread::hardware_concurrency()函数是 C++ 11 的一部分;您可以在std::名称空间的<thread>标题中找到它。

所有的boost::asio类都不是 C++ 17 的一部分,但是它们将很快作为联网 TS 提供。

请参见

流水线任务处理

有时,需要在指定的时间间隔内处理任务。与以前的食谱相比,我们试图按照任务在队列中出现的顺序来处理任务,这是一个很大的不同。

考虑一个例子,我们正在编写一个连接两个子系统的程序,其中一个子系统产生数据包,另一个子系统将修改后的数据写入磁盘(类似这样的情况可以在摄像机、录音机和其他设备中看到)。我们需要按照指定的顺序一个接一个地处理数据包,平滑且抖动很小,并且在多个线程中进行。

天真的方法在这里不起作用:

#include <boost/thread/thread.hpp>

subsystem1 subs1;
subsystem2 subs2;

void process_data() {
    while (!subs1.is_stopped()) {
        data_packet data = subs1.get_data();
        decoded_data d_decoded = decode_data(data);
        compressed_data c_data = compress_data(d_decoded);
        subs2.send_data(c_data);
    }
}

void run_in_multiple_threads() {
    boost::thread t(&process_data);
    process_data();

    t.join();
}

在多线程环境中,我们可以在第一个线程中获得包#1 ,然后在第二个执行线程中获得包#2 。由于处理时间不同,操作系统上下文切换和调度数据包#2 可能在数据包#1 之前处理。数据包和处理顺序没有保证。让我们解决这个问题!

准备好

理解这个例子需要第五章多线程中的制作工作队列食谱。代码必须链接到boost_threadboost_system库。

需要 C++ 11 的基础知识,尤其是 lambda 函数。

怎么做...

本食谱基于第 5 章多线程制作工作队列食谱中work_queue类的代码。我们将进行一些修改,并将使用该类的一些实例。

  1. 让我们从为数据解码、数据压缩和数据发送创建单独的队列开始:
work_queue decoding_queue, compressing_queue, sending_queue;
  1. 现在,是时候重构process_data并将其拆分为多个功能了:
void start_data_accepting();
void do_decode(const data_packet& packet);
void do_compress(const decoded_data& packet);

void start_data_accepting() {
    while (!subs1.is_stopped()) {
        data_packet packet = subs1.get_data();

        decoding_queue.push_task(
            [packet]() {
                do_decode(packet);
            }
        );
    }
}

void do_decode(const data_packet& packet) {
    decoded_data d_decoded = decode_data(packet);

    compressing_queue.push_task(
        [d_decoded]() {
            do_compress(d_decoded);
        }
    );
}

void do_compress(const decoded_data& packet) {
    compressed_data c_data = compress_data(packet);

    sending_queue.push_task(
        [c_data]() {
            subs2.send_data(c_data);
        }
    );
}
  1. 我们的work_queue类来自第 5 章多线程,获得了一些停止和运行任务的界面变化:
#include <deque>
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>

class work_queue {
public:
    typedef boost::function<void()> task_type;

private:
    std::deque<task_type>       tasks_;
    boost::mutex                mutex_;
    boost::condition_variable   cond_;
    bool                        is_stopped_;

public:
    work_queue()
        : is_stopped_(false)
    {}

    void run();
    void stop();

    // Same as in Chapter 5, but with
    // rvalue references support.
    void push_task(task_type&& task);
};
  1. work_queue``stop()``run()功能的实现必须是这样的:
void work_queue::stop() {
    boost::lock_guard<boost::mutex> lock(mutex_);
    is_stopped_ = true;
    cond_.notify_all();
}

void work_queue::run() {
    while (1) {
        boost::unique_lock<boost::mutex> lock(mutex_);
        while (tasks_.empty()) {
            if (is_stopped_) {
                return;
            }
            cond_.wait(lock);
        }

        task_type t = std::move(tasks_.front());
        tasks_.pop_front();
        lock.unlock();

        t();
    }
}
  1. 仅此而已!现在,我们只需要启动管道:
#include <boost/thread/thread.hpp> 
int main() {
    boost::thread t_data_decoding(
        []() { decoding_queue.run(); }
    );
    boost::thread t_data_compressing(
        []() { compressing_queue.run(); }
    );
    boost::thread t_data_sending(
        []() { sending_queue.run(); }
    );

    start_data_accepting();
  1. 管道可以这样停止:
    decoding_queue.stop();
    t_data_decoding.join();

    compressing_queue.stop();
    t_data_compressing.join();

    sending_queue.stop();
    t_data_sending.join();

它是如何工作的...

诀窍是将单个数据包的处理分成一些同样小的子任务,并在不同的work_queues中逐一处理。在本例中,我们可以将数据处理分为数据解码、数据压缩和数据发送。

理想情况下,六个数据包的处理如下所示:

时间 接收 解码 压缩 发送
勾选 1: 数据包#1
勾选 2: 数据包#2 数据包#1
勾选 3: 数据包#3 数据包#2 数据包#1
打勾 4: 数据包#4 数据包#3 数据包#2 数据包#1
勾选 5: 数据包#5 数据包#4 数据包#3 数据包#2
勾选 6: 数据包#6 数据包#5 数据包#4 数据包#3
打勾 7: - 数据包#6 数据包#5 数据包#4
打勾 8: - - 数据包#6 数据包#5
打勾 9: - - - 数据包#6

然而,我们的世界并不理想,所以有些任务可能比其他任务完成得更快。例如,接收可能比解码更快,在这种情况下,解码队列将保存一组要完成的任务。为了避免队列溢出,请努力使每个后续任务比前一个任务稍快。

在我们的例子中,我们没有使用boost::asio::io_service,因为它不能保证发布的任务按照它们的邮资顺序执行。

还有更多...

在这个例子中,所有用来创建管道的工具都可以在 C++ 11 中使用,所以没有什么能阻止你在 C++ 11 兼容的编译器上创建同样的东西。但是,Boost 使您的代码在 C++ 11 之前的编译器上更加可移植和可用。

请参见

  • 这种技术为处理器开发人员所熟知和使用。见http://en.wikipedia.org/wiki/Instruction_pipeline。在这里,您可能会发现对管道所有特征的简要描述。
  • *来自第五章多线程的【制作工作队列】*食谱将为您提供更多关于该食谱中使用的方法的信息。

制造非阻塞屏障

在多线程编程中,有一个抽象叫做障碍。它会停止到达它的执行线程,直到请求的线程数没有被阻塞。之后,所有线程都被释放,并继续执行。考虑以下可以使用它的示例。

我们希望在不同的线程中处理数据的不同部分,然后发送数据:

#include <boost/array.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/thread.hpp>

typedef boost::array<std::size_t, 10000> vector_type;
typedef boost::array<vector_type, 4> data_t;

void fill_data(vector_type& data);
void compute_send_data(data_t& data);

void runner(std::size_t thread_index, boost::barrier& barrier, data_t& data) {
    for (std::size_t i = 0; i < 1000; ++ i) {
        fill_data(data.at(thread_index));
        barrier.wait();

        if (!thread_index) {
            compute_send_data(data);
        }
        barrier.wait();
    }
}

int main() {
    // Initing barrier.
    boost::barrier barrier(data_t::static_size);

    // Initing data.
    data_t data;

    // Run on 4 threads.
    boost::thread_group tg;
    for (std::size_t i = 0; i < data_t::static_size; ++ i) {
        tg.create_thread([i, &barrier, &data] () {
            runner(i, barrier, data);
        });
    }

    tg.join_all();
}

data_barrier.wait()方法阻塞,直到所有线程填满数据。之后,所有线程都被释放。索引为0的线程使用compute_send_data(data)计算要发送的数据,而其他线程再次在栅栏处等待,如下图所示:

看起来很蹩脚,不是吗?

准备好

这个食谱需要本章第一个食谱的知识。还需要Boost.Thread的知识。这个食谱的代码需要链接到boost_threadboost_system库。

怎么做...

我们根本不需要封锁!让我们仔细看看这个例子。我们需要做的就是发布四个fill_data任务,最后完成的任务叫compute_send_data(data)

  1. 我们需要第一个食谱中的tasks_processor类;无需对其进行任何更改。
  2. 我们将使用原子变量来代替屏障:
#include <boost/atomic.hpp> 
typedef boost::atomic<unsigned int> atomic_count_t; 
  1. 我们新的 runner 函数将如下所示:
void clever_runner(
        std::size_t thread_index,
        std::size_t iteration,
        atomic_count_t& counter,
        data_t& data)
{
    fill_data(data.at(thread_index));

    if (++ counter != data_t::static_size) {
        return;
    }

    compute_send_data(data);

    if (++ iteration == 1000) {
        // Exiting, because 1000 iterations are done.
        tasks_processor::stop();
        return;
    }

    counter = 0;
    for (std::size_t i = 0; i < data_t::static_size; ++ i) {
        tasks_processor::push_task([i, iteration, &counter, &data]() {
            clever_runner( 
                i, 
                iteration,
                counter,
                data
            );
        });
    }
}
  1. main功能需要一个微小的改变:
    // Initing counter.
    atomic_count_t counter(0);

    // Initing data.
    data_t data;

    // Run 4 tasks.
    for (std::size_t i = 0; i < data_t::static_size; ++ i) {
        tasks_processor::push_task([i, &counter, &data]() {
            clever_runner( 
                i, 
                0, // first iteration
                counter,
                data
            );
        });
    }

    tasks_processor::start();

它是如何工作的...

我们一点也不阻拦。我们没有阻塞,而是计算完成填充数据的任务。这是通过counter原子变量完成的。最后剩余的任务将有一个等于data_t::static_sizecounter变量。只有该任务必须计算和发送数据。

之后,我们检查退出条件(完成了 1000 次迭代),并通过将任务推入队列来发布新数据。

还有更多...

这是不是更好的解决方案?首先,它的伸缩性更好:

这种方法对于程序做很多不同工作的情况也更有效。因为没有线程在屏障中等待,当其中一个线程计算和发送数据时,空闲线程可能会执行一些其他任务。

这个方法可以在没有 Boost 库的 C++ 11 中实现。你只需要从第五章、多线程work_queue替换tasks_processor里面的io_service。但是像往常一样,Boost 提供了更好的可移植性,并且可以让这个例子在使用 Boost 库的 C++ 11 之前的编译器上运行。你只需要用boost::bindboost::ref替换 lambda 函数。

请参见

存储异常并从中创建任务

处理异常并不总是微不足道的,可能会消耗大量时间。考虑异常必须由网络序列化和发送的情况。这可能需要几毫秒和几千行代码。异常被捕获后,并不总是处理它的最佳时间和地点。

我们可以存储异常并延迟它们的处理吗?

准备好

这个食谱需要熟悉boost::asio::io_service,这在本章的第一个食谱中有描述。

该配方需要与boost_systemboost_thread库链接。

怎么做...

我们所需要的是能够存储异常,并像普通变量一样在线程之间传递它们。

  1. 让我们从存储和处理异常的函数开始:
#include <boost/exception_ptr.hpp>

struct process_exception {
    boost::exception_ptr exc_;

    explicit process_exception(const boost::exception_ptr& exc)
        : exc_(exc)
    {}

    void operator()() const;
};
  1. 该函子的operator()只是将异常输出到控制台:
#include <boost/lexical_cast.hpp>
void func_test2(); // Forward declaration.

void process_exception::operator()() const  {
    try {
        boost::rethrow_exception(exc_);
    } catch (const boost::bad_lexical_cast& /*e*/) {
        std::cout << "Lexical cast exception detected\n" << std::endl;

        // Pushing another task to execute.
        tasks_processor::push_task(&func_test2);
    } catch (...) {
        std::cout << "Can not handle such exceptions:\n" 
            << boost::current_exception_diagnostic_information() 
            << std::endl;

        // Stopping.
        tasks_processor::stop();
    }
}
  1. 让我们编写一些函数来演示异常是如何工作的:
#include <stdexcept>
void func_test1() {
    try {
        boost::lexical_cast<int>("oops!");
    } catch (...) {
        tasks_processor::push_task(
            process_exception(boost::current_exception())
        );
    }
}

void func_test2() {
    try {
        // ...
        BOOST_THROW_EXCEPTION(std::logic_error("Some fatal logic error"));
        // ...
    } catch (...) {
        tasks_processor::push_task(
            process_exception(boost::current_exception())
        );
    }
}

现在,如果我们这样运行这个例子:

  tasks_processor::get().push_task(&func_test1); 
  tasks_processor::get().start(); 

我们将获得以下输出:

Lexical cast exception detected

Can not handle such exceptions:
main.cpp(48): Throw in function void func_test2()
Dynamic exception type: boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<std::logic_error> >
std::exception::what: Some fatal logic error  

它是如何工作的...

Boost.Exception库提供了存储和重新抛出异常的能力。boost::current_exception()方法只能从catch()块内部调用,它返回一个类型为boost::exception_ptr的对象。

在前面的func_test1()例子中,boost::bad_lexical_cast异常被抛出。boost::current_exception()归还;从该异常创建一个process_exception任务。

boost::exception_ptr恢复异常类型的唯一方法是使用boost::rethrow_exception(exc)功能重新抛出。这就是process_exception功能的作用。

Throwing and catching exceptions is a heavy operation. Throwing may dynamically allocate memory, touch cold memory, lock mutex, compute a bunch of addresses, and do other stuff. Do not throw exception in performance critical paths without very good reasons to do so!

func_test2中,我们使用BOOST_THROW_EXCEPTION宏抛出了一个std::logic_error异常。这个宏做了很多有用的工作;它检查我们的异常是否来自std::exception,向我们的异常添加关于源文件名、函数名和引发异常的代码行号的信息。当我们的std::logic_error异常被重新扔进process_exception::operator()时,它被catch(...)抓住。boost::current_exception_diagnostic_information()输出尽可能多的关于抛出异常的信息。

还有更多...

通常,exception_ptr用于在线程间传递异常。例如:

void run_throw(boost::exception_ptr& ptr) {
    try {
        // A lot of code goes here.
    } catch (...) {
        ptr = boost::current_exception();
    }
}

int main () {
    boost::exception_ptr ptr;

    // Do some work in parallel.
    boost::thread t(
        &run_throw,
        boost::ref(ptr)
    );

    // Some code goes here.
    // ...

    t.join();

    // Checking for exception.
    if (ptr) {
        // Exception occurred in thread.
        boost::rethrow_exception(ptr);
    }
}

boost::exception_ptr类可以多次通过堆分配内存,使用原子,并通过重新抛出和捕获异常来实现一些操作。没有实际需要,尽量不要使用。

C++ 11 采用了boost::current_exceptionboost::rethrow_exceptionboost::exception_ptr。你可以在std::命名空间的<exception>中找到它们。BOOST_THROW_EXCEPTIONboost::current_exception_diagnostic_information()功能不在 C++ 17 中。

请参见

  • http://boost.org/libs/exceptionBoost.Exception官方文档包含了很多关于实施和限制的有用信息。您可能还会发现一些本食谱中没有涉及的信息(例如,如何向已经抛出的异常添加附加信息)。
  • 本章的第一个食谱为你提供了tasks_processor课程的信息。将字符串转换为数字 r 来自第 3 章转换和转换的 ecipe 描述了本食谱中使用的Boost.LexicalCast库。

获取和处理系统信号作为任务

当编写一些服务器应用(尤其是 Linux 操作系统)时,需要捕获和处理信号。通常,所有的信号处理程序都是在服务器启动时设置的,在应用执行期间不会改变。

这个食谱的目标是让我们的tasks_processor类能够处理信号。

准备好

我们需要本章第一个食谱的代码。还需要对Boost.Function有很好的了解。

该配方需要与boost_systemboost_thread库链接。

怎么做...

这个食谱类似于本章 24 的食谱:我们有async信号等待功能,一些async信号处理程序,还有一些支持代码。

  1. 让我们从包含以下标题开始:
#include <boost/asio/signal_set.hpp> 
#include <boost/function.hpp> 
  1. 现在,我们向tasks_processor类添加一个用于信号处理的成员:
protected:
    static boost::asio::signal_set& signals() {
        static boost::asio::signal_set signals_(get_ios());
        return signals_;
    }

    static boost::function<void(int)>& signal_handler() {
        static boost::function<void(int)> users_signal_handler_;
        return users_signal_handler_;
    }
  1. 信号捕获时将调用的功能如下:
    static void handle_signals(
            const boost::system::error_code& error,
            int signal_number)
    {
        signals().async_wait(&tasks_processor::handle_signals);

        if (error) {
            std::cerr << "Error in signal handling: " << error << '\n';
        } else {
            boost::function<void(int)> h = signal_handler();
            h(signal_number);
        }

    }
  1. 现在我们需要一个注册信号处理器的函数:
public:

    // This function is not thread safe!
    // Must be called before all the `start()` calls.
    // Function can be called only once.
    template <class Func>
    static void register_signals_handler(
            const Func& f,
            std::initializer_list<int> signals_to_wait)
    {
        // Making sure that this is the first call.
        assert(!signal_handler()); 

        signal_handler() = f;
        boost::asio::signal_set& sigs = signals();

        std::for_each(
            signals_to_wait.begin(),
            signals_to_wait.end(),
            [&sigs](int signal) { sigs.add(signal); }
        );

        sigs.async_wait(&tasks_processor::handle_signals);
    }

仅此而已。现在,我们准备处理信号。以下是测试程序:

void accept_3_signals_and_stop(int signal) {
    static int signals_count = 0;
    assert(signal == SIGINT);

    ++ signals_count;
    std::cout << "Captured " << signals_count << " SIGINT\n"; 
    if (signals_count == 3) {
        tasks_processor::stop();
    }
}

int main () {
    tasks_processor::register_signals_handler(
        &accept_3_signals_and_stop,
        { SIGINT, SIGSEGV }
    );

    tasks_processor::start();
}

这将产生以下输出:

Captured 1 SIGINT
Captured 2 SIGINT
Captured 3 SIGINT
Press any key to continue . . .

它是如何工作的...

这里没有什么是困难的(与本章之前的一些食谱相比)。register_signals_handler功能将待处理的信号编号相加。这是通过调用signals_to_wait的每个元素的boost::asio::signal_set::add函数来完成的。

接下来,sigs.async_wait启动async等待信号,并调用信号捕捉上的tasks_processor::handle_signals功能。tasks_processor::handle_signals函数立即开始异步等待下一个信号,检查错误,如果没有错误,则调用提供信号号码的回调。

还有更多...

我们可以做得更好!我们可以从第一个方法将用户提供的回调包装到我们的类中,以正确处理异常,并从第一个方法中做其他好事:

boost::function<void(int)> h = signal_handler();

detail::make_task_wrapped([h, signal_number]() {
    h(signal_number);
})(); // make and run task_wrapped

当需要线程安全的动态添加和移除信号时,我们可以修改这个例子,使其看起来像中的detail::timer_task一样,将定时器和处理定时器事件作为本章的任务配方。当多个boost::asio::signal_set对象注册等待相同的信号时,每个signal_set的一个处理程序在一个信号上被调用。

长期以来,C++ 一直能够使用<csignal>头中的signal函数来处理信号。网络终端服务可能不具备signal_set功能。

请参见

  • 在变量配方中存储任何功能对象的来自第 2 章管理资源,提供了关于boost::function的信息* ** 参见Boost.Asio的官方文档,了解更多关于boost::asio::signal_set的信息和例子,以及这个位于http://boost.org/libs/asio的伟大图书馆的其他特征