From 851e1f2a3c9bcd4fd7c8fd7db5cbbe720258dc46 Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Tue, 27 Nov 2018 13:29:34 +0100 Subject: [PATCH] MINIFICPP-686 - Move static property reads to onSchedule function of processors. --- libminifi/include/processors/AppendHostInfo.h | 9 +++- .../include/processors/GenerateFlowFile.h | 15 +++++- libminifi/src/processors/AppendHostInfo.cpp | 21 ++++----- libminifi/src/processors/GenerateFlowFile.cpp | 47 +++++++++---------- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/libminifi/include/processors/AppendHostInfo.h b/libminifi/include/processors/AppendHostInfo.h index 17f63895c0..bc005b9d3d 100644 --- a/libminifi/include/processors/AppendHostInfo.h +++ b/libminifi/include/processors/AppendHostInfo.h @@ -60,15 +60,20 @@ class AppendHostInfo : public core::Processor { public: // OnTrigger method, implemented by NiFi AppendHostInfo - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; // Initialize, over write by NiFi AppendHostInfo - virtual void initialize(void); + void initialize(void) override; + + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; protected: private: // Logger std::shared_ptr logger_; + std::string hostAttribute_; + std::string ipAttribute_; + std::string iface_; }; REGISTER_RESOURCE(AppendHostInfo); diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h index d3c4c444ea..e7ed54a4d2 100644 --- a/libminifi/include/processors/GenerateFlowFile.h +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -42,6 +42,10 @@ class GenerateFlowFile : public core::Processor { : Processor(name, uuid) { _data = NULL; _dataSize = 0; + _batchSize = 1; + _fileSize = 1024; + _uniqueFlowFile = true; + _textData = false; } // Destructor virtual ~GenerateFlowFile() { @@ -78,9 +82,11 @@ class GenerateFlowFile : public core::Processor { public: // OnTrigger method, implemented by NiFi GenerateFlowFile - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; // Initialize, over write by NiFi GenerateFlowFile - virtual void initialize(void); + void initialize(void) override; + + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; protected: @@ -89,6 +95,11 @@ class GenerateFlowFile : public core::Processor { char * _data; // Size of the generated data uint64_t _dataSize; + uint64_t _batchSize; + uint64_t _fileSize; + bool _uniqueFlowFile; + bool _textData; + }; REGISTER_RESOURCE(GenerateFlowFile); diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp index 8769de2be7..c0bfdba182 100644 --- a/libminifi/src/processors/AppendHostInfo.cpp +++ b/libminifi/src/processors/AppendHostInfo.cpp @@ -67,35 +67,34 @@ void AppendHostInfo::initialize() { setSupportedRelationships(relationships); } +void AppendHostInfo::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + context->getProperty(HostAttribute.getName(), hostAttribute_); + context->getProperty(InterfaceName.getName(), iface_); + context->getProperty(IPAttribute.getName(), ipAttribute_); +} + void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::shared_ptr flow = session->get(); if (!flow) return; // Get Hostname - - std::string hostAttribute = ""; - context->getProperty(HostAttribute.getName(), hostAttribute); - flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName()); + flow->addAttribute(hostAttribute_.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName()); // Get IP address for the specified interface - std::string iface; - context->getProperty(InterfaceName.getName(), iface); // Confirm the specified interface name exists on this device #ifndef WIN32 - if (if_nametoindex(iface.c_str()) != 0) { + if (if_nametoindex(iface_.c_str()) != 0) { struct ifreq ifr; int fd = socket(AF_INET, SOCK_DGRAM, 0); // Type of address to retrieve - IPv4 IP address ifr.ifr_addr.sa_family = AF_INET; // Copy the interface name in the ifreq structure - strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1); + strncpy(ifr.ifr_name, iface_.c_str(), IFNAMSIZ - 1); ioctl(fd, SIOCGIFADDR, &ifr); close(fd); - std::string ipAttribute; - context->getProperty(IPAttribute.getName(), ipAttribute); - flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); + flow->addAttribute(ipAttribute_.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); } #endif diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index 223346f9d9..fa402aa7bc 100644 --- a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -65,52 +65,49 @@ void GenerateFlowFile::initialize() { setSupportedRelationships(relationships); } -void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - int64_t batchSize = 1; - bool uniqueFlowFile = true; - int64_t fileSize = 1024; - bool textData = false; - +void GenerateFlowFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string value; if (context->getProperty(FileSize.getName(), value)) { - core::Property::StringToInt(value, fileSize); + core::Property::StringToInt(value, _fileSize); } if (context->getProperty(BatchSize.getName(), value)) { - core::Property::StringToInt(value, batchSize); + core::Property::StringToInt(value, _batchSize); } if (context->getProperty(DataFormat.getName(), value)) { - textData = (value == GenerateFlowFile::DATA_FORMAT_TEXT); + _textData = (value == GenerateFlowFile::DATA_FORMAT_TEXT); } if (context->getProperty(UniqueFlowFiles.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _uniqueFlowFile); } +} - if (uniqueFlowFile) { +void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + if (_uniqueFlowFile) { char *data; - data = new char[fileSize]; + data = new char[_fileSize]; if (!data) return; - uint64_t dataSize = fileSize; + uint64_t dataSize = _fileSize; GenerateFlowFile::WriteCallback callback(data, dataSize); char *current = data; - if (textData) { - for (int i = 0; i < fileSize; i++) { + if (_textData) { + for (int i = 0; i < _fileSize; i++) { int randValue = random(); data[i] = TEXT_CHARS[randValue % TEXT_LEN]; } } else { - for (int i = 0; i < fileSize; i += sizeof(int)) { + for (int i = 0; i < _fileSize; i += sizeof(int)) { int randValue = random(); *(reinterpret_cast(current)) = randValue; current += sizeof(int); } } - for (int i = 0; i < batchSize; i++) { + for (int i = 0; i < _batchSize; i++) { // For each batch std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; - if (fileSize > 0) + if (_fileSize > 0) session->write(flowFile, &callback); session->transfer(flowFile, Success); } @@ -118,16 +115,16 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes } else { if (!_data) { // We have not created the data yet - _data = new char[fileSize]; - _dataSize = fileSize; + _data = new char[_fileSize]; + _dataSize = _fileSize; char *current = _data; - if (textData) { - for (int i = 0; i < fileSize; i++) { + if (_textData) { + for (int i = 0; i < _fileSize; i++) { int randValue = random(); _data[i] = TEXT_CHARS[randValue % TEXT_LEN]; } } else { - for (int i = 0; i < fileSize; i += sizeof(int)) { + for (int i = 0; i < _fileSize; i += sizeof(int)) { int randValue = random(); *(reinterpret_cast(current)) = randValue; current += sizeof(int); @@ -135,12 +132,12 @@ void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSes } } GenerateFlowFile::WriteCallback callback(_data, _dataSize); - for (int i = 0; i < batchSize; i++) { + for (int i = 0; i < _batchSize; i++) { // For each batch std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; - if (fileSize > 0) + if (_fileSize > 0) session->write(flowFile, &callback); session->transfer(flowFile, Success); }