From ce3fcfdc3ad299c2f6d790ada1e152cabaf84c52 Mon Sep 17 00:00:00 2001 From: Marc Date: Thu, 9 Feb 2017 20:40:32 -0500 Subject: [PATCH] MINIFI-190: Creating initial commit with changes Resolves MINIFI-194 by using a semaphore in place of the FlowController instance. Stop is performed outside of the signal handler to avoid synchronicity issues. Resolves MINIFI-192 by using lock_guard based on a conditional Resolves issues found with MINIFI-190 regarding GetFile. Added pragma definitions for GCC < 4.9 --- .travis.yml | 2 +- CMakeLists.txt | 3 +- libminifi/cmake/FindLeveldb.cmake | 1 + libminifi/include/Configure.h | 3 + libminifi/include/FlowController.h | 238 +++-- libminifi/include/Logger.h | 34 + libminifi/include/Processor.h | 2 +- libminifi/include/Provenance.h | 219 +--- libminifi/include/ResourceClaim.h | 6 +- libminifi/include/Serializable.h | 294 ++++++ libminifi/src/Configure.cpp | 2 + libminifi/src/FlowControlProtocol.cpp | 21 - libminifi/src/FlowController.cpp | 1103 +++++++++++--------- libminifi/src/FlowFileRecord.cpp | 2 +- libminifi/src/GetFile.cpp | 55 +- libminifi/src/ProcessGroup.cpp | 14 +- libminifi/src/ProcessSession.cpp | 57 +- libminifi/src/Processor.cpp | 108 +- libminifi/src/Provenance.cpp | 822 +++++---------- libminifi/src/PutFile.cpp | 2 + libminifi/src/ResourceClaim.cpp | 4 + libminifi/src/Serializable.cpp | 365 +++++++ libminifi/src/Site2SitePeer.cpp | 2 +- libminifi/test/Server.cpp | 1 - libminifi/test/TestBase.h | 62 +- libminifi/test/unit/ProcessorTests.h | 72 +- libminifi/test/unit/ProvenanceTestHelper.h | 70 ++ libminifi/test/unit/ProvenanceTests.h | 66 +- main/CMakeLists.txt | 2 + main/MiNiFiMain.cpp | 98 +- 30 files changed, 2237 insertions(+), 1493 deletions(-) create mode 100644 libminifi/include/Serializable.h create mode 100644 libminifi/src/Serializable.cpp diff --git a/.travis.yml b/.travis.yml index 91ef32933e..b64902f3a6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,7 +22,7 @@ script: # Establish updated toolchain as 
default - sudo unlink /usr/bin/gcc && sudo ln -s /usr/bin/gcc-4.8 /usr/bin/gcc - sudo unlink /usr/bin/g++ && sudo ln -s /usr/bin/g++-4.8 /usr/bin/g++ - - mkdir ./build && cd ./build && cmake .. && make && make test + - mkdir ./build && cd ./build && cmake .. && make && ./tests addons: apt: diff --git a/CMakeLists.txt b/CMakeLists.txt index 9900e2f327..8d7276bb6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,10 +60,8 @@ endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") find_package(UUID REQUIRED) - file(GLOB SPD_SOURCES "include/spdlog/*") - add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3) add_subdirectory(libminifi) add_subdirectory(main) @@ -101,6 +99,7 @@ set(CPACK_COMPONENTS_ALL bin) include(CPack) + enable_testing(test) file(GLOB LIBMINIFI_TEST_SOURCES "libminifi/test/unit/*.cpp") add_executable(tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES}) diff --git a/libminifi/cmake/FindLeveldb.cmake b/libminifi/cmake/FindLeveldb.cmake index 32adafaf71..9e0cad7511 100644 --- a/libminifi/cmake/FindLeveldb.cmake +++ b/libminifi/cmake/FindLeveldb.cmake @@ -9,6 +9,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, + # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. 
See the License for the diff --git a/libminifi/include/Configure.h b/libminifi/include/Configure.h index 00b774245a..d4a43e03f8 100644 --- a/libminifi/include/Configure.h +++ b/libminifi/include/Configure.h @@ -44,6 +44,8 @@ class Configure { static const char *nifi_flow_configuration_file; static const char *nifi_administrative_yield_duration; static const char *nifi_bored_yield_duration; + static const char *nifi_graceful_shutdown_seconds; + static const char *nifi_log_level; static const char *nifi_server_name; static const char *nifi_server_port; static const char *nifi_server_report_interval; @@ -57,6 +59,7 @@ class Configure { static const char *nifi_security_client_pass_phrase; static const char *nifi_security_client_ca_certificate; + //! Clear the load config void clear() { diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index b98393e48b..02c39b1e51 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -28,7 +28,7 @@ #include #include #include -#include +#include "yaml-cpp/yaml.h" #include "Configure.h" #include "Property.h" @@ -75,21 +75,15 @@ struct ProcessorConfig { std::vector properties; }; -//! FlowController Class -class FlowController -{ +/** + * Flow Controller class. Generally used by FlowController factory + * as a singleton. + */ +class FlowController { public: - static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; - static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; - //! Get the singleton flow controller - static FlowController * getFlowController() - { - if (!_flowController) - { - _flowController = new FlowController(); - } - return _flowController; - } + static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; + static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; + //! passphase for the private file callback static int pemPassWordCb(char *buf, int size, int rwflag, void *userdata) { @@ -111,96 +105,104 @@ class FlowController } //! 
Destructor - virtual ~FlowController(); + virtual ~FlowController(){ + if (_ctx) + SSL_CTX_free(_ctx); + } //! Set FlowController Name - void setName(std::string name) { + virtual void setName(std::string name) { _name = name; } //! Get Flow Controller Name - std::string getName(void) { + virtual std::string getName(void) { return (_name); } //! Set UUID - void setUUID(uuid_t uuid) { + virtual void setUUID(uuid_t uuid) { uuid_copy(_uuid, uuid); } //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { + virtual bool getUUID(uuid_t uuid) { + if (uuid) { uuid_copy(uuid, _uuid); return true; - } - else + } else return false; } //! Set MAX TimerDrivenThreads - void setMaxTimerDrivenThreads(int number) - { + virtual void setMaxTimerDrivenThreads(int number) { _maxTimerDrivenThreads = number; } //! Get MAX TimerDrivenThreads - int getMaxTimerDrivenThreads() - { + virtual int getMaxTimerDrivenThreads() { return _maxTimerDrivenThreads; } //! Set MAX EventDrivenThreads - void setMaxEventDrivenThreads(int number) - { + virtual void setMaxEventDrivenThreads(int number) { _maxEventDrivenThreads = number; } //! Get MAX EventDrivenThreads - int getMaxEventDrivenThreads() - { + virtual int getMaxEventDrivenThreads() { return _maxEventDrivenThreads; } //! Get the provenance repository - ProvenanceRepository *getProvenanceRepository() - { + virtual ProvenanceRepository *getProvenanceRepository() { return this->_provenanceRepo; } - //! Life Cycle related function - //! Load flow YAML from disk, after that, create the root process group and its children, initialize the flows - void load(); + //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows + virtual void load() = 0; + //! Whether the Flow Controller is start running - bool isRunning(); - //! Whether the Flow Controller has already been initialized (loaded flow YAML) - bool isInitialized(); + virtual bool isRunning() { + return _running.load(); + } + //! 
Whether the Flow Controller has already been initialized (loaded flow XML) + virtual bool isInitialized() { + return _initialized.load(); + } //! Start to run the Flow Controller which internally start the root process group and all its children - bool start(); - //! Stop to run the Flow Controller which internally stop the root process group and all its children - void stop(bool force); - //! reload flow controller's configuration - void reload(std::string yamlFile); + virtual bool start() = 0; //! Unload the current flow YAML, clean the root process group and all its children - void unload(); + virtual void stop(bool force) = 0; + //! Asynchronous function trigger unloading and wait for a period of time + virtual void waitUnload(const uint64_t timeToWaitMs) = 0; + //! Unload the current flow xml, clean the root process group and all its children + virtual void unload() = 0; + //! Load new xml + virtual void reload(std::string yamlFile) = 0; //! update property value - void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) - { + void updatePropertyValue(std::string processorName, + std::string propertyName, std::string propertyValue) { if (_root) - _root->updatePropertyValue(processorName, propertyName, propertyValue); + _root->updatePropertyValue(processorName, propertyName, + propertyValue); } //! Create Processor (Node/Input/Output Port) based on the name - Processor *createProcessor(std::string name, uuid_t uuid); + virtual Processor *createProcessor(std::string name, uuid_t uuid) = 0; //! Create Root Processor Group - ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); + virtual ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid) = 0; //! Create Remote Processor Group - ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); + virtual ProcessGroup *createRemoteProcessGroup(std::string name, + uuid_t uuid) = 0; //! 
Create Connection - Connection *createConnection(std::string name, uuid_t uuid); + virtual Connection *createConnection(std::string name, uuid_t uuid) = 0; //! set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) - { + virtual void setSerialNumber(uint8_t *number) { _protocol->setSerialNumber(number); } + + //! getSSLContext - SSL_CTX *getSSLContext() + virtual SSL_CTX *getSSLContext() { return _ctx; } protected: + + //! SSL context + SSL_CTX *_ctx; //! A global unique identifier uuid_t _uuid; @@ -218,6 +220,10 @@ class FlowController int _maxEventDrivenThreads; //! Config //! FlowFile Repo + //! Whether it is running + std::atomic _running; + //! Whether it has already been initialized (load the flow XML already) + std::atomic _initialized; //! Provenance Repo ProvenanceRepository *_provenanceRepo; //! Flow Engines @@ -231,50 +237,130 @@ class FlowController //! Heart Beat //! FlowControl Protocol FlowControlProtocol *_protocol; - //! SSL context - SSL_CTX *_ctx; + + + FlowController() : + _root(0), _maxTimerDrivenThreads(0), _maxEventDrivenThreads(0), _running( + false), _initialized(false), _provenanceRepo(0), _protocol( + 0), _logger(Logger::getLogger()), _ctx(NULL){ + } private: + //! Logger + Logger *_logger; + +}; + +/** + * Flow Controller implementation that defines the typical flow. + * of events. + */ +class FlowControllerImpl: public FlowController { +public: + + //! Destructor + virtual ~FlowControllerImpl(); + + //! Life Cycle related function + //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows + void load(); + //! Start to run the Flow Controller which internally start the root process group and all its children + bool start(); + //! Stop to run the Flow Controller which internally stop the root process group and all its children + void stop(bool force); + //! 
Asynchronous function trigger unloading and wait for a period of time + void waitUnload(const uint64_t timeToWaitMs); + //! Unload the current flow xml, clean the root process group and all its children + void unload(); + //! Load new xml + void reload(std::string yamlFile); + //! update property value + void updatePropertyValue(std::string processorName, + std::string propertyName, std::string propertyValue) { + if (_root) + _root->updatePropertyValue(processorName, propertyName, + propertyValue); + } + + //! Create Processor (Node/Input/Output Port) based on the name + Processor *createProcessor(std::string name, uuid_t uuid); + //! Create Root Processor Group + ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); + //! Create Remote Processor Group + ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); + //! Create Connection + Connection *createConnection(std::string name, uuid_t uuid); + + //! Constructor + /*! + * Create a new Flow Controller + */ + FlowControllerImpl(std::string name = DEFAULT_ROOT_GROUP_NAME); + + + + + friend class FlowControlFactory; + +private: + + + //! Mutex for protection std::mutex _mtx; //! Logger Logger *_logger; - //! Configure Configure *_configure; - //! Whether it is running - std::atomic _running; - //! Whether it has already been initialized (load the flow YAML already) - std::atomic _initialized; //! Process Processor Node YAML void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); //! Process Port YAML - void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction); + void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, + TransferDirection direction); //! Process Root Processor Group YAML void parseRootProcessGroupYaml(YAML::Node rootNode); //! 
Process Property YAML - void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor); + void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, + Processor *processor); //! Process connection YAML void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent); //! Process Remote Process Group YAML void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent); //! Parse Properties Node YAML for a processor - void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor); - - static FlowController *_flowController; - - //! Constructor - /*! - * Create a new Flow Controller - */ - FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME); + void parsePropertiesNodeYaml(YAML::Node *propertiesNode, + Processor *processor); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer - FlowController(const FlowController &parent); - FlowController &operator=(const FlowController &parent); + FlowControllerImpl(const FlowController &parent); + FlowControllerImpl &operator=(const FlowController &parent); + +}; +/** + * Flow Controller factory that creates flow controllers or gets the + * assigned instance. + */ +class FlowControllerFactory { +public: + //! Get the singleton flow controller + static FlowController * getFlowController(FlowController *instance = 0) { + if (!_flowController) { + if (NULL == instance) + _flowController = createFlowController(); + else + _flowController = instance; + } + return _flowController; + } + + //! 
Get the singleton flow controller + static FlowController * createFlowController() { + return dynamic_cast(new FlowControllerImpl()); + } +private: + static FlowController *_flowController; }; #endif diff --git a/libminifi/include/Logger.h b/libminifi/include/Logger.h index 621df78d18..741158c51b 100644 --- a/libminifi/include/Logger.h +++ b/libminifi/include/Logger.h @@ -21,6 +21,8 @@ #ifndef __LOGGER_H__ #define __LOGGER_H__ +#include +#include #include #include "spdlog/spdlog.h" @@ -56,6 +58,8 @@ typedef enum off = 9 } LOG_LEVEL_E; + + //! Logger Class class Logger { @@ -72,6 +76,36 @@ class Logger { return; _spdlog->set_level((spdlog::level::level_enum) level); } + + void setLogLevel(const std::string &level,LOG_LEVEL_E defaultLevel = info ) + { + std::string logLevel = level; + // Match level names case-insensitively: lower-case a copy of the requested level in place. + std::transform(logLevel.begin(), logLevel.end(), logLevel.begin(), ::tolower); + if (logLevel == "trace") { + setLogLevel(trace); + } else if (logLevel == "debug") { + setLogLevel(debug); + } else if (logLevel == "info") { + setLogLevel(info); + } else if (logLevel == "notice") { + setLogLevel(notice); + } else if (logLevel == "warn") { + setLogLevel(warn); + } else if (logLevel == "error") { + setLogLevel(err); + } else if (logLevel == "critical") { + setLogLevel(critical); + } else if (logLevel == "alert") { + setLogLevel(alert); + } else if (logLevel == "emerg") { + setLogLevel(emerg); + } else if (logLevel == "off") { + setLogLevel(off); + } else { + setLogLevel(defaultLevel); + } + } //! Destructor ~Logger() {} /** diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h index 35bf040c25..a0df577678 100644 --- a/libminifi/include/Processor.h +++ b/libminifi/include/Processor.h @@ -343,7 +343,7 @@ class Processor //! Incoming connection Iterator std::set::iterator _incomingConnectionsIter; //! Condition for whether there is incoming work to do - bool _hasWork = false; + std::atomic _hasWork; //! 
Concurrent condition mutex for whether there is incoming work to do std::mutex _workAvailableMtx; //! Concurrent condition variable for whether there is incoming work to do diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h index f3e814f677..7507c03724 100644 --- a/libminifi/include/Provenance.h +++ b/libminifi/include/Provenance.h @@ -20,33 +20,30 @@ #ifndef __PROVENANCE_H__ #define __PROVENANCE_H__ -#include -#include -#include -#include -#include +#include #include -#include -#include -#include -#include #include +#include +#include +#include +#include #include -#include -#include -#include +#include #include -#include -#include "leveldb/db.h" +#include -#include "TimeUtil.h" -#include "Logger.h" +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" #include "Configure.h" -#include "Property.h" -#include "ResourceClaim.h" -#include "Relationship.h" #include "Connection.h" #include "FlowFileRecord.h" +#include "Logger.h" +#include "Property.h" +#include "ResourceClaim.h" +#include "TimeUtil.h" +#include "Serializable.h" // Provenance Event Record Serialization Seg Size #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 @@ -54,7 +51,7 @@ class ProvenanceRepository; //! Provenance Event Record -class ProvenanceEventRecord +class ProvenanceEventRecord : protected Serializable { public: enum ProvenanceEventType { @@ -176,17 +173,11 @@ class ProvenanceEventRecord uuid_generate(_eventId); uuid_unparse(_eventId, eventIdStr); _eventIdStr = eventIdStr; - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; _logger = Logger::getLogger(); } ProvenanceEventRecord() { _eventTime = getTimeMillis(); - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; _logger = Logger::getLogger(); } @@ -399,7 +390,12 @@ class ProvenanceEventRecord //! Serialize and Persistent to the repository bool Serialize(ProvenanceRepository *repo); //! 
DeSerialize - bool DeSerialize(uint8_t *buffer, int bufferSize); + bool DeSerialize(const uint8_t *buffer, const int bufferSize); + //! DeSerialize + bool DeSerialize(DataStream &stream) + { + return DeSerialize(stream.getBuffer(),stream.getSize()); + } //! DeSerialize bool DeSerialize(ProvenanceRepository *repo, std::string key); @@ -456,151 +452,7 @@ class ProvenanceEventRecord //! Logger Logger *_logger; - // All serialization related method and internal buf - uint8_t *_serializedBuf; - int _serializeBufSize; - int _maxSerializeBufSize; - int writeData(uint8_t *value, int size) - { - if ((_serializeBufSize + size) > _maxSerializeBufSize) - { - // if write exceed - uint8_t *buffer = new uint8_t[_maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE]; - if (!buffer) - { - return -1; - } - memcpy(buffer, _serializedBuf, _serializeBufSize); - delete[] _serializedBuf; - _serializedBuf = buffer; - _maxSerializeBufSize = _maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE; - } - uint8_t *bufPtr = _serializedBuf + _serializeBufSize; - memcpy(bufPtr, value, size); - _serializeBufSize += size; - return size; - } - int readData(uint8_t *buf, int buflen) - { - if ((buflen + _serializeBufSize) > _maxSerializeBufSize) - { - // if read exceed - return -1; - } - uint8_t *bufPtr = _serializedBuf + _serializeBufSize; - memcpy(buf, bufPtr, buflen); - _serializeBufSize += buflen; - return buflen; - } - int write(uint8_t value) - { - return writeData(&value, 1); - } - int write(char value) - { - return writeData((uint8_t *)&value, 1); - } - int write(uint32_t value) - { - uint8_t temp[4]; - - temp[0] = (value & 0xFF000000) >> 24; - temp[1] = (value & 0x00FF0000) >> 16; - temp[2] = (value & 0x0000FF00) >> 8; - temp[3] = (value & 0x000000FF); - return writeData(temp, 4); - } - int write(uint16_t value) - { - uint8_t temp[2]; - temp[0] = (value & 0xFF00) >> 8; - temp[1] = (value & 0xFF); - return writeData(temp, 2); - } - int write(uint8_t *value, int len) - { - return 
writeData(value, len); - } - int write(uint64_t value) - { - uint8_t temp[8]; - - temp[0] = (value >> 56) & 0xFF; - temp[1] = (value >> 48) & 0xFF; - temp[2] = (value >> 40) & 0xFF; - temp[3] = (value >> 32) & 0xFF; - temp[4] = (value >> 24) & 0xFF; - temp[5] = (value >> 16) & 0xFF; - temp[6] = (value >> 8) & 0xFF; - temp[7] = (value >> 0) & 0xFF; - return writeData(temp, 8); - } - int write(bool value) - { - uint8_t temp = value; - return write(temp); - } - int writeUTF(std::string str, bool widen = false); - int read(uint8_t &value) - { - uint8_t buf; - - int ret = readData(&buf, 1); - if (ret == 1) - value = buf; - return ret; - } - int read(uint16_t &value) - { - uint8_t buf[2]; - - int ret = readData(buf, 2); - if (ret == 2) - value = (buf[0] << 8) | buf[1]; - return ret; - } - int read(char &value) - { - uint8_t buf; - - int ret = readData(&buf, 1); - if (ret == 1) - value = (char) buf; - return ret; - } - int read(uint8_t *value, int len) - { - return readData(value, len); - } - int read(uint32_t &value) - { - uint8_t buf[4]; - - int ret = readData(buf, 4); - if (ret == 4) - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - return ret; - } - int read(uint64_t &value) - { - uint8_t buf[8]; - - int ret = readData(buf, 8); - if (ret == 8) - { - value = ((uint64_t) buf[0] << 56) | - ((uint64_t) (buf[1] & 255) << 48) | - ((uint64_t) (buf[2] & 255) << 40) | - ((uint64_t) (buf[3] & 255) << 32) | - ((uint64_t) (buf[4] & 255) << 24) | - ((uint64_t) (buf[5] & 255) << 16) | - ((uint64_t) (buf[6] & 255) << 8) | - ((uint64_t) (buf[7] & 255) << 0); - } - return ret; - } - int readUTF(std::string &str, bool widen = false); - + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer ProvenanceEventRecord(const ProvenanceEventRecord &parent); @@ -649,10 +501,9 @@ class ProvenanceReporter //! 
clear void clear() { - for (std::set::iterator it = _events.begin(); it != _events.end(); ++it) + for (auto it : _events) { - ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it); - delete event; + delete it; } _events.clear(); } @@ -733,6 +584,7 @@ class ProvenanceRepository _purgePeriod = PROVENANCE_PURGE_PERIOD; _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE; _db = NULL; + _thread = NULL; _running = false; _repoFull = false; } @@ -746,7 +598,7 @@ class ProvenanceRepository } //! initialize - bool initialize() + virtual bool initialize() { std::string value; if (_configure->get(Configure::nifi_provenance_repository_directory_default, value)) @@ -786,7 +638,7 @@ class ProvenanceRepository return true; } //! Put - bool Put(std::string key, uint8_t *buf, int bufLen) + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); @@ -798,7 +650,7 @@ class ProvenanceRepository return false; } //! Delete - bool Delete(std::string key) + virtual bool Delete(std::string key) { leveldb::Status status; status = _db->Delete(leveldb::WriteOptions(), key); @@ -808,7 +660,7 @@ class ProvenanceRepository return false; } //! Get - bool Get(std::string key, std::string &value) + virtual bool Get(std::string key, std::string &value) { leveldb::Status status; status = _db->Get(leveldb::ReadOptions(), key, &value); @@ -839,11 +691,11 @@ class ProvenanceRepository //! Run function for the thread static void run(ProvenanceRepository *repo); //! Start the repository monitor thread - void start(); + virtual void start(); //! Stop the repository monitor thread - void stop(); + virtual void stop(); //! whether the repo is full - bool isFull() + virtual bool isFull() { return _repoFull; } @@ -859,8 +711,8 @@ class ProvenanceRepository //! Logger Logger *_logger; //! Configure - Configure *_configure; //! max db entry life time + Configure *_configure; int64_t _maxPartitionMillis; //! 
max db size int64_t _maxPartitionBytes; @@ -899,4 +751,5 @@ class ProvenanceRepository + #endif diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 098b2595dd..b4085f24a3 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -32,15 +32,19 @@ //! Default content directory #define DEFAULT_CONTENT_DIRECTORY "./content_repository" + + //! ResourceClaim Class class ResourceClaim { public: + + static std::string default_directory_path; //! Constructor /*! * Create a new resource claim */ - ResourceClaim(const std::string contentDirectory); + ResourceClaim(const std::string contentDirectory = default_directory_path); //! Destructor virtual ~ResourceClaim() {} //! increaseFlowFileRecordOwnedCount diff --git a/libminifi/include/Serializable.h b/libminifi/include/Serializable.h new file mode 100644 index 0000000000..c32dee0990 --- /dev/null +++ b/libminifi/include/Serializable.h @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __SERIALIZABLE_H__ +#define __SERIALIZABLE_H__ + +#include +#include +#include + + +/** + * Mechanism to determine endianness of host. 
+ * Accounts for only BIG/LITTLE/BIENDIAN + **/ +class EndiannessCheck +{ +public: + static bool IS_LITTLE; +private: + + static bool is_little_endian() { + /* do whatever is needed at static init time */ + unsigned int x = 1; + char *c = (char*) &x; + IS_LITTLE=*c==1; + return IS_LITTLE; + } +}; + + +/** + * DataStream defines the mechanism through which + * binary data will be written to a sink + */ +class DataStream +{ +public: + + DataStream() : readBuffer(0) + { + + } + + /** + * Constructor + **/ + DataStream(const uint8_t *buf, const uint32_t buflen) : DataStream() + { + writeData((uint8_t*)buf,buflen); + + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + int readData(std::vector &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + int readData(uint8_t *buf, int buflen); + + /** + * writes valiue to buffer + * @param value value to write + * @param size size of value + */ + int writeData(uint8_t *value, int size); + + + /** + * Reads a system word + * @param value value to write + */ + inline int readLongLong(uint64_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Reads a uint32_t + * @param value value to write + */ + inline int readLong(uint32_t &value, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Reads a system short + * @param value value to write + */ + inline int readShort(uint16_t &value,bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const + { + return &buffer[0]; + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint32_t getSize() const + { + return buffer.size(); + } + +private: + // All serialization related method and internal buf + std::vector buffer; + uint32_t readBuffer; +}; + +/** + * 
Serializable instances provide base functionality to + * write certain objects/primitives to a data stream. + * + */ +class Serializable { + +public: + + /** + * Inline function to write T to stream + **/ + template + inline int writeData(const T &t,DataStream *stream); + + /** + * Inline function to write T to to_vec + **/ + template + inline int writeData(const T &t, uint8_t *to_vec); + + /** + * Inline function to write T to to_vec + **/ + template + inline int writeData(const T &t, std::vector &to_vec); + + + /** + * write byte to stream + * @return resulting write size + **/ + int write(uint8_t value,DataStream *stream); + + /** + * write byte to stream + * @return resulting write size + **/ + int write(char value,DataStream *stream); + + /** + * write 4 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint32_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write 2 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint16_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write valueto stream + * @param value non encoded value + * @param len length of value + * @param strema output stream + * @return resulting write size + **/ + int write(uint8_t *value, int len,DataStream *stream); + + /** + * write 8 bytes to stream + * @param base_value non encoded value + * @param stream output stream + * @param is_little_endian endianness determination + * @return resulting write size + **/ + int write(uint64_t base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * write bool to stream + * @param value non encoded value + * @return resulting write size 
+ **/ + int write(bool value); + + /** + * write UTF string to stream + * @param str string to write + * @return resulting write size + **/ + int writeUTF(std::string str,DataStream *stream, bool widen = false); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t &value,DataStream *stream); + + /** + * reads two bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint16_t &base_value,DataStream *stream, bool is_little_endian = + EndiannessCheck::IS_LITTLE); + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(char &value,DataStream *stream); + + /** + * reads a byte array from the stream + * @param value reference in which will set the result + * @param len length to read + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint8_t *value, int len,DataStream *stream); + + /** + * reads four bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint32_t &value,DataStream *stream, + bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * reads eight byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + int read(uint64_t &value,DataStream *stream, + bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * read UTF from stream + * @param str reference string + * @param stream stream from which we will read + * @return resulting read size + **/ + int readUTF(std::string 
&str,DataStream *stream, bool widen = false); + +protected: + +}; + +#endif diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 2652e351a3..61e782b478 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -23,6 +23,8 @@ Configure *Configure::_configure(NULL); const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; +const char *Configure::nifi_graceful_shutdown_seconds = "nifi.graceful.shutdown.seconds"; +const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; const char *Configure::nifi_server_port = "nifi.server.port"; const char *Configure::nifi_server_report_interval= "nifi.server.report.interval"; diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index fee1a3b104..a74e32ec07 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -346,27 +346,6 @@ int FlowControlProtocol::sendRegisterReq() _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); this->_reportInterval = reportInterval; } - else if (((FlowControlMsgID) msgID) == FLOW_YML_CONTENT) - { - uint32_t yamlLen; - payloadPtr = this->decode(payloadPtr, yamlLen); - _logger->log_info("Flow Control Protocol receive YAML content length %d", yamlLen); - time_t rawtime; - struct tm *timeinfo; - time(&rawtime); - timeinfo = localtime(&rawtime); - std::string yamlFileName = "flow."; - yamlFileName += asctime(timeinfo); - yamlFileName += ".yml"; - std::ofstream fs; - fs.open(yamlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) - { - fs.write((const char *)payloadPtr, yamlLen); - fs.close(); - 
this->_controller->reload(yamlFileName.c_str()); - } - } else { break; diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 4bbc2341bc..b6dc4e9cc8 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -28,60 +28,52 @@ #include #include #include - +#include #include "FlowController.h" #include "ProcessContext.h" -FlowController *FlowController::_flowController(NULL); -FlowController::FlowController(std::string name) -: _name(name) -{ - uuid_generate(_uuid); +FlowController *FlowControllerFactory::_flowController(NULL); - // Setup the default values - _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; - _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; - _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; - _running = false; - _initialized = false; - _root = NULL; - _logger = Logger::getLogger(); - _protocol = new FlowControlProtocol(this); +FlowControllerImpl::FlowControllerImpl(std::string name) { + uuid_generate(_uuid); - // NiFi config properties - _configure = Configure::getConfigure(); + _name = name; + // Setup the default values + _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; + _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; + _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; + _running = false; + _initialized = false; + _root = NULL; + _logger = Logger::getLogger(); + _protocol = new FlowControlProtocol(this); - std::string rawConfigFileString; - _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); + // NiFi config properties + _configure = Configure::getConfigure(); - if (!rawConfigFileString.empty()) - { - _configurationFileName = rawConfigFileString; - } + std::string rawConfigFileString; + _configure->get(Configure::nifi_flow_configuration_file, + rawConfigFileString); - char *path = NULL; - char full_path[PATH_MAX]; + if (!rawConfigFileString.empty()) { + _configurationFileName = rawConfigFileString; + } - std::string 
adjustedFilename; - if (!_configurationFileName.empty()) - { - // perform a naive determination if this is a relative path - if (_configurationFileName.c_str()[0] != '/') - { - adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName; + char *path = NULL; + char full_path[PATH_MAX]; + + std::string adjustedFilename; + if (!_configurationFileName.empty()) { + // perform a naive determination if this is a relative path + if (_configurationFileName.c_str()[0] != '/') { + adjustedFilename = adjustedFilename + _configure->getHome() + "/" + + _configurationFileName; + } else { + adjustedFilename = _configurationFileName; } - else - { - adjustedFilename = _configurationFileName; - } - } + } - path = realpath(adjustedFilename.c_str(), full_path); - if (!path) - { - _logger->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path); - exit(1); - } + path = realpath(adjustedFilename.c_str(), full_path); std::string pathString(path); _configurationFileName = pathString; @@ -90,14 +82,14 @@ FlowController::FlowController(std::string name) // Create the content repo directory if needed struct stat contentDirStat; - if (stat(DEFAULT_CONTENT_DIRECTORY, &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) + if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode)) { - path = realpath(DEFAULT_CONTENT_DIRECTORY, full_path); + path = realpath(ResourceClaim::default_directory_path.c_str(), full_path); _logger->log_info("FlowController content directory %s", full_path); } else { - if (mkdir(DEFAULT_CONTENT_DIRECTORY, 0777) == -1) + if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) { _logger->log_error("FlowController content directory creation failed"); exit(1); @@ -184,529 +176,614 @@ FlowController::FlowController(std::string name) _logger->log_info("Load/Verify Client Certificate OK."); } + + } + if (!path) { + 
_logger->log_error( + "Could not locate path from provided configuration file name (%s). Exiting.", + full_path); + exit(1); } - // Create repos for flow record and provenance - _provenanceRepo = new ProvenanceRepository(); - _provenanceRepo->initialize(); - _logger->log_info("FlowController %s created", _name.c_str()); + // Create repos for flow record and provenance + _provenanceRepo = new ProvenanceRepository(); + _provenanceRepo->initialize(); } -FlowController::~FlowController() -{ - stop(true); - unload(); - delete _protocol; - delete _provenanceRepo; - if (_ctx) - SSL_CTX_free(_ctx); -} +FlowControllerImpl::~FlowControllerImpl() { -bool FlowController::isRunning() -{ - return (_running); + stop(true); + unload(); + if (NULL != _protocol) + delete _protocol; + if (NULL != _provenanceRepo) + delete _provenanceRepo; + } -bool FlowController::isInitialized() -{ - return (_initialized); -} +void FlowControllerImpl::stop(bool force) { -void FlowController::stop(bool force) -{ - if (_running) - { - _logger->log_info("Stop Flow Controller"); - this->_timerScheduler.stop(); - this->_eventScheduler.stop(); - // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (this->_root) - this->_root->stopProcessing( - &this->_timerScheduler, - &this->_eventScheduler); - _running = false; - } -} + if (_running) { + // immediately indicate that we are not running + _running = false; -void FlowController::unload() -{ - if (_running) - { - stop(true); - } - if (_initialized) - { - _logger->log_info("Unload Flow Controller"); - if (_root) - delete _root; - _root = NULL; - _initialized = false; - _name = ""; - } + _logger->log_info("Stop Flow Controller"); + this->_timerScheduler.stop(); + this->_eventScheduler.stop(); + // Wait for sometime for thread stop + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (this->_root) + this->_root->stopProcessing(&this->_timerScheduler, + &this->_eventScheduler); - return; + } } 
+/** + * This function will attempt to unload yaml and stop running Processors. + * + * If the latter attempt fails or does not complete within the prescribed + * period, _running will be set to false and we will return. + * + * @param timeToWaitMs Maximum time to wait before manually + * marking running as false. + */ +void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) { + if (_running) { + // use the current time and increment with the provided argument. + std::chrono::system_clock::time_point wait_time = + std::chrono::system_clock::now() + + std::chrono::milliseconds(timeToWaitMs); + + // create an asynchronous future. + std::future unload_task = std::async(std::launch::async, + [this]() {unload();}); + + if (std::future_status::ready == unload_task.wait_until(wait_time)) { + _running = false; + } -Processor *FlowController::createProcessor(std::string name, uuid_t uuid) -{ - Processor *processor = NULL; - if (name == GenerateFlowFile::ProcessorName) - { - processor = new GenerateFlowFile(name, uuid); - } - else if (name == LogAttribute::ProcessorName) - { - processor = new LogAttribute(name, uuid); - } - else if (name == RealTimeDataCollector::ProcessorName) - { - processor = new RealTimeDataCollector(name, uuid); - } - else if (name == GetFile::ProcessorName) - { - processor = new GetFile(name, uuid); - } - else if (name == PutFile::ProcessorName) - { - processor = new PutFile(name, uuid); } - else if (name == TailFile::ProcessorName) - { - processor = new TailFile(name, uuid); - } - else if (name == ListenSyslog::ProcessorName) - { - processor = new ListenSyslog(name, uuid); +} + + +void FlowControllerImpl::unload() { + if (_running) { + stop(true); } - else if (name == ExecuteProcess::ProcessorName) - { - processor = new ExecuteProcess(name, uuid); + if (_initialized) { + _logger->log_info("Unload Flow Controller"); + if (_root) + delete _root; + _root = NULL; + _initialized = false; + _name = ""; } - else if (name == 
AppendHostInfo::ProcessorName) - { - processor = new AppendHostInfo(name, uuid); + + return; +} + +Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) { + Processor *processor = NULL; + if (name == GenerateFlowFile::ProcessorName) { + processor = new GenerateFlowFile(name, uuid); + } else if (name == LogAttribute::ProcessorName) { + processor = new LogAttribute(name, uuid); + } else if (name == RealTimeDataCollector::ProcessorName) { + processor = new RealTimeDataCollector(name, uuid); + } else if (name == GetFile::ProcessorName) { + processor = new GetFile(name, uuid); + } else if (name == PutFile::ProcessorName) { + processor = new PutFile(name, uuid); + } else if (name == TailFile::ProcessorName) { + processor = new TailFile(name, uuid); + } else if (name == ListenSyslog::ProcessorName) { + processor = new ListenSyslog(name, uuid); + } else if (name == ExecuteProcess::ProcessorName) { + processor = new ExecuteProcess(name, uuid); + } else if (name == AppendHostInfo::ProcessorName) { + processor = new AppendHostInfo(name, uuid); + } else { + _logger->log_error("No Processor defined for %s", name.c_str()); + return NULL; } - else - { - _logger->log_error("No Processor defined for %s", name.c_str()); - return NULL; - } - //! initialize the processor - processor->initialize(); + //! 
initialize the processor + processor->initialize(); - return processor; + return processor; } -ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); +ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name, + uuid_t uuid) { + return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); } -ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); +ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name, + uuid_t uuid) { + return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); } -Connection *FlowController::createConnection(std::string name, uuid_t uuid) -{ - return new Connection(name, uuid); +Connection *FlowControllerImpl::createConnection(std::string name, + uuid_t uuid) { + return new Connection(name, uuid); } +void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { + uuid_t uuid; + ProcessGroup *group = NULL; + // generate the random UIID + uuid_generate(uuid); -void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { - uuid_t uuid; - ProcessGroup *group = NULL; + std::string flowName = rootFlowNode["name"].as(); - // generate the random UIID - uuid_generate(uuid); + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); + _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); + group = this->createRootProcessGroup(flowName, uuid); + this->_root = group; + this->_name = flowName; +} - std::string flowName = rootFlowNode["name"].as(); +void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode, + ProcessGroup *parentGroup) { + int64_t schedulingPeriod = -1; + int64_t penalizationPeriod = -1; + int64_t yieldPeriod = -1; + int64_t runDurationNanos = -1; + uuid_t uuid; + Processor *processor = NULL; + + if 
(!parentGroup) { + _logger->log_error("parseProcessNodeYaml: no parent group exists"); + return; + } - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); - _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); - group = this->createRootProcessGroup(flowName, uuid); - this->_root = group; - this->_name = flowName; -} + if (processorsNode) { + + if (processorsNode.IsSequence()) { + // Evaluate sequence of processors + int numProcessors = processorsNode.size(); + + for (YAML::const_iterator iter = processorsNode.begin(); + iter != processorsNode.end(); ++iter) { + ProcessorConfig procCfg; + YAML::Node procNode = iter->as(); + + procCfg.name = procNode["name"].as(); + _logger->log_debug("parseProcessorNode: name => [%s]", + procCfg.name.c_str()); + procCfg.javaClass = procNode["class"].as(); + _logger->log_debug("parseProcessorNode: class => [%s]", + procCfg.javaClass.c_str()); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + // generate the random UUID + uuid_generate(uuid); + + // Determine the processor name only from the Java class + int lastOfIdx = procCfg.javaClass.find_last_of("."); + if (lastOfIdx != std::string::npos) { + lastOfIdx++; // if a value is found, increment to move beyond the . 
+ int nameLength = procCfg.javaClass.length() - lastOfIdx; + std::string processorName = procCfg.javaClass.substr( + lastOfIdx, nameLength); + processor = this->createProcessor(processorName, uuid); + } -void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) { - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - int64_t runDurationNanos = -1; - uuid_t uuid; - Processor *processor = NULL; - - if (!parentGroup) { - _logger->log_error("parseProcessNodeYaml: no parent group exists"); - return; - } + if (!processor) { + _logger->log_error( + "Could not create a processor %s with name %s", + procCfg.name.c_str(), uuidStr); + throw std::invalid_argument( + "Could not create processor " + procCfg.name); + } + processor->setName(procCfg.name); + + procCfg.maxConcurrentTasks = + procNode["max concurrent tasks"].as(); + _logger->log_debug( + "parseProcessorNode: max concurrent tasks => [%s]", + procCfg.maxConcurrentTasks.c_str()); + procCfg.schedulingStrategy = procNode["scheduling strategy"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: scheduling strategy => [%s]", + procCfg.schedulingStrategy.c_str()); + procCfg.schedulingPeriod = procNode["scheduling period"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: scheduling period => [%s]", + procCfg.schedulingPeriod.c_str()); + procCfg.penalizationPeriod = procNode["penalization period"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: penalization period => [%s]", + procCfg.penalizationPeriod.c_str()); + procCfg.yieldPeriod = + procNode["yield period"].as(); + _logger->log_debug("parseProcessorNode: yield period => [%s]", + procCfg.yieldPeriod.c_str()); + procCfg.yieldPeriod = procNode["run duration nanos"].as< + std::string>(); + _logger->log_debug( + "parseProcessorNode: run duration nanos => [%s]", + procCfg.runDurationNanos.c_str()); + + // handle auto-terminated relationships + 
YAML::Node autoTerminatedSequence = + procNode["auto-terminated relationships list"]; + std::vector rawAutoTerminatedRelationshipValues; + if (autoTerminatedSequence.IsSequence() + && !autoTerminatedSequence.IsNull() + && autoTerminatedSequence.size() > 0) { + for (YAML::const_iterator relIter = + autoTerminatedSequence.begin(); + relIter != autoTerminatedSequence.end(); + ++relIter) { + std::string autoTerminatedRel = + relIter->as(); + rawAutoTerminatedRelationshipValues.push_back( + autoTerminatedRel); + } + } + procCfg.autoTerminatedRelationships = + rawAutoTerminatedRelationshipValues; + + // handle processor properties + YAML::Node propertiesNode = procNode["Properties"]; + parsePropertiesNodeYaml(&propertiesNode, processor); + + // Take care of scheduling + TimeUnit unit; + if (Property::StringToTime(procCfg.schedulingPeriod, + schedulingPeriod, unit) + && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, + schedulingPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: schedulingPeriod => [%d] ns", + schedulingPeriod); + processor->setSchedulingPeriodNano(schedulingPeriod); + } - if (processorsNode) { - - if (processorsNode.IsSequence()) { - // Evaluate sequence of processors - int numProcessors = processorsNode.size(); - - - for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { - ProcessorConfig procCfg; - YAML::Node procNode = iter->as(); - - procCfg.name = procNode["name"].as(); - _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str()); - procCfg.javaClass = procNode["class"].as(); - _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str()); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - // generate the random UUID - uuid_generate(uuid); - - // Determine the processor name only from the Java class - int lastOfIdx = procCfg.javaClass.find_last_of("."); - if (lastOfIdx != std::string::npos) { - lastOfIdx++; // if a value is found, increment to 
move beyond the . - int nameLength = procCfg.javaClass.length() - lastOfIdx; - std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength); - processor = this->createProcessor(processorName, uuid); - } - - if (!processor) { - _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr); - throw std::invalid_argument("Could not create processor " + procCfg.name); - } - processor->setName(procCfg.name); - - procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as(); - _logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str()); - procCfg.schedulingStrategy = procNode["scheduling strategy"].as(); - _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]", - procCfg.schedulingStrategy.c_str()); - procCfg.schedulingPeriod = procNode["scheduling period"].as(); - _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str()); - procCfg.penalizationPeriod = procNode["penalization period"].as(); - _logger->log_debug("parseProcessorNode: penalization period => [%s]", - procCfg.penalizationPeriod.c_str()); - procCfg.yieldPeriod = procNode["yield period"].as(); - _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str()); - procCfg.yieldPeriod = procNode["run duration nanos"].as(); - _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str()); - - // handle auto-terminated relationships - YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; - std::vector rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() - && autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); ++relIter) { - std::string autoTerminatedRel = relIter->as(); - 
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); - } - } - procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; - - // handle processor properties - YAML::Node propertiesNode = procNode["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); - - // Take care of scheduling - TimeUnit unit; - if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - - if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", - penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - - if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - - // Default to running - processor->setScheduledState(RUNNING); - - if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { - processor->setSchedulingStrategy(TIMER_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { - processor->setSchedulingStrategy(EVENT_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else { - processor->setSchedulingStrategy(CRON_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - - } - - int64_t maxConcurrentTasks; - if 
(Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - - if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { - _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - - std::set autoTerminatedRelationships; - for (auto &&relString : procCfg.autoTerminatedRelationships) { - Relationship relationship(relString, ""); - _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str()); - autoTerminatedRelationships.insert(relationship); - } - - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - parentGroup->addProcessor(processor); - } - } - } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); - } -} + if (Property::StringToTime(procCfg.penalizationPeriod, + penalizationPeriod, unit) + && Property::ConvertTimeUnitToMS(penalizationPeriod, + unit, penalizationPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: penalizationPeriod => [%d] ms", + penalizationPeriod); + processor->setPenalizationPeriodMsec(penalizationPeriod); + } -void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) { - uuid_t uuid; + if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, + unit) + && Property::ConvertTimeUnitToMS(yieldPeriod, unit, + yieldPeriod)) { + _logger->log_debug( + "convert: parseProcessorNode: yieldPeriod => [%d] ms", + yieldPeriod); + processor->setYieldPeriodMsec(yieldPeriod); + } - if (!parentGroup) { - _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists"); - return; - } + // Default to running + processor->setScheduledState(RUNNING); + + if (procCfg.schedulingStrategy == 
"TIMER_DRIVEN") { + processor->setSchedulingStrategy(TIMER_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { + processor->setSchedulingStrategy(EVENT_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); + } else { + processor->setSchedulingStrategy(CRON_DRIVEN); + _logger->log_debug("setting scheduling strategy as %s", + procCfg.schedulingStrategy.c_str()); - if (rpgNode) { - if (rpgNode->IsSequence()) { - for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { - YAML::Node rpgNode = iter->as(); + } - auto name = rpgNode["name"].as(); - _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str()); + int64_t maxConcurrentTasks; + if (Property::StringToInt(procCfg.maxConcurrentTasks, + maxConcurrentTasks)) { + _logger->log_debug( + "parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } - std::string url = rpgNode["url"].as(); - _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str()); + if (Property::StringToInt(procCfg.runDurationNanos, + runDurationNanos)) { + _logger->log_debug( + "parseProcessorNode: runDurationNanos => [%d]", + runDurationNanos); + processor->setRunDurationNano(runDurationNanos); + } - std::string timeout = rpgNode["timeout"].as(); - _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str()); + std::set autoTerminatedRelationships; + for (auto &&relString : procCfg.autoTerminatedRelationships) { + Relationship relationship(relString, ""); + _logger->log_debug( + "parseProcessorNode: autoTerminatedRelationship => [%s]", + relString.c_str()); + autoTerminatedRelationships.insert(relationship); + } - std::string yieldPeriod = rpgNode["yield period"].as(); - _logger->log_debug("parseRemoteProcessGroupYaml: yield 
period => [%s]", yieldPeriod.c_str()); + processor->setAutoTerminatedRelationships( + autoTerminatedRelationships); - YAML::Node inputPorts = rpgNode["Input Ports"].as(); - YAML::Node outputPorts = rpgNode["Output Ports"].as(); - ProcessGroup *group = NULL; + parentGroup->addProcessor(processor); + } + } + } else { + throw new std::invalid_argument( + "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); + } +} - // generate the random UUID - uuid_generate(uuid); +void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, + ProcessGroup *parentGroup) { + uuid_t uuid; - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); + if (!parentGroup) { + _logger->log_error( + "parseRemoteProcessGroupYaml: no parent group exists"); + return; + } - int64_t timeoutValue = -1; - int64_t yieldPeriodValue = -1; + if (rpgNode) { + if (rpgNode->IsSequence()) { + for (YAML::const_iterator iter = rpgNode->begin(); + iter != rpgNode->end(); ++iter) { + YAML::Node rpgNode = iter->as(); + + auto name = rpgNode["name"].as(); + _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", + name.c_str()); + + std::string url = rpgNode["url"].as(); + _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", + url.c_str()); + + std::string timeout = rpgNode["timeout"].as(); + _logger->log_debug( + "parseRemoteProcessGroupYaml: timeout => [%s]", + timeout.c_str()); + + std::string yieldPeriod = + rpgNode["yield period"].as(); + _logger->log_debug( + "parseRemoteProcessGroupYaml: yield period => [%s]", + yieldPeriod.c_str()); + + YAML::Node inputPorts = rpgNode["Input Ports"].as(); + YAML::Node outputPorts = + rpgNode["Output Ports"].as(); + ProcessGroup *group = NULL; + + // generate the random UUID + uuid_generate(uuid); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + int64_t timeoutValue = -1; + int64_t yieldPeriodValue = -1; + + group = this->createRemoteProcessGroup(name.c_str(), uuid); + group->setParent(parentGroup); + 
parentGroup->addProcessGroup(group); + + TimeUnit unit; + + if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) + && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, + yieldPeriodValue) && group) { + _logger->log_debug( + "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", + yieldPeriodValue); + group->setYieldPeriodMsec(yieldPeriodValue); + } - group = this->createRemoteProcessGroup(name.c_str(), uuid); - group->setParent(parentGroup); - parentGroup->addProcessGroup(group); + if (Property::StringToTime(timeout, timeoutValue, unit) + && Property::ConvertTimeUnitToMS(timeoutValue, unit, + timeoutValue) && group) { + _logger->log_debug( + "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", + timeoutValue); + group->setTimeOut(timeoutValue); + } - TimeUnit unit; + group->setTransmitting(true); + group->setURL(url); - if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue); - group->setYieldPeriodMsec(yieldPeriodValue); - } + if (inputPorts && inputPorts.IsSequence()) { + for (YAML::const_iterator portIter = inputPorts.begin(); + portIter != inputPorts.end(); ++portIter) { + _logger->log_debug("Got a current port, iterating..."); - if (Property::StringToTime(timeout, timeoutValue, unit) - && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue); - group->setTimeOut(timeoutValue); - } + YAML::Node currPort = portIter->as(); - group->setTransmitting(true); - group->setURL(url); + this->parsePortYaml(&currPort, group, SEND); + } // for node + } + if (outputPorts && outputPorts.IsSequence()) { + for (YAML::const_iterator portIter = outputPorts.begin(); + portIter != outputPorts.end(); ++portIter) { + _logger->log_debug("Got a current port, 
iterating..."); - if (inputPorts && inputPorts.IsSequence()) { - for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { - _logger->log_debug("Got a current port, iterating..."); + YAML::Node currPort = portIter->as(); - YAML::Node currPort = portIter->as(); + this->parsePortYaml(&currPort, group, RECEIVE); + } // for node + } - this->parsePortYaml(&currPort, group, SEND); - } // for node - } - if (outputPorts && outputPorts.IsSequence()) { - for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { - _logger->log_debug("Got a current port, iterating..."); + } + } + } +} - YAML::Node currPort = portIter->as(); +void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode, + ProcessGroup *parent) { + uuid_t uuid; + Connection *connection = NULL; - this->parsePortYaml(&currPort, group, RECEIVE); - } // for node - } + if (!parent) { + _logger->log_error("parseProcessNode: no parent group was provided"); + return; + } - } - } - } -} + if (connectionsNode) { + + if (connectionsNode->IsSequence()) { + for (YAML::const_iterator iter = connectionsNode->begin(); + iter != connectionsNode->end(); ++iter) { + // generate the random UUID + uuid_generate(uuid); + + YAML::Node connectionNode = iter->as(); + + std::string name = connectionNode["name"].as(); + std::string destName = connectionNode["destination name"].as< + std::string>(); + + char uuidStr[37]; + uuid_unparse(_uuid, uuidStr); + + _logger->log_debug( + "Created connection with UUID %s and name %s", uuidStr, + name.c_str()); + connection = this->createConnection(name, uuid); + auto rawRelationship = + connectionNode["source relationship name"].as< + std::string>(); + Relationship relationship(rawRelationship, ""); + _logger->log_debug("parseConnection: relationship => [%s]", + rawRelationship.c_str()); + if (connection) + connection->setRelationship(relationship); + std::string connectionSrcProcName = + 
connectionNode["source name"].as(); + + Processor *srcProcessor = this->_root->findProcessor( + connectionSrcProcName); + + if (!srcProcessor) { + _logger->log_error( + "Could not locate a source with name %s to create a connection", + connectionSrcProcName.c_str()); + throw std::invalid_argument( + "Could not locate a source with name %s to create a connection " + + connectionSrcProcName); + } -void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) { - uuid_t uuid; - Connection *connection = NULL; + Processor *destProcessor = this->_root->findProcessor(destName); + // If we could not find name, try by UUID + if (!destProcessor) { + uuid_t destUuid; + uuid_parse(destName.c_str(), destUuid); + destProcessor = this->_root->findProcessor(destUuid); + } + if (destProcessor) { + std::string destUuid = destProcessor->getUUIDStr(); + } - if (!parent) { - _logger->log_error("parseProcessNode: no parent group was provided"); - return; - } + uuid_t srcUuid; + uuid_t destUuid; + srcProcessor->getUUID(srcUuid); + connection->setSourceProcessorUUID(srcUuid); + destProcessor->getUUID(destUuid); + connection->setDestinationProcessorUUID(destUuid); - if (connectionsNode) { - - if (connectionsNode->IsSequence()) { - for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { - // generate the random UUID - uuid_generate(uuid); - - YAML::Node connectionNode = iter->as(); - - std::string name = connectionNode["name"].as(); - std::string destName = connectionNode["destination name"].as(); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str()); - connection = this->createConnection(name, uuid); - auto rawRelationship = connectionNode["source relationship name"].as(); - Relationship relationship(rawRelationship, ""); - _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str()); - if 
(connection) - connection->setRelationship(relationship); - std::string connectionSrcProcName = connectionNode["source name"].as(); - - Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName); - - if (!srcProcessor) { - _logger->log_error("Could not locate a source with name %s to create a connection", - connectionSrcProcName.c_str()); - throw std::invalid_argument( - "Could not locate a source with name %s to create a connection " + connectionSrcProcName); - } - - Processor *destProcessor = this->_root->findProcessor(destName); - // If we could not find name, try by UUID - if (!destProcessor) { - uuid_t destUuid; - uuid_parse(destName.c_str(), destUuid); - destProcessor = this->_root->findProcessor(destUuid); - } - if (destProcessor) { - std::string destUuid = destProcessor->getUUIDStr(); - } - - uuid_t srcUuid; - uuid_t destUuid; - srcProcessor->getUUID(srcUuid); - connection->setSourceProcessorUUID(srcUuid); - destProcessor->getUUID(destUuid); - connection->setDestinationProcessorUUID(destUuid); - - if (connection) { - parent->addConnection(connection); - } - } - } - - if (connection) - parent->addConnection(connection); - - return; - } -} + if (connection) { + parent->addConnection(connection); + } + } + } + + if (connection) + parent->addConnection(connection); + return; + } +} -void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) { - uuid_t uuid; - Processor *processor = NULL; - RemoteProcessorGroupPort *port = NULL; +void FlowControllerImpl::parsePortYaml(YAML::Node *portNode, + ProcessGroup *parent, TransferDirection direction) { + uuid_t uuid; + Processor *processor = NULL; + RemoteProcessorGroupPort *port = NULL; - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } + if (!parent) { + _logger->log_error("parseProcessNode: no parent group existed"); + return; + } - YAML::Node inputPortsObj = portNode->as(); + YAML::Node inputPortsObj = 
portNode->as(); - // generate the random UIID - uuid_generate(uuid); + // generate the random UIID + uuid_generate(uuid); - auto portId = inputPortsObj["id"].as(); - auto nameStr = inputPortsObj["name"].as(); - uuid_parse(portId.c_str(), uuid); + auto portId = inputPortsObj["id"].as(); + auto nameStr = inputPortsObj["name"].as(); + uuid_parse(portId.c_str(), uuid); - port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); + port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); - processor = (Processor *) port; - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(true); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); + processor = (Processor *) port; + port->setDirection(direction); + port->setTimeOut(parent->getTimeOut()); + port->setTransmitting(true); + processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); + processor->initialize(); - // handle port properties - YAML::Node nodeVal = portNode->as(); - YAML::Node propertiesNode = nodeVal["Properties"]; + // handle port properties + YAML::Node nodeVal = portNode->as(); + YAML::Node propertiesNode = nodeVal["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); + parsePropertiesNodeYaml(&propertiesNode, processor); - // add processor to parent - parent->addProcessor(processor); - processor->setScheduledState(RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as(); - int64_t maxConcurrentTasks; - if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); + // add processor to parent + parent->addProcessor(processor); + processor->setScheduledState(RUNNING); + auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as< + std::string>(); + int64_t 
maxConcurrentTasks; + if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", + maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); } - -void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor) -{ - // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated - for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) - { - std::string propertyName = propsIter->first.as(); - YAML::Node propertyValueNode = propsIter->second; - if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) - { - std::string rawValueString = propertyValueNode.as(); - if (!processor->setProperty(propertyName, rawValueString)) - { - _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName.c_str(), rawValueString.c_str(), processor->getName().c_str()); - } - } - } +void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode, + Processor *processor) { + // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated + for (YAML::const_iterator propsIter = propertiesNode->begin(); + propsIter != propertiesNode->end(); ++propsIter) { + std::string propertyName = propsIter->first.as(); + YAML::Node propertyValueNode = propsIter->second; + if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { + std::string rawValueString = propertyValueNode.as(); + if (!processor->setProperty(propertyName, rawValueString)) { + _logger->log_warn( + "Received property %s with value %s but is not one of the properties for %s", + propertyName.c_str(), rawValueString.c_str(), + processor->getName().c_str()); + } + } + } } -void FlowController::load() { +void FlowControllerImpl::load() { 
if (_running) { stop(true); } @@ -732,7 +809,7 @@ void FlowController::load() { } } -void FlowController::reload(std::string yamlFile) +void FlowControllerImpl::reload(std::string yamlFile) { _logger->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); stop(true); @@ -752,22 +829,24 @@ void FlowController::reload(std::string yamlFile) } } -bool FlowController::start() { - if (!_initialized) { - _logger->log_error("Can not start Flow Controller because it has not been initialized"); - return false; - } else { - if (!_running) { - _logger->log_info("Start Flow Controller"); - this->_timerScheduler.start(); - this->_eventScheduler.start(); - if (this->_root) - this->_root->startProcessing( - &this->_timerScheduler, - &this->_eventScheduler); - _running = true; - this->_protocol->start(); - } - return true; - } +bool FlowControllerImpl::start() { + if (!_initialized) { + _logger->log_error( + "Can not start Flow Controller because it has not been initialized"); + return false; + } else { + + if (!_running) { + _logger->log_info("Starting Flow Controller"); + this->_timerScheduler.start(); + this->_eventScheduler.start(); + if (this->_root) + this->_root->startProcessing(&this->_timerScheduler, + &this->_eventScheduler); + _running = true; + this->_protocol->start(); + _logger->log_info("Started Flow Controller"); + } + return true; + } } diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 2dda47a326..9d010168d8 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -83,7 +83,7 @@ FlowFileRecord::~FlowFileRecord() { // Decrease the flow file record owned count for the resource claim _claim->decreaseFlowFileRecordOwnedCount(); - if (_claim->getFlowFileRecordOwnedCount() == 0) + if (_claim->getFlowFileRecordOwnedCount() <= 0) { _logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str()); std::remove(_claim->getContentFullPath().c_str()); diff --git 
a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp index 70969c9f39..53d285da9d 100644 --- a/libminifi/src/GetFile.cpp +++ b/libminifi/src/GetFile.cpp @@ -32,8 +32,12 @@ #include #include #include +#if (__GNUC__ >= 4) + #if (__GNUC_MINOR__ < 9) + #include + #endif +#endif #include - #include "TimeUtil.h" #include "GetFile.h" #include "ProcessContext.h" @@ -81,6 +85,8 @@ void GetFile::initialize() void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) { std::string value; + + _logger->log_info("onTrigger GetFile"); if (context->getProperty(Directory.getName(), value)) { _directory = value; @@ -97,6 +103,8 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) { Property::StringToBool(value, _keepSourceFile); } + + _logger->log_info("onTrigger GetFile"); if (context->getProperty(MaxAge.getName(), value)) { TimeUnit unit; @@ -143,6 +151,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) } // Perform directory list + _logger->log_info("Is listing empty %i",isListingEmpty()); if (isListingEmpty()) { if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) @@ -150,6 +159,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) performListing(_directory); } } + _logger->log_info("Is listing empty %i",isListingEmpty()); if (!isListingEmpty()) { @@ -159,6 +169,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) pollListing(list, _batchSize); while (!list.empty()) { + std::string fileName = list.front(); list.pop(); _logger->log_info("GetFile process %s", fileName.c_str()); @@ -185,6 +196,7 @@ void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) throw; } } + } bool GetFile::isListingEmpty() @@ -243,16 +255,34 @@ bool GetFile::acceptFile(std::string fullName, std::string name) if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) return false; - try { - std::regex re(_fileFilter); - if 
(!std::regex_match(name, re)) { - return false; - } - } catch (std::regex_error e) { - _logger->log_error("Invalid File Filter regex: %s.", e.what()); - return false; - } + #ifdef __GNUC__ + #if (__GNUC__ >= 4) + #if (__GNUC_MINOR__ < 9) + regex_t regex; + int ret = regcomp(®ex, _fileFilter.c_str(),0); + if (ret) + return false; + ret = regexec(®ex,name.c_str(),(size_t)0,NULL,0); + regfree(®ex); + if (ret) + return false; + #else + try{ + std::regex re(_fileFilter); + + if (!std::regex_match(name, re)) { + return false; + } + } catch (std::regex_error e) { + _logger->log_error("Invalid File Filter regex: %s.", e.what()); + return false; + } + #endif + #endif + #else + _logger->log_info("Cannot support regex filtering"); + #endif return true; } @@ -261,11 +291,14 @@ bool GetFile::acceptFile(std::string fullName, std::string name) void GetFile::performListing(std::string dir) { + _logger->log_info("Performing file listing against %s",dir.c_str()); DIR *d; d = opendir(dir.c_str()); if (!d) return; - while (1) + // only perform a listing while we are not empty + _logger->log_info("Performing file listing against %s",dir.c_str()); + while (isRunning()) { struct dirent *entry; entry = readdir(d); diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp index 7c9827871a..b3cb8ac973 100644 --- a/libminifi/src/ProcessGroup.cpp +++ b/libminifi/src/ProcessGroup.cpp @@ -135,9 +135,10 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, try { // Start all the processor node, input and output ports - for (std::set::iterator it = _processors.begin(); it != _processors.end(); ++it) + for(auto processor : _processors) { - Processor *processor(*it); + _logger->log_debug("Starting %s",processor->getName().c_str()); + if (!processor->isRunning() && processor->getScheduledState() != DISABLED) { if (processor->getSchedulingStrategy() == TIMER_DRIVEN) @@ -146,10 +147,9 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent 
*timeScheduler, eventScheduler->schedule(processor); } } - - for (std::set::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + // Start processing the group + for(auto processGroup : _childProcessGroups) { - ProcessGroup *processGroup(*it); processGroup->startProcessing(timeScheduler, eventScheduler); } } @@ -202,12 +202,14 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Processor *ProcessGroup::findProcessor(uuid_t uuid) { + Processor *ret = NULL; // std::lock_guard lock(_mtx); for (std::set::iterator it = _processors.begin(); it != _processors.end(); ++it) { Processor *processor(*it); + _logger->log_info("find processor %s",processor->getName().c_str()); uuid_t processorUUID; if (processor->getUUID(processorUUID) && uuid_compare(processorUUID, uuid) == 0) return processor; @@ -215,7 +217,9 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid) for (std::set::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) { + ProcessGroup *processGroup(*it); + _logger->log_info("find processor child %s",processGroup->getName().c_str()); Processor *processor = processGroup->findProcessor(uuid); if (processor) return processor; diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp index abe75bd67a..f3d769e33f 100644 --- a/libminifi/src/ProcessSession.cpp +++ b/libminifi/src/ProcessSession.cpp @@ -204,7 +204,7 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) { ResourceClaim *claim = NULL; - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + claim = new ResourceClaim(); try { @@ -382,7 +382,7 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepS { ResourceClaim *claim = NULL; - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + claim = new ResourceClaim(); char *buf = NULL; int size = 4096; buf = new char [size]; @@ -420,9 +420,10 @@ void ProcessSession::import(std::string source, 
FlowFileRecord *flow, bool keepS } flow->_claim = claim; claim->increaseFlowFileRecordOwnedCount(); - /* + _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + fs.close(); input.close(); if (!keepSource) @@ -478,10 +479,9 @@ void ProcessSession::commit() try { // First we clone the flow record based on the transfered relationship for updated flow record - std::map::iterator it; - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for (auto && it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) continue; std::map::iterator itRelationship = @@ -537,11 +537,10 @@ void ProcessSession::commit() throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } - // Do the samething for added flow file - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const auto it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) continue; std::map::iterator itRelationship = @@ -597,11 +596,10 @@ void ProcessSession::commit() throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); } } - // Complete process the added and update flow files for the session, send the flow file to its queue - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for(const auto &it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -611,9 +609,9 @@ void ProcessSession::commit() else delete record; } - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const 
auto &it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -624,9 +622,9 @@ void ProcessSession::commit() delete record; } // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + for(const auto &it : _clonedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_markedDelete) { continue; @@ -637,15 +635,15 @@ void ProcessSession::commit() delete record; } // Delete the deleted flow files - for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++) + for(const auto &it : _deletedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } // Delete the snapshot - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + for(const auto &it : _originalFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } // All done @@ -675,11 +673,10 @@ void ProcessSession::rollback() { try { - std::map::iterator it; // Requeue the snapshot of the flowfile back - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + for(const auto &it : _originalFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; if (record->_orginalConnection) { record->_snapshot = false; @@ -690,21 +687,21 @@ void ProcessSession::rollback() } _originalFlowFiles.clear(); // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + for(const auto &it : _clonedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } _clonedFlowFiles.clear(); - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + for(const auto &it : _addedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } 
_addedFlowFiles.clear(); - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + for(const auto &it : _updatedFlowFiles) { - FlowFileRecord *record = it->second; + FlowFileRecord *record = it.second; delete record; } _updatedFlowFiles.clear(); diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp index 6a118933db..1b8e286c5a 100644 --- a/libminifi/src/Processor.cpp +++ b/libminifi/src/Processor.cpp @@ -42,7 +42,7 @@ Processor::Processor(std::string name, uuid_t uuid) char uuidStr[37]; uuid_unparse(_uuid, uuidStr); _uuidStr = uuidStr; - + _hasWork.store(false); // Setup the default values _state = DISABLED; _strategy = TIMER_DRIVEN; @@ -57,7 +57,6 @@ Processor::Processor(std::string name, uuid_t uuid) _yieldExpiration = 0; _incomingConnectionsIter = this->_incomingConnections.begin(); _logger = Logger::getLogger(); - _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); } @@ -83,9 +82,8 @@ bool Processor::setSupportedProperties(std::set properties) std::lock_guard lock(_mtx); _properties.clear(); - for (std::set::iterator it = properties.begin(); it != properties.end(); ++it) + for (auto item : properties) { - Property item(*it); _properties[item.getName()] = item; _logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str()); } @@ -105,9 +103,8 @@ bool Processor::setSupportedRelationships(std::set relationships) std::lock_guard lock(_mtx); _relationships.clear(); - for (std::set::iterator it = relationships.begin(); it != relationships.end(); ++it) + for(auto item : relationships) { - Relationship item(*it); _relationships[item.getName()] = item; _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str()); } @@ -127,9 +124,8 @@ bool Processor::setAutoTerminatedRelationships(std::set relationsh std::lock_guard lock(_mtx); _autoTerminatedRelationships.clear(); - for (std::set::iterator it = relationships.begin(); it != 
relationships.end(); ++it) + for(auto item : relationships) { - Relationship item(*it); _autoTerminatedRelationships[item.getName()] = item; _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str()); } @@ -140,21 +136,18 @@ bool Processor::setAutoTerminatedRelationships(std::set relationsh bool Processor::isAutoTerminated(Relationship relationship) { bool isRun = isRunning(); + + auto conditionalLock = !isRun ? + std::unique_lock() + : std::unique_lock(_mtx); - if (!isRun) - _mtx.lock(); - - std::map::iterator it = _autoTerminatedRelationships.find(relationship.getName()); + const auto &it = _autoTerminatedRelationships.find(relationship.getName()); if (it != _autoTerminatedRelationships.end()) { - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -163,20 +156,17 @@ bool Processor::isSupportedRelationship(Relationship relationship) { bool isRun = isRunning(); - if (!isRun) - _mtx.lock(); + auto conditionalLock = !isRun ? + std::unique_lock() + : std::unique_lock(_mtx); - std::map::iterator it = _relationships.find(relationship.getName()); + const auto &it = _relationships.find(relationship.getName()); if (it != _relationships.end()) { - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -185,23 +175,20 @@ bool Processor::getProperty(std::string name, std::string &value) { bool isRun = isRunning(); - if (!isRun) - // Because set property only allowed in non running state, we need to obtain lock avoid rack condition - _mtx.lock(); - - std::map::iterator it = _properties.find(name); + + auto conditionalLock = !isRun ? 
+ std::unique_lock() + : std::unique_lock(_mtx); + + const auto &it = _properties.find(name); if (it != _properties.end()) { Property item = it->second; value = item.getValue(); - if (!isRun) - _mtx.unlock(); return true; } else { - if (!isRun) - _mtx.unlock(); return false; } } @@ -210,7 +197,7 @@ bool Processor::setProperty(std::string name, std::string value) { std::lock_guard lock(_mtx); - std::map::iterator it = _properties.find(name); + auto &&it = _properties.find(name); if (it != _properties.end()) { @@ -254,7 +241,7 @@ std::set Processor::getOutGoingConnections(std::string relationshi { std::set empty; - std::map>::iterator it = _outGoingConnections.find(relationship); + auto &&it = _outGoingConnections.find(relationship); if (it != _outGoingConnections.end()) { return _outGoingConnections[relationship]; @@ -269,6 +256,7 @@ bool Processor::addConnection(Connection *connection) { bool ret = false; + if (isRunning()) { _logger->log_info("Can not add connection while the process %s is running", @@ -276,6 +264,7 @@ bool Processor::addConnection(Connection *connection) return false; } + std::lock_guard lock(_mtx); uuid_t srcUUID; @@ -283,8 +272,14 @@ bool Processor::addConnection(Connection *connection) connection->getSourceProcessorUUID(srcUUID); connection->getDestinationProcessorUUID(destUUID); + char uuid_str[37]; - if (uuid_compare(_uuid, destUUID) == 0) + + uuid_unparse_lower(_uuid, uuid_str); + std::string my_uuid = uuid_str; + uuid_unparse_lower(destUUID, uuid_str); + std::string destination_uuid = uuid_str; + if (my_uuid == destination_uuid) { // Connection is destination to the current processor if (_incomingConnections.find(connection) == _incomingConnections.end()) @@ -297,12 +292,13 @@ bool Processor::addConnection(Connection *connection) ret = true; } } - - if (uuid_compare(_uuid, srcUUID) == 0) + uuid_unparse_lower(srcUUID, uuid_str); + std::string source_uuid = uuid_str; + if (my_uuid == source_uuid) { std::string relationship = 
connection->getRelationship().getName(); // Connection is source from the current processor - std::map>::iterator it = + auto &&it = _outGoingConnections.find(relationship); if (it != _outGoingConnections.end()) { @@ -321,6 +317,7 @@ bool Processor::addConnection(Connection *connection) } else { + // We do not have any outgoing connection for this relationship yet std::set newConnection; newConnection.insert(connection); @@ -331,6 +328,7 @@ bool Processor::addConnection(Connection *connection) ret = true; } } + return ret; } @@ -369,7 +367,7 @@ void Processor::removeConnection(Connection *connection) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor - std::map>::iterator it = + auto &&it = _outGoingConnections.find(relationship); if (it == _outGoingConnections.end()) { @@ -414,9 +412,8 @@ bool Processor::flowFilesQueued() if (_incomingConnections.size() == 0) return false; - for (std::set::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it) + for(auto &&connection : _incomingConnections) { - Connection *connection = *it; if (connection->getQueueSize() > 0) return true; } @@ -428,15 +425,12 @@ bool Processor::flowFilesOutGoingFull() { std::lock_guard lock(_mtx); - std::map>::iterator it; - - for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it) + for(auto &&connection : _outGoingConnections) { // We already has connection for this relationship - std::set existedConnection = it->second; - for (std::set::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection) + std::set existedConnection = connection.second; + for(const auto connection : existedConnection) { - Connection *connection = *itConnection; if (connection->isFull()) return true; } @@ -476,15 +470,14 @@ void Processor::onTrigger() void Processor::waitForWork(uint64_t timeoutMs) { - std::unique_lock lock(_workAvailableMtx); - 
_hasWork = isWorkAvailable(); + _hasWork.store( isWorkAvailable() ); - if (!_hasWork) + if (!_hasWork.load()) { - _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork; }); + std::unique_lock lock(_workAvailableMtx); + _hasWorkCondition.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] { return _hasWork.load(); }); } - lock.unlock(); } void Processor::notifyWork() @@ -496,17 +489,12 @@ void Processor::notifyWork() } { - std::unique_lock lock(_workAvailableMtx); - _hasWork = isWorkAvailable(); - - // Keep a scope-local copy of the state to avoid race conditions - bool hasWork = _hasWork; + _hasWork.store( isWorkAvailable() ); - lock.unlock(); - if (hasWork) + if (_hasWork.load()) { - _hasWorkCondition.notify_one(); + _hasWorkCondition.notify_one(); } } } diff --git a/libminifi/src/Provenance.cpp b/libminifi/src/Provenance.cpp index a2a4310f1a..c21de76d7f 100644 --- a/libminifi/src/Provenance.cpp +++ b/libminifi/src/Provenance.cpp @@ -17,648 +17,369 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include +#include +#include +#include "Serializable.h" #include "Provenance.h" #include "Relationship.h" #include "Logger.h" #include "FlowController.h" -int ProvenanceEventRecord::readUTF(std::string &str, bool widen) -{ - uint16_t utflen; - int ret; - - if (!widen) - { - ret = read(utflen); - if (ret <= 0) - return ret; - } - else - { - uint32_t len; - ret = read(len); - if (ret <= 0) - return ret; - utflen = len; - } - - uint8_t *bytearr = NULL; - char *chararr = NULL; - bytearr = new uint8_t[utflen]; - chararr = new char[utflen]; - memset(chararr, 0, utflen); - - int c, char2, char3; - int count = 0; - int chararr_count=0; - - ret = read(bytearr, utflen); - if (ret <= 0) - { - delete[] bytearr; - delete[] chararr; - if (ret == 0) - { - if (!widen) - return (2 + utflen); - else - return (4 + utflen); - } - else - return ret; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) break; - count++; - chararr[chararr_count++]=(char)c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: - /* 0xxxxxxx*/ - count++; - chararr[chararr_count++]=(char)c; - break; - case 12: case 13: - /* 110x xxxx 10xx xxxx*/ - count += 2; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-1]; - if ((char2 & 0xC0) != 0x80) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | - (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-2]; - char3 = (int) bytearr[count-1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | - ((char2 & 0x3F) << 6) | - ((char3 & 0x3F) << 
0)); - break; - default: - delete[] bytearr; - delete[] chararr; - return -1; - } - } - // The number of chars produced may be less than utflen - std::string value(chararr, chararr_count); - str = value; - delete[] bytearr; - delete[] chararr; - if (!widen) - return (2 + utflen); - else - return (4 + utflen); -} - -int ProvenanceEventRecord::writeUTF(std::string str, bool widen) -{ - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) - return -1; - - uint8_t *bytearr = NULL; - if (!widen) - { - bytearr = new uint8_t[utflen+2]; - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - else - { - bytearr = new uint8_t[utflen+4]; - bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - - int i=0; - for (i=0; i= 0x0001) && (c <= 0x007F))) break; - bytearr[count++] = (uint8_t) c; - } - - for (;i < strlen; i++){ - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (uint8_t) c; - } else if (c > 0x07FF) { - bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (uint8_t) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } - } - int ret; - if (!widen) - { - ret = writeData(bytearr, utflen+2); - } - else - { - ret = writeData(bytearr, utflen+4); - } - delete[] bytearr; - return ret; -} - //! 
DeSerialize -bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, std::string key) -{ +bool ProvenanceEventRecord::DeSerialize(ProvenanceRepository *repo, + std::string key) { std::string value; bool ret; ret = repo->Get(key, value); - if (!ret) - { - _logger->log_error("NiFi Provenance Store event %s can not found", key.c_str()); + if (!ret) { + _logger->log_error("NiFi Provenance Store event %s can not found", + key.c_str()); return false; - } - else - _logger->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length()); + } else + _logger->log_debug("NiFi Provenance Read event %s length %d", + key.c_str(), value.length()); - ret = DeSerialize((unsigned char *) value.data(), value.length()); - if (ret) - { - _logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), _serializeBufSize, _eventType); - } - else - { - _logger->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), _serializeBufSize, _eventType); + DataStream stream((const uint8_t*)value.data(),value.length()); + + ret = DeSerialize(stream); + + if (ret) { + _logger->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d success", + _eventIdStr.c_str(), stream.getSize(), _eventType); + } else { + _logger->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d fail", + _eventIdStr.c_str(), stream.getSize(), _eventType); } return ret; } -bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) -{ - if (_serializedBuf) - // Serialize in progress - return false; - _serializedBuf = NULL; - _serializeBufSize = 0; - _maxSerializeBufSize = 0; - _serializedBuf = new uint8_t[PROVENANCE_EVENT_RECORD_SEG_SIZE]; - if (!_serializedBuf) - return false; - _maxSerializeBufSize = PROVENANCE_EVENT_RECORD_SEG_SIZE; +bool ProvenanceEventRecord::Serialize(ProvenanceRepository *repo) { + + DataStream outStream; int ret; - ret = writeUTF(this->_eventIdStr); - if (ret 
<= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_eventIdStr,&outStream); + if (ret <= 0) { + return false; } uint32_t eventType = this->_eventType; - ret = write(eventType); - if (ret != 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(eventType,&outStream); + if (ret != 4) { + return false; } - ret = write(this->_eventTime); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_eventTime,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_entryDate); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_entryDate,&outStream); + if (ret != 8) { return false; } - ret = write(this->_eventDuration); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_eventDuration,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_lineageStartDate); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_lineageStartDate,&outStream); + if (ret != 8) { + return false; } - ret = writeUTF(this->_componentId); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_componentId,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_componentType); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_componentType,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_uuid); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_uuid,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_details); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_details,&outStream); + if (ret <= 0) { + return false; } // write flow attributes uint32_t numAttributes = this->_attributes.size(); - ret = write(numAttributes); - if (ret != 4) 
- { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(numAttributes,&outStream); + if (ret != 4) { + return false; } - std::map::iterator itAttribute; - for (itAttribute = this->_attributes.begin(); itAttribute!= this->_attributes.end(); itAttribute++) - { - ret = writeUTF(itAttribute->first, true); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto itAttribute : _attributes) { + ret = writeUTF(itAttribute.first,&outStream, true); + if (ret <= 0) { + return false; } - ret = writeUTF(itAttribute->second, true); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(itAttribute.second,&outStream, true); + if (ret <= 0) { + return false; } } - ret = writeUTF(this->_contentFullPath); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_contentFullPath,&outStream); + if (ret <= 0) { + return false; } - ret = write(this->_size); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_size,&outStream); + if (ret != 8) { + return false; } - ret = write(this->_offset); - if (ret != 8) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(this->_offset,&outStream); + if (ret != 8) { + return false; } - ret = writeUTF(this->_sourceQueueIdentifier); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_sourceQueueIdentifier,&outStream); + if (ret <= 0) { + return false; } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) - { + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { // write UUIDs uint32_t number = this->_parentUuids.size(); - ret = write(number); - if (ret != 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = 
write(number,&outStream); + if (ret != 4) { + return false; } - std::vector::iterator it; - for (it = this->_parentUuids.begin(); it!= this->_parentUuids.end(); it++) - { - std::string parentUUID = *it; - ret = writeUTF(parentUUID); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto parentUUID : _parentUuids) { + ret = writeUTF(parentUUID,&outStream); + if (ret <= 0) { + return false; } } number = this->_childrenUuids.size(); - ret = write(number); - if (ret != 4) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = write(number,&outStream); + if (ret != 4) { return false; } - for (it = this->_childrenUuids.begin(); it!= this->_childrenUuids.end(); it++) - { - std::string childUUID = *it; - ret = writeUTF(childUUID); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + for (auto childUUID : _childrenUuids) { + ret = writeUTF(childUUID,&outStream); + if (ret <= 0) { + return false; } } - } - else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) - { - ret = writeUTF(this->_transitUri); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = writeUTF(this->_transitUri,&outStream); + if (ret <= 0) { + return false; } - } - else if (this->_eventType == ProvenanceEventRecord::RECEIVE) - { - ret = writeUTF(this->_transitUri); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = writeUTF(this->_transitUri,&outStream); + if (ret <= 0) { + return false; } - ret = writeUTF(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) - { - delete[] _serializedBuf; - _serializedBuf = NULL; + ret = writeUTF(this->_sourceSystemFlowFileIdentifier,&outStream); + if (ret <= 0) { + return false; } } // Persistent to the DB - if 
(repo->Put(_eventIdStr, _serializedBuf, _serializeBufSize)) - { - _logger->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), _serializeBufSize); - } - else - { - _logger->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), _serializeBufSize); + if (repo->Put(_eventIdStr, const_cast(outStream.getBuffer()), outStream.getSize())) { + _logger->log_debug("NiFi Provenance Store event %s size %d success", + _eventIdStr.c_str(), outStream.getSize()); + } else { + _logger->log_error("NiFi Provenance Store event %s size %d fail", + _eventIdStr.c_str(), outStream.getSize()); } // cleanup - delete[] (_serializedBuf); - _serializedBuf = NULL; - _serializeBufSize = 0; return true; } -bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize) -{ - _serializedBuf = buffer; - _serializeBufSize = 0; - _maxSerializeBufSize = bufferSize; +bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { int ret; - ret = readUTF(this->_eventIdStr); - if (ret <= 0) - { + DataStream outStream(buffer,bufferSize); + + ret = readUTF(this->_eventIdStr,&outStream); + + if (ret <= 0) { return false; } uint32_t eventType; - ret = read(eventType); - if (ret != 4) - { + ret = read(eventType,&outStream); + if (ret != 4) { return false; } this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; - ret = read(this->_eventTime); - if (ret != 8) - { + ret = read(this->_eventTime,&outStream); + if (ret != 8) { return false; } - ret = read(this->_entryDate); - if (ret != 8) - { + ret = read(this->_entryDate,&outStream); + if (ret != 8) { return false; } - ret = read(this->_eventDuration); - if (ret != 8) - { + ret = read(this->_eventDuration,&outStream); + if (ret != 8) { return false; } - ret = read(this->_lineageStartDate); - if (ret != 8) - { + ret = read(this->_lineageStartDate,&outStream); + if (ret != 8) { return false; } - ret = readUTF(this->_componentId); - if (ret <= 0) - { + ret = 
readUTF(this->_componentId,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_componentType); - if (ret <= 0) - { + ret = readUTF(this->_componentType,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_uuid); - if (ret <= 0) - { + ret = readUTF(this->_uuid,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_details); - if (ret <= 0) - { + ret = readUTF(this->_details,&outStream); + + if (ret <= 0) { return false; } // read flow attributes uint32_t numAttributes = 0; - ret = read(numAttributes); - if (ret != 4) - { + ret = read(numAttributes,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < numAttributes; i++) - { + for (uint32_t i = 0; i < numAttributes; i++) { std::string key; - ret = readUTF(key, true); - if (ret <= 0) - { + ret = readUTF(key,&outStream, true); + if (ret <= 0) { return false; } std::string value; - ret = readUTF(value, true); - if (ret <= 0) - { + ret = readUTF(value,&outStream, true); + if (ret <= 0) { return false; } this->_attributes[key] = value; } - ret = readUTF(this->_contentFullPath); - if (ret <= 0) - { + ret = readUTF(this->_contentFullPath,&outStream); + if (ret <= 0) { return false; } - ret = read(this->_size); - if (ret != 8) - { + ret = read(this->_size,&outStream); + if (ret != 8) { return false; } - ret = read(this->_offset); - if (ret != 8) - { + ret = read(this->_offset,&outStream); + if (ret != 8) { return false; } - ret = readUTF(this->_sourceQueueIdentifier); - if (ret <= 0) - { + ret = readUTF(this->_sourceQueueIdentifier,&outStream); + if (ret <= 0) { return false; } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) - { + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { // read UUIDs uint32_t number = 0; - ret = read(number); - 
if (ret != 4) - { + ret = read(number,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < number; i++) - { + + + for (uint32_t i = 0; i < number; i++) { std::string parentUUID; - ret = readUTF(parentUUID); - if (ret <= 0) - { + ret = readUTF(parentUUID,&outStream); + if (ret <= 0) { return false; } this->addParentUuid(parentUUID); } number = 0; - ret = read(number); - if (ret != 4) - { + ret = read(number,&outStream); + if (ret != 4) { return false; } - for (uint32_t i = 0; i < number; i++) - { + for (uint32_t i = 0; i < number; i++) { std::string childUUID; - ret = readUTF(childUUID); - if (ret <= 0) - { + ret = readUTF(childUUID,&outStream); + if (ret <= 0) { return false; } this->addChildUuid(childUUID); } - } - else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) - { - ret = readUTF(this->_transitUri); - if (ret <= 0) - { + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = readUTF(this->_transitUri,&outStream); + if (ret <= 0) { return false; } - } - else if (this->_eventType == ProvenanceEventRecord::RECEIVE) - { - ret = readUTF(this->_transitUri); - if (ret <= 0) - { + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = readUTF(this->_transitUri,&outStream); + if (ret <= 0) { return false; } - ret = readUTF(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) - { + ret = readUTF(this->_sourceSystemFlowFileIdentifier,&outStream); + if (ret <= 0) { return false; } } @@ -666,35 +387,32 @@ bool ProvenanceEventRecord::DeSerialize(uint8_t *buffer, int bufferSize) return true; } -void ProvenanceReporter::commit() -{ - for (std::set::iterator it = _events.begin(); it != _events.end(); ++it) - { - ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it); - if (!FlowController::getFlowController()->getProvenanceRepository()->isFull()) - 
event->Serialize(FlowController::getFlowController()->getProvenanceRepository()); - else +void ProvenanceReporter::commit() { + for (auto event : _events) { + if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) { + event->Serialize( + FlowControllerFactory::getFlowController()->getProvenanceRepository()); + } else { _logger->log_debug("Provenance Repository is full"); + } } } -void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow); +void ProvenanceReporter::create(FlowFileRecord *flow, std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, + flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, + std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow); - if (event) - { + if (event) { event->setDetails(detail); event->setRelationship(relation.getName()); event->setEventDuration(processingDuration); @@ -702,51 +420,49 @@ void ProvenanceReporter::route(FlowFileRecord *flow, Relationship relation, std: } } -void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); +void ProvenanceReporter::modifyAttributes(FlowFileRecord *flow, + std::string detail) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, uint64_t processingDuration) -{ - ProvenanceEventRecord *event = 
allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow); +void ProvenanceReporter::modifyContent(FlowFileRecord *flow, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::CONTENT_MODIFIED, flow); - if (event) - { + if (event) { event->setDetails(detail); event->setEventDuration(processingDuration); add(event); } } -void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent); +void ProvenanceReporter::clone(FlowFileRecord *parent, FlowFileRecord *child) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, + parent); - if (event) - { + if (event) { event->addChildFlowFile(child); event->addParentFlowFile(parent); add(event); } } -void ProvenanceReporter::join(std::vector parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::join(std::vector parents, + FlowFileRecord *child, std::string detail, + uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child); - if (event) - { + if (event) { event->addChildFlowFile(child); std::vector::iterator it; - for (it = parents.begin(); it!= parents.end(); it++) - { + for (it = parents.begin(); it != parents.end(); it++) { FlowFileRecord *record = *it; event->addParentFlowFile(record); } @@ -756,16 +472,16 @@ void ProvenanceReporter::join(std::vector parents, FlowFileRec } } -void ProvenanceReporter::fork(std::vector child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent); +void ProvenanceReporter::fork(std::vector child, + FlowFileRecord *parent, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, + parent); - if (event) - { + if (event) { 
event->addParentFlowFile(parent); std::vector::iterator it; - for (it = child.begin(); it!= child.end(); it++) - { + for (it = child.begin(); it != child.end(); it++) { FlowFileRecord *record = *it; event->addChildFlowFile(record); } @@ -775,71 +491,66 @@ void ProvenanceReporter::fork(std::vector child, FlowFileRecor } } -void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow); +void ProvenanceReporter::expire(FlowFileRecord *flow, std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, + flow); - if (event) - { + if (event) { event->setDetails(detail); add(event); } } -void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) -{ +void ProvenanceReporter::drop(FlowFileRecord *flow, std::string reason) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow); - if (event) - { + if (event) { std::string dropReason = "Discard reason: " + reason; event->setDetails(dropReason); add(event); } } -void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) -{ +void ProvenanceReporter::send(FlowFileRecord *flow, std::string transitUri, + std::string detail, uint64_t processingDuration, bool force) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); - if (!force) - { + if (!force) { add(event); - } - else - { - if (!FlowController::getFlowController()->getProvenanceRepository()->isFull()) - event->Serialize(FlowController::getFlowController()->getProvenanceRepository()); + } else { + if (!FlowControllerFactory::getFlowController()->getProvenanceRepository()->isFull()) + event->Serialize( + FlowControllerFactory::getFlowController()->getProvenanceRepository()); delete 
event; } } } -void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) -{ - ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow); +void ProvenanceReporter::receive(FlowFileRecord *flow, std::string transitUri, + std::string sourceSystemFlowFileIdentifier, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, + flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); - event->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier); + event->setSourceSystemFlowFileIdentifier( + sourceSystemFlowFileIdentifier); add(event); } } -void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration) -{ +void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, + std::string detail, uint64_t processingDuration) { ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow); - if (event) - { + if (event) { event->setTransitUri(transitUri); event->setDetails(detail); event->setEventDuration(processingDuration); @@ -849,8 +560,7 @@ void ProvenanceReporter::fetch(FlowFileRecord *flow, std::string transitUri, std uint64_t ProvenanceRepository::_repoSize = 0; -void ProvenanceRepository::start() -{ +void ProvenanceRepository::start() { if (this->_purgePeriod <= 0) return; if (_running) @@ -861,48 +571,47 @@ void ProvenanceRepository::start() _thread->detach(); } -void ProvenanceRepository::stop() -{ +void ProvenanceRepository::stop() { if (!_running) return; _running = false; _logger->log_info("ProvenanceRepository Monitor Thread Stop"); } -void ProvenanceRepository::run(ProvenanceRepository *repo) -{ +void ProvenanceRepository::run(ProvenanceRepository *repo) { // threshold for purge 
- uint64_t purgeThreshold = repo->_maxPartitionBytes*3/4; - while (repo->_running) - { - std::this_thread::sleep_for(std::chrono::milliseconds(repo->_purgePeriod)); + uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4; + while (repo->_running) { + std::this_thread::sleep_for( + std::chrono::milliseconds(repo->_purgePeriod)); uint64_t curTime = getTimeMillis(); uint64_t size = repo->repoSize(); - if (size >= purgeThreshold) - { + if (size >= purgeThreshold) { std::vector purgeList; - leveldb::Iterator* it = repo->_db->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) - { + leveldb::Iterator* it = repo->_db->NewIterator( + leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { ProvenanceEventRecord eventRead; std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *)it->value().data(), (int) it->value().size())) - { - if ((curTime - eventRead.getEventTime()) > repo->_maxPartitionMillis) + if (eventRead.DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) { + if ((curTime - eventRead.getEventTime()) + > repo->_maxPartitionMillis) purgeList.push_back(key); - } - else - { - repo->_logger->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); + } else { + repo->_logger->log_debug( + "NiFi Provenance retrieve event %s fail", + key.c_str()); purgeList.push_back(key); } } delete it; std::vector::iterator itPurge; - for (itPurge = purgeList.begin(); itPurge!= purgeList.end(); itPurge++) - { + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); + itPurge++) { std::string eventId = *itPurge; - repo->_logger->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str()); + repo->_logger->log_info("ProvenanceRepository Repo Purge %s", + eventId.c_str()); repo->Delete(eventId); } } @@ -914,6 +623,3 @@ void ProvenanceRepository::run(ProvenanceRepository *repo) return; } - - - diff --git a/libminifi/src/PutFile.cpp b/libminifi/src/PutFile.cpp index 
3f209ce3d7..e5328b9281 100644 --- a/libminifi/src/PutFile.cpp +++ b/libminifi/src/PutFile.cpp @@ -136,11 +136,13 @@ bool PutFile::putFile(ProcessSession *session, FlowFileRecord *flowFile, const s if (cb.commit()) { session->transfer(flowFile, Success); + return true; } else { session->transfer(flowFile, Failure); } + return false; } PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile) diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index 3c22ac9d02..a82f647d0e 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -25,10 +25,14 @@ std::atomic ResourceClaim::_localResourceClaimNumber(0); + +std::string ResourceClaim::default_directory_path=DEFAULT_CONTENT_DIRECTORY; + ResourceClaim::ResourceClaim(const std::string contentDirectory) : _id(_localResourceClaimNumber.load()), _flowFileRecordOwnedCount(0) { + char uuidStr[37]; // Generate the global UUID for the resource claim diff --git a/libminifi/src/Serializable.cpp b/libminifi/src/Serializable.cpp new file mode 100644 index 0000000000..91330a0c75 --- /dev/null +++ b/libminifi/src/Serializable.cpp @@ -0,0 +1,365 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include "Serializable.h" + +#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) + +bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian(); + +#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1) + +template +int Serializable::writeData(const T &t,DataStream *stream) { + uint8_t bytes[sizeof t]; + std::copy(static_cast(static_cast(&t)), + static_cast(static_cast(&t)) + sizeof t, + bytes); + return stream->writeData(bytes, sizeof t); +} + +template +int Serializable::writeData(const T &t, uint8_t *to_vec) { + std::copy(static_cast(static_cast(&t)), + static_cast(static_cast(&t)) + sizeof t, + to_vec); + return sizeof t; +} + +template +int Serializable::writeData(const T &t, std::vector &to_vec) { + uint8_t bytes[sizeof t]; + std::copy(static_cast(static_cast(&t)), + static_cast(static_cast(&t)) + sizeof t, + bytes); + to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]); + return sizeof t; +} + + + + + +int Serializable::write(uint8_t value,DataStream *stream) { + return stream->writeData(&value, 1); +} +int Serializable::write(char value,DataStream *stream) { + return stream->writeData((uint8_t *) &value, 1); +} + +int Serializable::write(uint8_t *value, int len,DataStream *stream) { + return stream->writeData(value, len); +} + +int Serializable::write(bool value) { + uint8_t temp = value; + return write(temp); +} + +int Serializable::read(uint8_t &value,DataStream *stream) { + uint8_t buf; + + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = buf; + return ret; +} + +int Serializable::read(char &value,DataStream *stream) { + uint8_t buf; + + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = (char) buf; + return ret; +} + +int Serializable::read(uint8_t *value, int len,DataStream *stream) { + return stream->readData(value, len); +} + +int Serializable::read(uint16_t &value,DataStream *stream, bool is_little_endian) { + + 
return stream->readShort(value, is_little_endian); +} + +int Serializable::read(uint32_t &value,DataStream *stream, bool is_little_endian) { + + return stream->readLong(value, is_little_endian); + +} +int Serializable::read(uint64_t &value,DataStream *stream, bool is_little_endian) { + + return stream->readLongLong(value, is_little_endian); + +} + +int Serializable::write(uint32_t base_value,DataStream *stream, bool is_little_endian) { + + const uint32_t value = is_little_endian ? htonl(base_value) : base_value; + + return writeData(value,stream); +} + +int Serializable::write(uint64_t base_value,DataStream *stream, bool is_little_endian) { + + const uint64_t value = + is_little_endian == 1 ? htonll_r(base_value) : base_value; + return writeData(value,stream); +} + +int Serializable::write(uint16_t base_value,DataStream *stream, bool is_little_endian) { + + const uint16_t value = + is_little_endian == 1 ? htons(base_value) : base_value; + + return writeData(value,stream); +} + +int Serializable::readUTF(std::string &str,DataStream *stream, bool widen) { + uint32_t utflen; + int ret = 1; + + if (!widen) { + uint16_t shortLength = 0; + ret = read(shortLength,stream); + utflen = shortLength; + + if (ret <= 0) + return ret; + } else { + uint32_t len; + ret = read(len,stream); + if (ret <= 0) + return ret; + utflen = len; + } + + if (utflen == 0) + return 1; + + std::vector buf; + ret = stream->readData(buf, utflen); + + // The number of chars produced may be less than utflen + str = std::string((const char*)&buf[0],utflen); + + return utflen; + /* + if (!widen) + return (2 + utflen); + else + return (4 + utflen); + */ +} + +int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) { + int inLength = str.length(); + uint32_t utflen = 0; + int currentPtr = 0; + + /* use charAt instead of copying String to char array */ + for (auto c : str) { + if (IS_ASCII(c)) { + utflen++; + }else if (c > 2047){ + utflen += 3; + } else { + utflen += 2; + } + } + + if 
(utflen > 65535) + return -1; + + if (utflen == 0) { + + if (!widen) { + uint16_t shortLen = utflen; + write(shortLen,stream); + } else { + + } + return 1; + } + + std::vector utf_to_write; + if (!widen) { + utf_to_write.resize(utflen); + + uint16_t shortLen = utflen; + + } else { + + utf_to_write.resize(utflen); + + } + + int i = 0; + + + uint8_t *underlyingPtr = &utf_to_write[0]; + for (auto c : str) { + if (IS_ASCII(c)) { + writeData(c, underlyingPtr++); + } else if (c > 2047){ + + auto t = (uint8_t) (((c >> 0x0C) & 15) | 192); + writeData(t, underlyingPtr++); + t = (uint8_t) (((c >> 0x06) & 63) | 128); + writeData(t, underlyingPtr++); + t = (uint8_t) (((c >> 0) & 63) | 128); + writeData(t, underlyingPtr++); + + } else { + auto t = (uint8_t) (((c >> 0x06) & 31) | 192); + writeData(t, underlyingPtr++); + currentPtr++; + t = (uint8_t) (((c >> 0x00) & 63) | 128); + writeData(t, underlyingPtr++); + currentPtr++; + + } + } + int ret; + + if (!widen) { + + uint16_t short_length = utflen; + write(short_length,stream); + + for (int i = 0; i < utflen; i++) { + } + for (auto c : utf_to_write) { + } + ret = stream->writeData(utf_to_write.data(), utflen); + } else { + utflen += 4; + write(utflen,stream); + ret = stream->writeData(utf_to_write.data(), utflen); + } + return ret; +} + +int DataStream::writeData(uint8_t *value, int size) { + + /*if (buffer.size() + size < buffer.capacity()) + { + buffer.resize( buffer.size() + size ); + } + */ + std::copy(value,value+size,std::back_inserter(buffer)); + + return size; +} + +int DataStream::readLongLong(uint64_t &value, bool is_little_endian) { + if ((8 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) + | ((uint64_t) (buf[2] & 255) << 40) + | ((uint64_t) (buf[3] & 255) << 32) + | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) + | ((uint64_t) (buf[6] 
& 255) << 8) + | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) + | ((uint64_t) (buf[2] & 255) << 16) + | ((uint64_t) (buf[3] & 255) << 24) + | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) + | ((uint64_t) (buf[6] & 255) << 48) + | ((uint64_t) (buf[7] & 255) << 56); + } + readBuffer += 8; + return 8; +} + +int DataStream::readLong(uint32_t &value, bool is_little_endian) { + if ((4 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + + } + readBuffer += 4; + return 4; +} + +int DataStream::readShort(uint16_t &value, bool is_little_endian) { + + if ((2 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; + + } + readBuffer += 2; + return 2; +} + +int DataStream::readData(std::vector &buf,int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + + if (buf.capacity() < buflen) + buf.resize(buflen); + + buf.insert(buf.begin(),&buffer[readBuffer],&buffer[readBuffer+buflen]); + + readBuffer += buflen; + return buflen; +} + + +int DataStream::readData(uint8_t *buf,int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + + std::copy(&buffer[readBuffer],&buffer[readBuffer+buflen],buf); + + readBuffer += buflen; + return buflen; +} diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp index fb20767857..3d6166bdd0 100644 --- a/libminifi/src/Site2SitePeer.cpp +++ b/libminifi/src/Site2SitePeer.cpp @@ -138,7 +138,7 @@ bool Site2SitePeer::Open() } // OpenSSL init - SSL_CTX *ctx = 
FlowController::getFlowController()->getSSLContext(); + SSL_CTX *ctx = FlowControllerFactory::getFlowController()->getSSLContext(); if (ctx) { // we have s2s secure config diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp index f7bd3ddea7..65245f6f68 100644 --- a/libminifi/test/Server.cpp +++ b/libminifi/test/Server.cpp @@ -15,7 +15,6 @@ #include #include #include -#include // std::cout #include // std::ifstream #include diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index a0950e1275..4e7e73fea1 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -18,8 +18,68 @@ #ifndef LIBMINIFI_TEST_TESTBASE_H_ #define LIBMINIFI_TEST_TESTBASE_H_ - +#include +#include +#include "ResourceClaim.h" #include "catch.hpp" +#include "Logger.h" +#include + + +class LogTestController { +public: + LogTestController(const std::string level = "debug") { + Logger::getLogger()->setLogLevel(level); + } + + + void enableDebug() + { + Logger::getLogger()->setLogLevel("debug"); + } + + ~LogTestController() { + Logger::getLogger()->setLogLevel(LOG_LEVEL_E::info); + } +}; + +class TestController{ +public: + + + + TestController() : log("info") + { + ResourceClaim::default_directory_path = "./"; + } + + ~TestController() + { + for(auto dir : directories) + { + rmdir(dir); + } + } + + void enableDebug() { + log.enableDebug(); + } + + char *createTempDirectory(char *format) + { + char *dir = mkdtemp(format); + return dir; + } + +protected: + LogTestController log; + std::vector directories; + + +}; + + + #endif /* LIBMINIFI_TEST_TESTBASE_H_ */ diff --git a/libminifi/test/unit/ProcessorTests.h b/libminifi/test/unit/ProcessorTests.h index 0cb6f6530b..0c178247eb 100644 --- a/libminifi/test/unit/ProcessorTests.h +++ b/libminifi/test/unit/ProcessorTests.h @@ -15,12 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include #include #include +#include "FlowController.h" +#include "ProvenanceTests.h" #include "../TestBase.h" #include "GetFile.h" +#ifndef PROCESSOR_TESTS +#define PROCESSOR_TESTS TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){ GetFile processor("processorname"); @@ -30,24 +33,33 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){ TEST_CASE("Test Find file", "[getfileCreate2]"){ + TestController testController; + + testController.enableDebug(); + + ProvenanceTestRepository repo; + TestFlowController controller(repo); + FlowControllerFactory::getFlowController( dynamic_cast(&controller)); GetFile processor("getfileCreate2"); char format[] ="/tmp/gt.XXXXXX"; - char *dir = mkdtemp(format); + char *dir = testController.createTempDirectory(format); + + uuid_t processoruuid; + REQUIRE( true == processor.getUUID(processoruuid) ); - Connection connection("emptyConnection"); + Connection connection("getfileCreate2Connection"); connection.setRelationship(Relationship("success","description")); // link the connections so that we can test results at the end for this connection.setSourceProcessor(&processor); - uuid_t processoruuid; - uuid_parse(processor.getUUIDStr().c_str(),processoruuid); connection.setSourceProcessorUUID(processoruuid); + connection.setDestinationProcessorUUID(processoruuid); processor.addConnection(&connection); REQUIRE( dir != NULL ); @@ -64,15 +76,15 @@ TEST_CASE("Test Find file", "[getfileCreate2]"){ processor.onTrigger(&context,&session); ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set records = reporter->getEvents(); + std::set records = reporter->getEvents(); - record = session.get(); + record = session.get(); REQUIRE( record== 0 ); REQUIRE( records.size() == 0 ); std::fstream file; std::stringstream ss; - ss << dir << "/" << "tstFile"; + ss << dir << "/" << "tstFile.ext"; file.open(ss.str(),std::ios::out); file << "tempFile"; file.close(); @@ -89,12 +101,54 @@ TEST_CASE("Test Find file", 
"[getfileCreate2]"){ for(ProvenanceEventRecord *provEventRecord : records) { - REQUIRE (provEventRecord->getComponentType() == processor.getName()); } + session.commit(); + + FlowFileRecord *ffr = session.get(); + + ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + + delete ffr; + + std::set expiredFlows; + + REQUIRE( 2 == repo.getRepoMap().size() ); + + for(auto entry: repo.getRepoMap()) + { + ProvenanceEventRecord newRecord; + newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length()); + + bool found = false; + for ( auto provRec : records) + { + if (provRec->getEventId() == newRecord.getEventId() ) + { + REQUIRE( provRec->getEventId() == newRecord.getEventId()); + REQUIRE( provRec->getComponentId() == newRecord.getComponentId()); + REQUIRE( provRec->getComponentType() == newRecord.getComponentType()); + REQUIRE( provRec->getDetails() == newRecord.getDetails()); + REQUIRE( provRec->getEventDuration() == newRecord.getEventDuration()); + found = true; + break; + } + } + if (!found) + throw std::runtime_error("Did not find record"); + + + } + + } + + +#endif + + diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 2516ed9750..f67a826022 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -19,10 +19,17 @@ #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ #include "Provenance.h" +#include "FlowController.h" +/** + * Test repository + */ class ProvenanceTestRepository : public ProvenanceRepository { public: + ProvenanceTestRepository() +{ +} //! 
initialize bool initialize() { @@ -59,9 +66,72 @@ class ProvenanceTestRepository : public ProvenanceRepository return false; } } + + const std::map &getRepoMap() const + { + return repositoryResults; + } + protected: std::map repositoryResults; }; +class TestFlowController : public FlowController +{ + +public: + TestFlowController(ProvenanceTestRepository &repo) : ::FlowController() + { + _provenanceRepo = dynamic_cast(&repo); + } + ~TestFlowController() + { + + } + void load(){ + + } + + bool start() + { + _running.store(true); + return true; + } + + void stop(bool force) + { + _running.store(false); + } + void waitUnload(const uint64_t timeToWaitMs) + { + stop(true); + } + + void unload() + { + stop(true); + } + + void reload(std::string file) + { + + } + + bool isRunning() + { + return true; + } + + + Processor *createProcessor(std::string name, uuid_t uuid){ return 0;} + + ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid){ return 0;} + + ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid){ return 0; } + + Connection *createConnection(std::string name, uuid_t uuid){ return 0; } +}; + + #endif /* LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ */ diff --git a/libminifi/test/unit/ProvenanceTests.h b/libminifi/test/unit/ProvenanceTests.h index 63608df2f5..d78de47957 100644 --- a/libminifi/test/unit/ProvenanceTests.h +++ b/libminifi/test/unit/ProvenanceTests.h @@ -16,6 +16,9 @@ * limitations under the License. 
*/ + +#ifndef PROVENANCE_TESTS +#define PROVENANCE_TESTS #include "../TestBase.h" #include "ProvenanceTestHelper.h" @@ -23,14 +26,71 @@ #include "FlowFileRecord.h" -TEST_CASE("Test Provenance record creation", "[TestProvenanceEventRecord]"){ - - ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah"); +TEST_CASE("Test Provenance record create", "[TestProvenanceEventRecord]"){ + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah"); REQUIRE( record1.getAttributes().size() == 0); REQUIRE( record1.getAlternateIdentifierUri().length() == 0); } +TEST_CASE("Test Provenance record serialization", "[TestProvenanceEventRecordSerializeDeser]"){ + + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"componentid","componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)" ; + record1.setDetails(smileyface); + + ProvenanceTestRepository repo; + uint64_t sample = 65555; + ProvenanceRepository *testRepository = dynamic_cast(&repo); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + ProvenanceEventRecord record2; + REQUIRE( record2.DeSerialize(testRepository,eventId) == true); + REQUIRE( record2.getEventId() == record1.getEventId()); + REQUIRE( record2.getComponentId() == record1.getComponentId()); + REQUIRE( record2.getComponentType() == record1.getComponentType()); + REQUIRE( record2.getDetails() == record1.getDetails()); + REQUIRE( record2.getDetails() == smileyface); + REQUIRE( record2.getEventDuration() == sample); +} + + +TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]"){ + + ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CLONE,"componentid","componenttype"); + std::string eventId = record1.getEventId(); + std::map attributes; + attributes.insert(std::pair("potato","potatoe")); + attributes.insert(std::pair("tomato","tomatoe")); + 
FlowFileRecord ffr1(attributes); + + record1.addChildFlowFile(&ffr1); + + ProvenanceTestRepository repo; + uint64_t sample = 65555; + ProvenanceRepository *testRepository = dynamic_cast(&repo); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + ProvenanceEventRecord record2; + REQUIRE( record2.DeSerialize(testRepository,eventId) == true); + REQUIRE( record1.getChildrenUuids().size() == 1); + REQUIRE( record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE( childId == ffr1.getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE( record2.getChildrenUuids().size() == 0); + + +} + + + +#endif diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index e27974b59e..23fb239899 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -52,3 +52,5 @@ install(TARGETS minifiexe RUNTIME DESTINATION bin COMPONENT bin) + + diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index ea916cdbcc..9b99ee6a4c 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -17,7 +17,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include +#include #include #include #include @@ -34,7 +36,7 @@ //! Main thread sleep interval 1 second #define SLEEP_INTERVAL 1 //! Main thread stop wait time -#define STOP_WAIT_TIME 2 +#define STOP_WAIT_TIME_MS 30*1000 //! Default YAML location #define DEFAULT_NIFI_CONFIG_YML "./conf/config.yml" //! Default nifi properties file path @@ -43,23 +45,32 @@ #define MINIFI_HOME_ENV_KEY "MINIFI_HOME" /* Define Parser Values for Configuration YAML sections */ -#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" #define CONFIG_YAML_PROCESSORS_KEY "Processors" +#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller" #define CONFIG_YAML_CONNECTIONS_KEY "Connections" #define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups" -//! 
Whether it is running -static bool running = false; +// Variables that allow us to avoid a timed wait. +sem_t *running; //! Flow Controller static FlowController *controller = NULL; +/** + * Removed the stop command from the signal handler so that we could trigger + * unload after we exit the semaphore controlled critical section in main. + * + * Semaphores are a portable choice when using signal handlers. Threads, + * mutexes, and condition variables are not guaranteed to work within + * a signal handler. Consequently we will use the semaphore to avoid thread + * safety issues and. + */ void sigHandler(int signal) { + if (signal == SIGINT || signal == SIGTERM) { - controller->stop(true); - sleep(STOP_WAIT_TIME); - running = false; + // avoid stopping the controller here. + sem_post(running); } } @@ -68,6 +79,19 @@ int main(int argc, char **argv) Logger *logger = Logger::getLogger(); logger->setLogLevel(info); + + uint16_t stop_wait_time = STOP_WAIT_TIME_MS; + + std::string graceful_shutdown_seconds = ""; + std::string configured_log_level = ""; + + running = sem_open("MiNiFiMain",O_CREAT,0644,0); + if (running == SEM_FAILED || running == 0) + { + + logger->log_error("could not initialize semaphore"); + perror("initialization failure"); + } // assumes POSIX compliant environment std::string minifiHome; if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) @@ -85,6 +109,7 @@ int main(int argc, char **argv) minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\")); //Remove /bin from path } + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR || signal(SIGPIPE, SIG_IGN) == SIG_ERR) { logger->log_error("Can not install signal handler"); @@ -95,25 +120,66 @@ int main(int argc, char **argv) configure->setHome(minifiHome); configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); - controller = FlowController::getFlowController(); + + if 
(configure->get(Configure::nifi_graceful_shutdown_seconds,graceful_shutdown_seconds)) + { + try + { + stop_wait_time = std::stoi(graceful_shutdown_seconds); + } + catch(const std::out_of_range &e) + { + logger->log_error("%s is out of range. %s",Configure::nifi_graceful_shutdown_seconds,e.what()); + } + catch(const std::invalid_argument &e) + { + logger->log_error("%s contains an invalid argument set. %s",Configure::nifi_graceful_shutdown_seconds,e.what()); + } + } + else + { + logger->log_debug("%s not set, defaulting to %d",Configure::nifi_graceful_shutdown_seconds,STOP_WAIT_TIME_MS); + } + + if (configure->get(Configure::nifi_log_level,configured_log_level)) + { + std::cout << "log level is " << configured_log_level << std::endl; + logger->setLogLevel(configured_log_level); + + } + + + + controller = FlowControllerFactory::getFlowController(); // Load flow from specified configuration file controller->load(); // Start Processing the flow - controller->start(); - running = true; + controller->start(); logger->log_info("MiNiFi started"); - // main loop - while (running) - { - sleep(SLEEP_INTERVAL); - } + /** + * Sem wait provides us the ability to have a controlled + * yield without the need for a more complex construct and + * a spin lock + */ + if ( sem_wait(running) != -1 ) + perror("sem_wait"); + + + sem_unlink("MiNiFiMain"); + + /** + * Trigger unload -- wait stop_wait_time + */ + controller->waitUnload(stop_wait_time); - controller->unload(); delete controller; + logger->log_info("MiNiFi exit"); + + return 0; }