Skip to content

Commit

Permalink
MINIFICPP-1034 - MQTT processors doesnt work
Browse files Browse the repository at this point in the history
  • Loading branch information
arpadboda committed Sep 19, 2019
1 parent 9579284 commit f9b36cf
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 91 deletions.
45 changes: 17 additions & 28 deletions extensions/mqtt/processors/AbstractMQTTProcessor.cpp
Expand Up @@ -46,32 +46,15 @@ core::Property AbstractMQTTProcessor::SecurityCA("Security CA", "File or directo
core::Property AbstractMQTTProcessor::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");

void AbstractMQTTProcessor::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(BrokerURL);
properties.insert(CleanSession);
properties.insert(ClientID);
properties.insert(UserName);
properties.insert(PassWord);
properties.insert(KeepLiveInterval);
properties.insert(ConnectionTimeOut);
properties.insert(QOS);
properties.insert(Topic);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(relationships);
MQTTClient_SSLOptions sslopts_ = MQTTClient_SSLOptions_initializer;
sslEnabled_ = false;
const std::set<core::Property> AbstractMQTTProcessor::getSupportedProperties() {
return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeOut, QOS, Topic};
}

void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
sslEnabled_ = false;
sslopts_ = MQTTClient_SSLOptions_initializer;

std::string value;
int64_t valInt;
value = "";
Expand Down Expand Up @@ -131,7 +114,6 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == MQTT_SECURITY_PROTOCOL_SSL) {
sslEnabled_ = true;
logger_->log_debug("AbstractMQTTProcessor: ssl enable");
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
Expand Down Expand Up @@ -180,14 +162,21 @@ bool AbstractMQTTProcessor::reconnect() {
conn_opts.username = userName_.c_str();
conn_opts.password = passWord_.c_str();
}
if (sslEnabled_)
if (sslEnabled_) {
conn_opts.ssl = &sslopts_;
if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) {
logger_->log_error("Failed to connect to MQTT broker %s", uri_);
}
int ret = MQTTClient_connect(client_, &conn_opts);
if (ret != MQTTCLIENT_SUCCESS) {
logger_->log_error("Failed to connect to MQTT broker %s (%d)", uri_, ret);
return false;
}
if (isSubscriber_) {
MQTTClient_subscribe(client_, topic_.c_str(), qos_);
ret = MQTTClient_subscribe(client_, topic_.c_str(), qos_);
if(ret != MQTTCLIENT_SUCCESS) {
logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, ret);
return false;
}
logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
}
return true;
}
Expand Down
25 changes: 8 additions & 17 deletions extensions/mqtt/processors/AbstractMQTTProcessor.h
Expand Up @@ -28,16 +28,16 @@
#include "core/logging/LoggerConfiguration.h"
#include "MQTTClient.h"

#define MQTT_QOS_0 "0"
#define MQTT_QOS_1 "1"
#define MQTT_QOS_2 "2"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

#define MQTT_QOS_0 "0"
#define MQTT_QOS_1 "1"
#define MQTT_QOS_2 "2"

#define MQTT_SECURITY_PROTOCOL_PLAINTEXT "plaintext"
#define MQTT_SECURITY_PROTOCOL_SSL "ssl"

Expand Down Expand Up @@ -87,26 +87,15 @@ class AbstractMQTTProcessor : public core::Processor {
static core::Property SecurityPrivateKey;
static core::Property SecurityPrivateKeyPassWord;

// Supported Relationships
static core::Relationship Failure;
static core::Relationship Success;

public:
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
virtual void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi AbstractMQTTProcessor
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
}
// OnTrigger method, implemented by NiFi AbstractMQTTProcessor
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
}
// Initialize, over write by NiFi AbstractMQTTProcessor
virtual void initialize(void);
virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;

// MQTT async callbacks
static void msgDelivered(void *context, MQTTClient_deliveryToken dt) {
AbstractMQTTProcessor *processor = (AbstractMQTTProcessor *) context;
Expand Down Expand Up @@ -134,6 +123,8 @@ class AbstractMQTTProcessor : public core::Processor {
}

protected:
static const std::set<core::Property> getSupportedProperties();

MQTTClient client_;
MQTTClient_deliveryToken delivered_token_;
std::string uri_;
Expand Down
28 changes: 11 additions & 17 deletions extensions/mqtt/processors/ConsumeMQTT.cpp
Expand Up @@ -38,30 +38,21 @@ namespace processors {
core::Property ConsumeMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");
core::Property ConsumeMQTT::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the received MQTT queue", "");

core::Relationship ConsumeMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");

void ConsumeMQTT::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(BrokerURL);
properties.insert(CleanSession);
properties.insert(ClientID);
properties.insert(UserName);
properties.insert(PassWord);
properties.insert(KeepLiveInterval);
properties.insert(ConnectionTimeOut);
properties.insert(QOS);
properties.insert(Topic);
std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
properties.insert(MaxFlowSegSize);
properties.insert(QueueBufferMaxMessage);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
setSupportedRelationships({Success});
}

bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
if (queue_.size_approx() >= maxQueueSize_) {
logger_->log_debug("MQTT queue full");
logger_->log_warn("MQTT queue full");
return false;
} else {
if (message->payloadlen > maxSegSize_)
Expand All @@ -72,8 +63,8 @@ bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
}
}

void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
AbstractMQTTProcessor::onSchedule(context, sessionFactory);
void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
AbstractMQTTProcessor::onSchedule(context, factory);
std::string value;
int64_t valInt;
value = "";
Expand All @@ -90,7 +81,10 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession

void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
// reconnect if necessary
reconnect();
if(!reconnect()) {
yield();
}

std::deque<MQTTClient_message *> msg_queue;
getReceivedMQTTMsg(msg_queue);
while (!msg_queue.empty()) {
Expand Down
11 changes: 7 additions & 4 deletions extensions/mqtt/processors/ConsumeMQTT.h
Expand Up @@ -68,6 +68,9 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
// Supported Properties
static core::Property MaxFlowSegSize;
static core::Property QueueBufferMaxMessage;

static core::Relationship Success;

// Nest Callback Class for write stream
class WriteCallback : public OutputStreamCallback {
public:
Expand All @@ -92,12 +95,12 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
// OnTrigger method, implemented by NiFi ConsumeMQTT
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi ConsumeMQTT
virtual void initialize(void);
virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
void initialize(void) override;
bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override;

protected:
void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
Expand Down
35 changes: 13 additions & 22 deletions extensions/mqtt/processors/PublishMQTT.cpp
Expand Up @@ -38,30 +38,21 @@ namespace processors {
core::Property PublishMQTT::Retain("Retain", "Retain MQTT published record in broker", "false");
core::Property PublishMQTT::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", "");

core::Relationship PublishMQTT::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
core::Relationship PublishMQTT::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");

void PublishMQTT::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(BrokerURL);
properties.insert(CleanSession);
properties.insert(ClientID);
properties.insert(UserName);
properties.insert(PassWord);
properties.insert(KeepLiveInterval);
properties.insert(ConnectionTimeOut);
properties.insert(QOS);
properties.insert(Topic);
std::set<core::Property> properties(AbstractMQTTProcessor::getSupportedProperties());
properties.insert(Retain);
properties.insert(MaxFlowSegSize);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(relationships);
setSupportedRelationships({Success, Failure});
}

void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
AbstractMQTTProcessor::onSchedule(context, sessionFactory);
void PublishMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
AbstractMQTTProcessor::onSchedule(context, factory);
std::string value;
int64_t valInt;
value = "";
Expand All @@ -76,15 +67,15 @@ void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
}

void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
std::shared_ptr<core::FlowFile> flowFile = session->get();

if (!flowFile) {
if (!reconnect()) {
logger_->log_error("MQTT connect to %s failed", uri_);
yield();
return;
}

std::shared_ptr<core::FlowFile> flowFile = session->get();

if (!reconnect()) {
logger_->log_error("MQTT connect to %s failed", uri_);
session->transfer(flowFile, Failure);
if (!flowFile) {
return;
}

Expand Down
9 changes: 6 additions & 3 deletions extensions/mqtt/processors/PublishMQTT.h
Expand Up @@ -58,6 +58,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
static core::Property Retain;
static core::Property MaxFlowSegSize;

static core::Relationship Failure;
static core::Relationship Success;

// Nest Callback Class for read stream
class ReadCallback : public InputStreamCallback {
public:
Expand Down Expand Up @@ -123,11 +126,11 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
// OnTrigger method, implemented by NiFi PublishMQTT
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi PublishMQTT
virtual void initialize(void);
void initialize(void) override;

protected:

Expand Down

0 comments on commit f9b36cf

Please sign in to comment.