Skip to content

Commit

Permalink
MINIFICPP-756: Remove usage of setrelationship, deprecate it, and sup…
Browse files Browse the repository at this point in the history
…port multiple relationships in connection class

This closes civetweb#501.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
  • Loading branch information
phrocker authored and apiri committed Mar 7, 2019
1 parent 0360a74 commit 2194662
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 81 deletions.
16 changes: 8 additions & 8 deletions extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));

std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
gcConnection->setRelationship(core::Relationship("success", "description"));
gcConnection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
laConnection->setRelationship(core::Relationship("success", "description"));
laConnection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
connection->setRelationship(core::Relationship("success", "description"));
connection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");

connection2->setRelationship(core::Relationship("No Retry", "description"));
connection2->addRelationship(core::Relationship("No Retry", "description"));

// link the connections so that we can test results at the end for this
connection->setSource(listenhttp);
Expand Down Expand Up @@ -184,17 +184,17 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();

std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
gcConnection->setRelationship(core::Relationship("success", "description"));
gcConnection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
laConnection->setRelationship(core::Relationship("success", "description"));
laConnection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
connection->setRelationship(core::Relationship("success", "description"));
connection->addRelationship(core::Relationship("success", "description"));

std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");

connection2->setRelationship(core::Relationship("No Retry", "description"));
connection2->addRelationship(core::Relationship("No Retry", "description"));

// link the connections so that we can test results at the end for this
connection->setSource(listenhttp);
Expand Down
21 changes: 15 additions & 6 deletions libminifi/include/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name);
explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid);
explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
utils::Identifier & srcUUID);
utils::Identifier & srcUUID);
explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid,
utils::Identifier & srcUUID, utils::Identifier & destUUID);
// Destructor
Expand Down Expand Up @@ -90,13 +90,22 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
std::shared_ptr<core::Connectable> getDestination() {
return dest_connectable_;
}
// Set Connection relationship

/**
* Deprecated function
* Please use addRelationship.
*/
void setRelationship(core::Relationship relationship) {
relationship_ = relationship;
relationships_.insert(relationship);
}

// Set Connection relationship
void addRelationship(core::Relationship relationship) {
relationships_.insert(relationship);
}
// ! Get Connection relationship
core::Relationship getRelationship() {
return relationship_;
const std::set<core::Relationship> &getRelationships() const {
return relationships_;
}
// Set Max Queue Size
void setMaxQueueSize(uint64_t size) {
Expand Down Expand Up @@ -166,7 +175,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
// Destination Processor UUID
utils::Identifier dest_uuid_;
// Relationship for this connection
core::Relationship relationship_;
std::set<core::Relationship> relationships_;
// Source Processor (ProcessNode/Port)
std::shared_ptr<core::Connectable> source_connectable_;
// Destination Processor (ProcessNode/Port)
Expand Down
61 changes: 32 additions & 29 deletions libminifi/src/core/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,33 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
}
std::string source_uuid = srcUUID.to_string();
if (my_uuid == source_uuid) {
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it != out_going_connections_.end()) {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
if (existedConnection.find(connection) == existedConnection.end()) {
// We do not have the same connection for this relationship yet
existedConnection.insert(connection);
const auto &rels = connection->getRelationships();
for (auto i = rels.begin(); i != rels.end(); i++) {
const auto relationship = (*i).getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it != out_going_connections_.end()) {
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = it->second;
if (existedConnection.find(connection) == existedConnection.end()) {
// We do not have the same connection for this relationship yet
existedConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = existedConnection;
logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
} else {
// We do not have any outgoing connection for this relationship yet
std::set<std::shared_ptr<Connectable>> newConnection;
newConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = existedConnection;
out_going_connections_[relationship] = newConnection;
logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
} else {
// We do not have any outgoing connection for this relationship yet
std::set<std::shared_ptr<Connectable>> newConnection;
newConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = newConnection;
logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
}

return ret;
}

Expand Down Expand Up @@ -181,16 +183,17 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
}

if (uuid_ == srcUUID) {
std::string relationship = connection->getRelationship().getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it == out_going_connections_.end()) {
return;
} else {
if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
out_going_connections_[relationship].erase(connection);
connection->setSource(NULL);
logger_->log_debug("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
const auto &rels = connection->getRelationships();
for (auto i = rels.begin(); i != rels.end(); i++) {
const auto relationship = (*i).getName();
// Connection is source from the current processor
auto &&it = out_going_connections_.find(relationship);
if (it != out_going_connections_.end()) {
if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
out_going_connections_[relationship].erase(connection);
connection->setSource(NULL);
logger_->log_debug("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
}
}
}
}
Expand Down
24 changes: 14 additions & 10 deletions libminifi/src/core/yaml/YamlConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,20 +534,24 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
core::Relationship relationship(rawRelationship, "");
logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
if (connection) {
connection->setRelationship(relationship);
connection->addRelationship(relationship);
}
} else if (connectionNode.as<YAML::Node>()["source relationship names"]) {
auto relList = connectionNode["source relationship names"];

if (relList.size() != 1) {
throw std::invalid_argument("Only one element is supported for 'source relationship names'");
}

auto rawRelationship = relList[0].as<std::string>();
core::Relationship relationship(rawRelationship, "");
logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
if (connection) {
connection->setRelationship(relationship);
if (relList.IsSequence()) {
for (const auto &rel : relList) {
auto rawRelationship = rel.as<std::string>();
core::Relationship relationship(rawRelationship, "");
logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
connection->addRelationship(relationship);
}
} else {
auto rawRelationship = relList.as<std::string>();
core::Relationship relationship(rawRelationship, "");
logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
connection->addRelationship(relationship);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions libminifi/test/TestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<co
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
connection->setRelationship(relationship);
connection->addRelationship(relationship);

// link the connections so that we can test results at the end for this
connection->setSource(last);
Expand Down Expand Up @@ -192,7 +192,7 @@ std::shared_ptr<minifi::Connection> TestPlan::buildFinalConnection(std::shared_p
std::shared_ptr<core::Processor> last = processor;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
connection->setRelationship(termination_);
connection->addRelationship(termination_);

// link the connections so that we can test results at the end for this
connection->setSource(last);
Expand Down
16 changes: 8 additions & 8 deletions libminifi/test/archive-tests/CompressContentTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -226,7 +226,7 @@ TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -329,7 +329,7 @@ TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -430,7 +430,7 @@ TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -533,7 +533,7 @@ TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -640,7 +640,7 @@ TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -750,7 +750,7 @@ TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down Expand Up @@ -857,7 +857,7 @@ TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
connection->setRelationship(core::Relationship("success", "compress successful output"));
connection->addRelationship(core::Relationship("success", "compress successful output"));
connection->setSource(processor);
connection->setDestination(logAttributeProcessor);
connection->setSourceUUID(processoruuid);
Expand Down
Loading

0 comments on commit 2194662

Please sign in to comment.