Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-10251 Add v5 protocol support for existing MQTT processors #6225

Closed
wants to merge 18 commits into from

Conversation

nandorsoma
Copy link
Contributor

Summary

NIFI-10251
This pr adds v5 protocol support for existing MQTT processors. For v5 connections the processor from now on uses HiveMQ Client library while in case of v3.1.x connections it uses the existing Paho library. HiveMQ Client could have been used for v3.1.x connections but it seemed to be safer to use the existing library for compatibility reasons.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 8
    • JDK 11
    • JDK 17

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @nandorsoma!

On initial review, the Static Analysis check failed due to multiple files missing the standard Apache License header. Please review the output of that check and add the header to the files indicated.

@nandorsoma
Copy link
Contributor Author

nandorsoma commented Jul 21, 2022

Thanks for the contribution @nandorsoma!

On initial review, the Static Analysis check failed due to multiple files missing the standard Apache License header. Please review the output of that check and add the header to the files indicated.

Interesting, because I've run the build with contrib-check enabled which I thought checks for that. Nevertheless I will add them of course!

Edit:
I see now. The presence of the license header is checked with rat plugin, but unfortunately I need to disable it locally otherwise it fails on .iml files...

@turcsanyip
Copy link
Contributor

@nandorsoma Thanks for adding v5 protocol support to MQTT processors!

I started to review / test this PR and found that the v5 client cannot stop properly:

2022-07-28 07:53:40,084 ERROR [Timer-Driven Process Thread-4] org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method 'public void org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(org.apache.nifi.processor.ProcessContext)' with arguments '[org.apache.nifi.processor.StandardProcessContext@5f360d4e]'.
com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
	at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:195)
	at com.hivemq.client.internal.mqtt.MqttBlockingClient.disconnect(MqttBlockingClient.java:186)
	at org.apache.nifi.processors.mqtt.paho.HiveMqV5ClientAdapter.close(HiveMqV5ClientAdapter.java:99)
	at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onStopped(AbstractMQTTProcessor.java:292)
	at org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(ConsumeMQTT.java:348)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:145)
	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:133)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:316)
	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:93)
	at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1877)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Could you please fix it first? Due to this error, the processor cannot create a new client and NiFi restart needed.

Comment on lines 201 to 210
public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
.name("Session Expiry Interval")
.description("After this interval the broker will expire the client and clear the session state.")
.addValidator(StandardValidators.NON_NEGATIVE_LONG_VALIDATOR)
.dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
.dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
.defaultValue(Long.toString(SESSION_EXPIRY_INTERVAL_IN_SECONDS))
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not add the quite redundant and mostly copy-pasted StandardValidators.NON_NEGATIVE_LONG_VALIDATOR. We could use the more user-friendly and readily available .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) functionality.

Suggested change
public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
.name("Session Expiry Interval")
.description("After this interval the broker will expire the client and clear the session state.")
.addValidator(StandardValidators.NON_NEGATIVE_LONG_VALIDATOR)
.dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
.dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
.defaultValue(Long.toString(SESSION_EXPIRY_INTERVAL_IN_SECONDS))
.build();
public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
.name("Session Expiry Interval")
.description("After this interval the broker will expire the client and clear the session state.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
.dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
.defaultValue(SESSION_EXPIRY_INTERVAL_IN_SECONDS + " sec")
.build();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion @tpalfy ! I like this solution however there are similar properties in the processor where just numbers without units were used. I think it is better to stick to the original version to avoid mixing the input types. Nevertheless I've added a hint to the name of the property to match the other properties.

@@ -41,6 +41,11 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

(ASLv2) HiveMQ MQTT Client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good but I think we need to add it to nifi-assembly/NOTICE as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for NOTICING that! (pun intended, change added)

return bytes;
}

private void clearSensitive(char[] chars) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearing the password from the MqttConnectionProperties prevents the the reconnection logic to execute successfully.
I'd rather remove these clear methods as acquiring the password when you have access to the memory of the nifi process is not preventable like this anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed, thanks for noticing it.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nandorsoma I tested the processors and now they work properly for the base use cases (clean/resume session, shared subscription, etc.).

I also found some points in the code that could be refactored and improved. Please find my comments below.

@nandorsoma
Copy link
Contributor Author

Thank you for the review @turcsanyip, @tpalfy and @exceptionfactory! I've tried to address your comments!

…the broker supports it, it can work with a v3 client too
- password as string, using time period as session expiry interval
- using nifi-security-utils
- rename to StandardMqttMessage
- remove Nifi prefix from Mqtt class names
- extract SupportedSchemes
- rework exception handling when InterruptedException happens when message offered to internal queue
- remove MqttException declaration
- rework exception handling when InterruptedException happens when message offered to internal queue
- add transitive dependencies to NOTICE
- blocking processor when subscribe fails, doing publish with blocking client to block processor when publishing failed
- improve logging
- improve using mqtt versionCodes
- rename SupportedSchemes to MqttProtocolScheme
- rename properties to use camelCase even for abbreviations
- remove allowing only password. if we need that feature probably we will create a separate field for it
- normalize dtos storing mqtt message
@nandorsoma
Copy link
Contributor Author

nandorsoma commented Aug 24, 2022

Rebased on top of current main because CI failed on nifi-security-utils that was referenced with 1.17.0-SNAPSHOT version, that's why the force push.

- use get instead of whenComplete because whenComplete is not blocking
- prevent stucked processor when subscribe fails, yield on any exception
- merge client and connection properties
- create actual client in adapter, this way clientProperties will be used only in the adapter
- remove timeout from disconnect signature because it is not applicable to all clients
- add javadoc
@nandorsoma
Copy link
Contributor Author

nandorsoma commented Aug 25, 2022

After a discussion I've removed commit [52204e4]. Probably I will open a separate pr for that change.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nandorsoma Thanks for the review changes so far! I added 2 more comments, otherwise it works properly.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nandorsoma Thanks for all the review changes!
I tested the processors with different setups (no-TLS / TLS / mTLS, normal / shared subscriptions, session expiration) and they work properly.

+1 LGTM

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @nandorsoma, this looks close to completion.

I noted a few additional places where logging statements can be improved to remove Object array wrappers and use placeholders instead of String concatenation.

The only structural issue is the use of SSLContextService, which can be replaced with a reference to TlsConfiguration for passing around the TLS properties.

// Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
logger.trace("Received 'delivery complete' message from broker for:" + token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed to use a placeholder variable instead of concatenation.

Suggested change
logger.trace("Received 'delivery complete' message from broker for:" + token);
logger.trace("Received 'delivery complete' message from broker token [{}]", token);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

context.yield();
}
}

@Override
public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
logger.error("Connection to {} lost due to: {}", new Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The object array wrapper can be removed.

Suggested change
logger.error("Connection to {} lost due to: {}", new Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
logger.error("Connection to {} lost due to: {}", clientProperties.getBroker(), cause.getMessage(), cause);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? If I remove the object wrapper void error(String msg, Object... os); will be used instead of void error(String msg, Object[] os, Throwable t);. I didn't test what happens in this situation with NiFi logging, but if I remember right Slf4j throws an error when there are less placeholder than passed parameters. I expect the same in our case. Also we would loose the cause stack trace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, after checking the implementation of the logger I see that it checks whether the last argument is Throwable.

stringBuilder.append(append);
}
return stringBuilder.toString();
}

@Override
public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
logger.error("Connection to {} lost due to: {}", new Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.error("Connection to {} lost due to: {}", new Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
logger.error("Connection to {} lost due to: {}", clientProperties.getBroker(), cause.getMessage(), cause);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

public void deliveryComplete(String token) {
// Unlikely situation. Api uses the same callback for publisher and consumer as well.
// That's why we have this log message here to indicate something really messy thing happened.
logger.error("Received MQTT 'delivery complete' message to subscriber: " + token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.error("Received MQTT 'delivery complete' message to subscriber: " + token);
logger.error("Received MQTT 'delivery complete' message to subscriber token [{}]", token);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

// should happen in a blocking way to make sure the processor is blocked until ack is not arrived.
try {
Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
logger.debug("Received mqtt5 subscribe ack: {}", ack.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toString() call is not necessary.

Suggested change
logger.debug("Received mqtt5 subscribe ack: {}", ack.toString());
logger.debug("Received mqtt5 subscribe ack: {}", ack);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

return new ValidationResult.Builder().subject(subject).valid(true).build();
if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, brokerURI.getScheme())) {
return new ValidationResult.Builder().subject(subject).valid(false)
.explanation("invalid scheme! supported schemes are: " + MqttProtocolScheme.getValuesAsString(", ")).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error messages should not use the exclamation mark character.

Suggested change
.explanation("invalid scheme! supported schemes are: " + MqttProtocolScheme.getValuesAsString(", ")).build();
.explanation("Invalid Scheme: supported schemes are: " + MqttProtocolScheme.getValuesAsString(", ")).build();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double colon feels weird in the same sentence. What about dot? The message this way will be 'Broker URI' is invalid because scheme is invalid. Supported schemes are: tcp, ssl, ws, wss Invalid is repeated but still better.

try {
URI brokerURI = new URI(input);
if (!EMPTY.equals(brokerURI.getPath())) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a space before the path.

Suggested change
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is: " + brokerURI.getPath()).build();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing it. Changed.

try {
clientProperties.setBrokerUri(new URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error should be changed to an IllegalArgumentException and should include a message.

Suggested change
throw new RuntimeException(e);
throw new IllegalArgumentException("Invalid Broker URI", e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

logger.info("Closing client");
mqttClient.close();
} catch (Exception e) {
logger.error("Error closing MQTT client due to {}", new Object[]{e.getMessage()}, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message could be removed along with the Object array wrapper, since the stack trace includes the message.

Suggested change
logger.error("Error closing MQTT client due to {}", new Object[]{e.getMessage()}, e);
logger.error("Error closing MQTT client", e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.


final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
clientProperties.setSslContextService((SSLContextService) sslProp.asControllerService());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in the Client Properties, this can be adjusted to call createTlsConfiguration() to pass the properties.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

@nandorsoma
Copy link
Contributor Author

Thank you for your additional review @exceptionfactory! Please see my latest commit!

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making the adjustments @nandorsoma!

I noted two more minor style improvements, but I am marking this as approved.

Any additional feedback @tpalfy or @turcsanyip? Feel free to merge if it looks good.

.validate(subject, input, context);
}
private static String getSupportedSchemeList() {
return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be modified to use Collectors.joining():

Suggested change
return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
return Arrays.stream(MqttProtocolScheme.values())
.map(value -> value.name().toLowerCase())
.collect(Collectors.joining(", ");


final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed to pass the reference class to asControllerService():

Suggested change
final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
final SSLContextService sslContextService = sslProp.asControllerService(SSLContextService.class);

@turcsanyip
Copy link
Contributor

@exceptionfactory Thanks for the review! Your latest minor suggestions will be addressed in an upcoming PR (NIFI-10411) in order to avoid the extra build cycle here.

Merging to main...

@asfgit asfgit closed this in 4d4a5ca Aug 30, 2022
emiliosetiadarma pushed a commit to emiliosetiadarma/nifi that referenced this pull request Aug 31, 2022
This closes apache#6225.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
p-kimberley pushed a commit to p-kimberley/nifi that referenced this pull request Oct 15, 2022
This closes apache#6225.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants