Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-7890 - Added record support to ConsumeMQTT processor #4738

Closed
wants to merge 3 commits into from

Conversation

pvillard31
Copy link
Contributor

@pvillard31 pvillard31 commented Dec 22, 2020

This also adds the option to define a delimiter instead of record reader/writer so that multiple messages can be appended into a single flow file. This greatly increases the performances of the processor when the use case requires high throughput in terms of events per second.

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Enables X functionality; fixes bug NIFI-YYYY.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on JDK 8?
  • Have you verified that the full build is successful on JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

final MQTTQueueMessage mqttMessage = mqttQueue.poll();
out.write(mqttMessage.getPayload());
out.write(demarcator);
session.adjustCounter("Records Received", 1L, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend declaring a static variable for the received counter name and reusing in other adjustCounter() references.

try {
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final Exception e) {
logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the possibility of multiple messages being sent to failure during this loop, are there any key properties of the MQTTQueueMessage that could be logged to indicate which message failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, we just have the payload, QoS and topic.


if(writer != null) {
final WriteResult writeResult = writer.finishRecordSet();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the record.count key be added to the list of WritesAttribute annotations? Recommend declaring as a static variable if so.

try {
mqttQueue.offer(done, 1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the queue processing loop, it would be helpful to indicate the particular message that failed as part of this error message instead of repeating the same message for multiple failures.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only option I see is to log the payload but this would raise security concerns IMO. I can change the loop to have the log message only once and say how many messages we could not re-insert in the queue.


List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declaring a variable for the invalid JSON string and reusing it both here and when setting the message payload would help make the test easier to maintain and read.

public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("Add attributes as fields")
.description("If using the Records reader/writer and if setting this property to true, default fields "
+ "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for not including the broker attribute as one of the internal fields that could be added to each record?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broker is not something we can retrieve from the MqttMessage object, it'll be the same for each record and is already added as an attribute of the flow file. If this is required for a use case, one could add it to each record using UpdateRecord processor. I think it's better not to include it by default to keep the data we write on disk as minimal as possible. Thoughts?

@markap14
Copy link
Contributor

markap14 commented Jan 4, 2021

Thanks for the improvement @pvillard31. I think this will give us MUCH better performance for any MQTT related use cases. @exceptionfactory had a few comments but otherwise I'm a +1.

@pvillard31
Copy link
Contributor Author

Thanks for the review @exceptionfactory and @markap14 - I made some changes following your recommendations.

@exceptionfactory
Copy link
Contributor

@pvillard31 Thanks for the feedback and changes, looks good.

@pvillard31
Copy link
Contributor Author

Squashed and rebased against main to fix the conflicts caused by the 2 others PRs related to this processor being merged today.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvillard31 Tested the record / demarcator style processing and they work as expected.

Added some comments and questions. Please find them inline.

}
});

session.getProvenanceReporter().receive(messageFlowfile, new StringBuilder(broker).append(topicPrefix).append(topicFilter).toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no separator character between the broker and the topic prefix (eg.: tcp://myhost:1883mytopic).
'/' cloud be added before topic prefix.
It could be changed in the existing transferQueue() method too.



messageFlowfile = session.append(messageFlowfile, out -> {
while (!mqttQueue.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emptying the queue seems to me a bit non-deterministic behaviour because the queue is being written at the same time by the receiver thread.
Would not it be useful to define a max. size that may be fetched in one go? (a magic number or a processor property)


try {
while (!mqttQueue.isEmpty()) {
final MQTTQueueMessage mqttMessage = mqttQueue.take();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason why take() is used here, while poll() in the other two transferQueue* methods?

try {
while (!mqttQueue.isEmpty()) {
final MQTTQueueMessage mqttMessage = mqttQueue.take();
final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null is not checked in the other two transferQueue* methods. Should it be checked there too or is it unnecessary here?

}

session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, broker);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic info could be added to the Transit Uri as in the demarcator method.

@pvillard31
Copy link
Contributor Author

Thanks for the review @turcsanyip - I think I addressed your comments, let me know if there is something else.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 LGTM
Merging to main.

@pvillard31 Thanks for implementing this feature.
@markap14, @exceptionfactory Thanks for the reviews.

@asfgit asfgit closed this in 93a5823 Jan 21, 2021
driesva pushed a commit to driesva/nifi that referenced this pull request Mar 19, 2021
This closes apache#4738.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
krisztina-zsihovszki pushed a commit to krisztina-zsihovszki/nifi that referenced this pull request Jun 28, 2022
This closes apache#4738.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants