Skip to content

Circuit breaker#413

Merged
jamesge merged 27 commits intoapache:masterfrom
TousakaRin:circuit_breaker
Sep 19, 2018
Merged

Circuit breaker#413
jamesge merged 27 commits intoapache:masterfrom
TousakaRin:circuit_breaker

Conversation

@TousakaRin
Copy link
Contributor

增加了CircuitBreaker:

  • 每个Socket中都有一个CircuitBreaker对象
  • 在ChannelOptions中新增enable_circuit_breaker开关
  • Socket发现CircuitBreaker::OnCallEnd返回false之后调用SetFailed,之后由healthy check thread调用 Reset 来复位。

还存在的问题:

  • 只有main socket才需要circuitbreaker,每个socket都有一个circuitbreaker是否有些浪费。
  • PR中的默认参数还需要进一步调整。

@jamesge jamesge mentioned this pull request Jul 30, 2018
// Maximum: INT_MAX
int max_retry;

// When the error rate of an endpoint is too high, deactivate the node.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deactivate -> isolate, an endpoint -> a server node

int max_retry;

// When the error rate of an endpoint is too high, deactivate the node.
// Note that this deactive is GLOBAL, and the endpoint will become
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deactive -> isolation, endpoint -> node.
The node will become unavailable for all channels running in this process during the isolation.

__attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id);

void FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个接口没必要封装,在外部实现看得很清楚。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

假如把CircuitBreaker放在SharedPart里的话,由于SharedPart的定义在cpp文件里,在Controller::Call::OnComplete里就拿不到CircuitBreaker了,会提示编译错误说SharedPart是incompleted class。
这样的话反而只能封装接口了

butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;

CircuitBreaker _circuit_breaker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放的位置不对,应该在SharedPart里面。这是为了让连接池模式下(Controller::Call::peer_id代表的)主链接和(Controller::Call::sending_sock代表的)实际连接方便地共享状态。注意:其他链接方式当然也会用到SharedPart,只是主链接就是sending_sock

return true;
}

butil::DoublyBufferedData<ErrRecorderList> _recorders;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个类有点over-design了,DBD和ErrRecorderList都是不必要的。用户就是实现一个接口OnCallEnd,这个对象会new出来设置在Socket.SharedPart中,健康检查前(Socket.WaitAndReset)中被delete掉。长短两个窗口就是inline地写在代码中(当然一些函数可以共用),而不需要用vector管理。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

”这个对象会new出来设置在Socket.SharedPart中” 是指CircuitBreaker对象需要new出来放在SharedPart中吗,我觉得一直放在SharedPart里,然后在WaiAndReset里调一下CircuitBreaker::Reset就可以了吧

#ifndef BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H

#include "butil/containers/doubly_buffered_data.h"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个头文件是多余的吧

// Some of them are copied from `Channel' which might be destroyed
// after CallMethod.
int _max_retry;
bool _enable_circuit_breaker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议加到Controller::FLAGS_xxx里去。

void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR)
<< "Socket[" << *this << "] deactivted by circuit breaker";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个换行很奇怪,行宽不超过100都ok。

}
if (enable_circuit_breaker) {
SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上面就有sending_sock,不用address了。现在放sharedpart里,sending_sock是和peer_id对应的(mainsock)共享的,所以就能访问到。注意sending_sock会被置空,这块代码位置可以调整一下。


int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
while (true) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load放循环外。失败的case会更新ema_latency的。另外这里写成do while更好。

//Ordinary response
while (true) {
int64_t ema_error_cost =
_ema_error_cost.load(butil::memory_order_relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load放循环外,失败cas会更新参数。

int sample_count = _sample_count.fetch_add(1, butil::memory_order_relaxed);
bool init_completed = _init_completed.load(butil::memory_order_acquire);
if (!init_completed && sample_count >= _window_size) {
_init_completed.store(true, butil::memory_order_release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没看出这个init_completed意义在哪,每次判下_sample_count和_window_size关系不就行了?

DEFINE_int32(circuit_breaker_min_error_cost_us, 100,
"The minimum error_cost, when the ema of error cost is less than this "
"value, it will be set to zero.");
DEFINE_int32(circuit_breaker_max_failed_latency_mutliple, 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutliple -> multiple。这主要是针对超时的把,2倍够么?

@TousakaRin TousakaRin force-pushed the circuit_breaker branch 6 times, most recently from 655fbb0 to 6e5a863 Compare August 31, 2018 06:44
@TousakaRin TousakaRin force-pushed the circuit_breaker branch 2 times, most recently from 373297e to 2ae2559 Compare September 9, 2018 09:54

void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR) << "Socket[" << *this << "] deactivted by circuit breaker";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deactivated 改为 isolated 以保持用词统一

SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time.
SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//最好和上面对齐

"the average latency of the success requests.");
DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds");
DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 300000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

太长了,可能会产生badcase,先取30000吧

if (now_time_ms - _last_reset_time_ms < max_isolation_duration_ms) {
isolation_duration_ms *= 2;
isolation_duration_ms =
std::min(isolation_duration_ms, max_isolation_duration_ms);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两行改成min第一个参数*2就好了。

DEFINE_int32(circuit_breaker_min_error_cost_us, 500,
"The minimum error_cost, when the ema of error cost is less than this "
"value, it will be set to zero.");
DEFINE_int32(circuit_breaker_max_failed_latency_mutilple, 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutilple -> mutiple,全部搜一下吧

#include "brpc/options.pb.h" // ConnectionType
#include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr
#include "brpc/circuit_breaker.h" // CircuitBreaker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

此处不需要include?

void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR) << "Socket[" << *this << "] deactivted by circuit breaker";
SetFailed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetFailed(peer_id)

@jamesge jamesge merged commit ce50327 into apache:master Sep 19, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants