Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ int Channel::Init(const char* ns_url,
LOG(FATAL) << "Fail to new LoadBalancerWithNaming";
return -1;
}
GetNamingServiceThreadOptions ns_opt;
GetSharedNamingServiceOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
Expand Down
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "brpc/rpc_dump.pb.h"
#include "brpc/details/usercode_backup_pool.h" // RunUserCode
#include "brpc/mongo_service_adaptor.h"
#include "brpc/details/shared_load_balancer.h"

// Force linking the .o in UT (which analysis deps by inclusions)
#include "brpc/parallel_channel.h"
Expand Down
6 changes: 3 additions & 3 deletions src/brpc/details/load_balancer_with_naming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ LoadBalancerWithNaming::~LoadBalancerWithNaming() {

int LoadBalancerWithNaming::Init(const char* ns_url, const char* lb_name,
const NamingServiceFilter* filter,
const GetNamingServiceThreadOptions* options) {
const GetSharedNamingServiceOptions* options) {
if (SharedLoadBalancer::Init(lb_name) != 0) {
return -1;
}
if (GetNamingServiceThread(&_nsthread_ptr, ns_url, options) != 0) {
LOG(FATAL) << "Fail to get NamingServiceThread";
if (GetSharedNamingService(&_nsthread_ptr, ns_url, options) != 0) {
LOG(ERROR) << "Fail to get SharedNamingService";
return -1;
}
if (_nsthread_ptr->AddWatcher(this, filter) != 0) {
Expand Down
11 changes: 6 additions & 5 deletions src/brpc/details/load_balancer_with_naming.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#define BRPC_LOAD_BALANCER_WITH_NAMING_H

#include "butil/intrusive_ptr.hpp"
#include "brpc/load_balancer.h"
#include "brpc/details/naming_service_thread.h" // NamingServiceWatcher
#include "brpc/details/shared_load_balancer.h"
#include "brpc/details/shared_naming_service.h" // NamingServiceWatcher


namespace brpc {
Expand All @@ -32,15 +32,16 @@ class LoadBalancerWithNaming : public SharedLoadBalancer,

int Init(const char* ns_url, const char* lb_name,
const NamingServiceFilter* filter,
const GetNamingServiceThreadOptions* options);

const GetSharedNamingServiceOptions* options);

protected:
void OnAddedServers(const std::vector<ServerId>& servers);
void OnRemovedServers(const std::vector<ServerId>& servers);

void Describe(std::ostream& os, const DescribeOptions& options);

private:
butil::intrusive_ptr<NamingServiceThread> _nsthread_ptr;
butil::intrusive_ptr<SharedNamingService> _nsthread_ptr;
};

} // namespace brpc
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2014 Baidu, Inc.
// Copyright (c) 2014 brpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,7 @@

#include <gflags/gflags.h>
#include "brpc/reloadable_flags.h"
#include "brpc/load_balancer.h"

#include "brpc/details/shared_load_balancer.h"

namespace brpc {

Expand Down Expand Up @@ -65,7 +64,7 @@ SharedLoadBalancer::~SharedLoadBalancer() {
int SharedLoadBalancer::Init(const char* lb_name) {
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name);
if (lb == NULL) {
LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'";
LOG(ERROR) << "Fail to find LoadBalancer by `" << lb_name << "'";
return -1;
}
LoadBalancer* lb_copy = lb->New();
Expand All @@ -89,4 +88,5 @@ void SharedLoadBalancer::Describe(std::ostream& os,
}
}


} // namespace brpc
97 changes: 97 additions & 0 deletions src/brpc/details/shared_load_balancer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2014 brpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Ge,Jun (gejun@baidu.com)

#ifndef BRPC_SHARED_LOAD_BALANCER_H
#define BRPC_SHARED_LOAD_BALANCER_H

#include "bvar/passive_status.h"
#include "brpc/load_balancer.h"
#include "brpc/shared_object.h" // SharedObject

namespace brpc {

DECLARE_bool(show_lb_in_vars);

// A intrusively shareable load balancer created from name.
class SharedLoadBalancer : public SharedObject, public NonConstDescribable {
public:
SharedLoadBalancer();
~SharedLoadBalancer();

int Init(const char* lb_name);

int SelectServer(const LoadBalancer::SelectIn& in,
LoadBalancer::SelectOut* out) {
if (FLAGS_show_lb_in_vars && !_exposed) {
ExposeLB();
}
return _lb->SelectServer(in, out);
}

void Feedback(const LoadBalancer::CallInfo& info) { _lb->Feedback(info); }

bool AddServer(const ServerId& server) {
if (_lb->AddServer(server)) {
_weight_sum.fetch_add(1, butil::memory_order_relaxed);
return true;
}
return false;
}
bool RemoveServer(const ServerId& server) {
if (_lb->RemoveServer(server)) {
_weight_sum.fetch_sub(1, butil::memory_order_relaxed);
return true;
}
return false;
}

size_t AddServersInBatch(const std::vector<ServerId>& servers) {
size_t n = _lb->AddServersInBatch(servers);
if (n) {
_weight_sum.fetch_add(n, butil::memory_order_relaxed);
}
return n;
}

size_t RemoveServersInBatch(const std::vector<ServerId>& servers) {
size_t n = _lb->RemoveServersInBatch(servers);
if (n) {
_weight_sum.fetch_sub(n, butil::memory_order_relaxed);
}
return n;
}

virtual void Describe(std::ostream& os, const DescribeOptions&);

virtual int Weight() {
return _weight_sum.load(butil::memory_order_relaxed);
}

private:
static void DescribeLB(std::ostream& os, void* arg);
void ExposeLB();

LoadBalancer* _lb;
butil::atomic<int> _weight_sum;
volatile bool _exposed;
butil::Mutex _st_mutex;
bvar::PassiveStatus<std::string> _st;
};


} // namespace brpc

#endif // BRPC_SHARED_LOAD_BALANCER_H
Loading