-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6115: TaskManager should be type aware #4129
KAFKA-6115: TaskManager should be type aware #4129
Conversation
mjsax
commented
Oct 24, 2017
- remove type specific methods from Task interface
- add generics to preserve task type
- add sub classes for different task types
Call for review @guozhangwang @dguy @bbejeck |
Jenkins failures are relevant? |
Yes. Updated this. |
retest this please |
1 similar comment
retest this please |
- remove type specific methods from Task interface - add generics to preserve task type - add sub classes for different task types
984de0e
to
6c3a24f
Compare
@@ -38,21 +38,21 @@ | |||
import static org.junit.Assert.assertTrue; | |||
import static org.junit.Assert.fail; | |||
|
|||
public class AssignedTasksTest { | |||
public class AssignedStreamTasksTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: change test name to AssignedStreamsTasksTest
otherwise code coverage not picked up.
@mjsax thanks for the patch. One minor comment above. Also from test coverage it looks like line 89 from |
I think L89 in |
Updated. |
I understand that is the case, I wanted to add a test hitting the success scenario, but I wasn't clear about my intention. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One nit question, otherwise LGTM.
Will merge after addressed @bbejeck 's comment
private Map<TopicPartition, Task> runningByPartition = new HashMap<>(); | ||
private Map<TopicPartition, Task> restoringByPartition = new HashMap<>(); | ||
private int committed = 0; | ||
Map<TaskId, T> running = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be protected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected
is weaker than "package private" (or am I wrong?)
@dguy Maybe you can take a look at this PR as well? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax, left a few comments. Is a shame we can't just use the Task
interface everywhere, but i understand the motivation.
AssignedStandbyTasks(final LogContext logContext) { | ||
super(logContext, "standby task"); | ||
|
||
this.log = logContext.logger(getClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to do this? log
is private and is not used
import org.apache.kafka.common.utils.LogContext; | ||
import org.slf4j.Logger; | ||
|
||
class AssignedStandbyTasks extends AssignedTasks<StandbyTask> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that this class is a bit redundant, i.e, we could just construct an AssignedTasks
with the logContext
and "standby task"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TaskManager
uses it as explicit return type in some methods, thus, we need it as explicit type
Map<TaskId, T> running = new ConcurrentHashMap<>(); | ||
private Map<TopicPartition, T> runningByPartition = new HashMap<>(); | ||
private Map<TopicPartition, T> restoringByPartition = new HashMap<>(); | ||
protected int committed = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't used in this class. Probably should move to AssignedStreamsTasks
@@ -279,7 +261,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> | |||
return false; | |||
} | |||
|
|||
private void addToRestoring(final Task task) { | |||
private void addToRestoring(final T task) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As task restoration is only done for active tasks should we move all methods and fields to do with restoring to the AssignedStreamsTask
class ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... even if addToRestoring
is only called for active tasks, we use it within initializeNewTasks
that is used for both (active and standby). Not sure if we could do this in an elegant way...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looked into this part as well. I think extracting initializeNewTasks
for active tasks and standby tasks is a bit overkill than just letting StandbyTask.initialize()
return true.
@@ -18,6 +18,6 @@ | |||
|
|||
import org.apache.kafka.common.TopicPartition; | |||
|
|||
public interface RestoringTasks { | |||
Task restoringTaskFor(final TopicPartition partition); | |||
public interface RestoringTasks<T extends Task> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only StreamsTask
will ever be restored. Perhaps we can remove the generics here?
@bbejeck Thanks for clarification. I looked into it, but it seems to be rather hard to do a test -- need to create/mock a lot of input parameters. Also, this case is covered by |
Updated this. |
@mjsax having the |
FAILURE |
SUCCESS |
1 similar comment
SUCCESS |
Merged to trunk. |