Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
pvillard31 committed Jan 4, 2021
1 parent 2e57a73 commit ec64529
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
Expand Up @@ -83,6 +83,7 @@
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.RECORD_COUNT_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
Expand All @@ -95,6 +96,7 @@
@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@SeeAlso({PublishMQTT.class})
@WritesAttributes({
@WritesAttribute(attribute=RECORD_COUNT_KEY, description="The number of records received"),
@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"),
@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"),
@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."),
Expand All @@ -103,6 +105,7 @@
"on the topic.")})
public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {

public final static String RECORD_COUNT_KEY = "record.count";
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
Expand All @@ -114,6 +117,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
public final static String IS_RETAINED_FIELD_KEY = "_isRetained";

private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
private final static String COUNTER_RECORDS_PROCESSED = "Records Processed";

public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder()
.name("Group ID")
.description("MQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.")
Expand Down Expand Up @@ -165,7 +172,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
.build();

public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("Add attributes as fields")
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
.description("If using the Records reader/writer and if setting this property to true, default fields "
+ "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
.required(true)
Expand Down Expand Up @@ -445,7 +453,7 @@ private void transferQueueDemarcator(final ProcessContext context, final Process
final MQTTQueueMessage mqttMessage = mqttQueue.poll();
out.write(mqttMessage.getPayload());
out.write(demarcator);
session.adjustCounter("Records Received", 1L, false);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
}
});

Expand Down Expand Up @@ -476,7 +484,7 @@ public void process(final OutputStream out) throws IOException {
String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
session.getProvenanceReporter().receive(messageFlowfile, transitUri);
session.transfer(messageFlowfile, REL_PARSE_FAILURE);
session.adjustCounter("Parse Failures", 1, false);
session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
}

private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
Expand Down Expand Up @@ -558,7 +566,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess
continue;
}

session.adjustCounter("Records Received", 1L, false);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
}
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
logger.error("Failed to write message, sending to the parse failure relationship", e);
Expand All @@ -574,7 +582,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess

if(writer != null) {
final WriteResult writeResult = writer.finishRecordSet();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCount.set(writeResult.getRecordCount());
Expand All @@ -584,15 +592,22 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess
context.yield();

// we try to add the messages back into the internal queue
int numberOfMessages = 0;
for(MQTTQueueMessage done : doneList) {
try {
mqttQueue.offer(done, 1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
logger.error("Could not add message back into the internal queue, this could lead to data loss", ex);
numberOfMessages++;
if(getLogger().isDebugEnabled()) {
logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
}
}
}
if(numberOfMessages > 0) {
logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages});
}

throw new ProcessException("Could not process data received from the MQTT broker(s).", e);
throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e);
} finally {
closeWriter(writer);
}
Expand All @@ -608,7 +623,7 @@ private void transferQueueRecord(final ProcessContext context, final ProcessSess
session.commit();

final int count = recordCount.get();
session.adjustCounter("Records Processed", count, false);
session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile});
}

Expand Down
Expand Up @@ -56,6 +56,7 @@
public abstract class TestConsumeMqttCommon {

public int PUBLISH_WAIT_MS = 1000;
public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";

public Server MQTT_server;
public TestRunner testRunner;
Expand Down Expand Up @@ -426,7 +427,7 @@ public void testConsumeRecordsWithAddedFields() throws Exception {
testMessage.setRetainFlag(false);

PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
Expand All @@ -448,7 +449,7 @@ public void testConsumeRecordsWithAddedFields() throws Exception {

List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
Expand Down Expand Up @@ -476,7 +477,7 @@ public void testConsumeDemarcator() throws Exception {
testMessage.setRetainFlag(false);

PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
Expand All @@ -494,7 +495,7 @@ public void testConsumeDemarcator() throws Exception {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(flowFiles.size(), 1);
assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ "ThisIsNotAJSON\\n"
+ THIS_IS_NOT_JSON + "\\n"
+ "{\"name\":\"Apache NiFi\"}\\n",
new String(flowFiles.get(0).toByteArray()));

Expand Down Expand Up @@ -539,7 +540,7 @@ public void testConsumeRecordsWithoutAddedFields() throws Exception {
testMessage.setRetainFlag(false);

PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
Expand All @@ -559,7 +560,7 @@ public void testConsumeRecordsWithoutAddedFields() throws Exception {

List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
Expand Down Expand Up @@ -593,7 +594,7 @@ public void testConsumeRecordsOnlyBadData() throws Exception {
assertTrue(isConnected(consumeMQTT));

PublishMessage badMessage = new PublishMessage();
badMessage.setPayload(ByteBuffer.wrap("ThisIsNotAJSON".getBytes()));
badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
badMessage.setTopicName("testTopic");
badMessage.setDupFlag(false);
badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
Expand All @@ -607,7 +608,7 @@ public void testConsumeRecordsOnlyBadData() throws Exception {

List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertTrue(badFlowFiles.size() == 1);
assertEquals("ThisIsNotAJSON", new String(badFlowFiles.get(0).toByteArray()));
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
Expand Down

0 comments on commit ec64529

Please sign in to comment.