From ba075e0e7594ac2855321646dde5ec04d60077b5 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Tue, 15 May 2018 13:45:02 -0400 Subject: [PATCH] MINIFICPP-403: Update connectables so that they contain a reference to the flow identifier. With this approach the flow identifier will be updated with C2 and automatically apply to any processors applied as a result of that update --- extensions/http-curl/tests/C2UpdateTest.cpp | 2 +- .../tests/HttpGetIntegrationTest.cpp | 1 + libminifi/CMakeLists.txt | 2 +- libminifi/include/FlowController.h | 3 +- libminifi/include/FlowFileRecord.h | 6 +- libminifi/include/core/Connectable.h | 19 +++- libminifi/include/core/FlowConfiguration.h | 14 ++- libminifi/include/core/ProcessorNode.h | 14 ++- libminifi/include/core/state/FlowIdentifier.h | 91 +++++++++++++++++++ .../core/state/nodes/FlowInformation.h | 67 +++++++------- libminifi/src/FlowController.cpp | 35 +------ libminifi/src/FlowFileRecord.cpp | 18 +++- libminifi/src/core/Connectable.cpp | 2 + libminifi/src/core/FlowConfiguration.cpp | 29 ++++++ libminifi/src/core/ProcessSession.cpp | 55 +++++++---- libminifi/src/core/yaml/YamlConfiguration.cpp | 2 + libminifi/test/TestBase.cpp | 6 +- libminifi/test/TestBase.h | 20 ++-- libminifi/test/unit/GetFileTests.cpp | 1 + 19 files changed, 285 insertions(+), 102 deletions(-) create mode 100644 libminifi/include/core/state/FlowIdentifier.h diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp index 0799ae5dc5..52e60f8e21 100644 --- a/extensions/http-curl/tests/C2UpdateTest.cpp +++ b/extensions/http-curl/tests/C2UpdateTest.cpp @@ -174,7 +174,7 @@ int main(int argc, char **argv) { auto milliseconds = std::chrono::duration_cast(then - start).count(); std::string logs = LogTestController::getInstance().log_output.str(); - assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version") != std::string::npos); LogTestController::getInstance().reset(); rmdir("./content_repository"); assert(h_ex.calls_ <= (milliseconds / 1000) + 1); diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp index df40497318..9e6e99f6aa 100644 --- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp +++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp @@ -154,6 +154,7 @@ int main(int argc, char **argv) { assert(logs.find("key:filename value:") != std::string::npos); assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); + assert(logs.find("key:flow.id") != std::string::npos); LogTestController::getInstance().reset(); rmdir("./content_repository"); diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 3e6390f4ff..302b7f826e 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -59,7 +59,7 @@ include_directories(../thirdparty/rapidjson-1.1.0/include) include_directories(../thirdparty/concurrentqueue/) include_directories(include) -file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" ) diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 71d22cbb0e..0466546afb 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -172,7 +172,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // first it will validate the payload with the current root node config for flowController // like FlowController id/name is the same and new version is greater than the current version // after that, it will apply the configuration - bool applyConfiguration(const std::string &configurePayload); + bool applyConfiguration(const std::string &source, const std::string &configurePayload); // get name std::string getName() const{ @@ -408,7 +408,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publi std::chrono::steady_clock::time_point last_metrics_capture_; private: - std::shared_ptr flow_version_; std::shared_ptr logger_; std::string serial_number_; static std::shared_ptr id_generator_; diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index 93ea74bac3..795003fd01 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -64,11 +64,13 @@ enum FlowAttribute { DISCARD_REASON, // Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. ALTERNATE_IDENTIFIER, + // Flow identifier + FLOW_ID, MAX_FLOW_ATTRIBUTES }; // FlowFile Attribute Key -static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", "absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", "alternate.identifier" }; +static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", "absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", "alternate.identifier", "flow.id" }; // FlowFile Attribute Enum to Key inline const char *FlowAttributeKey(FlowAttribute attribute) { @@ -122,7 +124,7 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { // Destructor virtual ~FlowFileRecord(); // addAttribute key is enum - bool addKeyedAttribute(FlowAttribute key, std::string value); + bool addKeyedAttribute(FlowAttribute key, const std::string &value); // removeAttribute key is enum bool removeKeyedAttribute(FlowAttribute key); // updateAttribute key is enum diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h index 588b67aa3f..ec2fd3a928 100644 --- a/libminifi/include/core/Connectable.h +++ b/libminifi/include/core/Connectable.h @@ -25,7 +25,7 @@ #include "core/logging/Logger.h" #include "Relationship.h" #include "Scheduling.h" - +#include "core/state/FlowIdentifier.h" namespace org { namespace apache { namespace nifi { @@ -133,6 +133,21 @@ class __attribute__((visibility("default"))) Connectable : public CoreComponent return false; } + /** + * Sets the flow version for this connectable. + */ + void setFlowIdentifier(const std::shared_ptr &version){ + connectable_version_ = version; + } + + /** + * Returns theflow version + * @returns flow version. can be null if a flow version is not tracked. + */ + virtual std::shared_ptr getFlowIdentifier(){ + return connectable_version_; + } + protected: // Penalization Period in MilliSecond @@ -165,6 +180,8 @@ class __attribute__((visibility("default"))) Connectable : public CoreComponent std::atomic strategy_; // Concurrent condition variable for whether there is incoming work to do std::condition_variable work_condition_; + // version under which this connectable was created. + std::shared_ptr connectable_version_; private: std::shared_ptr logger_; diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index eab716900d..cf4c7f6be8 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -33,6 +33,7 @@ #include "core/ProcessSession.h" #include "core/ProcessGroup.h" #include "io/StreamFactory.h" +#include "core/state/nodes/FlowInformation.h" namespace org { namespace apache { @@ -61,6 +62,7 @@ class FlowConfiguration : public CoreComponent { logger_(logging::LoggerFactory::getLogger()) { controller_services_ = std::make_shared(); service_provider_ = std::make_shared(controller_services_, nullptr, configuration); + flow_version_ = std::make_shared("", "default", ""); // it is okay if this has already been called initialize_static_functions(); } @@ -82,6 +84,14 @@ class FlowConfiguration : public CoreComponent { // Create Provenance Report Task std::shared_ptr createProvenanceReportTask(void); + std::shared_ptr getFlowVersion() const{ + return flow_version_; + } + + std::shared_ptr getConfiguration() { // cannot be const as getters mutate the underlying map + return configuration_; + } + /** * Returns the configuration path string * @return config_path_ @@ -94,6 +104,8 @@ class FlowConfiguration : public CoreComponent { return getRoot(config_path_); } + std::unique_ptr updateFromPayload(const std::string &source, const std::string &yamlConfigPayload); + virtual std::unique_ptr getRootFromPayload(const std::string &yamlConfigPayload) { return nullptr; } @@ -147,7 +159,7 @@ class FlowConfiguration : public CoreComponent { // stream factory std::shared_ptr stream_factory_; std::shared_ptr configuration_; - + std::shared_ptr flow_version_; private: std::shared_ptr logger_; static std::mutex atomic_initialization_; diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h index ed44d6a784..c64dbb0a18 100644 --- a/libminifi/include/core/ProcessorNode.h +++ b/libminifi/include/core/ProcessorNode.h @@ -30,8 +30,6 @@ namespace core { /** * Processor node functions as a pass through to the implementing Connectables - * ProcessorNode can be used by itself or with a pass through object, in which case - * we need to function as a passthrough or not. */ class ProcessorNode : public ConfigurableComponent, public Connectable { public: @@ -100,6 +98,18 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { } } + /** + * Returns theflow version + * @returns flow version. can be null if a flow version is not tracked. + */ + virtual std::shared_ptr getFlowIdentifier() { + if (processor_ != nullptr) { + return processor_->getFlowIdentifier(); + } else { + return connectable_version_; + } + } + /** * Sets the dynamic property using the provided name * @param property name diff --git a/libminifi/include/core/state/FlowIdentifier.h b/libminifi/include/core/state/FlowIdentifier.h new file mode 100644 index 0000000000..72dc13f461 --- /dev/null +++ b/libminifi/include/core/state/FlowIdentifier.h @@ -0,0 +1,91 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_ +#define LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_ + + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace state { + +/** + * Purpose: Represents a flow identifier for a given flow update or instance. + * + * Design: Immutable collection of strings for the component parts. + */ +class FlowIdentifier { + public: + + FlowIdentifier() = delete; + + /** + * Constructor accepts the url, bucket id, and flow id. + */ + explicit FlowIdentifier(const std::string &url, const std::string &bucket_id, const std::string &flow_id) { + registry_url_ = url; + bucket_id_ = bucket_id; + flow_id_ = flow_id; + } + + /** + * In most cases the lock guard isn't necessary for these getters; however, + * we don't want to cause issues if the FlowVersion object is ever used in a way + * that breaks the current paradigm. + */ + std::string getRegistryUrl() const { + return registry_url_; + } + + std::string getBucketId() const { + return bucket_id_; + } + + std::string getFlowId() const { + return flow_id_; + } + protected: + + explicit FlowIdentifier(const FlowIdentifier &other) { + registry_url_ = other.registry_url_; + bucket_id_ = other.bucket_id_; + flow_id_ = other.flow_id_; + } + FlowIdentifier &operator=(const FlowIdentifier &other) { + registry_url_ = other.registry_url_; + bucket_id_ = other.bucket_id_; + flow_id_ = other.flow_id_; + return *this; + } + + private: + std::string registry_url_; + std::string bucket_id_; + std::string flow_id_; + friend class FlowVersion; +}; + + +} /* namespace state */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_ */ diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 9c9874ae84..022b9ce5d4 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -45,6 +45,7 @@ #include "Connection.h" #include "io/ClientSocket.h" #include "../nodes/StateMonitor.h" +#include "../FlowIdentifier.h" namespace org { namespace apache { @@ -58,61 +59,66 @@ class FlowVersion : public DeviceInformation { explicit FlowVersion() : DeviceInformation("FlowVersion", nullptr) { + setFlowVersion("", "", getUUIDStr()); } explicit FlowVersion(const std::string ®istry_url, const std::string &bucket_id, const std::string &flow_id) - : DeviceInformation("FlowVersion", nullptr), - registry_url_(registry_url), - bucket_id_(bucket_id), - flow_id_(flow_id) { - setFlowVersion(registry_url_, bucket_id_, flow_id_); + : DeviceInformation("FlowVersion", nullptr) { + setFlowVersion(registry_url, bucket_id, flow_id.empty() ? getUUIDStr() : flow_id); } explicit FlowVersion(FlowVersion &&fv) : DeviceInformation("FlowVersion", nullptr), - registry_url_(std::move(fv.registry_url_)), - bucket_id_(std::move(fv.bucket_id_)), - flow_id_(std::move(fv.flow_id_)) { - setFlowVersion(registry_url_, bucket_id_, flow_id_); + identifier(std::move(fv.identifier)) { } std::string getName() const { return "FlowVersion"; } - std::string getRegistryUrl() const { - return registry_url_; + virtual std::shared_ptr getFlowIdentifier() { + std::lock_guard lock(guard); + return identifier; + } + /** + * In most cases the lock guard isn't necessary for these getters; however, + * we don't want to cause issues if the FlowVersion object is ever used in a way + * that breaks the current paradigm. + */ + std::string getRegistryUrl() { + std::lock_guard lock(guard); + return identifier->getRegistryUrl(); } - std::string getBucketId() const { - return bucket_id_; + std::string getBucketId() { + std::lock_guard lock(guard); + return identifier->getBucketId(); } - std::string getFlowId() const { - return flow_id_.empty() ? getUUIDStr() : flow_id_; + std::string getFlowId() { + std::lock_guard lock(guard); + return identifier->getFlowId(); } void setFlowVersion(const std::string &url, const std::string &bucket_id, const std::string &flow_id) { - registry_url_ = url; - bucket_id_ = bucket_id; - flow_id_ = flow_id; + std::lock_guard lock(guard); + identifier = std::make_shared(url, bucket_id, flow_id); } std::vector serialize() { - std::lock_guard lock_guard(guard); - + std::lock_guard lock(guard); std::vector serialized; SerializedResponseNode ru; ru.name = "registryUrl"; - ru.value = registry_url_; + ru.value = identifier->getRegistryUrl(); SerializedResponseNode bucketid; bucketid.name = "bucketId"; - bucketid.value = bucket_id_; + bucketid.value = identifier->getBucketId(); SerializedResponseNode flowId; flowId.name = "flowId"; - flowId.value = getFlowId(); + flowId.value = identifier->getFlowId(); serialized.push_back(ru); serialized.push_back(bucketid); @@ -121,19 +127,18 @@ class FlowVersion : public DeviceInformation { } FlowVersion &operator=(const FlowVersion &&fv) { - registry_url_ = (std::move(fv.registry_url_)); - bucket_id_ = (std::move(fv.bucket_id_)); - flow_id_ = (std::move(fv.flow_id_)); - setFlowVersion(registry_url_, bucket_id_, flow_id_); + identifier = std::move(fv.identifier); return *this; } protected: std::mutex guard; - std::string registry_url_; - std::string bucket_id_; - std::string flow_id_; + /*std::string registry_url_; + std::string bucket_id_; + std::string flow_id_; + */ + std::shared_ptr identifier; }; class FlowMonitor : public StateMonitorNode { @@ -263,7 +268,7 @@ class FlowInformation : public FlowMonitor { REGISTER_RESOURCE(FlowInformation); -} /* namespace metrics */ +} /* namespace response */ } /* namespace state */ } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 8d5906551b..12ce99f6d8 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -73,7 +73,6 @@ FlowController::FlowController(std::shared_ptr provenance_repo max_event_driven_threads_(0), running_(false), updating_(false), - flow_version_(nullptr), c2_enabled_(true), initialized_(false), provenance_repo_(provenance_repo), @@ -159,10 +158,10 @@ FlowController::~FlowController() { provenance_repo_ = nullptr; } -bool FlowController::applyConfiguration(const std::string &configurePayload) { +bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) { std::unique_ptr newRoot; try { - newRoot = flow_configuration_->getRootFromPayload(configurePayload); + newRoot = flow_configuration_->updateFromPayload(source, configurePayload); } catch (...) { logger_->log_error("Invalid configuration payload"); return false; @@ -376,7 +375,6 @@ void FlowController::initializeC2() { state::StateManager::startMetrics(agent->getHeartBeatDelay()); c2_initialized_ = true; - flow_version_ = std::make_shared("", "default", ""); device_information_.clear(); component_metrics_.clear(); component_metrics_by_id_.clear(); @@ -443,7 +441,7 @@ void FlowController::initializeC2() { } flowMonitor->setStateMonitor(shared_from_this()); - flowMonitor->setFlowVersion(flow_version_); + flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion()); } std::lock_guard lock(metrics_mutex_); @@ -806,32 +804,7 @@ void FlowController::enableAllControllerServices() { } int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration) { - if (!source.empty()) { - std::string host, protocol, path, query, url = source; - int port; - utils::parse_url(&url, &host, &port, &protocol, &path, &query); - - std::string flow_id, bucket_id; - auto path_split = utils::StringUtils::split(path, "/"); - for (size_t i = 0; i < path_split.size(); i++) { - const std::string &str = path_split.at(i); - if (str == "flows") { - if (i + 1 < path_split.size()) { - flow_id = path_split.at(i + 1); - i++; - } - } - - if (str == "bucket") { - if (i + 1 < path_split.size()) { - bucket_id = path_split.at(i + 1); - i++; - } - } - } - flow_version_->setFlowVersion(url, bucket_id, flow_id); - } - if (applyConfiguration(configuration)) { + if (applyConfiguration(source, configuration)) { return 1; } else { return 0; diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 8775de70e5..7815e70594 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -87,6 +87,14 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr flow_repository event->getResourceClaim()->increaseFlowFileRecordOwnedCount(); content_full_fath_ = event->getResourceClaim()->getContentFullPath(); } + if (event->getFlowIdentifier()) { + std::string attr; + event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr); + setFlowIdentifier(event->getFlowIdentifier()); + if (!attr.empty()) { + addKeyedAttribute(FlowAttribute::FLOW_ID, attr); + } + } } FlowFileRecord::FlowFileRecord(std::shared_ptr flow_repository, const std::shared_ptr &content_repo, std::shared_ptr &event) @@ -95,6 +103,14 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr flow_repository snapshot_(""), content_repo_(content_repo), flow_repository_(flow_repository) { + if (event->getFlowIdentifier()) { + std::string attr; + event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr); + setFlowIdentifier(event->getFlowIdentifier()); + if (!attr.empty()) { + addKeyedAttribute(FlowAttribute::FLOW_ID, attr); + } + } } FlowFileRecord::~FlowFileRecord() { @@ -129,7 +145,7 @@ void FlowFileRecord::releaseClaim(std::shared_ptr claim) { } } -bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) { +bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, const std::string &value) { const char *keyStr = FlowAttributeKey(key); if (keyStr) { const std::string keyString = keyStr; diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index 29ee411a12..746f5ec9b3 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -33,12 +33,14 @@ namespace core { Connectable::Connectable(std::string name, uuid_t uuid) : CoreComponent(name, uuid), max_concurrent_tasks_(1), + connectable_version_(nullptr), logger_(logging::LoggerFactory::getLogger()) { } Connectable::Connectable(const Connectable &&other) : CoreComponent(std::move(other)), max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)), + connectable_version_(std::move(other.connectable_version_)), logger_(std::move(other.logger_)) { has_work_ = other.has_work_.load(); strategy_ = other.strategy_.load(); diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index b082dce147..320797be49 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -59,6 +59,35 @@ std::shared_ptr FlowConfiguration::createProvenanceReportTask() return processor; } +std::unique_ptr FlowConfiguration::updateFromPayload(const std::string &source, const std::string &yamlConfigPayload) { + if (!source.empty()) { + std::string host, protocol, path, query, url = source; + int port; + utils::parse_url(&url, &host, &port, &protocol, &path, &query); + + std::string flow_id, bucket_id; + auto path_split = utils::StringUtils::split(path, "/"); + for (size_t i = 0; i < path_split.size(); i++) { + const std::string &str = path_split.at(i); + if (str == "flows") { + if (i + 1 < path_split.size()) { + flow_id = path_split.at(i + 1); + i++; + } + } + + if (str == "bucket") { + if (i + 1 < path_split.size()) { + bucket_id = path_split.at(i + 1); + i++; + } + } + } + flow_version_->setFlowVersion(url, bucket_id, flow_id); + } + return getRootFromPayload(yamlConfigPayload); + } + std::unique_ptr FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) { return std::unique_ptr(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); } diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 9b732dcaf1..1bfa9f8301 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -43,7 +43,15 @@ std::shared_ptr ProcessSession::id_generator_ = utils::IdGen std::shared_ptr ProcessSession::create() { std::map empty; - std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); + auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier(); + + std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); + + if (flow_version != nullptr) { + auto flow_id = flow_version->getFlowId(); + std::string attr = FlowAttributeKey(FLOW_ID); + record->setAttribute(attr, flow_version->getFlowId()); + } _addedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr()); @@ -60,9 +68,15 @@ void ProcessSession::add(const std::shared_ptr &record) { std::shared_ptr ProcessSession::create(const std::shared_ptr &parent) { std::map empty; - std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); + std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); if (record) { + auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier(); + if (flow_version != nullptr) { + auto flow_id = flow_version->getFlowId(); + std::string attr = FlowAttributeKey(FLOW_ID); + record->setAttribute(attr, flow_version->getFlowId()); + } _addedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr()); } @@ -105,6 +119,12 @@ std::shared_ptr ProcessSession::cloneDuringTransfer(std::shared_ std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); if (record) { + auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier(); + if (flow_version != nullptr) { + auto flow_id = flow_version->getFlowId(); + std::string attr = FlowAttributeKey(FLOW_ID); + record->setAttribute(attr, flow_version->getFlowId()); + } this->_clonedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr()); // Copy attributes @@ -374,8 +394,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptrsetResourceClaim(claim); - logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), - flow->getUUIDStr()); + logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr()); content_stream->closeStream(); std::stringstream details; @@ -423,9 +442,7 @@ void ProcessSession::import(std::string source, const std::shared_ptrlog_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", - offset, - source); + logger_->log_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", offset, source); invalidWrite = true; } } @@ -506,9 +523,7 @@ void ProcessSession::import(std::string source, std::vectorlog_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", - offset, - source); + logger_->log_error("Seeking to %d failed for file %s (does file/filesystem support seeking?)", offset, source); throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); } } @@ -549,8 +564,8 @@ void ProcessSession::import(std::string source, std::vectorsetResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), - flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); + logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), + flowFile->getUUIDStr()); stream->closeStream(); std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); uint64_t endTime = getTimeMillis(); @@ -625,24 +640,24 @@ void ProcessSession::stash(const std::string &key, const std::shared_ptrgetResourceClaim(); flow->setStashClaim(key, claim); - // Clear current claim +// Clear current claim flow->clearResourceClaim(); } void ProcessSession::restore(const std::string &key, const std::shared_ptr &flow) { logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr(), key); - // Restore the claim +// Restore the claim if (!flow->hasStashClaim(key)) { logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr(), key); return; } - // Disown current claim if existing +// Disown current claim if existing if (flow->getResourceClaim()) { logger_->log_warn("Restoring stashed content of record %s from key %s when there is " "existing content; existing content will be overwritten", @@ -650,7 +665,7 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptrreleaseClaim(flow->getResourceClaim()); } - // Restore the claim +// Restore the claim auto stashClaim = flow->getStashClaim(key); flow->setResourceClaim(stashClaim); flow->clearStashClaim(key); @@ -854,6 +869,12 @@ std::shared_ptr ProcessSession::get() { _updatedFlowFiles[ret->getUUIDStr()] = ret; std::map empty; std::shared_ptr snapshot = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); + auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier(); + if (flow_version != nullptr) { + auto flow_id = flow_version->getFlowId(); + std::string attr = FlowAttributeKey(FLOW_ID); + snapshot->setAttribute(attr, flow_version->getFlowId()); + } logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr()); snapshot = ret; // save a snapshot diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 01cc04a941..fbe84edf81 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -118,6 +118,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: } processor->setName(procCfg.name); + processor->setFlowIdentifier(flow_version_->getFlowIdentifier()); + auto strategyNode = getOptionalField(&procNode, "scheduling strategy", YAML::Node("EVENT_DRIVEN"), diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index 950d8bbca8..53a1bf66b6 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -18,7 +18,7 @@ #include "./TestBase.h" -TestPlan::TestPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo) +TestPlan::TestPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo, const std::shared_ptr &flow_version) : content_repo_(content_repo), flow_repo_(flow_repo), @@ -26,6 +26,7 @@ TestPlan::TestPlan(std::shared_ptr content_repo, std::s finalized(false), location(-1), current_flowfile_(nullptr), + flow_version_(flow_version), logger_(logging::LoggerFactory::getLogger()) { stream_factory = std::make_shared(std::make_shared()); } @@ -43,6 +44,7 @@ bool linkToPrevious) { processor->setStreamFactory(stream_factory); // initialize the processor processor->initialize(); + processor->setFlowIdentifier(flow_version_->getFlowIdentifier()); processor_mapping_[processor->getUUIDStr()] = processor; @@ -116,7 +118,7 @@ bool TestPlan::setProperty(const std::shared_ptr proc, const std::string &value, bool dynamic) { std::lock_guard guard(mutex); - uint32_t i = 0; + int32_t i = 0; logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName()); for (i = 0; i < processor_queue_.size(); i++) { if (processor_queue_.at(i) == proc) { diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 77449cb04e..4792011b2d 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -43,6 +43,7 @@ #include "core/ProcessSession.h" #include "core/ProcessorNode.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" +#include "core/state/nodes/FlowInformation.h" class LogTestController { public: @@ -151,19 +152,15 @@ class LogTestController { class TestPlan { public: - explicit TestPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo); + explicit TestPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo, const std::shared_ptr &flow_version); std::shared_ptr addProcessor(const std::shared_ptr &processor, const std::string &name, - core::Relationship relationship = core::Relationship("success", "description"), - bool linkToPrevious = false); + core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false); std::shared_ptr addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), - bool linkToPrevious = false); + bool linkToPrevious = false); - bool setProperty(const std::shared_ptr proc, - const std::string &prop, - const std::string &value, - bool dynamic = false); + bool setProperty(const std::shared_ptr proc, const std::string &prop, const std::string &value, bool dynamic = false); void reset(); @@ -209,6 +206,7 @@ class TestPlan { std::shared_ptr current_session_; std::shared_ptr current_flowfile_; + std::shared_ptr flow_version_; std::map> processor_mapping_; std::vector> processor_queue_; std::vector> configured_processors_; @@ -232,6 +230,7 @@ class TestController { minifi::setDefaultDirectory("./"); log.reset(); utils::IdGenerator::getIdGenerator()->initialize(std::make_shared()); + flow_version_ = std::make_shared("test", "test", "test"); } std::shared_ptr createPlan() { @@ -242,10 +241,9 @@ class TestController { std::shared_ptr flow_repo = std::make_shared(); std::shared_ptr repo = std::make_shared(); - return std::make_shared(content_repo, flow_repo, repo); + return std::make_shared(content_repo, flow_repo, repo, flow_version_); } - void runSession(std::shared_ptr &plan, bool runToCompletion = true, std::function&, const std::shared_ptr&)> verify = nullptr) { @@ -284,6 +282,8 @@ class TestController { protected: + std::shared_ptr flow_version_; + std::mutex test_mutex; //std::map diff --git a/libminifi/test/unit/GetFileTests.cpp b/libminifi/test/unit/GetFileTests.cpp index 6e8aa2534f..1960c70532 100644 --- a/libminifi/test/unit/GetFileTests.cpp +++ b/libminifi/test/unit/GetFileTests.cpp @@ -77,6 +77,7 @@ TEST_CASE("GetFile: FIFO", "[getFileFifo]") { // NOLINT write_thread.join(); // Check log output + REQUIRE(LogTestController::getInstance().contains("key:flow.id")); REQUIRE(LogTestController::getInstance().contains("Size:44 Offset:0")); }