Skip to content

Commit

Permalink
Merge branch 'master' of github.com:baidu/sofa-pbrpc
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan committed Apr 1, 2016
2 parents 246544b + dc55b40 commit e729f75
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
45 changes: 45 additions & 0 deletions src/sofa/pbrpc/rpc_byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <cstdio> // for snprintf()
#include <cstring> // for memset()

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

#include <sofa/pbrpc/common_internal.h>
#include <sofa/pbrpc/rpc_endpoint.h>

Expand All @@ -31,6 +35,7 @@ namespace sofa {
namespace pbrpc {

using boost::asio::ip::tcp;
using namespace boost::asio;

class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
{
Expand All @@ -40,7 +45,9 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
, _remote_endpoint(endpoint)
, _ticks(0)
, _last_rw_ticks(0)
, _timer(io_service)
, _socket(io_service)
, _connect_timeout(-1)
, _status(STATUS_INIT)
{
SOFA_PBRPC_INC_RESOURCE_COUNTER(RpcByteStream);
Expand Down Expand Up @@ -79,6 +86,19 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
}
}

void on_connect_timeout(const boost::system::error_code& error)
{
if (_status != STATUS_CONNECTING)
{
return;
}
if (error == boost::asio::error::operation_aborted)
{
return;
}
close("connect timeout");
}

// Connect the channel. Used by client.
void async_connect()
{
Expand All @@ -89,6 +109,11 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
_status = STATUS_CONNECTING;
_socket.async_connect(_remote_endpoint,
boost::bind(&RpcByteStream::on_connect, shared_from_this(), _1));
if (_connect_timeout > 0)
{
_timer.expires_from_now(boost::posix_time::milliseconds(_connect_timeout));
_timer.async_wait(boost::bind(&RpcByteStream::on_connect_timeout, shared_from_this(), _1));
}
}

// Update remote endpoint from socket. Used by server.
Expand Down Expand Up @@ -221,6 +246,16 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
return _last_rw_ticks;
}

void set_connect_timeout(int64 timeout)
{
_connect_timeout = timeout;
}

int64 connect_timeout()
{
return _connect_timeout;
}

// Trigger receiving operator.
// @return true if suceessfully triggered
virtual bool trigger_receive() = 0;
Expand Down Expand Up @@ -273,6 +308,12 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
{
SOFA_PBRPC_FUNCTION_TRACE;

//Maybe already timeout
if (_status != STATUS_CONNECTING)
{
return;
}

if (error)
{
// TODO retry connect?
Expand Down Expand Up @@ -338,6 +379,8 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
#endif

_status = STATUS_CONNECTED;
_timer.cancel();

trigger_receive();
trigger_send();
}
Expand All @@ -351,7 +394,9 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
volatile int64 _last_rw_ticks;

private:
deadline_timer _timer;
tcp::socket _socket;
int64 _connect_timeout;

enum {
STATUS_INIT = 0,
Expand Down
9 changes: 9 additions & 0 deletions src/sofa/pbrpc/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ struct RpcClientOptions {
int max_throughput_out; // max network out throughput for all connections.
// in MB/s, should >= -1, -1 means no limit, default -1.

// Timeout for connect in milliseconds
//
// If it is not set or set no more than 0, it will not consider the timeout
// ot the connect.
//
// default is -1
int connect_timeout;

RpcClientOptions()
: work_thread_num(4)
, callback_thread_num(4)
, keep_alive_time(-1)
, max_pending_buffer_size(100)
, max_throughput_in(-1)
, max_throughput_out(-1)
, connect_timeout(-1)
{}
};

Expand Down
4 changes: 2 additions & 2 deletions src/sofa/pbrpc/rpc_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,7 @@ bool RpcClientImpl::ResolveAddress(const std::string& address,
return sofa::pbrpc::ResolveAddress(_work_thread_group->io_service(), address, endpoint);
}

RpcClientStreamPtr RpcClientImpl::FindOrCreateStream(
const RpcEndpoint& remote_endpoint)
RpcClientStreamPtr RpcClientImpl::FindOrCreateStream(const RpcEndpoint& remote_endpoint)
{
RpcClientStreamPtr stream;
bool create = false;
Expand All @@ -382,6 +381,7 @@ RpcClientStreamPtr RpcClientImpl::FindOrCreateStream(
stream->set_flow_controller(_flow_controller);
stream->set_max_pending_buffer_size(_max_pending_buffer_size);
stream->reset_ticks((ptime_now() - _epoch_time).ticks(), true);
stream->set_connect_timeout(_options.connect_timeout);
stream->set_closed_stream_callback(
boost::bind(&RpcClientImpl::OnClosed, shared_from_this(), _1));

Expand Down
2 changes: 1 addition & 1 deletion src/sofa/pbrpc/rpc_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class RpcClientImpl: public sofa::pbrpc::enable_shared_from_this<RpcClientImpl>
RpcEndpoint* endpoint);

private:
// Get stream for "remote_endpoint". Return null ptr if failed.
// Get stream for "remote_endpoint". Return null ptr if failed.
RpcClientStreamPtr FindOrCreateStream(const RpcEndpoint& remote_endpoint);

void OnClosed(const RpcClientStreamPtr& stream);
Expand Down

0 comments on commit e729f75

Please sign in to comment.