Skip to content

Latest commit

 

History

History
1062 lines (805 loc) · 35 KB

File metadata and controls

1062 lines (805 loc) · 35 KB

六、创建客户端——服务器应用

在前一章中,我们深入研究了Boost.Asio库,这对开发网络应用非常重要。现在,我们将深入讨论一个客户端-服务器应用,它可以通过计算机网络在两台或多台计算机之间相互通信。其中一个叫客户端,另一个叫服务器

我们将讨论服务器的开发,它能够发送和接收来自客户端的数据流量,并创建一个客户端程序来接收数据流量。在本章中,我们将讨论以下主题:

  • 在客户端和服务器之间建立连接
  • 在客户端和服务器之间发送和接收数据
  • 包装最常用的代码,通过避免代码重用来简化编程过程

建立连接

我们在第 2 章了解网络概念中讨论了两种类型的互联网协议。它们是传输控制协议和用户数据报协议。TCP 是面向连接的,这意味着数据可以在连接建立后立即发送。相比之下,UDP 是无连接的互联网协议,这意味着该协议只是将数据直接发送到目的设备。这一章我们只讲 TCP 因此,我们必须首先建立连接。只有在客户端和服务器这两方接受连接的情况下,才能建立连接。在这里,我们将尝试同步和异步建立连接。

同步客户端

我们从建立与远程主机的同步连接开始。它作为一个客户端,将打开与帕克特出版网站(www.packtpub.com)的连接。我们将使用 TCP 协议,正如我们之前在第 2 章中讨论的,理解网络概念。下面是代码:

/* connectsync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );
  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::asio::ip::tcp::socket sckt(*io_svc);

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query("www.packtpub.com", 
      boost::lexical_cast<std::string>(80)
    );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
    boost::asio::ip::tcp::endpoint endpoint = *iterator;

    global_stream_lock.lock();
    std::cout << "Connecting to: " << endpoint << std::endl;
    global_stream_lock.unlock();

    sckt.connect(endpoint); 
    std::cout << "Connected!\n";
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  sckt.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt.close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为connectsync.cpp,运行以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 connectsync.cpp -o connectsync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

在控制台输入connectsync运行程序,我们应该会得到如下输出:

A synchronous client

我们一按进入键,程序就会退出。

现在,让我们分析一下代码。正如我们在前面的代码中所看到的,我们使用前面的示例代码并插入一行代码,以便能够建立连接。让我们注意我们插入的那一行:

boost::asio::ip::tcp::socket sckt(*io_svc);

我们现在有了一个全局变量socket。该变量将用于提供套接字功能。它来自命名空间boost::asio::ip::tcp,因为我们使用 TCP 作为我们的协议:

boost::asio::ip::tcp::resolver resolver(*io_svc);
boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
 boost::lexical_cast<std::string>(80)
);
boost::asio::ip::tcp::resolver::iterator iterator =
resolver.resolve(query);

我们也使用命名空间boost::asio::ip::tcp::resolver。它用于获取我们要连接的远程主机的地址。使用query()类,我们传递互联网地址和端口作为参数。但是因为我们使用整数类型作为端口号,所以我们必须使用lexical_cast将其转换为字符串。查询类用于描述可以传递给解析器的查询。然后,通过使用iterator类,我们将根据解析器返回的结果定义迭代器:

boost::asio::ip::tcp::endpoint endpoint = *iterator;

迭代器创建成功后,我们将其赋予endpoint类型变量。端点将存储由resolver生成的ip地址列表:

sckt.connect(endpoint);

然后,connect()成员函数将套接字连接到端点,这是我们之前指定的。如果一切正常运行,并且没有抛出错误或异常,那么现在就建立了连接:

boost::system::error_code ec;
sckt.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
sckt.close(ec);

要释放连接,我们必须先使用shutdown()成员函数禁用套接字上的数据发送和接收过程;然后,我们调用close()成员函数关闭套接字。

当我们运行程序并获得像前面的图像一样的输出时,它会通知我们连接已经建立。例如,我们可以在query()类中将端口号更改为110,这是远程远程登录服务协议,如下所示:

boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
 boost::lexical_cast<std::string>(110)
);

然后,程序会抛出一个异常,输出如下:

A synchronous client

从的输出中,我们可以得出结论,连接已经被目标机器拒绝,因为我们计划连接到的端口已经关闭。这意味着通过使用端口80,也就是超文本传输协议 ( HTTP ),我们可以与 Packt Publishing 网站建立连接。

异步客户端

我们已经能够同步建立连接。但是,如果我们需要异步连接到目标,以便程序在尝试连接时不会冻结,那该怎么办?让我们看看下面的代码来找到答案:

/* connectasync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void OnConnect(const boost::system::error_code &ec) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnConnect Error: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Connected!.\n";
    global_stream_lock.unlock();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::shared_ptr<boost::asio::ip::tcp::socket> sckt(
    new boost::asio::ip::tcp::socket(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query("www.packtpub.com",
      boost::lexical_cast<std::string>(80)
    );
    boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
    boost::asio::ip::tcp::endpoint endpoint = *iterator;

    global_stream_lock.lock();
    std::cout << "Connecting to: " << endpoint << std::endl;
    global_stream_lock.unlock();

    sckt->async_connect(endpoint, boost::bind(OnConnect, _1));
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  sckt->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

然后,将前面的代码保存为connectasync.cpp并运行以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 connectasync.cpp -o connectasync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58

尝试运行程序,应该会得到如下输出:

An asynchronous client

正如我们在前面的代码中看到的,我们添加了OnConnect()函数。因为socket对象是不可复制的,并且我们需要确保它在处理程序等待调用时仍然有效,所以我们必须使用boost::shared_ptr命名空间。我们还使用boost::bind名称空间来调用处理程序,即OnConnect()函数。

异步服务器

我们已经知道如何同步和异步连接到远程主机。现在,我们将创建服务器程序,与之前创建的客户端程序进行对话。因为我们将处理boost::asio命名空间中的异步程序,所以我们将只讨论异步服务器中的客户端程序。让我们看看下面的代码:

/* serverasync.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

void OnAccept(const boost::system::error_code &ec) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnAccept Error: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Accepted!" << ".\n";
    global_stream_lock.unlock();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  boost::thread_group threads;
  for(int i=1; i<=2; i++)
    threads.create_thread(boost::bind(&WorkerThread, io_svc, i));

  boost::shared_ptr< boost::asio::ip::tcp::acceptor > acceptor(
    new boost::asio::ip::tcp::acceptor(*io_svc)
  );

  boost::shared_ptr<boost::asio::ip::tcp::socket> sckt(
    new boost::asio::ip::tcp::socket(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query(
      "127.0.0.1", 
      boost::lexical_cast<std::string>(4444)
    );
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
    acceptor->open(endpoint.protocol());
    acceptor->set_option(
      boost::asio::ip::tcp::acceptor::reuse_address(false));
    acceptor->bind(endpoint);
    acceptor->listen(boost::asio::socket_base::max_connections);
    acceptor->async_accept(*sckt, boost::bind(OnAccept, _1));

    global_stream_lock.lock();
    std::cout << "Listening on: " << endpoint << std::endl;
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  acceptor->close(ec);

  sckt->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  sckt->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为serverasync.cpp,运行以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 serverasync.cpp -o serverasync -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58 –l mswsock

在运行程序之前,让我们区分代码。我们现在有了一个新的对象,那就是tcp::acceptor。此对象用于接受新的套接字连接。由于使用了accept()功能,我们需要在编译过程中添加mswsock库:

acptor->open(endpoint.protocol());
acptor->set_option
(boost::asio::ip::tcp::acceptor::reuse_address(false));
acptor->bind(endpoint);
acptor->listen(boost::asio::socket_base::max_connections);
acptor->async_accept(*sckt, boost::bind(OnAccept, _1));

从前面的代码片段中,我们可以看到程序调用open()函数,通过使用从endpoint变量中检索的协议来打开接受者。然后,通过使用set_option功能,我们在接受者上设置了一个不重用地址的选项。受体也使用bind()功能绑定到端点。之后,我们调用listen()函数,使接受者进入监听新连接的状态。最后,接受者将使用async_accept()功能接受新的连接,这将启动异步接受。

现在,是运行程序的时候了。我们需要在这里打开两个控制台。第一个控制台用于程序本身,第二个控制台用于调用telnet命令与服务器建立连接。我们只需要在运行完serverasync程序后运行telnet 127.0.0.1 4444命令(可以参考第二章了解联网概念,在命令提示符下调用telnet命令)。输出应该如下所示:

An asynchronous server

从上图可以看出,程序启动时正在监听端口4444,当我们调用telnet命令开始连接到端口4444后,程序接受连接。但是,因为我们只有一个 socket 对象,只调用async_accept()函数一次,程序只接受一个连接。

读写插座

我们正式能够建立客户端-服务器连接。现在,我们将读写插座,使连接更加有用。我们将修改我们的之前的代码,serverasync.cpp,并添加basic_stream_socket对象,它提供了面向流的套接字功能。

欲了解更多关于basic_stream_socket对象的详细信息,可登陆www . boost . org/doc/libs/1 _ 58 _ 0/doc/html/boost _ asio/reference/basic _ stream _ socket . html

现在,以为例看一下下面包含读取和写入套接字过程的代码:

/* readwritesocket.cpp */
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/cstdint.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
#include <string>

boost::mutex global_stream_lock;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> iosvc, int counter) {
  global_stream_lock.lock();
  std::cout << "Thread " << counter << " Start.\n";
  global_stream_lock.unlock();

  while(true) {
    try {
      boost::system::error_code ec;
      iosvc->run(ec);
      if(ec) {
        global_stream_lock.lock();
        std::cout << "Message: " << ec << ".\n";
        global_stream_lock.unlock();
      }
      break;
    }
    catch(std::exception &ex) {
      global_stream_lock.lock();
      std::cout << "Message: " << ex.what() << ".\n";
      global_stream_lock.unlock();
    }
  }

  global_stream_lock.lock();
  std::cout << "Thread " << counter << " End.\n";
  global_stream_lock.unlock();
}

struct ClientContext : public boost::enable_shared_from_this<ClientContext> {
  boost::asio::ip::tcp::socket m_socket;

  std::vector<boost::uint8_t> m_recv_buffer;
  size_t m_recv_buffer_index;

  std::list<std::vector<boost::uint8_t> > m_send_buffer;

  ClientContext(boost::asio::io_service & io_service)
  : m_socket(io_service), m_recv_buffer_index(0) {
    m_recv_buffer.resize(4096);
  }

  ~ClientContext() {
  }

  void Close() {
    boost::system::error_code ec;
    m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
    m_socket.close(ec);
  }

  void OnSend(const boost::system::error_code &ec, std::list<std::vector<boost::uint8_t> >::iterator itr) {
    if(ec) {
      global_stream_lock.lock();
      std::cout << "OnSend Error: " << ec << ".\n";
      global_stream_lock.unlock();

      Close();
    }
    else {
      global_stream_lock.lock();
      std::cout << "Sent " << (*itr).size() << " bytes." << std::endl;
      global_stream_lock.unlock();
    }
    m_send_buffer.erase(itr);

    // Start the next pending send
    if(!m_send_buffer.empty()) {
      boost::asio::async_write(
        m_socket,
        boost::asio::buffer(m_send_buffer.front()),
        boost::bind(
          &ClientContext::OnSend,
          shared_from_this(),
          boost::asio::placeholders::error,
          m_send_buffer.begin()
        )
      );
    }
  }

  void Send(const void * buffer, size_t length) {
    bool can_send_now = false;

    std::vector<boost::uint8_t> output;
    std::copy((const boost::uint8_t *)buffer, (const boost::uint8_t *)buffer + length, std::back_inserter(output));

    // Store if this is the only current send or not
    can_send_now = m_send_buffer.empty();

    // Save the buffer to be sent
    m_send_buffer.push_back(output);

    // Only send if there are no more pending buffers waiting!
    if(can_send_now) {
      // Start the next pending send
      boost::asio::async_write(
        m_socket,
        boost::asio::buffer(m_send_buffer.front()),
        boost::bind(
          &ClientContext::OnSend,
          shared_from_this(),
          boost::asio::placeholders::error,
          m_send_buffer.begin()
        )
      );
    }
  }

  void OnRecv(const boost::system::error_code &ec, size_t bytes_transferred) {
    if(ec) {
      global_stream_lock.lock();
      std::cout << "OnRecv Error: " << ec << ".\n";
      global_stream_lock.unlock();

      Close();
    }
    else 	{
      // Increase how many bytes we have saved up
      m_recv_buffer_index += bytes_transferred;

      // Debug information
      global_stream_lock.lock();
      std::cout << "Recv " << bytes_transferred << " bytes." << std::endl;
      global_stream_lock.unlock();

      // Dump all the data
      global_stream_lock.lock();
      for(size_t x = 0; x < m_recv_buffer_index; ++ x) {

        std::cout << (char)m_recv_buffer[x] << " ";
        if((x + 1) % 16 == 0) {
          std::cout << std::endl;
        }
      }
      std::cout << std::endl << std::dec;
      global_stream_lock.unlock();

      // Clear all the data
      m_recv_buffer_index = 0;

      // Start the next receive cycle
      Recv();
    }
  }

  void Recv() {
    m_socket.async_read_some(
      boost::asio::buffer(
        &m_recv_buffer[m_recv_buffer_index],
        m_recv_buffer.size() - m_recv_buffer_index),
      boost::bind(&ClientContext::OnRecv, shared_from_this(), _1, _2)
    );
  }
};

void OnAccept(const boost::system::error_code &ec, boost::shared_ptr<ClientContext> clnt) {
  if(ec) {
    global_stream_lock.lock();
    std::cout << "OnAccept Error: " << ec << ".\n";
    global_stream_lock.unlock();
  }
  else {
    global_stream_lock.lock();
    std::cout << "Accepted!" << ".\n";
    global_stream_lock.unlock();

    // 2 bytes message size, followed by the message
    clnt->Send("Hi there!", 9);
    clnt->Recv();
  }
}

int main(void) {
  boost::shared_ptr<boost::asio::io_service> io_svc(
    new boost::asio::io_service
  );

  boost::shared_ptr<boost::asio::io_service::work> worker(
    new boost::asio::io_service::work(*io_svc)
  );

  boost::shared_ptr<boost::asio::io_service::strand> strand(
    new boost::asio::io_service::strand(*io_svc)
  );

  global_stream_lock.lock();
  std::cout << "Press ENTER to exit!\n";
  global_stream_lock.unlock();

  // We just use one worker thread 
  // in order that no thread safety issues
  boost::thread_group threads;
  threads.create_thread(boost::bind(&WorkerThread, io_svc, 1));

  boost::shared_ptr< boost::asio::ip::tcp::acceptor > acceptor(
    new boost::asio::ip::tcp::acceptor(*io_svc)
  );

  boost::shared_ptr<ClientContext> client(
    new ClientContext(*io_svc)
  );

  try {
    boost::asio::ip::tcp::resolver resolver(*io_svc);
    boost::asio::ip::tcp::resolver::query query(
      "127.0.0.1",
      boost::lexical_cast<std::string>(4444)
    );
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
    acceptor->open(endpoint.protocol());
    acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(false));
    acceptor->bind(endpoint);
    acceptor->listen(boost::asio::socket_base::max_connections);
    acceptor->async_accept(client->m_socket, boost::bind(OnAccept, _1, client));

    global_stream_lock.lock();
    std::cout << "Listening on: " << endpoint << std::endl;
    global_stream_lock.unlock();
  }
  catch(std::exception &ex) {
    global_stream_lock.lock();
    std::cout << "Message: " << ex.what() << ".\n";
    global_stream_lock.unlock();
  }

  std::cin.get();

  boost::system::error_code ec;
  acceptor->close(ec);

  io_svc->stop();

  threads.join_all();

  return 0;
}

将前面的代码保存为readwritesocket.cpp,使用以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 readwritesocket.cpp -o readwritesocket -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58 -l mswsock

如果我们将readwritesocket.cpp文件的代码与serverasync.cpp文件进行比较,我们会发现我们添加了一个名为ClientContext的新类。它包含五个成员功能:Send()OnSend()Recv()OnRecv()Close()

发送()和结束()功能

Send()功能中,我们输入一组字符和它们的长度。在函数发送字符数组之前,它必须检查m_send_buffer参数是否为空。只有当缓冲区不为空时,发送过程才会发生。

boost::asio::async_write命名空间写入套接字并调用OnSend()函数处理程序。然后,它擦除缓冲区,并发送下一个挂起的数据(如果有)。现在,每次我们按下telnet窗口中的任何键,它都会显示我们键入的内容,因为readwritesocket项目会将我们键入的内容发送回telnet窗口。

Recv()和 OnRecv()函数

与到Send()函数相反,Recv()函数将调用async_read_some()函数来接收数据集,OnRecv()函数处理程序将接收到的数据格式化为十六进制格式。

包装网络代码

为了方便起见,让我们为网络应用创建一个包装器。在使用这个包装器时,我们不需要一次又一次地重用我们的代码;从而使我们的编程过程更简单、更高效。现在,只需创建两个名为wrapper.hwrapper.cpp的文件,我们将在下一个代码中将其包含在编译过程中。因为源代码很长,不方便在这本书里打印,所以我把它们做成可下载的文件,你可以在这本书的知识库www . packtpub . com/networking-and-servers/boost asio-c-network-programming-second edition查阅。转到代码文件部分。

开发客户端和服务器程序

我们已经有了网络包装代码,通过使用“T0”库来简化开发网络应用的编程过程。现在,让我们使用包装代码创建一个客户端和服务器程序。

创建一个简单的回送服务器

我们将去创建一个服务器程序,它将回显从客户端检索的所有流量。在这种情况下,我们将使用telnet作为客户端,就像我们之前所做的那样。文件必须保存为echoserver.cpp,内容如下:

/* echoserver.cpp */
#include "wrapper.h"
#include <conio.h>
#include <boost/thread/mutex.hpp>

boost::mutex global_stream_lock;

class MyConnection : public Connection {
private:
  void OnAccept(const std::string &host, uint16_t port) {
    global_stream_lock.lock();
    std::cout << "[OnAccept] " << host << ":" << port << "\n";
    global_stream_lock.unlock();

    Recv();
  }

  void OnConnect(const std::string & host, uint16_t port) {
    global_stream_lock.lock();
    std::cout << "[OnConnect] " << host << ":" << port << "\n";
    global_stream_lock.unlock();

    Recv();
  }

  void OnSend(const std::vector<uint8_t> & buffer) {
    global_stream_lock.lock();
    std::cout << "[OnSend] " << buffer.size() << " bytes\n";
    for(size_t x=0; x<buffer.size(); x++) {

      std::cout << (char)buffer[x];
      if((x + 1) % 16 == 0)
        std::cout << std::endl;
    }
    std::cout << std::endl;
    global_stream_lock.unlock();
  }

  void OnRecv(std::vector<uint8_t> &buffer) {
    global_stream_lock.lock();
    std::cout << "[OnRecv] " << buffer.size() << " bytes\n";
    for(size_t x=0; x<buffer.size(); x++) {

      std::cout << (char)buffer[x];
      if((x + 1) % 16 == 0)
        std::cout << std::endl;
    }
    std::cout << std::endl;
    global_stream_lock.unlock();

    // Start the next receive
    Recv();

    // Echo the data back
    Send(buffer);
  }

  void OnTimer(const boost::posix_time::time_duration &delta) {
    global_stream_lock.lock();
    std::cout << "[OnTimer] " << delta << "\n";
    global_stream_lock.unlock();
  }

  void OnError(const boost::system::error_code &error) {
    global_stream_lock.lock();
    std::cout << "[OnError] " << error << "\n";
    global_stream_lock.unlock();
  }

public:
  MyConnection(boost::shared_ptr<Hive> hive)
    : Connection(hive) {
  }

  ~MyConnection() {
  }
};

class MyAcceptor : public Acceptor {
private:
  bool OnAccept(boost::shared_ptr<Connection> connection, const std::string &host, uint16_t port) {
    global_stream_lock.lock();
    std::cout << "[OnAccept] " << host << ":" << port << "\n";
    global_stream_lock.unlock();

    return true;
  }

  void OnTimer(const boost::posix_time::time_duration &delta) {
    global_stream_lock.lock();
    std::cout << "[OnTimer] " << delta << "\n";
    global_stream_lock.unlock();
  }

  void OnError(const boost::system::error_code &error) {
    global_stream_lock.lock();
    std::cout << "[OnError] " << error << "\n";
    global_stream_lock.unlock();
  }

public:
  MyAcceptor(boost::shared_ptr<Hive> hive)
    : Acceptor(hive) {
  }

  ~MyAcceptor() {
  }
};

int main(void) {
  boost::shared_ptr<Hive> hive(new Hive());

  boost::shared_ptr<MyAcceptor> acceptor(new MyAcceptor(hive));
  acceptor->Listen("127.0.0.1", 4444);

  boost::shared_ptr<MyConnection> connection(new MyConnection(hive));
  acceptor->Accept(connection);

  while(!_kbhit()) {
    hive->Poll();
    Sleep(1);
  }

  hive->Stop();

  return 0;
}

然后,使用以下命令编译前面的代码。在这里,我们可以看到我们在编译过程中包含wrapper.cpp来利用我们的包装代码:

g++ -Wall -ansi -I ../boost_1_58_0 wrapper.cpp echoserver.cpp -o echoserver -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 -l libboost_thread-mgw49-mt-1_58 -l mswsock

我们可以通过在控制台窗口中键入echoserver来尝试前面的程序;之后,我们应该得到如下输出:

Creating a simple echo server

我们第一次运行程序,它会在localhost中监听端口4444。我们可以在main块中看到,如果没有键盘敲击,程序会调用Hive类中的poll()函数。这意味着如果按下任何键,程序将关闭,因为它将调用Hive类中的Stop()功能,这将停止io_service对象。每隔 1000 毫秒,计时器就会滴答作响,因为Acceptor类的构造函数会启动计时器间隔 1000 毫秒。

现在,打开另一个控制台窗口,输入命令telnet 127.0.0.1 4444使telnet成为我们的客户端。echoserver接受连接后,每次按下键盘上的字母数字选项,echoserver都会将字符发送回telnet。下图描述了echoservertelnet服务器之间的验收连接:

Creating a simple echo server

当服务器接受来自客户端的连接时,OnAccept()函数处理程序将立即被调用。我在telnet窗口中分别按下了 ABC 键,然后echoserver收到了字符并发回客户端。telnet窗口还显示ABC

创建一个简单的客户端程序

我们已经成功地创建了一个服务器端程序。现在,我们将继续开发客户端程序。它将通过HTTP GET命令接收 Packt Publishing 网站的内容,代码如下:

/* clienthttpget.cpp */
#include "wrapper.h"
#include <conio.h>
#include <boost/thread/mutex.hpp>

boost::mutex global_stream_lock;

class MyConnection : public Connection {
private:
  void OnAccept(const std::string &host, uint16_t port) {
    global_stream_lock.lock();
    std::cout << "[OnAccept] " << host << ":" << port << "\n";
    global_stream_lock.unlock();

    // Start the next receive
    Recv();
  }

  void OnConnect(const std::string &host, uint16_t port) {
    global_stream_lock.lock();
    std::cout << "[OnConnect] " << host << ":" << port << "\n";
    global_stream_lock.unlock();

    // Start the next receive
    Recv();

    std::string str = "GET / HTTP/1.0\r\n\r\n";

    std::vector<uint8_t> request;
    std::copy(str.begin(), str.end(), std::back_inserter(request));
    Send(request);
  }

  void OnSend(const std::vector<uint8_t> &buffer) {
    global_stream_lock.lock();
    std::cout << "[OnSend] " << buffer.size() << " bytes\n";
    for(size_t x=0; x<buffer.size(); x++) {

      std::cout << (char)buffer[x];
      if((x + 1) % 16 == 0)
        std::cout << "\n";
    }
    std::cout << "\n";
    global_stream_lock.unlock();
  }

  void OnRecv(std::vector<uint8_t> &buffer) {
    global_stream_lock.lock();
    std::cout << "[OnRecv] " << buffer.size() << " bytes\n";
    for(size_t x=0; x<buffer.size(); x++) {

      std::cout << (char)buffer[x];
      if((x + 1) % 16 == 0)
        std::cout << "\n";
    }
    std::cout << "\n";
    global_stream_lock.unlock();

    // Start the next receive
    Recv();
  }

  void OnTimer(const boost::posix_time::time_duration &delta) {
    global_stream_lock.lock();
    std::cout << "[OnTimer] " << delta << std::endl;
    global_stream_lock.unlock();
  }

  void OnError(const boost::system::error_code &error) {
    global_stream_lock.lock();
    std::cout << "[OnError] " << error << "\n";
    global_stream_lock.unlock();
  }

public:
  MyConnection(boost::shared_ptr<Hive> hive)
    : Connection(hive) {
  }

  ~MyConnection() {
  }
};

int main(void) {
  boost::shared_ptr<Hive> hive(new Hive());

  boost::shared_ptr<MyConnection> connection(new MyConnection(hive));
  connection->Connect("www.packtpub.com", 80);

  while(!_kbhit()) {
    hive->Poll();
    Sleep(1);
  }

  hive->Stop();

  return 0;
}

将前面的代码保存为clienthttpget.cpp,并使用以下命令编译代码:

g++ -Wall -ansi -I ../boost_1_58_0 wrapper.cpp clienthttpget.cpp -o clienthttpget -L ../boost_1_58_0/stage/lib -l boost_system-mgw49-mt-1_58 -l ws2_32 –l libboost_thread-mgw49-mt-1_58 -l mswsock

当我们运行程序时,将显示以下输出:

Creating a simple client program

就在连接建立之后,程序使用以下代码片段向www.packtpub.com的端口80 发送HTTP GET命令:

std::string str = "GET / HTTP/1.0\r\n\r\n";
std::vector<uint8_t> request;
std::copy(str.begin(), str.end(), std::back_inserter(request));
Send(request)

然后,它使用wrapper.cpp文件代码内的Connection类中的Send()函数向套接字发送请求。Send()功能的代码片段如下:

m_io_strand.post(boost::bind(&Connection::DispatchSend, shared_from_this(), buffer));

如我们所见,我们使用strand对象来允许所有事件串行运行。此外,由于有了strand对象,我们不必在每次事件发生时都使用lock对象。

发送请求后,程序将使用以下代码片段汇集传入的数据:

m_io_service.poll();

然后,一旦数据到来,它将由OnRecv()功能处理器显示在控制台中,如我们在前面的图像中所见。

总结

开发网络应用有三个基本步骤。第一步包括在源和目标之间建立连接,这意味着客户端和服务器。我们可以配置socket对象和acceptor对象来建立连接。

其次,我们通过读写套接字来交换数据。为此,我们可以使用basic_stream_socket函数集合。在前面的例子中,我们使用boost::asio::async_write()方法发送数据,使用boost::asio::async_read()方法接收数据。最后,最后一步是释放连接。通过使用ip::tcp::socket对象中的shutdown()方法,我们可以禁用套接字上的数据发送和接收。然后,在shutdown()功能后调用close()方法将关闭插座并释放处理器。我们也已经为所有函数创建了一个包装器,通过访问Boost.Asio库,它在网络应用编程中最常用。这意味着我们可以简单有效地开发网络应用,因为我们不需要一遍又一遍地重用代码。