Skip to content

Conversation

@olegz
Copy link
Contributor

@olegz olegz commented Nov 23, 2015

Due to the fact that current component uses artificial names for properties set via UI and then maps those properties to the actual names used by Kafka, we can not rely on NiFi UI to display an error if user attempts to set a dynamic property which will eventually map to the same Kafka property. So, I’ve decided that any dynamic property will simply override an existing property with WARNING message displayed. It is actually consistent with how Kafka does it and displayed the overrides in the console. Updated the relevant annotation description.
It is also worth to mentioned that current code was using an old property from Kafka 0.7 (“zk.connectiontimeout.ms”) which is no longer present in Kafka 0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property zk.connectiontimeout.ms is not valid). The add/override strategy would provide for more flexibility when dealing with Kafka volatile configuration until things will settle down and we can get some sensible defaults in place.

While doing it addressed the following issues that were discovered while making modification and testing:
ISSUE: When GetKafka started and there are no messages in Kafka topic the onTrigger(..) method would block due to the fact that Kafka’s ConsumerIterator.hasNext() blocks. When attempt was made to stop GetKafka would stops successfully due to the interrupt. However in UI it would appear as ERROR based on the fact that InterruptException was not handled.
RESOLUTION: After discussing it with @markap14 the the general desire is to let the task exit as quick as possible and that the whole thread maintenance logic was there initially due to the fact that there was no way to tell Kafka consumer to return immediately if there are no events. In this patch we are now using ‘consumer.timeout.ms’ property of Kafka and setting its value to 1 millisecond (default is -1 - always block infinitely). This ensures that tasks that attempted to read an empty topic will exit immediately just to be rescheduled by NiFi based on user configurations.

ISSUE: Kafka would not release FlowFile with events if it didn’t have enough to complete the batch since it would block waiting for more messages (based on the blocking issue described above).
RESOLUTION: The invocation of hasNext() results in Kafka’s ConsumerTimeoutException which is handled in the catch block where the FlowFile with partial batch will be released to success. Not sure if we need to put a WARN message. In fact in my opinion we should not as it may create unnecessary confusion.

ISSUE: When configuring a consumer for topic and specifying multiple concurrent consumers in ‘topicCountMap’ based on 'context.getMaxConcurrentTasks()’ each consumer would bind to a topic partition. If you have less partitions then the value returned by 'context.getMaxConcurrentTasks()’ you would essentially allocate Kafka resources that would never get a chance to receive a single message (see more here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the amount of partitions for a topic and in the event where 'context.getMaxConcurrentTasks()’ value is greater than the amount of partitions, the partition count will be used to when creating ‘topicCountMap’ and WARNING message will be displayed)see code). Unfortunately we can’t do anything with the actual tasks, but based on current state of the code they will exit immediately just to be rescheduled where the process will repeat. NOTE: That is not ideal as it will be rescheduling tasks that will never have a chance to do anything, but at least it could be fixed on the user side after reading the warning message.

@olegz
Copy link
Contributor Author

olegz commented Nov 23, 2015

@trkurc @markap14 @joewitt
Guys, this one is strictly for review as I am still working on adding the same dynamic properties logic for PutKafka. But since it contains somewhat significant changes, it would be worth taking a look. Commit message has all the details.
Cheers

@olegz olegz force-pushed the NIFI-1192B branch 2 times, most recently from b7c6ab9 to 1bad5e4 Compare November 24, 2015 01:27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olegz this line is 260 characters; fails contrib-check. Can you break that line apart?

@markap14
Copy link
Contributor

@olegz I left a handful of comments in-line. Otherwise, looks great! Very happy to have someone else digging into these Processors and the code is certainly looking cleaner now that you've refactored it a bit to use the newer client. Thanks!!

@olegz
Copy link
Contributor Author

olegz commented Nov 24, 2015

Thanks @markap14! indeed it's a bit dirty and I have to polish it for style-check, comments etc. As I mentioned in commit message somewhere this was primarily for initial review to ensure that someone else can take a quick look and make sure I didn't go off the rails here.

Due to the fact that current component uses artificial names for properties set via UI and then maps those properties to the actual names used by Kafka, we can not rely on NiFi UI to display an error if user attempts to set a dynamic property which will eventually map to the same Kafka property. So, I’ve decided that any dynamic property will simply override an existing property with WARNING message displayed. It is actually consistent with how Kafka does it and displayed the overrides in the console. Updated the relevant annotation description.
It is also worth to mentioned that current code was using an old property from Kafka 0.7 (“zk.connectiontimeout.ms”) which is no longer present in Kafka 0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property zk.connectiontimeout.ms is not valid). The add/override strategy would provide for more flexibility when dealing with Kafka volatile configuration until things will settle down and we can get some sensible defaults in place.

While doing it addressed the following issues that were discovered while making modification and testing:
ISSUE: When GetKafka started and there are no messages in Kafka topic the onTrigger(..) method would block due to the fact that Kafka’s ConsumerIterator.hasNext() blocks. When attempt was made to stop GetKafka would stops successfully due to the interrupt. However in UI it would appear as ERROR based on the fact that InterruptException was not handled.
RESOLUTION: After discussing it with @markap14 the the general desire is to let the task exit as quick as possible and that the whole thread maintenance logic was there initially due to the fact that there was no way to tell Kafka consumer to return immediately if there are no events. In this patch we are now using ‘consumer.timeout.ms’ property of Kafka and setting its value to 1 millisecond (default is -1 - always block infinitely). This ensures that tasks that attempted to read an empty topic will exit immediately just to be rescheduled by NiFi based on user configurations.

ISSUE:  Kafka would not release FlowFile with events if it didn’t have enough to complete the batch since it would block waiting for more messages (based on the blocking issue described above).
RESOLUTION: The invocation of hasNext() results in Kafka’s ConsumerTimeoutException which is handled in the catch block where the FlowFile with partial batch will be released to success. Not sure if we need to put a WARN message. In fact in my opinion we should not as it may create unnecessary confusion.

ISSUE: When configuring a consumer for topic and specifying multiple concurrent consumers in ‘topicCountMap’ based on 'context.getMaxConcurrentTasks()’ each consumer would bind to a topic partition. If you have less partitions then the value returned by 'context.getMaxConcurrentTasks()’ you would essentially allocate Kafka resources that would never get a chance to receive a single message  (see more here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the amount of partitions for a topic and in the event where 'context.getMaxConcurrentTasks()’ value is greater than the amount of partitions, the partition count will be used to when creating ‘topicCountMap’ and WARNING message will be displayed)see code). Unfortunately we can’t do anything with the actual tasks, but based on current state of the code they will exit immediately just to be rescheduled where the process will repeat. NOTE: That is not ideal as it will be rescheduling tasks that will never have a chance to do anything, but at least it could be fixed on the user side after reading the warning message.

NIFI-1192 added dynamic properties support for PutKafka

NIFI-1192 polishing

NIFI-1192 polished and addressed PR comments
@olegz
Copy link
Contributor Author

olegz commented Nov 24, 2015

@markap14 @trkurc The PR comments were addressed. I was hoping to get Kafka embedded server with this commit as well (for testing), but so far it doesn't appear to be very stable. We can do it by the next release.

@asfgit asfgit merged commit d949ee1 into apache:master Nov 25, 2015
@olegz olegz deleted the NIFI-1192B branch April 5, 2016 12:37
mattyb149 pushed a commit to mattyb149/nifi that referenced this pull request Dec 9, 2020
This closes apache#131

Signed-off-by: Jeremy Dyer <jeremydyer@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants