Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2291 Fix site-to-site transfer of large files #1720

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docker/test/integration/features/s2s.feature
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol

When both instances start up
Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds

Scenario: A MiNiFi instance produces and transfers a large data file to a NiFi instance via s2s
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "this is a very long file we want to send by site-to-site" is present in "/tmp/input"
And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi"
And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup

And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
And the "success" relationship of the from-minifi is connected to the PutFile

When both instances start up
Then a flowfile with the content "this is a very long file we want to send by site-to-site" is placed in the monitored directory in less than 90 seconds
And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds

Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
Given a MiNiFi CPP server with yaml config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ def serialize_node(self, connectable, nifi_version, root, visited):
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [conn_name] if not isinstance(connectable, InputPort) else [""],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10,
"backPressureDataSizeThreshold": "50 B",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
Expand Down
2 changes: 1 addition & 1 deletion extensions/standard-processors/processors/GetFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFact

if (auto directory_str = context.getProperty(Directory)) {
if (!utils::file::is_directory(*directory_str)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value + "\" is not a directory");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a directory"));
}
request_.inputDirectory = *directory_str;
} else {
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/sitetosite/SiteToSiteClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class SiteToSiteClient : public core::Connectable {
// Cancel the transaction
virtual void cancel(const utils::Identifier &transactionID);
// Complete the transaction
virtual bool complete(const utils::Identifier &transactionID);
virtual bool complete(core::ProcessContext& context, const utils::Identifier &transactionID);
// Error the transaction
virtual void error(const utils::Identifier &transactionID);

Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/FlowConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ bool FlowConfiguration::persist(const std::string& serialized_flow) {
}

const bool status = filesystem_->write(*config_path_, serialized_flow);
logger_->log_info("Result of updating the config file {}: {}", config_path_, status ? "success" : "failure");
logger_->log_info("Result of updating the config file {}: {}", *config_path_, status ? "success" : "failure");
checksum_calculator_.invalidateChecksum();
return status;
}
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/sitetosite/RawSocketProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ bool RawSiteToSiteClient::transmitPayload(core::ProcessContext& context, core::P
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string());
}
if (!complete(transactionID)) {
if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string());
}
logger_->log_info("Site2Site transaction {} successfully send flow record {} content bytes {}", transactionID.to_string(), transaction->current_transfers_, transaction->_bytes);
Expand Down
15 changes: 10 additions & 5 deletions libminifi/src/sitetosite/SiteToSiteClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, core::Pr
if (!confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string());
}
if (!complete(transactionID)) {
if (!complete(context, transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string());
}
logger_->log_debug("Site2Site transaction {} successfully sent flow record {}, content bytes {}", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
Expand Down Expand Up @@ -336,7 +336,7 @@ void SiteToSiteClient::error(const utils::Identifier& transactionID) {
}

// Complete the transaction
bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
bool SiteToSiteClient::complete(core::ProcessContext& context, const utils::Identifier& transactionID) {
int ret = 0;
std::shared_ptr<Transaction> transaction = nullptr;

Expand Down Expand Up @@ -382,12 +382,17 @@ bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
if (ret <= 0)
return false;

if (code == TRANSACTION_FINISHED) {
if (code == TRANSACTION_FINISHED || code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
logger_->log_info("Site2Site transaction {} peer finished transaction", transactionID.to_string());
transaction->_state = TRANSACTION_COMPLETED;

if (code == TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
logger_->log_info("Site2Site transaction {} reported destination full, yielding", transactionID.to_string());
context.yield();
}
return true;
} else {
logger_->log_warn("Site2Site transaction {} peer unknown respond code {}", transactionID.to_string(), magic_enum::enum_underlying(code));
logger_->log_warn("Site2Site transaction {} peer unexpected respond code {}: {}", transactionID.to_string(), magic_enum::enum_underlying(code), magic_enum::enum_name(code));
return false;
}
}
Expand Down Expand Up @@ -718,7 +723,7 @@ bool SiteToSiteClient::receiveFlowFiles(core::ProcessContext& context, core::Pro
if (transfers > 0 && !confirm(transactionID)) {
throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
}
if (!complete(transactionID)) {
if (!complete(context, transactionID)) {
std::stringstream transaction_str;
transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed";
throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());
Expand Down
Loading