From affe73089009316ff229e0eb2cfeec89ae928cb7 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 2 Feb 2017 19:41:38 -0500 Subject: [PATCH] MINIFI-187: Remove XML References and deps. Remove XML references and dependencies. This includes changing the reload functionality to use YAML instead of XML. --- libminifi/CMakeLists.txt | 9 - libminifi/include/FlowControlProtocol.h | 20 +- libminifi/include/FlowController.h | 31 +- libminifi/src/FlowControlProtocol.cpp | 33 +- libminifi/src/FlowController.cpp | 608 ++---------------------- libminifi/test/Server.cpp | 60 +-- main/CMakeLists.txt | 8 - main/MiNiFiMain.cpp | 2 +- 8 files changed, 96 insertions(+), 675 deletions(-) diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index ff1634a3e2..f1e79e44e8 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -48,15 +48,6 @@ file(GLOB SPD_SOURCES "../include/spdlog/*") add_library(spdlog STATIC ${SPD_SOURCES}) add_library(minifi STATIC ${SOURCES}) -# Include libxml2 -find_package (LibXml2) -if (LIBXML2_FOUND) - include_directories(${LIBXML2_INCLUDE_DIR}) - target_link_libraries (minifi ${LIBXML2_LIBRARIES}) -else () - # Build from our local version -endif (LIBXML2_FOUND) - # Include LevelDB find_package (Leveldb REQUIRED) if (LEVELDB_FOUND) diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index 2e8cc7265f..ebf3c8ae1e 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -45,10 +45,10 @@ class FlowController; //! FlowControl Protocol Msg Type typedef enum { - REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version - REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval - REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info - REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property + REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version + REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.YAML from server ask device to apply and also device report interval + REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info + REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property MAX_FLOW_CONTROL_MSG_TYPE } FlowControlMsgType; @@ -74,10 +74,10 @@ inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) typedef enum { //Fix length 8 bytes: client to server in register request, required field FLOW_SERIAL_NUMBER, - // Flow XML name TLV: client to server in register request and report request, required field - FLOW_XML_NAME, - // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server - FLOW_XML_CONTENT, + // Flow YAML name TLV: client to server in register request and report request, required field + FLOW_YML_NAME, + // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server + FLOW_YML_CONTENT, // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field REPORT_INTERVAL, // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property @@ -95,8 +95,8 @@ typedef enum { static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", - "FLOW_XML_NAME", - "FLOW_XML_CONTENT", + "FLOW_YAML_NAME", + "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME" "PROPERTY_NAME", diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 9635becd98..35bcd0c5fc 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -28,8 +28,6 @@ #include #include #include -#include -#include #include #include "Configure.h" @@ -58,12 +56,9 @@ //! Default NiFi Root Group Name #define DEFAULT_ROOT_GROUP_NAME "" -#define DEFAULT_FLOW_XML_FILE_NAME "conf/flow.xml" #define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml" #define CONFIG_YAML_PROCESSORS_KEY "Processors" -enum class ConfigFormat { XML, YAML }; - struct ProcessorConfig { std::string name; std::string javaClass; @@ -143,20 +138,20 @@ class FlowController return this->_provenanceRepo; } //! Life Cycle related function - //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows - void load(ConfigFormat format); + //! Load flow YAML from disk, after that, create the root process group and its children, initialize the flows + void load(); //! Whether the Flow Controller is start running bool isRunning(); - //! Whether the Flow Controller has already been initialized (loaded flow XML) + //! Whether the Flow Controller has already been initialized (loaded flow YAML) bool isInitialized(); //! Start to run the Flow Controller which internally start the root process group and all its children bool start(); //! Stop to run the Flow Controller which internally stop the root process group and all its children void stop(bool force); - //! Unload the current flow xml, clean the root process group and all its children + //! reload flow controller's configuration + void reload(std::string yamlFile); + //! Unload the current flow YAML, clean the root process group and all its children void unload(); - //! Load new xml - void reload(std::string xmlFile); //! update property value void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { @@ -220,20 +215,8 @@ class FlowController Configure *_configure; //! Whether it is running std::atomic _running; - //! Whether it has already been initialized (load the flow XML already) + //! Whether it has already been initialized (load the flow YAML already) std::atomic _initialized; - //! Process Processor Node XML - void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent); - //! Process Port XML - void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction); - //! Process Root Processor Group XML - void parseRootProcessGroup(xmlDoc *doc, xmlNode *node); - //! Process Property XML - void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor); - //! Process connection XML - void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); - //! Process Remote Process Group - void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); //! Process Processor Node YAML void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 011ebcfdfa..fee1a3b104 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -242,7 +242,6 @@ void FlowControlProtocol::run(FlowControlProtocol *protocol) { // if it is not register yet protocol->sendRegisterReq(); - // protocol->_controller->reload("flow.xml"); } else protocol->sendReportReq(); @@ -268,7 +267,7 @@ int FlowControlProtocol::sendRegisterReq() // Calculate the total payload msg size uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + - FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -289,8 +288,8 @@ int FlowControlProtocol::sendRegisterReq() data = this->encode(data, FLOW_SERIAL_NUMBER); data = this->encode(data, this->_serialNumber, 8); - // encode the XML name - data = this->encode(data, FLOW_XML_NAME); + // encode the YAML name + data = this->encode(data, FLOW_YML_NAME); data = this->encode(data, this->_controller->getName()); // send it @@ -347,25 +346,25 @@ int FlowControlProtocol::sendRegisterReq() _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); this->_reportInterval = reportInterval; } - else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT) + else if (((FlowControlMsgID) msgID) == FLOW_YML_CONTENT) { - uint32_t xmlLen; - payloadPtr = this->decode(payloadPtr, xmlLen); - _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen); + uint32_t yamlLen; + payloadPtr = this->decode(payloadPtr, yamlLen); + _logger->log_info("Flow Control Protocol receive YAML content length %d", yamlLen); time_t rawtime; struct tm *timeinfo; time(&rawtime); timeinfo = localtime(&rawtime); - std::string xmlFileName = "flow."; - xmlFileName += asctime(timeinfo); - xmlFileName += ".xml"; + std::string yamlFileName = "flow."; + yamlFileName += asctime(timeinfo); + yamlFileName += ".yml"; std::ofstream fs; - fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + fs.open(yamlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); if (fs.is_open()) { - fs.write((const char *)payloadPtr, xmlLen); + fs.write((const char *)payloadPtr, yamlLen); fs.close(); - this->_controller->reload(xmlFileName.c_str()); + this->_controller->reload(yamlFileName.c_str()); } } else @@ -400,7 +399,7 @@ int FlowControlProtocol::sendReportReq() // Calculate the total payload msg size uint32_t payloadSize = - FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -417,8 +416,8 @@ int FlowControlProtocol::sendReportReq() data = this->encode(data, hdr.status); data = this->encode(data, hdr.payloadLen); - // encode the XML name - data = this->encode(data, FLOW_XML_NAME); + // encode the YAML name + data = this->encode(data, FLOW_YML_NAME); data = this->encode(data, this->_controller->getName()); // send it diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index dce9e344d1..bc073e0159 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -25,8 +25,6 @@ #include #include #include -#include -#include #include "FlowController.h" #include "ProcessContext.h" @@ -147,25 +145,6 @@ void FlowController::unload() return; } -void FlowController::reload(std::string xmlFile) -{ - _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str()); - stop(true); - unload(); - std::string oldxmlFile = this->_configurationFileName; - this->_configurationFileName = xmlFile; - load(ConfigFormat::XML); - start(); - if (!this->_root) - { - this->_configurationFileName = oldxmlFile; - _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str()); - stop(true); - unload(); - load(ConfigFormat::XML); - start(); - } -} Processor *FlowController::createProcessor(std::string name, uuid_t uuid) { @@ -233,154 +212,7 @@ Connection *FlowController::createConnection(std::string name, uuid_t uuid) return new Connection(name, uuid); } -void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) -{ - uuid_t uuid; - xmlNode *currentNode; - Connection *connection = NULL; - - if (!parent) - { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } - - // generate the random UIID - uuid_generate(uuid); - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) - { - if (currentNode->type == XML_ELEMENT_NODE) - { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) - { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseConnection: name => [%s]", name); - connection = this->createConnection(name, uuid); - if (connection == NULL) { - xmlFree(name); - return; - } - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: sourceId => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - if (connection) - connection->setSourceProcessorUUID(uuid); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: destinationId => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - if (connection) - connection->setDestinationProcessorUUID(uuid); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxWorkQueueSize = 0; - if (temp) { - if (Property::StringToInt(temp, maxWorkQueueSize)) { - _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize); - if (connection) - connection->setMaxQueueSize(maxWorkQueueSize); - - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxWorkQueueDataSize = 0; - if (temp) { - if (Property::StringToInt(temp, maxWorkQueueDataSize)) { - _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize); - if (connection) - connection->setMaxQueueDataSize(maxWorkQueueDataSize); - - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string relationshipName = temp; - if (!relationshipName.empty()) { - Relationship relationship(relationshipName, ""); - _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str()); - if (connection) - connection->setRelationship(relationship); - } else { - Relationship empty; - _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str()); - if (connection) - connection->setRelationship(empty); - } - xmlFree(temp); - } - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node - - if (connection) - parent->addConnection(connection); - - return; -} - -void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) { - uuid_t uuid; - xmlNode *currentNode; - ProcessGroup *group = NULL; - - // generate the random UIID - uuid_generate(uuid); - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseRootProcessGroup: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseRootProcessGroup: name => [%s]", name); - group = this->createRootProcessGroup(name, uuid); - if (group == NULL) { - xmlFree(name); - return; - } - // Set the root process group - this->_root = group; - this->_name = name; - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) { - this->parseProcessorNode(doc, currentNode, group); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) { - this->parseConnection(doc, currentNode, group); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) { - this->parseRemoteProcessGroup(doc, currentNode, group); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { uuid_t uuid; @@ -703,113 +535,6 @@ void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGro } } -void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) { - uuid_t uuid; - xmlNode *currentNode; - ProcessGroup *group = NULL; - int64_t yieldPeriod = -1; - int64_t timeOut = -1; - -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseRootProcessGroup: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name); - group = this->createRemoteProcessGroup(name, uuid); - if (group == NULL) { - xmlFree(name); - return; - } - group->setParent(parent); - parent->addProcessGroup(group); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) { - _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod); - group->setYieldPeriodMsec(yieldPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, timeOut, unit) - && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) { - _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut); - group->setTimeOut(timeOut); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - bool transmitting; - if (temp) { - if (Property::StringToBool(temp, transmitting) && group) { - _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting); - group->setTransmitting(transmitting); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) { - this->parsePort(doc, currentNode, group, SEND); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) { - this->parsePort(doc, currentNode, group, RECEIVE); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} - -void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) { - xmlNode *currentNode; - std::string propertyValue; - std::string propertyName; - - if (!processor) { - _logger->log_error("parseProcessorProperty: no parent processor existed"); - return; - } - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - propertyName = name; - xmlFree(name); - } - } - if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) { - char *value = (char *) xmlNodeGetContent(currentNode); - if (value) { - _logger->log_debug("parseProcessorNode: value => [%s]", value); - propertyValue = value; - xmlFree(value); - } - } - if (!propertyName.empty() && !propertyValue.empty()) { - processor->setProperty(propertyName, propertyValue); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) { uuid_t uuid; @@ -858,237 +583,6 @@ void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, T } -void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) { - char *id = NULL; - char *name = NULL; - uuid_t uuid; - xmlNode *currentNode; - Processor *processor = NULL; - RemoteProcessorGroupPort *port = NULL; - - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseProcessorNode: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - port = new RemoteProcessorGroupPort(name, uuid); - processor = (Processor *) port; - if (processor == NULL) { - xmlFree(name); - return; - } - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(parent->getTransmitting()); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); - // add processor to parent - parent->addProcessor(processor); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string state = temp; - if (state == "DISABLED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(DISABLED); - } - if (state == "STOPPED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(STOPPED); - } - if (state == "RUNNING") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(RUNNING); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - int64_t maxConcurrentTasks; - if (Property::StringToInt(temp, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { - this->parseProcessorProperty(doc, currentNode, processor); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // while node -} - -void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) { - char *id = NULL; - char *name = NULL; - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - bool lossTolerant = false; - int64_t runDurationNanos = -1; - uuid_t uuid; - xmlNode *currentNode; - Processor *processor = NULL; - - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseProcessorNode: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - processor = this->createProcessor(name, uuid); - if (processor == NULL) { - xmlFree(name); - return; - } - // add processor to parent - parent->addProcessor(processor); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToBool(temp, lossTolerant)) { - _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant); - processor->setlossTolerant(lossTolerant); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string state = temp; - if (state == "DISABLED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(DISABLED); - } - if (state == "STOPPED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(STOPPED); - } - if (state == "RUNNING") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(RUNNING); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string strategy = temp; - if (strategy == "TIMER_DRIVEN") { - _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); - processor->setSchedulingStrategy(TIMER_DRIVEN); - } - if (strategy == "EVENT_DRIVEN") { - _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); - processor->setSchedulingStrategy(EVENT_DRIVEN); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - int64_t maxConcurrentTasks; - if (Property::StringToInt(temp, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToInt(temp, runDurationNanos)) { - _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string relationshipName = temp; - Relationship relationship(relationshipName, ""); - std::set relationships; - - relationships.insert(relationship); - processor->setAutoTerminatedRelationships(relationships); - _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", - relationshipName.c_str()); - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { - this->parseProcessorProperty(doc, currentNode, processor); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // while node -} void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor) { @@ -1108,87 +602,49 @@ void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Process } } -void FlowController::load(ConfigFormat configFormat) { +void FlowController::load() { if (_running) { stop(true); } if (!_initialized) { _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str()); - if (ConfigFormat::XML == configFormat) { - _logger->log_info("Detected an XML configuration file for processing."); - xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET); - if (doc == NULL) { - _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str()); - _initialized = true; - return; - } + YAML::Node flow = YAML::LoadFile(_configurationFileName); - xmlNode *root = xmlDocGetRootElement(doc); + YAML::Node flowControllerNode = flow["Flow Controller"]; + YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; + YAML::Node connectionsNode = flow["Connections"]; + YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; - if (root == NULL) { - _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str()); - xmlFreeDoc(doc); - xmlCleanupParser(); - } + // Create the root process group + parseRootProcessGroupYaml(flowControllerNode); + parseProcessorNodeYaml(processorsNode, this->_root); + parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); + parseConnectionYaml(&connectionsNode, this->_root); - if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) { - _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str()); - xmlFreeDoc(doc); - xmlCleanupParser(); - return; - } + _initialized = true; - xmlNode *currentNode; - - for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) { - this->parseRootProcessGroup(doc, currentNode); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxTimerDrivenThreadCount; - if (temp) { - if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) { - _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount); - this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount; - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxEventDrivenThreadCount; - if (temp) { - if (Property::StringToInt(temp, maxEventDrivenThreadCount)) { - _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount); - this->_maxEventDrivenThreads = maxEventDrivenThreadCount; - } - xmlFree(temp); - } - } - } // type == XML_ELEMENT_NODE - } // for - - xmlFreeDoc(doc); - xmlCleanupParser(); - _initialized = true; - } else if (ConfigFormat::YAML == configFormat) { - YAML::Node flow = YAML::LoadFile(_configurationFileName); - - YAML::Node flowControllerNode = flow["Flow Controller"]; - YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; - YAML::Node connectionsNode = flow["Connections"]; - YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; - - // Create the root process group - parseRootProcessGroupYaml(flowControllerNode); - parseProcessorNodeYaml(processorsNode, this->_root); - parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); - parseConnectionYaml(&connectionsNode, this->_root); - - _initialized = true; - } + } +} + +void FlowController::reload(std::string yamlFile) +{ + _logger->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); + stop(true); + unload(); + std::string oldYamlFile = this->_configurationFileName; + this->_configurationFileName = yamlFile; + load(); + start(); + if (!this->_root) + { + this->_configurationFileName = oldYamlFile; + _logger->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str()); + stop(true); + unload(); + load(); + start(); } } diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp index e7b3452c75..f7bd3ddea7 100644 --- a/libminifi/test/Server.cpp +++ b/libminifi/test/Server.cpp @@ -25,10 +25,10 @@ //! FlowControl Protocol Msg Type typedef enum { - REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version - REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval - REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info - REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property + REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version + REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.yml from server ask device to apply and also device report interval + REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info + REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property MAX_FLOW_CONTROL_MSG_TYPE } FlowControlMsgType; @@ -54,10 +54,10 @@ inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) typedef enum { //Fix length 8 bytes: client to server in register request, required field FLOW_SERIAL_NUMBER, - // Flow XML name TLV: client to server in register request and report request, required field - FLOW_XML_NAME, - // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server - FLOW_XML_CONTENT, + // Flow YAML name TLV: client to server in register request and report request, required field + FLOW_YAML_NAME, + // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server + FLOW_YAML_CONTENT, // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field REPORT_INTERVAL, // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property @@ -75,8 +75,8 @@ typedef enum { static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", - "FLOW_XML_NAME", - "FLOW_XML_CONTENT", + "FLOW_YAML_NAME", + "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME" "PROPERTY_NAME", @@ -308,9 +308,9 @@ int readHdr(int socket, FlowControlProtocolHeader *hdr) return sizeof(FlowControlProtocolHeader); } -int readXML(char **xmlContent) +int readYAML(char **ymlContent) { - std::ifstream is ("conf/flowServer.xml", std::ifstream::binary); + std::ifstream is ("conf/flowServer.yml", std::ifstream::binary); if (is) { // get length of file: is.seekg (0, is.end); @@ -319,14 +319,14 @@ int readXML(char **xmlContent) char * buffer = new char [length]; - printf("Reading %s len %d\n", "conf/flowServer.xml", length); + printf("Reading %s len %d\n", "conf/flowServer.yml", length); // read data as a block: is.read (buffer,length); is.close(); // ...buffer contains the entire file... - *xmlContent = buffer; + *ymlContent = buffer; return length; } @@ -416,14 +416,14 @@ int main(int argc, char *argv[]) printf("Flow Control Protocol Register Req receive serial num\n"); payloadPtr += 8; } - else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) { uint32_t len; payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive XML name length %d\n", len); + printf("Flow Control Protocol receive YAML name length %d\n", len); std::string flowName = (const char *) payloadPtr; payloadPtr += len; - printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str()); } else { @@ -433,11 +433,11 @@ int main(int argc, char *argv[]) delete[] payload; // Send Register Respond // Calculate the total payload msg size - char *xmlContent; - uint32_t xmlLen = readXML(&xmlContent); + char *ymlContent; + uint32_t yamlLen = readYAML(&ymlContent); uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); - if (xmlLen > 0) - payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen); + if (yamlLen > 0) + payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, yamlLen); uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; uint8_t *data = new uint8_t[size]; @@ -456,13 +456,13 @@ int main(int argc, char *argv[]) data = encode(data, REPORT_INTERVAL); data = encode(data, DEFAULT_REPORT_INTERVAL); - // encode the XML content - if (xmlLen > 0) + // encode the YAML content + if (yamlLen > 0) { - data = encode(data, FLOW_XML_CONTENT); - data = encode(data, xmlLen); - data = encode(data, (uint8_t *) xmlContent, xmlLen); - delete[] xmlContent; + data = encode(data, FLOW_YAML_CONTENT); + data = encode(data, yamlLen); + data = encode(data, (uint8_t *) ymlContent, yamlLen); + delete[] ymlContent; } // send it @@ -479,14 +479,14 @@ int main(int argc, char *argv[]) { uint32_t msgID = 0xFFFFFFFF; payloadPtr = decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) { uint32_t len; payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive XML name length %d\n", len); + printf("Flow Control Protocol receive YAML name length %d\n", len); std::string flowName = (const char *) payloadPtr; payloadPtr += len; - printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str()); } else { diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index 3e0bbc441f..82b5102dac 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -28,14 +28,6 @@ include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml- find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) -# Include libxml2 -find_package(LibXml2) -if (LIBXML2_FOUND) - include_directories(${LIBXML2_INCLUDE_DIR}) -else () - # Build from our local version -endif (LIBXML2_FOUND) - add_executable(minifiexe MiNiFiMain.cpp) if(THREADS_HAVE_PTHREAD_ARG) target_compile_options(PUBLIC minifiexe "-pthread") diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 020fbbd0c2..ea916cdbcc 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -98,7 +98,7 @@ int main(int argc, char **argv) controller = FlowController::getFlowController(); // Load flow from specified configuration file - controller->load(ConfigFormat::YAML); + controller->load(); // Start Processing the flow controller->start(); running = true;