Skip to content

Commit

Permalink
Merge pull request #84 from qinzuoyan/master
Browse files Browse the repository at this point in the history
support server timeout to avoid unnecessary response
  • Loading branch information
qinzuoyan committed Apr 1, 2016
2 parents dc55b40 + e729f75 commit f6ba9de
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 14 deletions.
30 changes: 25 additions & 5 deletions sample/timeout_sample/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ int main(int /*argc*/, char** /*argv*/)

// Call 1
SLOG(NOTICE, "----------- Call 1 ---------------------------------------");
SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), timeout is 2 seconds.");
SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), default timeout is 2 seconds.");
SLOG(NOTICE, "Sleep for 1 seconds.");
SLOG(NOTICE, "Expect result is success.");
cntl->Reset();
request->set_sleep_time(1);
stub->SleepWithServiceTimeout(cntl, request, response, NULL);
Expand All @@ -42,11 +43,15 @@ int main(int /*argc*/, char** /*argv*/)
else {
SLOG(NOTICE, "Succeed: %s", response->message().c_str());
}
if (cntl->Failed()) {
SLOG(ERROR, "!!!! unexpected result");
}

// Call 2
SLOG(NOTICE, "----------- Call 2 ---------------------------------------");
SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), timeout is 2 seconds.");
SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), default timeout is 2 seconds.");
SLOG(NOTICE, "Sleep for 3 seconds.");
SLOG(NOTICE, "Expect result is failure.");
cntl->Reset();
request->set_sleep_time(3);
stub->SleepWithServiceTimeout(cntl, request, response, NULL);
Expand All @@ -57,11 +62,15 @@ int main(int /*argc*/, char** /*argv*/)
else {
SLOG(NOTICE, "Succeed: %s", response->message().c_str());
}
if (!cntl->Failed()) {
SLOG(ERROR, "!!!! unexpected result");
}

// Call 3
SLOG(NOTICE, "----------- Call 3 ---------------------------------------");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds.");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds.");
SLOG(NOTICE, "Sleep for 3 seconds.");
SLOG(NOTICE, "Expect result is success.");
cntl->Reset();
request->set_sleep_time(3);
stub->SleepWithMethodTimeout(cntl, request, response, NULL);
Expand All @@ -72,11 +81,15 @@ int main(int /*argc*/, char** /*argv*/)
else {
SLOG(NOTICE, "Succeed: %s", response->message().c_str());
}
if (cntl->Failed()) {
SLOG(ERROR, "!!!! unexpected result");
}

// Call 4
SLOG(NOTICE, "----------- Call 4 ---------------------------------------");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds.");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds.");
SLOG(NOTICE, "Sleep for 5 seconds.");
SLOG(NOTICE, "Expect result is failure.");
cntl->Reset();
request->set_sleep_time(5);
stub->SleepWithMethodTimeout(cntl, request, response, NULL);
Expand All @@ -87,12 +100,16 @@ int main(int /*argc*/, char** /*argv*/)
else {
SLOG(NOTICE, "Succeed: %s", response->message().c_str());
}
if (!cntl->Failed()) {
SLOG(ERROR, "!!!! unexpected result");
}

// Call 5
SLOG(NOTICE, "----------- Call 5 ---------------------------------------");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds.");
SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds.");
SLOG(NOTICE, "Set timeout of RpcController to 1 seconds.");
SLOG(NOTICE, "Sleep for 3 seconds.");
SLOG(NOTICE, "Expect result is failure.");
cntl->Reset();
cntl->SetTimeout(1000);
request->set_sleep_time(3);
Expand All @@ -104,6 +121,9 @@ int main(int /*argc*/, char** /*argv*/)
else {
SLOG(NOTICE, "Succeed: %s", response->message().c_str());
}
if (!cntl->Failed()) {
SLOG(ERROR, "!!!! unexpected result");
}

delete cntl;
delete request;
Expand Down
4 changes: 4 additions & 0 deletions src/sofa/pbrpc/binary_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ void BinaryRpcRequest::ProcessRequest(
cntl->SetLocalEndpoint(_local_endpoint);
cntl->SetRemoteEndpoint(_remote_endpoint);
cntl->SetRpcServerStream(stream);
if (_req_meta.has_server_timeout() && _req_meta.server_timeout() > 0)
{
cntl->SetServerTimeout(_req_meta.server_timeout());
}
cntl->SetRequestReceivedTime(_received_time);
cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ?
_req_meta.expected_response_compress_type() : CompressTypeNone);
Expand Down
6 changes: 5 additions & 1 deletion src/sofa/pbrpc/rpc_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ void RpcClientImpl::CallMethod(const google::protobuf::Message* request,
meta.set_type(RpcMeta::REQUEST);
meta.set_sequence_id(cntl->SequenceId());
meta.set_method(cntl->MethodId());
int64 timeout = cntl->Timeout();
if (timeout > 0)
{
meta.set_server_timeout(timeout);
}
meta.set_compress_type(cntl->RequestCompressType());
meta.set_expected_response_compress_type(cntl->ResponseCompressType());

Expand Down Expand Up @@ -328,7 +333,6 @@ void RpcClientImpl::CallMethod(const google::protobuf::Message* request,
cntl->PushDoneCallback(boost::bind(&RpcClientImpl::DoneCallback, shared_from_this(), response, _1));

// add to timeout manager if need
int64 timeout = cntl->Timeout();
if (timeout > 0)
{
if (!_timeout_manager->add(cntl))
Expand Down
31 changes: 27 additions & 4 deletions src/sofa/pbrpc/rpc_client_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ class RpcClientStream : public RpcMessageStream<RpcControllerImplPtr>
async_send_message(cntl->RequestBuffer(), cntl);
}

// erase request from controller map by sequence id.
void erase_request(uint64 sequence_id)
{
ScopedLocker<FastLock> _(_controller_map_lock);
_controller_map.erase(sequence_id);
}

private:
virtual void on_closed()
{
Expand Down Expand Up @@ -113,6 +120,8 @@ class RpcClientStream : public RpcMessageStream<RpcControllerImplPtr>
{
SOFA_PBRPC_FUNCTION_TRACE;

// TODO: set rpc_meta.server_timeout here to make it more accurate.

// if already done (may be done by timeout manager),
// should cancel sending.
return !cntl->IsDone() && !cntl->IsStartCancel();
Expand Down Expand Up @@ -202,11 +211,25 @@ class RpcClientStream : public RpcMessageStream<RpcControllerImplPtr>
//
// just ignore it
#if defined( LOG )
LOG(ERROR) << "on_received(): " << RpcEndpointToString(_remote_endpoint)
<< " {" << meta.sequence_id() << "}"
<< ": sequence id not found, ignore";
LOG(WARNING) << "on_received(): " << RpcEndpointToString(_remote_endpoint)
<< " {" << meta.sequence_id() << "}"
<< ": sequence id not found (maybe already timeout), ignore";
#else
SLOG(WARNING, "on_received(): %s {%lu}: sequence id not found (maybe already timeout), ignore",
RpcEndpointToString(_remote_endpoint).c_str(), meta.sequence_id());
#endif
return;
}

if (cntl->IsDone())
{
// just ignore it
#if defined( LOG )
LOG(WARNING) << "on_received(): " << RpcEndpointToString(_remote_endpoint)
<< " {" << meta.sequence_id() << "}"
<< ": request already done (maybe already timeout), ignore";
#else
SLOG(ERROR, "on_received(): %s {%lu}: sequence id not found, ignore",
SLOG(WARNING, "on_received(): %s {%lu}: request already done (maybe already timeout), ignore",
RpcEndpointToString(_remote_endpoint).c_str(), meta.sequence_id());
#endif
return;
Expand Down
12 changes: 12 additions & 0 deletions src/sofa/pbrpc/rpc_controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_this<RpcControl
, _is_start_cancel(false)
, _is_sync(false)
, _timeout_id(0)
, _server_timeout(0)
, _is_http(false)
, _http_path(NULL)
, _http_query_params(NULL)
Expand Down Expand Up @@ -351,6 +352,16 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_this<RpcControl
return _server_stream;
}

void SetServerTimeout(int64 timeout)
{
_server_timeout = timeout;
}

int64 ServerTimeout() const
{
return _server_timeout;
}

void SetRequestReceivedTime(const PTime& time)
{
_request_received_time = time;
Expand Down Expand Up @@ -463,6 +474,7 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_this<RpcControl

// used only in server side
RpcServerStreamWPtr _server_stream;
int64 _server_timeout;
PTime _request_received_time;
PTime _start_process_time;
PTime _finish_process_time;
Expand Down
5 changes: 4 additions & 1 deletion src/sofa/pbrpc/rpc_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ message RpcMeta {
required uint64 sequence_id = 2;

/////////////////////////////////////////////////////
// The following fields are used only for response.
// The following fields are used only for request.

// Method full name.
// For example: "test.HelloService.GreetMethod"
optional string method = 100;

// Server timeout in milli-seconds.
optional int64 server_timeout = 101;

/////////////////////////////////////////////////////
// The following fields are used only for response.

Expand Down
30 changes: 28 additions & 2 deletions src/sofa/pbrpc/rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,41 @@ void RpcRequest::OnCallMethodDone(
{
const RpcControllerImplPtr& cntl = controller->impl();
cntl->SetFinishProcessTime(ptime_now());

if (cntl->ServerTimeout() > 0)
{
int64 server_time_ms =
(cntl->FinishProcessTime() - cntl->RequestReceivedTime()).total_milliseconds();
if (server_time_ms > cntl->ServerTimeout())
{
#if defined( LOG )
LOG(WARNING) << "OnCallMethodDone(): " << RpcEndpointToString(_remote_endpoint)
<< " {" << cntl->SequenceId() << "}"
<< ": call method \"" << cntl->MethodId() << "\" timeout"
<< ": server_timeout_ms=" << cntl->ServerTimeout()
<< ", server_used_time_ms=" << server_time_ms;
#else
SLOG(WARNING, "OnCallMethodDone(): %s {%lu}: call method \"%s\" timeout: "
"server_timeout_ms=%lld, server_used_time_ms=%lld",
RpcEndpointToString(_remote_endpoint).c_str(), cntl->SequenceId(),
cntl->MethodId().c_str(), cntl->ServerTimeout(), server_time_ms);
#endif
delete request;
delete response;
delete controller;
return;
}
}

int64 process_time_us =
(cntl->FinishProcessTime() - cntl->StartProcessTime()).total_microseconds();
if (cntl->Failed())
{
#if defined( LOG )
LOG(ERROR) << "OnCallMethodDone(): " << RpcEndpointToString(_remote_endpoint)
<< " {" << cntl->SequenceId() << "}"
<< ": call method \"" << cntl->MethodId() << "\" failed"
": " << RpcErrorCodeToString(cntl->ErrorCode()) << ": " << cntl->Reason();
<< ": call method \"" << cntl->MethodId() << "\" failed: "
<< RpcErrorCodeToString(cntl->ErrorCode()) << ": " << cntl->Reason();
#else
SLOG(ERROR, "OnCallMethodDone(): %s {%lu}: call method \"%s\" failed: %s: %s",
RpcEndpointToString(_remote_endpoint).c_str(), cntl->SequenceId(),
Expand Down
14 changes: 13 additions & 1 deletion src/sofa/pbrpc/rpc_timeout_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <sofa/pbrpc/common_internal.h>
#include <sofa/pbrpc/rpc_controller_impl.h>
#include <sofa/pbrpc/rpc_client_stream.h>
#include <sofa/pbrpc/timer_worker.h>

namespace sofa {
Expand Down Expand Up @@ -202,9 +203,20 @@ class RpcTimeoutManager : public sofa::pbrpc::enable_shared_from_this<RpcTimeout
void notify_timeout(const RpcControllerImplWPtr& weak_cntl)
{
RpcControllerImplPtr cntl = weak_cntl.lock();
if (cntl) {
if (cntl)
{
// ATTENTION: here we reset timeout id to 0 to avoid unnecessarily erasing
// from timeout manager in RpcClientImpl::DoneCallback(), because when
// timeout id is 0, then RpcTimeoutManager::erase() will do nothing.
cntl->SetTimeoutId(0u);
cntl->Done(RPC_ERROR_REQUEST_TIMEOUT, "timeout");

// erase from RpcClientStream
RpcClientStreamPtr stream = cntl->RpcClientStream().lock();
if (stream)
{
stream->erase_request(cntl->SequenceId());
}
}
}

Expand Down

0 comments on commit f6ba9de

Please sign in to comment.