Skip to content

Latest commit

 

History

History
1084 lines (793 loc) · 33.6 KB

File metadata and controls

1084 lines (793 loc) · 33.6 KB

五、深入研究 Boost.Asio 库

现在我们已经能够运行io_service对象并给它一些工作要做,是时候在Boost.Asio库中找到更多关于其他对象的信息,以便开发网络应用了。我们之前使用的io_service对象的所有工作都是异步运行的,但不是以序列化的顺序运行的,这意味着我们无法确定io_service对象将要运行的工作的顺序。此外,我们必须考虑如果我们的应用在运行时遇到任何错误,我们将做什么,并考虑运行任何io_service对象工作的时间间隔。因此,在本章中,我们将讨论以下主题:

  • 连续执行io_service对象的工作
  • 捕捉异常并正确处理它们
  • 在期望的时间内完成工作

序列化输入输出服务工作

假设我们想把要做的工作排好队,但是顺序很重要。如果我们只是应用异步方法,我们将不知道我们将得到的工作顺序。我们需要确保工作的顺序是我们想要的,并且已经设计好了。例如,如果我们以这个顺序发布工作 A、工作 B 和工作 C,我们希望在运行时保持这个顺序。

使用链函数

Strandio_service对象中的一个类,提供处理程序执行序列化。它可以用来确保我们的工作将连续执行。让我们检查下面的代码,通过使用strand函数来理解序列化。但是首先,我们将在没有的情况下开始使用strand()lock()功能:

/* nonstrand.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  iosvc->run();

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
global_stream_lock.unlock();
}

void Print(int number) {
  std::cout << "Number: " << number << std::endl;
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::this_thread::sleep(boost::posix_time::milliseconds(500));

  io_svc->post(boost::bind(&Print, 1));
  io_svc->post(boost::bind(&Print, 2));
  io_svc->post(boost::bind(&Print, 3));
  io_svc->post(boost::bind(&Print, 4));
  io_svc->post(boost::bind(&Print, 5));

  worker.reset();

  threads.join_all();

  return 0;
}

将前面的代码保存为nonstrand.cpp,并使用以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 nonstrand.cpp -o nonstrand -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

然后,在控制台窗口中键入nonstrand运行。我们将得到类似如下的输出:

Using the strand function

您可能会得到不同的输出,并且多次运行程序确实会产生不同顺序的结果。这是因为,正如我们在上一章中讨论的,如果没有lock对象,输出将是不同步的,如下所示。我们可以注意到结果看起来很混乱:

Number: Number: 1
Number: 5
Number: 3
2
Number: 4

正如我们在下面的片段中看到的,我们没有使用lock对象来同步输出。这就是为什么我们会得到前面截图所示的输出。

void Print(int number) {
 std::cout << "Number: " << number << std::endl;
}

现在,让我们应用strand功能来同步程序的流程。输入以下代码并保存为strand.cpp:

/* strand.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  iosvc->run();

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void Print(int number) {
  std::cout << "Number: " << number << std::endl;
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::asio::io_service::strand strand(*io_svc);

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::this_thread::sleep(boost::posix_time::milliseconds(500));

  strand.post(boost::bind(&Print, 1));
  strand.post(boost::bind(&Print, 2));
  strand.post(boost::bind(&Print, 3));
  strand.post(boost::bind(&Print, 4));
  strand.post(boost::bind(&Print, 5));

  worker.reset();

  threads.join_all();

  return 0;
}

使用以下命令编译前面的代码:

g++ -Wall -ansi -I ../boost_1_58_0 strand.cpp -o strand -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

nonstrand.cppstrand.cpp我们只是做了一点修改,但是影响还是比较大的。在运行程序之前,让我们区分一下nonstrand.cppstrand.cpp的代码:

io_svc->post(boost::bind(&Print, 1));
io_svc->post(boost::bind(&Print, 2));
io_svc->post(boost::bind(&Print, 3));
io_svc->post(boost::bind(&Print, 4));
io_svc->post(boost::bind(&Print, 5));

我们使用io_service对象中的post()函数使其工作。但是通过使用这种方法,程序的流程是不可预测的,因为它不是同步的:

strand.post(boost::bind(&Print, 1));
strand.post(boost::bind(&Print, 2));
strand.post(boost::bind(&Print, 3));
strand.post(boost::bind(&Print, 4));
strand.post(boost::bind(&Print, 5));

然后,我们使用strand对象将功赋予io_service对象。通过使用这种方法,我们将确保工作的顺序与我们在代码中声明的完全相同。为了证明这一点,让我们看看以下输出:

Using the strand function

工作的顺序与我们代码中的工作顺序相同。我们以数字顺序显示了工作的输出,即:

Number: 1
Number: 2
Number: 3
Number: 4
Number: 5

并且,如果你记得,我们继续从Print()功能中省略lock()功能,由于strand对象的使用,它仍然正常运行。现在,不管我们重新运行程序多少次,结果总是按升序排列。

将处理程序缠绕在线束对象上

boost::asio::strand中有一个函数叫做wrap()方法。基于官方的 Boost 文档,它创建了一个新的处理函数对象,当调用该对象时,它会自动将包装好的处理函数传递给strand对象的调度函数。让我们看下面的代码来解释它:

/* strandwrap.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  iosvc->run();

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void Print(int number) {
  std::cout << "Number: " << number << std::endl;
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::asio::io_service::strand strand(*io_svc);

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished." <<  std::endl;
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  io_svc->post(strand.wrap(boost::bind(&Print, 1)));
  io_svc->post(strand.wrap(boost::bind(&Print, 2)));

  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  io_svc->post(strand.wrap(boost::bind(&Print, 3)));
  io_svc->post(strand.wrap(boost::bind(&Print, 4)));

  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  io_svc->post(strand.wrap(boost::bind(&Print, 5)));
  io_svc->post(strand.wrap(boost::bind(&Print, 6)));

  worker.reset();

  threads.join_all();

  return 0;
}

给前面的代码命名为strandwrap.cpp,,然后使用以下命令编译它:

g++ -Wall -ansi -I ../boost_1_58_0 strandwrap.cpp -o strandwrap -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,运行该程序,我们将获得以下输出:

Wrapping a handler through the strand object

然而,如果我们多次运行该程序,它可能会产生如下的随机输出:

Number: 2
Number: 1
Number: 3
Number: 4
Number: 6
Number: 5

虽然工作被保证是串行执行的,但是不能保证哪个工作的顺序实际上是由于内置的处理程序包装器而发生的。而且如果顺序真的很重要,我们在使用strand对象的时候就要看内置的处理程序包装器本身。

处理异常和错误

有时,我们的代码会在运行时抛出异常或错误。正如你可能还记得我们在第 3 章中对lexical.cpp的讨论,在介绍 Boost C++ 库时,我们有时必须在代码中使用异常处理,现在我们将挖掘它来深入研究异常和错误处理。

处理异常

异常是一种方式,通过将控制权转移给处理程序来应对代码出现异常情况的情况。为了处理异常,我们需要在代码中使用try-catch块;然后,如果出现异常情况,将向异常处理程序抛出异常。

现在,看看下面的代码,看看异常处理是如何使用的:

/* exception.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  try {
    iosvc->run();

    global_stream_lock.lock();
    std::cout << "Thread " << counter << " End.\n";
    global_stream_lock.unlock();
  }
  catch(std::exception & ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }
}

void ThrowAnException(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Throw Exception " << counter << "\n" ;
  global_stream_lock.unlock();

  throw(std::runtime_error("The Exception !!!"));
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  io_svc->post(boost::bind(&ThrowAnException, io_svc, 1));
  io_svc->post(boost::bind(&ThrowAnException, io_svc, 2));
  io_svc->post(boost::bind(&ThrowAnException, io_svc, 3));
  io_svc->post(boost::bind(&ThrowAnException, io_svc, 4));
  io_svc->post(boost::bind(&ThrowAnException, io_svc, 5));

  threads.join_all();

  return 0;
}

将前面的代码保存为exception.cpp并运行以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 exception.cpp -o exception -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

然后,运行该程序,您应该会得到以下输出:

Handling an exception

正如我们可以看到的,由于异常,我们没有显示从std::cout << "Thread " << counter << " End.\n";开始的线。当运行io_service对象的工作时,它总是使用throw关键字抛出异常,这样该异常将被WorkerThread功能内的catch块捕获,因为iosvc->run()功能在try块内。

我们还可以看到,虽然我们为io_service对象发布了五次工作,但异常处理只处理了两个异常,因为一旦线程完成,线程中的join_all()函数将完成线程并退出程序。换句话说,我们可以说,一旦异常被处理,线程就会退出并加入调用。可能引发异常的附加代码将永远不会被调用。

如果我们递归地放入io_service对象的工作调用呢?会不会导致一个无限运行的程序?让我们尝试无限地抛出异常。代码如下所示:

/* exception2.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  try {
    iosvc->run();

    global_stream_lock.lock();
    std::cout << "Thread " << counter << " End.\n";
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }
}

void ThrowAnException(boost::shared_ptr<boost::asio::io_service> iosvc) {
  global_stream_lock.lock();
  std::cout << "Throw Exception\n" ;
  global_stream_lock.unlock();

  iosvc->post(boost::bind(&ThrowAnException, iosvc));

  throw(std::runtime_error("The Exception !!!"));
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  io_svc->post(boost::bind(&ThrowAnException, io_svc));

  threads.join_all();

  return 0;
}

将前面的代码保存为exception2.cpp,并使用以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 exception2.cpp -o exception2 -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,让我们检查一下代码:

iosvc->post(boost::bind(&ThrowAnException, iosvc));

我们在ThrowAnException函数中添加了前面的代码片段。每次调用ThrowAnException函数,都会调用自己。那么,它应该是一个无限程序,因为有一个递归函数。让我们通过在控制台窗口中键入exception2命令来运行程序来证明这一点。输出如下所示:

Handling an exception

幸运的是,程序能够成功完成。发生这种情况是因为异常通过run()函数传播,并且工作线程退出。之后,所有线程结束,调用join_all()函数。这就是为什么程序退出,即使在io_service对象中还有工作。

处理错误

在我们之前的例子中,我们使用了没有任何参数的run()函数,但实际上,该函数有两个重载方法,std::size_t run()std::size_t run(boost::system::error_code & ec)。后一种方法有一个错误代码参数,如果发生错误,将设置该参数。

现在,让我们尝试在run()函数中使用一个错误代码作为输入参数。看看下面的代码:

/* errorcode.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  boost::system::error_code ec;
  iosvc->run(ec);

  if(ec) {
    global_stream_lock.lock();
    std::cout << "Message: " << ec << ".\n";
    global_stream_lock.unlock();
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void ThrowAnException(boost::shared_ptr<boost::asio::io_service> iosvc) {
  global_stream_lock.lock();
  std::cout << "Throw Exception\n" ;
  global_stream_lock.unlock();

  iosvc->post(boost::bind(&ThrowAnException, iosvc));

  throw(std::runtime_error("The Exception !!!"));
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  io_svc->post(boost::bind(&ThrowAnException, io_svc));

  threads.join_all();

  return 0;
}

将前面的代码保存为errorcode.cpp,使用以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 errorcode.cpp -o errorcode -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,通过在控制台中键入errorcode命令来运行程序。这样做的结果是,程序将崩溃。以下屏幕截图显示了输出:

Handling an error

我们打算通过使用以下代码来检索错误代码:

iosvc->run(ec);

我们可以通过使用if块来捕捉错误,如下所示:

if(ec)

然而,在错误变量方法中,用户异常转化为boost::asio异常;因此,错误变量ec不会将用户异常解释为错误,因此异常不会被处理程序捕获。如果Boost.Asio库需要抛出一个错误,如果没有错误变量就会变成异常,或者转换成错误变量。如果我们继续使用try-catch块来捕捉任何异常或错误会更好。

此外,我们必须检查异常的类型,它要么是系统故障,要么是上下文故障。如果是系统故障,那么我们必须调用io_service类中的stop()函数,以确保工作对象已经被销毁,以便程序能够退出。相反,如果异常是上下文失败,我们需要工作线程再次调用run()函数,以防止线程死亡。现在,看看下面的代码来理解这个概念:

/* errorcode2.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Error Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Exception Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void ThrowAnException(boost::shared_ptr<boost::asio::io_service> iosvc) {
  global_stream_lock.lock();
  std::cout << "Throw Exception\n" ;
  global_stream_lock.unlock();

  iosvc->post(boost::bind(&ThrowAnException, iosvc));

  throw(std::runtime_error("The Exception !!!"));
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "The program will exit once all work has finished.\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  io_svc->post(boost::bind(&ThrowAnException, io_svc));

  threads.join_all();

  return 0;
}

将前面的代码保存为errorcode2.cpp,然后通过执行以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 errorcode2.cpp -o errorcode2 -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

如果运行程序,会看到它不会退出,需要按 Ctrl + C 才能停止程序:

Handling an error

如果我们看到下面的代码片段:

while(true) {
 try {
 . . .
 iosvc->run(ec);
 if(ec)
 . . .
 }
 catch(std::exception &ex) {
 . . .
 }
}

工作线程正在循环。当输出结果中出现异常时(由Throw ExceptionException Message: The Exception!!!输出指示),也是这种情况。再次调用run()函数,这样它会将新事件发布到队列中。当然,我们不希望在我们的应用中出现这种情况。

使用定时器类为工作执行计时

Boost C++ 库中有一个类,它提供了对定时器进行阻塞或异步等待直到定时器到期的能力,被称为截止定时器。截止时间计时器指示两种状态之一:过期或未过期。

即将到期的计时器

在这里,我们将创建一个将在 10 秒后到期的计时器。让我们看看下面的代码:

/* timer.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void TimerHandler(const boost::system::error_code & ec) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "Error Message: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "You see this line because you have waited for 10 seconds.\n";
    std::cout << "Now press ENTER to exit.\n";
    global_stream_lock.unlock();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Wait for ten seconds to see what happen, ";
  std::cout << "otherwise press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::asio::deadline_timer timer(*io_svc);
  timer.expires_from_now(boost::posix_time::seconds(10));
  timer.async_wait(TimerHandler);

  std::cin.get();

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为timer.cpp并运行以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 timer.cpp -o timer -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,让我们在运行代码之前对其进行区分:

boost::asio::deadline_timer timer(*io_svc);
timer.expires_from_now(boost::posix_time::seconds(10));
timer.async_wait(TimerHandler);

在程序调用TimerHandler函数之前,它必须等待 10 秒,因为我们使用的是来自timer对象的expires_from_now函数。async_wait()功能将一直等到定时器到期:

void TimerHandler(const boost::system::error_code & ec) {
 if(ec)
 . . .
}
else {
 global_stream_lock.lock();
 std::cout << "You see this line because you have waited for 10 seconds.\n";
 std::cout << "Now press ENTER to exit.\n";
 global_stream_lock.unlock();
}

在定时器到期后,TimerHandler功能将被调用,由于没有错误,程序将执行else块内的代码。让我们运行程序来查看完整的输出:

An expiring timer

并且,由于我们使用了async_wait()功能,我们可以在看到线路之前点击回车键退出程序,现在按回车键退出

使用定时器和 boost::bind 功能

让我们尝试创建一个循环计时器。我们必须初始化全局计时器对象,以便该对象成为共享对象。为了实现这一点,我们需要来自shared_ptr指针和boost::bind方法的帮助来确保线程安全,因为我们将使用一个共享对象:

/* timer2.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while( true ) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void TimerHandler(
  const boost::system::error_code &ec,
  boost::shared_ptr<boost::asio::deadline_timer> tmr
)
{
  if(ec) {
    global_stream_lock.lock();
    std::cout << "Error Message: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "You see this every three seconds.\n";
    global_stream_lock.unlock();

    tmr->expires_from_now( boost::posix_time::seconds(3));
    tmr->async_wait(boost::bind(&TimerHandler, _1, tmr));
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::shared_ptr<boost::asio::deadline_timer> timer(
    new boost::asio::deadline_timer(*io_svc)
  );
  timer->expires_from_now( boost::posix_time::seconds(3));
  timer->async_wait(boost::bind(&TimerHandler, _1, timer));

  std::cin.get();

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为timer2.cpp,运行以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 timer2.cpp -o timer2 -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,运行程序。我们会得到一个循环输出,点击进入键可以停止,如下所示:

Using the timer along with the boost::bind function

从输出中我们看到计时器每三秒勾选一次,用户按下进入键后,工作将停止。现在,让我们看看下面的代码片段:

timer->async_wait(boost::bind(&TimerHandler, _1, timer));

boost::bind功能帮助我们使用全局定时器对象。如果我们看得更深,我们可以使用boost::bind函数的_1参数。如果我们阅读boost::bind函数的文档,我们会发现_1参数是一个占位符参数,将被第一个输入参数替换。

有关使用占位符绑定的更多信息,请查看位于 www.boost.org/doc/libs/1_58_0/libs/bind/doc/html/bind.html 的官方 Boost 文档。

有关占位符参数的更多信息,请参见en.cppreference.com/w/cpp/utility/functional/placeholders

使用定时器和增强::链功能

由于定时器是异步执行的,因此定时器的执行可能不是在一个序列化的进程中。计时器可能在一个线程中执行,而另一个事件同时执行。如前所述,我们可以利用strand函数来序列化执行顺序。让我们看看下面的代码片段:

/* timer3.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while( true ) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void TimerHandler(
  const boost::system::error_code &ec,
  boost::shared_ptr<boost::asio::deadline_timer> tmr,
  boost::shared_ptr<boost::asio::io_service::strand> strand
)
{
  if(ec) {
    global_stream_lock.lock();
    std::cout << "Error Message: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "You see this every three seconds.\n";
    global_stream_lock.unlock();

    tmr->expires_from_now( boost::posix_time::seconds(1));
    tmr->async_wait(
      strand->wrap(boost::bind(&TimerHandler, _1, tmr, strand))
    );
  }
}

void Print(int number) {
  std::cout << "Number: " << number << std::endl;
  boost::this_thread::sleep( boost::posix_time::milliseconds(500));
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );
  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=5; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::this_thread::sleep(boost::posix_time::seconds(1));

  strand->post(boost::bind(&Print, 1));
  strand->post(boost::bind(&Print, 2));
  strand->post(boost::bind(&Print, 3));
  strand->post(boost::bind(&Print, 4));
  strand->post(boost::bind(&Print, 5));

  boost::shared_ptr<boost::asio::deadline_timer> timer(
    new boost::asio::deadline_timer(*io_svc)
  );

  timer->expires_from_now( boost::posix_time::seconds(1));
  timer->async_wait( 
    strand->wrap(boost::bind(&TimerHandler, _1, timer, strand))
  );

  std::cin.get();

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为timer3.cpp,并通过运行以下命令进行编译:

g++ -Wall -ansi -I ../boost_1_58_0 timer3.cpp -o timer3 -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

现在,通过在控制台中键入timer3命令来运行程序,我们将获得以下输出:

Using the timer along with the boost::strand function

从的输出中,我们可以看到前五个work对象首先执行,因为它们必须串行执行,然后执行TimerHandler()功能。在执行定时器线程之前,必须首先完成work对象。如果我们去掉strand包装,程序的流程会很混乱,因为我们没有把std::cout功能锁定在Print()功能里面。

总结

我们已经使用strand对象成功序列化了io_service对象的工作,所以我们可以确保我们设计的工作顺序。我们还可以通过使用错误和异常处理来确保我们的程序平稳运行,没有任何崩溃。最后,在本章中,我们讨论了等待时间,因为这在创建网络应用时非常重要。

现在,让我们进入下一章,讨论如何创建一个服务器-客户端应用,使服务器和客户端双方之间的通信成为可能。