NIFI-4914: Add Apache Pulsar processors #3178
@david-streamlio I had comments on the previous PR. Can you please review those and carry them forward to this one? In the future you could rebase, squash, and force-push the PR if you want to get back to a clean state, and the history could all stay on the same PR. That helps both you and the reviewer. I'd love to help you get this thing in, but it is a bit elusive to get the timing right between contributions, review cycles, PR resets, etc.
@joewitt I have incorporated all of the changes and corrections that came from both your comments and @pvillard31's changes from September 21st. I am more of a git idiot than a git expert, so the last two times I tried to rebase, squash, etc., I ended up in a state where the PR wouldn't merge, or I had brought in several hundred commits from other people, or the branch was rejected. Since this module is self-contained, I found it easier to just start with a fresh branch off of the latest master, so I have been doing that for the 1.6.x, 1.7.x, 1.8.x, and now 1.9.x releases.
Just did a full clean build with 6 threads on an older MacBook and had no problems. I'll try to reproduce my build issue from yesterday on a much higher thread-count build/machine and advise if the issue I saw remains.
@joewitt I dug into the build logs and found that the issue with the parallel build was due to having the wrong version defined for both of the Pulsar NAR files in the nifi-assembly/pom.xml file. I fixed that and pushed it in the second commit.
Cool, thanks. Can you please take a look at my comment regarding the security-related property?
import java.util.LinkedHashMap;
import java.util.Map;

public class LRUCache<K, V extends Closeable> {
@david-streamlio can you confirm this is a uniquely written class for this purpose and not taken from elsewhere? I ask because I've had the displeasure of finding copied code before and it makes for some build/release messes and this is the type of thing that often hits that trigger. This is fresh/clean room stuff?
@joewitt The LRU cache code wasn't copied from anywhere; it is just my naive implementation of an LRU cache. Most Java-based examples I see utilize a LinkedHashMap and I do not.
I do appreciate your need to ensure 100% Apache compliance with the code, so I can provide an implementation of the LRU cache based on the https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html class instead, if you think that would be better/safer from a licensing perspective.
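For readers following this thread, here is a minimal sketch of the LinkedHashMap-based pattern mentioned above, written only against the JDK. It is not the `LRUCache` class from this PR and not Commons Collections' `LRUMap`; the class name `ClosingLruCache` and its methods are hypothetical, chosen to mirror the `V extends Closeable` bound in the diff. It assumes that evicted values should be closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU cache that closes a value when it is evicted or replaced. */
class ClosingLruCache<K, V extends Closeable> {

    private final LinkedHashMap<K, V> entries;

    ClosingLruCache(final int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        // accessOrder = true keeps the least recently accessed entry first.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > capacity) {
                    closeQuietly(eldest.getValue());
                    return true; // drop the least recently used entry
                }
                return false;
            }
        };
    }

    /** Returns the cached value (marking it as recently used), or null if absent. */
    synchronized V get(K key) {
        return entries.get(key);
    }

    /** Adds or replaces a value; a replaced value is closed. */
    synchronized void put(K key, V value) {
        V previous = entries.put(key, value);
        if (previous != null && previous != value) {
            closeQuietly(previous);
        }
    }

    synchronized int size() {
        return entries.size();
    }

    /** Closes every cached value and empties the cache. */
    synchronized void clear() {
        for (V value : entries.values()) {
            closeQuietly(value);
        }
        entries.clear();
    }

    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            // A real implementation would log this; the sketch ignores it.
        }
    }
}
```

The design choice here is to let `LinkedHashMap` track access order and to hook eviction through `removeEldestEntry`, so the only custom logic is closing the evicted resource.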
.name("BATCHING_MAX_MESSAGES")
.displayName("Batching Max Messages")
.description("Set the maximum number of messages permitted in a batch. default: "
+ "1000 If set to a value greater than 1, messages will be queued until this "
I'd recommend not doing any queuing until this value is reached. Rather, use this as a hard limit for how many go in a batch, but the minimum that goes should be however many are available/waiting/ready at a given time. Otherwise you're introducing false latency.
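As a generic illustration of that suggestion (take whatever is ready now, capped at a hard maximum), here is a small JDK-only sketch. It is not how the Pulsar client batches internally and is not code from this PR; `BatchDrainer` and `nextBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper: builds a batch from what is ready right now. */
class BatchDrainer {

    /**
     * Returns up to maxBatchSize items that are already waiting in the queue.
     * It never blocks to fill the batch, so no artificial latency is added.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> pending, int maxBatchSize) {
        List<T> batch = new ArrayList<>();
        // drainTo moves at most maxBatchSize available elements and returns immediately.
        pending.drainTo(batch, maxBatchSize);
        return batch;
    }
}
```

With three messages waiting and a limit of 1000, the batch holds three messages; with five waiting and a limit of two, it holds two and leaves three for the next call.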
All of the "batching" specified by these properties actually occurs within the Pulsar client itself; nothing is kept in NiFi. This property merely exposes that setting to end users who wish to use the feature to achieve better write throughput to Pulsar.
if (value != null && value.length > 0) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> {
out.write(value);
Performance of a single flowfile per poll cycle will be poor relative to doing batched reads. I am guessing Pulsar pulls more than a single message at a time too. We have the demarcator approach for the Kafka processors, and performance there is extremely high because we're aligning the strengths of NiFi and the messaging system. That said, the record processors are an even better path for this, so maybe don't worry about that for now. Not sure.
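To make the demarcator idea concrete, here is a minimal JDK-only sketch that joins many message payloads into one byte array separated by a demarcator, so a whole batch could be written to a single flowfile. It is not the Kafka processors' implementation and uses no NiFi or Pulsar APIs; `DemarcatedBatchWriter` and `join` are hypothetical names, and skipping null or empty payloads is an assumption borrowed from the check in the snippet above.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

/** Hypothetical helper: concatenates message payloads with a demarcator between them. */
class DemarcatedBatchWriter {

    static byte[] join(List<byte[]> messages, byte[] demarcator) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean first = true;
        for (byte[] message : messages) {
            // Assumption: null or empty payloads are skipped.
            if (message == null || message.length == 0) {
                continue;
            }
            // Write the demarcator only between payloads, never before the first one.
            if (!first) {
                out.write(demarcator, 0, demarcator.length);
            }
            out.write(message, 0, message.length);
            first = false;
        }
        return out.toByteArray();
    }
}
```

In a processor, the resulting bytes would be written once per batch instead of creating one flowfile per message.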
getAckService().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return consumer.acknowledgeAsync(msg).get();
Are we only pulling a single message from Pulsar each time? We should pull a ton of them and write out the records. It looks like this is a single Pulsar message, but then we try to extract multiple records from it. That is a fine path, but the performant path will be to have many messages turned into a large set of records that go in a single flow file, and repeat.
Build on the large machine now has a unit test failure: [ERROR] Failures:
The output log had "14:11:44.805 [pool-67-thread-1] ERROR org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord - PublishPulsarRecord[id=9c298cea-0e2a-4a59-8864-92e95563d7f4] Unable to publish to topic ". Not sure if that is from the same test or not.
@joewitt What was the command you used to generate the error above? I want to attempt to reproduce it locally if possible. The basic issue is that since this test message was sent asynchronously, there is a lag between when it is processed (and throws the error), when it is moved to the failureQueue, and when it is eventually routed to the FAILURE relationship. For my local tests, I had to introduce a lag in the process by setting the number of iterations on the run command to 100. Apparently this value needs to be larger for a parallel build, so I will increase the number of iterations to 5K. But I would like to test it using the values you used to ensure that value is sufficient.
Hopefully NiFi will support Pulsar.
@joewitt We seem to be stalled on this PR. Can you please share the Maven command you used to generate the error above? I want to attempt to reproduce it locally if possible and resolve it so we can get this merged into the master branch.
@david-streamlio Might be a good idea to release this as your own bundle. We're starting to hit a point where the PMC and committers are struggling to keep up in terms of expertise with the diversity of things being offered for official inclusion. I recall this being something that your company wants to push to support its own offerings, so I think it might make sense to have Streamlio put it up on their GitHub repo, and we can help advertise it to the NiFi community. (Also, FYI, Apache Infra is starting to balk at giving us more storage space on mirrors because our tarball is, I think, now over 1.6GB.) If you want help converting it into a standalone product that Streamlio can support, I can give you some assistance there.
Definitely agree we are struggling to keep up, but we should have open arms for fellow Apache projects. Influx was a commercial thing and difficult to test against. For Pulsar we just need time from someone. We definitely can't add more NARs to the default build, but we can build NARs, share them, and make it easy for the extension registry when it's ready.
@joewitt I will give you the time you need to get this over the hump. Please reach out to me directly and I will provide all the assistance you need. If NiFi is moving towards a registry for third-party extensions, then I will work with you to get this NAR bundle into that repo.
Yeah, no problem David. Your effort hasn't been the problem. You've done great. We should be clear that the Extension Registry does not solve review bandwidth problems. It solves release independence/timing/build size problems. Reviewing artifacts for inclusion into the Apache NiFi community's list of things to maintain, concern itself with, track security defects for, etc. will continue to be a challenge. This is why we recommend that vendor-motivated folks consider hosting their own stuff if the community cannot move fast enough for them.
I agree. What I'm sort of wondering here is whether it wouldn't be more advantageous for them to move it under Pulsar and then sync up with us later once the extension registry is ready. That way it could be released whenever they want to the NiFi community and their own users, and they could iterate as aggressively as they want under their own release policies.
Hmm, I see what you mean. Could be an option. David, any thoughts on that?
That seems like a great approach. We will move it under the Pulsar project and reach back out to you when your extension registry is ready.
@MikeThomsen Is there any timeline for the extension registry?
There is no timeline for the extension registry, but I know work is actively happening. You can see pieces of progress in things like being able to load multiple versions of the same component, the ability to hot-load components, etc. It's a good topic for mailing list discussion.