
(release-1.3) [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer #4301

Closed
wants to merge 3 commits

Conversation

tzulitai (Contributor)

This PR changes the mod operation for partition assignment from i % numTasks == subtaskIndex to partition.hashCode % numTasks == subtaskIndex.

The bug was initially introduced by #3378, when we moved away from sorting the partition list. Apparently, the tests for partition assignment were not strict enough and did not catch this. This PR additionally adds verification that the partitions end up in the expected subtasks, and that different partition orderings still result in the same partition assignments.

Note: a fix is not required for the master branch, since the partition discovery changes already indirectly fixed the issue. However, test coverage for deterministic assignment should be improved in master as well. A separate PR will be opened for that.
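
For illustration, here is a minimal, simplified sketch contrasting the two filtering predicates (not the actual connector code; a plain String stands in for KafkaTopicPartition, and the sign-handling detail is discussed further down in the thread):

```java
// Simplified sketch, not the actual connector code. It contrasts the old
// index-based predicate with the new hash-based one.
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {

	// Old: depends on the ORDER of the queried partition list, which can differ
	// between subtasks once the list is no longer sorted -> indeterminate.
	static List<String> assignByIndex(List<String> partitions, int numTasks, int subtaskIndex) {
		List<String> mine = new ArrayList<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numTasks == subtaskIndex) {
				mine.add(partitions.get(i));
			}
		}
		return mine;
	}

	// New: depends only on the partition itself, so every subtask reaches the
	// same decision regardless of how its local list happens to be ordered.
	static List<String> assignByHash(List<String> partitions, int numTasks, int subtaskIndex) {
		List<String> mine = new ArrayList<>();
		for (String partition : partitions) {
			if ((partition.hashCode() & 0x7FFFFFFF) % numTasks == subtaskIndex) {
				mine.add(partition);
			}
		}
		return mine;
	}
}
```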

@tzulitai (Contributor Author)

R: @aljoscha @rmetzger

@tzulitai tzulitai changed the title [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer (release-1.3) [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer Jul 11, 2017
@StephanEwen (Contributor)

I think that would fix the bug. There are two things I would like to improve, though:

  1. Relying on hashCode() makes very implicit assumptions about the behavior of the hash code implementation. It does not really document or articulate well how critical this int value that we rely on is. For example, by the Java specification, hashCode may vary between processes - it only needs to be stable within a single JVM. Our hash code implementation happens to be stable at the moment, as long as the JDK does not change the implementation of String's hashCode method (which they could in theory do in any minor release, although they have not done that in a while).

  2. It is crucial that the distribution of partitions is uniform. That is a bit harder to guarantee when all sources pick up their own set of topics. At the very least, the distribution of the partitions within a topic should be uniform. For example, the topic defines "where to start" among the parallel subtasks, and the partitions then go "round robin".
    As it happens, this is actually what the current hash code implementation does, but it looks a bit like it "coincidentally" behaves like that, rather than us having a strict contract for that behavior. For example, changing the hashCode from 31 * topic + partition to 31 * partition + topic results in a non-uniform distribution, yet is an equally valid hashCode.

I would suggest having a function int assignmentIndex() or so, for which we define the above contract. We should also have tests verifying that this distributes the partitions within a single topic uniformly.
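
A sketch of what such an explicitly-contracted function could look like (assignmentIndex is only the hypothetical name suggested above; the PR ends up calling it assign()):

```java
/**
 * Sketch only. Contract (as proposed above): partitions of the same topic are
 * distributed uniformly across subtasks, round-robin, starting from an index
 * derived from the topic name.
 */
public static int assignmentIndex(String topic, int partitionId, int numParallelSubtasks) {
	// the topic decides "where to start" among the parallel subtasks ...
	int startIndex = (topic.hashCode() & 0x7FFFFFFF) % numParallelSubtasks;
	// ... and the partitions of that topic then go round-robin from there
	return (startIndex + partitionId) % numParallelSubtasks;
}
```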

@tzulitai (Contributor Author) commented Jul 12, 2017

@StephanEwen thanks for the review. Your suggestion makes a lot of sense.

I've fixed this up as follows:

  • There is a new method KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numSubtasks) that defines a strict contract, such that when it is used locally by subtasks to filter out partitions, the resulting distribution of the partitions of a single topic is guaranteed to be:

    1. Uniformly distributed across subtasks
    2. Round-robin distributed (strictly CLOCKWISE w.r.t. ascending subtask indices), by using the partition id as the offset from a starting index determined from the topic name. The extra directional contract actually makes this stricter than what we had before (originally, we might have been round-robin assigning partitions counter-clockwise). This should make the actual assignment scheme much more predictable as perceived by the user, since they only need to know the start index of a specific topic to understand how that topic's partitions are distributed across subtasks. We could add a log message that states the start index of the topics being consumed.
  • Strengthen the tests in KafkaConsumerPartitionAssignmentTest to test this contract. Uniform distribution was already tested in that suite; the change makes the tests also verify the "clockwise round-robin from some start index" contract (see the test sketch below).
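
A rough sketch of what such a contract test could look like (hypothetical test body; the exact packages and the start-index formula are assumptions based on the snippets in this thread, not the actual KafkaConsumerPartitionAssignmentTest code):

```java
import static org.junit.Assert.assertEquals;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.junit.Test;

public class AssignmentContractTestSketch {

	@Test
	public void testClockwiseRoundRobinFromStartIndex() {
		final int numSubtasks = 7;
		final String topic = "test-topic";

		// start index per the contract: topic hash scaled by 31, kept non-negative,
		// modulo the subtask count (assumed from the discussion above)
		final int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;

		for (int partitionId = 0; partitionId < 50; partitionId++) {
			int assigned = KafkaTopicPartitionAssigner.assign(
					new KafkaTopicPartition(topic, partitionId), numSubtasks);

			// partitions must be distributed round-robin, clockwise, from the start index
			assertEquals((startIndex + partitionId) % numSubtasks, assigned);
		}
	}
}
```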

 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
	int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);

@StephanEwen (Contributor), Jul 12, 2017
Minor detail: Math.abs does not work for Integer.MIN_VALUE, so it is slightly safer to do

int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
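
For context, a tiny runnable illustration of the corner case being pointed out (Math.abs overflows for Integer.MIN_VALUE, so the modulo can come out negative):

```java
public class AbsPitfall {
	public static void main(String[] args) {
		int h = Integer.MIN_VALUE;                 // worst-case hash value
		System.out.println(Math.abs(h));           // -2147483648: abs overflows and stays negative
		System.out.println(Math.abs(h) % 7);       // -2: a negative "start index"
		System.out.println((h & 0x7FFFFFFF) % 7);  // 1: masking the sign bit keeps it non-negative
	}
}
```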

@tzulitai (Contributor Author)

Good catch!

@aljoscha (Contributor)

Quick comment for my own clarification: when restoring from a 1.3.x savepoint, the new assignment logic will not be used, right? In Flink 1.3.x there is no dynamic partition discovery and so when restarting from state we have to strictly stick to what we have in the (operator) state to avoid reading partitions on multiple subtasks.

If this is true, it also means that folks who have savepoints on 1.3.x cannot use them if they want to benefit from this fix, right?

@tzulitai (Contributor Author)

Yes, that is true. This assignment logic is only applied on fresh starts.

@tzulitai (Contributor Author) commented Jul 12, 2017

This would then mean we discourage restoring from a 1.3.x savepoint, because the state is potentially incorrect.
I wonder if we then actually want to always fetch partitions on startup (fresh or from savepoint) to deal with this (just as a fix for the release-1.3 branch)? Partitions that were fetched and should be subscribed by the subtask, but aren't in the restored state, would be added at restore time.

@StephanEwen (Contributor)

Do we have a test for the case where there are fewer partitions than sources so that some sources do not get partitions on restore? To make sure they do not accidentally re-discover?

@StephanEwen (Contributor)

Just to double-check: I see that the state of the partitions is in a ListState. That means after recovery, they can be differently distributed than before. Does that not conflict with the discovery and assignment of partitions?

@tzulitai (Contributor Author) commented Jul 12, 2017

@StephanEwen
Regarding the "no re-discovery on restore" tests:
yes, one could say it is covered by KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(). It's an end-to-end exactly-once test for the case where the Flink source subtask count > partition count.

Regarding ListState:
The redistribution of ListState doesn't conflict with discovery and assignment of partitions in the release-1.3 case (where there is no partition discovery), because we don't apply the partition assignment logic when we're restoring from a savepoint. We only consider whatever is in the restored state. See also @aljoscha's comment above, which is related to this.

On master, where partition discovery is already merged, the ListState is a union list state, i.e. all partition states are broadcast to all subtasks. On restore, the restored union list state is filtered again with the assignment logic.
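
Roughly, the master-branch restore path described here could look like the following (a simplified sketch with assumed field and helper names such as restoredState; not the actual FlinkKafkaConsumerBase code, imports omitted):

```java
// Simplified sketch of a union-list-state restore that re-filters with the
// deterministic assigner; names like restoredState are assumptions, not the
// actual FlinkKafkaConsumerBase code.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> descriptor =
			new ListStateDescriptor<>(
					"topic-partition-offset-states",
					TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

	// union list state: every subtask is handed the partition states of ALL subtasks
	ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates =
			context.getOperatorStateStore().getUnionListState(descriptor);

	if (context.isRestored()) {
		int subtask = getRuntimeContext().getIndexOfThisSubtask();
		int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();

		for (Tuple2<KafkaTopicPartition, Long> partitionOffset : unionOffsetStates.get()) {
			// keep only the partitions that the assignment logic maps to this subtask
			if (KafkaTopicPartitionAssigner.assign(partitionOffset.f0, numSubtasks) == subtask) {
				restoredState.put(partitionOffset.f0, partitionOffset.f1);
			}
		}
	}
}
```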

@tzulitai (Contributor Author)

@aljoscha on second thought, I don't think we can easily deal with the fact that, when restoring from 1.3.1 / 1.3.0 savepoints in 1.3.2, users will not benefit from this bug fix.

There is basically no guarantee about what the distribution will be when restoring from 1.3.1 / 1.3.0, and therefore no way to manipulate it to follow the new, fixed distribution scheme we introduce here.
It might be possible if we force a migration to union list state in 1.3.2, but I'm not really sure that we want to do that.

@aljoscha (Contributor) commented Jul 17, 2017

Yes, I don't think we can get around this when restoring from "old" state.

I also have another suspicion: I don't think that KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest() accurately catches some cases, and I think there is a problem in that we cannot accurately detect whether we are restoring or starting from scratch. Consider this case: 5 partitions, 5 parallel source instances. Now we rescale to 10 parallel source instances. Some sources don't get state, so they think we are starting from scratch and will run partition discovery. Doesn't this mean that they could possibly read from a partition that another source is already reading from, because that source got the state for it? (Note this doesn't occur on master because all sources get all state.)

@aljoscha (Contributor)

Note that this doesn't normally occur, because the strategy for assigning Kafka partitions and the strategy for assigning operator state are the same (right now). However, it means that you will have active partition discovery for parallel instances that didn't previously have state: assume we have 1 partition and 1 parallel source. Now we add a new partition and restart the Flink job. Parallel instance 1 will still read from partition 0, while parallel instance 2 will think that it didn't restart (because it didn't get state), will discover partitions, and will take ownership of partition 1.

(This is with current release-1.3 branch code.)

@aljoscha (Contributor) commented Jul 17, 2017

Oye, this is more complicated than I thought. On release-1.3 the assignment actually works if the Kafka brokers always return the partitions in the same order. The reason is that the assignment of partitions and the assignment of operator state (in RoundRobinOperatorStateRepartitioner) is aligned. This meant that it's not a problem when sources think that they are "fresh" (not restored from state) because they didn't get any state. If they tried to assign a partition to themselves this would also mean that they have the state for that (again, because partition assignment and operator state assignment are aligned).

This PR breaks the alignment because the startIndex is not necessarily 0. However, this is not caught by any tests, because the StateAssignmentOperation has an optimisation where it doesn't repartition operator state if the parallelism doesn't change. If we deactivate that optimisation by turning the relevant line in StateAssignmentOperation into if (true), the test in Kafka09ITCase will fail.

The fix is to properly forward the information of whether we're restored in initializeState(); I did a commit for that: https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings. The problem is that it is not easy to change the tests to catch this bug. I think an ITCase that uses Kafka and does a savepoint plus rescaling would do the trick. (Something like StatefulJobSavepointFrom13MigrationITCase, but doing the savepoint/restore in one test instead of storing it.)
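
Condensed, the idea is to carry the restore flag from initializeState() into open() so that no partition querying happens on restore. A simplified sketch under assumed names (subscribedPartitions, restoredState, getKafkaPartitions); this is not the linked branch's actual code:

```java
// Sketch: remember whether we are restoring, and skip partition discovery in that case.
private transient boolean restoredFromState;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
	restoredFromState = context.isRestored();   // the authoritative "are we restoring?" signal
	// ... populate restoredState from the offsets ListState as before ...
}

@Override
public void open(Configuration configuration) throws Exception {
	if (restoredFromState) {
		// restoring: subscribe to exactly what the restored state contains, no querying
		subscribedPartitions.addAll(restoredState.keySet());
	} else {
		// fresh start: query Kafka and keep only the partitions assigned to this subtask
		int subtask = getRuntimeContext().getIndexOfThisSubtask();
		int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
		for (KafkaTopicPartition partition : getKafkaPartitions(topics)) {
			if (KafkaTopicPartitionAssigner.assign(partition, numSubtasks) == subtask) {
				subscribedPartitions.add(partition);
			}
		}
	}
}
```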

tzulitai added a commit to tzulitai/flink that referenced this pull request Jul 22, 2017
…ate in FlinkKafkaConsumer

Previously, querying the partition list and using it to filter out
restored partition states was problematic, since the queried
partition list may be missing partitions due to temporary downtime
of Kafka brokers. Effectively, this could cause state to be dropped
on restore.

This commit fixes this by completely removing partition querying if
we're restoring state (as notified by
FunctionInitializationContext.isRestored()). The subscribed partitions
will always be exactly what the restored state contains.

This closes apache#4357.
This closes apache#4344.
This closes apache#4301.
@tzulitai (Contributor Author)

This was merged via #4357. Closing.
