Skip to content

Commit

Permalink
Merge pull request #592 from cdjingit/lb_configurable_and_consistency…
Browse files Browse the repository at this point in the history
…_lb_refactor_caidaojin

lb configurable && consistency hash lb refactor
  • Loading branch information
jamesge committed Apr 11, 2019
2 parents 1b9e006 + 4a7c3c5 commit 2ef40e6
Show file tree
Hide file tree
Showing 19 changed files with 256 additions and 85 deletions.
7 changes: 5 additions & 2 deletions src/brpc/global.cpp
Expand Up @@ -113,8 +113,9 @@ const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";

struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
, ch_md5_lb(MD5Hash32)
: ch_mh_lb(CONS_HASH_LB_MURMUR3)
, ch_md5_lb(CONS_HASH_LB_MD5)
, ch_ketama_lb(CONS_HASH_LB_KETAMA)
, constant_cl(0) {
}

Expand All @@ -134,6 +135,7 @@ struct GlobalExtensions {
LocalityAwareLoadBalancer la_lb;
ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb;
ConsistentHashingLoadBalancer ch_ketama_lb;
DynPartLoadBalancer dynpart_lb;

AutoConcurrencyLimiter auto_cl;
Expand Down Expand Up @@ -355,6 +357,7 @@ static void GlobalInitializeOrDieImpl() {
LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb);
LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);

// Compress Handlers
Expand Down
37 changes: 32 additions & 5 deletions src/brpc/load_balancer.cpp
Expand Up @@ -62,18 +62,23 @@ SharedLoadBalancer::~SharedLoadBalancer() {
}
}

int SharedLoadBalancer::Init(const char* lb_name) {
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name);
int SharedLoadBalancer::Init(const char* lb_protocol) {
std::string lb_name;
butil::StringPiece lb_params;
if (!ParseParameters(lb_protocol, &lb_name, &lb_params)) {
LOG(FATAL) << "Fail to parse this load balancer protocol '" << lb_protocol << '\'';
return -1;
}
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name.c_str());
if (lb == NULL) {
LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'";
return -1;
}
LoadBalancer* lb_copy = lb->New();
if (lb_copy == NULL) {
_lb = lb->New(lb_params);
if (_lb == NULL) {
LOG(FATAL) << "Fail to new LoadBalancer";
return -1;
}
_lb = lb_copy;
if (FLAGS_show_lb_in_vars && !_exposed) {
ExposeLB();
}
Expand All @@ -89,4 +94,26 @@ void SharedLoadBalancer::Describe(std::ostream& os,
}
}

bool SharedLoadBalancer::ParseParameters(const butil::StringPiece& lb_protocol,
std::string* lb_name,
butil::StringPiece* lb_params) {
lb_name->clear();
lb_params->clear();
if (lb_protocol.empty()) {
return false;
}
const char separator = ':';
size_t pos = lb_protocol.find(separator);
if (pos == std::string::npos) {
lb_name->append(lb_protocol.data(), lb_protocol.size());
} else {
lb_name->append(lb_protocol.data(), pos);
if (pos < lb_protocol.size() - sizeof(separator)) {
*lb_params = lb_protocol.substr(pos + sizeof(separator));
}
}

return true;
}

} // namespace brpc
7 changes: 6 additions & 1 deletion src/brpc/load_balancer.h
Expand Up @@ -24,6 +24,8 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T>
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"


namespace brpc {
Expand Down Expand Up @@ -100,7 +102,7 @@ class LoadBalancer : public NonConstDescribable, public Destroyable {

// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual LoadBalancer* New() const = 0;
virtual LoadBalancer* New(const butil::StringPiece& params) const = 0;

protected:
virtual ~LoadBalancer() { }
Expand Down Expand Up @@ -164,6 +166,9 @@ class SharedLoadBalancer : public SharedObject, public NonConstDescribable {
}

private:
static bool ParseParameters(const butil::StringPiece& lb_protocol,
std::string* lb_name,
butil::StringPiece* lb_params);
static void DescribeLB(std::ostream& os, void* arg);
void ExposeLB();

Expand Down
186 changes: 146 additions & 40 deletions src/brpc/policy/consistent_hashing_load_balancer.cpp
Expand Up @@ -18,8 +18,10 @@
#include <gflags/gflags.h>
#include "butil/containers/flat_map.h"
#include "butil/errno.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/socket.h"
#include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h"


namespace brpc {
Expand All @@ -29,16 +31,119 @@ namespace policy {
DEFINE_int32(chash_num_replicas, 100,
"default number of replicas per server in chash");

ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(HashFunc hash)
: _hash(hash)
, _num_replicas(FLAGS_chash_num_replicas) {
// Defined in hasher.cpp.
const char* GetHashName(HashFunc hasher);

class ReplicaPolicy {
public:
virtual ~ReplicaPolicy() = default;

virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const = 0;
virtual const char* name() const = 0;
};

class DefaultReplicaPolicy : public ReplicaPolicy {
public:
DefaultReplicaPolicy(HashFunc hash) : _hash_func(hash) {}

virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;

virtual const char* name() const { return GetHashName(_hash_func); }

private:
HashFunc _hash_func;
};

bool DefaultReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
for (size_t i = 0; i < num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
ConsistentHashingLoadBalancer::Node node;
node.hash = _hash_func(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
replicas->push_back(node);
}
return true;
}

class KetamaReplicaPolicy : public ReplicaPolicy {
public:
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;

virtual const char* name() const { return "ketama"; }
};

bool KetamaReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
const size_t points_per_hash = 4;
CHECK(num_replicas % points_per_hash == 0)
<< "Ketam hash replicas number(" << num_replicas << ") should be n*4";
for (size_t i = 0; i < num_replicas / points_per_hash; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
unsigned char digest[16];
MD5HashSignature(host, len, digest);
for (size_t j = 0; j < points_per_hash; ++j) {
ConsistentHashingLoadBalancer::Node node;
node.server_sock = server;
node.server_addr = ptr->remote_side();
node.hash = ((uint32_t) (digest[3 + j * 4] & 0xFF) << 24)
| ((uint32_t) (digest[2 + j * 4] & 0xFF) << 16)
| ((uint32_t) (digest[1 + j * 4] & 0xFF) << 8)
| (digest[0 + j * 4] & 0xFF);
replicas->push_back(node);
}
}
return true;
}

namespace {

pthread_once_t s_replica_policy_once = PTHREAD_ONCE_INIT;
const std::array<const ReplicaPolicy*, CONS_HASH_LB_LAST>* g_replica_policy = nullptr;

void InitReplicaPolicy() {
g_replica_policy = new std::array<const ReplicaPolicy*, CONS_HASH_LB_LAST>({
new DefaultReplicaPolicy(MurmurHash32),
new DefaultReplicaPolicy(MD5Hash32),
new KetamaReplicaPolicy
});
}

inline const ReplicaPolicy* GetReplicaPolicy(ConsistentHashingLoadBalancerType type) {
pthread_once(&s_replica_policy_once, InitReplicaPolicy);
return g_replica_policy->at(type);
}

} // namespace

ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(
HashFunc hash,
size_t num_replicas)
: _hash(hash)
, _num_replicas(num_replicas) {
ConsistentHashingLoadBalancerType type)
: _num_replicas(FLAGS_chash_num_replicas), _type(type) {
CHECK(GetReplicaPolicy(_type))
<< "Fail to find replica policy for consistency lb type: '" << _type << '\'';
}

size_t ConsistentHashingLoadBalancer::AddBatch(
Expand Down Expand Up @@ -112,20 +217,9 @@ size_t ConsistentHashingLoadBalancer::Remove(
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
std::vector<Node> add_nodes;
add_nodes.reserve(_num_replicas);
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
return false;
}
for (size_t i = 0; i < _num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
Node node;
node.hash = _hash(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
}
std::sort(add_nodes.begin(), add_nodes.end());
bool executed = false;
const size_t ret = _db_hash_ring.ModifyWithForeground(
Expand All @@ -138,23 +232,12 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch(
const std::vector<ServerId> &servers) {
std::vector<Node> add_nodes;
add_nodes.reserve(servers.size() * _num_replicas);
std::vector<Node> replicas;
replicas.reserve(_num_replicas);
for (size_t i = 0; i < servers.size(); ++i) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(servers[i].id, &ptr) == -1) {
continue;
}
for (size_t rep = 0; rep < _num_replicas; ++rep) {
char host[32];
// To be compatible with libmemcached, we formulate the key of
// a virtual node as `|address|-|replica_index|', see
// http://fe.baidu.com/-1bszwnf at line 297.
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), rep);
Node node;
node.hash = _hash(host, len);
node.server_sock = servers[i];
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
replicas.clear();
if (GetReplicaPolicy(_type)->Build(servers[i], _num_replicas, &replicas)) {
add_nodes.insert(add_nodes.end(), replicas.begin(), replicas.end());
}
}
std::sort(add_nodes.begin(), add_nodes.end());
Expand Down Expand Up @@ -187,8 +270,14 @@ size_t ConsistentHashingLoadBalancer::RemoveServersInBatch(
return n;
}

LoadBalancer *ConsistentHashingLoadBalancer::New() const {
return new (std::nothrow) ConsistentHashingLoadBalancer(_hash);
LoadBalancer *ConsistentHashingLoadBalancer::New(const butil::StringPiece& params) const {
ConsistentHashingLoadBalancer* lb =
new (std::nothrow) ConsistentHashingLoadBalancer(_type);
if (lb != nullptr && !lb->SetParameters(params)) {
delete lb;
lb = nullptr;
}
return lb;
}

void ConsistentHashingLoadBalancer::Destroy() {
Expand Down Expand Up @@ -232,16 +321,14 @@ int ConsistentHashingLoadBalancer::SelectServer(
return EHOSTDOWN;
}

extern const char *GetHashName(uint32_t (*hasher)(const void* key, size_t len));

void ConsistentHashingLoadBalancer::Describe(
std::ostream &os, const DescribeOptions& options) {
if (!options.verbose) {
os << "c_hash";
return;
}
os << "ConsistentHashingLoadBalancer {\n"
<< " hash function: " << GetHashName(_hash) << '\n'
<< " hash function: " << GetReplicaPolicy(_type)->name() << '\n'
<< " replica per host: " << _num_replicas << '\n';
std::map<butil::EndPoint, double> load_map;
GetLoads(&load_map);
Expand Down Expand Up @@ -289,5 +376,24 @@ void ConsistentHashingLoadBalancer::GetLoads(
}
}

bool ConsistentHashingLoadBalancer::SetParameters(const butil::StringPiece& params) {
for (butil::StringSplitter sp(params.begin(), params.end(), ' '); sp != nullptr; ++sp) {
butil::StringPiece key_value(sp.field(), sp.length());
size_t p = key_value.find('=');
if (p == key_value.npos || p == key_value.size() - 1) {
// No value configed.
return false;
}
if (key_value.substr(0, p) == "replicas") {
if (!butil::StringToSizeT(key_value.substr(p + 1), &_num_replicas)) {
return false;
}
continue;
}
LOG(ERROR) << "Failed to set this unknown parameters " << key_value;
}
return true;
}

} // namespace policy
} // namespace brpc

0 comments on commit 2ef40e6

Please sign in to comment.