Skip to content

Commit

Permalink
MINIFICPP-618: Add C2 triggers, first of which monitors a local file …
Browse files Browse the repository at this point in the history
…for changes
  • Loading branch information
phrocker committed Sep 26, 2018
1 parent 8ddaea0 commit 3a4062f
Show file tree
Hide file tree
Showing 11 changed files with 403 additions and 13 deletions.
2 changes: 1 addition & 1 deletion libminifi/CMakeLists.txt
Expand Up @@ -97,7 +97,7 @@ find_package(OpenSSL)
if (OPENSSL_FOUND)
set(TLS_SOURCES "src/io/tls/*.cpp")
endif(OPENSSL_FOUND)
file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")

file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" )

Expand Down
17 changes: 15 additions & 2 deletions libminifi/include/c2/C2Agent.h
Expand Up @@ -30,6 +30,7 @@
#include "controllers/UpdatePolicyControllerService.h"
#include "core/state/Value.h"
#include "C2Payload.h"
#include "C2Trigger.h"
#include "C2Protocol.h"
#include "io/validation.h"
#include "HeartBeatReporter.h"
Expand Down Expand Up @@ -90,10 +91,21 @@ class C2Agent : public state::UpdateController, public state::response::Response

protected:

/**
* Restarts this agent.
*/
void restart_agent();

/**
* Update agent per the provided C2 update from c2 server or triggers
*/
void update_agent();

/**
* Check the collection of triggers for any updates that need to be handled.
*/
void checkTriggers();

/**
* Configure the C2 agent
*/
Expand Down Expand Up @@ -212,6 +224,8 @@ class C2Agent : public state::UpdateController, public state::response::Response

std::vector<std::shared_ptr<HeartBeatReporter>> heartbeat_protocols_;

std::vector<std::shared_ptr<C2Trigger>> triggers_;

std::atomic<C2Protocol*> protocol_;

bool allow_updates_;
Expand All @@ -223,8 +237,7 @@ class C2Agent : public state::UpdateController, public state::response::Response
std::string bin_location_;

std::shared_ptr<logging::Logger> logger_;
}
;
};

} /* namesapce c2 */
} /* namespace minifi */
Expand Down
68 changes: 68 additions & 0 deletions libminifi/include/c2/C2Trigger.h
@@ -0,0 +1,68 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_
#define LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_

#include "core/Connectable.h"
#include "c2/C2Payload.h"
#include "properties/Configure.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace c2 {

/**
* Purpose: Defines basic triggering mechanism for command and control interfaces
*
* Design: Extends Connectable so that we can instantiate with the class name
*/
class C2Trigger : public core::Connectable{
public:

C2Trigger(std::string name, utils::Identifier uuid)
: core::Connectable(name, uuid){

}
virtual ~C2Trigger() {
}


/**
* initializes trigger with minifi configuration.
*/
virtual void initialize(const std::shared_ptr<minifi::Configure> &configuration) = 0;
/**
* returns true if triggered
*/
virtual bool triggered() = 0;

/**
* Returns a payload implementing a C2 action
*/
virtual C2Payload getAction() = 0;
};

} /* namesapce c2 */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#endif /* LIBMINIFI_INCLUDE_C2_C2TRIGGER_H_ */
111 changes: 111 additions & 0 deletions libminifi/include/c2/triggers/FileUpdateTrigger.h
@@ -0,0 +1,111 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
#define LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_
#include <atomic>
#include "c2/C2Trigger.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "core/Resource.h"
#include "c2/C2Payload.h"
#include "properties/Configure.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace c2 {

/**
* Defines a file update trigger when the last write time of a file has been changed.
*/
class FileUpdateTrigger : public C2Trigger {
public:

FileUpdateTrigger(std::string name, utils::Identifier uuid = utils::Identifier())
: C2Trigger(name, uuid),
last_update_(0),
update_(false) {
}

void initialize(const std::shared_ptr<minifi::Configure> &configuration) {
if (nullptr != configuration) {
if (configuration->get(minifi::Configure::nifi_c2_file_watch, file_)) {
last_update_ = utils::file::FileUtils::last_write_time(file_);
}

}
}


virtual bool triggered() {
if (last_update_ == -1)
return false;
auto update_time = utils::file::FileUtils::last_write_time(file_);
if (update_time > last_update_) {
last_update_ = update_time;
update_ = true;
return true;
}
return false;
}

/**
* Returns an update payload implementing a C2 action
*/
virtual C2Payload getAction();

/**
* Determines if we are connected and operating
*/
virtual bool isRunning() {
return true;
}

/**
* Block until work is available on any input connection, or the given duration elapses
* @param timeoutMs timeout in milliseconds
*/

virtual void yield() {

}

/**
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
virtual bool isWorkAvailable() {
return true;
}

protected:
std::mutex mutex_;
std::string file_;
std::atomic<uint64_t> last_update_;
std::atomic<bool> update_;
};
REGISTER_RESOURCE(FileUpdateTrigger)

} /* namesapce c2 */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#endif /* LIBMINIFI_INCLUDE_C2_TRIGGERS_FILESYSTEMTRIGGER_H_ */
5 changes: 4 additions & 1 deletion libminifi/include/properties/Configure.h
Expand Up @@ -41,7 +41,6 @@ class Configure : public Properties {
}
// nifi.flow.configuration.file
static const char *nifi_default_directory;
static const char *nifi_c2_enable;
static const char *nifi_flow_configuration_file;
static const char *nifi_flow_configuration_file_backup_update;
static const char *nifi_flow_engine_threads;
Expand Down Expand Up @@ -78,6 +77,10 @@ class Configure : public Properties {
// nifi rest api user name and password
static const char *nifi_rest_api_user_name;
static const char *nifi_rest_api_password;
// c2 options

static const char *nifi_c2_enable;
static const char *nifi_c2_file_watch;

private:
std::string agent_identifier_;
Expand Down
1 change: 1 addition & 0 deletions libminifi/src/Configure.cpp
Expand Up @@ -55,6 +55,7 @@ const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.
const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate";
const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
const char *Configure::nifi_c2_file_watch = "nifi.c2.file.watch";

} /* namespace minifi */
} /* namespace nifi */
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/FlowController.cpp
Expand Up @@ -189,7 +189,7 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
this->root_ = std::move(newRoot);
loadFlowRepo();
initialized_ = true;
bool started = start();
bool started = start() == 0;

updating_ = false;

Expand Down
61 changes: 54 additions & 7 deletions libminifi/src/c2/C2Agent.cpp
Expand Up @@ -62,7 +62,7 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();

// place priority on messages to send to the c2 server
if ( request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
if ( protocol_ != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
if (requests.size() > 0) {
int count = 0;
do {
Expand All @@ -80,6 +80,8 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
performHeartBeat();
}

checkTriggers();

std::this_thread::sleep_for(std::chrono::milliseconds(500));
return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
};
Expand All @@ -102,6 +104,19 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid
functions_.push_back(c2_consumer_);
}

void C2Agent::checkTriggers() {
logger_->log_info("Checking triggers");
for (const auto &trigger : triggers_) {
if (trigger->triggered()) {
C2Payload &&triggerAction = trigger->getAction();
logger_->log_info("Action triggered");
// handle the response the same way. This means that
// acknowledgements will be sent to the c2 server for every trigger action.
// this is expected
enqueue_c2_server_response(std::move(triggerAction));
}
}
}
void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) {
std::string clazz, heartbeat_period, device;

Expand Down Expand Up @@ -187,6 +202,22 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
}
}

std::string trigger_classes;
if (configure->get("c2.agent.trigger.classes", trigger_classes)) {
std::vector<std::string> triggers = utils::StringUtils::split(trigger_classes, ",");
std::lock_guard<std::mutex> lock(heartbeat_mutex);
for (auto trigger : triggers) {
auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger);
if (trigger_obj == nullptr) {
logger_->log_debug("Could not instantiate %s", trigger);
} else {
std::shared_ptr<C2Trigger> trg_impl = std::static_pointer_cast<C2Trigger>(trigger_obj);
trg_impl->initialize(configuration_);
triggers_.push_back(trg_impl);
}
}
}

auto base_reporter = "ControllerSocketProtocol";
auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter);
if (heartbeat_reporter_obj == nullptr) {
Expand Down Expand Up @@ -514,23 +545,38 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
// just get the raw data.
C2Payload payload(Operation::TRANSFER, false, true);

C2Payload &&response = protocol_.load()->consumePayload(url->second.to_string(), payload, RECEIVE, false);
auto urlStr = url->second.to_string();

auto raw_data = response.getRawData();
std::string file_path = std::string(raw_data.data(), raw_data.size());
std::string file_path = urlStr;
if (nullptr != protocol_ && file_path.find("http") != std::string::npos) {
C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false);

auto raw_data = response.getRawData();
file_path = std::string(raw_data.data(), raw_data.size());
}

std::ifstream new_conf(file_path);
std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>());
unlink(file_path.c_str());
// if we can apply the update, we will acknowledge it and then backup the configuration file.
if (update_sink_->applyUpdate(url->second.to_string(), raw_data_str)) {
if (update_sink_->applyUpdate(urlStr, raw_data_str)) {
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
enqueue_c2_response(std::move(response));

if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) {
// update nifi.flow.configuration.file=./conf/config.yml
std::string config_file;

configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file);
std::string adjustedFilename;
if (config_file[0] != '/') {
adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file;
} else {
adjustedFilename += config_file;
}

config_file = adjustedFilename;

std::stringstream config_file_backup;
config_file_backup << config_file << ".bak";
// we must be able to successfuly copy the file.
Expand All @@ -540,14 +586,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {

if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) {
if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) {
logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str());
persist_config = false;
}
}
logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config);
if (persist_config) {
std::ofstream writer(config_file);
if (writer.is_open()) {
auto output = response.getRawData();
writer.write(output.data(), output.size());
writer.write(raw_data_str.data(), raw_data_str.size());
}
writer.close();
}
Expand Down

0 comments on commit 3a4062f

Please sign in to comment.