
[FLINK-9303] [kafka] Adding support for unassign dynamically partitions from kafka consumer when they become unavailable #5991

Closed

Conversation

EAlexRojas (Contributor):

What is the purpose of the change

This pull request adds an option on the Kafka consumer to check for unavailable partitions and unassign them from the consumer. That way the consumer does not request records from invalid partitions, which prevents log noise.

Brief change log

  • Modify the partition discovery system to check not only for new partitions, but also for partitions that are no longer available.
  • Check whether partitions recovered from state are still available.
  • Add an option on the Kafka consumer to activate these checks.
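
The extended discovery step can be pictured as a two-way diff between the partitions already known and the partitions just discovered. The sketch below models that idea with plain strings standing in for Kafka partitions; the class and field names are illustrative, not Flink's actual API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of discovery that reports both additions and removals:
// "added" are partitions seen for the first time, "removed" are
// previously known partitions that no longer exist.
final class DiscoveryDiff {
    final Set<String> added;
    final Set<String> removed;

    DiscoveryDiff(Set<String> known, List<String> discovered) {
        added = new HashSet<>(discovered);
        added.removeAll(known);        // newly discovered partitions
        removed = new HashSet<>(known);
        removed.removeAll(discovered); // partitions that disappeared
    }
}
```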

Verifying this change

This change was manually verified as follows:

  • Create a job with a Kafka consumer listening to a topic pattern, with partition discovery activated and the property introduced in this PR set to true.
  • Configure Kafka with the following properties:
    delete.topic.enable=true
    auto.create.topics.enable=false
  • Create some topics matching the pattern.
  • Run the job.
  • While running, remove some of the topics.
  • Verify that the partitions are unassigned and the job continues running without log noise.

I suppose this could also be covered by e2e tests, but I'm not familiar with the system in place.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
 	getLong(
 		checkNotNull(props, "props"),
 		KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-	!getBoolean(props, KEY_DISABLE_METRICS, false));
+	!getBoolean(props, KEY_DISABLE_METRICS, false),
+	getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
Review comment (Contributor):

Should this be named KEY_CHECK_UNAVAILABLE_PARTITIONS ?

Reply (EAlexRojas, PR author):

You're right, I'll change it

@@ -80,6 +82,9 @@
 /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
+/** The list of partitions to be removed from kafka consumer. */
+private final List<TopicPartition> partitionsToBeRemoved;
Review comment (Contributor):

Should this be Set to facilitate fast lookup ?

Reply (EAlexRojas, PR author):

You are right, a Set should be better for all the calls to the contains() method.
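
The point of the suggestion is that List.contains() scans linearly, while HashSet.contains() is O(1) on average, which matters when the membership check runs for every fetched record. A minimal sketch, with plain strings standing in for TopicPartition and illustrative names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RemovalLookup {
    // HashSet answers contains() in constant time on average;
    // a List would scan every element on each call.
    static boolean shouldDrop(Set<String> toBeRemoved, String partition) {
        return toBeRemoved.contains(partition);
    }

    public static void main(String[] args) {
        List<String> asList = Arrays.asList("topicA-0", "topicB-2");
        Set<String> asSet = new HashSet<>(asList);          // one-time conversion
        System.out.println(shouldDrop(asSet, "topicA-0"));  // true
        System.out.println(shouldDrop(asSet, "topicA-1"));  // false
    }
}
```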

-void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
-	if (newPartitions.size() == 0) {
+void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, List<TopicPartition> partitionsToBeRemoved) throws Exception {
+	if (newPartitions.size() == 0 && partitionsToBeRemoved.isEmpty()) {
Review comment (@tedyu, Contributor, May 11, 2018):

size() == 0 -> isEmpty()

…ns from kafka consumer when they become unavailable

- Check for unavailable partitions recovered from state
- Using kafka consumer option to activate this validations
@EAlexRojas force-pushed the kafka-unassign-partitions-fix branch from a17d0dc to a7f23f9 on May 22, 2018 08:15
@EAlexRojas (PR author):

PR updated, taking the comments into account.

@tzulitai (Contributor) left a review:

Thanks for the PR @EAlexRojas.
I did a shallow review, with some questions on higher-level design aspects of the change. Please let me know what you think.

@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
 	Pattern topicPattern,
 	KeyedDeserializationSchema<T> deserializer,
 	long discoveryIntervalMillis,
-	boolean useMetrics) {
+	boolean useMetrics,
+	boolean checkUnavailablePartitions) {
Review comment (Contributor):

Why do we want this to be configurable? Is there any case that we would prefer to leave them untouched?

Reply (EAlexRojas, PR author):

I did it that way only because this is something new, so I thought you might want it to be configurable. But you are right, I cannot think of a case where we would prefer to keep the unavailable partitions.
I'll update the PR to make it the default behaviour, if that's OK with you.

@@ -374,8 +385,8 @@ void setOffsetsToCommit(
  * <p>This method is exposed for testing purposes.
  */
 @VisibleForTesting
-void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
-	if (newPartitions.size() == 0) {
+void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
Review comment (Contributor):

I have the feeling that this method is way too complex now, to a point that it might make more sense to break this up into 2 different methods - addPartitionsToAssignment and removePartitionsFromAssignment.

Reply (EAlexRojas, PR author):

I thought about it, but my only concern is the case where we have both partitions to add and partitions to remove...
consumerCallBridge.assignPartitions() takes the whole new list of partitions, so in that case we would need to wait for the first assignment (e.g. adding the new partitions) before doing the second assignment (e.g. removing partitions) in order to have a consistent list of partitions.
I think we should try to have only one call to consumerCallBridge.assignPartitions().

Maybe I could refactor the part where partitions are removed from the old partitions into a separate private method, like removeFromOldPartitions()?

What do you think?
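
The concern above can be sidestepped by computing the final assignment in one pass (current assignment, minus removals, plus additions) and then making a single assign call. A minimal sketch of that idea, with plain strings standing in for TopicPartition and an illustrative method name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class AssignmentHelper {
    // Build the consumer's next assignment in one pass, so a single
    // assignPartitions() call suffices even when partitions are both
    // added and removed at the same time.
    static List<String> nextAssignment(List<String> current, List<String> toAdd, Set<String> toRemove) {
        List<String> next = new ArrayList<>();
        for (String p : current) {
            if (!toRemove.contains(p)) {
                next.add(p);        // keep surviving partitions
            }
        }
        next.addAll(toAdd);         // append newly discovered partitions
        return next;
    }
}
```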

@@ -80,6 +83,9 @@
 /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
 
+/** The list of partitions to be removed from kafka consumer. */
+private final Set<TopicPartition> partitionsToBeRemoved;
Review comment (Contributor):

Would it actually make more sense that we have a queue for this? Like how we are handling unassigned new partitions via the unassignedPartitionsQueue. The fact that this is a set means that we will need to eventually remove entries from it anyways.

Reply (EAlexRojas, PR author):

From my understanding, for unassigned partitions we can use a Queue because it does not matter which consumer takes the new partitions.
But we cannot use a Queue for partitions to be removed, because we can only remove a partition from the consumer that is actually subscribed to it.
Does that make sense?
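
The distinction argued above can be illustrated with a toy model: removal requests are shared as a set visible to every consumer, and each consumer acts only on the partitions it actually owns. A shared queue would instead hand each removal to an arbitrary consumer, which may not be the one subscribed to that partition. All names here are illustrative, not Flink's actual classes.

```java
import java.util.HashSet;
import java.util.Set;

// Each consumer filters the shared removal set against its own
// assignment; removals for partitions it does not own are ignored.
final class OwnedConsumer {
    final Set<String> assigned;

    OwnedConsumer(Set<String> initialAssignment) {
        this.assigned = new HashSet<>(initialAssignment);
    }

    Set<String> applyRemovals(Set<String> toBeRemoved) {
        Set<String> actuallyRemoved = new HashSet<>();
        for (String p : toBeRemoved) {
            if (assigned.remove(p)) {   // only the owning consumer drops it
                actuallyRemoved.add(p);
            }
        }
        return actuallyRemoved;
    }
}
```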

@@ -240,7 +249,9 @@ public void run() {
 	newPartitions = unassignedPartitionsQueue.getBatchBlocking();
 }
 if (newPartitions != null) {
-	reassignPartitions(newPartitions);
+	reassignPartitions(newPartitions, new HashSet<>());
Comment (EAlexRojas, PR author):

I just realized this should actually be
reassignPartitions(newPartitions, partitionsToBeRemoved);

@aljoscha (Contributor) commented Jul 9, 2020:

I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. Please re-open this if you feel otherwise and work should continue.

@aljoscha closed this Jul 9, 2020