From 3a4062f04ca738d8328d1e1a289368862fcde55b Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Tue, 25 Sep 2018 17:45:07 -0400 Subject: [PATCH 1/3] MINIFICPP-618: Add C2 triggers, first of which monitors a local file for changes --- libminifi/CMakeLists.txt | 2 +- libminifi/include/c2/C2Agent.h | 17 ++- libminifi/include/c2/C2Trigger.h | 68 +++++++++++ .../include/c2/triggers/FileUpdateTrigger.h | 111 ++++++++++++++++++ libminifi/include/properties/Configure.h | 5 +- libminifi/src/Configure.cpp | 1 + libminifi/src/FlowController.cpp | 2 +- libminifi/src/c2/C2Agent.cpp | 61 ++++++++-- .../src/c2/triggers/FileUpdateTrigger.cpp | 49 ++++++++ libminifi/test/unit/FileTriggerTests.cpp | 99 ++++++++++++++++ main/MiNiFiMain.cpp | 1 - 11 files changed, 403 insertions(+), 13 deletions(-) create mode 100644 libminifi/include/c2/C2Trigger.h create mode 100644 libminifi/include/c2/triggers/FileUpdateTrigger.h create mode 100644 libminifi/src/c2/triggers/FileUpdateTrigger.cpp create mode 100644 libminifi/test/unit/FileTriggerTests.cpp diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 20119ba2a9..fbd78f7b5a 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -97,7 +97,7 @@ find_package(OpenSSL) if (OPENSSL_FOUND) set(TLS_SOURCES "src/io/tls/*.cpp") endif(OPENSSL_FOUND) -file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp") file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" ) diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index e9ff4e4fe3..e5dcd154c4 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -30,6 +30,7 @@ #include "controllers/UpdatePolicyControllerService.h" #include "core/state/Value.h" #include "C2Payload.h" +#include "C2Trigger.h" #include "C2Protocol.h" #include "io/validation.h" #include "HeartBeatReporter.h" @@ -90,10 +91,21 @@ class C2Agent : public state::UpdateController, public state::response::Response protected: + /** + * Restarts this agent. + */ void restart_agent(); + /** + * Update agent per the provided C2 update from c2 server or triggers + */ void update_agent(); + /** + * Check the collection of triggers for any updates that need to be handled. + */ + void checkTriggers(); + /** * Configure the C2 agent */ @@ -212,6 +224,8 @@ class C2Agent : public state::UpdateController, public state::response::Response std::vector> heartbeat_protocols_; + std::vector> triggers_; + std::atomic protocol_; bool allow_updates_; @@ -223,8 +237,7 @@ class C2Agent : public state::UpdateController, public state::response::Response std::string bin_location_; std::shared_ptr logger_; -} -; +}; } /* namesapce c2 */ } /* namespace minifi */ diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h new file mode 100644 index 0000000000..2d47b7b249 --- /dev/null +++ b/libminifi/include/c2/C2Trigger.h @@ -0,0 +1,68 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ + +#include "core/Connectable.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose: Defines basic triggering mechanism for command and control interfaces + * + * Design: Extends Connectable so that we can instantiate with the class name + */ +class C2Trigger : public core::Connectable{ + public: + + C2Trigger(std::string name, utils::Identifier uuid) + : core::Connectable(name, uuid){ + + } + virtual ~C2Trigger() { + } + + + /** + * initializes trigger with minifi configuration. + */ + virtual void initialize(const std::shared_ptr &configuration) = 0; + /** + * returns true if triggered + */ + virtual bool triggered() = 0; + + /** + * Returns a payload implementing a C2 action + */ + virtual C2Payload getAction() = 0; +}; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */ diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h new file mode 100644 index 0000000000..efee762430 --- /dev/null +++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ +#include +#include "c2/C2Trigger.h" +#include "utils/StringUtils.h" +#include "utils/file/FileUtils.h" +#include "core/Resource.h" +#include "c2/C2Payload.h" +#include "properties/Configure.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Defines a file update trigger when the last write time of a file has been changed. + */ +class FileUpdateTrigger : public C2Trigger { + public: + + FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier()) + : C2Trigger(name, uuid), + last_update_(0), + update_(false) { + } + + void initialize(const std::shared_ptr &configuration) { + if (nullptr != configuration) { + if (configuration->get(minifi::Configure::nifi_c2_file_watch, file_)) { + last_update_ = utils::file::FileUtils::last_write_time(file_); + } + + } + } + + + virtual bool triggered() { + if (last_update_ == -1) + return false; + auto update_time = utils::file::FileUtils::last_write_time(file_); + if (update_time > last_update_) { + last_update_ = update_time; + update_ = true; + return true; + } + return false; + } + + /** + * Returns an update payload implementing a C2 action + */ + virtual C2Payload getAction(); + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Block until work is available on any input connection, or the given duration elapses + * @param timeoutMs timeout in milliseconds + */ + + virtual void yield() { + + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() { + return true; + } + + protected: + std::mutex mutex_; + std::string file_; + std::atomic last_update_; + std::atomic update_; +}; +REGISTER_RESOURCE(FileUpdateTrigger) + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */ diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 872c35d90d..4fb68dc5ec 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -41,7 +41,6 @@ class Configure : public Properties { } // nifi.flow.configuration.file static const char *nifi_default_directory; - static const char *nifi_c2_enable; static const char *nifi_flow_configuration_file; static const char *nifi_flow_configuration_file_backup_update; static const char *nifi_flow_engine_threads; @@ -78,6 +77,10 @@ class Configure : public Properties { // nifi rest api user name and password static const char *nifi_rest_api_user_name; static const char *nifi_rest_api_password; + // c2 options + + static const char *nifi_c2_enable; + static const char *nifi_c2_file_watch; private: std::string agent_identifier_; diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 21cce95350..4bcf315158 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client. const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name"; const char *Configure::nifi_rest_api_password = "nifi.rest.api.password"; +const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch"; } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index c840f141c8..7b67602163 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -189,7 +189,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st this->root_ = std::move(newRoot); loadFlowRepo(); initialized_ = true; - bool started = start(); + bool started = start() == 0; updating_ = false; diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 1ef8eeaa4c..cae501446d 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr(now - last_run_).count(); // place priority on messages to send to the c2 server - if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { + if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { if (requests.size() > 0) { int count = 0; do { @@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptrlog_info("Checking triggers"); + for (const auto &trigger : triggers_) { + if (trigger->triggered()) { + C2Payload &&triggerAction = trigger->getAction(); + logger_->log_info("Action triggered"); + // handle the response the same way. This means that + // acknowledgements will be sent to the c2 server for every trigger action. + // this is expected + enqueue_c2_server_response(std::move(triggerAction)); + } + } +} void C2Agent::configure(const std::shared_ptr &configure, bool reconfigure) { std::string clazz, heartbeat_period, device; @@ -187,6 +202,22 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } } + std::string trigger_classes; + if (configure->get("c2.agent.trigger.classes", trigger_classes)) { + std::vector triggers = utils::StringUtils::split(trigger_classes, ","); + std::lock_guard lock(heartbeat_mutex); + for (auto trigger : triggers) { + auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger); + if (trigger_obj == nullptr) { + logger_->log_debug("Could not instantiate %s", trigger); + } else { + std::shared_ptr trg_impl = std::static_pointer_cast(trigger_obj); + trg_impl->initialize(configuration_); + triggers_.push_back(trg_impl); + } + } + } + auto base_reporter = "ControllerSocketProtocol"; auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter); if (heartbeat_reporter_obj == nullptr) { @@ -514,23 +545,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { // just get the raw data. C2Payload payload(Operation::TRANSFER, false, true); - C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false); + auto urlStr = url->second.to_string(); - auto raw_data = response.getRawData(); - std::string file_path = std::string(raw_data.data(), raw_data.size()); + std::string file_path = urlStr; + if (nullptr != protocol_ && file_path.find("http") != std::string::npos) { + C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false); + + auto raw_data = response.getRawData(); + file_path = std::string(raw_data.data(), raw_data.size()); + } std::ifstream new_conf(file_path); std::string raw_data_str((std::istreambuf_iterator(new_conf)), std::istreambuf_iterator()); unlink(file_path.c_str()); // if we can apply the update, we will acknowledge it and then backup the configuration file. - if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) { + if (update_sink_->applyUpdate(urlStr, raw_data_str)) { C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { // update nifi.flow.configuration.file=./conf/config.yml std::string config_file; + configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file); + std::string adjustedFilename; + if (config_file[0] != '/') { + adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file; + } else { + adjustedFilename += config_file; + } + + config_file = adjustedFilename; + std::stringstream config_file_backup; config_file_backup << config_file << ".bak"; // we must be able to successfuly copy the file. @@ -540,14 +586,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { + logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str()); persist_config = false; } } + logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config); if (persist_config) { std::ofstream writer(config_file); if (writer.is_open()) { - auto output = response.getRawData(); - writer.write(output.data(), output.size()); + writer.write(raw_data_str.data(), raw_data_str.size()); } writer.close(); } diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp new file mode 100644 index 0000000000..84cf0d380e --- /dev/null +++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/triggers/FileUpdateTrigger.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Returns a payload implementing a C2 action + */ +C2Payload FileUpdateTrigger::getAction() { + if (update_) { + C2Payload response_payload(Operation::UPDATE, state::UpdateState::READ_COMPLETE, true, true); + C2ContentResponse resp(Operation::UPDATE); + resp.ident = "triggered"; + resp.name = "configuration"; + resp.operation_arguments["location"] = file_; + resp.operation_arguments["persist"] = "true"; + response_payload.addContent(std::move(resp)); + update_ = false; + return response_payload; + } + C2Payload response_payload(Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true, true); + return response_payload; +} +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp new file mode 100644 index 0000000000..ea5d9a141e --- /dev/null +++ b/libminifi/test/unit/FileTriggerTests.cpp @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "c2/triggers/FileUpdateTrigger.h" +#include "../TestBase.h" +#include "io/ClientSocket.h" +#include "core/Processor.h" +#include "core/ClassLoader.h" +#include "core/yaml/YamlConfiguration.h" + +TEST_CASE("Empty file", "[t1]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("invalidfile file", "[t2]") { + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, "/tmp/blahblahblhalbha"); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file no update", "[t3]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + REQUIRE(false == trigger.triggered()); + REQUIRE(minifi::c2::Operation::HEARTBEAT == trigger.getAction().getOperation()); +} + +TEST_CASE("test valid file update", "[t4]") { + TestController testController; + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::c2::FileUpdateTrigger trigger("test"); + std::shared_ptr configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_c2_file_watch, path); + trigger.initialize(configuration); + + std::this_thread::sleep_for(std::chrono::milliseconds(105)); + file.open(path, std::ios::out); + file << "tempFiles"; + file.close(); + + REQUIRE(true == trigger.triggered()); + + REQUIRE(minifi::c2::Operation::UPDATE == trigger.getAction().getOperation()); +} diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 1c5bded452..58dc3b2ca7 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -36,7 +36,6 @@ #include #include #include -#include #include #include "ResourceClaim.h" #include "core/Core.h" From f322529ba8d9da2e25e04134c5402521ea7a0dbd Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Wed, 26 Sep 2018 07:38:41 -0400 Subject: [PATCH 2/3] MINIFICPP-618: Add comments to C2 readme --- C2.md | 10 +++++++++- libminifi/include/c2/triggers/FileUpdateTrigger.h | 3 +-- libminifi/test/unit/FileTriggerTests.cpp | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/C2.md b/C2.md index 4a3daa43ee..fa546fcbf3 100644 --- a/C2.md +++ b/C2.md @@ -322,4 +322,12 @@ the following flag: cmake -DBOOTSTRAP= .. - \ No newline at end of file + + +### C2 File triggers + +C2 updates can be triggered with updates to a flow configuration file. It doesn't have to be the same base configuration file. It +will be copied into place. A new property, nifi.c2.file.watch, can be placed into minifi.properties to monitor. If the update time +changes while the agent is running, it will be copied into place of the file defined by nifi.flow.configuration.file. The agent +will then be restarted with the new flow configuration. If a failure occurs in reading that file or it is an invalid YAML file, the +update process will be halted. \ No newline at end of file diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h index efee762430..36fdc0b11d 100644 --- a/libminifi/include/c2/triggers/FileUpdateTrigger.h +++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h @@ -54,7 +54,7 @@ class FileUpdateTrigger : public C2Trigger { virtual bool triggered() { - if (last_update_ == -1) + if (last_update_ == 0) return false; auto update_time = utils::file::FileUtils::last_write_time(file_); if (update_time > last_update_) { @@ -95,7 +95,6 @@ class FileUpdateTrigger : public C2Trigger { } protected: - std::mutex mutex_; std::string file_; std::atomic last_update_; std::atomic update_; diff --git a/libminifi/test/unit/FileTriggerTests.cpp b/libminifi/test/unit/FileTriggerTests.cpp index ea5d9a141e..c05327ce33 100644 --- a/libminifi/test/unit/FileTriggerTests.cpp +++ b/libminifi/test/unit/FileTriggerTests.cpp @@ -88,7 +88,7 @@ TEST_CASE("test valid file update", "[t4]") { configuration->set(minifi::Configure::nifi_c2_file_watch, path); trigger.initialize(configuration); - std::this_thread::sleep_for(std::chrono::milliseconds(105)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); file.open(path, std::ios::out); file << "tempFiles"; file.close(); From 4a23da8c0a90e14155353d796ee1e7ab6af14d75 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 27 Sep 2018 16:16:07 -0400 Subject: [PATCH 3/3] MINIFICPP-618: Set trigger in trigger critical section and add fx to clear. MINIFICPP-624: Add alternate names for C2 configuration items and support both --- C2.md | 18 ++++++++++------- .../http-curl/protocols/RESTReceiver.cpp | 4 ++-- extensions/http-curl/protocols/RESTSender.cpp | 12 +++++------ libminifi/include/c2/C2Trigger.h | 2 ++ .../include/c2/triggers/FileUpdateTrigger.h | 8 ++++++-- libminifi/include/properties/Properties.h | 3 +++ libminifi/src/FlowController.cpp | 3 +-- libminifi/src/Properties.cpp | 19 ++++++++++++++++++ libminifi/src/c2/C2Agent.cpp | 20 ++++++++++--------- 9 files changed, 61 insertions(+), 28 deletions(-) diff --git a/C2.md b/C2.md index fa546fcbf3..107c16649b 100644 --- a/C2.md +++ b/C2.md @@ -41,6 +41,10 @@ will be explained in greater detail in the metrics section. For more more insight into the API used within the C2 agent, please visit: https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal +Release 0.6.0: Please note that all c2 properties now exist as nifi.c2.* . If your configuration properties +files contain the former naming convention of c2.*, we will continue to support that as +an alternate key, but you are encouraged to switch your configuration options as soon as possible. + in minifi.properties @@ -51,19 +55,19 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation # specify C2 protocol -- default is RESTSender if this is not specified - c2.agent.protocol.class=RESTSender + nifi.c2.agent.protocol.class=RESTSender # may also use MQTT - # c2.agent.protocol.class=MQTTC2Protocol + # nifi.c2.agent.protocol.class=MQTTC2Protocol # control c2 heartbeat interval in millisecocnds - c2.agent.heartbeat.period=3000 + nifi.c2.agent.heartbeat.period=3000 # enable reporter classes - c2.agent.heartbeat.reporter.class=RESTReciver + nifi.c2.agent.heartbeat.reporter.class=RESTReciver # specify the rest URIs if using RESTSender - c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat - c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge + nifi.c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat + nifi.c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge # c2 agent identifier nifi.c2.agent.identifier= @@ -72,7 +76,7 @@ https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal nifi.c2.agent.class= # configure SSL Context service for REST Protocol - c2.rest.ssl.context.service + nifi.c2.rest.ssl.context.service ### Metrics diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp index d2c17b6ddd..babc983a9e 100644 --- a/extensions/http-curl/protocols/RESTReceiver.cpp +++ b/extensions/http-curl/protocols/RESTReceiver.cpp @@ -49,8 +49,8 @@ void RESTReceiver::initialize(const std::shared_ptrlog_trace("Initializing rest receiver"); if (nullptr != configuration_) { std::string listeningPort,rootUri="/", caCert; - configuration_->get("c2.rest.listener.port", listeningPort); - configuration_->get("c2.rest.listener.cacert", caCert); + configuration_->get("nifi.c2.rest.listener.port","c2.rest.listener.port", listeningPort); + configuration_->get("nifi.c2.rest.listener.cacert","c2.rest.listener.cacert", caCert); if (!listeningPort.empty() && !rootUri.empty()) { handler = std::unique_ptr(new ListeningProtocol()); if (!caCert.empty()) { diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index 7db998a799..9b4ce5e7fd 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -46,15 +46,15 @@ void RESTSender::initialize(const std::shared_ptrget("c2.rest.url", rest_uri_); - configure->get("c2.rest.url.ack", ack_uri_); - if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) { + configure->get("nifi.c2.rest.url","c2.rest.url", rest_uri_); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", ack_uri_); + if (configure->get("nifi.c2.rest.ssl.context.service","c2.rest.ssl.context.service", ssl_context_service_str)) { auto service = controller->getControllerService(ssl_context_service_str); if (nullptr != service) { ssl_context_service_ = std::static_pointer_cast(service); } } - configure->get("c2.rest.heartbeat.minimize.updates", update_str); + configure->get("nifi.c2.rest.heartbeat.minimize.updates","c2.rest.heartbeat.minimize.updates", update_str); utils::StringUtils::StringToBool(update_str, minimize_updates_); } logger_->log_debug("Submitting to %s", rest_uri_); @@ -78,8 +78,8 @@ C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction directi void RESTSender::update(const std::shared_ptr &configure) { std::string url; - configure->get("c2.rest.url", url); - configure->get("c2.rest.url.ack", url); + configure->get("nifi.c2.rest.url","c2.rest.url", url); + configure->get("nifi.c2.rest.url.ack","c2.rest.url.ack", url); } const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { diff --git a/libminifi/include/c2/C2Trigger.h b/libminifi/include/c2/C2Trigger.h index 2d47b7b249..ecce77dd26 100644 --- a/libminifi/include/c2/C2Trigger.h +++ b/libminifi/include/c2/C2Trigger.h @@ -53,6 +53,8 @@ class C2Trigger : public core::Connectable{ */ virtual bool triggered() = 0; + virtual void reset() = 0; + /** * Returns a payload implementing a C2 action */ diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h index 36fdc0b11d..1d92daef59 100644 --- a/libminifi/include/c2/triggers/FileUpdateTrigger.h +++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h @@ -45,14 +45,13 @@ class FileUpdateTrigger : public C2Trigger { void initialize(const std::shared_ptr &configuration) { if (nullptr != configuration) { - if (configuration->get(minifi::Configure::nifi_c2_file_watch, file_)) { + if (configuration->get(minifi::Configure::nifi_c2_file_watch, "c2.file.watch", file_)) { last_update_ = utils::file::FileUtils::last_write_time(file_); } } } - virtual bool triggered() { if (last_update_ == 0) return false; @@ -65,6 +64,11 @@ class FileUpdateTrigger : public C2Trigger { return false; } + virtual void reset() { + last_update_ = utils::file::FileUtils::last_write_time(file_); + update_ = false; + } + /** * Returns an update payload implementing a C2 action */ diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index ec0ca5dee5..28be26b2ef 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -60,6 +60,9 @@ class Properties { // Get the config value bool get(std::string key, std::string &value); + // Get the config value + bool get(std::string key, std::string alternate_key, std::string &value); + /** * Returns the configuration value or an empty string. * @return value corresponding to key or empty value. diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 7b67602163..9206f419b3 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -54,7 +54,6 @@ #include "core/Connectable.h" #include "utils/HTTPClient.h" - #ifdef _MSC_VER #ifndef PATH_MAX #define PATH_MAX 260 @@ -358,7 +357,7 @@ void FlowController::initializeC2() { std::string c2_enable_str; - if (configuration_->get(Configure::nifi_c2_enable, c2_enable_str)) { + if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) { bool enable_c2 = true; utils::StringUtils::StringToBool(c2_enable_str, enable_c2); c2_enabled_ = enable_c2; diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index f5537fcf3e..97f8e27526 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -45,6 +45,25 @@ bool Properties::get(std::string key, std::string &value) { } } +bool Properties::get(std::string key, std::string alternate_key, std::string &value) { + std::lock_guard lock(mutex_); + auto it = properties_.find(key); + + if (it == properties_.end()) { + it = properties_.find(alternate_key); + if (it != properties_.end()) { + logger_->log_warn("%s is an alternate property that may not be supported in future releases. Please use %s instead.", alternate_key, key); + } + } + + if (it != properties_.end()) { + value = it->second; + return true; + } else { + return false; + } +} + int Properties::getInt(const std::string &key, int default_value) { std::lock_guard lock(mutex_); auto it = properties_.find(key); diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index cae501446d..cf172a2e81 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -113,7 +113,9 @@ void C2Agent::checkTriggers() { // handle the response the same way. This means that // acknowledgements will be sent to the c2 server for every trigger action. // this is expected - enqueue_c2_server_response(std::move(triggerAction)); + extractPayload(std::move(triggerAction)); + + trigger->reset(); } } } @@ -121,7 +123,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf std::string clazz, heartbeat_period, device; if (!reconfigure) { - if (!configure->get("c2.agent.protocol.class", clazz)) { + if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) { clazz = "RESTSender"; } logger_->log_info("Class is %s", clazz); @@ -147,7 +149,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf protocol_.load()->update(configure); } - if (configure->get("c2.agent.heartbeat.period", heartbeat_period)) { + if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) { try { heart_beat_period_ = std::stoi(heartbeat_period); } catch (const std::invalid_argument &ie) { @@ -159,12 +161,12 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } std::string update_settings; - if (configure->get("c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { + if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { // allow the agent to be updated. we then need to get an update command to execute after } if (allow_updates_) { - if (!configure->get("c2.agent.update.command", update_command_)) { + if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -173,7 +175,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf update_command_ = command.str(); } - if (!configure->get("c2.agent.update.temp.location", update_location_)) { + if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { char cwd[1024]; getcwd(cwd, sizeof(cwd)); @@ -184,10 +186,10 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } // if not defined we won't beable to update - configure->get("c2.agent.bin.location", bin_location_); + configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_); } std::string heartbeat_reporters; - if (configure->get("c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { + if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { std::vector reporters = utils::StringUtils::split(heartbeat_reporters, ","); std::lock_guard lock(heartbeat_mutex); for (auto reporter : reporters) { @@ -203,7 +205,7 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf } std::string trigger_classes; - if (configure->get("c2.agent.trigger.classes", trigger_classes)) { + if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) { std::vector triggers = utils::StringUtils::split(trigger_classes, ","); std::lock_guard lock(heartbeat_mutex); for (auto trigger : triggers) {