From f1e3656f551b2122dbf2023041c04acc974d5779 Mon Sep 17 00:00:00 2001 From: Zhu Jiashun Date: Wed, 9 May 2018 15:14:16 +0800 Subject: [PATCH 1/2] Make naming service from sync to async --- src/brpc/channel.cpp | 2 +- src/brpc/controller.cpp | 1 + .../details/load_balancer_with_naming.cpp | 6 +- src/brpc/details/load_balancer_with_naming.h | 11 +- .../shared_load_balancer.cpp} | 8 +- src/brpc/details/shared_load_balancer.h | 97 +++++++ ...e_thread.cpp => shared_naming_service.cpp} | 260 +++++++----------- ...rvice_thread.h => shared_naming_service.h} | 71 +++-- src/brpc/load_balancer.h | 74 +---- src/brpc/naming_service.h | 55 +--- src/brpc/partition_channel.cpp | 16 +- src/brpc/partition_channel.h | 6 +- src/brpc/periodic_naming_service.cpp | 92 +++++-- src/brpc/periodic_naming_service.h | 29 +- src/brpc/periodic_task.cpp | 65 +++++ src/brpc/periodic_task.h | 46 ++++ src/brpc/policy/baidu_naming_service.cpp | 4 - src/brpc/policy/baidu_naming_service.h | 2 - .../consistent_hashing_load_balancer.cpp | 1 + src/brpc/policy/consul_naming_service.cpp | 37 --- src/brpc/policy/consul_naming_service.h | 15 +- src/brpc/policy/domain_naming_service.cpp | 4 - src/brpc/policy/domain_naming_service.h | 2 - src/brpc/policy/file_naming_service.cpp | 101 +++++-- src/brpc/policy/file_naming_service.h | 18 +- src/brpc/policy/list_naming_service.cpp | 11 +- src/brpc/policy/list_naming_service.h | 10 +- .../policy/remote_file_naming_service.cpp | 4 - src/brpc/policy/remote_file_naming_service.h | 2 - src/brpc/selective_channel.cpp | 9 +- src/brpc/server_node.h | 57 ++++ src/brpc/socket.cpp | 153 +++++------ src/brpc/socket.h | 4 +- src/brpc/socket_map.cpp | 2 +- test/brpc_channel_unittest.cpp | 4 +- test/brpc_naming_service_filter_unittest.cpp | 2 +- test/brpc_naming_service_unittest.cpp | 29 +- 37 files changed, 730 insertions(+), 580 deletions(-) rename src/brpc/{load_balancer.cpp => details/shared_load_balancer.cpp} (94%) create mode 100644 src/brpc/details/shared_load_balancer.h 
rename src/brpc/details/{naming_service_thread.cpp => shared_naming_service.cpp} (65%) rename src/brpc/details/{naming_service_thread.h => shared_naming_service.h} (68%) create mode 100644 src/brpc/periodic_task.cpp create mode 100644 src/brpc/periodic_task.h create mode 100644 src/brpc/server_node.h diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 27b15a811e..77633b4020 100755 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -227,7 +227,7 @@ int Channel::Init(const char* ns_url, LOG(FATAL) << "Fail to new LoadBalancerWithNaming"; return -1; } - GetNamingServiceThreadOptions ns_opt; + GetSharedNamingServiceOptions ns_opt; ns_opt.succeed_without_server = _options.succeed_without_server; ns_opt.log_succeed_without_server = _options.log_succeed_without_server; if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) { diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index f5cd53b1c2..9cc4b576cd 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -44,6 +44,7 @@ #include "brpc/rpc_dump.pb.h" #include "brpc/details/usercode_backup_pool.h" // RunUserCode #include "brpc/mongo_service_adaptor.h" +#include "brpc/details/shared_load_balancer.h" // Force linking the .o in UT (which analysis deps by inclusions) #include "brpc/parallel_channel.h" diff --git a/src/brpc/details/load_balancer_with_naming.cpp b/src/brpc/details/load_balancer_with_naming.cpp index eeeb8a90f1..8f9743193d 100644 --- a/src/brpc/details/load_balancer_with_naming.cpp +++ b/src/brpc/details/load_balancer_with_naming.cpp @@ -27,12 +27,12 @@ LoadBalancerWithNaming::~LoadBalancerWithNaming() { int LoadBalancerWithNaming::Init(const char* ns_url, const char* lb_name, const NamingServiceFilter* filter, - const GetNamingServiceThreadOptions* options) { + const GetSharedNamingServiceOptions* options) { if (SharedLoadBalancer::Init(lb_name) != 0) { return -1; } - if (GetNamingServiceThread(&_nsthread_ptr, ns_url, options) != 0) { - LOG(FATAL) << "Fail 
to get NamingServiceThread"; + if (GetSharedNamingService(&_nsthread_ptr, ns_url, options) != 0) { + LOG(ERROR) << "Fail to get SharedNamingService"; return -1; } if (_nsthread_ptr->AddWatcher(this, filter) != 0) { diff --git a/src/brpc/details/load_balancer_with_naming.h b/src/brpc/details/load_balancer_with_naming.h index 01e9a364ef..c0cbe5250e 100644 --- a/src/brpc/details/load_balancer_with_naming.h +++ b/src/brpc/details/load_balancer_with_naming.h @@ -18,8 +18,8 @@ #define BRPC_LOAD_BALANCER_WITH_NAMING_H #include "butil/intrusive_ptr.hpp" -#include "brpc/load_balancer.h" -#include "brpc/details/naming_service_thread.h" // NamingServiceWatcher +#include "brpc/details/shared_load_balancer.h" +#include "brpc/details/shared_naming_service.h" // NamingServiceWatcher namespace brpc { @@ -32,15 +32,16 @@ class LoadBalancerWithNaming : public SharedLoadBalancer, int Init(const char* ns_url, const char* lb_name, const NamingServiceFilter* filter, - const GetNamingServiceThreadOptions* options); - + const GetSharedNamingServiceOptions* options); + +protected: void OnAddedServers(const std::vector& servers); void OnRemovedServers(const std::vector& servers); void Describe(std::ostream& os, const DescribeOptions& options); private: - butil::intrusive_ptr _nsthread_ptr; + butil::intrusive_ptr _nsthread_ptr; }; } // namespace brpc diff --git a/src/brpc/load_balancer.cpp b/src/brpc/details/shared_load_balancer.cpp similarity index 94% rename from src/brpc/load_balancer.cpp rename to src/brpc/details/shared_load_balancer.cpp index 14b21ddee1..d7bdad0eda 100644 --- a/src/brpc/load_balancer.cpp +++ b/src/brpc/details/shared_load_balancer.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2014 Baidu, Inc. +// Copyright (c) 2014 brpc authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -16,8 +16,7 @@ #include #include "brpc/reloadable_flags.h" -#include "brpc/load_balancer.h" - +#include "brpc/details/shared_load_balancer.h" namespace brpc { @@ -65,7 +64,7 @@ SharedLoadBalancer::~SharedLoadBalancer() { int SharedLoadBalancer::Init(const char* lb_name) { const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name); if (lb == NULL) { - LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'"; + LOG(ERROR) << "Fail to find LoadBalancer by `" << lb_name << "'"; return -1; } LoadBalancer* lb_copy = lb->New(); @@ -89,4 +88,5 @@ void SharedLoadBalancer::Describe(std::ostream& os, } } + } // namespace brpc diff --git a/src/brpc/details/shared_load_balancer.h b/src/brpc/details/shared_load_balancer.h new file mode 100644 index 0000000000..bc63532a15 --- /dev/null +++ b/src/brpc/details/shared_load_balancer.h @@ -0,0 +1,97 @@ +// Copyright (c) 2014 brpc authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Ge,Jun (gejun@baidu.com) + +#ifndef BRPC_SHARED_LOAD_BALANCER_H +#define BRPC_SHARED_LOAD_BALANCER_H + +#include "bvar/passive_status.h" +#include "brpc/load_balancer.h" +#include "brpc/shared_object.h" // SharedObject + +namespace brpc { + +DECLARE_bool(show_lb_in_vars); + +// A intrusively shareable load balancer created from name. 
+class SharedLoadBalancer : public SharedObject, public NonConstDescribable { +public: + SharedLoadBalancer(); + ~SharedLoadBalancer(); + + int Init(const char* lb_name); + + int SelectServer(const LoadBalancer::SelectIn& in, + LoadBalancer::SelectOut* out) { + if (FLAGS_show_lb_in_vars && !_exposed) { + ExposeLB(); + } + return _lb->SelectServer(in, out); + } + + void Feedback(const LoadBalancer::CallInfo& info) { _lb->Feedback(info); } + + bool AddServer(const ServerId& server) { + if (_lb->AddServer(server)) { + _weight_sum.fetch_add(1, butil::memory_order_relaxed); + return true; + } + return false; + } + bool RemoveServer(const ServerId& server) { + if (_lb->RemoveServer(server)) { + _weight_sum.fetch_sub(1, butil::memory_order_relaxed); + return true; + } + return false; + } + + size_t AddServersInBatch(const std::vector& servers) { + size_t n = _lb->AddServersInBatch(servers); + if (n) { + _weight_sum.fetch_add(n, butil::memory_order_relaxed); + } + return n; + } + + size_t RemoveServersInBatch(const std::vector& servers) { + size_t n = _lb->RemoveServersInBatch(servers); + if (n) { + _weight_sum.fetch_sub(n, butil::memory_order_relaxed); + } + return n; + } + + virtual void Describe(std::ostream& os, const DescribeOptions&); + + virtual int Weight() { + return _weight_sum.load(butil::memory_order_relaxed); + } + +private: + static void DescribeLB(std::ostream& os, void* arg); + void ExposeLB(); + + LoadBalancer* _lb; + butil::atomic _weight_sum; + volatile bool _exposed; + butil::Mutex _st_mutex; + bvar::PassiveStatus _st; +}; + + +} // namespace brpc + +#endif // BRPC_SHARED_LOAD_BALANCER_H diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/shared_naming_service.cpp similarity index 65% rename from src/brpc/details/naming_service_thread.cpp rename to src/brpc/details/shared_naming_service.cpp index 6f9d27426f..57a76cb0e4 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/shared_naming_service.cpp @@ -1,4 
+1,4 @@ -// Copyright (c) 2014 Baidu, Inc. +// Copyright (c) 2014 brpc authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,68 +17,36 @@ #include #include #include -#include "bthread/butex.h" #include "butil/scoped_lock.h" #include "butil/logging.h" #include "brpc/log.h" #include "brpc/socket_map.h" -#include "brpc/details/naming_service_thread.h" - +#include "brpc/details/shared_naming_service.h" namespace brpc { -struct NSKey { - const NamingService* ns; - std::string service_name; -}; -struct NSKeyHasher { - size_t operator()(const NSKey& nskey) const { - return butil::DefaultHasher()(nskey.service_name) - * 101 + (uintptr_t)nskey.ns; - } -}; -inline bool operator==(const NSKey& k1, const NSKey& k2) { - return (k1.ns == k2.ns && k1.service_name == k2.service_name); -} -typedef butil::FlatMap NamingServiceMap; +typedef butil::FlatMap NamingServiceMap; // Construct on demand to make the code work before main() -static NamingServiceMap* g_nsthread_map = NULL; -static pthread_mutex_t g_nsthread_map_mutex = PTHREAD_MUTEX_INITIALIZER; +static NamingServiceMap* g_ns_map = NULL; +static pthread_mutex_t g_ns_map_mutex = PTHREAD_MUTEX_INITIALIZER; -NamingServiceThread::Actions::Actions(NamingServiceThread* owner) - : _owner(owner) - , _wait_id(INVALID_BTHREAD_ID) - , _has_wait_error(false) - , _wait_error(0) { - CHECK_EQ(0, bthread_id_create(&_wait_id, NULL, NULL)); +SharedNamingService::Actions::Actions(SharedNamingService* owner) + : _owner(owner) { } -NamingServiceThread::Actions::~Actions() { +SharedNamingService::Actions::~Actions() { // Remove all sockets from SocketMap for (std::vector::const_iterator it = _last_servers.begin(); it != _last_servers.end(); ++it) { SocketMapRemove(SocketMapKey(it->addr)); } - EndWait(0); + _owner->EndWait(ECANCELED); } -void NamingServiceThread::Actions::AddServers( - const std::vector&) { - // FIXME(gejun) - abort(); -} - -void 
NamingServiceThread::Actions::RemoveServers( - const std::vector&) { - // FIXME(gejun) - abort(); -} - -void NamingServiceThread::Actions::ResetServers( +void SharedNamingService::Actions::ResetServers( const std::vector& servers) { _servers.assign(servers.begin(), servers.end()); - // Diff servers with _last_servers by comparing sorted vectors. // Notice that _last_servers is always sorted. std::sort(_servers.begin(), _servers.end()); @@ -173,63 +141,50 @@ void NamingServiceThread::Actions::ResetServers( } if (!_removed.empty() || !_added.empty()) { - std::ostringstream info; - info << butil::class_name_str(*_owner->_ns) << "(\"" - << _owner->_service_name << "\"):"; + LOG(INFO) << _owner->_full_ns << ":" << noflush; if (!_added.empty()) { - info << " added "<< _added.size(); + LOG(INFO) << " added "<< _added.size() << noflush; } if (!_removed.empty()) { - info << " removed " << _removed.size(); + LOG(INFO) << " removed " << _removed.size() << noflush; } - LOG(INFO) << info.str(); + LOG(INFO); } - EndWait(servers.empty() ? ENODATA : 0); + _owner->EndWait(servers.empty() ? 
ENODATA : 0); } -void NamingServiceThread::Actions::EndWait(int error_code) { - if (bthread_id_trylock(_wait_id, NULL) == 0) { +void SharedNamingService::EndWait(int error_code) { + if (!_has_wait_error.load(butil::memory_order_relaxed) && + bthread_id_trylock(_wait_id, NULL) == 0) { _wait_error = error_code; _has_wait_error.store(true, butil::memory_order_release); bthread_id_unlock_and_destroy(_wait_id); } } -int NamingServiceThread::Actions::WaitForFirstBatchOfServers() { - // Wait can happen before signal in which case it returns non-zero, - // so we ignore return value here and use `_wait_error' instead - if (!_has_wait_error.load(butil::memory_order_acquire)) { - bthread_id_join(_wait_id); - } - return _wait_error; -} - -NamingServiceThread::NamingServiceThread() - : _tid(0) - , _source_ns(NULL) - , _ns(NULL) - , _actions(this) { +SharedNamingService::SharedNamingService() + : _ns(NULL) + , _actions(new Actions(this)) + , _wait_id(INVALID_BTHREAD_ID) + , _has_wait_error(false) + , _wait_error(0) { + CHECK_EQ(0, bthread_id_create(&_wait_id, NULL, NULL)); } -NamingServiceThread::~NamingServiceThread() { - RPC_VLOG << "~NamingServiceThread(" << *this << ')'; - // Remove from g_nsthread_map first - if (_source_ns != NULL) { - const NSKey key = { _source_ns, _service_name }; - std::unique_lock mu(g_nsthread_map_mutex); - if (g_nsthread_map != NULL) { - NamingServiceThread** ptr = g_nsthread_map->seek(key); +SharedNamingService::~SharedNamingService() { + RPC_VLOG << "~SharedNamingService(" << *this << ')'; + // Remove from g_ns_map first + if (!_full_ns.empty()) { + std::unique_lock mu(g_ns_map_mutex); + if (g_ns_map != NULL) { + SharedNamingService** ptr = g_ns_map->seek(_full_ns); if (ptr != NULL && *ptr == this) { - g_nsthread_map->erase(key); + g_ns_map->erase(_full_ns); + LOG(INFO) << "erase " << _full_ns; } } } - if (_tid) { - bthread_stop(_tid); - bthread_join(_tid, NULL); - _tid = 0; - } { BAIDU_SCOPED_LOCK(_mutex); std::vector to_be_removed; @@ -250,39 
+205,32 @@ NamingServiceThread::~NamingServiceThread() { } } -void* NamingServiceThread::RunThis(void* arg) { - static_cast(arg)->Run(); - return NULL; -} - -int NamingServiceThread::Start(const NamingService* naming_service, +int SharedNamingService::Start(const NamingService* naming_service, const std::string& service_name, - const GetNamingServiceThreadOptions* opt_in) { + const std::string& full_ns, + const GetSharedNamingServiceOptions* opt_in) { if (naming_service == NULL) { LOG(ERROR) << "Param[naming_service] is NULL"; return -1; } - _source_ns = naming_service; _ns = naming_service->New(); _service_name = service_name; + _full_ns = full_ns; if (opt_in) { _options = *opt_in; } _last_sockets.clear(); - if (_ns->RunNamingServiceReturnsQuickly()) { - RunThis(this); - } else { - int rc = bthread_start_urgent(&_tid, NULL, RunThis, this); - if (rc) { - LOG(ERROR) << "Fail to create bthread: " << berror(rc); - return -1; - } - } - return WaitForFirstBatchOfServers(); + _ns->RunNamingService(_service_name.c_str(), _actions); + return 0; } -int NamingServiceThread::WaitForFirstBatchOfServers() { - int rc = _actions.WaitForFirstBatchOfServers(); +int SharedNamingService::WaitForFirstBatchOfServers() { + // Wait can happen before signal in which case it returns non-zero, + // so we ignore return value here and use `_wait_error' instead + if (!_has_wait_error.load(butil::memory_order_acquire)) { + bthread_id_join(_wait_id); + } + int rc = _wait_error; if (rc == ENODATA && _options.succeed_without_server) { if (_options.log_succeed_without_server) { LOG(WARNING) << '`' << *this << "' is empty! 
RPC over the channel" @@ -290,14 +238,10 @@ int NamingServiceThread::WaitForFirstBatchOfServers() { } rc = 0; } - if (rc) { - LOG(ERROR) << "Fail to WaitForFirstBatchOfServers: " << berror(rc); - return -1; - } - return 0; + return rc; } -void NamingServiceThread::ServerNodeWithId2ServerId( +void SharedNamingService::ServerNodeWithId2ServerId( const std::vector& src, std::vector* dst, const NamingServiceFilter* filter) { dst->reserve(src.size()); @@ -313,7 +257,7 @@ void NamingServiceThread::ServerNodeWithId2ServerId( } } -int NamingServiceThread::AddWatcher(NamingServiceWatcher* watcher, +int SharedNamingService::AddWatcher(NamingServiceWatcher* watcher, const NamingServiceFilter* filter) { if (watcher == NULL) { LOG(ERROR) << "Param[watcher] is NULL"; @@ -331,7 +275,7 @@ int NamingServiceThread::AddWatcher(NamingServiceWatcher* watcher, return -1; } -int NamingServiceThread::RemoveWatcher(NamingServiceWatcher* watcher) { +int SharedNamingService::RemoveWatcher(NamingServiceWatcher* watcher) { if (watcher == NULL) { LOG(ERROR) << "Param[watcher] is NULL"; return -1; @@ -346,23 +290,6 @@ int NamingServiceThread::RemoveWatcher(NamingServiceWatcher* watcher) { return -1; } -void NamingServiceThread::Run() { - int rc = _ns->RunNamingService(_service_name.c_str(), &_actions); - if (rc != 0) { - LOG(WARNING) << "Fail to run naming service: " << berror(rc); - if (rc == ENODATA) { - LOG(ERROR) << "RunNamingService should not return ENODATA, " - "change it to ESTOP"; - rc = ESTOP; - } - _actions.EndWait(rc); - } - - // Don't remove servers here which may still be used by watchers: - // A stop-updating naming service does not mean that it's not needed - // anymore. Remove servers inside dtor. 
-} - static const size_t MAX_PROTOCOL_LEN = 31; static const char* ParseNamingServiceUrl(const char* url, char* protocol) { @@ -390,84 +317,90 @@ static const char* ParseNamingServiceUrl(const char* url, char* protocol) { return NULL; } -int GetNamingServiceThread( - butil::intrusive_ptr* nsthread_out, +int GetSharedNamingService( + butil::intrusive_ptr* nsp_out, const char* url, - const GetNamingServiceThreadOptions* options) { + const GetSharedNamingServiceOptions* options) { char protocol[MAX_PROTOCOL_LEN + 1]; const char* const service_name = ParseNamingServiceUrl(url, protocol); if (service_name == NULL) { - LOG(ERROR) << "Invalid naming service url=" << url; + LOG(ERROR) << "Invalid naming service=" << url; return -1; } const NamingService* ns = NamingServiceExtension()->Find(protocol); if (ns == NULL) { - LOG(ERROR) << "Unknown protocol=" << protocol; + LOG(ERROR) << "Unknown naming service=" << protocol; return -1; } - NSKey key; - key.ns = ns; - key.service_name = service_name; - bool new_thread = false; - butil::intrusive_ptr nsthread; + + // full_ns is normalized comparing to `url'. 
+ std::string full_ns; + const size_t prot_len = strlen(protocol); + full_ns.reserve(prot_len + 3 + strlen(service_name)); + for (size_t i = 0; i < prot_len; ++i) { + full_ns.push_back(::tolower(protocol[i])); + } + full_ns.append("://"); + full_ns.append(service_name); + + bool new_ns = false; + butil::intrusive_ptr nsp; { - std::unique_lock mu(g_nsthread_map_mutex); - if (g_nsthread_map == NULL) { - g_nsthread_map = new (std::nothrow) NamingServiceMap; - if (NULL == g_nsthread_map) { + std::unique_lock mu(g_ns_map_mutex); + if (g_ns_map == NULL) { + g_ns_map = new (std::nothrow) NamingServiceMap; + if (NULL == g_ns_map) { mu.unlock(); - LOG(ERROR) << "Fail to new g_nsthread_map"; + LOG(ERROR) << "Fail to new g_ns_map"; return -1; } - if (g_nsthread_map->init(64) != 0) { + if (g_ns_map->init(64) != 0) { mu.unlock(); - LOG(ERROR) << "Fail to init g_nsthread_map"; + LOG(ERROR) << "Fail to init g_ns_map"; return -1; } } - NamingServiceThread*& ptr = (*g_nsthread_map)[key]; + SharedNamingService*& ptr = (*g_ns_map)[full_ns]; if (ptr != NULL) { if (ptr->AddRefManually() == 0) { - // The ns thread's last intrusive_ptr was just destructed and - // the removal-from-global-map-code in ptr->~NamingServideThread() - // is about to run or already running, need to create another ns - // thread. + // The NS's last intrusive_ptr was just destructed and the + // removal-from-global-map-code in ptr->~SharedNamingService() + // is about to run or already running, need to create another NS // Notice that we don't need to remove the reference because // the object is already destructing. 
ptr = NULL; } else { - nsthread.reset(ptr, false); + nsp.reset(ptr, false); } } if (ptr == NULL) { - NamingServiceThread* thr = new (std::nothrow) NamingServiceThread; + SharedNamingService* thr = new (std::nothrow) SharedNamingService; if (thr == NULL) { mu.unlock(); - LOG(ERROR) << "Fail to new NamingServiceThread"; + LOG(ERROR) << "Fail to new SharedNamingService"; return -1; } ptr = thr; - nsthread.reset(ptr); - new_thread = true; + nsp.reset(ptr); + new_ns = true; } } - if (new_thread) { - if (nsthread->Start(ns, key.service_name, options) != 0) { - LOG(ERROR) << "Fail to start NamingServiceThread"; - std::unique_lock mu(g_nsthread_map_mutex); - g_nsthread_map->erase(key); - return -1; - } - } else { - if (nsthread->WaitForFirstBatchOfServers() != 0) { + if (new_ns) { + if (nsp->Start(ns, service_name, full_ns, options) != 0) { + LOG(ERROR) << "Fail to start SharedNamingService"; return -1; } } - nsthread_out->swap(nsthread); + const int rc = nsp->WaitForFirstBatchOfServers(); + if (rc) { + LOG(ERROR) << "Fail to WaitForFirstBatchOfServers: " << berror(rc); + return -1; + } + nsp_out->swap(nsp); return 0; } -void NamingServiceThread::Describe(std::ostream& os, +void SharedNamingService::Describe(std::ostream& os, const DescribeOptions& options) const { if (_ns == NULL) { os << "null"; @@ -477,9 +410,10 @@ void NamingServiceThread::Describe(std::ostream& os, os << "://" << _service_name; } -std::ostream& operator<<(std::ostream& os, const NamingServiceThread& nsthr) { +std::ostream& operator<<(std::ostream& os, const SharedNamingService& nsthr) { nsthr.Describe(os, DescribeOptions()); return os; } + } // namespace brpc diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/shared_naming_service.h similarity index 68% rename from src/brpc/details/naming_service_thread.h rename to src/brpc/details/shared_naming_service.h index 97b835c6a2..c57502fa98 100644 --- a/src/brpc/details/naming_service_thread.h +++ 
b/src/brpc/details/shared_naming_service.h @@ -1,4 +1,4 @@ -// Copyright (c) 2014 Baidu, Inc. +// Copyright (c) 2014 brpc authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,18 +14,17 @@ // Authors: Ge,Jun (gejun@baidu.com) -#ifndef BRPC_NAMING_SERVICE_THREAD_H -#define BRPC_NAMING_SERVICE_THREAD_H +#ifndef BRPC_SHARED_NAMING_SERVICE_H +#define BRPC_SHARED_NAMING_SERVICE_H #include -#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr -#include "bthread/bthread.h" // bthread_t +#include // bthread_t +#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr #include "brpc/server_id.h" // ServerId #include "brpc/shared_object.h" // SharedObject #include "brpc/naming_service.h" // NamingService #include "brpc/naming_service_filter.h" // NamingServiceFilter - namespace brpc { // Inherit this class to observer NamingService changes. @@ -39,8 +38,8 @@ class NamingServiceWatcher { virtual void OnRemovedServers(const std::vector& servers) = 0; }; -struct GetNamingServiceThreadOptions { - GetNamingServiceThreadOptions() +struct GetSharedNamingServiceOptions { + GetSharedNamingServiceOptions() : succeed_without_server(false) , log_succeed_without_server(true) {} @@ -49,7 +48,7 @@ struct GetNamingServiceThreadOptions { }; // A dedicated thread to map a name to ServerIds -class NamingServiceThread : public SharedObject, public Describable { +class SharedNamingService : public SharedObject, public Describable { struct ServerNodeWithId { ServerNode node; SocketId id; @@ -60,19 +59,14 @@ class NamingServiceThread : public SharedObject, public Describable { }; class Actions : public NamingServiceActions { public: - Actions(NamingServiceThread* owner); + Actions(SharedNamingService* owner); ~Actions(); - void AddServers(const std::vector& servers); - void RemoveServers(const std::vector& servers); + + // @NamingServiceActions void ResetServers(const std::vector& servers); - 
int WaitForFirstBatchOfServers(); - void EndWait(int error_code); private: - NamingServiceThread* _owner; - bthread_id_t _wait_id; - butil::atomic _has_wait_error; - int _wait_error; + SharedNamingService* _owner; std::vector _last_servers; std::vector _servers; std::vector _added; @@ -83,11 +77,14 @@ class NamingServiceThread : public SharedObject, public Describable { }; public: - NamingServiceThread(); - ~NamingServiceThread(); + SharedNamingService(); + ~SharedNamingService(); - int Start(const NamingService* ns, const std::string& service_name, - const GetNamingServiceThreadOptions* options); + int Start(const NamingService* ns, + const std::string& service_name, + const std::string& full_ns, + const GetSharedNamingServiceOptions* options); + int WaitForFirstBatchOfServers(); int AddWatcher(NamingServiceWatcher* w, const NamingServiceFilter* f); @@ -97,39 +94,39 @@ class NamingServiceThread : public SharedObject, public Describable { void Describe(std::ostream& os, const DescribeOptions&) const; private: - void Run(); - static void* RunThis(void*); - static void ServerNodeWithId2ServerId( const std::vector& src, std::vector* dst, const NamingServiceFilter* filter); + void EndWait(int error_code); + butil::Mutex _mutex; - bthread_t _tid; - // TODO: better use a name. - const NamingService* _source_ns; NamingService* _ns; + Actions* _actions; + bthread_id_t _wait_id; + butil::atomic _has_wait_error; + int _wait_error; std::string _service_name; - GetNamingServiceThreadOptions _options; + std::string _full_ns; // normalized protocol://_service_name + GetSharedNamingServiceOptions _options; std::vector _last_sockets; - Actions _actions; std::map _watchers; }; -std::ostream& operator<<(std::ostream& os, const NamingServiceThread&); +std::ostream& operator<<(std::ostream& os, const SharedNamingService&); -// Get the decicated thread associated with `url' and put the thread into -// `ns_thread'. Calling with same `url' shares and returns the same thread. 
+// Get the NS associated with `url'. +// Calling with same `url' shares and returns the same instance. // If the url is not accessed before, this function blocks until the // NamingService returns the first batch of servers. If no servers are // available, unless `options->succeed_without_server' is on, this function // returns -1. // Returns 0 on success, -1 otherwise. -int GetNamingServiceThread(butil::intrusive_ptr* ns_thread, +int GetSharedNamingService(butil::intrusive_ptr* ns, const char* url, - const GetNamingServiceThreadOptions* options); + const GetSharedNamingServiceOptions* options); -} // namespace brpc +} // namespace brpc -#endif // BRPC_NAMING_SERVICE_THREAD_H +#endif // BRPC_SHARED_NAMING_SERVICE_H diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h index 538c2d3848..f3d90e9de6 100644 --- a/src/brpc/load_balancer.h +++ b/src/brpc/load_balancer.h @@ -17,15 +17,12 @@ #ifndef BRPC_LOAD_BALANCER_H #define BRPC_LOAD_BALANCER_H -#include "bvar/passive_status.h" #include "brpc/describable.h" #include "brpc/destroyable.h" #include "brpc/excluded_servers.h" // ExcludedServers -#include "brpc/shared_object.h" // SharedObject #include "brpc/server_id.h" // ServerId #include "brpc/extension.h" // Extension - namespace brpc { class Controller; @@ -98,82 +95,13 @@ class LoadBalancer : public NonConstDescribable, public Destroyable { // successful and out->need_feedback was set to true. virtual void Feedback(const CallInfo& /*info*/) { } - // Create/destroy an instance. - // Caller is responsible for Destroy() the instance after usage. + // Create an instance which will be destroyed by Destroy() (inherited from Destroyable) virtual LoadBalancer* New() const = 0; protected: virtual ~LoadBalancer() { } }; -DECLARE_bool(show_lb_in_vars); - -// A intrusively shareable load balancer created from name. 
-class SharedLoadBalancer : public SharedObject, public NonConstDescribable { -public: - SharedLoadBalancer(); - ~SharedLoadBalancer(); - - int Init(const char* lb_name); - - int SelectServer(const LoadBalancer::SelectIn& in, - LoadBalancer::SelectOut* out) { - if (FLAGS_show_lb_in_vars && !_exposed) { - ExposeLB(); - } - return _lb->SelectServer(in, out); - } - - void Feedback(const LoadBalancer::CallInfo& info) { _lb->Feedback(info); } - - bool AddServer(const ServerId& server) { - if (_lb->AddServer(server)) { - _weight_sum.fetch_add(1, butil::memory_order_relaxed); - return true; - } - return false; - } - bool RemoveServer(const ServerId& server) { - if (_lb->RemoveServer(server)) { - _weight_sum.fetch_sub(1, butil::memory_order_relaxed); - return true; - } - return false; - } - - size_t AddServersInBatch(const std::vector& servers) { - size_t n = _lb->AddServersInBatch(servers); - if (n) { - _weight_sum.fetch_add(n, butil::memory_order_relaxed); - } - return n; - } - - size_t RemoveServersInBatch(const std::vector& servers) { - size_t n = _lb->RemoveServersInBatch(servers); - if (n) { - _weight_sum.fetch_sub(n, butil::memory_order_relaxed); - } - return n; - } - - virtual void Describe(std::ostream& os, const DescribeOptions&); - - virtual int Weight() { - return _weight_sum.load(butil::memory_order_relaxed); - } - -private: - static void DescribeLB(std::ostream& os, void* arg); - void ExposeLB(); - - LoadBalancer* _lb; - butil::atomic _weight_sum; - volatile bool _exposed; - butil::Mutex _st_mutex; - bvar::PassiveStatus _st; -}; - // For registering global instances. 
inline Extension* LoadBalancerExtension() { return Extension::instance(); diff --git a/src/brpc/naming_service.h b/src/brpc/naming_service.h index a9f5704ac6..fe32f7f664 100644 --- a/src/brpc/naming_service.h +++ b/src/brpc/naming_service.h @@ -25,31 +25,18 @@ #include "brpc/describable.h" #include "brpc/destroyable.h" #include "brpc/extension.h" // Extension +#include "brpc/server_node.h" namespace brpc { -// Representing a server inside a NamingService. -struct ServerNode { - ServerNode() {} - ServerNode(butil::ip_t ip, int port, const std::string& tag2) - : addr(ip, port), tag(tag2) {} - ServerNode(const butil::EndPoint& pt, const std::string& tag2) - : addr(pt), tag(tag2) {} - ServerNode(butil::ip_t ip, int port) : addr(ip, port) {} - explicit ServerNode(const butil::EndPoint& pt) : addr(pt) {} - - butil::EndPoint addr; - std::string tag; -}; - // Continuing actions to added/removed servers. // NOTE: You don't have to implement this class. class NamingServiceActions { public: virtual ~NamingServiceActions() {} - virtual void AddServers(const std::vector& servers) = 0; - virtual void RemoveServers(const std::vector& servers) = 0; + + // Call this method when servers are successfully refreshed. virtual void ResetServers(const std::vector& servers) = 0; }; @@ -57,23 +44,13 @@ class NamingServiceActions { class NamingService : public Describable, public Destroyable { public: // Implement this method to get servers associated with `service_name' - // in periodic or event-driven manner, call methods of `actions' to - // tell RPC system about server changes. This method will be run in - // a dedicated bthread without access from other threads, thus the - // implementation does NOT need to be thread-safe. - // Return 0 on success, error code otherwise. - virtual int RunNamingService(const char* service_name, - NamingServiceActions* actions) = 0; + // periodically or by event-driven, call methods of `actions' to tell + // RPC system about server changes. 
+ // `actions' is owned and deleted by this naming service. + virtual void RunNamingService(const char* service_name, + NamingServiceActions* actions) = 0; - // If this method returns true, RunNamingService will be called without - // a dedicated bthread. As the name implies, this is suitable for static - // and simple impl, saving the cost of creating a bthread. However most - // impl of RunNamingService never quit, thread is a must to prevent the - // method from blocking the caller. - virtual bool RunNamingServiceReturnsQuickly() { return false; } - - // Create/destroy an instance. - // Caller is responsible for Destroy() the instance after usage. + // Create an instance which will be destroyed by Destroy() (inherited from Destroyable) virtual NamingService* New() const = 0; protected: @@ -84,20 +61,6 @@ inline Extension* NamingServiceExtension() { return Extension::instance(); } -inline bool operator<(const ServerNode& n1, const ServerNode& n2) -{ return n1.addr != n2.addr ? (n1.addr < n2.addr) : (n1.tag < n2.tag); } -inline bool operator==(const ServerNode& n1, const ServerNode& n2) -{ return n1.addr == n2.addr && n1.tag == n2.tag; } -inline bool operator!=(const ServerNode& n1, const ServerNode& n2) -{ return !(n1 == n2); } -inline std::ostream& operator<<(std::ostream& os, const ServerNode& n) { - os << n.addr; - if (!n.tag.empty()) { - os << "(tag=" << n.tag << ')'; - } - return os; -} - } // namespace brpc diff --git a/src/brpc/partition_channel.cpp b/src/brpc/partition_channel.cpp index 9dc7daca71..1dc208ac67 100644 --- a/src/brpc/partition_channel.cpp +++ b/src/brpc/partition_channel.cpp @@ -16,8 +16,8 @@ #include "butil/containers/flat_map.h" #include "brpc/log.h" -#include "brpc/load_balancer.h" -#include "brpc/details/naming_service_thread.h" +#include "brpc/details/shared_load_balancer.h" +#include "brpc/details/shared_naming_service.h" #include "brpc/partition_channel.h" #include "brpc/global.h" @@ -221,12 +221,12 @@ int PartitionChannel::Init(int 
num_partition_kinds, LOG(ERROR) << "Parameter[partition_parser] must be non-NULL"; return -1; } - GetNamingServiceThreadOptions ns_opt; + GetSharedNamingServiceOptions ns_opt; if (options_in) { ns_opt.succeed_without_server = options_in->succeed_without_server; } - if (GetNamingServiceThread(&_nsthread_ptr, ns_url, &ns_opt) != 0) { - LOG(ERROR) << "Fail to get NamingServiceThread"; + if (GetSharedNamingService(&_nsthread_ptr, ns_url, &ns_opt) != 0) { + LOG(ERROR) << "Fail to get SharedNamingService"; return -1; } _pchan = new (std::nothrow) PartitionChannelBase; @@ -449,12 +449,12 @@ int DynamicPartitionChannel::Init( LOG(ERROR) << "Parameter[partition_parser] must be non-NULL"; return -1; } - GetNamingServiceThreadOptions ns_opt; + GetSharedNamingServiceOptions ns_opt; if (options_in) { ns_opt.succeed_without_server = options_in->succeed_without_server; } - if (GetNamingServiceThread(&_nsthread_ptr, ns_url, &ns_opt) != 0) { - LOG(ERROR) << "Fail to get NamingServiceThread"; + if (GetSharedNamingService(&_nsthread_ptr, ns_url, &ns_opt) != 0) { + LOG(ERROR) << "Fail to get SharedNamingService"; return -1; } if (_schan.Init("_dynpart", options_in) != 0) { diff --git a/src/brpc/partition_channel.h b/src/brpc/partition_channel.h index 2b959c102c..4259967e61 100644 --- a/src/brpc/partition_channel.h +++ b/src/brpc/partition_channel.h @@ -26,7 +26,7 @@ namespace brpc { -class NamingServiceThread; +class SharedNamingService; class PartitionChannelBase; // Representing a partition kind. 
@@ -119,7 +119,7 @@ class PartitionChannel : public ChannelBase { int CheckHealth(); PartitionChannelBase* _pchan; - butil::intrusive_ptr _nsthread_ptr; + butil::intrusive_ptr _nsthread_ptr; PartitionParser* _parser; }; @@ -162,7 +162,7 @@ class DynamicPartitionChannel : public ChannelBase { SelectiveChannel _schan; Partitioner* _partitioner; - butil::intrusive_ptr _nsthread_ptr; + butil::intrusive_ptr _nsthread_ptr; PartitionParser* _parser; }; diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp index a3dec5c53a..ff198d6807 100644 --- a/src/brpc/periodic_naming_service.cpp +++ b/src/brpc/periodic_naming_service.cpp @@ -20,6 +20,8 @@ #include "brpc/log.h" #include "brpc/reloadable_flags.h" #include "brpc/periodic_naming_service.h" +#include "brpc/periodic_task.h" +#include "brpc/shared_object.h" namespace brpc { @@ -27,35 +29,73 @@ DEFINE_int32(ns_access_interval, 5, "Wait so many seconds before next access to naming service"); BRPC_VALIDATE_GFLAG(ns_access_interval, PositiveInteger); -int PeriodicNamingService::RunNamingService( +class AccessNamingServiceTask : public PeriodicTask + , public SharedObject { +friend class PeriodicNamingService; +public: + AccessNamingServiceTask(PeriodicNamingService* owner, + const char* service_name, + NamingServiceActions* actions) + : _owner(owner) + , _service_name(service_name) + , _actions(actions) + , _scheduled_destroy(false) {} + bool DoPeriodicTask(timespec* next_abstime); + + void CleanUp(); +private: + PeriodicNamingService* _owner; + std::string _service_name; + std::unique_ptr _actions; + bool _scheduled_destroy; +}; + +void AccessNamingServiceTask::CleanUp() { + _actions.reset(NULL); +} + +bool AccessNamingServiceTask::DoPeriodicTask(timespec* next_abstime) { + if (next_abstime == NULL) { + // Remove the ref added for this task. 
+ this->RemoveRefManually(); + return true; + } + if (_scheduled_destroy) { + return false; + } + std::vector servers; + const int rc = _owner->GetServers(_service_name.c_str(), &servers); + if (rc == 0) { + _actions->ResetServers(servers); + } + *next_abstime = butil::seconds_from_now(FLAGS_ns_access_interval); + return true; +} + +void PeriodicNamingService::RunNamingService( const char* service_name, NamingServiceActions* actions) { std::vector servers; - bool ever_reset = false; - for (;;) { - servers.clear(); - const int rc = GetServers(service_name, &servers); - if (rc == 0) { - ever_reset = true; - actions->ResetServers(servers); - } else if (!ever_reset) { - // ResetServers must be called at first time even if GetServers - // failed, to wake up callers to `WaitForFirstBatchOfServers' - ever_reset = true; - servers.clear(); - actions->ResetServers(servers); - } - - if (bthread_usleep(std::max(FLAGS_ns_access_interval, 1) * 1000000L) < 0) { - if (errno == ESTOP) { - RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); - return 0; - } - PLOG(FATAL) << "Fail to sleep"; - return -1; - } + const int rc = GetServers(service_name, &servers); + if (rc != 0) { + delete actions; + return; + } + actions->ResetServers(servers); + _task = new AccessNamingServiceTask(this, service_name, actions); + _task->AddRefManually(); // Add ref for this NS. + _task->AddRefManually(); // Add ref for the task. 
+ PeriodicTaskManager::StartTaskAt( + _task, butil::seconds_from_now(FLAGS_ns_access_interval)); +} + +void PeriodicNamingService::Destroy() { + if (_task) { + _task->CleanUp(); + _task->_scheduled_destroy = true; + _task->RemoveRefManually(); + _task = NULL; } - CHECK(false); - return -1; + delete this; } } // namespace brpc diff --git a/src/brpc/periodic_naming_service.h b/src/brpc/periodic_naming_service.h index 0f99ee6d1e..32ef9c9102 100644 --- a/src/brpc/periodic_naming_service.h +++ b/src/brpc/periodic_naming_service.h @@ -18,17 +18,40 @@ #define BRPC_PERIODIC_NAMING_SERVICE_H #include "brpc/naming_service.h" +#include "butil/build_config.h" +// TODO: Move into build_config.h +#ifndef BASE_FINAL +# if defined(BASE_CXX11_ENABLED) +# define BASE_FINAL final +# else +# define BASE_FINAL +# endif +#endif namespace brpc { +class AccessNamingServiceTask; + +// Overwrite GetServers which will be called every -ns_access_interval seconds. class PeriodicNamingService : public NamingService { +friend class AccessNamingServiceTask; protected: + PeriodicNamingService() : _task(NULL) {} + virtual int GetServers(const char *service_name, std::vector* servers) = 0; - - int RunNamingService(const char* service_name, - NamingServiceActions* actions); +protected: + // @NamingService + void RunNamingService(const char* service_name, + NamingServiceActions* actions); + + // @Destroyable + void Destroy() BASE_FINAL; + +private: + // Not using intrusive_ptr to hide the inclusion. + AccessNamingServiceTask* _task; }; } // namespace brpc diff --git a/src/brpc/periodic_task.cpp b/src/brpc/periodic_task.cpp new file mode 100644 index 0000000000..dc5c56b0d2 --- /dev/null +++ b/src/brpc/periodic_task.cpp @@ -0,0 +1,65 @@ +// Copyright (c) 2018 brpc authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Ge,Jun (gejun@baidu.com) + +#include <bthread/bthread.h> +#include <bthread/unstable.h> +#include "brpc/periodic_task.h" + +namespace brpc { + + +PeriodicTask::~PeriodicTask() { +} + +static void* PeriodicTaskThread(void* arg) { + PeriodicTask* task = static_cast<PeriodicTask*>(arg); + timespec abstime; // Must be filled by DoPeriodicTask() whenever it returns true. + if (!task->DoPeriodicTask(&abstime)) { // end + task->DoPeriodicTask(NULL); + return NULL; + } + PeriodicTaskManager::StartTaskAt(task, abstime); + return NULL; +} + +static void RunPeriodicTaskThread(void* arg) { + bthread_t th = 0; + int rc = bthread_start_background( + &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg); + if (rc != 0) { + LOG(ERROR) << "Fail to start PeriodicTaskThread"; + static_cast<PeriodicTask*>(arg)->DoPeriodicTask(NULL); + return; + } +} + +void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstime) { + if (task == NULL) { + LOG(ERROR) << "Param[task] is NULL"; + return; + } + bthread_timer_t timer_id; + const int rc = bthread_timer_add( + &timer_id, abstime, RunPeriodicTaskThread, task); + if (rc != 0) { + LOG(ERROR) << "Fail to add timer for RunPeriodicTaskThread"; + task->DoPeriodicTask(NULL); + return; + } +} + + +} // namespace brpc diff --git a/src/brpc/periodic_task.h b/src/brpc/periodic_task.h new file mode 100644 index 0000000000..bb4abb454c --- /dev/null +++ b/src/brpc/periodic_task.h @@ -0,0 +1,46 @@ +// Copyright (c) 2018 brpc authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Ge,Jun (gejun@baidu.com) + +#ifndef BRPC_HEALTH_CHECK_MANAGER_H +#define BRPC_HEALTH_CHECK_MANAGER_H + +namespace brpc { + + +// Override DoPeriodicTask() with code that needs to be periodically run. If +// the task is completed, the method should return false; Otherwise the method +// should return true and set `next_abstime' to the time that the task should +// be run next time. +// Each call to DoPeriodicTask() is run in a separated bthread which can be +// suspended. To preserve states between different calls, put the states as +// fields of (subclass of) PeriodicTask. +// If any error occurs or DoPeriodicTask() returns false, the task is called +// with DoPeriodicTask(NULL) and will not be scheduled anymore. 
+class PeriodicTask { +public: + virtual ~PeriodicTask(); + virtual bool DoPeriodicTask(timespec* next_abstime) = 0; +}; + +class PeriodicTaskManager { +public: + static void StartTaskAt(PeriodicTask* task, const timespec& abstime); +}; + + +} // namespace brpc + +#endif // BRPC_HEALTH_CHECK_MANAGER_H diff --git a/src/brpc/policy/baidu_naming_service.cpp b/src/brpc/policy/baidu_naming_service.cpp index 831f6fdcb0..35744b62eb 100644 --- a/src/brpc/policy/baidu_naming_service.cpp +++ b/src/brpc/policy/baidu_naming_service.cpp @@ -68,10 +68,6 @@ NamingService* BaiduNamingService::New() const { return new BaiduNamingService; } -void BaiduNamingService::Destroy() { - delete this; -} - } // namespace policy } // namespace brpc #endif // BAIDU_INTERNAL diff --git a/src/brpc/policy/baidu_naming_service.h b/src/brpc/policy/baidu_naming_service.h index cc8f068790..74badd1270 100644 --- a/src/brpc/policy/baidu_naming_service.h +++ b/src/brpc/policy/baidu_naming_service.h @@ -34,8 +34,6 @@ class BaiduNamingService : public PeriodicNamingService { void Describe(std::ostream& os, const DescribeOptions&) const; NamingService* New() const; - - void Destroy(); }; } // namespace policy diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp index 942f8a492a..d476d8998b 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.cpp +++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp @@ -15,6 +15,7 @@ // Authors: Zhangyi Chen (chenzhangyi01@baidu.com) #include // std::set_union +#include // sqrt #include #include "butil/containers/flat_map.h" #include "butil/errno.h" diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp index 9af01b74b5..ee41fad898 100644 --- a/src/brpc/policy/consul_naming_service.cpp +++ b/src/brpc/policy/consul_naming_service.cpp @@ -202,39 +202,6 @@ int ConsulNamingService::GetServers(const char* service_name, return 0; } -int 
ConsulNamingService::RunNamingService(const char* service_name, - NamingServiceActions* actions) { - std::vector servers; - bool ever_reset = false; - for (;;) { - servers.clear(); - const int rc = GetServers(service_name, &servers); - if (rc == 0) { - ever_reset = true; - actions->ResetServers(servers); - } else { - if (!ever_reset) { - // ResetServers must be called at first time even if GetServers - // failed, to wake up callers to `WaitForFirstBatchOfServers' - ever_reset = true; - servers.clear(); - actions->ResetServers(servers); - } - if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMillisecondsPerSecond) < 0) { - if (errno == ESTOP) { - RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); - return 0; - } - PLOG(FATAL) << "Fail to sleep"; - return -1; - } - } - } - CHECK(false); - return -1; -} - - void ConsulNamingService::Describe(std::ostream& os, const DescribeOptions&) const { os << "consul"; @@ -245,9 +212,5 @@ NamingService* ConsulNamingService::New() const { return new ConsulNamingService; } -void ConsulNamingService::Destroy() { - delete this; -} - } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h index 798ac5bc4b..73dfd0da3b 100644 --- a/src/brpc/policy/consul_naming_service.h +++ b/src/brpc/policy/consul_naming_service.h @@ -17,7 +17,7 @@ #ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE #define BRPC_POLICY_CONSUL_NAMING_SERVICE -#include "brpc/naming_service.h" +#include "brpc/periodic_naming_service.h" #include "brpc/channel.h" @@ -25,23 +25,18 @@ namespace brpc { class Channel; namespace policy { -class ConsulNamingService : public NamingService { +class ConsulNamingService : public PeriodicNamingService { private: - int RunNamingService(const char* service_name, - NamingServiceActions* actions); - - int GetServers(const char* service_name, - std::vector* servers); - void Describe(std::ostream& os, const DescribeOptions&) const; 
NamingService* New() const; + int GetServers(const char* service_name, + std::vector* servers); + int DegradeToOtherServiceIfNeeded(const char* service_name, std::vector* servers); - void Destroy(); - private: Channel _channel; std::string _consul_index; diff --git a/src/brpc/policy/domain_naming_service.cpp b/src/brpc/policy/domain_naming_service.cpp index d6f8c54325..288103de6b 100644 --- a/src/brpc/policy/domain_naming_service.cpp +++ b/src/brpc/policy/domain_naming_service.cpp @@ -143,9 +143,5 @@ NamingService* DomainNamingService::New() const { return new DomainNamingService; } -void DomainNamingService::Destroy() { - delete this; -} - } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/domain_naming_service.h b/src/brpc/policy/domain_naming_service.h index 45b3e91941..0ad965ca1e 100644 --- a/src/brpc/policy/domain_naming_service.h +++ b/src/brpc/policy/domain_naming_service.h @@ -36,8 +36,6 @@ class DomainNamingService : public PeriodicNamingService { NamingService* New() const; - void Destroy(); - private: std::unique_ptr _aux_buf; size_t _aux_buf_len; diff --git a/src/brpc/policy/file_naming_service.cpp b/src/brpc/policy/file_naming_service.cpp index df69927732..f15d083fa7 100644 --- a/src/brpc/policy/file_naming_service.cpp +++ b/src/brpc/policy/file_naming_service.cpp @@ -22,6 +22,8 @@ #include "bthread/bthread.h" // bthread_usleep #include "brpc/log.h" #include "brpc/policy/file_naming_service.h" +#include "brpc/periodic_task.h" +#include "brpc/shared_object.h" namespace brpc { @@ -112,40 +114,75 @@ int FileNamingService::GetServers(const char *service_name, return 0; } -int FileNamingService::RunNamingService(const char* service_name, - NamingServiceActions* actions) { +class ReloadFileTask : public PeriodicTask, public SharedObject { +friend class FileNamingService; +public: + // FileWatcher is copyable + ReloadFileTask(FileNamingService* owner, + const butil::FileWatcher& fw, + NamingServiceActions* actions) + : _owner(owner) + , 
_fw(fw) + , _actions(actions) + , _scheduled_destroy(false) {} + bool DoPeriodicTask(timespec* next_abstime); + + void CleanUp(); +private: + FileNamingService* _owner; + butil::FileWatcher _fw; + std::unique_ptr _actions; + bool _scheduled_destroy; +}; + +const int64_t RELOAD_FILE_MS = 100; + +void ReloadFileTask::CleanUp() { + _actions.reset(NULL); +} + +bool ReloadFileTask::DoPeriodicTask(timespec* next_abstime) { + if (next_abstime == NULL) { + // Remove the ref added for this task. + this->RemoveRefManually(); + return true; + } + if (_scheduled_destroy) { + return false; + } + *next_abstime = butil::milliseconds_from_now(RELOAD_FILE_MS); // Set before every `return true' below; RELOAD_FILE_MS is in milliseconds. + butil::FileWatcher::Change change = _fw.check_and_consume(); + if (change <= 0) { + LOG_IF(ERROR, change < 0) << "`" << _fw.filepath() << "' was deleted"; + return true; + } + std::vector<ServerNode> servers; + const int rc = _owner->GetServers(_fw.filepath(), &servers); + if (rc == 0) { + _actions->ResetServers(servers); + } + return true; +} + +void FileNamingService::RunNamingService(const char* service_name, + NamingServiceActions* actions) { std::vector servers; butil::FileWatcher fw; if (fw.init(service_name) < 0) { - LOG(ERROR) << "Fail to init FileWatcher on `" << service_name << "'"; - return -1; + delete actions; + return; } - for (;;) { - const int rc = GetServers(service_name, &servers); - if (rc != 0) { - return rc; - } - actions->ResetServers(servers); - - for (;;) { - butil::FileWatcher::Change change = fw.check_and_consume(); - if (change > 0) { - break; - } - if (change < 0) { - LOG(ERROR) << "`" << service_name << "' was deleted"; - } - if (bthread_usleep(100000L/*100ms*/) < 0) { - if (errno == ESTOP) { - return 0; - } - PLOG(ERROR) << "Fail to sleep"; - return -1; - } - } + const int rc = GetServers(service_name, &servers); + if (rc != 0) { + delete actions; + return; } - CHECK(false); - return -1; + actions->ResetServers(servers); + _task = new ReloadFileTask(this, fw, actions); + _task->AddRefManually(); // Add ref for
this NS + _task->AddRefManually(); // Add ref for the task + PeriodicTaskManager::StartTaskAt( + _task, butil::milliseconds_from_now(RELOAD_FILE_MS)); } void FileNamingService::Describe(std::ostream& os, @@ -159,6 +196,12 @@ NamingService* FileNamingService::New() const { } void FileNamingService::Destroy() { + if (_task) { + _task->CleanUp(); + _task->_scheduled_destroy = true; + _task->RemoveRefManually(); + _task = NULL; + } delete this; } diff --git a/src/brpc/policy/file_naming_service.h b/src/brpc/policy/file_naming_service.h index 20616b6b37..a196102361 100644 --- a/src/brpc/policy/file_naming_service.h +++ b/src/brpc/policy/file_naming_service.h @@ -23,20 +23,28 @@ namespace brpc { namespace policy { +class ReloadFileTask; + class FileNamingService : public NamingService { +friend class ReloadFileTask; friend class ConsulNamingService; +public: + FileNamingService() : _task(NULL) {} private: - int RunNamingService(const char* service_name, - NamingServiceActions* actions); - - int GetServers(const char *service_name, - std::vector* servers); + void RunNamingService(const char* service_name, + NamingServiceActions* actions); void Describe(std::ostream& os, const DescribeOptions&) const; NamingService* New() const; void Destroy(); +private: + int GetServers(const char *service_name, + std::vector* servers); + + // Not using intrusive_ptr to hide the inclusion. 
+ ReloadFileTask* _task; }; } // namespace policy diff --git a/src/brpc/policy/list_naming_service.cpp b/src/brpc/policy/list_naming_service.cpp index 592e6aebbf..938e99edc4 100644 --- a/src/brpc/policy/list_naming_service.cpp +++ b/src/brpc/policy/list_naming_service.cpp @@ -71,15 +71,16 @@ int ListNamingService::GetServers(const char *service_name, return 0; } -int ListNamingService::RunNamingService(const char* service_name, - NamingServiceActions* actions) { +void ListNamingService::RunNamingService(const char* service_name, + NamingServiceActions* actions) { std::vector servers; const int rc = GetServers(service_name, &servers); if (rc != 0) { - servers.clear(); + delete actions; + } else { + _actions.reset(actions); + _actions->ResetServers(servers); } - actions->ResetServers(servers); - return 0; } void ListNamingService::Describe( diff --git a/src/brpc/policy/list_naming_service.h b/src/brpc/policy/list_naming_service.h index a03ba08069..03a20e8c6b 100644 --- a/src/brpc/policy/list_naming_service.h +++ b/src/brpc/policy/list_naming_service.h @@ -25,12 +25,9 @@ namespace policy { class ListNamingService : public NamingService { private: - int RunNamingService(const char* service_name, - NamingServiceActions* actions); + void RunNamingService(const char* service_name, + NamingServiceActions* actions); - // We don't need a dedicated bthread to run this static NS. 
- bool RunNamingServiceReturnsQuickly() { return true; } - int GetServers(const char *service_name, std::vector* servers); @@ -39,6 +36,9 @@ class ListNamingService : public NamingService { NamingService* New() const; void Destroy(); + +private: + std::unique_ptr _actions; }; } // namespace policy diff --git a/src/brpc/policy/remote_file_naming_service.cpp b/src/brpc/policy/remote_file_naming_service.cpp index ac81eb944a..c5e7a5714a 100644 --- a/src/brpc/policy/remote_file_naming_service.cpp +++ b/src/brpc/policy/remote_file_naming_service.cpp @@ -153,9 +153,5 @@ NamingService* RemoteFileNamingService::New() const { return new RemoteFileNamingService; } -void RemoteFileNamingService::Destroy() { - delete this; -} - } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/remote_file_naming_service.h b/src/brpc/policy/remote_file_naming_service.h index 03bfdf1a5f..c090e4fb90 100644 --- a/src/brpc/policy/remote_file_naming_service.h +++ b/src/brpc/policy/remote_file_naming_service.h @@ -35,8 +35,6 @@ class RemoteFileNamingService : public PeriodicNamingService { NamingService* New() const; - void Destroy(); - private: std::unique_ptr _channel; std::string _server_addr; diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index fd1006cd28..c181308323 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -16,10 +16,11 @@ #include #include -#include "bthread/bthread.h" // bthread_id_xx -#include "brpc/socket.h" // SocketUser -#include "brpc/load_balancer.h" // LoadBalancer -#include "brpc/details/controller_private_accessor.h" // RPCSender +#include "bthread/bthread.h" // bthread_id_xx +#include "brpc/socket.h" // SocketUser +#include "brpc/load_balancer.h" // LoadBalancer +#include "brpc/details/shared_load_balancer.h" // SharedLoadBalancer +#include "brpc/details/controller_private_accessor.h" // RPCSender #include "brpc/selective_channel.h" #include "brpc/global.h" diff --git a/src/brpc/server_node.h 
b/src/brpc/server_node.h new file mode 100644 index 0000000000..c8adbda521 --- /dev/null +++ b/src/brpc/server_node.h @@ -0,0 +1,61 @@ +// Copyright (c) 2014 brpc authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Ge,Jun (gejun@baidu.com) + +#ifndef BRPC_SERVER_NODE_H +#define BRPC_SERVER_NODE_H + +#include <ostream> // std::ostream used by operator<< +#include <string> // std::string +#include "butil/endpoint.h" // butil::EndPoint, butil::ip_t + +namespace brpc { + + +// Representing a server inside a NamingService. +struct ServerNode { + ServerNode() {} + ServerNode(butil::ip_t ip, int port, const std::string& tag2) + : addr(ip, port), tag(tag2) {} + ServerNode(const butil::EndPoint& pt, const std::string& tag2) + : addr(pt), tag(tag2) {} + ServerNode(butil::ip_t ip, int port) : addr(ip, port) {} + explicit ServerNode(const butil::EndPoint& pt) : addr(pt) {} + + butil::EndPoint addr; + std::string tag; +}; + +inline bool operator<(const ServerNode& n1, const ServerNode& n2) +{ return n1.addr != n2.addr ?
(n1.addr < n2.addr) : (n1.tag < n2.tag); } + +inline bool operator==(const ServerNode& n1, const ServerNode& n2) +{ return n1.addr == n2.addr && n1.tag == n2.tag; } + +inline bool operator!=(const ServerNode& n1, const ServerNode& n2) +{ return !(n1 == n2); } + +inline std::ostream& operator<<(std::ostream& os, const ServerNode& n) { + os << n.addr; + if (!n.tag.empty()) { + os << "(tag=" << n.tag << ')'; + } + return os; +} + + +} // namespace brpc + +#endif // BRPC_SERVER_NODE_H diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index d5bc55788e..a669a51219 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -41,6 +41,7 @@ #include "brpc/stream_impl.h" #include "brpc/shared_object.h" #include "brpc/policy/rtmp_protocol.h" // FIXME +#include "brpc/periodic_task.h" #if defined(OS_MACOSX) #include #endif @@ -95,13 +96,6 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable, const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; -#ifdef BAIDU_INTERNAL -#define BRPC_AUXTHREAD_ATTR \ - (sizeof(com_device_t) > 32*1024 ? BTHREAD_ATTR_NORMAL : BTHREAD_ATTR_SMALL) -#else -#define BRPC_AUXTHREAD_ATTR BTHREAD_ATTR_SMALL -#endif - class BAIDU_CACHELINE_ALIGNMENT SocketPool { public: explicit SocketPool(const SocketOptions& opt); @@ -771,6 +765,16 @@ void Socket::Revive() { } } +class HealthCheckTask : public PeriodicTask { +public: + explicit HealthCheckTask(SocketId id) : _id(id) , _first_time(true) {} + bool DoPeriodicTask(timespec* next_abstime); + +private: + SocketId _id; + bool _first_time; +}; + int Socket::ReleaseAdditionalReference() { bool expect = false; // Use `relaxed' fence here since `Dereference' has `released' fence @@ -817,10 +821,9 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { // by Channel to revive never-connected socket when server side // comes online. 
if (_health_check_interval_s > 0) { - bthread_t th = 0; - int rc = bthread_start_background( - &th, &BRPC_AUXTHREAD_ATTR, HealthCheckThread, (void*)id()); - CHECK_EQ(0, rc); + PeriodicTaskManager::StartTaskAt( + new HealthCheckTask(id()), + butil::milliseconds_from_now(_health_check_interval_s * 500)); } // Wake up all threads waiting on EPOLLOUT when closing fd _epollout_butex->fetch_add(1, butil::memory_order_relaxed); @@ -918,81 +921,65 @@ int Socket::Status(SocketId id, int32_t* nref) { return -1; } -void* Socket::HealthCheckThread(void* void_arg) { - SocketId socket_id = (SocketId)void_arg; - bool first_time = true; - if (bthread_usleep(100000) < 0) { - PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep"; - return NULL; +bool HealthCheckTask::DoPeriodicTask(timespec* next_abstime) { + if (next_abstime == NULL) { + delete this; + return true; } - - for (;;) { - butil::EndPoint remote_side; - int check_interval_s = 0; - do { - SocketUniquePtr ptr; - const int rc = AddressFailedAsWell(socket_id, &ptr); - CHECK(rc != 0); - if (rc < 0) { - RPC_VLOG << "SocketId=" << socket_id - << " was abandoned before health checking"; - return NULL; - } - remote_side = ptr->remote_side(); - check_interval_s = ptr->_health_check_interval_s; - // Note: Making a Socket re-addessable is hard. An alternative is - // creating another Socket with selected internal fields to replace - // failed Socket. Although it avoids concurrent issues with in-place - // revive, it changes SocketId: many code need to watch SocketId - // and update on change, which is impractical. Another issue with - // this method is that it has to move "selected internal fields" - // which may be accessed in parallel, not trivial to be moved. - // Finally we choose a simple-enough solution: wait until the - // reference count hits `expected_nref', which basically means no - // one is addressing the Socket(except here). Because the Socket - // is not addressable, the reference count will not increase - // again. 
This solution is not perfect because the `expected_nref' - // is implementation specific. In our case, one reference comes - // from SocketMapInsert(socket_map.cpp), one reference is here. - // Although WaitAndReset() could hang when someone is addressing - // the failed Socket forever (also indicating bug), this is not an - // issue in current code. - if (first_time) { // Only check at first time. - first_time = false; - if (ptr->WaitAndReset(2/*note*/) != 0) { - LOG(INFO) << "Cancel checking " << *ptr; - return NULL; - } - } - - s_vars->nhealthcheck << 1; - int hc = 0; - if (ptr->_user) { - hc = ptr->_user->CheckHealth(ptr.get()); - } else { - hc = ptr->CheckHealth(); - } - if (hc == 0) { - if (ptr->CreatedByConnect()) { - s_vars->channel_conn << -1; - } - ptr->Revive(); - ptr->_hc_count = 0; - return NULL; - } else if (hc == ESTOP) { - LOG(INFO) << "Cancel checking " << *ptr; - return NULL; - } - ++ ptr->_hc_count; - } while (0); - CHECK_GT(check_interval_s, 0); - if (bthread_usleep(check_interval_s * 1000000L) < 0) { - PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep"; - LOG(INFO) << "Cancel checking SocketId=" - << socket_id << '@' << remote_side; - return NULL; + SocketUniquePtr ptr; + const int rc = Socket::AddressFailedAsWell(_id, &ptr); + CHECK(rc != 0); + if (rc < 0) { + RPC_VLOG << "SocketId=" << _id + << " was abandoned before health checking"; + return false; + } + // Note: Making a Socket re-addessable is hard. An alternative is + // creating another Socket with selected internal fields to replace + // failed Socket. Although it avoids concurrent issues with in-place + // revive, it changes SocketId: many code need to watch SocketId + // and update on change, which is impractical. Another issue with + // this method is that it has to move "selected internal fields" + // which may be accessed in parallel, not trivial to be moved. 
+ // Finally we choose a simple-enough solution: wait until the + // reference count hits `expected_nref', which basically means no + // one is addressing the Socket(except here). Because the Socket + // is not addressable, the reference count will not increase + // again. This solution is not perfect because the `expected_nref' + // is implementation specific. In our case, one reference comes + // from SocketMapInsert(socket_map.cpp), one reference is here. + // Although WaitAndReset() could hang when someone is addressing + // the failed Socket forever (also indicating bug), this is not an + // issue in current code. + if (_first_time) { // Only check at first time. + _first_time = false; + if (ptr->WaitAndReset(2/*note*/) != 0) { + LOG(INFO) << "Cancel checking " << *ptr; + return false; + } + } + + s_vars->nhealthcheck << 1; + int hc = 0; + if (ptr->_user) { + hc = ptr->_user->CheckHealth(ptr.get()); + } else { + hc = ptr->CheckHealth(); + } + if (hc == 0) { + if (ptr->CreatedByConnect()) { + s_vars->channel_conn << -1; } + ptr->Revive(); + ptr->_hc_count = 0; + return false; + } else if (hc == ESTOP) { + LOG(INFO) << "Cancel checking " << *ptr; + return false; } + ++ ptr->_hc_count; + *next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s); + return true; } void Socket::OnRecycle() { diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 23265b12c7..1ad06d7c0d 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -178,6 +178,7 @@ friend class Controller; friend class policy::ConsistentHashingLoadBalancer; friend class policy::RtmpContext; friend class schan::ChannelBalancer; +friend class HealthCheckTask; class SharedPart; struct Forbidden {}; struct WriteRequest; @@ -515,7 +516,6 @@ friend void DereferenceSocket(Socket*); int ConnectIfNot(const timespec* abstime, WriteRequest* req); int ResetFileDescriptor(int fd); - static void* HealthCheckThread(void*); // Returns 0 on success, 1 on failed socket, -1 on recycled. 
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr); @@ -660,7 +660,7 @@ friend void DereferenceSocket(Socket*); int _preferred_index; // Number of HC since the last SetFailed() was called. Set to 0 when the - // socket is revived. Only set in HealthCheckThread + // socket is revived. Only set in HealthCheckTask::DoPeriodicTask() int _hc_count; // Size of current incomplete message, set to 0 on complete. diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 68835ea9aa..69c9f95bbe 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -158,7 +158,7 @@ void SocketMapRemove(const SocketMapKey& key) { SocketMap* m = get_client_side_socket_map(); if (m) { // TODO: We don't have expected_id to pass right now since the callsite - // at NamingServiceThread is hard to be fixed right now. As long as + // at SharedNamingService is hard to be fixed right now. As long as // FLAGS_health_check_interval is limited to positive, SocketMapInsert // never replaces the sockets, skipping comparison is still right. 
m->Remove(key, (SocketId)-1); diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 9566f143cf..233247fc77 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -149,7 +149,7 @@ pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT; class ChannelTest : public ::testing::Test{ protected: ChannelTest() - : _ep(butil::IP_ANY, 8787) + : _ep(butil::IP_ANY, 9787) , _close_fd_once(false) { pthread_once(&register_mock_protocol, register_protocol); const brpc::InputMessageHandler pairs[] = { @@ -1866,7 +1866,7 @@ TEST_F(ChannelTest, init_using_naming_service) { brpc::LoadBalancerWithNaming* lb = dynamic_cast<brpc::LoadBalancerWithNaming*>(channel->_lb.get()); ASSERT_TRUE(lb != NULL); - brpc::NamingServiceThread* ns = lb->_nsthread_ptr.get(); + brpc::SharedNamingService* ns = lb->_nsthread_ptr.get(); { const int NUM = 10; diff --git a/test/brpc_naming_service_filter_unittest.cpp b/test/brpc_naming_service_filter_unittest.cpp index b25470dfbd..56082d59cd 100644 --- a/test/brpc_naming_service_filter_unittest.cpp +++ b/test/brpc_naming_service_filter_unittest.cpp @@ -9,7 +9,7 @@ #include "butil/files/temp_file.h" #include "brpc/socket.h" #include "brpc/channel.h" -#include "brpc/load_balancer.h" +#include "brpc/details/shared_load_balancer.h" #include "brpc/policy/file_naming_service.h" class NamingServiceFilterTest : public testing::Test { diff --git a/test/brpc_naming_service_unittest.cpp b/test/brpc_naming_service_unittest.cpp index 1fe30b6d6d..b0652975e2 100644 --- a/test/brpc_naming_service_unittest.cpp +++ b/test/brpc_naming_service_unittest.cpp @@ -15,6 +15,8 @@ #include "brpc/policy/file_naming_service.h" #include "brpc/policy/list_naming_service.h" #include "brpc/policy/remote_file_naming_service.h" +#include "brpc/global.h" +#include "brpc/details/shared_naming_service.h" #include "echo.pb.h" #include "brpc/server.h" @@ -30,7 +32,15 @@ DECLARE_string(consul_service_discovery_url); } // brpc namespace { -TEST(NamingServiceTest, sanity) { +
+class NamingServiceTest : public testing::Test { +protected: + void SetUp() { + brpc::GlobalInitializeOrDie(); + } + void TearDown() {} +}; +TEST_F(NamingServiceTest, sanity) { std::vector servers; #ifdef BAIDU_INTERNAL @@ -97,7 +107,14 @@ TEST(NamingServiceTest, sanity) { } } -TEST(NamingServiceTest, invalid_port) { +TEST_F(NamingServiceTest, fail_to_get_shared_naming_service) { + butil::intrusive_ptr ns; + ASSERT_EQ(-1, brpc::GetSharedNamingService(&ns, "foobar", NULL)); + ASSERT_EQ(-1, brpc::GetSharedNamingService(&ns, "foobar://", NULL)); + ASSERT_EQ(-1, brpc::GetSharedNamingService(&ns, "file://never_exist", NULL)); +} + +TEST_F(NamingServiceTest, invalid_port) { std::vector servers; #ifdef BAIDU_INTERNAL @@ -111,7 +128,7 @@ TEST(NamingServiceTest, invalid_port) { ASSERT_EQ(-1, dns.GetServers("brpc.baidu.com:99999", &servers)); } -TEST(NamingServiceTest, wrong_name) { +TEST_F(NamingServiceTest, wrong_name) { std::vector servers; #ifdef BAIDU_INTERNAL @@ -178,7 +195,7 @@ class UserNamingServiceImpl : public test::UserNamingService { butil::atomic touch_count; }; -TEST(NamingServiceTest, remotefile) { +TEST_F(NamingServiceTest, remotefile) { brpc::Server server1; UserNamingServiceImpl svc1; ASSERT_EQ(0, server1.AddService(&svc1, brpc::SERVER_DOESNT_OWN_SERVICE)); @@ -355,7 +372,7 @@ class ConsulNamingServiceImpl : public test::UserNamingService { butil::atomic touch_count; }; -TEST(NamingServiceTest, consul_with_backup_file) { +TEST_F(NamingServiceTest, consul_with_backup_file) { brpc::policy::FLAGS_consul_enable_degrade_to_file_naming_service = true; const char *address_list[] = { "10.127.0.1:1234", @@ -394,7 +411,7 @@ TEST(NamingServiceTest, consul_with_backup_file) { restful_map.c_str())); ASSERT_EQ(0, server.Start("localhost:8500", NULL)); - bthread_usleep(1000000); + bthread_usleep(5000000); butil::EndPoint n1; ASSERT_EQ(0, butil::str2endpoint("10.121.36.189:8003", &n1)); From bf10ab1fe2f2d25ee1b534c9b34f59da9cb51389 Mon Sep 17 00:00:00 2001 From: zyearn 
Date: Thu, 23 Aug 2018 16:02:48 +0800 Subject: [PATCH 2/2] Fix _actions destructed too early --- src/brpc/details/shared_naming_service.cpp | 18 ++++++++++++++++-- src/brpc/details/shared_naming_service.h | 7 ++++++- src/brpc/global.cpp | 4 ++-- src/brpc/naming_service.h | 12 ++++++++++++ src/brpc/periodic_naming_service.cpp | 20 ++++++++++++++------ src/brpc/policy/file_naming_service.cpp | 19 +++++++++++-------- test/brpc_channel_unittest.cpp | 6 +++--- 7 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/brpc/details/shared_naming_service.cpp b/src/brpc/details/shared_naming_service.cpp index 57a76cb0e4..64cf607f5c 100644 --- a/src/brpc/details/shared_naming_service.cpp +++ b/src/brpc/details/shared_naming_service.cpp @@ -36,16 +36,30 @@ SharedNamingService::Actions::Actions(SharedNamingService* owner) } SharedNamingService::Actions::~Actions() { + if (!IsCleanedUp()) { + // If Action is not cleaned up, _owner is still a valid pointer. + // Otherwise *_owner has already been destructed and _owner is a + // dangling pointer. + _owner->EndWait(ECANCELED); + } +} + +void SharedNamingService::Actions::CleanUpImp() { + BAIDU_SCOPED_LOCK(_mutex); // Remove all sockets from SocketMap for (std::vector::const_iterator it = _last_servers.begin(); it != _last_servers.end(); ++it) { SocketMapRemove(SocketMapKey(it->addr)); } - _owner->EndWait(ECANCELED); + NamingServiceActions::CleanUpImp(); } void SharedNamingService::Actions::ResetServers( const std::vector& servers) { + BAIDU_SCOPED_LOCK(_mutex); + if (IsCleanedUp()) { + return; + } _servers.assign(servers.begin(), servers.end()); // Diff servers with _last_servers by comparing sorted vectors. // Notice that _last_servers is always sorted. 
@@ -365,7 +379,7 @@ int GetSharedNamingService( if (ptr->AddRefManually() == 0) { // The NS's last intrusive_ptr was just destructed and the // removal-from-global-map-code in ptr->~SharedNamingService() - // is about to run or already running, need to create another NS + // is about to run or already running, need to create another NS. // Notice that we don't need to remove the reference because // the object is already destructing. ptr = NULL; diff --git a/src/brpc/details/shared_naming_service.h b/src/brpc/details/shared_naming_service.h index c57502fa98..a21ad21329 100644 --- a/src/brpc/details/shared_naming_service.h +++ b/src/brpc/details/shared_naming_service.h @@ -27,7 +27,7 @@ namespace brpc { -// Inherit this class to observer NamingService changes. +// Inherit this class to observe NamingService changes. // NOTE: Same SocketId with different tags are treated as different entries. // When you change tag of a server, the server with the old tag will appear // in OnRemovedServers first, then in OnAddedServers with the new tag. 
@@ -65,8 +65,13 @@ class SharedNamingService : public SharedObject, public Describable { // @NamingServiceActions void ResetServers(const std::vector& servers); + protected: + // @NamingServiceActions + void CleanUpImp(); + private: SharedNamingService* _owner; + butil::Mutex _mutex; std::vector _last_servers; std::vector _servers; std::vector _added; diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 4fbeade79b..c126510dc5 100755 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -333,8 +333,8 @@ static void GlobalInitializeOrDieImpl() { #endif NamingServiceExtension()->RegisterOrDie("file", &g_ext->fns); NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns); - NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns); - NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns); + NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns); + NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns); NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns); NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns); diff --git a/src/brpc/naming_service.h b/src/brpc/naming_service.h index fe32f7f664..5056dc83a2 100644 --- a/src/brpc/naming_service.h +++ b/src/brpc/naming_service.h @@ -34,10 +34,22 @@ namespace brpc { // NOTE: You don't have to implement this class. class NamingServiceActions { public: + NamingServiceActions() + :_cleaned_up(false) {} virtual ~NamingServiceActions() {} // Call this method when servers are successfully refreshed. virtual void ResetServers(const std::vector& servers) = 0; + + // Clean up resources + void CleanUp() { CleanUpImp(); } + bool IsCleanedUp() { return _cleaned_up; } + +protected: + virtual void CleanUpImp() { _cleaned_up = true; } + +private: + bool _cleaned_up; }; // Mapping a name to ServerNodes. 
diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp index f4efb8a9a4..d03b58e7bc 100644 --- a/src/brpc/periodic_naming_service.cpp +++ b/src/brpc/periodic_naming_service.cpp @@ -38,23 +38,26 @@ friend class PeriodicNamingService; NamingServiceActions* actions) : _owner(owner) , _service_name(service_name) - , _actions(actions) - , _scheduled_destroy(false) {} + , _actions(actions) {} bool DoPeriodicTask(timespec* next_abstime); + void CleanUp(); private: PeriodicNamingService* _owner; std::string _service_name; std::unique_ptr _actions; - bool _scheduled_destroy; }; +void AccessNamingServiceTask::CleanUp() { + _actions->CleanUp(); +} + bool AccessNamingServiceTask::DoPeriodicTask(timespec* next_abstime) { if (next_abstime == NULL) { // Remove the ref added for this task. this->RemoveRefManually(); return true; } - if (_scheduled_destroy) { + if (_actions->IsCleanedUp()){ return false; } std::vector servers; @@ -84,11 +87,16 @@ void PeriodicNamingService::RunNamingService( void PeriodicNamingService::Destroy() { if (_task) { - _task->_scheduled_destroy = true; + _task->CleanUp(); _task->RemoveRefManually(); _task = NULL; + } else { + // Ownership hasn't been transferred to _task, which should + // belong to _task in normal case since PeriodicNamingService:: + // GetServers may be called in AccessNamingServiceTask and + // PeriodicNamingService should still be in a valid state.
+ delete this; } - delete this; } } // namespace brpc diff --git a/src/brpc/policy/file_naming_service.cpp b/src/brpc/policy/file_naming_service.cpp index f15d083fa7..5085f2ab6d 100644 --- a/src/brpc/policy/file_naming_service.cpp +++ b/src/brpc/policy/file_naming_service.cpp @@ -123,22 +123,21 @@ friend class FileNamingService; NamingServiceActions* actions) : _owner(owner) , _fw(fw) - , _actions(actions) - , _scheduled_destroy(false) {} + , _actions(actions) {} bool DoPeriodicTask(timespec* next_abstime); void CleanUp(); private: - FileNamingService* _owner; + std::unique_ptr _owner; + butil::Mutex _mutex; butil::FileWatcher _fw; std::unique_ptr _actions; - bool _scheduled_destroy; }; const int64_t RELOAD_FILE_MS = 100; void ReloadFileTask::CleanUp() { - _actions.reset(NULL); + _actions->CleanUp(); } bool ReloadFileTask::DoPeriodicTask(timespec* next_abstime) { @@ -147,7 +146,7 @@ bool ReloadFileTask::DoPeriodicTask(timespec* next_abstime) { this->RemoveRefManually(); return true; } - if (_scheduled_destroy) { + if (_actions->IsCleanedUp()) { return false; } butil::FileWatcher::Change change = _fw.check_and_consume(); @@ -198,11 +197,15 @@ NamingService* FileNamingService::New() const { void FileNamingService::Destroy() { if (_task) { _task->CleanUp(); - _task->_scheduled_destroy = true; _task->RemoveRefManually(); _task = NULL; + } else { + // Ownership hasn't been transferred to _task, which should + // belong to _task in normal case since FileNamingService:: + // GetServers may be called in DoPeriodicTask and FileNamingService + // should still be in a valid state.
+ delete this; } - delete this; } } // namespace policy diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 5f181a86d3..c2ce94530a 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -36,7 +36,7 @@ namespace policy { void SendRpcResponse(int64_t correlation_id, Controller* cntl, const google::protobuf::Message* req, const google::protobuf::Message* res, - const Server* server_raw, MethodStatus *, long); + const Server* server_raw, MethodStatus *, int64_t); } // policy } // brpc @@ -230,7 +230,7 @@ class ChannelTest : public ::testing::Test{ const google::protobuf::Message*, const google::protobuf::Message*, const brpc::Server*, - brpc::MethodStatus*, long>( + brpc::MethodStatus*, int64_t>( &brpc::policy::SendRpcResponse, meta.correlation_id(), cntl, NULL, res, &ts->_dummy, NULL, -1); @@ -702,7 +702,7 @@ class ChannelTest : public ::testing::Test{ CallMethod(&subchans[0], &cntl, &req, &res, false); ASSERT_TRUE(cntl.Failed()); ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText(); - ASSERT_EQ("[E2001][127.0.1.1:0]Method ComboEcho() not implemented.", cntl.ErrorText()); + ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method ComboEcho() not implemented.")); // do the rpc call. cntl.Reset();