Skip to content

NIFI-1002: Added WebSocket support.#1184

Closed
ijokarumawak wants to merge 3 commits intoapache:masterfrom
ijokarumawak:nifi-1002
Closed

NIFI-1002: Added WebSocket support.#1184
ijokarumawak wants to merge 3 commits intoapache:masterfrom
ijokarumawak:nifi-1002

Conversation

@ijokarumawak
Copy link
Member

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

@ijokarumawak
Copy link
Member Author

Hello NiFi developers,

This PR contains a new set of WebSocket support components.

At least there're two WebSocket processors available on GitHub already, nifi-websocket by @xmlking , and nifi-websockets-bundle by @acesir. I appreciate these projects providing WebSocket support for NiFi, and wanted to merge the capability to Apache NiFi code base. I referred these existing projects, and add few functionalities and Unit Test codes.

Features:

  • As a WebSocket client to connect a remote WebSocket server
  • As a WebSocket server to receive connection from remote WebSocket clients
  • Receive and send text messages as well as binary messages
  • Secure communication (using SslContextService) is supported

It may look more complex, so for the description purpose, I wrote a blog post about how it works, and how it's designed. Please check this post as well, NiFi WebSocket Support.

Any comment or advice would be appreciated, thanks in advance!

import java.util.ArrayList;
import java.util.List;

public abstract class AbstractWebSocketProcessor extends AbstractSessionFactoryProcessor {
Copy link
Contributor

@olegz olegz Nov 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this class extends from _ AbstractSessionFactoryProcessor_ and not AbstractProcessor? Just trying to figure out the need for _ onTrigger()_ implementation that is identical to the one in AbstractProcessor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olegz The reason why it extends AbstractSessionFactoryProcessor is to capture `processSessionFactory instance. So that it can create a session at onStopped() to flush incoming WebSocket messages those only exist on memory. I followed the pattern of AbstractMQTTProcessor.


static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
final List<PropertyDescriptor> descriptors = new ArrayList<>();
return descriptors;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand this code

  1. For each call you create a new List
  2. It's always empty
    Could you please clarify?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olegz As you suspected, it's not required, and it's there simply because I ported the basic code structure from MQTT processors. This method was used for sharing common property descriptors. However, since there's no such properties for now, it can be removed. I will remove this.

}

protected ComponentLog logger;
protected ProcessSessionFactory processSessionFactory;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is a potential for the above variables to be accessed by different thread, there are thread visibility concerns. Consider making them volatile.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added volatile keyword to these variables.

@ijokarumawak
Copy link
Member Author

@olegz Thanks for taking time to review! I added a commit to address your feedback. Also, I've added displayName to the property descriptors those were added in this PR.

servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*");

// Need to specify classloader, otherwise since the callstack doesn't have any nifi specific class, so it can't use nar.
try (NarCloseable closeable = NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer compiles after recent work on dynamic class loading by @bbende

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Oleg, I removed the use of NarCloseable and confirmed that JettyWebSocketServer works without this now.

.defaultValue("10000")
.build();

private volatile LinkedBlockingQueue<WebSocketMessage> incomingMessageQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular issue keeps coming up. Basically the pattern of maintaining an internal queue for messages may lead to OOM exception as well as lost data if processor crashes while the queue still has data. This is due to the fact that you are essentially integrating with external scheduling mechanism
One way of mitigating this is to rely on such mechanism for distributing messages while essentially ignoring our schedule. You can look at the example of it in SMTP processor

public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {

While may not be ideal, it certainly paves a simpler path for future improvement when we get a chance to define an Event-driver Processor abstraction as a first class component of NiFi.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for bringing this up. I had been worrying about this queue, too. To make it simpler, I removed the intermediate queue, and let incoming messages to be transferred to NiFi relationships.

- Reflecting review comments
- Added displayName to peroperty descriptors
- Reflecting review comments:
  - Removed unnecessary use of NarCloseable.withComponentNarLoader.
  - Removed intermediate on memory queue to make it simpler and more
    robust. Received messages in WebSocket layer now will be transferred
to downstream relationships directly.
@ijokarumawak
Copy link
Member Author

@olegz Thanks again for the great review! I rebased the PR and done additional refactoring based on your feedback. Please let me know if you prefer it to be squashed for further review process.

@olegz
Copy link
Contributor

olegz commented Nov 15, 2016

Koji, this is great and I'll be looking at it shortly with hopes of merging it some time today.

@olegz
Copy link
Contributor

olegz commented Nov 16, 2016

@ijokarumawak at this point I am going to say Great work! I am sure in month to come there will be things ti improve as people start using it , but LGTM for now. Will merge shortly!

@asfgit asfgit closed this in 26a5881 Nov 16, 2016
@ijokarumawak
Copy link
Member Author

@olegz Thank you very much! I hope it will be helpful for users and willing to improve it over time!

dstreev pushed a commit to dstreev/nifi-1 that referenced this pull request Dec 9, 2016
NIFI-1002: Added WebSocket support.

- Reflecting review comments
- Added displayName to peroperty descriptors

NIFI-1002: Added WebSocket support.

This closes apache#1184

- Reflecting review comments:
  - Removed unnecessary use of NarCloseable.withComponentNarLoader.
  - Removed intermediate on memory queue to make it simpler and more
    robust. Received messages in WebSocket layer now will be transferred
to downstream relationships directly.
dstreev pushed a commit to dstreev/nifi-1 that referenced this pull request Dec 9, 2016
NIFI-1002: Added WebSocket support.

- Reflecting review comments
- Added displayName to peroperty descriptors

NIFI-1002: Added WebSocket support.

This closes apache#1184

- Reflecting review comments:
  - Removed unnecessary use of NarCloseable.withComponentNarLoader.
  - Removed intermediate on memory queue to make it simpler and more
    robust. Received messages in WebSocket layer now will be transferred
to downstream relationships directly.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants