From 228bb3c1e651e576c5a13cce1871a74afc1d3081 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Thu, 25 Aug 2016 16:40:25 -0400 Subject: [PATCH] MINIFI-63 MINIFI-87 - Introduce minifi.sh script which serves as a wrapper around the minifi binary and provides installation as a service. This closes #8 and closes #9. --- .gitignore | 1 + Makefile | 11 +- bin/minifi.sh | 338 ++++++++++++++++++++++++++++++++++++ inc/Configure.h | 18 +- inc/Connection.h | 2 - inc/Logger.h | 10 +- main/MiNiFiMain.cpp | 89 +++++----- src/Configure.cpp | 47 +++-- src/FlowControlProtocol.cpp | 1 - src/FlowController.cpp | 78 +++++---- src/ListenSyslog.cpp | 3 - 11 files changed, 483 insertions(+), 115 deletions(-) create mode 100755 bin/minifi.sh diff --git a/.gitignore b/.gitignore index c7dcff2916..9d78a7ff2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ # Filter out generated files from the included libuuid thirdparty/uuid/tst_uuid* +assemblies diff --git a/Makefile b/Makefile index 40b11ff695..2ce489b7c5 100644 --- a/Makefile +++ b/Makefile @@ -80,6 +80,7 @@ $(BUILD_DIR)/$(TARGET_LIB): $(OBJS) minifi: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a $(CC) $(CFLAGS) $(INCLUDES) -o $(BUILD_DIR)/$(TARGET_EXE) main/MiNiFiMain.cpp $(LDDIRECTORY) $(LDFLAGS) cp $(BUILD_DIR)/$(TARGET_EXE) $(TARGET_DIR)/$(TARGET_EXE) + cp $(BUILD_DIR)/$(TARGET_EXE) bin/$(TARGET_EXE) .PHONY: tests tests: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a @@ -100,6 +101,7 @@ $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz : $(ASSEMBLIES_DIR) inc \ src \ main \ + bin \ conf \ thirdparty \ Makefile \ @@ -107,12 +109,15 @@ $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz : $(ASSEMBLIES_DIR) tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-source $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz : $(ASSEMBLIES_DIR) $(TARGET_EXE) - tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz \ - LICENSE \ + mkdir -p $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin + cp -R LICENSE \ NOTICE \ README.md \ conf \ - -C target minifi + bin \ + $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin + cp target/minifi $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin/bin/ + tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-bin .PHONY: clean clean: diff --git a/bin/minifi.sh b/bin/minifi.sh new file mode 100755 index 0000000000..dca94f3a43 --- /dev/null +++ b/bin/minifi.sh @@ -0,0 +1,338 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches + +SCRIPT_DIR=$(dirname "$0") +SCRIPT_NAME=$(basename "$0") +PROGNAME=$(basename "$0") +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" +export MINIFI_HOME="$(dirname ${SCRIPTPATH})" +bin_dir=${MINIFI_HOME}/bin +minifi_executable=${bin_dir}/minifi +pid_file=${bin_dir}/.minifi.pid + +warn() { + echo "${PROGNAME}: $*" +} + +die() { + warn "$*" + exit 1 +} + +detectOS() { + # OS specific support (must be 'true' or 'false'). + cygwin=false; + aix=false; + os400=false; + darwin=false; + case "$(uname)" in + CYGWIN*) + cygwin=true + ;; + AIX*) + aix=true + ;; + OS400*) + os400=true + ;; + Darwin) + darwin=true + ;; + esac + # For AIX, set an environment variable + if ${aix}; then + export LDR_CNTRL=MAXDATA=0xB0000000@DSA + echo ${LDR_CNTRL} + fi +} + +init() { + # Determine if there is special OS handling we must perform + detectOS +} + +# determines the pid +get_pid() { + # Default to a -1 for pid + pid=-1 + # Check to see if we have a pid file + if [ -f ${pid_file} ]; then + pid=$(cat ${pid_file}) + fi + echo ${pid} +} + +# Performs a check to see if the provided pid is one that currently exists +active_pid() { + pid=${1} + kill -s 0 ${pid} > /dev/null 2>&1 + echo $? +} + +install() { + detectOS + + if [ "${darwin}" = "true" ] || [ "${cygwin}" = "true" ]; then + echo 'Installing Apache MiNiFi as a service is not supported on OS X or Cygwin.' + exit 1 + fi + + SVC_NAME=minifi + if [ "x$2" != "x" ] ; then + SVC_NAME=$2 + fi + + initd_dir='/etc/init.d' + SVC_FILE="${initd_dir}/${SVC_NAME}" + + if [ ! -w "${initd_dir}" ]; then + echo "Current user does not have write permissions to ${initd_dir}. Cannot install MiNiFi as a service." + exit 1 + fi + +# Create the init script, overwriting anything currently present +cat < ${SVC_FILE} +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# chkconfig: 2345 20 80 +# description: Apache NiFi MiNiFi is a subproject of Apache nifi to collect data where it originates. +# +# Make use of the configured MINIFI_HOME directory and pass service requests to the minifi executable +export MINIFI_HOME=${MINIFI_HOME} +bin_dir=\${MINIFI_HOME}/bin +minifi_executable=\${bin_dir}/minifi +pid_file=${bin_dir}/.minifi.pid + +# determines the pid +get_pid() { + # Default to a -1 for pid + pid=-1 + # Check to see if we have a pid file + if [ -f \${pid_file} ]; then + pid=\$(cat ${pid_file}) + fi + echo \${pid} +} + +# Performs a check to see if the provided pid is one that currently exists +active_pid() { + pid=\${1} + kill -s 0 \${pid} > /dev/null 2>&1 + echo \$? +} + +saved_pid=\$(get_pid) + +case "\$1" in + start) + if [ "\${saved_pid}" -gt 0 ]; then + if [ \$(active_pid \${saved_pid}) -ne 0 ]; then + echo "PID \${saved_pid} is stale, removing pid file at \${pid_file}"; + if ! rm -f \${pid_file}; then + echo "Could not remove \${pid_file}. File will need to be manually removed." + exit 1; + fi + else + echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}." + exit 0; + fi + fi + \${minifi_executable} & + pid=\$! + echo \${pid} > \${pid_file} + echo Starting MiNiFi with PID \${pid} and pid file \${pid_file} + ;; + stop) + if [ \$(active_pid \${saved_pid}) -ne 0 ]; then + echo "MiNiFi is not currently running." + else + echo "Stopping MiNiFi (PID: \${saved_pid})." + # Send a SIGINT to MiNiFi so that the handler begins shutdown. + kill -2 \${saved_pid} > /dev/null 2>&1 + if [ \$? -ne 0 ]; then + echo "Could not successfully send termination signal to MiNiFi (PID: \${saved_pid})" + exit 1; + else + # Clean up our pid file + rm -f \${pid_file} + fi + fi + ;; + run) + if [ "\${saved_pid}" -gt 0 ]; then + if ! active_pid \${saved_pid}; then + echo "PID \${saved_pid} is stale, removing pid file at \${pid_file}"; + if ! rm -f \${pid_file}; then + echo "Could not remove \${pid_file}. File will need to be manually removed." + exit 1; + fi + else + echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}." + exit 0; + fi + fi + echo running + \${minifi_executable} + ;; + status) + # interpret status as per LSB specifications + # see: http://refspecs.linuxbase.org/LSB_3.1.0/LSB-Core-generic/LSB-Core-generic/iniscrptact.html + + if [ "\${saved_pid}" -gt 0 ]; then + if [ \$(active_pid \${saved_pid}) -ne 0 ]; then + # program is dead and pid file exists + echo "Program is not currently running but stale pid file (\${pid_file}) exists."; + exit 1 + else + # pid is correct, program is running + echo "MINIFI is currently running (PID: \${saved_pid}) with pid file \${pid_file}." + exit 0; + fi + else + # program is not running + echo "MiNiFi is not currently running." + exit 3; + fi + ;; + restart) + echo Restarting MiNiFi service + \${bin_dir}/minifi.sh stop + \${bin_dir}/minifi.sh start + ;; + *) + echo "Usage: service minifi {start|stop|restart|status}" + ;; +esac + +SERVICEDESCRIPTOR + + if [ ! -f "${SVC_FILE}" ]; then + echo "Could not create service file ${SVC_FILE}" + exit 1 + fi + + # Provide the user execute access on the file + chmod u+x ${SVC_FILE} + + rm -f "/etc/rc2.d/S65${SVC_NAME}" + ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/S65${SVC_NAME}" || { echo "Could not create link /etc/rc2.d/S65${SVC_NAME}"; exit 1; } + rm -f "/etc/rc2.d/K65${SVC_NAME}" + ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/K65${SVC_NAME}" || { echo "Could not create link /etc/rc2.d/K65${SVC_NAME}"; exit 1; } + echo "Service ${SVC_NAME} installed" +} + +saved_pid=$(get_pid) + +case "$1" in + start) + if [ "${saved_pid}" -gt 0 ]; then + if [ $(active_pid ${saved_pid}) -ne 0 ]; then + echo "PID ${saved_pid} is stale, removing pid file at ${pid_file}"; + if ! rm -f ${pid_file}; then + echo "Could not remove ${pid_file}. File will need to be manually removed." + exit 1; + fi + else + echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}." + exit 0; + fi + fi + ${minifi_executable} & + pid=$! + echo ${pid} > ${pid_file} + echo Starting MiNiFi with PID ${pid} and pid file ${pid_file} + ;; + stop) + if [ $(active_pid ${saved_pid}) -ne 0 ]; then + echo "MiNiFi is not currently running." + else + echo "Stopping MiNiFi (PID: ${saved_pid})." + # Send a SIGINT to MiNiFi so that the handler begins shutdown. + kill -2 ${saved_pid} > /dev/null 2>&1 + if [ $? -ne 0 ]; then + echo "Could not successfully send termination signal to MiNiFi (PID: ${saved_pid})" + exit 1; + else + # Clean up our pid file + rm -f ${pid_file} + fi + fi + ;; + run) + if [ "${saved_pid}" -gt 0 ]; then + if [ $(active_pid ${saved_pid}) -ne 0 ]; then + echo "PID ${saved_pid} is stale, removing pid file at ${pid_file}"; + if ! rm -f ${pid_file}; then + echo "Could not remove ${pid_file}. File will need to be manually removed." + exit 1; + fi + else + echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}." + exit 0; + fi + fi + ${minifi_executable} + ;; + status) + # interpret status as per LSB specifications + # see: http://refspecs.linuxbase.org/LSB_3.1.0/LSB-Core-generic/LSB-Core-generic/iniscrptact.html + + if [ "${saved_pid}" -gt 0 ]; then + if [ $(active_pid ${saved_pid}) -ne 0 ]; then + # program is dead and pid file exists + echo "Program is not currently running but stale pid file (${pid_file}) exists."; + exit 1 + else + # pid is correct, program is running + echo "MINIFI is currently running (PID: ${saved_pid}) with pid file ${pid_file}." + exit 0; + fi + else + # program is not running + echo "MiNiFi is not currently running." + exit 3; + fi + ;; + restart) + echo Restarting MiNiFi service + ${bin_dir}/minifi.sh stop + ${bin_dir}/minifi.sh start + ;; + install) + install "$@" + ;; + *) + echo "Usage: minifi.sh {start|stop|run|restart|status|install}" + ;; +esac diff --git a/inc/Configure.h b/inc/Configure.h index 502916f95c..d325fa0636 100644 --- a/inc/Configure.h +++ b/inc/Configure.h @@ -76,14 +76,28 @@ class Configure { void parseConfigureFileLine(char *buf); //! Load Configure File void loadConfigureFile(const char *fileName); - //! Parse Command Line - void pareCommandLine(int argc, char **argv); + //! Set the determined MINIFI_HOME + void setHome(std::string minifiHome) + { + _minifiHome = minifiHome; + } + + //! Get the determined MINIFI_HOME + std::string getHome() + { + return _minifiHome; + } + //! Parse Command Line + void parseCommandLine(int argc, char **argv); private: //! Mutex for protection std::mutex _mtx; //! Logger Logger *_logger; + //! Home location for this executable + std::string _minifiHome; + Configure() { _logger = Logger::getLogger(); diff --git a/inc/Connection.h b/inc/Connection.h index 919cdc9b20..dc6b94bea0 100644 --- a/inc/Connection.h +++ b/inc/Connection.h @@ -102,8 +102,6 @@ class Connection } //! Set Connection relationship void setRelationship(Relationship relationship) { - _logger->log_debug("Set connection %s relationship %s", - _name.c_str(), relationship.getName().c_str()); _relationship = relationship; } // ! Get Connection relationship diff --git a/inc/Logger.h b/inc/Logger.h index 42cf3ea089..3edad9dfab 100644 --- a/inc/Logger.h +++ b/inc/Logger.h @@ -38,8 +38,8 @@ using spdlog::logger; #define DEFAULT_LOG_FILE_SIZE (5*1024*1024) //! 3 log files rotation #define DEFAULT_LOG_FILE_NUMBER 3 -#define LOG_NAME "nifi" -#define LOG_FILE_NAME "nifi" +#define LOG_NAME "minifi log" +#define LOG_FILE_NAME "minifi-app.log" typedef enum { @@ -141,11 +141,7 @@ class Logger { * Create a logger * */ Logger(const std::string logger_name = LOG_NAME, const std::string filename = LOG_FILE_NAME, size_t max_file_size = DEFAULT_LOG_FILE_SIZE, size_t max_files = DEFAULT_LOG_FILE_NUMBER, bool force_flush = true) { - /* - if (!filename.empty()) - _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush); - else */ - _spdlog = stdout_logger_mt("console"); + _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush); _spdlog->set_level((spdlog::level::level_enum) debug); } //! spdlog diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 4506af1b4b..bf394b74b5 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include "spdlog/spdlog.h" #include "Logger.h" #include "Configure.h" @@ -37,6 +39,8 @@ #define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml" //! Default nifi properties file path #define DEFAULT_NIFI_PROPERTIES_FILE "./conf/minifi.properties" +//! Define home environment variable +#define MINIFI_HOME_ENV_KEY "MINIFI_HOME" /* Define Parser Values for Configuration YAML sections */ #define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" @@ -59,64 +63,49 @@ void sigHandler(int signal) } } -int loadYaml() { - YAML::Node flow = YAML::LoadFile("./conf/flow.yml"); - - YAML::Node flowControllerNode = flow[CONFIG_YAML_FLOW_CONTROLLER_KEY]; - YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; - YAML::Node connectionsNode = flow[CONFIG_YAML_CONNECTIONS_KEY]; - YAML::Node remoteProcessingGroupNode = flow[CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY]; - - if (processorsNode) { - int numProcessors = processorsNode.size(); - if (numProcessors < 1) { - throw new std::invalid_argument("There must be at least one processor configured."); - } - - std::vector processorConfigs; - - if (processorsNode.IsSequence()) { - for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { - ProcessorConfig procCfg; - YAML::Node procNode = iter->as(); - - procCfg.name = procNode["name"].as(); - procCfg.javaClass = procNode["class"].as(); - - processorConfigs.push_back(procCfg); - } - } - - Logger::getLogger()->log_info("Added %d processor configs.", processorConfigs.size()); - } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); +int main(int argc, char **argv) +{ + try + { + std::vector sinks; + sinks.push_back(std::make_shared()); + sinks.push_back(std::make_shared("logfile", "log", 23, 59)); + auto combined_logger = std::make_shared("name", begin(sinks), end(sinks)); + spdlog::register_logger(combined_logger); + } + catch (const spdlog::spdlog_ex& ex) + { + std::cout << "Log failed: " << ex.what() << std::endl; } - - return 0; -} - -int main(int argc, char **argv) { Logger *logger = Logger::getLogger(); logger->setLogLevel(info); - logger->log_info("MiNiFi started"); - try { - logger->log_info("Performing parsing of specified config.yml"); - loadYaml(); - } catch (...) { - std::cout << "Could not load YAML due to improper configuration."; - return 1; - } - - if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) { + // assumes POSIX compliant environment + std::string minifiHome; + if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) + { + minifiHome = env_p; + } + else + { + logger->log_info("MINIFI_HOME was not found, determining based on executable path."); + char *path = NULL; + char full_path[PATH_MAX]; + path = realpath(argv[0], full_path); + std::string minifiHome(path); + minifiHome = minifiHome.substr(0, minifiHome.find_last_of("/\\")); + } + + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) + { logger->log_error("Can not install signal handler"); return -1; } - Configure *configure = Configure::getConfigure(); - configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); + Configure *configure = Configure::getConfigure(); + configure->setHome(minifiHome); + configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); controller = new FlowController(); @@ -126,6 +115,8 @@ int main(int argc, char **argv) { controller->start(); running = true; + logger->log_info("MiNiFi started"); + // main loop while (running) { diff --git a/src/Configure.cpp b/src/Configure.cpp index 862bb715da..d7fd95bcb4 100644 --- a/src/Configure.cpp +++ b/src/Configure.cpp @@ -107,23 +107,42 @@ void Configure::parseConfigureFileLine(char *buf) //! Load Configure File void Configure::loadConfigureFile(const char *fileName) { - std::ifstream file(fileName, std::ifstream::in); - if (!file.good()) - { - _logger->log_error("load configure file failed %s", fileName); - return; - } - this->clear(); - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize)) - { - parseConfigureFileLine(buf); - } + + std::string adjustedFilename; + if (fileName) + { + // perform a naive determination if this is a relative path + if (fileName[0] != '/') + { + adjustedFilename = adjustedFilename + _configure->getHome() + "/" + fileName; + } + else + { + adjustedFilename += fileName; + } + } + char *path = NULL; + char full_path[PATH_MAX]; + path = realpath(adjustedFilename.c_str(), full_path); + _logger->log_info("Using configuration file located at %s", path); + + std::ifstream file(path, std::ifstream::in); + if (!file.good()) + { + _logger->log_error("load configure file failed %s", path); + return; + } + this->clear(); + const unsigned int bufSize = 512; + char buf[bufSize]; + for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) + { + parseConfigureFileLine(buf); + } } //! Parse Command Line -void Configure::pareCommandLine(int argc, char **argv) +void Configure::parseCommandLine(int argc, char **argv) { int i; bool keyFound = false; diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp index 6f1517c5f2..011ebcfdfa 100644 --- a/src/FlowControlProtocol.cpp +++ b/src/FlowControlProtocol.cpp @@ -231,7 +231,6 @@ void FlowControlProtocol::stop() return; _running = false; _logger->log_info("FlowControl Protocol Stop"); - delete _thread; } void FlowControlProtocol::run(FlowControlProtocol *protocol) diff --git a/src/FlowController.cpp b/src/FlowController.cpp index c01c3853b5..b176a12793 100644 --- a/src/FlowController.cpp +++ b/src/FlowController.cpp @@ -37,7 +37,7 @@ FlowController::FlowController(std::string name) uuid_generate(_uuid); // Setup the default values - _configurationFileName = DEFAULT_FLOW_XML_FILE_NAME; + _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; _running = false; @@ -48,10 +48,46 @@ FlowController::FlowController(std::string name) // NiFi config properties _configure = Configure::getConfigure(); - _configure->get(Configure::nifi_flow_configuration_file, _configurationFileName); - _logger->log_info("FlowController NiFi XML file %s", _configurationFileName.c_str()); - // Create repos for flow record and provenance + std::string rawConfigFileString; + _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); + + if (!rawConfigFileString.empty()) + { + _configurationFileName = rawConfigFileString; + } + + char *path = NULL; + char full_path[PATH_MAX]; + + std::string adjustedFilename; + if (!_configurationFileName.empty()) + { + // perform a naive determination if this is a relative path + if (_configurationFileName.c_str()[0] != '/') + { + adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName; + } + else + { + adjustedFilename = _configurationFileName; + } + } + + path = realpath(adjustedFilename.c_str(), full_path); + if (!path) + { + _logger->log_error("Could not locate path from provided configuration file name."); + } + + char *flowPath = NULL; + char flow_full_path[PATH_MAX]; + + std::string pathString(path); + _configurationFileName = pathString; + _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str()); + + // Create repos for flow record and provenance _logger->log_info("FlowController %s created", _name.c_str()); } @@ -419,15 +455,11 @@ void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGr std::vector rawAutoTerminatedRelationshipValues; if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) { - _logger->log_debug("Found non-empty auto terminated sequence... interpreting."); for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) { std::string autoTerminatedRel = relIter->as(); - _logger->log_debug("Auto terminating relationship %s", autoTerminatedRel.c_str()); rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); } - } else { - _logger->log_debug("no relationships are auto terminated here..."); } procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; @@ -604,7 +636,7 @@ void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGro if (connectionsNode->IsSequence()) { for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { -// generate the random UIID + // generate the random UIID uuid_generate(uuid); YAML::Node connectionNode = iter->as(); @@ -623,10 +655,9 @@ void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGro if (connection) connection->setRelationship(relationship); std::string connectionSrcProcName = connectionNode["source name"].as(); + Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName); - _logger->log_debug("I see processor with name %s looking for source with name %s", - this->_root->findProcessor(connectionSrcProcName)->getName().c_str(), - connectionSrcProcName.c_str()); + if (!srcProcessor) { _logger->log_error("Could not locate a source with name %s to create a connection", connectionSrcProcName.c_str()); @@ -634,24 +665,15 @@ void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGro "Could not locate a source with name %s to create a connection " + connectionSrcProcName); } - _logger->log_debug("This processor has UUID of %s", srcProcessor->getUUIDStr().c_str()); - _logger->log_trace("Trying to find dest processor by name %s", destName.c_str()); Processor *destProcessor = this->_root->findProcessor(destName); // If we could not find name, try by UUID if (!destProcessor) { - _logger->log_trace("Now looking up by uuid"); uuid_t destUuid; uuid_parse(destName.c_str(), destUuid); destProcessor = this->_root->findProcessor(destUuid); } if (destProcessor) { std::string destUuid = destProcessor->getUUIDStr(); - if (!destUuid.empty()) { - _logger->log_debug("This destination processor has UUID of %s", - destProcessor->getUUIDStr().c_str()); - } - } else { - _logger->log_debug("!!! === Could not find a destination processor for the connection."); } uuid_t srcUuid; @@ -787,8 +809,6 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T Processor *processor = NULL; RemoteProcessorGroupPort *port = NULL; - _logger->log_trace("Creating a port from YAML."); - if (!parent) { _logger->log_error("parseProcessNode: no parent group existed"); return; @@ -796,7 +816,7 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T YAML::Node inputPortsObj = portNode->as(); -// generate the random UIID + // generate the random UIID uuid_generate(uuid); auto portId = inputPortsObj["id"].as(); @@ -804,8 +824,6 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T uuid_parse(portId.c_str(), uuid); port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); - _logger->log_debug("parse input port: name => [%s]", nameStr.c_str()); - _logger->log_debug("parse input port: id => [%s]", portId.c_str()); processor = (Processor *) port; port->setDirection(direction); @@ -819,23 +837,17 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T YAML::Node propertiesNode = nodeVal["Properties"]; std::vector properties; - _logger->log_debug("!!! === Checking out properties for input port...."); - if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) { std::map propertiesMap = propertiesNode.as>(); - _logger->log_debug("Found non-empty properties sequence... interpreting."); for (std::map::iterator propsIter = propertiesMap.begin(); propsIter != propertiesMap.end(); propsIter++) { std::string propertyName = propsIter->first; std::string propertyValue = propsIter->second; - _logger->log_debug("Detected property %s => %s", propertyName.c_str(), propertyValue.c_str()); if (!processor->setProperty(propertyName, propertyValue)) { _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName.c_str(), propertyValue.c_str(), nameStr.c_str()); } } - } else { - _logger->log_debug("no properties here..."); } // add processor to parent @@ -1149,8 +1161,6 @@ void FlowController::load(ConfigFormat configFormat) { xmlCleanupParser(); _initialized = true; } else if (ConfigFormat::YAML == configFormat) { - _logger->log_info("Detected a YAML configuration file for processing."); - YAML::Node flow = YAML::LoadFile(_configurationFileName); YAML::Node flowControllerNode = flow["Flow Controller"]; diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp index 090c988d2f..ace37d775f 100644 --- a/src/ListenSyslog.cpp +++ b/src/ListenSyslog.cpp @@ -331,9 +331,6 @@ void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) } else { - /* - ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size()); - session->append(flowFile, &callbackSep); */ ListenSyslog::WriteCallback callback((char *)event.payload, event.len); session->append(flowFile, &callback); delete[] event.payload;