NIFI-6597 Azure Event Hub Version Update #3676
mysunnytime wants to merge 13 commits into apache:master
Conversation
pvillard31
left a comment
Hi @mysunnytime,
Can you check the unit tests?
[ERROR] Tests run: 7, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 0.259 s <<< FAILURE! - in org.apache.nifi.processors.azure.eventhub.GetAzureEventHubTest
[ERROR] testNormalFlow(org.apache.nifi.processors.azure.eventhub.GetAzureEventHubTest) Time elapsed: 0.067 s <<< FAILURE!
java.lang.AssertionError: java.lang.NullPointerException
at org.apache.nifi.processors.azure.eventhub.GetAzureEventHubTest.testNormalFlow(GetAzureEventHubTest.java:120)
Caused by: java.lang.NullPointerException
[ERROR] testNoPartitions(org.apache.nifi.processors.azure.eventhub.GetAzureEventHubTest) Time elapsed: 0.015 s <<< FAILURE!
java.lang.AssertionError: Could not invoke methods annotated with @OnStopped annotation due to: java.lang.reflect.InvocationTargetException
at org.apache.nifi.processors.azure.eventhub.GetAzureEventHubTest.testNoPartitions(GetAzureEventHubTest.java:94)
Force-pushed from aba5056 to af53955
# Conflicts:
#   nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
Force-pushed from 1732aaf to 7f6d4c8
@pvillard31 Can you take another look at the change? I believe your comments are addressed.
  .description("A timestamp (ISO-8061 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
      + "should have been enqueued in the EventHub to start reading from")
- .addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR)
+ .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
Why are you doing this change? The description says we expect ISO-8061.
ISO8061 is a deprecated validator that looks like it was a typo (595835f); it should be ISO8601. I missed updating the description. Will fix that.
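For context, the corrected validator accepts ISO-8601 instants such as the one quoted in the description. A minimal stdlib sketch of that check (a hypothetical stand-in, not NiFi's actual `ISO8601_INSTANT_VALIDATOR` implementation):

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical stand-in for NiFi's ISO8601_INSTANT_VALIDATOR: an
// ISO-8601 instant like 2016-01-01T01:01:01.000Z parses cleanly,
// anything else is rejected.
public class Iso8601Check {
    static boolean isValidIso8601Instant(String value) {
        try {
            Instant.parse(value);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```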
@pvillard31 Could you review again please?
Thanks :)
@joewitt Could you check again? I believe your concerns are addressed :)
@mysunnytime can you please describe what testing/verification has been done for this?
try {
    eventHubClient = EventHubClient.createFromConnectionString(connectionString).get();
} catch (InterruptedException | ExecutionException | IOException | ServiceBusException e) {
...
EventHubClientImpl.USER_AGENT = "ApacheNifi-azureeventhub/2.3.2";
@mysunnytime ApacheNiFi (note the capitalized F). I realize that seems unimportant, but either it should be all lowercase or it should follow convention, which is NiFi (not Nifi). So either do 'apachenifi-azureeventhub/2.3.2' or do 'ApacheNiFi-azureeventhub/2.3.2'. I prefer the NiFi convention but either is fine.
Cool! Thanks for pointing that out. I'll fix it.
final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);
final int numThreads = context.getMaxConcurrentTasks();
executor = Executors.newScheduledThreadPool(numThreads);
We should avoid setting up an executor here at all if we can. The NiFi processor API fully supports having multiple tasks/threads for a given processor, already manages the lifecycle correctly, and has its own thread pool whose important configurations users can set. Further, we should not do such things in onScheduled. If such a resource must be created, it should be done lazily on demand in the onTrigger method, as the favored pattern.
We have to use an executor because in the new API we need a ScheduledExecutorService to create the EventHubClient. But I'm not sure whether we should create the ScheduledExecutorService in onScheduled or onTrigger.
My understanding is that onScheduled is where we prepare the processor to be ready to work, so we set up the EventHubClient there, and now we need to create that EventHubClient with a ScheduledExecutorService; in onTrigger, we use the EventHubClient to receive or send messages. So before we set up the EventHubClient, we need to have a ScheduledExecutorService, which means it should be created in the same (or an earlier) place. Also, we don't want the ScheduledExecutorService to be created every time we want to send/receive a message (i.e., in onTrigger), so I think it should still be in onScheduled.
Please tell me what you think, and correct me if I understand onScheduled and onTrigger incorrectly.
@mysunnytime it is fine to use onScheduled to create the necessary connections to Azure Event Hubs. For pushing to Event Hubs it is probably better to use lazy instantiation, but in the consume case eager makes more sense and the lifecycle hook of onScheduled is fine.
As far as NiFi and thread pooling, yes, NiFi has a thread pool. Basically the whole flow controller is a thread pool. It isn't generally desired for us to create threads via a new thread pool and give it to some library such as the Event Hubs SDK. Rather, we'd like to bring a thread to the library to do some meaningful work (fetch data), then process it and repeat. That allows us to have control over the threads, share them as necessary for splitting work, etc., while allowing the user to control overall thread count in the system.
Are you certain the only API for leveraging the Event Hubs SDK is one where we have to hand it over a pool of threads it gets to own and then we offer event handlers as data arrives? How would back-pressure work in that case? We don't want to keep getting data when we're not ready to process it... We much prefer a model where we can use our thread to go grab data that is available and ready, and then go do other work and come back.
Regarding the thread pool: AMQP is not a request-response protocol. Any AMQP implementation requires the ability to process and generate network activity asynchronously from anything else going on in the application, so it can't just execute on the thread that calls send or receive. Executing on a thread pool supplied by the application is how we allow the application to have control over the thread count.
As for back pressure, that's pretty simple. There is a local buffer (the "prefetch" buffer) of a user-controlled size, and the asynchronous side of the client places messages coming down from the service into that buffer, using AMQP's flow-control mechanisms to cut off the flow when the buffer is full. As the application consumes messages out of the prefetch buffer, the async side uses those flow-control mechanisms to indicate to the service that additional capacity has become available.
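The mechanism described above can be modeled with a bounded buffer. This is only a toy illustration (the class and method names are invented; the real client uses AMQP link credit, not a Java queue): the network side can add events only while the buffer has capacity, and consuming an event frees capacity again.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the prefetch buffer: offer() returning false when the
// buffer is full stands in for AMQP flow control cutting off the link;
// consuming an event restores "credit" so delivery can resume.
public class PrefetchModel {
    private final BlockingQueue<String> prefetch;

    public PrefetchModel(int prefetchCount) {
        prefetch = new ArrayBlockingQueue<>(prefetchCount);
    }

    // Async/network side: accept an event only if capacity remains.
    public boolean deliver(String event) {
        return prefetch.offer(event);
    }

    // Application side: consuming frees capacity (returns null if empty).
    public String consume() {
        return prefetch.poll();
    }
}
```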
That context for Azure Event Hubs is helpful. We still would generally not want to hand off a thread pool to another library unless absolutely necessary. As we have threads available, we would want to offer them to do the necessary 'Event Hub interaction' and 'process received data' actions, all under our control. Based on your response, though, it sounds like the provided-thread-pool approach is the API offered by Event Hubs at this point. OK.
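The lifecycle the reviewers agreed on can be sketched roughly as follows. This is a simplified, hypothetical skeleton (in the real processor these methods carry NiFi's @OnScheduled/@OnStopped annotations and the pool is handed to EventHubClient; none of that SDK wiring appears here):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Sketch of the pattern under discussion: create the pool once when
// the processor is scheduled (not per onTrigger invocation) and tear
// it down when the processor is stopped.
public class ExecutorLifecycleSketch {
    private ScheduledExecutorService executor;

    // In NiFi this would be annotated @OnScheduled.
    public void onScheduled(int numThreads) {
        executor = Executors.newScheduledThreadPool(numThreads);
        // The real processor would pass `executor` to the Event Hubs
        // client factory here when creating the EventHubClient.
    }

    // In NiFi this would be annotated @OnStopped.
    public void onStopped() {
        executor.shutdown();
    }

    public boolean isRunning() {
        return executor != null && !executor.isShutdown();
    }
}
```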
executor = Executors.newScheduledThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
-    final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey);
+    final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey, executor);
The use of this executor here, too, gives me pause. I'm not sure we're using the NiFi API correctly if we're doing this.
There are three unit tests for the three processors (PutAzureEventHub, GetAzureEventHub, and ConsumeAzureEventHub). They all pass. For this PR, we just upgraded the Event Hub API version and did not change the functionality, so no new tests were added. We also tested the three components from the NiFi GUI to send/receive events to an event hub. Before our updates, processor initialization would fail with ServiceBusException: TransportException (because it used a really old version of the Event Hubs Java client, which used a really old version of Proton-J, which exhibits these problems when used with current versions of the Java runtime). After the API upgrade, the three components work as expected. Send/receive was successful. The ApacheNiFi-eventhub version string is also successfully sent, for us to better understand our customers and provide support.
ok thanks - glad to hear it regarding the testing. I think my only remaining concern for the moment then is the thread pools being created.
    eventHubClient.closeSync();
}
} catch (final ServiceBusException e) {
...
executor.shutdown();
If the event hub client close throws an exception, this will be skipped. It should be in its own block.
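The separation being requested looks roughly like this (class and method names are illustrative, not the PR's actual code): the executor shutdown moves into a finally block, so a failing close cannot skip it.

```java
import java.util.concurrent.ExecutorService;

// Illustration of the review point: if closing the client throws, the
// executor shutdown must still run, so it belongs in finally (or its
// own try/catch) rather than after the close inside the same try.
public class CleanupSketch {
    public static boolean closeAll(AutoCloseable client, ExecutorService executor) {
        boolean closedCleanly = true;
        try {
            client.close(); // may throw, e.g. ServiceBusException in the real code
        } catch (Exception e) {
            closedCleanly = false;
        } finally {
            executor.shutdown(); // runs regardless of the close outcome
        }
        return closedCleanly;
    }
}
```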
final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);
final int numThreads = context.getMaxConcurrentTasks();
executor = Executors.newScheduledThreadPool(numThreads);
Consider having a user-defined property for how many threads to give the thread pool that the Event Hubs client uses. Having it be the same as the number of threads NiFi uses means doubling everything, which for most cases would be unnecessary. Alternatively, this could be added later, but good documentation should be provided so the user understands how the resources are being used. We offer processor annotations for this, which cause it to show up to the end user so they can reason over their selections.
Great! It's good to let the user specify the thread number. We prefer to add that later in the next PR. For now, instead of using context.getMaxConcurrentTasks(), which doubles the threads, we can use a default thread number of 4, which will be enough for most cases. We will add a description to explain the thread pool.
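The fallback being proposed could look roughly like this (the helper and its name are hypothetical; only the default of 4 comes from the discussion above):

```java
// Hypothetical sketch of the proposed default: use a fixed pool of 4
// threads for the Event Hubs client instead of mirroring NiFi's
// concurrent-task count, until a user-configurable property is added.
public class PoolSizeSketch {
    static final int DEFAULT_EVENT_HUB_THREADS = 4;

    static int resolvePoolSize(Integer userConfigured) {
        return (userConfigured != null && userConfigured > 0)
                ? userConfigured
                : DEFAULT_EVENT_HUB_THREADS;
    }
}
```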
@@ -70,7 +74,6 @@
    @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
})
public class GetAzureEventHub extends AbstractProcessor {
This processor I suspect will be rarely used in comparison to ConsumeAzureEventHub anyway, since it doesn't utilize the record reader/writer mechanism. The other, while using a single thread as currently implemented, will probably offer a better user experience and performance characteristics. If necessary, that one could be updated to have multiple threads operating at once. This one will have multiple threads, but the writing of data not being record-oriented will mean overall throughput will drag. This is the same reason we went from ConsumeKafka to the ConsumeKafkaRecord design. So with that in mind, I think we can stress a bit less on this one. As long as it functions correctly, the thread pool is understood by the user, and it is destroyed correctly, then this is probably fine.
Yes, we also found that the GetAzureEventHub processor serves similar functionality to ConsumeAzureEventHub. GetAzureEventHub uses EventHubClient and can allow the user to receive from a specific partition id. However, currently it does not let the user specify which partition to read from; instead, it just traverses all partitions in a round-robin way, so it provides the same functionality as ConsumeAzureEventHub, which uses EventProcessorHost. This can be a next step to improve, and would provide customers with more convenient and flexible control.
… to 4 to avoid unnecessary thread use in most cases.
…read pool needed for EventHubClient.
@mysunnytime Yes, it would be great to get this incorporated. We have 366 issues already addressed and haven't done a release since April. Features/improvements/fixes get merged based on review traction. We need someone who can test/vet the changes. I gave feedback on initial findings/thoughts and I think those are addressed. If someone can validate the results and merge, that is great. If for any reason 1.10.0 RC1 fails and this gets into master in the meantime, I'll grab it then. Else, we can grab it whenever it is in and the next release comes along. That the current approach isn't working is unpleasant, but we have 286+ components in the build by default. We don't have the infrastructure in Apache land to vet all those extension points at all times for regressions. Doubly hard when such testing would hit public cloud services with billing implications, or vendor tools with licenses, etc.
@mysunnytime thanks for the contribution and sticking with us as we worked through this! I was able to test it against Azure EventHubs. Given Joe's comments above, I went ahead and merged to master. Thanks again!
NIFI-6597 - Upgrade Azure Event Hub Version from current 0.14.4 (outdated) to the newest 2.3.2 and EPH 2.5.2.