diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index 5845255bfcdc..41b00dc37097 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -83,6 +83,7 @@ import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.RECORD_COUNT_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; @@ -95,6 +96,7 @@ @CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") @SeeAlso({PublishMQTT.class}) @WritesAttributes({ + @WritesAttribute(attribute=RECORD_COUNT_KEY, description="The number of records received"), @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"), @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"), @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."), @@ -103,6 +105,7 @@ "on the topic.")}) public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { + public final static String RECORD_COUNT_KEY = "record.count"; public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos"; @@ -114,6 +117,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate"; public final static String IS_RETAINED_FIELD_KEY = "_isRetained"; + private final static String COUNTER_PARSE_FAILURES = "Parse Failures"; + private final static String COUNTER_RECORDS_RECEIVED = "Records Received"; + private final static String COUNTER_RECORDS_PROCESSED = "Records Processed"; + public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder() .name("Group ID") .description("MQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.") @@ -165,7 +172,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback .build(); public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder() - .name("Add attributes as fields") + .name("add-attributes-as-fields") + .displayName("Add attributes as fields") .description("If using the Records reader/writer and if setting this property to true, default fields " + "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.") .required(true) @@ -445,7 +453,7 @@ private void transferQueueDemarcator(final ProcessContext context, final Process final MQTTQueueMessage mqttMessage = mqttQueue.poll(); out.write(mqttMessage.getPayload()); out.write(demarcator); - session.adjustCounter("Records Received", 1L, false); + session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); } }); @@ -476,7 +484,7 @@ public void process(final OutputStream out) throws IOException { String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); session.getProvenanceReporter().receive(messageFlowfile, transitUri); session.transfer(messageFlowfile, REL_PARSE_FAILURE); - session.adjustCounter("Parse Failures", 1, false); + session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false); } private void transferQueueRecord(final ProcessContext context, final ProcessSession session){ @@ -558,7 +566,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess continue; } - session.adjustCounter("Records Received", 1L, false); + session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); } } catch (final IOException | MalformedRecordException | SchemaValidationException e) { logger.error("Failed to write message, sending to the parse failure relationship", e); @@ -574,7 +582,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess if(writer != null) { final WriteResult writeResult = writer.finishRecordSet(); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); attributes.putAll(writeResult.getAttributes()); recordCount.set(writeResult.getRecordCount()); @@ -584,15 +592,22 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess context.yield(); // we try to add the messages back into the internal queue + int numberOfMessages = 0; for(MQTTQueueMessage done : doneList) { try { mqttQueue.offer(done, 1, TimeUnit.SECONDS); } catch (InterruptedException ex) { - logger.error("Could not add message back into the internal queue, this could lead to data loss", ex); + numberOfMessages++; + if(getLogger().isDebugEnabled()) { + logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex); + } } } + if(numberOfMessages > 0) { + logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages}); + } - throw new ProcessException("Could not process data received from the MQTT broker(s).", e); + throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e); } finally { closeWriter(writer); } @@ -608,7 +623,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess session.commit(); final int count = recordCount.get(); - session.adjustCounter("Records Processed", count, false); + session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false); getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile}); } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java index bf2e612e30fb..a573184754ce 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -56,6 +56,7 @@ public abstract class TestConsumeMqttCommon { public int PUBLISH_WAIT_MS = 1000; + public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON"; public Server MQTT_server; public TestRunner testRunner; @@ -426,7 +427,7 @@ public void testConsumeRecordsWithAddedFields() throws Exception { testMessage.setRetainFlag(false); PublishMessage badMessage = new PublishMessage(); - badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes())); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); badMessage.setTopicName("testTopic"); badMessage.setDupFlag(false); badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); @@ -448,7 +449,7 @@ public void testConsumeRecordsWithAddedFields() throws Exception { List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); assertTrue(badFlowFiles.size() == 1); - assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray())); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); // clean runner by removing records reader/writer testRunner.removeProperty(ConsumeMQTT.RECORD_READER); @@ -476,7 +477,7 @@ public void testConsumeDemarcator() throws Exception { testMessage.setRetainFlag(false); PublishMessage badMessage = new PublishMessage(); - badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes())); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); badMessage.setTopicName("testTopic"); badMessage.setDupFlag(false); badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); @@ -494,7 +495,7 @@ public void testConsumeDemarcator() throws Exception { List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); assertEquals(flowFiles.size(), 1); assertEquals("{\"name\":\"Apache NiFi\"}\\n" - + "ThisIsNotAJSON\\n" + + THIS_IS_NOT_JSON + "\\n" + "{\"name\":\"Apache NiFi\"}\\n", new String(flowFiles.get(0).toByteArray())); @@ -539,7 +540,7 @@ public void testConsumeRecordsWithoutAddedFields() throws Exception { testMessage.setRetainFlag(false); PublishMessage badMessage = new PublishMessage(); - badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes())); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); badMessage.setTopicName("testTopic"); badMessage.setDupFlag(false); badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); @@ -559,7 +560,7 @@ public void testConsumeRecordsWithoutAddedFields() throws Exception { List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); assertTrue(badFlowFiles.size() == 1); - assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray())); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); // clean runner by removing records reader/writer testRunner.removeProperty(ConsumeMQTT.RECORD_READER); @@ -593,7 +594,7 @@ public void testConsumeRecordsOnlyBadData() throws Exception { assertTrue(isConnected(consumeMQTT)); PublishMessage badMessage = new PublishMessage(); - badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes())); + badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes())); badMessage.setTopicName("testTopic"); badMessage.setDupFlag(false); badMessage.setQos(AbstractMessage.QOSType.MOST_ONE); @@ -607,7 +608,7 @@ public void testConsumeRecordsOnlyBadData() throws Exception { List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE); assertTrue(badFlowFiles.size() == 1); - assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray())); + assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray())); // clean runner by removing records reader/writer testRunner.removeProperty(ConsumeMQTT.RECORD_READER);