-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fleet Executor]Add sink interceptor and test #41497
Conversation
你的PR提交成功,感谢你对开源项目的贡献! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
void SinkInterceptor::StopCarrierIfComplete() { | ||
bool flag = true; | ||
for (const auto& up : upstream_step_) { | ||
flag = flag & (up.second == max_run_times_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感觉用 && 比较好一些,毕竟bool还是用字节存储的,理论上 >0 的都是true。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
哦对,有道理
std::string carrier_id = "0"; | ||
Carrier* carrier = | ||
GlobalMap<std::string, Carrier>::Create(carrier_id, carrier_id); | ||
carrier->Init(0, {{-1, 0}, {0, 0}, {-2, 0}}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
后续把-1和-2这两个魔数搞成宏定义或者数值常量?好看点
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,下一个pr改
PR types
New features
PR changes
Others
Describe
在一个进程中,只会有一个SinkInterceptor,它负责根据当前运行步数,判断是否能停止carrier。SinkInterceptor接收上游发送的DataIsReady的消息,自增计步器,直到达到最大运行上限时停止carrier。