diff --git a/sample/timeout_sample/client.cc b/sample/timeout_sample/client.cc index f555e76..5bf6069 100644 --- a/sample/timeout_sample/client.cc +++ b/sample/timeout_sample/client.cc @@ -30,8 +30,9 @@ int main(int /*argc*/, char** /*argv*/) // Call 1 SLOG(NOTICE, "----------- Call 1 ---------------------------------------"); - SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), timeout is 2 seconds."); + SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), default timeout is 2 seconds."); SLOG(NOTICE, "Sleep for 1 seconds."); + SLOG(NOTICE, "Expect result is success."); cntl->Reset(); request->set_sleep_time(1); stub->SleepWithServiceTimeout(cntl, request, response, NULL); @@ -42,11 +43,15 @@ int main(int /*argc*/, char** /*argv*/) else { SLOG(NOTICE, "Succeed: %s", response->message().c_str()); } + if (cntl->Failed()) { + SLOG(ERROR, "!!!! unexpected result"); + } // Call 2 SLOG(NOTICE, "----------- Call 2 ---------------------------------------"); - SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), timeout is 2 seconds."); + SLOG(NOTICE, "Sync call SleepWithServiceTimeout(), default timeout is 2 seconds."); SLOG(NOTICE, "Sleep for 3 seconds."); + SLOG(NOTICE, "Expect result is failure."); cntl->Reset(); request->set_sleep_time(3); stub->SleepWithServiceTimeout(cntl, request, response, NULL); @@ -57,11 +62,15 @@ int main(int /*argc*/, char** /*argv*/) else { SLOG(NOTICE, "Succeed: %s", response->message().c_str()); } + if (!cntl->Failed()) { + SLOG(ERROR, "!!!! unexpected result"); + } // Call 3 SLOG(NOTICE, "----------- Call 3 ---------------------------------------"); - SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds."); + SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds."); SLOG(NOTICE, "Sleep for 3 seconds."); + SLOG(NOTICE, "Expect result is success."); cntl->Reset(); request->set_sleep_time(3); stub->SleepWithMethodTimeout(cntl, request, response, NULL); @@ -72,11 +81,15 @@ int main(int /*argc*/, char** /*argv*/) else { SLOG(NOTICE, "Succeed: %s", response->message().c_str()); } + if (cntl->Failed()) { + SLOG(ERROR, "!!!! unexpected result"); + } // Call 4 SLOG(NOTICE, "----------- Call 4 ---------------------------------------"); - SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds."); + SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds."); SLOG(NOTICE, "Sleep for 5 seconds."); + SLOG(NOTICE, "Expect result is failure."); cntl->Reset(); request->set_sleep_time(5); stub->SleepWithMethodTimeout(cntl, request, response, NULL); @@ -87,12 +100,16 @@ int main(int /*argc*/, char** /*argv*/) else { SLOG(NOTICE, "Succeed: %s", response->message().c_str()); } + if (!cntl->Failed()) { + SLOG(ERROR, "!!!! unexpected result"); + } // Call 5 SLOG(NOTICE, "----------- Call 5 ---------------------------------------"); - SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), timeout is 4 seconds."); + SLOG(NOTICE, "Sync call SleepWithMethodTimeout(), default timeout is 4 seconds."); SLOG(NOTICE, "Set timeout of RpcController to 1 seconds."); SLOG(NOTICE, "Sleep for 3 seconds."); + SLOG(NOTICE, "Expect result is failure."); cntl->Reset(); cntl->SetTimeout(1000); request->set_sleep_time(3); @@ -104,6 +121,9 @@ int main(int /*argc*/, char** /*argv*/) else { SLOG(NOTICE, "Succeed: %s", response->message().c_str()); } + if (!cntl->Failed()) { + SLOG(ERROR, "!!!! unexpected result"); + } delete cntl; delete request; diff --git a/src/sofa/pbrpc/binary_rpc_request.cc b/src/sofa/pbrpc/binary_rpc_request.cc index aa07962..83a6ea9 100644 --- a/src/sofa/pbrpc/binary_rpc_request.cc +++ b/src/sofa/pbrpc/binary_rpc_request.cc @@ -115,6 +115,10 @@ void BinaryRpcRequest::ProcessRequest( cntl->SetLocalEndpoint(_local_endpoint); cntl->SetRemoteEndpoint(_remote_endpoint); cntl->SetRpcServerStream(stream); + if (_req_meta.has_server_timeout() && _req_meta.server_timeout() > 0) + { + cntl->SetServerTimeout(_req_meta.server_timeout()); + } cntl->SetRequestReceivedTime(_received_time); cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ? _req_meta.expected_response_compress_type() : CompressTypeNone); diff --git a/src/sofa/pbrpc/rpc_client_impl.cc b/src/sofa/pbrpc/rpc_client_impl.cc index cc1a695..a1a9b91 100644 --- a/src/sofa/pbrpc/rpc_client_impl.cc +++ b/src/sofa/pbrpc/rpc_client_impl.cc @@ -260,6 +260,11 @@ void RpcClientImpl::CallMethod(const google::protobuf::Message* request, meta.set_type(RpcMeta::REQUEST); meta.set_sequence_id(cntl->SequenceId()); meta.set_method(cntl->MethodId()); + int64 timeout = cntl->Timeout(); + if (timeout > 0) + { + meta.set_server_timeout(timeout); + } meta.set_compress_type(cntl->RequestCompressType()); meta.set_expected_response_compress_type(cntl->ResponseCompressType()); @@ -328,7 +333,6 @@ void RpcClientImpl::CallMethod(const google::protobuf::Message* request, cntl->PushDoneCallback(boost::bind(&RpcClientImpl::DoneCallback, shared_from_this(), response, _1)); // add to timeout manager if need - int64 timeout = cntl->Timeout(); if (timeout > 0) { if (!_timeout_manager->add(cntl)) diff --git a/src/sofa/pbrpc/rpc_client_stream.h b/src/sofa/pbrpc/rpc_client_stream.h index 7327983..541fb39 100644 --- a/src/sofa/pbrpc/rpc_client_stream.h +++ b/src/sofa/pbrpc/rpc_client_stream.h @@ -83,6 +83,13 @@ class RpcClientStream : public RpcMessageStream async_send_message(cntl->RequestBuffer(), cntl); } + // erase request from controller map by sequence id. + void erase_request(uint64 sequence_id) + { + ScopedLocker _(_controller_map_lock); + _controller_map.erase(sequence_id); + } + private: virtual void on_closed() { @@ -113,6 +120,8 @@ class RpcClientStream : public RpcMessageStream { SOFA_PBRPC_FUNCTION_TRACE; + // TODO: set rpc_meta.server_timeout here to make it more accurate. + // if already done (may be done by timeout manager), // should cancel sending. return !cntl->IsDone() && !cntl->IsStartCancel(); @@ -202,11 +211,25 @@ class RpcClientStream : public RpcMessageStream // // just ignore it #if defined( LOG ) - LOG(ERROR) << "on_received(): " << RpcEndpointToString(_remote_endpoint) - << " {" << meta.sequence_id() << "}" - << ": sequence id not found, ignore"; + LOG(WARNING) << "on_received(): " << RpcEndpointToString(_remote_endpoint) + << " {" << meta.sequence_id() << "}" + << ": sequence id not found (maybe already timeout), ignore"; +#else + SLOG(WARNING, "on_received(): %s {%lu}: sequence id not found (maybe already timeout), ignore", + RpcEndpointToString(_remote_endpoint).c_str(), meta.sequence_id()); +#endif + return; + } + + if (cntl->IsDone()) + { + // just ignore it +#if defined( LOG ) + LOG(WARNING) << "on_received(): " << RpcEndpointToString(_remote_endpoint) + << " {" << meta.sequence_id() << "}" + << ": request already done (maybe already timeout), ignore"; #else - SLOG(ERROR, "on_received(): %s {%lu}: sequence id not found, ignore", + SLOG(WARNING, "on_received(): %s {%lu}: request already done (maybe already timeout), ignore", RpcEndpointToString(_remote_endpoint).c_str(), meta.sequence_id()); #endif return; diff --git a/src/sofa/pbrpc/rpc_controller_impl.h b/src/sofa/pbrpc/rpc_controller_impl.h index 4ee1679..79dabb2 100644 --- a/src/sofa/pbrpc/rpc_controller_impl.h +++ b/src/sofa/pbrpc/rpc_controller_impl.h @@ -40,6 +40,7 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_thisimpl(); cntl->SetFinishProcessTime(ptime_now()); + + if (cntl->ServerTimeout() > 0) + { + int64 server_time_ms = + (cntl->FinishProcessTime() - cntl->RequestReceivedTime()).total_milliseconds(); + if (server_time_ms > cntl->ServerTimeout()) + { +#if defined( LOG ) + LOG(WARNING) << "OnCallMethodDone(): " << RpcEndpointToString(_remote_endpoint) + << " {" << cntl->SequenceId() << "}" + << ": call method \"" << cntl->MethodId() << "\" timeout" + << ": server_timeout_ms=" << cntl->ServerTimeout() + << ", server_used_time_ms=" << server_time_ms; +#else + SLOG(WARNING, "OnCallMethodDone(): %s {%lu}: call method \"%s\" timeout: " + "server_timeout_ms=%lld, server_used_time_ms=%lld", + RpcEndpointToString(_remote_endpoint).c_str(), cntl->SequenceId(), + cntl->MethodId().c_str(), cntl->ServerTimeout(), server_time_ms); +#endif + delete request; + delete response; + delete controller; + return; + } + } + int64 process_time_us = (cntl->FinishProcessTime() - cntl->StartProcessTime()).total_microseconds(); if (cntl->Failed()) @@ -48,8 +74,8 @@ void RpcRequest::OnCallMethodDone( #if defined( LOG ) LOG(ERROR) << "OnCallMethodDone(): " << RpcEndpointToString(_remote_endpoint) << " {" << cntl->SequenceId() << "}" - << ": call method \"" << cntl->MethodId() << "\" failed" - ": " << RpcErrorCodeToString(cntl->ErrorCode()) << ": " << cntl->Reason(); + << ": call method \"" << cntl->MethodId() << "\" failed: " + << RpcErrorCodeToString(cntl->ErrorCode()) << ": " << cntl->Reason(); #else SLOG(ERROR, "OnCallMethodDone(): %s {%lu}: call method \"%s\" failed: %s: %s", RpcEndpointToString(_remote_endpoint).c_str(), cntl->SequenceId(), diff --git a/src/sofa/pbrpc/rpc_timeout_manager.h b/src/sofa/pbrpc/rpc_timeout_manager.h index f80b877..180a407 100644 --- a/src/sofa/pbrpc/rpc_timeout_manager.h +++ b/src/sofa/pbrpc/rpc_timeout_manager.h @@ -17,6 +17,7 @@ #include #include +#include #include namespace sofa { @@ -202,9 +203,20 @@ class RpcTimeoutManager : public sofa::pbrpc::enable_shared_from_thisSetTimeoutId(0u); cntl->Done(RPC_ERROR_REQUEST_TIMEOUT, "timeout"); + + // erase from RpcClientStream + RpcClientStreamPtr stream = cntl->RpcClientStream().lock(); + if (stream) + { + stream->erase_request(cntl->SequenceId()); + } } }