From 1f3ffd7b25506323c6e2e2d43108c894857bc9c1 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Tue, 25 Sep 2018 17:45:07 -0400 Subject: [PATCH 1/4] MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes MINIFICPP-618: Add comments to C2 readme MINIFICPP-618: Set trigger in trigger critical section and add fx to clear. MINIFICPP-624: Add alternate names for C2 configuration items and support both --- C2.md | 28 ++-- .../http-curl/protocols/RESTReceiver.cpp | 4 +- extensions/http-curl/protocols/RESTSender.cpp | 12 +- libminifi/CMakeLists.txt | 2 +- libminifi/include/c2/C2Agent.h | 18 ++- libminifi/include/c2/C2Trigger.h | 82 ++++++++++++ .../include/c2/triggers/FileUpdateTrigger.h | 126 ++++++++++++++++++ libminifi/include/properties/Configure.h | 5 +- libminifi/include/properties/Properties.h | 20 ++- libminifi/src/Configure.cpp | 1 + libminifi/src/FlowController.cpp | 5 +- libminifi/src/Properties.cpp | 21 ++- libminifi/src/c2/C2Agent.cpp | 82 ++++++++++-- .../src/c2/triggers/FileUpdateTrigger.cpp | 49 +++++++ libminifi/test/unit/FileTriggerTests.cpp | 99 ++++++++++++++ main/MiNiFiMain.cpp | 1 - 16 files changed, 514 insertions(+), 41 deletions(-) create mode 100644 libminifi/include/c2/C2Trigger.h create mode 100644 libminifi/include/c2/triggers/FileUpdateTrigger.h create mode 100644 libminifi/src/c2/triggers/FileUpdateTrigger.cpp create mode 100644 libminifi/test/unit/FileTriggerTests.cpp diff --git a/C2.md b/C2.md index b41c29cd0d..163fdbcd4d 100644 --- a/C2.md +++ b/C2.md @@ -41,6 +41,10 @@ will be explained in greater detail in the metrics section. For more more insight into the API used within the C2 agent, please visit: https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal +Release 0.6.0: Please note that all c2 properties now exist as nifi.c2.* . If your configuration properties +files contain the former naming convention of c2.*, we will continue to support that as +an alternate key, but you are encouraged to switch your configuration options as soon as possible. + in minifi.properties @@ -51,19 +55,19 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation # specify C2 protocol -- default is RESTSender if this is not specified - c2.agent.protocol.class=RESTSender + nifi.c2.agent.protocol.class=RESTSender # may also use MQTT - # c2.agent.protocol.class=MQTTC2Protocol + # nifi.c2.agent.protocol.class=MQTTC2Protocol # control c2 heartbeat interval in millisecocnds - c2.agent.heartbeat.period=3000 + nifi.c2.agent.heartbeat.period=3000 # enable reporter classes - c2.agent.heartbeat.reporter.class=RESTReciver + nifi.c2.agent.heartbeat.reporter.class=RESTReciver # specify the rest URIs if using RESTSender - c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat - c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge + nifi.c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat + nifi.c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge # c2 agent identifier nifi.c2.agent.identifier= @@ -72,7 +76,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.agent.class= # configure SSL Context service for REST Protocol - c2.rest.ssl.context.service + nifi.c2.rest.ssl.context.service ### Metrics @@ -322,4 +326,12 @@ the following flag: cmake -DBOOTSTRAP= .. - \ No newline at end of file + + +### C2 File triggers + +C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It +will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time +changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent +will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the +update process will be halted. \ No newline at end of file diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp index d2c17b6ddd..babc983a9e 100644 --- a/extensions/http-curl/protocols/RESTReceiver.cpp +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -49,8 +49,8 @@ void RESTReceiver::initialize(const std::shared_ptrlog_trace("Initializing rest receiver"); if (nullptr != configuration_) { std::string listeningPort,rootUri="/", caCert; - configuration_->get("c2.rest.listener.port", listeningPort); - configuration_->get("c2.rest.listener.cacert", caCert); + configuration_->get("nifi.c2.rest.listener.port","c2.rest.listener.port", listeningPort); + configuration_->get("nifi.c2.rest.listener.cacert","c2.rest.listener.cacert", caCert); if (!listeningPort.empty() && !rootUri.empty()) { handler = std::unique_ptr(new ListeningProtocol()); if (!caCert.empty()) { diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index 7db998a799..9b4ce5e7fd 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -46,15 +46,15 @@ void RESTSender::initialize(const std::shared_ptrget("c2.rest.url", rest_uri_); - configure->get("c2.rest.url.ack", ack_uri_); - if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) { + configure->get("nifi.c2.rest.url","c2.rest.url", rest_uri_); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", ack_uri_); + if (configure->get("nifi.c2.rest.ssl.context.service","c2.rest.ssl.context.service", ssl_context_service_str)) { auto service = controller->getControllerService(ssl_context_service_str); if (nullptr != service) { ssl_context_service_ = std::static_pointer_cast(service); } } - configure->get("c2.rest.heartbeat.minimize.updates", update_str); + configure->get("nifi.c2.rest.heartbeat.minimize.updates","c2.rest.heartbeat.minimize.updates", update_str); utils::StringUtils::StringToBool(update_str, minimize_updates_); } logger_->log_debug("Submitting to %s", rest_uri_); @@ -78,8 +78,8 @@ C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction directi void RESTSender::update(const std::shared_ptr &configure) { std::string url; - configure->get("c2.rest.url", url); - configure->get("c2.rest.url.ack", url); + configure->get("nifi.c2.rest.url","c2.rest.url", url); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", url); } const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 1f556d0897..11eec1c246 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -97,7 +97,7 @@ find_package(OpenSSL) if (OPENSSL_FOUND) set(TLS_SOURCES "src/io/tls/*.cpp") endif(OPENSSL_FOUND) -file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" ) diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index e9ff4e4fe3..cc575a2143 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -30,6 +30,7 @@ #include "controllers/UpdatePolicyControllerService.h" #include "core/state/Value.h" #include "C2Payload.h" +#include "C2Trigger.h" #include "C2Protocol.h" #include "io/validation.h" #include "HeartBeatReporter.h" @@ -90,10 +91,22 @@ class C2Agent : public state::UpdateController, public state::response::Response protected: + /** + * Restarts this agent. + */ void restart_agent(); + /** + * Update agent per the provided C2 update from c2 server or triggers + */ void update_agent(); + /** + * Check the collection of triggers for any updates that need to be handled. + * This is an optional step + */ + void checkTriggers(); + /** * Configure the C2 agent */ @@ -212,6 +225,8 @@ class C2Agent : public state::UpdateController, public state::response::Response std::vector> heartbeat_protocols_; + std::vector> triggers_; + std::atomic protocol_; bool allow_updates_; @@ -223,8 +238,7 @@ class C2Agent : public state::UpdateController, public state::response::Response std::string bin_location_; std::shared_ptr logger_; -} -; +}; } /* namesapce c2 */ } /* namespace minifi */ diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h new file mode 100644 index 0000000000..87f33f91ef --- /dev/null +++ b/libminifi/include/c2/C2Trigger.h @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ + +#include "core/Connectable.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Defines basic triggering mechanism for command and control interfaces + * + * Design: Extends Connectable so that we can instantiate with the class name + * + * The state machine expects triggered (yes ) -> getAction -> reset(optional) + */ +class C2Trigger : public core::Connectable{ + public: + + C2Trigger(std::string name, utils::Identifier uuid) + : core::Connectable(name, uuid){ + + } + virtual ~C2Trigger() { + } + + + /** + * initializes trigger with minifi configuration. + */ + virtual void initialize(const std::shared_ptr &configuration) = 0; + /** + * returns true if triggered, false otherwise. calling this function multiple times + * may change internal state. + */ + virtual bool triggered() = 0; + + /** + * Resets actions once they have been triggered. The flow of events does not require + * this to occur after an action has been triggered. Instead this is optional + * and a feature available to potential triggers that require a reset. + * + * This will occur because there are times in which the C2Action may take a significant + * amount of time and a reset is in order to avoid continual triggering. + */ + virtual void reset() = 0; + + /** + * Returns a payload implementing a C2 action. May or may not reset the action. + * @return C2Payload of the action to perform. + */ + virtual C2Payload getAction() = 0; +}; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */ diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h new file mode 100644 index 0000000000..031a24517e --- /dev/null +++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h @@ -0,0 +1,126 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#include +#include "c2/C2Trigger.h" +#include "utils/StringUtils.h" +#include "utils/file/FileUtils.h" +#include "core/Resource.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Defines a file update trigger when the last write time of a file has been changed. + * Design: Extends C2Trigger, and implements a trigger, action, reset state machine. Calling + * triggered will check the file. + */ +class FileUpdateTrigger : public C2Trigger { + public: + + FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier()) + : C2Trigger(name, uuid), + last_update_(0), + update_(false), + logger_(logging::LoggerFactory::getLogger()) { + } + + void initialize(const std::shared_ptr &configuration) { + if (nullptr != configuration) { + if (configuration->get(minifi::Configure::nifi_c2_file_watch, "c2.file.watch", file_)) { + last_update_ = utils::file::FileUtils::last_write_time(file_); + } else { + logger_->log_trace("Could not configure file"); + } + + } + } + + virtual bool triggered() { + if (last_update_ == 0) { + logger_->log_trace("Last Update is zero"); + return false; + } + auto update_time = utils::file::FileUtils::last_write_time(file_); + logger_->log_trace("Last Update is %d and update time is %d", last_update_.load(), update_time); + if (update_time > last_update_) { + last_update_ = update_time; + update_ = true; + return true; + } + return false; + } + + virtual void reset() { + // reset the last write time + last_update_ = utils::file::FileUtils::last_write_time(file_); + update_ = false; + } + + /** + * Returns an update payload implementing a C2 action + */ + virtual C2Payload getAction(); + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Block until work is available on any input connection, or the given duration elapses + * @param timeoutMs timeout in milliseconds + */ + + virtual void yield() { + + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + std::string file_; + std::atomic last_update_; + std::atomic update_; + private: + std::shared_ptr logger_; +}; +// add the trigger to the known resources. +REGISTER_RESOURCE(FileUpdateTrigger) + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */ diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 872c35d90d..4fb68dc5ec 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -41,7 +41,6 @@ class Configure : public Properties { } // nifi.flow.configuration.file static const char *nifi_default_directory; - static const char *nifi_c2_enable; static const char *nifi_flow_configuration_file; static const char *nifi_flow_configuration_file_backup_update; static const char *nifi_flow_engine_threads; @@ -78,6 +77,10 @@ class Configure : public Properties { // nifi rest api user name and password static const char *nifi_rest_api_user_name; static const char *nifi_rest_api_password; + // c2 options + + static const char *nifi_c2_enable; + static const char *nifi_c2_file_watch; private: std::string agent_identifier_; diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index ec0ca5dee5..eadb77d9f6 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -57,8 +57,24 @@ class Properties { std::lock_guard lock(mutex_); return (properties_.find(key) != properties_.end()); } - // Get the config value - bool get(std::string key, std::string &value); + /** + * Returns the config value by placing it into the referenced param value + * @param key key to look up + * @param value value in which to place the map's stored property value + * @returns true if found, false otherwise. + */ + bool get(const std::string &key, std::string &value); + + /** + * Returns the config value by placing it into the referenced param value + * Uses alternate_key if key is not found within the map. + * + * @param key key to look up + * @param alternate_key is the secondary lookup key if key is not found + * @param value value in which to place the map's stored property value + * @returns true if found, false otherwise. + */ + bool get(const std::string &key, const std::string &alternate_key, std::string &value); /** * Returns the configuration value or an empty string. diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 21cce95350..4bcf315158 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client. const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name"; const char *Configure::nifi_rest_api_password = "nifi.rest.api.password"; +const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch"; } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index c840f141c8..9206f419b3 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -54,7 +54,6 @@ #include "core/Connectable.h" #include "utils/HTTPClient.h" - #ifdef _MSC_VER #ifndef PATH_MAX #define PATH_MAX 260 @@ -189,7 +188,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st this->root_ = std::move(newRoot); loadFlowRepo(); initialized_ = true; - bool started = start(); + bool started = start() == 0; updating_ = false; @@ -358,7 +357,7 @@ void FlowController::initializeC2() { std::string c2_enable_str; - if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { + if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) { bool enable_c2 = true; utils::StringUtils::StringToBool(c2_enable_str, enable_c2); c2_enabled_ = enable_c2; diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index f5537fcf3e..e64a92f789 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -33,7 +33,7 @@ Properties::Properties() } // Get the config value -bool Properties::get(std::string key, std::string &value) { +bool Properties::get(const std::string &key, std::string &value) { std::lock_guard lock(mutex_); auto it = properties_.find(key); @@ -45,6 +45,25 @@ bool Properties::get(std::string key, std::string &value) { } } +bool Properties::get(const std::string &key, const std::string &alternate_key, std::string &value) { + std::lock_guard lock(mutex_); + auto it = properties_.find(key); + + if (it == properties_.end()) { + it = properties_.find(alternate_key); + if (it != properties_.end()) { + logger_->log_warn("%s is an alternate property that may not be supported in future releases. Please use %s instead.", alternate_key, key); + } + } + + if (it != properties_.end()) { + value = it->second; + return true; + } else { + return false; + } +} + int Properties::getInt(const std::string &key, int default_value) { std::lock_guard lock(mutex_); auto it = properties_.find(key); diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 10d8d29377..8db6894297 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr(now - last_run_).count(); // place priority on messages to send to the c2 server - if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { + if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { if (requests.size() > 0) { int count = 0; do { @@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptrlog_info("Checking %d triggers", triggers_.size()); + for (const auto &trigger : triggers_) { + if (trigger->triggered()) { + /** + * Action was triggered, so extract it. + */ + C2Payload &&triggerAction = trigger->getAction(); + logger_->log_trace("%s action triggered", trigger->getName()); + // handle the response the same way. This means that + // acknowledgements will be sent to the c2 server for every trigger action. + // this is expected + extractPayload(std::move(triggerAction)); + // call reset if the trigger supports this activity + trigger->reset(); + } else { + logger_->log_trace("%s action not triggered", trigger->getName()); + } + } +} void C2Agent::configure(const std::shared_ptr &configure, bool reconfigure) { std::string clazz, heartbeat_period, device; if (!reconfigure) { - if (!configure->get("c2.agent.protocol.class", clazz)) { + if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) { clazz = "RESTSender"; } logger_->log_info("Class is %s", clazz); @@ -132,7 +154,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf protocol_.load()->update(configure); } - if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) { + if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) { try { heart_beat_period_ = std::stoi(heartbeat_period); } catch (const std::invalid_argument &ie) { @@ -144,12 +166,12 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } std::string update_settings; - if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { + if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { // allow the agent to be updated. we then need to get an update command to execute after } if (allow_updates_) { - if (!configure->get("c2.agent.update.command", update_command_)) { + if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -158,7 +180,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf update_command_ = command.str(); } - if (!configure->get("c2.agent.update.temp.location", update_location_)) { + if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -169,10 +191,10 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } // if not defined we won't beable to update - configure->get("c2.agent.bin.location", bin_location_); + configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_); } std::string heartbeat_reporters; - if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { + if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { std::vector reporters = utils::StringUtils::split(heartbeat_reporters, ","); std::lock_guard lock(heartbeat_mutex); for (auto reporter : reporters) { @@ -187,6 +209,22 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } } + std::string trigger_classes; + if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) { + std::vector triggers = utils::StringUtils::split(trigger_classes, ","); + std::lock_guard lock(heartbeat_mutex); + for (auto trigger : triggers) { + auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger); + if (trigger_obj == nullptr) { + logger_->log_debug("Could not instantiate %s", trigger); + } else { + std::shared_ptr trg_impl = std::static_pointer_cast(trigger_obj); + trg_impl->initialize(configuration_); + triggers_.push_back(trg_impl); + } + } + } + auto base_reporter = "ControllerSocketProtocol"; auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter); if (heartbeat_reporter_obj == nullptr) { @@ -514,23 +552,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { // just get the raw data. C2Payload payload(Operation::TRANSFER, false, true); - C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false); + auto urlStr = url->second.to_string(); - auto raw_data = response.getRawData(); - std::string file_path = std::string(raw_data.data(), raw_data.size()); + std::string file_path = urlStr; + if (nullptr != protocol_ && file_path.find("http") != std::string::npos) { + C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false); + + auto raw_data = response.getRawData(); + file_path = std::string(raw_data.data(), raw_data.size()); + } std::ifstream new_conf(file_path); std::string raw_data_str((std::istreambuf_iterator(new_conf)), std::istreambuf_iterator()); unlink(file_path.c_str()); // if we can apply the update, we will acknowledge it and then backup the configuration file. - if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) { + if (update_sink_->applyUpdate(urlStr, raw_data_str)) { C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { // update nifi.flow.configuration.file=./conf/config.yml std::string config_file; + configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file); + std::string adjustedFilename; + if (config_file[0] != '/') { + adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file; + } else { + adjustedFilename += config_file; + } + + config_file = adjustedFilename; + std::stringstream config_file_backup; config_file_backup << config_file << ".bak"; // we must be able to successfuly copy the file. @@ -540,14 +593,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { + logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str()); persist_config = false; } } + logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config); if (persist_config) { std::ofstream writer(config_file); if (writer.is_open()) { - auto output = response.getRawData(); - writer.write(output.data(), output.size()); + writer.write(raw_data_str.data(), raw_data_str.size()); } writer.close(); } diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp new file mode 100644 index 0000000000..84cf0d380e --- /dev/null +++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/triggers/FileUpdateTrigger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Returns a payload implementing a C2 action + */ +C2Payload FileUpdateTrigger::getAction() { + if (update_) { + C2Payload response_payload(Operation::UPDATE, state::UpdateState::READ_COMPLETE, true, true); + C2ContentResponse resp(Operation::UPDATE); + resp.ident = "triggered"; + resp.name = "configuration"; + resp.operation_arguments["location"] = file_; + resp.operation_arguments["persist"] = "true"; + response_payload.addContent(std::move(resp)); + update_ = false; + return response_payload; + } + C2Payload response_payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true); + return response_payload; +} +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp new file mode 100644 index 0000000000..c05327ce33 --- /dev/null +++ b/libminifi/test/unit/FileTriggerTests.cpp @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "c2/triggers/FileUpdateTrigger.h" +#include "../TestBase.h" +#include "io/ClientSocket.h" +#include "core/Processor.h" +#include "core/ClassLoader.h" +#include "core/yaml/YamlConfiguration.h" + +TEST_CASE("Empty file", "[t1]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("invalidfile file", "[t2]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, "/tmp/blahblahblhalbha"); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file no update", "[t3]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file update", "[t4]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + file.open(path, std::ios::out); + file << "tempFiles"; + file.close(); + + REQUIRE(true == trigger.triggered()); + + REQUIRE(minifi::c2::Operation::UPDATE == trigger.getAction().getOperation()); +} diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index a99c219bcc..23c7e70b77 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -36,7 +36,6 @@ #include #include #include -#include #include #include "ResourceClaim.h" #include "core/Core.h" From 1d20f3aa6f48f63cf70859bd244241fe1012b9d4 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 12 Oct 2018 15:09:03 -0400 Subject: [PATCH 2/4] MINIFICPP-618: Update documentation --- C2.md | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/C2.md b/C2.md index 163fdbcd4d..e0ef98a53c 100644 --- a/C2.md +++ b/C2.md @@ -326,12 +326,26 @@ the following flag: cmake -DBOOTSTRAP= .. - - +## C2 Triggers + + C2 Triggers can be activated to perform some C2 activity via a local event. Currently only FileUpdateTrigger exists, which monitors + for C2 File triggers to update the flow configuration. Classes can be defined as a comma separated list of classes to load via the option + nifi.c2.agent.trigger.classes + + ### C2 File triggers C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the -update process will be halted. \ No newline at end of file +update process will be halted. + + in minifi.properties to activate the file update trigger specify + + # specifying a trigger + nifi.c2.agent.trigger.classes=FileUpdateTrigger + nifi.c2.file.watch= + + + \ No newline at end of file From d8a271d9567c4a7cf858a194b23c0171d42be01d Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 12 Oct 2018 15:10:16 -0400 Subject: [PATCH 3/4] MINIFICPP-618: Update documentation -- make more clear --- C2.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/C2.md b/C2.md index e0ef98a53c..cb72082c54 100644 --- a/C2.md +++ b/C2.md @@ -345,7 +345,7 @@ update process will be halted. # specifying a trigger nifi.c2.agent.trigger.classes=FileUpdateTrigger - nifi.c2.file.watch= + nifi.c2.file.watch= \ No newline at end of file From 37d43ccf3ab4f92aa2f22911ba1f99b222d61046 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 12 Oct 2018 15:17:00 -0400 Subject: [PATCH 4/4] MINIFICPP-618: Update documentation -- make more clear --- C2.md | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/C2.md b/C2.md index cb72082c54..9d9068acd4 100644 --- a/C2.md +++ b/C2.md @@ -25,6 +25,10 @@ options defined are located in minifi.properties. - [Configuration](#configuration) - [Base Options](#base-options) - [Metrics](#metrics) + - [Protocols](#protocols) + - [Triggers](#triggers) + - [UpdatePolicies](#updatepolicies) + - [Documentation](#documentation) ## Description @@ -291,7 +295,7 @@ will forward responses and updates to the heartbeating agents. Remote Process Groups: [] NiFi Properties Overrides: {} -### Update Policies +### UpdatePolicies Updates to MiNiFi C++ properties can be controlled through an UpdatePolicyControllerService named C2UpdatePolicy. The service supports several configuration options. They are defined in the following example: @@ -312,28 +316,15 @@ C2UpdatePolicy. The service supports several configuration options. They are def Property_3:true Property_4:true -### Update Type Descriptions - -Type descriptions ( class descriptions entered in PROCESSORS.md ) can be automatically placed within C2 by building cmake with -the following flag: - - cmake -DBOOTSTRAP=ON .. - - You can then run ./extensions/bootstrap/bstrp --inputc2docs --outputc2docs ../libminifi/include/agent/agent_docs.h - - When cmake is instantiated with this, a build will re-generate the type descriptions from PROCESSORS.md. Once this is finished - you may re-build the project with the following command from the build directory, running the build as you normally would: - - cmake -DBOOTSTRAP= .. -## C2 Triggers +### Triggers C2 Triggers can be activated to perform some C2 activity via a local event. Currently only FileUpdateTrigger exists, which monitors for C2 File triggers to update the flow configuration. Classes can be defined as a comma separated list of classes to load via the option nifi.c2.agent.trigger.classes -### C2 File triggers +#### C2 File triggers C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time @@ -348,4 +339,17 @@ update process will be halted. nifi.c2.file.watch= - \ No newline at end of file + +## Documentation + +Type descriptions ( class descriptions entered in PROCESSORS.md ) can be automatically placed within C2 by building cmake with +the following flag: + + cmake -DBOOTSTRAP=ON .. + + You can then run ./extensions/bootstrap/bstrp --inputc2docs --outputc2docs ../libminifi/include/agent/agent_docs.h + + When cmake is instantiated with this, a build will re-generate the type descriptions from PROCESSORS.md. Once this is finished + you may re-build the project with the following command from the build directory, running the build as you normally would: + + cmake -DBOOTSTRAP= .. \ No newline at end of file