APEXMALHAR-2506 Added the monitoring to Kafka Consumer threads, if on… #633
@@ -90,6 +92,19 @@

  private boolean waitForReplay = false;

  final List<Future<?>> kafkaConsumreThreads = new ArrayList<>();
fix typo in kafkaConsumerThreads
Done.
  public boolean areKafkaThreadsAreRunning()
  {
    for (Future<?> future : kafkaConsumreThreads) {
Not directly related to this PR, but it does not seem right to use an executor service for never-ending tasks. It may lead to resource starvation.
We can have a separate discussion about that. How do you want to do it?
@@ -90,6 +92,19 @@

  private boolean waitForReplay = false;

  private final List<Future<?>> kafkaConsumerThreads = new ArrayList<>();

  public boolean doesAnyKafkaReaderThreadDied()
does -> has
Done.
@@ -255,6 +255,10 @@ public void emitTuples()
      }
    }
    emitCount += count;

    if (!consumerWrapper.doesAnyKafkaReaderThreadDied()) {
This may not be the correct handling for all situations. What happens, for example, on partition or lead broker change? Or a transient connection issue? Downstream resets may be very costly and should only be triggered when really necessary.
If any of these threads dies because of an exception, how should that be handled cleanly?
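One way to tell these cases apart is to look at how a finished `Future` ended before deciding to fail the operator. The following is only a minimal sketch under stated assumptions: the class `ReaderFailureInspector` and the method `failureOf` are hypothetical names that do not appear in this PR, and whether a normal return should also count as a failure is left to the caller.

```java
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the PR: reports why a reader task ended.
class ReaderFailureInspector
{
  // Returns the failure of a finished task, or empty if the task is
  // still running or returned normally.
  static Optional<Throwable> failureOf(Future<?> future)
  {
    if (!future.isDone()) {
      return Optional.empty(); // still running
    }
    try {
      future.get(); // the task is done, so this does not wait
      return Optional.empty(); // the task returned normally
    } catch (CancellationException e) {
      return Optional.of(e); // the task was cancelled
    } catch (ExecutionException e) {
      // The task threw; the original exception is the cause.
      Throwable cause = e.getCause();
      return Optional.of(cause != null ? cause : e);
    } catch (InterruptedException e) {
      // get() declares this exception; restore the interrupt flag for the caller.
      Thread.currentThread().interrupt();
      return Optional.of(e);
    }
  }
}
```

With the cause in hand, the operator thread could log it or wrap it in the exception it throws, so that an unexpected exit is at least diagnosable.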
  {
    for (Future<?> future : kafkaConsumerThreads) {
      if (future.isDone() || future.isCancelled()) {
        return false;
You have flipped true/false and also need to fix the check when calling this method.
fixed it.
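For reference, the un-flipped check could look roughly like the sketch below. It is an illustration and not the code that was merged: the class `ReaderThreadMonitor` and its `register` method are hypothetical stand-ins for the consumer wrapper, and the method name assumes the "does -> has" rename requested earlier in this review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

// Hypothetical stand-in for the consumer wrapper discussed in this review.
class ReaderThreadMonitor
{
  private final List<Future<?>> kafkaConsumerThreads = new ArrayList<>();

  // Hypothetical helper: remember the Future returned when a reader task is submitted.
  void register(Future<?> future)
  {
    kafkaConsumerThreads.add(future);
  }

  // Returns true if at least one reader task has ended for any reason.
  boolean hasAnyKafkaReaderThreadDied()
  {
    for (Future<?> future : kafkaConsumerThreads) {
      // isDone() is also true for cancelled tasks, so a separate
      // isCancelled() check is not needed.
      if (future.isDone()) {
        return true;
      }
    }
    return false;
  }
}
```

The call site then reads without a negation: `if (monitor.hasAnyKafkaReaderThreadDied()) { throw new RuntimeException("A Kafka reader thread has died"); }`, where `monitor` and the message text are placeholders.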
Check whether there are conditions under which the consumer thread exits that are expected rather than errors (such as a partition change).
…e data from Kafka, if those threads die, operator thread will detect and throw an exception
There is no exit strategy for the consumer threads; the assumption is that they run forever.
@tweise I have addressed all the review comments. Can you please review and merge the PR?
What testing was done? (Please document in the JIRA.)
@tweise I have documented my observation. This fix was done to address an intermittent issue.
…e of them dies then the Kafka operator is killed so that it can recover from the proper state.
@PramodSSImmaneni please review.