-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Kafka supervisor adopting running tasks between versions #7212
Conversation
@@ -1640,19 +1648,31 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) | |||
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional | |||
.get(); | |||
|
|||
String taskSequenceName = task.getIOConfig().getBaseSequenceName(); | |||
String taskSequenceName = generateSequenceName( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment explaining what is going on here and why this is doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -1640,19 +1648,31 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) | |||
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional | |||
.get(); | |||
|
|||
String taskSequenceName = task.getIOConfig().getBaseSequenceName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every task in a task group needs to have the same sequence name (so replicas segment allocations match up), meaning you do need to remember task.getIOConfig().getBaseSequenceName()
, and should probably assign it as the sequence name for the relevant task group before starting to create new tasks.
@justinborromeo, now that changes to SS stuff seem to have settled down, could you please resolve the conflicts? |
@gianm done |
@@ -426,6 +452,7 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) | |||
// Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class | |||
private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>(); | |||
|
|||
// After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a duplicate of the comment below
.checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId) | ||
.baseSequenceName | ||
.equals(taskSequenceName); | ||
TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs a null check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh nm, I see it was redundant
} | ||
|
||
@Test | ||
public void testKillBadPartitionAssignment() throws Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test needs to be changed, it runs fine as-is if I add it back in
null | ||
); | ||
|
||
// different datasource (don't kill) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make sure you have equivalent cases in the replacement tests that you added?
} | ||
|
||
@Test | ||
public void testKillBadPartitionAssignment() throws Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I did some investigation here, and the Kinesis version fails while Kafka passes because the tests had different behavior.
In the Kafka version of the test, id3-id5 are checking for partition consistency within the partition set (e.g., 0 and 1 can't be in the same set with a taskCount of 2).
In the Kinesis version, there are only two partitions instead of 3, so the case above is tested in id3 only. id4 and id5 (which are duplicates) in kinesis are testing a mismatch of the group ID and the partitions within the group which hits the new check you added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the kinesis test, I think you could add it back in, remove the redundant "id5" case, and adjust the test to catch the failure on "id4" (might be cool to have the same groupID + partition mismatch test in the kafka version too)
It's definitely out of scope for this PR, but at some point I do want to look into why the kinesis test only uses 2 shards and possibly adjusting that, I think it would be simpler if it followed the structure of the equivalent kafka tests more closely.
|
||
TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>(); | ||
checkpoints1.put(0, ImmutableMap.of(shardId1, "0")); | ||
TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>(); | ||
checkpoints2.put(0, ImmutableMap.of(shardId0, "0")); | ||
TreeMap<Integer, Map<String, String>> checkpoints4 = new TreeMap<>(); | ||
checkpoints2.put(0, ImmutableMap.of(shardId0, "0")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkpoints2 here should be checkpoints4
} | ||
|
||
@Test | ||
public void testIsTaskCurrent() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a case in this test (and kinesis) that uses TestModifiedKafkaIndexTaskTuningConfig
with the same properties to see that isTaskCurrent will accept the tuningConfig with extra properties?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, nevermind, the existing serde and isTaskCurrent tests are enough. I was thinking of the wrong place for inserting the modified config (the taskStorage on the overlord should always return tasks with the new class definition)
* Recompute hash in isTaskCurrent() and added tests * Fixed checkstyle stuff * Fixed failing tests * Make TestableKafkaSupervisorWithCustomIsTaskCurrent static * Add doc * baseSequenceName change * Added comment * WIP * Fixed imports * Undid lambda change for diff sake * Cleanup * Added comment * Reinsert Kafka tests * Readded kinesis test * Readd bad partition assignment in kinesis supervisor test * Nit * Misnamed var
…che#7212) * Recompute hash in isTaskCurrent() and added tests * Fixed checkstyle stuff * Fixed failing tests * Make TestableKafkaSupervisorWithCustomIsTaskCurrent static * Add doc * baseSequenceName change * Added comment * WIP * Fixed imports * Undid lambda change for diff sake * Cleanup * Added comment * Reinsert Kafka tests * Readded kinesis test * Readd bad partition assignment in kinesis supervisor test * Nit * Misnamed var
Fixes #6958.
The logic change occurs solely in SeekableStreamSupervisor. The rest of the changes are Serde tests to test backwards compatibility of DataSchema and TuningConfig and modifications of the existing tests in KinesisSupervisorTest and KafkaSupervisorTest.