site2site port negotiation #114
It seems as though your line lengths aren't respecting the linter length of 200, so there are a lot of changes that are superfluous. Please correct and I'll review again, but otherwise seems good.
std::unique_ptr < Site2SiteClientProtocol> protocol;
protocol = std::unique_ptr < Site2SiteClientProtocol
> (new Site2SiteClientProtocol(nullptr));
Why do we need to create a new object on each refresh?
Refresh is done only once, and I would like to avoid keeping a protocol object around.
@@ -38,7 +39,8 @@ namespace minifi {

bool Site2SiteClientProtocol::establish() {
if (_peerState != IDLE) {
logger_->log_error("Site2Site peer state is not idle while try to establish");
logger_->log_error(
Why the change in log statements? Our line length is 200.
OK, I will change the format. I wanted to reduce the line length to make the code easier to read.
logger_->log_error(
"Site2Site HandShake Failed because destination port is either invalid or full");
ret = -1;
/*
Is the commented-out code needed here?
I will remove that.
return false;
}

for (int i = 0; i < number; i++) {
Does it make sense to have a limit on number so we don't have a virtual DoS?
It is configured by the NiFi cluster server so that the client can distribute the load evenly across the peers.
MiNiFi Java does a similar thing.
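To make the reviewer's concern concrete, here is a minimal sketch of bounding a count read from the remote side before it is used as a loop bound. The names `kMaxPeers` and `checkedPeerCount` are hypothetical and do not appear in the PR, and the cap value is an arbitrary placeholder.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical upper bound; the real limit would be a project decision.
constexpr std::uint32_t kMaxPeers = 1024;

// Validates a peer count advertised by the remote side before it is used
// as a loop bound. Throws if the advertised count exceeds the cap.
inline std::uint32_t checkedPeerCount(std::uint32_t advertised) {
  if (advertised > kMaxPeers) {
    throw std::length_error("peer count exceeds configured maximum");
  }
  return advertised;
}
```

A caller would wrap the value read from the stream, for example `number = checkedPeerCount(number);`, before entering the loop.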
@@ -344,7 +427,8 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) {
return -1;
}

int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) {
int Site2SiteClientProtocol::readRespond(RespondCode &code,
Why the change in the function prototype?
I will revert that.
auto hostStr = node["host"].as<std::string>();
checkRequiredField(&node, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto portStr = node["port"].as<std::string>();
checkRequiredField(&node, "url", CONFIG_YAML_PROVENANCE_REPORT_KEY);
We should maintain prior functionality. Though we're not GA, we should avoid jarring consumers who may be using a prior config. We should be able to support both conditionally.
I want to avoid confusion by having one consistent way to configure this.
If we tagged after this were merged, would people who have tried MiNiFi in cpp 0.2.0 be able to use the same config files in cpp 0.3.0?
I agree it would be good to support both. We might be able to support both in the same config loader by preferring the new way and falling back to the old way, or if not we could version the flow file YAML schema similar to the Java implementation of this YAML config loader: https://github.com/apache/nifi-minifi/tree/master/minifi-commons/minifi-commons-schema/src/main/java/org/apache/nifi/minifi/commons/schema
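The "prefer the new way, fall back to the old way" idea could look roughly like the sketch below. It is only an illustration: a flat `std::map` stands in for the parsed YAML node, and `ConfigNode`, `RemoteEndpoint`, and `resolveRemoteEndpoint` are hypothetical names, not part of the PR or of yaml-cpp.

```cpp
#include <map>
#include <string>

// Stand-in for a parsed YAML node (assumption: a flat key/value map).
using ConfigNode = std::map<std::string, std::string>;

struct RemoteEndpoint {
  bool legacy = false;  // true when built from the old host/port keys
  std::string url;      // set when the new "url" key is present
  std::string host;     // set only in legacy mode
  std::string port;     // set only in legacy mode
};

// Prefers the new "url" key and falls back to the legacy "host" + "port"
// pair. Returns false if neither form is present.
inline bool resolveRemoteEndpoint(const ConfigNode &node, RemoteEndpoint &out) {
  auto url = node.find("url");
  if (url != node.end() && !url->second.empty()) {
    out = RemoteEndpoint();
    out.url = url->second;
    return true;
  }
  auto host = node.find("host");
  auto port = node.find("port");
  if (host != node.end() && port != node.end()) {
    out = RemoteEndpoint();
    out.legacy = true;
    out.host = host->second;
    out.port = port->second;
    return true;
  }
  return false;
}
```

With this shape, a config file written for the older release keeps loading, and the loader can log a deprecation notice whenever `legacy` is set.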
}
return std::move(nextProtocol);
}

void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
if (available_protocols_.size_approx() >= max_concurrent_tasks_) {
if (available_protocols_.size_approx() >= (max_concurrent_tasks_ * site2site_peer_status_list_.size())) {
Why should this change?
With 10 concurrent tasks (threads) and 10 peers, you need 100 queued elements? Shouldn't you only need ten (+/- a few), since you only have ten threads to pull objects?
I need to round-robin the peers, even for a single thread, to distribute the load across all peers.
I think the logic works for a single thread, but what happens in the case where you have ten threads, returning at different times? Typical algorithms use multiple queues for fairness.
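As a point of comparison for this thread, the sketch below shows a simpler technique than the multiple-queue schemes the reviewer mentions: picking the next peer with a shared atomic counter, so that several threads still cycle through all peers without pre-queuing one protocol object per peer and task. `RoundRobinPeers` is a hypothetical class, and plain strings stand in for peer entries.

```cpp
#include <atomic>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hands out peers in round-robin order. Safe to call from several threads
// as long as the peer list itself is not modified concurrently.
class RoundRobinPeers {
 public:
  explicit RoundRobinPeers(std::vector<std::string> peers)
      : peers_(std::move(peers)), next_(0) {}

  // Precondition: the peer list is non-empty.
  const std::string &next() {
    // fetch_add returns the previous value, so each caller gets a unique slot.
    std::size_t i = next_.fetch_add(1, std::memory_order_relaxed);
    return peers_[i % peers_.size()];
  }

 private:
  std::vector<std::string> peers_;
  std::atomic<std::size_t> next_;
};
```

This only addresses selection order. Whether connections are created on demand or pooled per peer is a separate decision.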
std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, port_));
std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_));
nextProtocol->setPeer(std::move(peer_));
for (auto peer : site2site_peer_status_list_) {
Could you not create one on demand and simply return it versus enqueueing it and dequeuing it?
I want to prepopulate them to reduce the time needed to establish the socket and complete the negotiation. We also need to round-robin them.
What is the time to establish sockets?
properties.insert(portUUID);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(relation);
setSupportedRelationships(relationships);
curl_global_init(CURL_GLOBAL_DEFAULT);
This function is not thread safe. Initialization isn't guaranteed to be thread safe either, since other components could initialize concurrently. It's probably fine for now, but it seems like this should be done in a thread-safe way.
https://curl.haxx.se/libcurl/c/curl_global_init.html
OK.
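One common way to get the thread-safe, run-once behaviour discussed here is a function-local static, whose initialization C++11 guarantees to happen exactly once even under concurrent calls. The sketch below uses a placeholder `globalHttpInit()` in place of the real `curl_global_init(CURL_GLOBAL_DEFAULT)` call so that it stays self-contained. Both function names and the counter are hypothetical.

```cpp
// Counter that exists only to make the behaviour observable in this sketch.
static int g_http_init_calls = 0;

// Placeholder for the real one-time setup, e.g.
// curl_global_init(CURL_GLOBAL_DEFAULT).
static void globalHttpInit() { ++g_http_init_calls; }

// Safe to call from any thread and any component: the initializer of a
// function-local static runs exactly once (guaranteed since C++11).
static void ensureHttpInitialized() {
  static const bool initialized = [] {
    globalHttpInit();
    return true;
  }();
  (void)initialized;  // silence unused-variable warnings
}
```

Every component that needs the HTTP library would call `ensureHttpInitialized()` instead of initializing it directly.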
3acdb49 to 5e5693c
@phrocker please review.
5e5693c to b4eb060
@phrocker please review
Hi @benqiu2016. It will be nice having peer load balancing capabilities in the SiteToSiteProtocol when this is done. I left a couple of comments / questions.
Also, this may be out of scope for this PR, but are we considering adding discovering remote input port UUIDs by name? That would be a nice feature to have if the name is stable but the input port uuid changes over time for whatever reason. Again, that might be out of scope or could wait for a future PR.
// setURL
void setURL(std::string val) {
url_ = val;
utils::parse_url(url_, host_, port_, protocol_);
Do we need a null check here?
If val is empty, the URL will be empty, and the other fields will be empty as well.
@@ -88,6 +90,40 @@ struct HTTPRequestResponse {

};

static void parse_url(std::string &url, std::string &host, int &port, std::string &protocol) {
This URL parsing method is a subset of valid URLs. Are we ok with that? For example, it will fail if passed a Literal IPv6 URL as defined by RFC 3986 or RFC 2732, e.g.:
http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/nifi
If we are ok with this parser being designed to work with URLs of a certain format, we should document those assumptions of inputs.
If we would like this part of the code to be a bit more robust for cases we haven't thought of (both present and future), we could consider adding a dependency to a RFC-compliant URI parser, or, if we want to avoid adding a dependency, just expanding the logic to cover more valid URLs.
Lastly, should the url input arg be const and checked for null?
For now it is designed to support IPv4 only. If we need an RFC-compliant URL parser, we can certainly use an open-source one.
IPv6 was just one example. Another case would be username:password@example.com (Not that that is a typical NiFi deployment case, but for the sake of argument as we can't anticipate how these tools will be deployed in every scenario.) I am not saying we necessarily need full RFC URI/L compliance, but I do think whatever is / is not supported in terms of remote s2s urls should be documented somewhere that the users / admins can find it, or logged if parsing fails so that users can more easily discover at runtime why they cannot connect to their IPv6 URL :)
For now, we can document the URL format in README.md, if that is OK with you.
As for access control, I looked at the docs, and it looks like we need to use a token, etc., for a secure cluster. I will need to ask around.
Sure, I think documentation of the YAML format is a good thing to have, and probably also a method comment on the interface for HTTPUtils for future developers to know what format the method expects to be passed.
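To illustrate what a documented, slightly more tolerant parser could look like, here is a minimal sketch. It is not the PR's `parse_url`: `ParsedUrl` and `parseUrl` are hypothetical names, the accepted format is an assumption stated in the comment, and the default ports (80 for http, 443 for https) follow the suggestion made elsewhere in this review.

```cpp
#include <cstddef>
#include <cstdlib>
#include <string>

struct ParsedUrl {
  std::string scheme;  // "http" or "https"
  std::string host;    // IPv6 literals are returned without brackets
  int port = -1;
};

// Accepts scheme://host[:port][/path], where host is a name, an IPv4
// address, or a bracketed IPv6 literal. Userinfo (user:password@) is NOT
// supported. Returns false otherwise so the caller can log the failure.
inline bool parseUrl(const std::string &url, ParsedUrl &out) {
  const std::size_t npos = std::string::npos;
  const std::string sep = "://";
  std::size_t scheme_end = url.find(sep);
  if (scheme_end == npos) return false;
  out.scheme = url.substr(0, scheme_end);
  if (out.scheme != "http" && out.scheme != "https") return false;

  std::size_t start = scheme_end + sep.size();
  std::size_t end = url.find('/', start);
  std::string authority = url.substr(start, end == npos ? npos : end - start);
  if (authority.empty() || authority.find('@') != npos) return false;

  std::string port_str;
  if (authority[0] == '[') {  // bracketed IPv6 literal
    std::size_t close = authority.find(']');
    if (close == npos) return false;
    out.host = authority.substr(1, close - 1);
    if (close + 1 < authority.size()) {
      if (authority[close + 1] != ':') return false;
      port_str = authority.substr(close + 2);
    }
  } else {
    std::size_t colon = authority.find(':');
    out.host = authority.substr(0, colon);
    if (colon != npos) port_str = authority.substr(colon + 1);
  }
  if (out.host.empty()) return false;

  if (port_str.empty()) {  // fall back to the scheme's default port
    out.port = (out.scheme == "https") ? 443 : 80;
    return true;
  }
  if (port_str.size() > 5) return false;
  for (char c : port_str) {
    if (c < '0' || c > '9') return false;
  }
  out.port = std::atoi(port_str.c_str());
  return out.port > 0 && out.port <= 65535;
}
```

Because failure is reported through the return value, the caller can log the rejected URL, which covers the "discover at runtime why they cannot connect" point.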
std::string host_;
int port_;
bool isSecure_;
} Site2SitePeerStatus;
What about flowFileCount, the number of flow files the peer holds? That is discoverable using the /site-to-site/peers endpoint and could be useful if the client wants to use it for load balancing decisions.
Yes, there is a flowFileCount. I want to avoid the complexity of looking at flowFileCount while doing round robin. If we used flowFileCount, we would need to refresh the peer status periodically, and even then we would only catch a snapshot of flowFileCount at each refresh interval, so it would not be accurate.
if (site2site_port_ == -1)
return;

this->site2site_peer_status_list_.clear();
Can the logic below this fail under certain conditions? If so, is it worth waiting to clear the current list until we are sure we have an updated list?
refreshRemoteSite2SiteInfo can fail.
If it fails, it sets this->site2site_port_ = -1.
In that case, we do not need to clear the list.
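The reviewer's suggestion of keeping the old list until a new one is known to be good can be sketched as a build-then-swap. `PeerStatus` and `refreshPeers` below are hypothetical stand-ins, and `fetch` represents whatever step actually talks to the remote side.

```cpp
#include <string>
#include <vector>

struct PeerStatus {
  std::string host;
  int port;
};

// `fetch` is any callable that fills its argument and returns true on
// success. The current list is replaced only after a successful fetch.
template <typename Fetch>
bool refreshPeers(std::vector<PeerStatus> &current, Fetch fetch) {
  std::vector<PeerStatus> fresh;
  if (!fetch(fresh) || fresh.empty()) {
    return false;  // keep the previous list untouched
  }
  current.swap(fresh);
  return true;
}
```

With this pattern a failure at any point of the refresh leaves the previously known peers available.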
if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty())
return;

std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/";
If port is not available this should default to 80 for http and 443 for https.
Also, I'm new to the NiFi REST API... what does /nifi-api/controller return? the site2site impl's I see pull from /site-to-site/peers for this info. curious about the difference (will try it out later today)
/nifi-api/controller returns the site2site port, which we can use to do the site2site negotiation with the master to find the peer info. It returns:
{
"revision": {
"clientId": "d40fb824-070b-4547-9b1c-f50f5ba0a677"
},
"controller": {
"id": "fe4a3a42-53b6-4af1-a80d-6fdfe60de97f",
"name": "NiFi Flow",
"comments": "",
"runningCount": 3,
"stoppedCount": 2,
"invalidCount": 0,
"disabledCount": 0,
"inputPortCount": 1,
"outputPortCount": 1,
"remoteSiteListeningPort": 10001,
"siteToSiteSecure": false,
"instanceId": "9d841c51-ab00-422e-811c-53c6dc2f8e59",
"inputPorts": [
{
"id": "471deef6-2a6e-4a7d-912a-81cc17e3a204",
"name": " From Node A",
"comments": "",
"state": "RUNNING"
}
],
"outputPorts": [
{
"id": "75f88005-0a87-4fef-8320-6219cdbcf18b",
"name": "To A",
"comments": "",
"state": "STOPPED"
}
]
}
}
Thanks for posting that, @benqiu2016. I learned something new about the NiFi REST API :).
Are you planning to add support for default protocol ports?
I looked into it further and found out that /controller is a NiFi 0.x endpoint. The /site-to-site endpoint was introduced in NiFi 1.x, and NiFi 1.x maps /controller to /site-to-site so that NiFi 0.x instances can communicate with NiFi 1.x instances.
I wasn't sure what minifi-cpp should do. /controller currently works for both major versions of NiFi, but there is no guarantee it will forever, and /controller is not documented in the 1.x REST API, making the intended behavior of this client code a little less obvious/discoverable for future developers (it was not obvious to me just from the API documentation). The /site-to-site endpoint is documented, but would not be available on 0.x NiFi instances.
I decided to look at the minifi-java implementation. It uses the site-to-site client module from NiFi, which has this behavior: it tries /site-to-site first, and if it gets a 404 (resource not found) error, then it falls back to /controller. From SiteToSiteRestApiClient:
private ControllerDTO getController() throws IOException {
try {
HttpGet get = this.createGetControllerRequest();
return ((ControllerEntity)this.execute(get, ControllerEntity.class)).getController();
} catch (SiteToSiteRestApiClient.HttpGetFailedException var3) {
if(404 == var3.getResponseCode()) {
logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url...");
HttpGet get = this.createGet("/controller");
return ((ControllerEntity)this.execute(get, ControllerEntity.class)).getController();
} else {
throw var3;
}
}
}
private HttpGet createGetControllerRequest() {
HttpGet get = this.createGet("/site-to-site");
get.setHeader("x-nifi-site-to-site-protocol-version", String.valueOf(this.transportProtocolVersionNegotiator.getVersion()));
return get;
}
That seems like a reasonable approach for minifi-cpp as well. I'm interested in hearing thoughts from others. @apiri - should we try to keep compatibility with 0.x NiFi's in this implementation? If not, we could just go with /site-to-site in this implementation.
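A C++ counterpart of that fallback could be sketched as below. The HTTP transport is passed in as a callable so that the sketch stays independent of any particular client library; `HttpResult`, `HttpGet`, and `fetchSiteToSiteInfo` are hypothetical names and not part of minifi-cpp.

```cpp
#include <functional>
#include <string>

struct HttpResult {
  int status;        // HTTP status code
  std::string body;  // response body
};

// Performs an HTTP GET on a path relative to the API base URL.
using HttpGet = std::function<HttpResult(const std::string &)>;

// Tries the NiFi 1.x resource first and falls back to the 0.x resource
// only when the server answers 404.
inline HttpResult fetchSiteToSiteInfo(const HttpGet &get) {
  HttpResult result = get("/site-to-site");
  if (result.status == 404) {
    result = get("/controller");
  }
  return result;
}
```

Passing the transport in also means the fallback logic can be exercised in a unit test with a fake `HttpGet`, without a running server.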
@@ -20,6 +20,9 @@

#include "../include/RemoteProcessorGroupPort.h"

#include <curl/curl.h>
#include <curl/curlbuild.h>
#include <curl/easy.h>
Encapsulating the curl / http communication in this implementation has some pros and cons. On the upside, it abstracts the details of the network communication from the users of the interface, making calling it simple. A downside is that it's harder to unit test the logic in this class as the networking component is not mockable / stub-able without setting up an HTTP server. Also if we ever want to change HTTP client libraries, it is more work as it is tightly coupled here. I don't know if it's worth changing now as it would be a significant refactoring, and it's certainly not just in this case but something that goes for a lot of our code base. It would be nice if there was more decoupled modularity in these components.
For now, I think curl is tightly coupled with our code, as it is in other components such as InvokeHTTP.
InvokeHTTP's sole responsibility is HTTP communications. I don't think the same translates to RemoteProcessGroupPort.
@@ -84,19 +107,41 @@ void RemoteProcessorGroupPort::initialize() {
std::set<core::Relationship> relationships;
relationships.insert(relation);
setSupportedRelationships(relationships);
curl_global_init(CURL_GLOBAL_DEFAULT);
site2site_peer_mutex_.lock();
Why did you choose to use lock() over a lock guard or some other RAII technique?
I agree this seems to be a good use case for lock_guard as the lock is obtained at the start of initialize() and released just before returning.
Looks good, but am curious about the use of lock vs a stack based lock guard. What was the reason for that?
b4eb060 to 3902abc
…into port_negotiation
@phrocker If we use a stack-based lock guard, then to control how long the lock is held I need to add a scope for the lock guard inside {}. That makes the code less readable.
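For reference, the scoped form being debated looks roughly like this. `PeerList` is a hypothetical class with plain integers standing in for peer entries. The inner braces limit how long the lock is held, and the guard releases the mutex on every exit path, including exceptions.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

class PeerList {
 public:
  void replace(std::vector<int> fresh) {
    {
      // The guard locks here and unlocks at the closing brace, even if
      // the guarded code throws.
      std::lock_guard<std::mutex> lock(mutex_);
      peers_.swap(fresh);
    }
    // `fresh` now holds the old contents and is destroyed after the lock
    // has already been released.
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return peers_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::vector<int> peers_;
};
```

If the extra braces hurt readability, moving the guarded statements into a small helper function gives the same lifetime without a nested scope.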
Thank you @benqiu2016 for the additional work.
I think the following improvements could be made:
- Access token generation for username:password authentication is inlined in the RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() function. This could be extracted and made reusable, as it is the method of generating a token for any REST API usage. Also, the tokens are usually long-lived, so extracting the logic to generate a token for a given username:password combination into a library function creates a better foundation, in that a caller of that method could choose to cache the token if they wish to.
- I would like the HTTP code to use default protocol ports if none is specified in the URL: 80 for http and 443 for https. Currently it looks like there is conditional logic that will bail if the port is not provided.
- I added an inline comment about the use of /nifi-api/site-to-site (NiFi 1.x) vs. /nifi-api/controller (NiFi 0.x). I'm not sure what the recommended action is yet, but I think we should resolve that discussion as part of this PR.
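The first point, a reusable token function that callers may cache, could be sketched as follows. `TokenCache` is a hypothetical class. The actual token request is injected as a callable, and the cache is keyed by username only, so token expiry and password changes are deliberately not handled here.

```cpp
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

class TokenCache {
 public:
  // Requests a token from the server for the given credentials.
  using Fetcher =
      std::function<std::string(const std::string &user, const std::string &password)>;

  explicit TokenCache(Fetcher fetcher) : fetcher_(std::move(fetcher)) {}

  // Returns the cached token for `user`, fetching it on first use.
  // Assumption: tokens are long-lived; expiry is not tracked.
  std::string tokenFor(const std::string &user, const std::string &password) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = tokens_.find(user);
    if (it == tokens_.end()) {
      it = tokens_.emplace(user, fetcher_(user, password)).first;
    }
    return it->second;
  }

 private:
  Fetcher fetcher_;
  std::mutex mutex_;
  std::map<std::string, std::string> tokens_;
};
```

Callers that do not want caching can invoke the fetcher directly, which keeps the token logic usable for any REST call.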
@benqiu2016 thanks for making that round of changes! They look good. I'll be happy to test the branch when I get a chance to set up a NiFi environment that I can use to test some of the more advanced configurations that are supported (clustered peers, secure client). I should have a chance to do that this week, hopefully in the next couple of days. As a general comment, it looks like there are quite a few cases that are not covered by corresponding automated tests, so a lot of this will rely on manual verification. You may want to open a JIRA ticket for adding some additional unit tests and integration tests as a way to verify this feature over time as changes are made to the codebase. Thanks!
Thanks for the review. Can we merge the PR?
@benqiu2016 Can you rebase? 4042aa0 appears to be a merge commit since there are two parents for this commit. Can you rebase and we'll get it merged, thanks.
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically master)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.