New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rocketmq_proxy: implement rocketmq proxy #9503
Conversation
f55ff0f
to
e7b43b6
Compare
@aaron-ai thanks for your interest in contributing this awesome feature! I'm going to be honest in saying that we cannot accept an 8K line PR with no prior notice and no plan for reviewing. Do you have any maintainers in mind who would be willing to sponsor reviewing this PR and seeing that it get merged? Thank you. |
This pull request has been automatically marked as stale because it has not had activity in the last 7 days. It will be closed in 7 days if no further activity occurs. Please feel free to give a status update now, ping for review, or re-open when it's ready. Thank you for your contributions! |
@mattklein123 Thanks for your feedback first! This pull request does look too big at first sight, but more than half code of lines are test cases to meet 100% test coverages goal required here. The main source parts are self-contained and generally feature-implement-purpose. Once understanding core concepts of Apache RocketMQ, reviewing this PR will not be as challenging as initial thought. |
CODEOWNERS
Outdated
@@ -16,6 +16,8 @@ | |||
extensions/filters/common/original_src @snowp @klarose | |||
# dubbo_proxy extension | |||
/*/extensions/filters/network/dubbo_proxy @zyfjeff @lizan | |||
# rocketmq_proxy extension | |||
/*/extensions/filters/network/rocketmq_proxy @aaron-ai @lizhanhui |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aaron-ai There must be a maintainer in code owner https://github.com/envoyproxy/envoy/blob/master/EXTENSION_POLICY.md
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zyfjeff, we are still looking for a maintainer who has the convenience of sponsoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aaron-ai @lizhanhui I don't have the bandwidth right now to do a full review for this and I doubt any of the @envoyproxy/maintainers do either, but they can speak for themselves. However, if @zyfjeff is willing to take point and do a thorough review I'm happy to do a skim afterwords and sponsor. @zyfjeff does that sound OK to you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mattklein123
I would be happy to review this, I used to write rocketmq c++ sdk, so I think I can do it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be happy to review this, I used to write rocketmq c++ sdk, so I think I can do it
Awesome. Thank you so much! :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can help review, but unfortunately I am not a maintainer
message RocketmqProxy { | ||
string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}]; | ||
|
||
bool develop_mode = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add some comments to explain this field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I'll fix it.
include/envoy/config/grpc_mux.h
Outdated
* The only difference between addToWatch() and addOrUpdateWatch() is that the 'resources' here | ||
* means the *extra* resources we interested in. | ||
*/ | ||
virtual Watch* addToWatch(const std::string& type_url, Watch* watch, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to split a separate PR for review?
I do not believe that this code is part of the rocketmq_proxy filter, and this part of the code does not pass review in #8984 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to split a separate PR for review?
I do not believe that this code is part of the rocketmq_proxy filter, and this part of the code does not pass review in #8984 .
I would start a new PR about this part. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"message_queue.h", | ||
"topic_route.h", | ||
], | ||
external_deps = ["rapidjson"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not depend on rapidjson. See: #4705
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also had the same ideas, 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
#include "extensions/filters/network/rocketmq_proxy/conn_manager.h" | ||
#include "extensions/filters/network/rocketmq_proxy/topic_route.h" | ||
|
||
#include "rapidjson/stringbuffer.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Envoy is now actively trying to get rid of its depends on rapidjson #4705 we should convert to JSON using protobuf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now replaced with Protobuf.
connection_manager_.stats().request_active_.inc(); | ||
auto code = static_cast<RequestCode>(request_->code()); | ||
switch (code) { | ||
case RequestCode::POP_MESSAGE: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant code that can be put into a common function for stats statistics.
b9ed6ae
to
d719945
Compare
|
||
class RocketmqConstants { | ||
public: | ||
static const std::string CLUSTER_TYPE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please replace it with the idiom used by the Envoy,
struct RocketmqValues {
const std::string CLUSTER_TYPE{xxxx};
};
using RocketmqConstants = ConstSingleton<RocketmqValues>;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has been fixed.
|
||
TopicRouteData() = default; | ||
|
||
TopicRouteData(std::vector<QueueData>& queueData, std::vector<BrokerData>& brokerData) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please replace std::vector<QueueData>& queueData
to std::vector<QueueData>&& queueData
It is not easy to read
std::vector<QueueData> test
TopicRouteData tt(test);
// The following code is not aware that test has been moved and will continue to use test
But it's much easier to read if you go like this
std::vector<QueueData> test
TopicRouteData tt(std::move(test));
// I knew from looking at the code that test had been moved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let's use the move way.
public: | ||
BrokerData(const std::string& cluster, const std::string& brokerName, | ||
std::unordered_map<int64_t, std::string>& brokerAddrs) | ||
: cluster_(cluster), broker_name_(brokerName), broker_addrs_(brokerAddrs) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broker_addrs_(brokerAddrs)
Do we need a copy here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All right. Will use move to avoid copy
Upstream::ClusterInfoConstSharedPtr cluster_info_; | ||
UpstreamRequestPtr upstream_request_; | ||
|
||
absl::string_view target_host_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you ensure that the string referenced by target_host_ is always valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
target_host_ referenced can only be either empty or a valid target host. In case it is empty, host selection is completely subject to the LB. If it is a valid target host address, the request will be routed to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to this comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean whether the memory referenced by target_host_ is valid, not the content.
|
||
std::string target_host = app_load_balancer_ptr_->lb(request); | ||
if (!target_host.empty()) { | ||
createActiveMessage(request).sendRequestToUpstream(target_host); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sendRequestToUpstream(target_host) -> createFilterChain(target_host); -> connection_manager_.config().createRouter(target_host);
RouterPtr createRouter(absl::string_view target_host) override {
return std::make_unique<RouterImpl>(context_.clusterManager(), target_host);
}
class RouterImpl {
private:
absl::string_view target_host_;
}
Finally, target_host_ in RouterImpl refers to the target_host here, but the target_host here is released when it leaves the scope, resulting an undefined behavior in routerimpl.target_host_ refers to invalid memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I get your point. This is a bug. target_host_ should be of std::string, not string_view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string_view
has been fixed already.
@zyfjeff All issues above have been fixed. |
RouterImpl::UpstreamRequest::UpstreamRequest(RouterImpl& router, | ||
Tcp::ConnectionPool::Instance& connectionPool) | ||
: router_(router), connectionPool_(connectionPool) { | ||
(void)connectionPool_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove, Didn't see this variable used
return false; | ||
} | ||
|
||
uint32_t RouterImpl::hostSelectionRetryCount() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I think you should implement shouldSelectAnotherHost
, not hostSelectionRetryCount
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
active_message_->onReset(); | ||
} | ||
|
||
bool RouterImpl::shouldSelectAnotherHost(const Upstream::Host& host) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that subset should be used to select the endpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will use subset instead.
@zyfjeff All issues above have been fixed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aaron-ai good work, I have finished review, I have no issues.
Can you merge master once? I can sponsor meanwhile until someone from Alibaba becomes maintainer, but I don't have enough bandwidth to review this week. |
OK |
Please merge master to pick up #10672. We no longer accept changes to v2 (without explicit exception), so any API modifications should happen in v3. If this PR is adding a new proto, please follow the updated instructions in https://github.com/envoyproxy/envoy/blob/master/api/STYLE.md#adding-an-extension-configuration-to-the-api. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you merge master again to pick 1.14.0 release and then add a release note for 1.15.0?
The rocketmq proxy filter decodes the RPC protocol between rocketmq clients | ||
and nameserver/broker. | ||
|
||
* :ref:`v2 API reference <envoy_api_msg_config.filter.network.rocketmq_proxy.v2alpha1.RocketMQProxy>` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
v3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll update the docs.
generated_api_shadow/BUILD
Outdated
@@ -77,6 +77,7 @@ proto_library( | |||
"//envoy/config/filter/network/rate_limit/v2:pkg", | |||
"//envoy/config/filter/network/rbac/v2:pkg", | |||
"//envoy/config/filter/network/redis_proxy/v2:pkg", | |||
"//envoy/config/filter/network/rocketmq_proxy/v2alpha1:pkg", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
namespace NetworkFilters { | ||
namespace RocketmqProxy { | ||
|
||
const uint32_t RemotingCommand::SHIFT_RPC = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static constexpr
case RequestCode::PopMessage: { | ||
ASSERT(fieldValuePair.contains("extFields")); | ||
const auto& extFields = fieldValuePair.at("extFields"); | ||
std::unique_ptr<PopMessageRequestHeader> popMessageRequestHeader = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pop_message_request_header, please fix all local variable cases in this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
Hi, @lizan, Release note is added to current.rst and constexpr-issues are fixed. One more TODO is also inserted, indicating future work that is planned. |
More docs have been added. A small part of code are not covered by UT, and I will implement them tomorrow. |
65de232
to
0408438
Compare
@lizan Unit tests have been supplemented and all issues raised are properly resolved. |
Signed-off-by: aaron-ai <yangkun.ayk@alibaba-inc.com>
All tests in CI are OK. |
Implement rocketmq proxy
Description: implement rocketmq proxy
Risk Level: Low
Testing: Unit Tests
Docs Changes: N/A
Release Notes: N/A
[Optional Fixes #Issue] #9431