diff --git a/C2.md b/C2.md index 9d84974ffb..ad492ec57a 100644 --- a/C2.md +++ b/C2.md @@ -66,9 +66,10 @@ be requested via C2 DESCRIBE manifest command. # DeviceInfoNode: basic info about the system (OS, number of cores etc) # AgentInformation: info about the MiNiFi agent, may include the manifest # FlowInformation: information about the current flow, including queue sizes + # AssetInformation: the state of the asset directory managed by the agent # ConfigurationChecksums: hashes of the configuration files; can be used to detect unexpected modifications # the default is - nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation + nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation # control c2 heartbeat interval nifi.c2.agent.heartbeat.period=30 sec @@ -80,8 +81,10 @@ be requested via C2 DESCRIBE manifest command. nifi.c2.rest.listener.cacert= # specify the rest URIs if using RESTSender - nifi.c2.rest.url=http:////c2-protocol/heartbeat - nifi.c2.rest.url.ack=http:////c2-protocol/acknowledge + nifi.c2.rest.path.base=https:/// + # specify either absolute url or relative to the nifi.c2.rest.path.base url for heartbeat and acknowledge + nifi.c2.rest.url=/c2-protocol/heartbeat + nifi.c2.rest.url.ack=/c2-protocol/acknowledge nifi.c2.flow.base.url=http:////c2-protocol/ # c2 agent identifier -- must be defined to run agent diff --git a/CONFIGURE.md b/CONFIGURE.md index d37dd7445b..6792972f21 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -567,8 +567,13 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned ### Asset directory -It is possible to make agents download an asset (triggered through the c2 protocol). The target directory can be specified -using the `nifi.asset.directory` agent property, which defaults to `${MINIFI_HOME}/asset`. +There is an asset directory specified using the `nifi.asset.directory` agent property, which defaults to `${MINIFI_HOME}/asset`.
+The files referenced in the `.state` file in this directory are managed by the agent. They are deleted, updated, or downloaded +using the asset sync c2 command. For the asset sync command to work, the c2 server must be made aware of the current state of the +managed assets by adding the `AssetInformation` entry to the `nifi.c2.root.classes` property. + +Files and directories not referenced in the `.state` file are not directly controlled by the agent, although +it is possible to download an asset (triggered through the c2 protocol) into the asset directory instead. ### Controller Services If you need to reference a controller service in your config.yml file, use the following template. In the example, below, ControllerServiceClass is the name of the class defining the controller Service. ControllerService1 diff --git a/conf/minifi.properties b/conf/minifi.properties index 5e5c295455..425e6e64b0 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -84,10 +84,11 @@ nifi.content.repository.class.name=DatabaseContentRepository ## base URL of the c2 server, ## very likely the same base url of rest urls #nifi.c2.flow.base.url= +#nifi.c2.rest.path.base= #nifi.c2.rest.url= #nifi.c2.rest.url.ack= #nifi.c2.rest.ssl.context.service= -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat nifi.c2.full.heartbeat=false ## heartbeat twice a minute diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index db1dbd191b..06083e9ff9 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -99,7 +99,7 @@ def _create_properties(self):
f.write(f"nifi.c2.rest.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/heartbeat\n") f.write(f"nifi.c2.rest.url.ack=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n") f.write(f"nifi.c2.flow.base.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n") - f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n") + f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n") f.write("nifi.c2.full.heartbeat=false\n") f.write("nifi.c2.agent.class=minifi-test-class\n") f.write("nifi.c2.agent.identifier=minifi-test-id\n") @@ -109,7 +109,7 @@ def _create_properties(self): f.write(f"nifi.c2.rest.url.ack=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n") f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n") f.write(f"nifi.c2.flow.base.url=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n") - f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n") + f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n") f.write("nifi.c2.full.heartbeat=false\n") f.write("nifi.c2.agent.class=minifi-test-class\n") f.write("nifi.c2.agent.identifier=minifi-test-id\n") diff --git a/encrypt-config/tests/resources/minifi.properties b/encrypt-config/tests/resources/minifi.properties index 9bac06775a..2f2db68bb7 100644 --- a/encrypt-config/tests/resources/minifi.properties +++ b/encrypt-config/tests/resources/minifi.properties @@ -55,7 +55,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize 
heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties index aff36a0656..d2702c34df 100644 --- a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties +++ b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties @@ -57,7 +57,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 6f14f04463..6c8308c209 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -55,6 +55,7 @@ #include "TimerDrivenSchedulingAgent.h" #include "utils/Id.h" #include "utils/file/FileSystem.h" +#include "utils/file/AssetManager.h" #include "core/state/nodes/ResponseNodeLoader.h" #include "core/state/MetricsPublisher.h" #include "core/state/MetricsPublisherStore.h" @@ -72,7 +73,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi FlowController(std::shared_ptr provenance_repo, std::shared_ptr flow_file_repo, std::shared_ptr configure, std::shared_ptr flow_configuration, std::shared_ptr content_repo, std::unique_ptr metrics_publisher_store = nullptr, - std::shared_ptr filesystem = std::make_shared(), std::function request_restart = []{}); + std::shared_ptr 
filesystem = std::make_shared(), std::function request_restart = []{}, + std::shared_ptr asset_manager = {}); ~FlowController() override; diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 63829fd2c3..2e7412f739 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -43,6 +43,7 @@ #include "utils/ThreadPool.h" #include "utils/file/FileSystem.h" #include "C2Utils.h" +#include "utils/file/AssetManager.h" namespace org::apache::nifi::minifi::c2 { @@ -62,7 +63,8 @@ class C2Agent : public state::UpdateController { C2Agent(std::shared_ptr configuration, std::weak_ptr node_reporter, std::shared_ptr filesystem, - std::function request_restart); + std::function request_restart, + std::shared_ptr asset_manager); void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink); void start() override; @@ -131,6 +133,8 @@ class C2Agent : public state::UpdateController { */ void handle_describe(const C2ContentResponse &resp); + void handle_sync(const C2ContentResponse &resp); + enum class UpdateResult { NO_UPDATE, @@ -235,6 +239,8 @@ class C2Agent : public state::UpdateController { // time point the last time we performed a heartbeat. 
std::chrono::steady_clock::time_point last_run_; + + std::shared_ptr asset_manager_; }; } // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index 9ff1aaaa7d..57715a0490 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -29,6 +29,7 @@ #include "utils/Enum.h" #include "utils/gsl.h" #include "utils/span.h" +#include "rapidjson/document.h" namespace org::apache::nifi::minifi::c2 { @@ -43,7 +44,8 @@ enum class Operation : uint8_t { clear, transfer, pause, - resume + resume, + sync }; enum class DescribeOperand : uint8_t { @@ -70,6 +72,10 @@ enum class ClearOperand : uint8_t{ corecomponentstate }; +enum class SyncOperand : uint8_t{ + resource +}; + #define PAYLOAD_NO_STATUS 0 #define PAYLOAD_SUCCESS 1 #define PAYLOAD_FAILURE 2 @@ -79,21 +85,65 @@ enum Direction { RECEIVE }; -struct AnnotatedValue : state::response::ValueNode { - using state::response::ValueNode::ValueNode; - using state::response::ValueNode::operator=; +class C2Value { + public: + friend std::ostream& operator<<(std::ostream& out, const C2Value& val); + + C2Value() = default; + C2Value(const C2Value& other) { + (*this) = other; + } + C2Value(C2Value&&) = default; + template + requires(std::constructible_from) + C2Value(T&& value) { value_ = state::response::ValueNode{std::forward(value)}; } // NOLINT(runtime/explicit) + C2Value(const rapidjson::Value& json_value) { // NOLINT(runtime/explicit) + value_.emplace(); + get(value_).CopyFrom(json_value, get(value_).GetAllocator()); + } + C2Value(rapidjson::Document&& json_doc) { // NOLINT(runtime/explicit) + value_ = std::move(json_doc); + } - [[nodiscard]] std::optional> getAnnotation(const std::string& name) const { - auto it = annotations.find(name); - if (it == annotations.end()) { - return {}; + C2Value& operator=(const C2Value& other) { + if (auto* other_val_node = get_if(&other.value_)) { + value_ = *other_val_node; + } else { + 
value_.emplace(); + get(value_).CopyFrom(get(other.value_), get(value_).GetAllocator()); } - return std::cref(it->second); + return *this; + } + + C2Value& operator=(C2Value&&) = default; + + + bool empty() const { + if (auto* val_node = get_if(&value_)) { + return val_node->empty(); + } + return false; + } + + std::string to_string() const { + if (auto* val_node = get_if(&value_)) { + return val_node->to_string(); + } + return std::string{get(value_).GetString(), get(value_).GetStringLength()}; + } + + const rapidjson::Document* json() const { + return get_if(&value_); + } + + const state::response::ValueNode* valueNode() const { + return get_if(&value_); } - friend std::ostream& operator<<(std::ostream& out, const AnnotatedValue& val); + bool operator==(const C2Value&) const = default; - std::map annotations; + private: + std::variant value_; }; struct C2ContentResponse { @@ -115,7 +165,7 @@ struct C2ContentResponse { friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response); - std::optional getArgument(const std::string& arg_name) const { + std::optional getStringArgument(const std::string& arg_name) const { if (auto it = operation_arguments.find(arg_name); it != operation_arguments.end()) { return it->second.to_string(); } @@ -134,7 +184,7 @@ struct C2ContentResponse { // name applied to commands std::string name; // commands that correspond with the operation. 
- std::map operation_arguments; + std::map operation_arguments; }; /** diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h index d6a97642bc..01b70a58ed 100644 --- a/libminifi/include/c2/PayloadParser.h +++ b/libminifi/include/c2/PayloadParser.h @@ -138,27 +138,20 @@ class PayloadParser { } template - inline T getAs(const std::string &field) { + inline T getAs(const std::string &field, const std::optional& fallback = std::nullopt) { for (const auto &cmd : ref_.getContent()) { - auto exists = cmd.operation_arguments.find(field); - if (exists != cmd.operation_arguments.end()) { - return convert_if(exists->second.getValue())(); + if (auto it = cmd.operation_arguments.find(field); it != cmd.operation_arguments.end()) { + if (auto* val_node = it->second.valueNode()) { + return convert_if(val_node->getValue())(); + } } } - std::stringstream ss; - ss << "Invalid Field. Could not find " << field << " in " << ref_.getLabel(); - throw PayloadParseException(ss.str()); - } - - template - inline T getAs(const std::string &field, const T &fallback) { - for (const auto &cmd : ref_.getContent()) { - auto exists = cmd.operation_arguments.find(field); - if (exists != cmd.operation_arguments.end()) { - return convert_if(exists->second.getValue())(); - } + if (!fallback) { + std::stringstream ss; + ss << "Invalid Field. 
Could not find " << field << " in " << ref_.getLabel(); + throw PayloadParseException(ss.str()); } - return fallback; + return fallback.value(); } size_t getSize() const { diff --git a/libminifi/include/c2/PayloadSerializer.h b/libminifi/include/c2/PayloadSerializer.h index 17de977927..92fa793899 100644 --- a/libminifi/include/c2/PayloadSerializer.h +++ b/libminifi/include/c2/PayloadSerializer.h @@ -42,7 +42,7 @@ class PayloadSerializer { /** * Static function that serializes the value nodes */ - static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr stream) { + static void serializeValueNode(const state::response::ValueNode &value, std::shared_ptr stream) { auto base_type = value.getValue(); if (!base_type) { uint8_t type = 0; @@ -95,7 +95,7 @@ class PayloadSerializer { stream->write(size); for (auto content : payload_content.operation_arguments) { stream->write(content.first); - serializeValueNode(content.second, stream); + serializeValueNode(*gsl::not_null(content.second.valueNode()), stream); } } if (nested_payload.getNestedPayloads().size() > 0) { @@ -170,7 +170,7 @@ class PayloadSerializer { stream->write(size); for (auto content : payload_content.operation_arguments) { stream->write(content.first); - serializeValueNode(content.second, stream); + serializeValueNode(*gsl::not_null(content.second.valueNode()), stream); } } serialize(op, payload, stream); diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h index 36c7412788..f3ed0ebce7 100644 --- a/libminifi/include/c2/protocols/RESTProtocol.h +++ b/libminifi/include/c2/protocols/RESTProtocol.h @@ -43,7 +43,7 @@ class RESTProtocol : public HeartbeatJsonSerializer { protected: void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr &configure); void serializeNestedPayload(rapidjson::Value& target, const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) override; - static C2Payload 
parseJsonResponse(const C2Payload &payload, std::span response); + C2Payload parseJsonResponse(const C2Payload &payload, std::span response); private: bool containsPayload(const C2Payload &o); diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index 544e824455..4d827bc00a 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -27,13 +27,14 @@ #include "core/state/nodes/ResponseNodeLoader.h" #include "utils/gsl.h" #include "core/ProcessGroup.h" +#include "utils/file/AssetManager.h" namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: MetricsPublisherStore(std::shared_ptr configuration, const std::vector>& repository_metric_sources, - std::shared_ptr flow_configuration); + std::shared_ptr flow_configuration, std::shared_ptr asset_manager = nullptr); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/AssetInformation.h b/libminifi/include/core/state/nodes/AssetInformation.h new file mode 100644 index 0000000000..39fd578fa9 --- /dev/null +++ b/libminifi/include/core/state/nodes/AssetInformation.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "core/state/nodes/MetricsBase.h" +#include "utils/file/AssetManager.h" +#include "core/logging/Logger.h" + +namespace org::apache::nifi::minifi::state::response { + +class AssetInformation : public ResponseNode { + public: + AssetInformation(); + explicit AssetInformation(std::string_view name, const utils::Identifier& uuid = {}) : ResponseNode(name, uuid) {} + + MINIFIAPI static constexpr const char* Description = "Metric node that defines hash for all asset identifiers"; + + void setAssetManager(std::shared_ptr asset_manager); + + std::string getName() const override { return "resourceInfo"; } + std::vector serialize() override; + + private: + std::shared_ptr asset_manager_; + std::shared_ptr logger_; +}; + +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index af330585c4..e59aae372d 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -34,13 +34,14 @@ #include "utils/Id.h" #include "utils/expected.h" #include "core/RepositoryMetricsSource.h" +#include "utils/file/AssetManager.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoader { public: ResponseNodeLoader(std::shared_ptr configuration, std::vector> repository_metric_sources, - std::shared_ptr flow_configuration); + std::shared_ptr flow_configuration, std::shared_ptr asset_manager = nullptr); void 
setNewConfigRoot(core::ProcessGroup* root); void clearConfigRoot(); @@ -62,6 +63,7 @@ class ResponseNodeLoader { void initializeAgentStatus(const SharedResponseNode& response_node) const; void initializeConfigurationChecksums(const SharedResponseNode& response_node) const; void initializeFlowMonitor(const SharedResponseNode& response_node) const; + void initializeAssetInformation(const SharedResponseNode& response_node) const; std::vector getMatchingComponentMetricsNodes(const std::string& regex_str) const; mutable std::mutex root_mutex_; @@ -73,6 +75,7 @@ class ResponseNodeLoader { std::shared_ptr configuration_; std::vector> repository_metric_sources_; std::shared_ptr flow_configuration_; + std::shared_ptr asset_manager_; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index 2d9300636a..cea438ea92 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -118,6 +118,7 @@ class Configuration : public Properties { static constexpr const char *nifi_c2_root_class_definitions = "nifi.c2.root.class.definitions"; static constexpr const char *nifi_c2_rest_listener_port = "nifi.c2.rest.listener.port"; static constexpr const char *nifi_c2_rest_listener_cacert = "nifi.c2.rest.listener.cacert"; + static constexpr const char *nifi_c2_rest_path_base = "nifi.c2.rest.path.base"; static constexpr const char *nifi_c2_rest_url = "nifi.c2.rest.url"; static constexpr const char *nifi_c2_rest_url_ack = "nifi.c2.rest.url.ack"; static constexpr const char *nifi_c2_rest_ssl_context_service = "nifi.c2.rest.ssl.context.service"; diff --git a/libminifi/include/utils/file/AssetManager.h b/libminifi/include/utils/file/AssetManager.h new file mode 100644 index 0000000000..93424a1452 --- /dev/null +++ 
b/libminifi/include/utils/file/AssetManager.h @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "core/logging/Logger.h" +#include "utils/expected.h" +#include "properties/Configure.h" + +namespace org::apache::nifi::minifi::utils::file { + +struct AssetDescription { + std::string id; + std::filesystem::path path; + std::string url; + + bool operator<(const AssetDescription& other) const { + return id < other.id; + } +}; + +struct AssetLayout { + std::string digest; + std::set assets; + + void clear() { + digest.clear(); + assets.clear(); + } +}; + +class AssetManager { + public: + explicit AssetManager(const Configure& configuration); + + nonstd::expected sync(const AssetLayout& layout, const std::function, std::string>(std::string_view /*url*/)>& fetch); + + std::string hash() const; + + std::filesystem::path getRoot() const; + + private: + void refreshState(); + + void persist() const; + + mutable std::recursive_mutex mtx_; + std::filesystem::path root_; + AssetLayout state_; + std::shared_ptr logger_; +}; + +} // namespace org::apache::nifi::minifi::utils::file diff --git 
a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h index 1df2e9d891..04886c1a67 100644 --- a/libminifi/include/utils/file/PathUtils.h +++ b/libminifi/include/utils/file/PathUtils.h @@ -25,6 +25,7 @@ #include #include #include +#include "utils/expected.h" namespace org::apache::nifi::minifi::utils::file { @@ -42,6 +43,27 @@ inline std::optional canonicalize(const std::filesystem:: return result; } +inline nonstd::expected validateRelativeFilePath(const std::filesystem::path& path) { + if (path.empty()) { + return nonstd::make_unexpected("Empty file path"); + } + if (!path.is_relative()) { + return nonstd::make_unexpected("File path must be a relative path '" + path.string() + "'"); + } + if (!path.has_filename()) { + return nonstd::make_unexpected("Filename missing in output path '" + path.string() + "'"); + } + if (path.filename() == "." || path.filename() == "..") { + return nonstd::make_unexpected("Invalid filename '" + path.filename().string() + "'"); + } + for (const auto& segment : path) { + if (segment == "..") { + return nonstd::make_unexpected("Accessing parent directory is forbidden in file path '" + path.string() + "'"); + } + } + return {}; +} + /** * Represents filesystem space information in bytes diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 024393e57f..116746c9c5 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -88,6 +88,7 @@ const std::unordered_map provenance_repo, std::shared_ptr flow_file_repo, std::shared_ptr configure, std::shared_ptr flow_configuration, std::shared_ptr content_repo, std::unique_ptr metrics_publisher_store, - std::shared_ptr filesystem, std::function request_restart) + std::shared_ptr filesystem, std::function request_restart, + std::shared_ptr asset_manager) : core::controller::ForwardingControllerServiceProvider(core::className()), running_(false), initialized_(false), @@ -82,7 +83,7 @@ 
FlowController::FlowController(std::shared_ptr provenance_repo if (auto publisher = metrics_publisher_store_->getMetricsPublisher(c2::C2_METRICS_PUBLISHER).lock()) { c2_metrics_publisher = std::dynamic_pointer_cast(publisher); } - c2_agent_ = std::make_unique(configuration_, c2_metrics_publisher, std::move(filesystem), std::move(request_restart)); + c2_agent_ = std::make_unique(configuration_, c2_metrics_publisher, std::move(filesystem), std::move(request_restart), std::move(asset_manager)); } if (c2::isControllerSocketEnabled(configuration_)) { diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 5f1097fb43..10d76cf1c4 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -38,6 +38,7 @@ #include "utils/file/FileManager.h" #include "utils/file/FileSystem.h" #include "http/BaseHTTPClient.h" +#include "utils/file/PathUtils.h" #include "utils/Environment.h" #include "utils/Monitors.h" #include "utils/StringUtils.h" @@ -46,6 +47,7 @@ #include "utils/Id.h" #include "c2/C2Utils.h" #include "c2/protocols/RESTSender.h" +#include "rapidjson/error/en.h" using namespace std::literals::chrono_literals; @@ -54,7 +56,8 @@ namespace org::apache::nifi::minifi::c2 { C2Agent::C2Agent(std::shared_ptr configuration, std::weak_ptr node_reporter, std::shared_ptr filesystem, - std::function request_restart) + std::function request_restart, + std::shared_ptr asset_manager) : heart_beat_period_(3s), max_c2_responses(5), configuration_(std::move(configuration)), @@ -62,7 +65,8 @@ C2Agent::C2Agent(std::shared_ptr configuration, filesystem_(std::move(filesystem)), thread_pool_(2, nullptr, "C2 threadpool"), request_restart_(std::move(request_restart)), - last_run_(std::chrono::steady_clock::now()) { + last_run_(std::chrono::steady_clock::now()), + asset_manager_(std::move(asset_manager)) { if (!configuration_->getAgentClass()) { logger_->log_info("Agent class is not predefined"); } @@ -381,6 +385,9 @@ void 
C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { } break; } + case Operation::sync: + handle_sync(resp); + break; default: break; // do nothing @@ -608,12 +615,18 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) { }; for (const auto& [name, value] : resp.operation_arguments) { - bool persist = ( - value.getAnnotation("persist") - | utils::transform(&AnnotatedValue::to_string) - | utils::andThen(utils::string::toBool)).value_or(true); - PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; - changeUpdateState(update_property(name, value.to_string(), lifetime)); + if (auto* json_val = value.json()) { + if (json_val->IsObject() && json_val->HasMember("value")) { + PropertyChangeLifetime lifetime = PropertyChangeLifetime::PERSISTENT; + if (json_val->HasMember("persist")) { + lifetime = (*json_val)["persist"].GetBool() ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; + } + std::string property_value{(*json_val)["value"].GetString(), (*json_val)["value"].GetStringLength()}; + changeUpdateState(update_property(name, property_value, lifetime)); + continue; + } + } + changeUpdateState(update_property(name, value.to_string(), PropertyChangeLifetime::PERSISTENT)); } // apply changes and persist properties requested to be persisted const bool propertyWasUpdated = result == state::UpdateState::FULLY_APPLIED || result == state::UpdateState::PARTIALLY_APPLIED; @@ -700,6 +713,158 @@ void C2Agent::handle_transfer(const C2ContentResponse &resp) { } } +void C2Agent::handle_sync(const org::apache::nifi::minifi::c2::C2ContentResponse &resp) { + auto send_error = [&] (std::string_view error) { + logger_->log_error("{}", error); + C2Payload response(Operation::acknowledge, state::UpdateState::SET_ERROR, resp.ident, true); + response.setRawData(as_bytes(std::span(error.begin(), error.end()))); + enqueue_c2_response(std::move(response)); + }; + + if 
(!asset_manager_) { + send_error("Internal error: no asset manager"); + return; + } + + SyncOperand operand = SyncOperand::resource; + try { + operand = utils::enumCast(resp.name, true); + } catch(const std::runtime_error&) { + send_error("Unknown operand '" + resp.name + "'"); + return; + } + + gsl_Assert(operand == SyncOperand::resource); + + utils::file::AssetLayout asset_layout; + + auto state_it = resp.operation_arguments.find("globalHash"); + if (state_it == resp.operation_arguments.end()) { + send_error("Malformed request, missing 'globalHash' argument"); + return; + } + + const rapidjson::Document* state_doc = state_it->second.json(); + if (!state_doc) { + send_error("Argument 'globalHash' is malformed"); + return; + } + + if (!state_doc->IsObject()) { + send_error("Malformed request, 'globalHash' is not an object"); + return; + } + + if (!state_doc->HasMember("digest")) { + send_error("Malformed request, 'globalHash' has no member 'digest'"); + return; + } + if (!(*state_doc)["digest"].IsString()) { + send_error("Malformed request, 'globalHash.digest' is not a string"); + return; + } + + asset_layout.digest = std::string{(*state_doc)["digest"].GetString(), (*state_doc)["digest"].GetStringLength()}; + + auto resource_list_it = resp.operation_arguments.find("resourceList"); + if (resource_list_it == resp.operation_arguments.end()) { + send_error("Malformed request, missing 'resourceList' argument"); + return; + } + + const rapidjson::Document* resource_list = resource_list_it->second.json(); + if (!resource_list) { + send_error("Argument 'resourceList' is malformed"); + return; + } + if (!resource_list->IsArray()) { + send_error("Malformed request, 'resourceList' is not an array"); + return; + } + + for (size_t resource_idx = 0; resource_idx < resource_list->Size(); ++resource_idx) { + auto& resource = resource_list->GetArray()[resource_idx]; + if (!resource.IsObject()) { + send_error(fmt::format("Malformed request, 'resourceList[{}]' is not an object", 
resource_idx)); + return; + } + auto get_member_str = [&] (const char* key) -> nonstd::expected { + if (!resource.HasMember(key)) { + return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}]' has no member '{}'", resource_idx, key)); + } + if (!resource[key].IsString()) { + return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}].{}' is not a string", resource_idx, key)); + } + return std::string_view{resource[key].GetString(), resource[key].GetStringLength()}; + }; + auto id = get_member_str("resourceId"); + if (!id) { + send_error(id.error()); + return; + } + auto name = get_member_str("resourceName"); + if (!name) { + send_error(name.error()); + return; + } + auto type = get_member_str("resourceType"); + if (!type) { + send_error(type.error()); + return; + } + if (type.value() != "ASSET") { + continue; + } + auto path = get_member_str("resourcePath"); + if (!path) { + send_error(path.error()); + return; + } + auto url = get_member_str("url"); + if (!url) { + send_error(url.error()); + return; + } + + auto full_path = std::filesystem::path{path.value()} / name.value(); // NOLINT(whitespace/braces) + + auto path_valid = utils::file::validateRelativeFilePath(full_path); + if (!path_valid) { + send_error(path_valid.error()); + return; + } + + asset_layout.assets.insert(utils::file::AssetDescription{ + .id = std::string{id.value()}, + .path = full_path, + .url = std::string{url.value()} + }); + } + + auto fetch = [&] (std::string_view url) -> nonstd::expected, std::string> { + auto resolved_url = resolveUrl(std::string{url}); + if (!resolved_url) { + return nonstd::make_unexpected("Couldn't resolve url"); + } + C2Payload file_response = protocol_->fetch(resolved_url.value()); + + if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) { + return nonstd::make_unexpected("Failed to fetch file from " + resolved_url.value()); + } + + return std::move(file_response).moveRawData(); + }; + + auto result 
= asset_manager_->sync(asset_layout, fetch); + if (!result) { + send_error(result.error()); + return; + } + + C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true); + enqueue_c2_response(std::move(response)); +} + utils::TaskRescheduleInfo C2Agent::produce() { // place priority on messages to send to the c2 server if (protocol_ != nullptr) { @@ -789,6 +954,9 @@ std::optional C2Agent::resolveUrl(const std::string& url) const { return url; } std::string base; + if (configuration_->get(Configuration::nifi_c2_rest_path_base, base)) { + return base + url; + } if (!configuration_->get(Configuration::nifi_c2_rest_url, "c2.rest.url", base)) { logger_->log_error("Missing C2 REST URL"); return std::nullopt; @@ -891,27 +1059,6 @@ static auto make_path(const std::string& str) { return std::filesystem::path(str); } -static std::optional validateFilePath(const std::filesystem::path& path) { - if (path.empty()) { - return "Empty file path"; - } - if (!path.is_relative()) { - return "File path must be a relative path '" + path.string() + "'"; - } - if (!path.has_filename()) { - return "Filename missing in output path '" + path.string() + "'"; - } - if (path.filename() == "." 
|| path.filename() == "..") { - return "Invalid filename '" + path.filename().string() + "'"; - } - for (const auto& segment : path) { - if (segment == "..") { - return "Accessing parent directory is forbidden in file path '" + path.string() + "'"; - } - } - return std::nullopt; -} - void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { auto send_error = [&] (std::string_view error) { logger_->log_error("{}", error); @@ -919,19 +1066,16 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { response.setRawData(as_bytes(std::span(error.begin(), error.end()))); enqueue_c2_response(std::move(response)); }; - std::filesystem::path asset_dir = configuration_->getHome() / "asset"; - if (auto asset_dir_str = configuration_->get(Configuration::nifi_asset_directory)) { - asset_dir = asset_dir_str.value(); - } // output file std::filesystem::path file_path; - if (auto file_rel = resp.getArgument("file") | utils::transform(make_path)) { - if (auto error = validateFilePath(file_rel.value())) { - send_error(error.value()); + if (auto file_rel = resp.getStringArgument("file") | utils::transform(make_path)) { + auto result = utils::file::validateRelativeFilePath(file_rel.value()); + if (!result) { + send_error(result.error()); return; } - file_path = asset_dir / file_rel.value(); + file_path = asset_manager_->getRoot() / file_rel.value(); } else { send_error("Couldn't find 'file' argument"); return; @@ -939,7 +1083,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { // source url std::string url; - if (auto url_str = resp.getArgument("url")) { + if (auto url_str = resp.getStringArgument("url")) { if (auto resolved_url = resolveUrl(*url_str)) { url = resolved_url.value(); } else { @@ -953,7 +1097,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { // forceDownload bool force_download = false; - if (auto force_download_str = resp.getArgument("forceDownload")) { + if (auto force_download_str = 
resp.getStringArgument("forceDownload")) { if (utils::string::equalsIgnoreCase(force_download_str.value(), "true")) { force_download = true; } else if (utils::string::equalsIgnoreCase(force_download_str.value(), "false")) { diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp index e70413cd54..f21f60437b 100644 --- a/libminifi/src/c2/C2Payload.cpp +++ b/libminifi/src/c2/C2Payload.cpp @@ -131,14 +131,14 @@ std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response) { << "}"; } -std::ostream& operator<<(std::ostream& out, const AnnotatedValue& val) { - if (val.value_) { - out << '"' << val.value_->c_str() << '"'; +std::ostream& operator<<(std::ostream& out, const C2Value& val) { + if (auto* val_ptr = val.valueNode()) { + out << '"' << val_ptr->to_string() << '"'; } else { - out << ""; - } - if (!val.annotations.empty()) { - out << val.annotations; + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + gsl::not_null(val.json())->Accept(writer); + out << std::string_view{buffer.GetString(), buffer.GetLength()}; } return out; } diff --git a/libminifi/src/c2/HeartbeatJsonSerializer.cpp b/libminifi/src/c2/HeartbeatJsonSerializer.cpp index d15ecf71a9..ba31555d5e 100644 --- a/libminifi/src/c2/HeartbeatJsonSerializer.cpp +++ b/libminifi/src/c2/HeartbeatJsonSerializer.cpp @@ -60,9 +60,16 @@ static void serializeOperationInfo(rapidjson::Value& target, const C2Payload& pa target.AddMember("identifier", rapidjson::Value(id.c_str(), alloc), alloc); } -static void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT +static void setJsonStr(const std::string& key, const c2::C2Value& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { rapidjson::Value valueVal; - auto base_type = value.getValue(); + + if (auto* json_val = value.json()) { + valueVal.CopyFrom(*json_val, alloc); + 
parent.AddMember(rapidjson::Value(key.c_str(), alloc), valueVal, alloc); + return; + } + + auto base_type = gsl::not_null(value.valueNode())->getValue(); auto type_index = base_type->getTypeIndex(); if (auto sub_type = std::dynamic_pointer_cast(base_type)) { @@ -156,7 +163,7 @@ static rapidjson::Value serializeConnectionQueues(const C2Payload& payload, std: updatedContent.name = uuid; adjusted.setLabel(uuid); adjusted.setIdentifier(uuid); - c2::AnnotatedValue nd; + c2::C2Value nd; // name should be what was previously the TLN ( top level node ) nd = name; updatedContent.operation_arguments.insert(std::make_pair("name", nd)); diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp index ca4f3faeab..57b42cfcb2 100644 --- a/libminifi/src/c2/protocols/RESTProtocol.cpp +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -28,6 +28,7 @@ #include #include +#include "rapidjson/error/en.h" #include "core/TypedValues.h" #include "utils/gsl.h" #include "properties/Configuration.h" @@ -36,25 +37,6 @@ namespace org::apache::nifi::minifi::c2 { - -AnnotatedValue parseAnnotatedValue(const rapidjson::Value& jsonValue) { - AnnotatedValue result; - if (jsonValue.IsObject() && jsonValue.HasMember("value")) { - result = jsonValue["value"].GetString(); - for (const auto& annotation : jsonValue.GetObject()) { - if (annotation.name.GetString() == std::string("value")) { - continue; - } - result.annotations[annotation.name.GetString()] = parseAnnotatedValue(annotation.value); - } - } else if (jsonValue.IsBool()) { - result = jsonValue.GetBool(); - } else { - result = jsonValue.GetString(); - } - return result; -} - C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::span response) { rapidjson::Document root; @@ -123,7 +105,7 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::spanlog_error("Failed to parse json response: {} at {}", rapidjson::GetParseError_En(ok.Code()), ok.Offset()); } } catch 
(...) { } diff --git a/libminifi/src/c2/protocols/RESTSender.cpp b/libminifi/src/c2/protocols/RESTSender.cpp index 98642b009f..659ed51db0 100644 --- a/libminifi/src/c2/protocols/RESTSender.cpp +++ b/libminifi/src/c2/protocols/RESTSender.cpp @@ -41,10 +41,23 @@ void RESTSender::initialize(core::controller::ControllerServiceProvider* control RESTProtocol::initialize(controller, configure); // base URL when one is not specified. if (nullptr != configure) { + std::optional rest_base_path = configure->get(Configuration::nifi_c2_rest_path_base); std::string update_str; std::string ssl_context_service_str; configure->get(Configuration::nifi_c2_rest_url, "c2.rest.url", rest_uri_); configure->get(Configuration::nifi_c2_rest_url_ack, "c2.rest.url.ack", ack_uri_); + if (rest_uri_.starts_with("/")) { + if (!rest_base_path) { + throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative nifi.c2.rest.url unless the nifi.c2.rest.path.base is set"); + } + rest_uri_ = rest_base_path.value() + rest_uri_; + } + if (ack_uri_.starts_with("/")) { + if (!rest_base_path) { + throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative nifi.c2.rest.url.ack unless the nifi.c2.rest.path.base is set"); + } + ack_uri_ = rest_base_path.value() + ack_uri_; + } if (controller && configure->get(Configuration::nifi_c2_rest_ssl_context_service, "c2.rest.ssl.context.service", ssl_context_service_str)) { if (auto service = controller->getControllerService(ssl_context_service_str)) { ssl_context_service_ = std::static_pointer_cast(service); diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index 269cb6267a..d670c12bc1 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -23,9 +23,9 @@ namespace org::apache::nifi::minifi::state { MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr configuration, const std::vector>& 
repository_metric_sources, - std::shared_ptr flow_configuration) + std::shared_ptr flow_configuration, std::shared_ptr asset_manager) : configuration_(configuration), - response_node_loader_(std::make_shared(std::move(configuration), repository_metric_sources, std::move(flow_configuration))) { + response_node_loader_(std::make_shared(std::move(configuration), repository_metric_sources, std::move(flow_configuration), std::move(asset_manager))) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/AssetInformation.cpp b/libminifi/src/core/state/nodes/AssetInformation.cpp new file mode 100644 index 0000000000..717be7ee06 --- /dev/null +++ b/libminifi/src/core/state/nodes/AssetInformation.cpp @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/state/nodes/AssetInformation.h" +#include "core/Resource.h" +#include "core/logging/LoggerFactory.h" + +namespace org::apache::nifi::minifi::state::response { + +AssetInformation::AssetInformation() + : logger_(core::logging::LoggerFactory().getLogger()) {} + +void AssetInformation::setAssetManager(std::shared_ptr asset_manager) { + asset_manager_ = std::move(asset_manager); + if (!asset_manager_) { + logger_->log_error("No asset manager is provided, asset information will not be available"); + } +} + +std::vector AssetInformation::serialize() { + if (!asset_manager_) { + return {}; + } + SerializedResponseNode node; + node.name = "hash"; + node.value = asset_manager_->hash(); + + return std::vector{node}; +} + +REGISTER_RESOURCE(AssetInformation, DescriptionOnly); + +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 58ff6eceaa..061a96be89 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -25,6 +25,7 @@ #include "core/state/nodes/QueueMetrics.h" #include "core/state/nodes/AgentInformation.h" #include "core/state/nodes/ConfigurationChecksums.h" +#include "core/state/nodes/AssetInformation.h" #include "utils/gsl.h" #include "utils/RegexUtils.h" #include "utils/StringUtils.h" @@ -33,10 +34,11 @@ namespace org::apache::nifi::minifi::state::response { ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr configuration, std::vector> repository_metric_sources, - std::shared_ptr flow_configuration) + std::shared_ptr flow_configuration, std::shared_ptr asset_manager) : configuration_(std::move(configuration)), repository_metric_sources_(std::move(repository_metric_sources)), - flow_configuration_(std::move(flow_configuration)) { + flow_configuration_(std::move(flow_configuration)), + asset_manager_(std::move(asset_manager)) { } void 
ResponseNodeLoader::clearConfigRoot() { @@ -194,6 +196,13 @@ void ResponseNodeLoader::initializeConfigurationChecksums(const SharedResponseNo } } +void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& response_node) const { + auto asset_info = dynamic_cast(response_node.get()); + if (asset_info) { + asset_info->setAssetManager(asset_manager_); + } +} + void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const { auto flowMonitor = dynamic_cast(response_node.get()); if (flowMonitor == nullptr) { @@ -231,6 +240,7 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); initializeFlowMonitor(response_node); + initializeAssetInformation(response_node); } return response_nodes; } diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index b0681415fa..51f86c40b0 100644 --- a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -110,6 +110,10 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min } break; } + case minifi::c2::Operation::sync: { + serializeProperty(properties); + break; + } default: break; } diff --git a/libminifi/src/utils/file/AssetManager.cpp b/libminifi/src/utils/file/AssetManager.cpp new file mode 100644 index 0000000000..fd70cf197c --- /dev/null +++ b/libminifi/src/utils/file/AssetManager.cpp @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/file/AssetManager.h" +#include "utils/file/FileUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "core/logging/LoggerFactory.h" +#include "utils/Hash.h" + +#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson + +namespace org::apache::nifi::minifi::utils::file { + +AssetManager::AssetManager(const Configure& configuration) + : root_(configuration.get(Configure::nifi_asset_directory).value_or((configuration.getHome() / "asset").string())), + logger_(core::logging::LoggerFactory::getLogger()) { + refreshState(); +} + +void AssetManager::refreshState() { + std::lock_guard lock(mtx_); + state_.clear(); + if (!utils::file::FileUtils::exists(root_)) { + std::filesystem::create_directories(root_); + } + if (!utils::file::FileUtils::exists(root_ / ".state")) { + std::ofstream{root_ / ".state", std::ios::binary} << R"({"digest": "", "assets": {}})"; + } + rapidjson::Document doc; + + std::string file_content = utils::file::get_content(root_ / ".state"); + + rapidjson::ParseResult res = doc.Parse(file_content.c_str(), file_content.size()); + if (!res) { + logger_->log_error("Failed to parse asset '.state' file, not a valid json file"); + return; + } + if (!doc.IsObject()) { + logger_->log_error("Asset '.state' file is malformed"); + return; + } + if (!doc.HasMember("digest")) { + logger_->log_error("Asset '.state' file is malformed, missing 'digest'"); + return; + } + if (!doc["digest"].IsString()) { + logger_->log_error("Asset '.state' file is 
malformed, 'digest' is not a string"); + return; + } + if (!doc.HasMember("assets")) { + logger_->log_error("Asset '.state' file is malformed, missing 'assets'"); + return; + } + if (!doc["assets"].IsObject()) { + logger_->log_error("Asset '.state' file is malformed, 'assets' is not an object"); + return; + } + + + AssetLayout new_state; + new_state.digest = std::string{doc["digest"].GetString(), doc["digest"].GetStringLength()}; + + for (auto& [id, entry] : doc["assets"].GetObject()) { + if (!entry.IsObject()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}' is not an object", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + AssetDescription description; + description.id = std::string{id.GetString(), id.GetStringLength()}; + if (!entry.HasMember("path") || !entry["path"].IsString()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}.path' does not exist or is not a string", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + description.path = std::string{entry["path"].GetString(), entry["path"].GetStringLength()}; + if (!entry.HasMember("url") || !entry["url"].IsString()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}.url' does not exist or is not a string", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + description.url = std::string{entry["url"].GetString(), entry["url"].GetStringLength()}; + + if (utils::file::FileUtils::exists(root_ / description.id)) { + new_state.assets.insert(std::move(description)); + } else { + logger_->log_error("Asset '.state' file contains entry '{}' that does not exist on the filesystem at '{}'", + std::string_view{id.GetString(), id.GetStringLength()}, (root_ / description.id).string()); + } + } + state_ = std::move(new_state); +} + +std::string AssetManager::hash() const { + std::lock_guard lock(mtx_); + return state_.digest.empty() ? 
"null" : state_.digest; +} + +nonstd::expected AssetManager::sync( + const org::apache::nifi::minifi::utils::file::AssetLayout& layout, + const std::function, std::string>(std::string_view /*url*/)>& fetch) { + std::lock_guard lock(mtx_); + org::apache::nifi::minifi::utils::file::AssetLayout new_state{ + .digest = state_.digest, + .assets = {} + }; + std::string fetch_errors; + std::vector>> new_file_contents; + for (auto& new_entry : layout.assets) { + if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto& entry) {return entry.id == new_entry.id;}) == state_.assets.end()) { + logger_->log_debug("Fetching asset {} from {}", new_entry.id, new_entry.url); + if (auto data = fetch(new_entry.url)) { + new_file_contents.emplace_back(new_entry.path, data.value()); + new_state.assets.insert(new_entry); + } else { + logger_->log_error("Failed to fetch asset {} from {}: {}", new_entry.id, new_entry.url, data.error()); + fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" + new_entry.url + "': " + data.error() + "\n"; + } + } else { + logger_->log_debug("Asset {} already exists", new_entry.id); + new_state.assets.insert(new_entry); + } + } + if (fetch_errors.empty()) { + new_state.digest = layout.digest; + } + + for (auto& old_entry : state_.assets) { + if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto& entry) {return entry.id == old_entry.id;}) == layout.assets.end()) { + // we no longer need this asset + std::filesystem::remove(root_ / old_entry.path); + } + } + + for (auto& [path, content] : new_file_contents) { + utils::file::create_dir((root_ / path).parent_path()); + std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast(content.data()), gsl::narrow(content.size())); + } + + state_ = std::move(new_state); + persist(); + + if (!fetch_errors.empty()) { + return nonstd::make_unexpected(fetch_errors); + } + + return {}; +} + +void AssetManager::persist() const { + std::lock_guard lock(mtx_); + 
rapidjson::Document doc; + doc.SetObject(); + + doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{state_.digest, doc.GetAllocator()}, doc.GetAllocator()); + doc.AddMember(rapidjson::StringRef("assets"), rapidjson::Value{rapidjson::kObjectType}, doc.GetAllocator()); + + for (auto& entry : state_.assets) { + rapidjson::Value entry_val(rapidjson::kObjectType); + entry_val.AddMember(rapidjson::StringRef("path"), rapidjson::Value(entry.path.generic_string(), doc.GetAllocator()), doc.GetAllocator()); + entry_val.AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(entry.url), doc.GetAllocator()); + doc["assets"].AddMember(rapidjson::StringRef(entry.id), entry_val, doc.GetAllocator()); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + std::ofstream{root_ / ".state", std::ios::binary}.write(buffer.GetString(), gsl::narrow(buffer.GetSize())); +} + +std::filesystem::path AssetManager::getRoot() const { + std::lock_guard lock(mtx_); + return root_; +} + +} // namespace org::apache::nifi::minifi::utils::file diff --git a/libminifi/test/integration/C2AssetSyncTest.cpp b/libminifi/test/integration/C2AssetSyncTest.cpp new file mode 100644 index 0000000000..e77ac62937 --- /dev/null +++ b/libminifi/test/integration/C2AssetSyncTest.cpp @@ -0,0 +1,280 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "integration/HTTPIntegrationBase.h" +#include "integration/HTTPHandlers.h" +#include "utils/file/FileUtils.h" +#include "utils/file/AssetManager.h" +#include "unit/TestUtils.h" +#include "unit/Catch.h" + +namespace org::apache::nifi::minifi::test { + +class FileProvider : public ServerAwareHandler { + public: + explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {} + + bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + file_content_.length()); + mg_printf(conn, "%s", file_content_.c_str()); + return true; + } + + private: + std::string file_content_; +}; + +class C2HeartbeatHandler : public HeartbeatHandler { + public: + using HeartbeatHandler::HeartbeatHandler; + using AssetDescription = org::apache::nifi::minifi::utils::file::AssetDescription; + + void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override { + std::string hb_str = [&] { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + root.Accept(writer); + return std::string{buffer.GetString(), buffer.GetSize()}; + }(); + auto& asset_info_node = root["resourceInfo"]; + auto& asset_hash_node = asset_info_node["hash"]; + std::string asset_hash{asset_hash_node.GetString(), asset_hash_node.GetStringLength()}; + + std::vector operations; + { + std::lock_guard guard(asset_mtx_); + agent_asset_hash_ = 
asset_hash; + if (asset_hash != calculateAssetHash()) { + std::unordered_map args; + rapidjson::Document global_hash_doc{rapidjson::kObjectType}; + global_hash_doc.AddMember("digest", calculateAssetHash(), global_hash_doc.GetAllocator()); + args["globalHash"] = std::move(global_hash_doc); + rapidjson::Document resource_list_doc{rapidjson::kArrayType}; + + for (auto& asset : expected_assets_) { + rapidjson::Value resource_obj{rapidjson::kObjectType}; + resource_obj.AddMember("resourceId", asset.id, resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourceName", asset.path.filename().string(), resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourceType", "ASSET", resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourcePath", asset.path.parent_path().string(), resource_list_doc.GetAllocator()); + resource_obj.AddMember("url", asset.url, resource_list_doc.GetAllocator()); + resource_list_doc.PushBack(resource_obj, resource_list_doc.GetAllocator()); + } + args["resourceList"] = std::move(resource_list_doc); + + operations.push_back(C2Operation{ + .operation = "sync", + .operand = "resource", + .operation_id = std::to_string(next_op_id_++), + .args = std::move(args) + }); + } + } + sendHeartbeatResponse(operations, conn); + } + + void addAsset(std::string id, std::string path, std::string url) { + std::lock_guard guard(asset_mtx_); + expected_assets_.insert(AssetDescription{ + .id = std::move(id), + .path = std::move(path), + .url = std::move(url) + }); + } + + void removeAsset(std::string id) { + std::lock_guard guard{asset_mtx_}; + expected_assets_.erase(AssetDescription{.id = std::move(id), .path = {}, .url = {}}); + } + + std::optional getAgentAssetHash() const { + std::lock_guard lock(asset_mtx_); + return agent_asset_hash_; + } + + std::string calculateAssetHash() const { + std::lock_guard guard{asset_mtx_}; + size_t hash_value{0}; + for (auto& asset : expected_assets_) { + hash_value = minifi::utils::hash_combine(hash_value, 
std::hash{}(asset.id)); + } + return std::to_string(hash_value); + } + + std::string assetState() const { + std::lock_guard guard{asset_mtx_}; + rapidjson::Document doc; + doc.SetObject(); + doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{calculateAssetHash(), doc.GetAllocator()}, doc.GetAllocator()); + doc.AddMember(rapidjson::StringRef("assets"), rapidjson::Value{rapidjson::kObjectType}, doc.GetAllocator()); + for (auto& asset : expected_assets_) { + auto path_str = asset.path.string(); + doc["assets"].AddMember(rapidjson::StringRef(asset.id), rapidjson::kObjectType, doc.GetAllocator()); + doc["assets"][asset.id].AddMember(rapidjson::StringRef("path"), rapidjson::Value(path_str, doc.GetAllocator()), doc.GetAllocator()); + doc["assets"][asset.id].AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(asset.url), doc.GetAllocator()); + } + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + return {buffer.GetString(), buffer.GetSize()}; + } + + private: + mutable std::recursive_mutex asset_mtx_; + std::set expected_assets_; + + std::optional agent_asset_hash_; + + std::atomic next_op_id_{1}; +}; + +class VerifyC2AssetSync : public VerifyC2Base { + public: + void configureC2() override { + configuration->set("nifi.c2.agent.protocol.class", "RESTSender"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("nifi.c2.agent.heartbeat.period", "100"); + configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation"); + } + + void runAssertions() override { + verify_(); + } + + void setVerifier(std::function verify) { + verify_ = std::move(verify); + } + + private: + std::function verify_; +}; + +TEST_CASE("C2AssetSync", "[c2test]") { + TestController controller; + + // setup minifi home + const std::filesystem::path home_dir = controller.createTempDirectory(); + const auto asset_dir = home_dir / "asset"; + + 
std::filesystem::current_path(home_dir); + auto wd_guard = gsl::finally([] { + std::filesystem::current_path(minifi::utils::file::get_executable_dir()); + }); + + C2AcknowledgeHandler ack_handler; + std::string file_A = "hello from file A"; + FileProvider file_A_provider{file_A}; + std::string file_B = "hello from file B"; + FileProvider file_B_provider{file_B}; + std::string file_C = "hello from file C"; + FileProvider file_C_provider{file_C}; + std::string file_A_v2 = "hello from file A version 2"; + FileProvider file_Av2_provider{file_A_v2}; + C2HeartbeatHandler hb_handler{std::make_shared()}; + + VerifyC2AssetSync harness; + harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider); + harness.setUrl("http://localhost:0/api/file/Av2.txt", &file_Av2_provider); + harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider); + harness.setUrl("http://localhost:0/api/file/C.txt", &file_C_provider); + + std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt"; + + hb_handler.addAsset("Av1", "A.txt", "/api/file/A.txt"); + hb_handler.addAsset("Bv1", "nested/dir/B.txt", "/api/file/B.txt"); + hb_handler.addAsset("Cv1", "nested/C.txt", "/api/file/C.txt"); + + harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler); + harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler); + harness.setC2Url("/api/heartbeat", "/api/acknowledge"); + + auto get_asset_structure = [&] () { + std::unordered_map contents; + for (auto& [dir, file] : minifi::utils::file::list_dir_all(asset_dir, controller.getLogger())) { + contents[(dir / file).string()] = minifi::utils::file::get_content(dir / file); + } + return contents; + }; + + harness.setVerifier([&] () { + REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] { + std::cout << "calculated hash = " << hb_handler.calculateAssetHash() << std::endl; + std::cout << "reported hash = " << hb_handler.getAgentAssetHash().value_or("") << std::endl; + return 
hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash(); + })); + + { + std::unordered_map expected_assets{ + {(asset_dir / "A.txt").string(), file_A}, + {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B}, + {(asset_dir / "nested" / "C.txt").string(), file_C}, + {(asset_dir / ".state").string(), hb_handler.assetState()} + }; + auto actual_assets = get_asset_structure(); + if (actual_assets != expected_assets) { + controller.getLogger()->log_error("Mismatch between expected and actual assets"); + for (auto& [path, content] : expected_assets) { + controller.getLogger()->log_error("Expected asset at {}: {}", path, content); + } + for (auto& [path, content] : actual_assets) { + controller.getLogger()->log_error("Actual asset at {}: {}", path, content); + } + REQUIRE(false); + } + } + + hb_handler.removeAsset("Av1"); + hb_handler.removeAsset("Cv1"); + hb_handler.addAsset("Av2", "A.txt", "/api/file/Av2.txt"); + + + REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {return hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash();})); + + { + std::unordered_map expected_assets{ + {(asset_dir / "A.txt").string(), file_A_v2}, + {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B}, + {(asset_dir / ".state").string(), hb_handler.assetState()} + }; + + auto actual_assets = get_asset_structure(); + if (actual_assets != expected_assets) { + controller.getLogger()->log_error("Mismatch between expected and actual assets"); + for (auto& [path, content] : expected_assets) { + controller.getLogger()->log_error("Expected asset at {}: {}", path, content); + } + for (auto& [path, content] : actual_assets) { + controller.getLogger()->log_error("Actual asset at {}: {}", path, content); + } + REQUIRE(false); + } + } + }); + + harness.run(); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 4d356fc2bb..8997228ac9 100644 --- 
a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -62,7 +62,7 @@ class MetricsHandler: public HeartbeatHandler { explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, std::shared_ptr configuration, const std::filesystem::path& replacement_config_path) : HeartbeatHandler(std::move(configuration)), metrics_updated_successfully_(metrics_updated_successfully), - replacement_config_(getReplacementConfigAsJsonValue(replacement_config_path.string())) { + replacement_config_(minifi::utils::file::get_content(replacement_config_path.string())) { } void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override { @@ -178,14 +178,6 @@ class MetricsHandler: public HeartbeatHandler { processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes"); } - [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const std::string& replacement_config_path) { - std::ifstream is(replacement_config_path); - auto content = std::string((std::istreambuf_iterator(is)), std::istreambuf_iterator()); - content = minifi::utils::string::replaceAll(content, "\n", "\\n"); - content = minifi::utils::string::replaceAll(content, "\"", "\\\""); - return content; - } - std::atomic_bool& metrics_updated_successfully_; TestState test_state_ = TestState::VERIFY_INITIAL_METRICS; std::string replacement_config_; diff --git a/libminifi/test/integration/C2UpdateAssetTest.cpp b/libminifi/test/integration/C2UpdateAssetTest.cpp index 71cd0b7376..458a5f2bed 100644 --- a/libminifi/test/integration/C2UpdateAssetTest.cpp +++ b/libminifi/test/integration/C2UpdateAssetTest.cpp @@ -56,7 +56,7 @@ class C2HeartbeatHandler : public HeartbeatHandler { return true; } - void addOperation(std::string id, std::unordered_map args) { + void addOperation(std::string id, std::unordered_map args) { std::lock_guard guard(op_mtx_); operations_.push_back(C2Operation{ .operation = "update", @@ -92,7 +92,7 @@ class 
VerifyC2AssetUpdate : public VerifyC2Base { struct AssetUpdateOperation { std::string id; - std::unordered_map args; + std::unordered_map args; std::string state; std::optional details; }; @@ -103,7 +103,11 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { // setup minifi home const std::filesystem::path home_dir = controller.createTempDirectory(); const auto asset_dir = home_dir / "asset"; + std::filesystem::current_path(home_dir); + auto wd_guard = gsl::finally([] { + std::filesystem::current_path(minifi::utils::file::get_executable_dir()); + }); C2AcknowledgeHandler ack_handler; std::string file_A = "hello from file A"; @@ -244,11 +248,12 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { // this op failed no file made on the disk continue; } - expected_files[(asset_dir / op.args["file"]).string()] = minifi::utils::string::endsWith(op.args["url"], "A.txt") ? file_A : file_B; + expected_files[(asset_dir / op.args["file"].to_string()).string()] = minifi::utils::string::endsWith(op.args["url"].to_string(), "A.txt") ? 
file_A : file_B; } size_t file_count = minifi::utils::file::list_dir_all(asset_dir.string(), controller.getLogger()).size(); - if (file_count != expected_files.size()) { + // + 1 is for the .state file from the AssetManager + if (file_count != expected_files.size() + 1) { controller.getLogger()->log_error("Expected {} files, got {}", expected_files.size(), file_count); REQUIRE(false); } @@ -258,8 +263,6 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { REQUIRE(false); } } - - std::filesystem::current_path(minifi::utils::file::get_executable_dir()); } } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/libtest/integration/HTTPHandlers.cpp b/libminifi/test/libtest/integration/HTTPHandlers.cpp index eb357b2b99..31578022ac 100644 --- a/libminifi/test/libtest/integration/HTTPHandlers.cpp +++ b/libminifi/test/libtest/integration/HTTPHandlers.cpp @@ -56,7 +56,7 @@ bool PeerResponder::handleGet(CivetServer* /*server*/, struct mg_connection *con #else std::string hostname = "localhost"; #endif - std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": " + port + ", \"secure\": false, \"flowFileCount\" : 0 }] }"; + std::string site2site_rest_resp = R"({"peers" : [{ "hostname": ")" + hostname + R"(", "port": )" + port + R"(, "secure": false, "flowFileCount" : 0 }] })"; std::stringstream headers; headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; mg_printf(conn, "%s", headers.str().c_str()); @@ -134,7 +134,8 @@ bool FlowFileResponder::handlePost(CivetServer* /*server*/, struct mg_connection const auto flow = std::make_shared(); for (uint32_t i = 0; i < num_attributes; i++) { - std::string name, value; + std::string name; + std::string value; { const auto read = stream.read(name, true); if (!isServerRunning()) return false; @@ -204,7 +205,7 @@ bool FlowFileResponder::handleGet(CivetServer* /*server*/, struct 
mg_connection minifi::io::BufferStream serializer; minifi::io::CRCStream stream(gsl::make_not_null(&serializer)); for (const auto& flow : flows) { - uint32_t num_attributes = gsl::narrow(flow->attributes.size()); + auto num_attributes = gsl::narrow(flow->attributes.size()); stream.write(num_attributes); for (const auto& entry : flow->attributes) { stream.write(entry.first); @@ -235,41 +236,38 @@ bool DeleteTransactionResponder::handleDelete(CivetServer* /*server*/, struct mg } void HeartbeatHandler::sendHeartbeatResponse(const std::vector& operations, struct mg_connection * conn) { - std::string operation_jsons; + rapidjson::Document hb_obj{rapidjson::kObjectType}; + hb_obj.AddMember("operation", "heartbeat", hb_obj.GetAllocator()); + hb_obj.AddMember("requested_operations", rapidjson::kArrayType, hb_obj.GetAllocator()); for (const auto& c2_operation : operations) { - std::string resp_args; + rapidjson::Value op{rapidjson::kObjectType}; + op.AddMember("operation", c2_operation.operation, hb_obj.GetAllocator()); + op.AddMember("operationid", c2_operation.operation_id, hb_obj.GetAllocator()); + op.AddMember("operand", c2_operation.operand, hb_obj.GetAllocator()); if (!c2_operation.args.empty()) { - resp_args = ", \"args\": {"; - auto it = c2_operation.args.begin(); - while (it != c2_operation.args.end()) { - resp_args += "\"" + it->first + "\": \"" + it->second + "\""; - ++it; - if (it != c2_operation.args.end()) { - resp_args += ", "; + rapidjson::Value args{rapidjson::kObjectType}; + for (auto& [arg_name, arg_val] : c2_operation.args) { + rapidjson::Value json_arg_val; + if (auto* json_val = arg_val.json()) { + json_arg_val.CopyFrom(*json_val, hb_obj.GetAllocator()); + } else { + json_arg_val.SetString(arg_val.to_string(), hb_obj.GetAllocator()); } + args.AddMember(rapidjson::StringRef(arg_name), json_arg_val, hb_obj.GetAllocator()); } - resp_args += "}"; - } - - std::string operation_json = "{" - "\"operation\" : \"" + c2_operation.operation + "\"," - 
"\"operationid\" : \"" + c2_operation.operation_id + "\"," - "\"operand\": \"" + c2_operation.operand + "\"" + - resp_args + "}"; - - if (operation_jsons.empty()) { - operation_jsons += operation_json; - } else { - operation_jsons += ", " + operation_json; + op.AddMember("args", args, hb_obj.GetAllocator()); } + hb_obj["requested_operations"].PushBack(op, hb_obj.GetAllocator()); } - std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}"; + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + hb_obj.Accept(writer); mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - heartbeat_response.length()); - mg_printf(conn, "%s", heartbeat_response.c_str()); + buffer.GetLength()); + mg_printf(conn, "%s", buffer.GetString()); } void HeartbeatHandler::verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector& verify_components, const std::vector& disallowed_properties) { @@ -477,9 +475,9 @@ bool C2UpdateHandler::handlePost(CivetServer* /*server*/, struct mg_connection * } void C2UpdateHandler::setC2RestResponse(const std::string& url, const std::string& name, const std::optional& persist) { - std::string content = "{\"location\": \"" + url + "\""; + std::string content = R"({"location": ")" + url + "\""; if (persist) { - content += ", \"persist\": \"" + *persist + "\""; + content += R"(, "persist": ")" + *persist + "\""; } content += "}"; response_ = diff --git a/libminifi/test/libtest/integration/HTTPHandlers.h b/libminifi/test/libtest/integration/HTTPHandlers.h index a47d2a5061..c73395ed0c 100644 --- a/libminifi/test/libtest/integration/HTTPHandlers.h +++ b/libminifi/test/libtest/integration/HTTPHandlers.h @@ -214,15 +214,15 @@ class HeartbeatHandler : public ServerAwareHandler { std::string operation; std::string operand; std::string operation_id; - std::unordered_map args; + std::unordered_map args; }; - 
void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn, - const std::unordered_map& args = {}) { + static void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn, + const std::unordered_map& args = {}) { sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn); } - void sendHeartbeatResponse(const std::vector& operations, struct mg_connection * conn); + static void sendHeartbeatResponse(const std::vector& operations, struct mg_connection * conn); void verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector& verify_components = {}, const std::vector& disallowed_properties = {}); void verify(struct mg_connection *conn); diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp index 75d3577067..b619a321e6 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.cpp +++ b/libminifi/test/libtest/integration/IntegrationBase.cpp @@ -23,6 +23,7 @@ #include "utils/HTTPUtils.h" #include "unit/ProvenanceTestHelper.h" #include "utils/FifoExecutor.h" +#include "utils/file/AssetManager.h" #include "core/ConfigurationFactory.h" namespace org::apache::nifi::minifi::test { @@ -117,9 +118,10 @@ void IntegrationBase::run(const std::optional& test_file_ }; std::vector> repo_metric_sources{test_repo, test_flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique(configuration, repo_metric_sources, flow_config); + auto asset_manager = std::make_shared(*configuration); + auto metrics_publisher_store = std::make_unique(configuration, repo_metric_sources, flow_config, asset_manager); flowController_ = std::make_unique(test_repo, test_flow_repo, configuration, - std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); + 
std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager); flowController_->load(); updateProperties(*flowController_); flowController_->start(); diff --git a/libminifi/test/resources/encrypted.minifi.properties b/libminifi/test/resources/encrypted.minifi.properties index c9f3eac068..f19422e43f 100644 --- a/libminifi/test/resources/encrypted.minifi.properties +++ b/libminifi/test/resources/encrypted.minifi.properties @@ -57,7 +57,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index b6801ba94c..de468bb1ef 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -57,6 +57,7 @@ #include "properties/Decryptor.h" #include "utils/file/PathUtils.h" #include "utils/file/FileUtils.h" +#include "utils/file/AssetManager.h" #include "utils/Environment.h" #include "utils/FileMutex.h" #include "FlowController.h" @@ -397,11 +398,13 @@ int main(int argc, char **argv) { .sensitive_properties_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) }, nifi_configuration_class_name); - std::vector> repo_metric_sources{prov_repo, flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique(configure, repo_metric_sources, flow_configuration); + auto asset_manager = std::make_shared(*configure); + std::vector> repo_metric_sources{prov_repo, flow_repo, content_repo}; + auto metrics_publisher_store = 
std::make_unique(configure, repo_metric_sources, flow_configuration, asset_manager); const auto controller = std::make_unique( - prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); + prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, + std::move(metrics_publisher_store), filesystem, request_restart, asset_manager); const bool disk_space_watchdog_enable = configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::andThen(utils::string::toBool)