diff --git a/src/sofa/pbrpc/rpc_byte_stream.h b/src/sofa/pbrpc/rpc_byte_stream.h index 00e9c09..7eb3abe 100644 --- a/src/sofa/pbrpc/rpc_byte_stream.h +++ b/src/sofa/pbrpc/rpc_byte_stream.h @@ -10,6 +10,10 @@ #include // for snprintf() #include // for memset() +#include +#include +#include + #include #include @@ -31,6 +35,7 @@ namespace sofa { namespace pbrpc { using boost::asio::ip::tcp; +using namespace boost::asio; class RpcByteStream : public sofa::pbrpc::enable_shared_from_this { @@ -40,7 +45,9 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this , _remote_endpoint(endpoint) , _ticks(0) , _last_rw_ticks(0) + , _timer(io_service) , _socket(io_service) + , _connect_timeout(-1) , _status(STATUS_INIT) { SOFA_PBRPC_INC_RESOURCE_COUNTER(RpcByteStream); @@ -79,6 +86,19 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this } } + void on_connect_timeout(const boost::system::error_code& error) + { + if (_status != STATUS_CONNECTING) + { + return; + } + if (error == boost::asio::error::operation_aborted) + { + return; + } + close("connect timeout"); + } + // Connect the channel. Used by client. void async_connect() { @@ -89,6 +109,11 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this _status = STATUS_CONNECTING; _socket.async_connect(_remote_endpoint, boost::bind(&RpcByteStream::on_connect, shared_from_this(), _1)); + if (_connect_timeout > 0) + { + _timer.expires_from_now(boost::posix_time::milliseconds(_connect_timeout)); + _timer.async_wait(boost::bind(&RpcByteStream::on_connect_timeout, shared_from_this(), _1)); + } } // Update remote endpoint from socket. Used by server. @@ -221,6 +246,16 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this return _last_rw_ticks; } + void set_connect_timeout(int64 timeout) + { + _connect_timeout = timeout; + } + + int64 connect_timeout() + { + return _connect_timeout; + } + // Trigger receiving operator. // @return true if suceessfully triggered virtual bool trigger_receive() = 0; @@ -273,6 +308,12 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this { SOFA_PBRPC_FUNCTION_TRACE; + //Maybe already timeout + if (_status != STATUS_CONNECTING) + { + return; + } + if (error) { // TODO retry connect? @@ -338,6 +379,8 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this #endif _status = STATUS_CONNECTED; + _timer.cancel(); + trigger_receive(); trigger_send(); } @@ -351,7 +394,9 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this volatile int64 _last_rw_ticks; private: + deadline_timer _timer; tcp::socket _socket; + int64 _connect_timeout; enum { STATUS_INIT = 0, diff --git a/src/sofa/pbrpc/rpc_client.h b/src/sofa/pbrpc/rpc_client.h index 3c5942f..1eb368d 100644 --- a/src/sofa/pbrpc/rpc_client.h +++ b/src/sofa/pbrpc/rpc_client.h @@ -35,6 +35,14 @@ struct RpcClientOptions { int max_throughput_out; // max network out throughput for all connections. // in MB/s, should >= -1, -1 means no limit, default -1. + // Timeout for connect in milliseconds + // + // If it is not set or set no more than 0, it will not consider the timeout + // ot the connect. + // + // default is -1 + int connect_timeout; + RpcClientOptions() : work_thread_num(4) , callback_thread_num(4) @@ -42,6 +50,7 @@ struct RpcClientOptions { , max_pending_buffer_size(100) , max_throughput_in(-1) , max_throughput_out(-1) + , connect_timeout(-1) {} }; diff --git a/src/sofa/pbrpc/rpc_client_impl.cc b/src/sofa/pbrpc/rpc_client_impl.cc index 0800adc..a1a9b91 100644 --- a/src/sofa/pbrpc/rpc_client_impl.cc +++ b/src/sofa/pbrpc/rpc_client_impl.cc @@ -364,8 +364,7 @@ bool RpcClientImpl::ResolveAddress(const std::string& address, return sofa::pbrpc::ResolveAddress(_work_thread_group->io_service(), address, endpoint); } -RpcClientStreamPtr RpcClientImpl::FindOrCreateStream( - const RpcEndpoint& remote_endpoint) +RpcClientStreamPtr RpcClientImpl::FindOrCreateStream(const RpcEndpoint& remote_endpoint) { RpcClientStreamPtr stream; bool create = false; @@ -382,6 +381,7 @@ RpcClientStreamPtr RpcClientImpl::FindOrCreateStream( stream->set_flow_controller(_flow_controller); stream->set_max_pending_buffer_size(_max_pending_buffer_size); stream->reset_ticks((ptime_now() - _epoch_time).ticks(), true); + stream->set_connect_timeout(_options.connect_timeout); stream->set_closed_stream_callback( boost::bind(&RpcClientImpl::OnClosed, shared_from_this(), _1)); diff --git a/src/sofa/pbrpc/rpc_client_impl.h b/src/sofa/pbrpc/rpc_client_impl.h index f62de99..6c28f1c 100644 --- a/src/sofa/pbrpc/rpc_client_impl.h +++ b/src/sofa/pbrpc/rpc_client_impl.h @@ -57,7 +57,7 @@ class RpcClientImpl: public sofa::pbrpc::enable_shared_from_this RpcEndpoint* endpoint); private: - // Get stream for "remote_endpoint". Return null ptr if failed. + // Get stream for "remote_endpoint". Return null ptr if failed. RpcClientStreamPtr FindOrCreateStream(const RpcEndpoint& remote_endpoint); void OnClosed(const RpcClientStreamPtr& stream);