diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp index 345c6c57842..501746dab52 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp @@ -46,32 +46,15 @@ core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directo core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", ""); core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", ""); core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", ""); -core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); -core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); -void AbstractMQTTProcessor::initialize() { - // Set the supported properties - std::set properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); - setSupportedProperties(properties); - // Set the supported relationships - std::set relationships; - relationships.insert(Success); - relationships.insert(Failure); - setSupportedRelationships(relationships); - MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer; - sslEnabled_ = false; +const std::set AbstractMQTTProcessor::getSupportedProperties() { + return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeOut, QOS, Topic}; } -void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { +void AbstractMQTTProcessor::onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) { + sslEnabled_ = false; + sslopts_ = MQTTClient_SSLOptions_initializer; + std::string value; int64_t valInt; value = ""; @@ -131,7 +114,6 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { if (value == MQTT_SECURITY_PROTOCOL_SSL) { sslEnabled_ = true; - logger_->log_debug("AbstractMQTTProcessor: ssl enable"); value = ""; if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value); @@ -180,14 +162,21 @@ bool AbstractMQTTProcessor::reconnect() { conn_opts.username = userName_.c_str(); conn_opts.password = passWord_.c_str(); } - if (sslEnabled_) + if (sslEnabled_) { conn_opts.ssl = &sslopts_; - if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) { - logger_->log_error("Failed to connect to MQTT broker %s", uri_); + } + int ret = MQTTClient_connect(client_, &conn_opts); + if (ret != MQTTCLIENT_SUCCESS) { + logger_->log_error("Failed to connect to MQTT broker %s (%d)", uri_, ret); return false; } if (isSubscriber_) { - MQTTClient_subscribe(client_, topic_.c_str(), qos_); + ret = MQTTClient_subscribe(client_, topic_.c_str(), qos_); + if(ret != MQTTCLIENT_SUCCESS) { + logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret); + return false; + } + logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_); } return true; } diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h index cabdae4de90..282efe47cad 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.h +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h @@ -28,16 +28,16 @@ #include "core/logging/LoggerConfiguration.h" #include "MQTTClient.h" -#define MQTT_QOS_0 "0" -#define MQTT_QOS_1 "1" -#define MQTT_QOS_2 "2" - namespace org { namespace apache { namespace nifi { namespace minifi { namespace processors { +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + #define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext" #define MQTT_SECURITY_PROTOCOL_SSL "ssl" @@ -87,10 +87,6 @@ class AbstractMQTTProcessor : public core::Processor { static core::Property SecurityPrivateKey; static core::Property SecurityPrivateKeyPassWord; - // Supported Relationships - static core::Relationship Failure; - static core::Relationship Success; - public: /** * Function that's executed when the processor is scheduled. @@ -98,15 +94,8 @@ class AbstractMQTTProcessor : public core::Processor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); - // OnTrigger method, implemented by NiFi AbstractMQTTProcessor - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - } - // OnTrigger method, implemented by NiFi AbstractMQTTProcessor - virtual void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { - } - // Initialize, over write by NiFi AbstractMQTTProcessor - virtual void initialize(void); + virtual void onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) override; + // MQTT async callbacks static void msgDelivered(void *context, MQTTClient_deliveryToken dt) { AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context; @@ -134,6 +123,8 @@ class AbstractMQTTProcessor : public core::Processor { } protected: + static const std::set getSupportedProperties(); + MQTTClient client_; MQTTClient_deliveryToken delivered_token_; std::string uri_; diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp index 472d35f341f..3c8f37bda72 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.cpp +++ b/extensions/mqtt/processors/ConsumeMQTT.cpp @@ -38,30 +38,21 @@ namespace processors { core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", ""); +core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); + void ConsumeMQTT::initialize() { // Set the supported properties - std::set properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); + std::set properties(AbstractMQTTProcessor::getSupportedProperties()); properties.insert(MaxFlowSegSize); properties.insert(QueueBufferMaxMessage); setSupportedProperties(properties); // Set the supported relationships - std::set relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); + setSupportedRelationships({Success}); } bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { if (queue_.size_approx() >= maxQueueSize_) { - logger_->log_debug("MQTT queue full"); + logger_->log_warn("MQTT queue full"); return false; } else { if (message->payloadlen > maxSegSize_) @@ -72,8 +63,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { } } -void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - AbstractMQTTProcessor::onSchedule(context, sessionFactory); +void ConsumeMQTT::onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) { + AbstractMQTTProcessor::onSchedule(context, factory); std::string value; int64_t valInt; value = ""; @@ -90,7 +81,10 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession void ConsumeMQTT::onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { // reconnect if necessary - reconnect(); + if(!reconnect()) { + yield(); + } + std::deque msg_queue; getReceivedMQTTMsg(msg_queue); while (!msg_queue.empty()) { diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h index a1ea13d2ccb..18c0b335cb1 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.h +++ b/extensions/mqtt/processors/ConsumeMQTT.h @@ -68,6 +68,9 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { // Supported Properties static core::Property MaxFlowSegSize; static core::Property QueueBufferMaxMessage; + + static core::Relationship Success; + // Nest Callback Class for write stream class WriteCallback : public OutputStreamCallback { public: @@ -92,12 +95,12 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + void onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) override; // OnTrigger method, implemented by NiFi ConsumeMQTT - virtual void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session); + void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) override; // Initialize, over write by NiFi ConsumeMQTT - virtual void initialize(void); - virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + void initialize(void) override; + bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override; protected: void getReceivedMQTTMsg(std::deque &msg_queue) { diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp index 411cc2d6953..24ba49d8fa9 100644 --- a/extensions/mqtt/processors/PublishMQTT.cpp +++ b/extensions/mqtt/processors/PublishMQTT.cpp @@ -38,30 +38,21 @@ namespace processors { core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false"); core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + void PublishMQTT::initialize() { // Set the supported properties - std::set properties; - properties.insert(BrokerURL); - properties.insert(CleanSession); - properties.insert(ClientID); - properties.insert(UserName); - properties.insert(PassWord); - properties.insert(KeepLiveInterval); - properties.insert(ConnectionTimeOut); - properties.insert(QOS); - properties.insert(Topic); + std::set properties(AbstractMQTTProcessor::getSupportedProperties()); properties.insert(Retain); properties.insert(MaxFlowSegSize); setSupportedProperties(properties); // Set the supported relationships - std::set relationships; - relationships.insert(Success); - relationships.insert(Failure); - setSupportedRelationships(relationships); + setSupportedRelationships({Success, Failure}); } -void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - AbstractMQTTProcessor::onSchedule(context, sessionFactory); +void PublishMQTT::onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) { + AbstractMQTTProcessor::onSchedule(context, factory); std::string value; int64_t valInt; value = ""; @@ -76,15 +67,15 @@ void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession } void PublishMQTT::onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { - std::shared_ptr flowFile = session->get(); - - if (!flowFile) { + if (!reconnect()) { + logger_->log_error("MQTT connect to %s failed", uri_); + yield(); return; } + + std::shared_ptr flowFile = session->get(); - if (!reconnect()) { - logger_->log_error("MQTT connect to %s failed", uri_); - session->transfer(flowFile, Failure); + if (!flowFile) { return; } diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h index f6c01abe2a8..6d6c834fc01 100644 --- a/extensions/mqtt/processors/PublishMQTT.h +++ b/extensions/mqtt/processors/PublishMQTT.h @@ -58,6 +58,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { static core::Property Retain; static core::Property MaxFlowSegSize; + static core::Relationship Failure; + static core::Relationship Success; + // Nest Callback Class for read stream class ReadCallback : public InputStreamCallback { public: @@ -123,11 +126,11 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + void onSchedule(const std::shared_ptr &context, const std::shared_ptr &factory) override; // OnTrigger method, implemented by NiFi PublishMQTT - virtual void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session); + void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) override; // Initialize, over write by NiFi PublishMQTT - virtual void initialize(void); + void initialize(void) override; protected: